diff --git a/apier/v1/cache_replication_it_test.go b/apier/v1/cache_replication_it_test.go new file mode 100644 index 000000000..2ab62ec39 --- /dev/null +++ b/apier/v1/cache_replication_it_test.go @@ -0,0 +1,239 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package v1 + +import ( + "net/rpc" + "path" + "reflect" + "sort" + "testing" + "time" + + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + + "github.com/cgrates/cgrates/config" +) + +var ( + engine1Cfg *config.CGRConfig + engine1RPC *rpc.Client + engine1CfgPath string + engine2Cfg *config.CGRConfig + engine2RPC *rpc.Client + engine2CfgPath string + + sTestsCacheSReplicate = []func(t *testing.T){ + testCacheSReplicateLoadConfig, + testCacheSReplicateInitDataDb, + testCacheSReplicateInitStorDb, + testCacheSReplicateStartEngine, + testCacheSReplicateRpcConn, + testCacheSReplicateLoadTariffPlanFromFolder, + testCacheSReplicateProcessAttributes, + testCacheSReplicateProcessRateProfile, + testCacheSReplicateStopEngine, + } +) + +func TestCacheSv1ReplicateIT(t *testing.T) { + for _, stest := range sTestsCacheSReplicate { + t.Run("TestCacheSv1ReplicateIT", stest) + } +} + +func testCacheSReplicateLoadConfig(t *testing.T) { + var err error + engine1CfgPath = path.Join(*dataDir, "conf", "samples", "replication_cache", "engine1") + if engine1Cfg, err = config.NewCGRConfigFromPath(engine1CfgPath); err != nil { + t.Error(err) + } + engine2CfgPath = path.Join(*dataDir, "conf", "samples", "replication_cache", "engine2") + if engine2Cfg, err = config.NewCGRConfigFromPath(engine2CfgPath); err != nil { + t.Error(err) + } +} + +func testCacheSReplicateInitDataDb(t *testing.T) { + if err := engine.InitDataDb(engine1Cfg); err != nil { + t.Fatal(err) + } + if err := engine.InitDataDb(engine2Cfg); err != nil { + t.Fatal(err) + } +} + +// Empty tables before using them +func testCacheSReplicateInitStorDb(t *testing.T) { + if err := engine.InitStorDb(engine1Cfg); err != nil { + t.Fatal(err) + } + if err := engine.InitStorDb(engine2Cfg); err != nil { + t.Fatal(err) + } +} + +// Start engine +func testCacheSReplicateStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(engine1CfgPath, *waitRater); err != nil { + t.Fatal(err) + } + if _, err := engine.StartEngine(engine2CfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +func testCacheSReplicateRpcConn(t *testing.T) { + var err error + engine1RPC, err = newRPCClient(engine1Cfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal("Could not connect to RPC: ", err.Error()) + } + engine2RPC, err = newRPCClient(engine2Cfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal("Could not connect to RPC: ", err.Error()) + } +} + +func testCacheSReplicateLoadTariffPlanFromFolder(t *testing.T) { + var reply string + attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "testit")} + if err := engine2RPC.Call(utils.APIerSv1LoadTariffPlanFromFolder, attrs, &reply); err != nil { + t.Error(err) + } + time.Sleep(time.Duration(*waitRater) * time.Millisecond) +} + +func testCacheSReplicateProcessAttributes(t *testing.T) { + ev := &engine.AttrArgsProcessEvent{ + Context: utils.StringPointer(utils.MetaSessionS), + CGREventWithOpts: &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testCacheSReplicateProcessAttributes", + Event: map[string]interface{}{ + utils.Account: "1001", + }, + }, + }, + } + eRply := &engine.AttrSProcessEventReply{ + MatchedProfiles: []string{"ATTR_ACNT_1001"}, + AlteredFields: []string{utils.MetaReq + utils.NestingSep + "OfficeGroup"}, + CGREventWithOpts: &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testCacheSReplicateProcessAttributes", + Event: map[string]interface{}{ + utils.Account: "1001", + "OfficeGroup": "Marketing", + }, + }, + }, + } + var rplyEv engine.AttrSProcessEventReply + if err := engine1RPC.Call(utils.AttributeSv1ProcessEvent, + ev, &rplyEv); err != nil { + t.Error(err) + } else { + sort.Strings(eRply.AlteredFields) + sort.Strings(rplyEv.AlteredFields) + if !reflect.DeepEqual(eRply, &rplyEv) { // second for reversed order of attributes + t.Errorf("Expecting: %s, received: %s", + utils.ToJSON(eRply), utils.ToJSON(rplyEv)) + } + } + if err := engine2RPC.Call(utils.AttributeSv1ProcessEvent, + ev, &rplyEv); err != nil { + t.Error(err) + } else { + sort.Strings(eRply.AlteredFields) + sort.Strings(rplyEv.AlteredFields) + if !reflect.DeepEqual(eRply, &rplyEv) { // second for reversed order of attributes + t.Errorf("Expecting: %s, received: %s", + utils.ToJSON(eRply), utils.ToJSON(rplyEv)) + } + } +} + +func testCacheSReplicateProcessRateProfile(t *testing.T) { + var rply *engine.RateProfileCost + argsRt := &utils.ArgsCostForEvent{ + CGREventWithOpts: &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: utils.UUIDSha1Prefix(), + Event: map[string]interface{}{ + utils.Account: "1002", + }, + }, + }, + } + rate1 := &engine.Rate{ + ID: "RT_ALWAYS", + Weight: 0, + ActivationTimes: "* * * * *", + IntervalRates: []*engine.IntervalRate{ + { + IntervalStart: 0, + RecurrentFee: 0.01, + Unit: time.Minute, + Increment: time.Second, + }, + }, + } + exp := &engine.RateProfileCost{ + ID: "RT_SPECIAL_1002", + Cost: 0.01, + RoundingDecimals: 4, + RoundingMethod: utils.MetaUp, + + RateSIntervals: []*engine.RateSInterval{{ + UsageStart: 0, + Increments: []*engine.RateSIncrement{{ + UsageStart: 0, + Usage: time.Minute, + Rate: rate1, + IntervalRateIndex: 0, + CompressFactor: 60, + }}, + CompressFactor: 1, + }}, + } + if err := engine1RPC.Call(utils.RateSv1CostForEvent, &argsRt, &rply); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(exp, rply) { + t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(exp), utils.ToJSON(rply)) + } + if err := engine2RPC.Call(utils.RateSv1CostForEvent, &argsRt, &rply); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(exp, rply) { + t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(exp), utils.ToJSON(rply)) + + } +} + +func testCacheSReplicateStopEngine(t *testing.T) { + if err := engine.KillEngine(300); err != nil { + t.Error(err) + } +} diff --git a/apier/v1/ees_it_test.go b/apier/v1/ees_it_test.go index af558f7fc..144a46091 100644 --- a/apier/v1/ees_it_test.go +++ b/apier/v1/ees_it_test.go @@ -218,7 +218,7 @@ func testEEsExportCDRs(t *testing.T) { } func testEEsVerifyExports(t *testing.T) { - time.Sleep(time.Second) + time.Sleep(time.Second + 600*time.Millisecond) var files []string err := filepath.Walk("/tmp/testCSV/", func(path string, info os.FileInfo, err error) error { if strings.HasSuffix(path, utils.CSVSuffix) { diff --git a/data/conf/samples/replication_cache/engine1/cgrates.json b/data/conf/samples/replication_cache/engine1/cgrates.json new file mode 100644 index 000000000..b325260e3 --- /dev/null +++ b/data/conf/samples/replication_cache/engine1/cgrates.json @@ -0,0 +1,75 @@ +{ +"general": { + "log_level": 7, + "node_id": "Engine1" +}, + + +"listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2180" +}, + +"data_db": { + "db_type": "redis", + "db_port": 6379, + "db_name": "10" +}, + +"stor_db": { + "db_password": "CGRateS.org" +}, + + +"rpc_conns": { + "connCache": { + "strategy": "*broadcast_sync", + "conns": [ + {"address": "127.0.0.1:2013", "transport":"*gob"}, + {"address": "127.0.0.1:2023", "transport":"*gob"} + ] + } +}, + + +"caches":{ + "partitions": { + "*attribute_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": true}, + "*attribute_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": true}, + "*rate_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": true}, + "*rate_profile_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": true}, + "*rate_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": true} + }, + "replication_conns": ["connCache"] +}, + + +"rals": { + "enabled": true +}, + + +"attributes": { + "enabled": true +}, + + +"rates": { + "enabled": true +}, + + +"schedulers": { + "enabled": true +}, + + +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"] +}, + + + +}, diff --git a/data/conf/samples/replication_cache/engine2/cgrates.json b/data/conf/samples/replication_cache/engine2/cgrates.json new file mode 100644 index 000000000..ddb912305 --- /dev/null +++ b/data/conf/samples/replication_cache/engine2/cgrates.json @@ -0,0 +1,52 @@ +{ +"general": { + "log_level": 7, + "node_id": "Engine2" +}, + + +"listen": { + "rpc_json": ":2022", + "rpc_gob": ":2023", + "http": ":2280", +}, + +"data_db": { + "db_type": "redis", + "db_port": 6379, + "db_name": "10", +}, + + +"stor_db": { + "db_password": "CGRateS.org", +}, + + +"rals": { + "enabled": true, +}, + + +"attributes": { + "enabled": true, +}, + + +"rates": { + "enabled": true, +}, + + +"schedulers": { + "enabled": true, +}, + + +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, + + +} diff --git a/data/tariffplans/testit/RateProfiles.csv b/data/tariffplans/testit/RateProfiles.csv index 1692c528e..c1e20b2e0 100644 --- a/data/tariffplans/testit/RateProfiles.csv +++ b/data/tariffplans/testit/RateProfiles.csv @@ -1,5 +1,5 @@ #Tenant,ID,FilterIDs,ActivationInterval,Weight,RoundingMethod,RoundingDecimals,MinCost,MaxCost,MaxCostStrategy,RateID,RateFilterIDs,RateActivationStart,RateWeight,RateBlocker,RateIntervalStart,RateFixedFee,RateRecurrentFee,RateUnit,RateIncrement -cgrates.org,RT_SPECIAL_1002,,,0,*up,4,0,0,*free,RT_ALWAYS,,"* * * * *",0,false,0s,,0.01,1m,1s +cgrates.org,RT_SPECIAL_1002,*string:~*req.Account:1002,,10,*up,4,0,0,*free,RT_ALWAYS,,"* * * * *",0,false,0s,,0.01,1m,1s cgrates.org,RT_RETAIL1,,,0,*up,4,0,0,*free,RT_ALWAYS,,"* * * * *",0,false,0s,,0.4,1m,30s cgrates.org,RT_RETAIL1,,,,,,,,,RT_ALWAYS,,"* * * * *",0,false,1m,,0.2,1m,10s diff --git a/engine/caches.go b/engine/caches.go index 9f4f3f3ba..28bd29e8d 100644 --- a/engine/caches.go +++ b/engine/caches.go @@ -36,6 +36,8 @@ var Cache *CacheS func init() { Cache = NewCacheS(config.CgrConfig(), nil, nil) + gob.Register(new(AttributeProfile)) + gob.Register(new(AttributeProfileWithOpts)) // Threshold gob.Register(new(Threshold)) gob.Register(new(ThresholdProfile)) @@ -96,6 +98,11 @@ func init() { gob.Register(url.Values{}) gob.Register(json.RawMessage{}) gob.Register(BalanceSummaries{}) + + gob.Register(new(utils.ArgCacheReplicateSet)) + gob.Register(new(utils.ArgCacheReplicateRemove)) + + gob.Register(utils.StringSet{}) } //SetCache shared the cache from other subsystems diff --git a/general_tests/export_it_test.go b/general_tests/export_it_test.go index ecf3765ff..fb85bb106 100644 --- a/general_tests/export_it_test.go +++ b/general_tests/export_it_test.go @@ -368,9 +368,9 @@ func testExpVerifyRateProfiles(t *testing.T) { splPrf := &engine.RateProfile{ Tenant: "cgrates.org", ID: "RT_SPECIAL_1002", - FilterIDs: []string{}, + FilterIDs: []string{"*string:~*req.Account:1002"}, ActivationInterval: nil, - Weight: 0, + Weight: 10, RoundingDecimals: 4, RoundingMethod: utils.ROUNDING_UP, MinCost: 0, diff --git a/utils/consts.go b/utils/consts.go index 5c8edf561..5f616a783 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -506,6 +506,7 @@ const ( MetaEEs = "*ees" MetaRateS = "*rates" MetaContinue = "*continue" + MetaUp = "*up" Migrator = "migrator" UnsupportedMigrationTask = "unsupported migration task" NoStorDBConnection = "not connected to StorDB"