diff --git a/data/conf/samples/gocs/au_site/cgrates.json b/data/conf/samples/gocs/au_site/cgrates.json new file mode 100644 index 000000000..9afd5bcc5 --- /dev/null +++ b/data/conf/samples/gocs/au_site/cgrates.json @@ -0,0 +1,85 @@ +{ + + "general": { + "log_level": 7, + "node_id": "AU_SITE", + }, + + "listen": { + "rpc_json": ":3012", + "rpc_gob": ":3013", + "http": ":3080", + }, + + + "rpc_conns": { + "conn1": { + "strategy": "*first", + "conns": [{"address": "127.0.0.1:3012", "transport":"*json"}], + }, + "*internal": { + "strategy": "*first", + "conns": [{"address": "*internal"}], + } + }, + + + "data_db": { + "db_type": "*internal", + "remote_conns": [ + {"address": "127.0.0.1:4012", "transport":"*json"} + ], + "items":{ + "*accounts":{"remote":true}, + "*reverse_destinations": {"remote":true}, + "*destinations": {"remote":true}, + "*rating_plans": {"remote":true}, + "*rating_profiles":{"remote":true}, + "*actions":{"remote":true}, + "*action_plans": {"remote":true}, + "*account_action_plans":{"remote":true}, + "*action_triggers":{"remote":true}, + "*shared_groups":{"remote":true}, + "*timings": {"remote":true}, + "*filters": {"remote":true}, + "*supplier_profiles":{"remote":true}, + "*attribute_profiles":{"remote":true}, + "*charger_profiles": {"remote":true}, + "*dispatcher_profiles":{"remote":true}, + "*dispatcher_hosts":{"remote":true}, + "*filter_indexes" :{"remote":true}, + "*load_ids":{"remote":true} + } + }, + + "stor_db": { + "db_password": "CGRateS.org", + }, + + + "rals": { + "enabled": true, + "max_increments":3000000, + }, + + + "scheduler": { + "enabled": true, + }, + + + "chargers": { + "enabled": true, + }, + + "sessions": { + "enabled": true, + "rals_conns": ["*internal"], + "chargers_conns": ["*internal"], + }, + + "apier": { + "caches_conns":["conn1"], + "scheduler_conns": ["conn1"], + } +} diff --git a/data/conf/samples/gocs/dsp_site/cgrates.json b/data/conf/samples/gocs/dsp_site/cgrates.json new file mode 100644 index 000000000..1a1e57b54 --- /dev/null +++ b/data/conf/samples/gocs/dsp_site/cgrates.json @@ -0,0 +1,50 @@ +{ + + "general": { + "log_level": 7, + "node_id": "DISPATCH_SITE", + "reconnects": 2, + "reply_timeout": "5s", + }, + + + "listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080" + }, + + "data_db": { + "db_type": "*redis", + "db_port": 6379, + "db_name": "13" + }, + + "cache":{ + "*dispatcher_routes": {"limit": -1, "ttl": "2s"} + }, + + "stor_db": { + "db_password": "CGRateS.org" + }, + + + "rals": { + "enabled": true + }, + + + "scheduler": { + "enabled": true + }, + + "dispatchers":{ + "enabled": true + }, + + "apier": { + "scheduler_conns": ["*internal"] + } + + +} diff --git a/data/conf/samples/gocs/us_site/cgrates.json b/data/conf/samples/gocs/us_site/cgrates.json new file mode 100644 index 000000000..6d8fa20b2 --- /dev/null +++ b/data/conf/samples/gocs/us_site/cgrates.json @@ -0,0 +1,105 @@ +{ + + "general": { + "log_level": 7, + "node_id": "US_SITE", + }, + + "listen": { + "rpc_json": ":4012", + "rpc_gob": ":4013", + "http": ":4080", + }, + + "rpc_conns": { + "conn1": { + "strategy": "*first", + "conns": [{"address": "127.0.0.1:4012", "transport":"*json"}], + }, + "*internal": { + "strategy": "*first", + "conns": [{"address": "*internal"}], + } + }, + + "data_db": { + "db_type": "*redis", + "db_port": 6379, + "db_name": "11", + "replication_conns": [ + {"address": "127.0.0.1:3012", "transport":"*json"} + ], + "items":{ + "*accounts":{"replicate":true}, + "*reverse_destinations": {"replicate":false}, + "*destinations": {"replicate":false}, + "*rating_plans": {"replicate":false}, + "*rating_profiles":{"replicate":false}, + "*actions":{"replicate":false}, + "*action_plans": {"replicate":false}, + "*account_action_plans":{"replicate":false}, + "*action_triggers":{"replicate":false}, + "*shared_groups":{"replicate":false}, + "*timings": {"replicate":false}, + "*resource_profiles":{"replicate":false}, + "*resources":{"replicate":false}, + "*statqueue_profiles": {"replicate":false}, + "*statqueues": {"replicate":false}, + "*threshold_profiles": {"replicate":false}, + "*thresholds": {"replicate":false}, + "*filters": {"replicate":false}, + "*supplier_profiles":{"replicate":false}, + "*attribute_profiles":{"replicate":false}, + "*charger_profiles": {"replicate":false}, + "*dispatcher_profiles":{"replicate":false}, + "*dispatcher_hosts":{"replicate":false}, + "*filter_indexes" :{"replicate":false}, + "*load_ids":{"replicate":false} + } + }, + + "stor_db": { + "db_password": "CGRateS.org", + }, + + + "rals": { + "enabled": true, + "max_increments":3000000, + }, + + + "scheduler": { + "enabled": true, + "cdrs_conns": ["conn1"], + }, + + + "cdrs": { + "enabled": true, + "chargers_conns":["conn1"], + "rals_conns": ["*internal"], + }, + + + "chargers": { + "enabled": true, + }, + + + "sessions": { + "enabled": true, + "listen_bijson": ":4014", + "rals_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], + }, + + + "apier": { + "caches_conns":["conn1"], + "scheduler_conns": ["conn1"], + }, + + +} diff --git a/data/tariffplans/gocs/dsp_site/DispatcherHosts.csv b/data/tariffplans/gocs/dsp_site/DispatcherHosts.csv new file mode 100644 index 000000000..d3b8a6be5 --- /dev/null +++ b/data/tariffplans/gocs/dsp_site/DispatcherHosts.csv @@ -0,0 +1,4 @@ +#Tenant[0],ID[1],Address[2],Transport[3],TLS[4] +cgrates.org,AU_SITE,127.0.0.1:3012,*json,false +cgrates.org,US_SITE,127.0.0.1:4012,*json,false +cgrates.org,SELF,*internal,, \ No newline at end of file diff --git a/data/tariffplans/gocs/dsp_site/DispatcherProfiles.csv b/data/tariffplans/gocs/dsp_site/DispatcherProfiles.csv new file mode 100644 index 000000000..da1647dee --- /dev/null +++ b/data/tariffplans/gocs/dsp_site/DispatcherProfiles.csv @@ -0,0 +1,4 @@ +#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight +cgrates.org,BROADCAST,*sessions,,,*broadcast,,AU_SITE,,20,false,,30 +cgrates.org,BROADCAST,,,,,,US_SITE,,20,,, +cgrates.org,SELF,*any,,,*weight,,SELF,,20,false,,10 \ No newline at end of file diff --git a/general_tests/gocs_it_test.go b/general_tests/gocs_it_test.go new file mode 100644 index 000000000..2b1e03c2f --- /dev/null +++ b/general_tests/gocs_it_test.go @@ -0,0 +1,611 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package general_tests + +import ( + "net/rpc" + "os/exec" + "path" + "reflect" + "testing" + "time" + + v1 "github.com/cgrates/cgrates/apier/v1" + + "github.com/cgrates/cgrates/sessions" + + "github.com/cgrates/cgrates/utils" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" +) + +var ( + auCfgPath, usCfgPath, dspCfgPath string + auCfg, usCfg, dspCfg *config.CGRConfig + auRPC, usRPC, dspRPC *rpc.Client + auEngine, usEngine, dspEngine *exec.Cmd + sTestsGOCS = []func(t *testing.T){ + testGOCSInitCfg, + testGOCSResetDB, + testGOCSStartEngine, + testGOCSApierRpcConn, + testGOCSLoadData, + testGOCSAuthSession, + testGOCSInitSession, + testGOCSKillUSEngine, + testGOCSUpdateSession, + testGOCSStartUSEngine, + testGOCSVerifyAccountsAfterStart, + testGOCSUpdateSession2, + testGOCSTerminateSession, + testGOCSProcessCDR, + testGOCSStopCgrEngine, + } +) + +// Test start here +func TestGOCSIT(t *testing.T) { + for _, stest := range sTestsGOCS { + t.Run("TestGOCSIT", stest) + } +} + +//Init Config +func testGOCSInitCfg(t *testing.T) { + auCfgPath = path.Join(*dataDir, "conf", "samples", "gocs", "au_site") + if auCfg, err = config.NewCGRConfigFromPath(auCfgPath); err != nil { + t.Fatal(err) + } + auCfg.DataFolderPath = *dataDir + config.SetCgrConfig(auCfg) + usCfgPath = path.Join(*dataDir, "conf", "samples", "gocs", "us_site") + if usCfg, err = config.NewCGRConfigFromPath(usCfgPath); err != nil { + t.Fatal(err) + } + dspCfgPath = path.Join(*dataDir, "conf", "samples", "gocs", "dsp_site") + if dspCfg, err = config.NewCGRConfigFromPath(dspCfgPath); err != nil { + t.Fatal(err) + } +} + +// Remove data in both rating and accounting db +func testGOCSResetDB(t *testing.T) { + if err := engine.InitDataDb(auCfg); err != nil { + t.Fatal(err) + } + if err := engine.InitDataDb(usCfg); err != nil { + t.Fatal(err) + } + if err := engine.InitDataDb(dspCfg); err != nil { + t.Fatal(err) + } + if err := engine.InitStorDb(auCfg); err != nil { + t.Fatal(err) + } + if err := engine.InitStorDb(usCfg); err != nil { + t.Fatal(err) + } + if err := engine.InitStorDb(dspCfg); err != nil { + t.Fatal(err) + } + // give some time to flush DataDB and StorDB for all 3 engines + time.Sleep(100 * time.Millisecond) +} + +// Start CGR Engine +func testGOCSStartEngine(t *testing.T) { + if usEngine, err = engine.StopStartEngine(usCfgPath, *waitRater); err != nil { + t.Fatal(err) + } + if auEngine, err = engine.StartEngine(auCfgPath, *waitRater); err != nil { + t.Fatal(err) + } + if dspEngine, err = engine.StartEngine(dspCfgPath, *waitRater); err != nil { + t.Fatal(err) + } + time.Sleep(10 * time.Millisecond) + +} + +// Connect rpc client to rater +func testGOCSApierRpcConn(t *testing.T) { + if auRPC, err = newRPCClient(auCfg.ListenCfg()); err != nil { + t.Fatal(err) + } + if usRPC, err = newRPCClient(usCfg.ListenCfg()); err != nil { + t.Fatal(err) + } + if dspRPC, err = newRPCClient(dspCfg.ListenCfg()); err != nil { + t.Fatal(err) + } +} + +func testGOCSLoadData(t *testing.T) { + chargerProfile := &v1.ChargerWithCache{ + ChargerProfile: &engine.ChargerProfile{ + Tenant: "cgrates.org", + ID: "DEFAULT", + RunID: utils.MetaDefault, + AttributeIDs: []string{utils.META_NONE}, + Weight: 10, + }, + } + var result string + if err := usRPC.Call(utils.ApierV1SetChargerProfile, chargerProfile, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + var rpl *engine.ChargerProfile + if err := usRPC.Call(utils.ApierV1GetChargerProfile, + &utils.TenantID{Tenant: "cgrates.org", ID: "DEFAULT"}, &rpl); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(chargerProfile.ChargerProfile, rpl) { + t.Errorf("Expecting : %+v, received: %+v", chargerProfile.ChargerProfile, rpl) + } + if err := usRPC.Call(utils.ApierV1SetChargerProfile, chargerProfile, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + if err := usRPC.Call(utils.ApierV1GetChargerProfile, + &utils.TenantID{Tenant: "cgrates.org", ID: "DEFAULT"}, &rpl); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(chargerProfile.ChargerProfile, rpl) { + t.Errorf("Expecting : %+v, received: %+v", chargerProfile.ChargerProfile, rpl) + } + + wchan := make(chan struct{}, 1) + go func() { + loaderPath, err := exec.LookPath("cgr-loader") + if err != nil { + t.Error(err) + } + loader := exec.Command(loaderPath, "-config_path", dspCfgPath, "-path", path.Join(*dataDir, "tariffplans", "gocs", "dsp_site")) + + if err := loader.Start(); err != nil { + t.Error(err) + } + loader.Wait() + wchan <- struct{}{} + }() + select { + case <-wchan: + case <-time.After(1 * time.Second): + t.Errorf("cgr-loader failed: ") + } + var acnt *engine.Account + acntAttrs := &utils.AttrGetAccount{ + Tenant: "cgrates.org", + Account: "1001"} + attrSetBalance := utils.AttrSetBalance{ + Tenant: acntAttrs.Tenant, + Account: acntAttrs.Account, + BalanceType: utils.VOICE, + Balance: map[string]interface{}{ + utils.ID: "BALANCE1", + utils.Value: 3540000000000, + utils.Weight: 20, + }, + } + // add a voice balance of 59 minutes + var reply string + if err := usRPC.Call(utils.ApierV1SetBalance, attrSetBalance, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("received: %s", reply) + } + expectedVoice := 3540000000000.0 + if err := usRPC.Call(utils.ApierV2GetAccount, acntAttrs, &acnt); err != nil { + t.Error(err) + } else if rply := acnt.BalanceMap[utils.VOICE].GetTotalValue(); rply != expectedVoice { + t.Errorf("Expecting: %v, received: %v", expectedVoice, rply) + } + time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups on au_site +} + +func testGOCSAuthSession(t *testing.T) { + authUsage := 5 * time.Minute + args := &sessions.V1AuthorizeArgs{ + GetMaxUsage: true, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "TestSSv1ItAuth", + Event: map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.ToR: utils.VOICE, + utils.OriginID: "testGOCS", + utils.Category: "call", + utils.RequestType: utils.META_PREPAID, + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + utils.Usage: authUsage, + }, + }, + } + var rply sessions.V1AuthorizeReply + if err := dspRPC.Call(utils.SessionSv1AuthorizeEvent, args, &rply); err != nil { + t.Fatal(err) + } + if rply.MaxUsage != authUsage { + t.Errorf("Unexpected MaxUsage: %v", rply.MaxUsage) + } +} + +func testGOCSInitSession(t *testing.T) { + initUsage := 5 * time.Minute + args := &sessions.V1InitSessionArgs{ + InitSession: true, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "TestSSv1ItInitiateSession", + Event: map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.ToR: utils.VOICE, + utils.OriginID: "testGOCS", + utils.Category: "call", + utils.RequestType: utils.META_PREPAID, + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), + utils.Usage: initUsage, + }, + }, + } + var rply sessions.V1InitSessionReply + if err := dspRPC.Call(utils.SessionSv1InitiateSession, + args, &rply); err != nil { + t.Fatal(err) + } + if rply.MaxUsage != initUsage { + t.Errorf("Unexpected MaxUsage: %v", rply.MaxUsage) + } + // give a bit of time to session to be replicate + time.Sleep(10 * time.Millisecond) + + aSessions := make([]*sessions.ExternalSession, 0) + if err := auRPC.Call(utils.SessionSv1GetActiveSessions, new(utils.SessionFilter), &aSessions); err != nil { + t.Error(err) + } else if len(aSessions) != 1 { + t.Errorf("wrong active sessions: %s \n , and len(aSessions) %+v", utils.ToJSON(aSessions), len(aSessions)) + } else if aSessions[0].NodeID != "AU_SITE" { + t.Errorf("Expecting : %+v, received: %+v", "AU_SITE", aSessions[0].NodeID) + } else if aSessions[0].Usage != time.Duration(5*time.Minute) { + t.Errorf("Expecting : %+v, received: %+v", time.Duration(5*time.Minute), aSessions[0].MaxCostSoFar) + } + + aSessions = make([]*sessions.ExternalSession, 0) + if err := usRPC.Call(utils.SessionSv1GetActiveSessions, new(utils.SessionFilter), &aSessions); err != nil { + t.Error(err) + } else if len(aSessions) != 1 { + t.Errorf("wrong active sessions: %s \n , and len(aSessions) %+v", utils.ToJSON(aSessions), len(aSessions)) + } else if aSessions[0].NodeID != "US_SITE" { + t.Errorf("Expecting : %+v, received: %+v", "US_SITE", aSessions[0].NodeID) + } else if aSessions[0].Usage != time.Duration(5*time.Minute) { + t.Errorf("Expecting : %+v, received: %+v", time.Duration(5*time.Minute), aSessions[0].Usage) + } + + var acnt *engine.Account + attrAcc := &utils.AttrGetAccount{ + Tenant: "cgrates.org", + Account: "1001", + } + + // 59 mins - 5 mins = 54 mins + if err := auRPC.Call(utils.ApierV2GetAccount, attrAcc, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.VOICE].GetTotalValue() != 3240000000000.0 { + t.Errorf("Expecting : %+v, received: %+v", 3240000000000.0, acnt.BalanceMap[utils.VOICE].GetTotalValue()) + } + + if err := usRPC.Call(utils.ApierV2GetAccount, attrAcc, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.VOICE].GetTotalValue() != 3240000000000.0 { + t.Errorf("Expecting : %+v, received: %+v", 3240000000000.0, acnt.BalanceMap[utils.VOICE].GetTotalValue()) + } + +} + +func testGOCSKillUSEngine(t *testing.T) { + if err := usEngine.Process.Kill(); err != nil { + t.Error(err) + } + time.Sleep(10 * time.Millisecond) +} + +func testGOCSUpdateSession(t *testing.T) { + reqUsage := 5 * time.Minute + args := &sessions.V1UpdateSessionArgs{ + UpdateSession: true, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "TestSSv1ItUpdateSession", + Event: map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.ToR: utils.VOICE, + utils.OriginID: "testGOCS", + utils.Category: "call", + utils.RequestType: utils.META_PREPAID, + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), + utils.Usage: reqUsage, + }, + }, + } + var rply sessions.V1UpdateSessionReply + + // right now dispatcher receive utils.ErrPartiallyExecuted + // in case of of engines fails + if err := dspRPC.Call(utils.SessionSv1UpdateSession, args, &rply); err == nil || err.Error() != utils.ErrPartiallyExecuted.Error() { + t.Errorf("Expecting : %+v, received: %+v", utils.ErrPartiallyExecuted, err) + } + + aSessions := make([]*sessions.ExternalSession, 0) + if err := auRPC.Call(utils.SessionSv1GetActiveSessions, nil, &aSessions); err != nil { + t.Error(err) + } else if len(aSessions) != 1 { + t.Errorf("wrong active sessions: %s", utils.ToJSON(aSessions)) + } else if aSessions[0].NodeID != "AU_SITE" { + t.Errorf("Expecting : %+v, received: %+v", "AU_SITE", aSessions[0].NodeID) + } else if aSessions[0].Usage != time.Duration(10*time.Minute) { + t.Errorf("Expecting : %+v, received: %+v", time.Duration(5*time.Minute), aSessions[0].Usage) + } + + var acnt *engine.Account + attrAcc := &utils.AttrGetAccount{ + Tenant: "cgrates.org", + Account: "1001", + } + + // balanced changed in AU_SITE + // 54 min - 5 mins = 49 min + if err := auRPC.Call(utils.ApierV2GetAccount, attrAcc, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.VOICE].GetTotalValue() != 2940000000000.0 { + t.Errorf("Expecting : %+v, received: %+v", 2940000000000.0, acnt.BalanceMap[utils.VOICE].GetTotalValue()) + } + +} + +func testGOCSStartUSEngine(t *testing.T) { + if usEngine, err = engine.StartEngine(usCfgPath, *waitRater); err != nil { + t.Fatal(err) + } + if usRPC, err = newRPCClient(usCfg.ListenCfg()); err != nil { + t.Fatal(err) + } +} + +func testGOCSVerifyAccountsAfterStart(t *testing.T) { + var acnt *engine.Account + attrAcc := &utils.AttrGetAccount{ + Tenant: "cgrates.org", + Account: "1001", + } + // because US_SITE was down we should notice a difference between balance from accounts from US_SITE and AU_SITE + if err := auRPC.Call(utils.ApierV2GetAccount, attrAcc, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.VOICE].GetTotalValue() != 2940000000000.0 { + t.Errorf("Expecting : %+v, received: %+v", 2940000000000.0, acnt.BalanceMap[utils.VOICE].GetTotalValue()) + } + + if err := usRPC.Call(utils.ApierV2GetAccount, attrAcc, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.VOICE].GetTotalValue() != 3240000000000.0 { + t.Errorf("Expecting : %+v, received: %+v", 3240000000000.0, acnt.BalanceMap[utils.VOICE].GetTotalValue()) + } +} + +func testGOCSUpdateSession2(t *testing.T) { + reqUsage := 5 * time.Minute + args := &sessions.V1UpdateSessionArgs{ + UpdateSession: true, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "TestSSv1ItUpdateSession2", + Event: map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.ToR: utils.VOICE, + utils.OriginID: "testGOCS", + utils.Category: "call", + utils.RequestType: utils.META_PREPAID, + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), + utils.Usage: reqUsage, + }, + }, + } + var rply sessions.V1UpdateSessionReply + // Update the session on both US_SITE and AU_SITE + // With this update the account should be replicate from US_SITE to AU_SITE and forgot about the update than happens on AU_SITE + if err := dspRPC.Call(utils.SessionSv1UpdateSession, args, &rply); err != nil { + t.Errorf("Expecting : %+v, received: %+v", nil, err) + } else if rply.MaxUsage != reqUsage { + t.Errorf("Unexpected MaxUsage: %v", rply.MaxUsage) + } + + aSessions := make([]*sessions.ExternalSession, 0) + if err := auRPC.Call(utils.SessionSv1GetActiveSessions, nil, &aSessions); err != nil { + t.Error(err) + } else if len(aSessions) != 1 { + t.Errorf("wrong active sessions: %s", utils.ToJSON(aSessions)) + } else if aSessions[0].NodeID != "AU_SITE" { + t.Errorf("Expecting : %+v, received: %+v", "AU_SITE", aSessions[0].NodeID) + } else if aSessions[0].Usage != time.Duration(15*time.Minute) { + t.Errorf("Expecting : %+v, received: %+v", time.Duration(15*time.Minute), aSessions[0].Usage) + } + + aSessions = make([]*sessions.ExternalSession, 0) + if err := usRPC.Call(utils.SessionSv1GetActiveSessions, new(utils.SessionFilter), &aSessions); err != nil { + t.Error(err) + } else if len(aSessions) != 1 { + t.Errorf("wrong active sessions: %s \n , and len(aSessions) %+v", utils.ToJSON(aSessions), len(aSessions)) + } else if aSessions[0].NodeID != "US_SITE" { + t.Errorf("Expecting : %+v, received: %+v", "US_SITE", aSessions[0].NodeID) + } else if aSessions[0].Usage != time.Duration(5*time.Minute) { + t.Errorf("Expecting : %+v, received: %+v", time.Duration(5*time.Minute), aSessions[0].Usage) + } + + var acnt *engine.Account + attrAcc := &utils.AttrGetAccount{ + Tenant: "cgrates.org", + Account: "1001", + } + + if err := auRPC.Call(utils.ApierV2GetAccount, attrAcc, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.VOICE].GetTotalValue() != 2940000000000.0 { + t.Errorf("Expecting : %+v, received: %+v", 2940000000000.0, acnt.BalanceMap[utils.VOICE].GetTotalValue()) + } + + if err := usRPC.Call(utils.ApierV2GetAccount, attrAcc, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.VOICE].GetTotalValue() != 2940000000000.0 { + t.Errorf("Expecting : %+v, received: %+v", 2940000000000.0, acnt.BalanceMap[utils.VOICE].GetTotalValue()) + } +} + +func testGOCSTerminateSession(t *testing.T) { + args := &sessions.V1TerminateSessionArgs{ + TerminateSession: true, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testGOCSTerminateSession", + Event: map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.ToR: utils.VOICE, + utils.OriginID: "testGOCS", + utils.Category: "call", + utils.RequestType: utils.META_PREPAID, + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), + utils.Usage: 15 * time.Minute, + }, + }, + } + var rply string + // we send terminate session with the correct usage, but because the US_SITE was down + // this lost the previous session operations and will debit more + if err := dspRPC.Call(utils.SessionSv1TerminateSession, + args, &rply); err != nil { + t.Error(err) + } + if rply != utils.OK { + t.Errorf("Unexpected reply: %s", rply) + } + aSessions := make([]*sessions.ExternalSession, 0) + if err := auRPC.Call(utils.SessionSv1GetActiveSessions, nil, &aSessions); err == nil || + err.Error() != utils.ErrNotFound.Error() { + t.Errorf("Expected error %s received error %v and reply %s", utils.ErrNotFound, err, utils.ToJSON(aSessions)) + } + if err := usRPC.Call(utils.SessionSv1GetActiveSessions, nil, &aSessions); err == nil || + err.Error() != utils.ErrNotFound.Error() { + t.Errorf("Expected error %s received error %v and reply %s", utils.ErrNotFound, err, utils.ToJSON(aSessions)) + } + + var acnt *engine.Account + attrAcc := &utils.AttrGetAccount{ + Tenant: "cgrates.org", + Account: "1001", + } + + if err := auRPC.Call(utils.ApierV2GetAccount, attrAcc, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.VOICE].GetTotalValue() != 2340000000000.0 { + t.Errorf("Expecting : %+v, received: %+v", 2340000000000.0, acnt.BalanceMap[utils.VOICE].GetTotalValue()) + } + + if err := usRPC.Call(utils.ApierV2GetAccount, attrAcc, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.VOICE].GetTotalValue() != 2340000000000.0 { + t.Errorf("Expecting : %+v, received: %+v", 2340000000000.0, acnt.BalanceMap[utils.VOICE].GetTotalValue()) + } +} + +func testGOCSProcessCDR(t *testing.T) { + args := &utils.CGREventWithArgDispatcher{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "TestSSv1ItProcessCDR", + Event: map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.ToR: utils.VOICE, + utils.OriginID: "testGOCS", + utils.Category: "call", + utils.RequestType: utils.META_PREPAID, + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), + utils.Usage: 15 * time.Minute, + }, + }, + } + var rply string + // process cdr should apply the correction because terminate was debited to much + // 59 - 15 = 44 minutes + if err := usRPC.Call(utils.SessionSv1ProcessCDR, + args, &rply); err != nil { + t.Error(err) + } + if rply != utils.OK { + t.Errorf("Unexpected reply: %s", rply) + } + time.Sleep(100 * time.Millisecond) + var acnt *engine.Account + attrAcc := &utils.AttrGetAccount{ + Tenant: "cgrates.org", + Account: "1001", + } + + if err := auRPC.Call(utils.ApierV2GetAccount, attrAcc, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.VOICE].GetTotalValue() != 2640000000000.0 { + t.Errorf("Expecting : %+v, received: %+v", 2640000000000.0, acnt.BalanceMap[utils.VOICE].GetTotalValue()) + } + + if err := usRPC.Call(utils.ApierV2GetAccount, attrAcc, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.VOICE].GetTotalValue() != 2640000000000.0 { + t.Errorf("Expecting : %+v, received: %+v", 2640000000000.0, acnt.BalanceMap[utils.VOICE].GetTotalValue()) + } +} + +func testGOCSStopCgrEngine(t *testing.T) { + if err := engine.KillEngine(100); err != nil { + t.Error(err) + } +} diff --git a/services/datadb.go b/services/datadb.go index c9492405f..80180774a 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -76,7 +76,7 @@ func (db *DataDBService) Start() (err error) { db.cfg.TlsCfg().ClientCerificate, db.cfg.TlsCfg().CaCertificate, db.cfg.GeneralCfg().ConnectAttempts, db.cfg.GeneralCfg().Reconnects, db.cfg.GeneralCfg().ConnectTimeout, db.cfg.GeneralCfg().ReplyTimeout, - db.cfg.DataDbCfg().RmtConns, nil, false) + db.cfg.DataDbCfg().RmtConns, nil, true) if err != nil { log.Fatalf("Coud not confignure dataDB remote connections: %s", err.Error()) } @@ -87,7 +87,7 @@ func (db *DataDBService) Start() (err error) { db.cfg.TlsCfg().ClientCerificate, db.cfg.TlsCfg().CaCertificate, db.cfg.GeneralCfg().ConnectAttempts, db.cfg.GeneralCfg().Reconnects, db.cfg.GeneralCfg().ConnectTimeout, db.cfg.GeneralCfg().ReplyTimeout, - db.cfg.DataDbCfg().RplConns, nil, false) + db.cfg.DataDbCfg().RplConns, nil, true) if err != nil { log.Fatalf("Coud not confignure dataDB replication connections: %s", err.Error()) }