Switch from redis keys to scan for GetKeys driver + tests and fixes

This commit is contained in:
ionutboangiu
2022-07-19 17:34:24 +03:00
committed by Dan Christian Bogos
parent d847ec5d98
commit 6bcb9b0008
15 changed files with 255 additions and 77 deletions

View File

@@ -24,7 +24,6 @@ package analyzers
import (
"errors"
"flag"
"fmt"
"net"
"net/rpc"
"net/rpc/jsonrpc"
@@ -239,7 +238,6 @@ func testAnalyzerSV1Search(t *testing.T) {
} else if len(result) != 1 {
t.Errorf("Unexpected result: %s", utils.ToJSON(result))
}
fmt.Println(utils.ToJSON(result))
}
func testAnalyzerSV1Search2(t *testing.T) {

View File

@@ -207,7 +207,6 @@ func testAnalyzerSGetFilterIDs(t *testing.T) {
t.Errorf("Expected %v \n but received \n %v", expFilter, result)
}
time.Sleep(50 * time.Millisecond)
// fmt.Println(utils.ToJSON(result))
}
func testAnalyzerSSearchCall3(t *testing.T) {

View File

@@ -325,7 +325,6 @@ func TestPrecacheStatus(t *testing.T) {
if !reflect.DeepEqual(reply, exp) {
t.Errorf("Expected %v\n but received %v", exp, reply)
}
// fmt.Println(reply)
}
func TestHasGroup(t *testing.T) {

View File

@@ -634,7 +634,7 @@ func (rplSv1 *ReplicatorSv1) SetActionProfile(ctx *context.Context, sp *engine.A
}
func (rplSv1 *ReplicatorSv1) RemoveRateProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) {
if err = rplSv1.dm.DataDB().RemoveRateProfileDrv(ctx, args.Tenant, args.ID, &[]string{}); err != nil {
if err = rplSv1.dm.DataDB().RemoveRateProfileDrv(ctx, args.Tenant, args.ID, nil); err != nil {
return
}
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]),

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"fmt"
"reflect"
"testing"
"time"
@@ -839,6 +838,6 @@ func TestFilterHelpersGetWeightFromDynamics(t *testing.T) {
t.Error(err)
}
if !reflect.DeepEqual(ap.weight, expected) {
fmt.Printf("\nExpected <%+v>, \nReceived <%+v>", expected, ap.weight)
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, ap.weight)
}
}

View File

@@ -26,55 +26,6 @@ import (
"github.com/cgrates/cgrates/utils"
)
// func TestAttrProfileForEvent(t *testing.T) {
// cfg := config.NewDefaultCGRConfig()
// connMng := NewConnManager(cfg)
// dataDB, err := NewDataDBConn(cfg.DataDbCfg().Type,
// cfg.DataDbCfg().Host, cfg.DataDbCfg().Port,
// cfg.DataDbCfg().Name, cfg.DataDbCfg().User,
// cfg.DataDbCfg().Password, cfg.GeneralCfg().DBDataEncoding,
// cfg.DataDbCfg().Opts, cfg.DataDbCfg().Items)
// if err != nil {
// t.Error(err)
// }
// defer dataDB.Close()
// dm := NewDataManager(dataDB, config.CgrConfig().CacheCfg(), connMng)
// attr := &AttributeS{
// dm: dm,
// fltrS: NewFilterS(cfg, connMng, dm),
// cfg: cfg,
// }
// attrPrf := &AttributeProfile{
// Tenant: utils.CGRateSorg,
// ID: "TEST_ATTRIBUTES_TEST",
// FilterIDs: []string{"*string:~*req.Account:1002", "*exists:~*opts.*usage:"},
// Attributes: []*Attribute{
// {
// Path: utils.AccountField,
// Type: utils.MetaConstant,
// Value: nil,
// },
// {
// Path: "*tenant",
// Type: utils.MetaConstant,
// Value: nil,
// },
// },
// Weights: utils.DynamicWeights{
// {
// FilterIDs: []string{"non_existent"},
// Weight: 20,
// },
// },
// }
// attr.dm.SetAttributeProfile(context.Background(), attrPrf, false)
// rcv, err := attr.attributeProfileForEvent(context.Background(), "cgrates.org", []string{"TEST_ATTRIBUTES_TEST"}, utils.MapStorage{"some": "data"}, "", make(map[string]int), 1, false)
// if err != nil {
// t.Error(err)
// }
// fmt.Println(utils.ToJSON(rcv))
// }
func TestParseAtributeUsageDiffVal1(t *testing.T) {
dp := utils.MapStorage{
utils.MetaReq: utils.MapStorage{

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"fmt"
"reflect"
"testing"
@@ -43,6 +42,6 @@ func TestFilterHelpersWeightFromDynamics(t *testing.T) {
t.Error(err)
}
if !reflect.DeepEqual(result, expected) {
fmt.Printf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
}
}

View File

@@ -53,6 +53,7 @@ const (
redisEXISTS = "EXISTS"
redisGET = "GET"
redisSET = "SET"
redisSCAN = "SCAN"
redisLRANGE = "LRANGE"
redisLLEN = "LLEN"
redisRPOP = "RPOP"
@@ -233,10 +234,31 @@ func (rs *RedisStorage) getKeysForFilterIndexesKeys(fkeys []string) (keys []stri
return
}
// func (rs *RedisStorage) GetKeysForPrefix(ctx *context.Context, prefix string) (keys []string, err error) {
// err = rs.Cmd(&keys, redisSCAN, "0", "MATCH", prefix+"*")
// if err != nil {
// return
// }
// if len(keys) != 0 {
// if filterIndexesPrefixMap.Has(prefix) {
// return rs.getKeysForFilterIndexesKeys(keys)
// }
// return
// }
// return nil, nil
// }
func (rs *RedisStorage) GetKeysForPrefix(ctx *context.Context, prefix string) (keys []string, err error) {
err = rs.Cmd(&keys, redisKEYS, prefix+"*")
if err != nil {
return
scan := radix.NewScanner(rs.client, radix.ScanOpts{
Command: redisSCAN,
Pattern: prefix + utils.Meta,
})
var key string
for scan.Next(&key) {
keys = append(keys, key)
}
if err = scan.Close(); err != nil {
return nil, err
}
if len(keys) != 0 {
if filterIndexesPrefixMap.Has(prefix) {

View File

@@ -0,0 +1,136 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"fmt"
"strconv"
"testing"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/utils"
)
// var rs *RedisStorage
// func init() {
// rs, _ = NewRedisStorage("localhost", 10, "cgrates", "", "json", 10, 20,
// "", false, 5*time.Second, 0, 0, 0, 0, false, "", "", "")
// }
// func storeInDB() {
// chargerProfile := &ChargerProfile{
// ID: "TestA_CHARGER1",
// Tenant: "cgrates.org",
// FilterIDs: []string{"*string:~*req.TestCase:AdminSAPIs"},
// Weights: utils.DynamicWeights{
// {
// Weight: 30,
// },
// },
// Blockers: utils.DynamicBlockers{
// {
// Blocker: false,
// },
// },
// RunID: "run1",
// AttributeIDs: []string{"ATTR_TEST1"},
// }
// id := "ChargerP"
// var prfID string
// for i := 0; i <= 10000; i++ {
// if i%1000 == 0 {
// if (i/1000)%2 == 0 {
// prfID = "TestA:" + strconv.Itoa(i) + ":" + id
// } else {
// prfID = "TestB:" + strconv.Itoa(i) + ":" + id
// }
// }
// chargerProfile.ID = prfID
// rs.SetChargerProfileDrv(context.Background(), chargerProfile)
// }
// }
// func BenchmarkRedisScan(b *testing.B) {
// storeInDB()
// for i := 0; i < b.N; i++ {
// rs.GetKeysForPrefix(context.Background(), "TestA")
// }
// prfx := []string{"TestA", "TestB", "Test"}
// for _, v := range prfx {
// b.Run(fmt.Sprintf("test case: prefix = %q", prfx), func(b *testing.B) {
// for i := 0; i < b.N; i++ {
// rs.GetKeysForPrefix(context.Background(), v)
// }
// })
// }
// rs.Flush("")
// }
func BenchmarkRedisScan(b *testing.B) {
rs, err := NewRedisStorage("127.0.0.1:6379", 10, "cgrates", "", "json", 10, 20,
"", false, 5*time.Second, 0, 0, 0, 0, false, "", "", "")
fmt.Println(err)
fmt.Println(rs)
chargerProfile := &ChargerProfile{
ID: "TestA_CHARGER1",
Tenant: "cgrates.org",
FilterIDs: []string{"*string:~*req.TestCase:AdminSAPIs"},
Weights: utils.DynamicWeights{
{
Weight: 30,
},
},
Blockers: utils.DynamicBlockers{
{
Blocker: false,
},
},
RunID: "run1",
AttributeIDs: []string{"ATTR_ TEST1"},
}
id := "ChargerP"
var prfID string
for i := 0; i <= 20; i++ {
if i%10 == 0 {
if (i/10)%2 == 0 {
prfID = "TestA:"
} else {
prfID = "TestB:"
}
}
prfID = prfID[:6] + strconv.Itoa(i) + id
chargerProfile.ID = prfID
rs.SetChargerProfileDrv(context.Background(), chargerProfile)
}
for i := 0; i < b.N; i++ {
rs.GetKeysForPrefix(context.Background(), "TestA")
}
prfx := []string{"TestA", "TestB", "Test"}
for _, v := range prfx {
b.Run(fmt.Sprintf("test case: prefix = %q", v), func(b *testing.B) {
for i := 0; i < b.N; i++ {
rs.GetKeysForPrefix(context.Background(), v)
}
})
}
rs.Flush("")
}

View File

@@ -1004,8 +1004,10 @@ func TestErsOnEvictedNoCacheDumpFields(t *testing.T) {
}
return nil
})
var compare map[int][]string
compare = make(map[int][]string, 2)
if err != nil {
t.Error(err)
}
compare := make(map[int][]string, 2)
for idx, file := range files {
data, err := os.ReadFile(file)
if err != nil {
@@ -1016,7 +1018,8 @@ func TestErsOnEvictedNoCacheDumpFields(t *testing.T) {
compare[idx] = s
}
if len(compare[0]) != 10 && len(compare[1]) != 9 {
t.Error("Expected 10 and 9")
t.Errorf("expected <%d> and <%d>, \nreceived: <%d> and <%d>",
10, 9, len(compare[0]), len(compare[1]))
}
if err := os.RemoveAll(dirPath); err != nil {
t.Error(err)
@@ -1082,6 +1085,9 @@ func TestERsOnEvictedDumpToJSON(t *testing.T) {
}
return nil
})
if err != nil {
t.Error(err)
}
var compare map[string]interface{}
// compare = make(map[int][]string, 2)
@@ -1106,9 +1112,8 @@ func TestERsOnEvictedDumpToJSON(t *testing.T) {
utils.Password: "secure_pass",
"Additional_Field": "Additional_Value",
}
// fmt.Println(utils.ToJSON(compare))
if !reflect.DeepEqual(exp, compare) {
t.Errorf("Expected %v \n but received \n %v", exp, compare)
t.Errorf("expected <%v>,\nreceived <%v>", utils.ToJSON(exp), utils.ToJSON(compare))
}
if err := os.RemoveAll(dirPath); err != nil {
t.Error(err)
@@ -1211,11 +1216,13 @@ func TestErsOnEvictedDumpToJSONMergeError(t *testing.T) {
utils.Destination: "1003",
utils.OriginID: "1234567",
utils.ToR: utils.MetaSMS,
utils.MetaOriginID: "1133dc80896edf5049b46aa911cb9085eeb27f4d",
utils.Password: "secure_password",
"Additional_Field": "Additional_Value2",
utils.AnswerTime: time.Date(2021, 6, 1, 13, 0, 0, 0, time.UTC),
},
APIOpts: map[string]interface{}{
utils.MetaOriginID: "1133dc80896edf5049b46aa911cb9085eeb27f4d",
},
},
},
rdrCfg: &config.EventReaderCfg{ // CacheDumpFields will be empty
@@ -1237,9 +1244,9 @@ func TestErsOnEvictedDumpToJSONMergeError(t *testing.T) {
rdrEvents: make(chan *erEvent, 1),
filterS: fltrS,
}
expLog := `[WARNING] <ERs> failed posting expired parial events <[{"Tenant":"cgrates.org","ID":"EventErsOnEvicted2","Event":{"Account":"1002","Additional_Field":"Additional_Value2","AnswerTime":"2021-06-01T13:00:00Z","Category":"call","Destination":"1003","OriginID":"1234567","ToR":"*sms","Usage":"12s","password":"secure_password"},"APIOpts":{"*originID":"1133dc80896edf5049b46aa911cb9085eeb27f4d",}},{"Tenant":"cgrates.org","ID":"EventErsOnEvicted","Event":{"Account":"1001","Additional_Field":"Additional_Value","AnswerTime":"2021-06-01T12:00:00Z","Category":"call","Destination":"1002","OriginHost":"local","OriginID":"123456","ToR":"*voice","Usage":"10s","password":"secure_pass"},"APIOpts":{"*originID":"1133dc80896edf5049b46aa911cb9085eeb27f4c",}}]>`
expLog := `[WARNING] <ERs> failed posting expired parial events <[{"Tenant":"cgrates.org","ID":"EventErsOnEvicted2","Event":{"Account":"1002","Additional_Field":"Additional_Value2","AnswerTime":"2021-06-01T13:00:00Z","Category":"call","Destination":"1003","OriginID":"1234567","ToR":"*sms","Usage":"12s","password":"secure_password"},"APIOpts":{"*originID":"1133dc80896edf5049b46aa911cb9085eeb27f4d"}},{"Tenant":"cgrates.org","ID":"EventErsOnEvicted","Event":{"Account":"1001","Additional_Field":"Additional_Value","AnswerTime":"2021-06-01T12:00:00Z","Category":"call","Destination":"1002","OriginHost":"local","OriginID":"123456","ToR":"*voice","Usage":"10s","password":"secure_pass"},"APIOpts":{"*originID":"1133dc80896edf5049b46aa911cb9085eeb27f4c"}}]>`
erS.onEvicted("ID", value)
rcvLog := buf.String()[20:]
rcvLog := buf.String()
if !strings.Contains(rcvLog, expLog) {
t.Errorf("expected <%+v> to be included in: <%+v>", expLog, rcvLog)
}

View File

@@ -23,6 +23,7 @@ import (
"testing"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -38,3 +39,75 @@ func TestGetProcessOptions(t *testing.T) {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
}
}
func TestLibErsMergePartialEvents(t *testing.T) {
confg := config.NewDefaultCGRConfig()
fltrS := engine.NewFilterS(confg, nil, nil)
cgrEvs := []*utils.CGREvent{
{
Tenant: "cgrates.org",
ID: "ev1",
Event: map[string]interface{}{
"EvField1": "Value1",
"EvField2": "Value4",
utils.AnswerTime: 6.,
},
APIOpts: map[string]interface{}{
"Field1": "Value1",
"Field2": "Value2",
},
},
{
Tenant: "cgrates.org",
ID: "ev2",
Event: map[string]interface{}{
"EvField3": "Value1",
"EvField2": "Value2",
utils.AnswerTime: 4.,
},
APIOpts: map[string]interface{}{
"Field4": "Value2",
"Field2": "Value3",
},
},
{
Tenant: "cgrates.org",
ID: "ev3",
Event: map[string]interface{}{
"EvField2": "Value2",
"EvField4": "Value4",
"EvField3": "Value3",
utils.AnswerTime: 8.,
},
APIOpts: map[string]interface{}{
"Field3": "Value3",
"Field4": "Value4",
},
},
}
exp := &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]interface{}{
utils.AnswerTime: 8.,
"EvField1": "Value1",
"EvField2": "Value2",
"EvField3": "Value3",
"EvField4": "Value4",
},
APIOpts: map[string]interface{}{
"Field1": "Value1",
"Field2": "Value2",
"Field3": "Value3",
"Field4": "Value4",
},
}
if rcv, err := mergePartialEvents(cgrEvs, confg.ERsCfg().Readers[0], fltrS, confg.GeneralCfg().DefaultTenant,
confg.GeneralCfg().DefaultTimezone, confg.GeneralCfg().RSRSep); err != nil {
t.Error(err)
} else {
rcv.ID = utils.EmptyString
if !reflect.DeepEqual(rcv, exp) {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", utils.ToJSON(exp), utils.ToJSON(rcv))
}
}
}

View File

@@ -1051,7 +1051,6 @@ func TestEventChargerMerge(t *testing.T) {
if !reflect.DeepEqual(expEc, eEvChgs) {
t.Errorf("Expected %v \n but received \n %v", ToJSON(expEc), ToJSON(eEvChgs))
}
// fmt.Println(ToJSON(eEvChgs))
}
func TestEventChargesAppendChargeEntry(t *testing.T) {

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package utils
import (
"fmt"
"reflect"
"testing"
"time"
@@ -1534,8 +1533,7 @@ func TestRateSIncrementCostNoID(t *testing.T) {
cost := rIc.Cost(rts)
if cost != nil {
fmt.Println(cost)
t.Error("Expected to be nil")
t.Errorf("expected cost to be nil, received <%+v>", cost)
}
}
@@ -1765,7 +1763,7 @@ func TestRateSIntervalCost(t *testing.T) {
rcv := rIv.Cost(rts)
exp := decimal.WithContext(DecimalContext).SetUint64(2)
if !reflect.DeepEqual(rcv, exp) {
fmt.Printf("Expected %v \n but received \n %v", exp, rcv)
t.Errorf("expected <%v>,\nreceived <%v>", exp, rcv)
}
}

View File

@@ -299,7 +299,6 @@ func TestStringSetFieldAsInterface(t *testing.T) {
if err != nil {
t.Error(err)
}
// fmt.Println(rcv)
_, err = s.FieldAsInterface([]string{"field2"})
if err != ErrNotFound {
t.Errorf("Expected %v", ErrNotFound)

View File

@@ -18,7 +18,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package utils
import (
"fmt"
"math/cmplx"
"reflect"
"testing"
@@ -236,6 +235,6 @@ func TestContentStructFieldByIndexIsEmpty(t *testing.T) {
},
}
if fieldByIndexIsEmpty(reflect.ValueOf(myStruct), []int{1, 0}) {
fmt.Printf("%v", myStruct)
t.Errorf("%v", myStruct)
}
}