From 6bcb9b00085dc26262ef4e21963901cc9c7dbf33 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Tue, 19 Jul 2022 17:34:24 +0300 Subject: [PATCH] Switch from redis keys to scan for GetKeys driver + tests and fixes --- analyzers/analyzers_it_test.go | 2 - apis/analyzer_it_test.go | 1 - apis/cache_test.go | 1 - apis/replicator.go | 2 +- engine/actionprofile_test.go | 3 +- engine/attributes_test.go | 49 ------------ engine/filterhelpers_test.go | 3 +- engine/storage_redis.go | 28 ++++++- engine/storage_redis_test.go | 136 +++++++++++++++++++++++++++++++++ ers/ers_it_test.go | 23 ++++-- ers/libers_test.go | 73 ++++++++++++++++++ utils/eventcharges_test.go | 1 - utils/librates_test.go | 6 +- utils/stringset_test.go | 1 - utils/struct_test.go | 3 +- 15 files changed, 255 insertions(+), 77 deletions(-) create mode 100644 engine/storage_redis_test.go diff --git a/analyzers/analyzers_it_test.go b/analyzers/analyzers_it_test.go index 4dd478868..1354fcbfb 100644 --- a/analyzers/analyzers_it_test.go +++ b/analyzers/analyzers_it_test.go @@ -24,7 +24,6 @@ package analyzers import ( "errors" "flag" - "fmt" "net" "net/rpc" "net/rpc/jsonrpc" @@ -239,7 +238,6 @@ func testAnalyzerSV1Search(t *testing.T) { } else if len(result) != 1 { t.Errorf("Unexpected result: %s", utils.ToJSON(result)) } - fmt.Println(utils.ToJSON(result)) } func testAnalyzerSV1Search2(t *testing.T) { diff --git a/apis/analyzer_it_test.go b/apis/analyzer_it_test.go index e6652b2de..1e8dd37c4 100644 --- a/apis/analyzer_it_test.go +++ b/apis/analyzer_it_test.go @@ -207,7 +207,6 @@ func testAnalyzerSGetFilterIDs(t *testing.T) { t.Errorf("Expected %v \n but received \n %v", expFilter, result) } time.Sleep(50 * time.Millisecond) - // fmt.Println(utils.ToJSON(result)) } func testAnalyzerSSearchCall3(t *testing.T) { diff --git a/apis/cache_test.go b/apis/cache_test.go index 0b4eac024..68d1e97b7 100644 --- a/apis/cache_test.go +++ b/apis/cache_test.go @@ -325,7 +325,6 @@ func TestPrecacheStatus(t *testing.T) { if !reflect.DeepEqual(reply, exp) { t.Errorf("Expected %v\n but received %v", exp, reply) } - // fmt.Println(reply) } func TestHasGroup(t *testing.T) { diff --git a/apis/replicator.go b/apis/replicator.go index 8393c6cdb..3c1b8b846 100644 --- a/apis/replicator.go +++ b/apis/replicator.go @@ -634,7 +634,7 @@ func (rplSv1 *ReplicatorSv1) SetActionProfile(ctx *context.Context, sp *engine.A } func (rplSv1 *ReplicatorSv1) RemoveRateProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) { - if err = rplSv1.dm.DataDB().RemoveRateProfileDrv(ctx, args.Tenant, args.ID, &[]string{}); err != nil { + if err = rplSv1.dm.DataDB().RemoveRateProfileDrv(ctx, args.Tenant, args.ID, nil); err != nil { return } if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), diff --git a/engine/actionprofile_test.go b/engine/actionprofile_test.go index e6b15bbd5..77ce39021 100644 --- a/engine/actionprofile_test.go +++ b/engine/actionprofile_test.go @@ -19,7 +19,6 @@ along with this program. If not, see package engine import ( - "fmt" "reflect" "testing" "time" @@ -839,6 +838,6 @@ func TestFilterHelpersGetWeightFromDynamics(t *testing.T) { t.Error(err) } if !reflect.DeepEqual(ap.weight, expected) { - fmt.Printf("\nExpected <%+v>, \nReceived <%+v>", expected, ap.weight) + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, ap.weight) } } diff --git a/engine/attributes_test.go b/engine/attributes_test.go index 8c1a25219..8f5cb9f57 100644 --- a/engine/attributes_test.go +++ b/engine/attributes_test.go @@ -26,55 +26,6 @@ import ( "github.com/cgrates/cgrates/utils" ) -// func TestAttrProfileForEvent(t *testing.T) { -// cfg := config.NewDefaultCGRConfig() -// connMng := NewConnManager(cfg) -// dataDB, err := NewDataDBConn(cfg.DataDbCfg().Type, -// cfg.DataDbCfg().Host, cfg.DataDbCfg().Port, -// cfg.DataDbCfg().Name, cfg.DataDbCfg().User, -// cfg.DataDbCfg().Password, cfg.GeneralCfg().DBDataEncoding, -// cfg.DataDbCfg().Opts, cfg.DataDbCfg().Items) -// if err != nil { -// t.Error(err) -// } -// defer dataDB.Close() -// dm := NewDataManager(dataDB, config.CgrConfig().CacheCfg(), connMng) -// attr := &AttributeS{ -// dm: dm, -// fltrS: NewFilterS(cfg, connMng, dm), -// cfg: cfg, -// } -// attrPrf := &AttributeProfile{ -// Tenant: utils.CGRateSorg, -// ID: "TEST_ATTRIBUTES_TEST", -// FilterIDs: []string{"*string:~*req.Account:1002", "*exists:~*opts.*usage:"}, -// Attributes: []*Attribute{ -// { -// Path: utils.AccountField, -// Type: utils.MetaConstant, -// Value: nil, -// }, -// { -// Path: "*tenant", -// Type: utils.MetaConstant, -// Value: nil, -// }, -// }, -// Weights: utils.DynamicWeights{ -// { -// FilterIDs: []string{"non_existent"}, -// Weight: 20, -// }, -// }, -// } -// attr.dm.SetAttributeProfile(context.Background(), attrPrf, false) -// rcv, err := attr.attributeProfileForEvent(context.Background(), "cgrates.org", []string{"TEST_ATTRIBUTES_TEST"}, utils.MapStorage{"some": "data"}, "", make(map[string]int), 1, false) -// if err != nil { -// t.Error(err) -// } -// fmt.Println(utils.ToJSON(rcv)) -// } - func TestParseAtributeUsageDiffVal1(t *testing.T) { dp := utils.MapStorage{ utils.MetaReq: utils.MapStorage{ diff --git a/engine/filterhelpers_test.go b/engine/filterhelpers_test.go index 24374d4e0..72d1b8f1a 100644 --- a/engine/filterhelpers_test.go +++ b/engine/filterhelpers_test.go @@ -19,7 +19,6 @@ along with this program. If not, see package engine import ( - "fmt" "reflect" "testing" @@ -43,6 +42,6 @@ func TestFilterHelpersWeightFromDynamics(t *testing.T) { t.Error(err) } if !reflect.DeepEqual(result, expected) { - fmt.Printf("\nExpected <%+v>, \nReceived <%+v>", expected, result) + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) } } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 82316ae54..20757c1ca 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -53,6 +53,7 @@ const ( redisEXISTS = "EXISTS" redisGET = "GET" redisSET = "SET" + redisSCAN = "SCAN" redisLRANGE = "LRANGE" redisLLEN = "LLEN" redisRPOP = "RPOP" @@ -233,10 +234,31 @@ func (rs *RedisStorage) getKeysForFilterIndexesKeys(fkeys []string) (keys []stri return } +// func (rs *RedisStorage) GetKeysForPrefix(ctx *context.Context, prefix string) (keys []string, err error) { +// err = rs.Cmd(&keys, redisSCAN, "0", "MATCH", prefix+"*") +// if err != nil { +// return +// } +// if len(keys) != 0 { +// if filterIndexesPrefixMap.Has(prefix) { +// return rs.getKeysForFilterIndexesKeys(keys) +// } +// return +// } +// return nil, nil +// } + func (rs *RedisStorage) GetKeysForPrefix(ctx *context.Context, prefix string) (keys []string, err error) { - err = rs.Cmd(&keys, redisKEYS, prefix+"*") - if err != nil { - return + scan := radix.NewScanner(rs.client, radix.ScanOpts{ + Command: redisSCAN, + Pattern: prefix + utils.Meta, + }) + var key string + for scan.Next(&key) { + keys = append(keys, key) + } + if err = scan.Close(); err != nil { + return nil, err } if len(keys) != 0 { if filterIndexesPrefixMap.Has(prefix) { diff --git a/engine/storage_redis_test.go b/engine/storage_redis_test.go new file mode 100644 index 000000000..2625f6e42 --- /dev/null +++ b/engine/storage_redis_test.go @@ -0,0 +1,136 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package engine + +import ( + "fmt" + "strconv" + "testing" + "time" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/utils" +) + +// var rs *RedisStorage + +// func init() { +// rs, _ = NewRedisStorage("localhost", 10, "cgrates", "", "json", 10, 20, +// "", false, 5*time.Second, 0, 0, 0, 0, false, "", "", "") +// } + +// func storeInDB() { +// chargerProfile := &ChargerProfile{ +// ID: "TestA_CHARGER1", +// Tenant: "cgrates.org", +// FilterIDs: []string{"*string:~*req.TestCase:AdminSAPIs"}, +// Weights: utils.DynamicWeights{ +// { +// Weight: 30, +// }, +// }, +// Blockers: utils.DynamicBlockers{ +// { +// Blocker: false, +// }, +// }, +// RunID: "run1", +// AttributeIDs: []string{"ATTR_TEST1"}, +// } +// id := "ChargerP" +// var prfID string +// for i := 0; i <= 10000; i++ { +// if i%1000 == 0 { +// if (i/1000)%2 == 0 { +// prfID = "TestA:" + strconv.Itoa(i) + ":" + id +// } else { +// prfID = "TestB:" + strconv.Itoa(i) + ":" + id +// } +// } +// chargerProfile.ID = prfID +// rs.SetChargerProfileDrv(context.Background(), chargerProfile) +// } +// } + +// func BenchmarkRedisScan(b *testing.B) { +// storeInDB() +// for i := 0; i < b.N; i++ { +// rs.GetKeysForPrefix(context.Background(), "TestA") +// } +// prfx := []string{"TestA", "TestB", "Test"} +// for _, v := range prfx { +// b.Run(fmt.Sprintf("test case: prefix = %q", prfx), func(b *testing.B) { +// for i := 0; i < b.N; i++ { +// rs.GetKeysForPrefix(context.Background(), v) +// } +// }) +// } +// rs.Flush("") + +// } + +func BenchmarkRedisScan(b *testing.B) { + rs, err := NewRedisStorage("127.0.0.1:6379", 10, "cgrates", "", "json", 10, 20, + "", false, 5*time.Second, 0, 0, 0, 0, false, "", "", "") + fmt.Println(err) + fmt.Println(rs) + chargerProfile := &ChargerProfile{ + ID: "TestA_CHARGER1", + Tenant: "cgrates.org", + FilterIDs: []string{"*string:~*req.TestCase:AdminSAPIs"}, + Weights: utils.DynamicWeights{ + { + Weight: 30, + }, + }, + Blockers: utils.DynamicBlockers{ + { + Blocker: false, + }, + }, + RunID: "run1", + AttributeIDs: []string{"ATTR_ TEST1"}, + } + id := "ChargerP" + var prfID string + for i := 0; i <= 20; i++ { + if i%10 == 0 { + if (i/10)%2 == 0 { + prfID = "TestA:" + } else { + prfID = "TestB:" + } + } + prfID = prfID[:6] + strconv.Itoa(i) + id + chargerProfile.ID = prfID + rs.SetChargerProfileDrv(context.Background(), chargerProfile) + } + for i := 0; i < b.N; i++ { + rs.GetKeysForPrefix(context.Background(), "TestA") + } + prfx := []string{"TestA", "TestB", "Test"} + for _, v := range prfx { + b.Run(fmt.Sprintf("test case: prefix = %q", v), func(b *testing.B) { + for i := 0; i < b.N; i++ { + rs.GetKeysForPrefix(context.Background(), v) + } + }) + } + rs.Flush("") + +} diff --git a/ers/ers_it_test.go b/ers/ers_it_test.go index a72192f7e..5278b1acc 100644 --- a/ers/ers_it_test.go +++ b/ers/ers_it_test.go @@ -1004,8 +1004,10 @@ func TestErsOnEvictedNoCacheDumpFields(t *testing.T) { } return nil }) - var compare map[int][]string - compare = make(map[int][]string, 2) + if err != nil { + t.Error(err) + } + compare := make(map[int][]string, 2) for idx, file := range files { data, err := os.ReadFile(file) if err != nil { @@ -1016,7 +1018,8 @@ func TestErsOnEvictedNoCacheDumpFields(t *testing.T) { compare[idx] = s } if len(compare[0]) != 10 && len(compare[1]) != 9 { - t.Error("Expected 10 and 9") + t.Errorf("expected <%d> and <%d>, \nreceived: <%d> and <%d>", + 10, 9, len(compare[0]), len(compare[1])) } if err := os.RemoveAll(dirPath); err != nil { t.Error(err) @@ -1082,6 +1085,9 @@ func TestERsOnEvictedDumpToJSON(t *testing.T) { } return nil }) + if err != nil { + t.Error(err) + } var compare map[string]interface{} // compare = make(map[int][]string, 2) @@ -1106,9 +1112,8 @@ func TestERsOnEvictedDumpToJSON(t *testing.T) { utils.Password: "secure_pass", "Additional_Field": "Additional_Value", } - // fmt.Println(utils.ToJSON(compare)) if !reflect.DeepEqual(exp, compare) { - t.Errorf("Expected %v \n but received \n %v", exp, compare) + t.Errorf("expected <%v>,\nreceived <%v>", utils.ToJSON(exp), utils.ToJSON(compare)) } if err := os.RemoveAll(dirPath); err != nil { t.Error(err) @@ -1211,11 +1216,13 @@ func TestErsOnEvictedDumpToJSONMergeError(t *testing.T) { utils.Destination: "1003", utils.OriginID: "1234567", utils.ToR: utils.MetaSMS, - utils.MetaOriginID: "1133dc80896edf5049b46aa911cb9085eeb27f4d", utils.Password: "secure_password", "Additional_Field": "Additional_Value2", utils.AnswerTime: time.Date(2021, 6, 1, 13, 0, 0, 0, time.UTC), }, + APIOpts: map[string]interface{}{ + utils.MetaOriginID: "1133dc80896edf5049b46aa911cb9085eeb27f4d", + }, }, }, rdrCfg: &config.EventReaderCfg{ // CacheDumpFields will be empty @@ -1237,9 +1244,9 @@ func TestErsOnEvictedDumpToJSONMergeError(t *testing.T) { rdrEvents: make(chan *erEvent, 1), filterS: fltrS, } - expLog := `[WARNING] failed posting expired parial events <[{"Tenant":"cgrates.org","ID":"EventErsOnEvicted2","Event":{"Account":"1002","Additional_Field":"Additional_Value2","AnswerTime":"2021-06-01T13:00:00Z","Category":"call","Destination":"1003","OriginID":"1234567","ToR":"*sms","Usage":"12s","password":"secure_password"},"APIOpts":{"*originID":"1133dc80896edf5049b46aa911cb9085eeb27f4d",}},{"Tenant":"cgrates.org","ID":"EventErsOnEvicted","Event":{"Account":"1001","Additional_Field":"Additional_Value","AnswerTime":"2021-06-01T12:00:00Z","Category":"call","Destination":"1002","OriginHost":"local","OriginID":"123456","ToR":"*voice","Usage":"10s","password":"secure_pass"},"APIOpts":{"*originID":"1133dc80896edf5049b46aa911cb9085eeb27f4c",}}]>` + expLog := `[WARNING] failed posting expired parial events <[{"Tenant":"cgrates.org","ID":"EventErsOnEvicted2","Event":{"Account":"1002","Additional_Field":"Additional_Value2","AnswerTime":"2021-06-01T13:00:00Z","Category":"call","Destination":"1003","OriginID":"1234567","ToR":"*sms","Usage":"12s","password":"secure_password"},"APIOpts":{"*originID":"1133dc80896edf5049b46aa911cb9085eeb27f4d"}},{"Tenant":"cgrates.org","ID":"EventErsOnEvicted","Event":{"Account":"1001","Additional_Field":"Additional_Value","AnswerTime":"2021-06-01T12:00:00Z","Category":"call","Destination":"1002","OriginHost":"local","OriginID":"123456","ToR":"*voice","Usage":"10s","password":"secure_pass"},"APIOpts":{"*originID":"1133dc80896edf5049b46aa911cb9085eeb27f4c"}}]>` erS.onEvicted("ID", value) - rcvLog := buf.String()[20:] + rcvLog := buf.String() if !strings.Contains(rcvLog, expLog) { t.Errorf("expected <%+v> to be included in: <%+v>", expLog, rcvLog) } diff --git a/ers/libers_test.go b/ers/libers_test.go index 0edd6571b..f5c47e08b 100644 --- a/ers/libers_test.go +++ b/ers/libers_test.go @@ -23,6 +23,7 @@ import ( "testing" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) @@ -38,3 +39,75 @@ func TestGetProcessOptions(t *testing.T) { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) } } + +func TestLibErsMergePartialEvents(t *testing.T) { + confg := config.NewDefaultCGRConfig() + fltrS := engine.NewFilterS(confg, nil, nil) + cgrEvs := []*utils.CGREvent{ + { + Tenant: "cgrates.org", + ID: "ev1", + Event: map[string]interface{}{ + "EvField1": "Value1", + "EvField2": "Value4", + utils.AnswerTime: 6., + }, + APIOpts: map[string]interface{}{ + "Field1": "Value1", + "Field2": "Value2", + }, + }, + { + Tenant: "cgrates.org", + ID: "ev2", + Event: map[string]interface{}{ + "EvField3": "Value1", + "EvField2": "Value2", + utils.AnswerTime: 4., + }, + APIOpts: map[string]interface{}{ + "Field4": "Value2", + "Field2": "Value3", + }, + }, + { + Tenant: "cgrates.org", + ID: "ev3", + Event: map[string]interface{}{ + "EvField2": "Value2", + "EvField4": "Value4", + "EvField3": "Value3", + utils.AnswerTime: 8., + }, + APIOpts: map[string]interface{}{ + "Field3": "Value3", + "Field4": "Value4", + }, + }, + } + exp := &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + utils.AnswerTime: 8., + "EvField1": "Value1", + "EvField2": "Value2", + "EvField3": "Value3", + "EvField4": "Value4", + }, + APIOpts: map[string]interface{}{ + "Field1": "Value1", + "Field2": "Value2", + "Field3": "Value3", + "Field4": "Value4", + }, + } + if rcv, err := mergePartialEvents(cgrEvs, confg.ERsCfg().Readers[0], fltrS, confg.GeneralCfg().DefaultTenant, + confg.GeneralCfg().DefaultTimezone, confg.GeneralCfg().RSRSep); err != nil { + t.Error(err) + } else { + rcv.ID = utils.EmptyString + if !reflect.DeepEqual(rcv, exp) { + t.Errorf("expected: <%+v>, \nreceived: <%+v>", utils.ToJSON(exp), utils.ToJSON(rcv)) + } + } +} diff --git a/utils/eventcharges_test.go b/utils/eventcharges_test.go index 595bf029e..b47b29c71 100644 --- a/utils/eventcharges_test.go +++ b/utils/eventcharges_test.go @@ -1051,7 +1051,6 @@ func TestEventChargerMerge(t *testing.T) { if !reflect.DeepEqual(expEc, eEvChgs) { t.Errorf("Expected %v \n but received \n %v", ToJSON(expEc), ToJSON(eEvChgs)) } - // fmt.Println(ToJSON(eEvChgs)) } func TestEventChargesAppendChargeEntry(t *testing.T) { diff --git a/utils/librates_test.go b/utils/librates_test.go index 845466068..9d484c2a9 100644 --- a/utils/librates_test.go +++ b/utils/librates_test.go @@ -19,7 +19,6 @@ along with this program. If not, see package utils import ( - "fmt" "reflect" "testing" "time" @@ -1534,8 +1533,7 @@ func TestRateSIncrementCostNoID(t *testing.T) { cost := rIc.Cost(rts) if cost != nil { - fmt.Println(cost) - t.Error("Expected to be nil") + t.Errorf("expected cost to be nil, received <%+v>", cost) } } @@ -1765,7 +1763,7 @@ func TestRateSIntervalCost(t *testing.T) { rcv := rIv.Cost(rts) exp := decimal.WithContext(DecimalContext).SetUint64(2) if !reflect.DeepEqual(rcv, exp) { - fmt.Printf("Expected %v \n but received \n %v", exp, rcv) + t.Errorf("expected <%v>,\nreceived <%v>", exp, rcv) } } diff --git a/utils/stringset_test.go b/utils/stringset_test.go index 97a949b15..e2546138a 100644 --- a/utils/stringset_test.go +++ b/utils/stringset_test.go @@ -299,7 +299,6 @@ func TestStringSetFieldAsInterface(t *testing.T) { if err != nil { t.Error(err) } - // fmt.Println(rcv) _, err = s.FieldAsInterface([]string{"field2"}) if err != ErrNotFound { t.Errorf("Expected %v", ErrNotFound) diff --git a/utils/struct_test.go b/utils/struct_test.go index 6268471cd..0ab3d32e3 100644 --- a/utils/struct_test.go +++ b/utils/struct_test.go @@ -18,7 +18,6 @@ along with this program. If not, see package utils import ( - "fmt" "math/cmplx" "reflect" "testing" @@ -236,6 +235,6 @@ func TestContentStructFieldByIndexIsEmpty(t *testing.T) { }, } if fieldByIndexIsEmpty(reflect.ValueOf(myStruct), []int{1, 0}) { - fmt.Printf("%v", myStruct) + t.Errorf("%v", myStruct) } }