From 889ea0a87fd50b0441302b9f5e45ad70f5a474c5 Mon Sep 17 00:00:00 2001 From: andronache Date: Mon, 24 May 2021 16:43:00 +0300 Subject: [PATCH] Graceful shutdown integration test for sessions using replication_conns in general tests --- .../cgrates.json | 73 ++++ .../cgrates.json | 36 ++ .../cgrates.json | 73 ++++ .../cgrates.json | 45 +++ .../session_graceful_shutdown_it_test.go | 325 ++++++++++++++++++ 5 files changed, 552 insertions(+) create mode 100644 data/conf/samples/sessions_replication/rplcTestGracefulShutdown1_mongo/cgrates.json create mode 100644 data/conf/samples/sessions_replication/rplcTestGracefulShutdown1_mysql/cgrates.json create mode 100644 data/conf/samples/sessions_replication/rplcTestGracefulShutdown2_mongo/cgrates.json create mode 100644 data/conf/samples/sessions_replication/rplcTestGracefulShutdown2_mysql/cgrates.json create mode 100644 general_tests/session_graceful_shutdown_it_test.go diff --git a/data/conf/samples/sessions_replication/rplcTestGracefulShutdown1_mongo/cgrates.json b/data/conf/samples/sessions_replication/rplcTestGracefulShutdown1_mongo/cgrates.json new file mode 100644 index 000000000..b70e32726 --- /dev/null +++ b/data/conf/samples/sessions_replication/rplcTestGracefulShutdown1_mongo/cgrates.json @@ -0,0 +1,73 @@ +{ +// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH +"general": { + "log_level": 7, + "node_id":"MasterReplication", +}, + +"listen": { + "rpc_json": "127.0.0.1:2012", + "rpc_gob": "127.0.0.1:2013", + "http": "127.0.0.1:2080", +}, + + +"rpc_conns": { + "rplConn": { + "strategy": "*broadcast_sync", + "conns": [{"address": "127.0.0.1:22012", "transport": "*json"}], + }, +}, + + +"data_db": { + "db_type": "mongo", + "db_name": "10", + "db_port": 27017, +}, + + +"stor_db": { + "db_type": "mongo", + "db_name": "cgrates", + "db_port": 27017, +}, + +"rals": { + "enabled": true, +}, + +"schedulers": { + "enabled": true, +}, + +"cdrs": { + "enabled": true, +}, + +"attributes": { + "enabled": true, +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + +"sessions": { + "enabled": true, + "debit_interval": "5ms", // interval to perform debits on. + "replication_conns": ["rplConn"], + "rals_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], +}, + +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, + +} diff --git a/data/conf/samples/sessions_replication/rplcTestGracefulShutdown1_mysql/cgrates.json b/data/conf/samples/sessions_replication/rplcTestGracefulShutdown1_mysql/cgrates.json new file mode 100644 index 000000000..dc65932eb --- /dev/null +++ b/data/conf/samples/sessions_replication/rplcTestGracefulShutdown1_mysql/cgrates.json @@ -0,0 +1,36 @@ +{ + // Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments + // Copyright (C) ITsysCOM GmbH + "general": { + "log_level": 7, + "node_id":"MasterReplication", + }, + + "listen": { + "rpc_json": "127.0.0.1:22012", + "rpc_gob": "127.0.0.1:22013", + "http": "127.0.0.1:22080", + }, + + + "stor_db": { // database used to store offline tariff plans and CDRs + "db_password": "CGRateS.org", // password to use when connecting to stordb + }, + + + "chargers": { + "enabled": true, + }, + + "sessions": { + "enabled": true, + "listen_bijson": "127.0.0.1:22014", + "chargers_conns": ["*internal"], + }, + + "apiers": { + "enabled": true, + }, + + +} diff --git a/data/conf/samples/sessions_replication/rplcTestGracefulShutdown2_mongo/cgrates.json b/data/conf/samples/sessions_replication/rplcTestGracefulShutdown2_mongo/cgrates.json new file mode 100644 index 000000000..b70e32726 --- /dev/null +++ b/data/conf/samples/sessions_replication/rplcTestGracefulShutdown2_mongo/cgrates.json @@ -0,0 +1,73 @@ +{ +// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH +"general": { + "log_level": 7, + "node_id":"MasterReplication", +}, + +"listen": { + "rpc_json": "127.0.0.1:2012", + "rpc_gob": "127.0.0.1:2013", + "http": "127.0.0.1:2080", +}, + + +"rpc_conns": { + "rplConn": { + "strategy": "*broadcast_sync", + "conns": [{"address": "127.0.0.1:22012", "transport": "*json"}], + }, +}, + + +"data_db": { + "db_type": "mongo", + "db_name": "10", + "db_port": 27017, +}, + + +"stor_db": { + "db_type": "mongo", + "db_name": "cgrates", + "db_port": 27017, +}, + +"rals": { + "enabled": true, +}, + +"schedulers": { + "enabled": true, +}, + +"cdrs": { + "enabled": true, +}, + +"attributes": { + "enabled": true, +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + +"sessions": { + "enabled": true, + "debit_interval": "5ms", // interval to perform debits on. + "replication_conns": ["rplConn"], + "rals_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], +}, + +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, + +} diff --git a/data/conf/samples/sessions_replication/rplcTestGracefulShutdown2_mysql/cgrates.json b/data/conf/samples/sessions_replication/rplcTestGracefulShutdown2_mysql/cgrates.json new file mode 100644 index 000000000..72d59740d --- /dev/null +++ b/data/conf/samples/sessions_replication/rplcTestGracefulShutdown2_mysql/cgrates.json @@ -0,0 +1,45 @@ +{ +// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH +"general": { + "log_level": 7, + "node_id":"MasterReplication", +}, + +"listen": { + "rpc_json": "127.0.0.1:2012", + "rpc_gob": "127.0.0.1:2013", + "http": "127.0.0.1:2080", +}, + + +"rpc_conns": { + "rplConn": { + "strategy": "*broadcast_sync", + "conns": [{"address": "127.0.0.1:22012", "transport": "*json"}], + }, +}, + + +"stor_db": { // database used to store offline tariff plans and CDRs + "db_password": "CGRateS.org", // password to use when connecting to stordb +}, + + +"chargers": { + "enabled": true, +}, + +"sessions": { + "enabled": true, + "replication_conns": ["rplConn"], + "chargers_conns": ["*internal"], + "listen_bijson": "127.0.0.1:2014", +}, + +"apiers": { + "enabled": true, +}, + + +} diff --git a/general_tests/session_graceful_shutdown_it_test.go b/general_tests/session_graceful_shutdown_it_test.go new file mode 100644 index 000000000..97b8130fe --- /dev/null +++ b/general_tests/session_graceful_shutdown_it_test.go @@ -0,0 +1,325 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package general_tests + +import ( + "net/rpc" + "os/exec" + "path" + "reflect" + "syscall" + "testing" + "time" + + v1 "github.com/cgrates/cgrates/apier/v1" + + "github.com/cgrates/cgrates/sessions" + + "github.com/cgrates/cgrates/engine" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" +) + +var ( + smgRplcCfgPath1, smgRplcCfgPath2 string + smgRplcCfgDIR1, smgRplcCfgDIR2 string + smgRplCfg1, smgRplCfg2 *config.CGRConfig + smgRplcRPC1, smgRplcRPC2 *rpc.Client + testEngine1, testEngine2 *exec.Cmd + sTestsSession1 = []func(t *testing.T){ + testSessionSRplcInitCfg, + testSessionSRplcResetDB, + testSessionSRplcStartEngine, + testSessionSRplcApierRpcConn, + testSessionSRplcApierGetActiveSessionsNotFound, + testSessionSRplcApierSetChargerS, + testSessionSRplcApierGetInitateSessions, + testSessionSRplcApierGetActiveSessions, + testSessionSRplcApierGetPassiveSessions, + testSessionSRplcApierStopSession2, + testSessionSRplcApierGetPassiveSessionsAfterStop, + testSessionSRplcStopCgrEngine, + } +) + +func TestSessionSRplcGracefulShutdown(t *testing.T) { + switch *dbType { + case utils.MetaInternal: + t.SkipNow() + case utils.MetaMySQL: + smgRplcCfgDIR1 = "rplcTestGracefulShutdown1_mysql" + smgRplcCfgDIR2 = "rplcTestGracefulShutdown2_mysql" + case utils.MetaMongo: + t.SkipNow() + case utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("Unknown Database type") + } + + for _, stest1 := range sTestsSession1 { + t.Run(*dbType, stest1) + } +} + +//Init Config +func testSessionSRplcInitCfg(t *testing.T) { + smgRplcCfgPath1 = path.Join(*dataDir, "conf", "samples", "sessions_replication", smgRplcCfgDIR1) + if smgRplCfg1, err = config.NewCGRConfigFromPath(smgRplcCfgPath1); err != nil { + t.Fatal(err) + } + smgRplcCfgPath2 = path.Join(*dataDir, "conf", "samples", "sessions_replication", smgRplcCfgDIR2) + if smgRplCfg2, err = config.NewCGRConfigFromPath(smgRplcCfgPath2); err != nil { + t.Fatal(err) + } +} + +// Remove data in both rating and accounting db +func testSessionSRplcResetDB(t *testing.T) { + if err := engine.InitDataDb(smgRplCfg1); err != nil { + t.Fatal(err) + } + if err := engine.InitStorDb(smgRplCfg1); err != nil { + t.Fatal(err) + } +} + +// Start CGR Engine +func testSessionSRplcStartEngine(t *testing.T) { + if _, err = engine.StopStartEngine(smgRplcCfgPath1, *waitRater); err != nil { + t.Fatal(err) + } + if testEngine1, err = engine.StartEngine(smgRplcCfgPath2, *waitRater); err != nil { + t.Fatal(err) + } + +} + +// Connect rpc client to rater +func testSessionSRplcApierRpcConn(t *testing.T) { + if smgRplcRPC1, err = newRPCClient(smgRplCfg1.ListenCfg()); err != nil { + t.Fatal(err) + } + if smgRplcRPC2, err = newRPCClient(smgRplCfg2.ListenCfg()); err != nil { + t.Fatal(err) + } +} + +func testSessionSRplcApierGetActiveSessionsNotFound(t *testing.T) { + aSessions1 := make([]*sessions.ExternalSession, 0) + expected := "NOT_FOUND" + if err := smgRplcRPC1.Call(utils.SessionSv1GetActiveSessions, &utils.SessionFilter{}, &aSessions1); err == nil || err.Error() != expected { + t.Error(err) + } + aSessions2 := make([]*sessions.ExternalSession, 0) + if err := smgRplcRPC2.Call(utils.SessionSv1GetActiveSessions, &utils.SessionFilter{}, &aSessions2); err == nil || err.Error() != expected { + t.Error(err) + } +} + +func testSessionSRplcApierSetChargerS(t *testing.T) { + chargerProfile1 := &v1.ChargerWithAPIOpts{ + ChargerProfile: &engine.ChargerProfile{ + Tenant: "cgrates.org", + ID: "Default", + RunID: utils.MetaDefault, + AttributeIDs: []string{"*none"}, + Weight: 20, + }, + } + var result1 string + if err := smgRplcRPC1.Call(utils.APIerSv1SetChargerProfile, chargerProfile1, &result1); err != nil { + t.Error(err) + } else if result1 != utils.OK { + t.Error("Unexpected reply returned", result1) + } + + chargerProfile2 := &v1.ChargerWithAPIOpts{ + ChargerProfile: &engine.ChargerProfile{ + Tenant: "cgrates.org", + ID: "Default", + RunID: utils.MetaDefault, + AttributeIDs: []string{"*none"}, + Weight: 20, + }, + } + var result2 string + if err := smgRplcRPC2.Call(utils.APIerSv1SetChargerProfile, chargerProfile2, &result2); err != nil { + t.Error(err) + } else if result2 != utils.OK { + t.Error("Unexpected reply returned", result2) + } +} + +func testSessionSRplcApierGetInitateSessions(t *testing.T) { + args := &sessions.V1InitSessionArgs{ + InitSession: true, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "TestSSv1ItInitiateSession", + Event: map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.RequestType: utils.MetaNone, + utils.CGRID: "testSessionRplCGRID", + utils.OriginID: "testSessionRplORIGINID", + }, + }, + } + var rply sessions.V1InitSessionReply + if err := smgRplcRPC2.Call(utils.SessionSv1InitiateSession, + args, &rply); err != nil { + t.Error(err) + } +} + +func testSessionSRplcApierGetActiveSessions(t *testing.T) { + expected := []*sessions.ExternalSession{ + { + CGRID: "testSessionRplCGRID", + RunID: "*default", + ToR: "", + OriginID: "testSessionRplORIGINID", + OriginHost: "", + Source: "SessionS_", + RequestType: utils.MetaNone, + Tenant: "cgrates.org", + Category: "", + Account: "", + Subject: "", + Destination: "", + SetupTime: time.Time{}, + AnswerTime: time.Time{}, + Usage: 0, + ExtraFields: map[string]string{}, + NodeID: "MasterReplication", + LoopIndex: 0, + DurationIndex: 0, + MaxRate: 0, + MaxRateUnit: 0, + MaxCostSoFar: 0, + DebitInterval: 0, + NextAutoDebit: time.Time{}, + }, + } + aSessions2 := make([]*sessions.ExternalSession, 0) + if err := smgRplcRPC2.Call(utils.SessionSv1GetActiveSessions, &utils.SessionFilter{}, &aSessions2); err != nil { + t.Error(err) + } + if !reflect.DeepEqual(&aSessions2, &expected) { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.ToJSON(&aSessions2), utils.ToJSON(&expected)) + + } +} + +func testSessionSRplcApierGetPassiveSessions(t *testing.T) { + expected := []*sessions.ExternalSession{ + { + CGRID: "testSessionRplCGRID", + RunID: "*default", + ToR: "", + OriginID: "testSessionRplORIGINID", + OriginHost: "", + Source: "SessionS_", + RequestType: utils.MetaNone, + Tenant: "cgrates.org", + Category: "", + Account: "", + Subject: "", + Destination: "", + SetupTime: time.Time{}, + AnswerTime: time.Time{}, + Usage: 0, + ExtraFields: map[string]string{}, + NodeID: "MasterReplication", + LoopIndex: 0, + DurationIndex: 0, + MaxRate: 0, + MaxRateUnit: 0, + MaxCostSoFar: 0, + DebitInterval: 0, + NextAutoDebit: time.Time{}, + }, + } + aSessions2 := make([]*sessions.ExternalSession, 0) + if err := smgRplcRPC1.Call(utils.SessionSv1GetPassiveSessions, &utils.SessionFilter{}, &aSessions2); err != nil { + t.Error(err) + } + if !reflect.DeepEqual(&aSessions2, &expected) { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.ToJSON(&aSessions2), utils.ToJSON(&expected)) + + } +} + +func testSessionSRplcApierStopSession2(t *testing.T) { + err := testEngine1.Process.Signal(syscall.SIGTERM) + if err != nil { + t.Error(err) + } + err = testEngine1.Wait() + if err != nil { + t.Error(err) + } +} + +func testSessionSRplcApierGetPassiveSessionsAfterStop(t *testing.T) { + expected := []*sessions.ExternalSession{ + { + CGRID: "testSessionRplCGRID", + RunID: "*default", + ToR: "", + OriginID: "testSessionRplORIGINID", + OriginHost: "", + Source: "SessionS_", + RequestType: utils.MetaNone, + Tenant: "cgrates.org", + Category: "", + Account: "", + Subject: "", + Destination: "", + SetupTime: time.Time{}, + AnswerTime: time.Time{}, + Usage: 0, + ExtraFields: map[string]string{}, + NodeID: "MasterReplication", + LoopIndex: 0, + DurationIndex: 0, + MaxRate: 0, + MaxRateUnit: 0, + MaxCostSoFar: 0, + DebitInterval: 0, + NextAutoDebit: time.Time{}, + }, + } + aSessions2 := make([]*sessions.ExternalSession, 0) + if err := smgRplcRPC1.Call(utils.SessionSv1GetPassiveSessions, &utils.SessionFilter{}, &aSessions2); err != nil { + t.Error(err) + } + if !reflect.DeepEqual(&aSessions2, &expected) { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.ToJSON(&aSessions2), utils.ToJSON(&expected)) + + } +} + +func testSessionSRplcStopCgrEngine(t *testing.T) { + if err := engine.KillEngine(100); err != nil { + t.Error(err) + } +}