From 9f5053328466d32da5a4c0f04073a462c67b3f3a Mon Sep 17 00:00:00 2001 From: Trial97 Date: Fri, 13 Dec 2019 15:04:20 +0200 Subject: [PATCH] Added test for *broadcast strategy for RPC connection --- .../cgrates.json | 113 ++++++++++ general_tests/broadcast_client_it_test.go | 202 ++++++++++++++++++ 2 files changed, 315 insertions(+) create mode 100644 data/conf/samples/internal_broadcast_replication/cgrates.json create mode 100644 general_tests/broadcast_client_it_test.go diff --git a/data/conf/samples/internal_broadcast_replication/cgrates.json b/data/conf/samples/internal_broadcast_replication/cgrates.json new file mode 100644 index 000000000..d9c4a1f15 --- /dev/null +++ b/data/conf/samples/internal_broadcast_replication/cgrates.json @@ -0,0 +1,113 @@ +{ +// CGRateS Configuration file +// + + +"general": { + "log_level": 7, + "reply_timeout": "50s", +}, + +"rpc_conns": { + "broadcast_conn": { + "strategy": "*broadcast", + "conns": [ + {"address": "127.0.0.1:2012", "transport":"*json"}, + {"address": "127.0.0.1:3012", "transport":"*json"}, + ], + }, +}, + + +"listen": { + "rpc_json": ":3012", + "rpc_gob": ":3013", + "http": ":3080", +}, + + +"data_db": { + "db_type": "*internal", +}, + + +"stor_db": { + "db_type": "*internal", +}, + + +"rals": { + "enabled": true, + "thresholds_conns": ["broadcast_conn"], + "max_increments":3000000, +}, + + +"scheduler": { + "enabled": true, + "cdrs_conns": ["broadcast_conn"], +}, + + +"cdrs": { + "enabled": true, + "chargers_conns":["broadcast_conn"], +}, + + +"attributes": { + "enabled": true, +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["broadcast_conn"], +}, + + +"resources": { + "enabled": true, + "store_interval": "-1", + "thresholds_conns": ["broadcast_conn"] +}, + + +"stats": { + "enabled": true, + "store_interval": "-1", + "thresholds_conns": ["broadcast_conn"], +}, + +"thresholds": { + "enabled": true, + "store_interval": "-1", +}, + + +"suppliers": { + "enabled": true, + "prefix_indexed_fields":["Destination"], + "stats_conns": ["broadcast_conn"], + "resources_conns": ["broadcast_conn"], +}, + + +"sessions": { + "enabled": true, + "listen_bijson": "127.0.0.1:3014", // address where to listen for bidirectional JSON-RPC requests + "suppliers_conns": ["broadcast_conn"], + "resources_conns": ["broadcast_conn"], + "attributes_conns": ["broadcast_conn"], + "rals_conns": ["broadcast_conn"], + "cdrs_conns": ["broadcast_conn"], + "chargers_conns": ["broadcast_conn"], +}, + + +"apier": { + "scheduler_conns": ["broadcast_conn"], +}, + + +} diff --git a/general_tests/broadcast_client_it_test.go b/general_tests/broadcast_client_it_test.go new file mode 100644 index 000000000..004bd0a23 --- /dev/null +++ b/general_tests/broadcast_client_it_test.go @@ -0,0 +1,202 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package general_tests + +import ( + "net/rpc" + "path" + "reflect" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + brodcastCfgPath string + brodcastInternalCfgPath string + brodcastCfg *config.CGRConfig + brodcastInternalCfg *config.CGRConfig + brodcastRPC *rpc.Client + brodcastInternalRPC *rpc.Client + + sTestBrodcastIt = []func(t *testing.T){ + testbrodcastItLoadConfig, + // testbrodcastItResetDataDB, + // testbrodcastItResetStorDb, + testbrodcastItStartEngine, + testbrodcastItRPCConn, + testbrodcastItLoadFromFolder, + + testbrodcastItProccessEvent, + testbrodcastItGetCDRs, + + testbrodcastItStopCgrEngine, + } +) + +func TestBrodcastRPC(t *testing.T) { + for _, stest := range sTestBrodcastIt { + t.Run("TestBrodcastRPC", stest) + } +} + +// test for 0 balance with session terminate with 1s usage +func testbrodcastItLoadConfig(t *testing.T) { + var err error + brodcastCfgPath = path.Join(*dataDir, "conf", "samples", "internal_broadcast_replication") + if brodcastCfg, err = config.NewCGRConfigFromPath(brodcastCfgPath); err != nil { + t.Error(err) + } + brodcastInternalCfgPath = path.Join(*dataDir, "conf", "samples", "tutinternal") + if brodcastInternalCfg, err = config.NewCGRConfigFromPath(brodcastCfgPath); err != nil { + t.Error(err) + } +} + +// func testbrodcastItResetDataDB(t *testing.T) { +// if err := engine.InitDataDb(brodcastCfg); err != nil { +// t.Fatal(err) +// } +// } + +// func testbrodcastItResetStorDb(t *testing.T) { +// if err := engine.InitStorDb(brodcastCfg); err != nil { +// t.Fatal(err) +// } +// } + +func testbrodcastItStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(brodcastCfgPath, *waitRater); err != nil { + t.Fatal(err) + } + if _, err := engine.StartEngine(brodcastInternalCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +func testbrodcastItRPCConn(t *testing.T) { + var err error + brodcastRPC, err = newRPCClient(brodcastCfg.ListenCfg()) + if err != nil { + t.Fatal(err) + } + brodcastInternalRPC, err = newRPCClient(brodcastInternalCfg.ListenCfg()) + if err != nil { + t.Fatal(err) + } +} + +func testbrodcastItLoadFromFolder(t *testing.T) { + var reply string + attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")} + if err := brodcastRPC.Call(utils.ApierV1LoadTariffPlanFromFolder, attrs, &reply); err != nil { + t.Error(err) + } + if err := brodcastInternalRPC.Call(utils.ApierV1LoadTariffPlanFromFolder, attrs, &reply); err != nil { + t.Error(err) + } + time.Sleep(500 * time.Millisecond) +} + +func testbrodcastItProccessEvent(t *testing.T) { + args := utils.CGREventWithArgDispatcher{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "TestSSv1ItProcessCDR", + Event: map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.Category: utils.CALL, + utils.ToR: utils.VOICE, + utils.OriginID: "TestSSv1It1Brodcast", + utils.RequestType: utils.META_POSTPAID, + utils.Account: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), + utils.Usage: 10 * time.Minute, + }, + }, + } + + var rply string + if err := brodcastRPC.Call(utils.SessionSv1ProcessCDR, args, &rply); err != nil { + t.Fatal(err) + } + if rply != utils.OK { + t.Errorf("Unexpected reply: %s", rply) + } + time.Sleep(100 * time.Millisecond) +} +func testbrodcastItGetCDRs(t *testing.T) { + eCDR := &engine.CDR{ + CGRID: "ad6cb338dea6eaf2e81507623fbd6b00f60c374f", + RunID: "*default", + OrderID: 0, + OriginHost: "", + Source: "*sessions", + OriginID: "TestSSv1It1Brodcast", + ToR: "*voice", + RequestType: "*postpaid", + Tenant: "cgrates.org", + Category: "call", + Account: "1001", + Subject: "1001", + Destination: "1002", + SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), + Usage: 600000000000, + ExtraFields: make(map[string]string), + ExtraInfo: "NOT_CONNECTED: RALs", + Partial: false, + PreRated: false, + CostSource: "", + Cost: -1, + } + var cdrs []*engine.CDR + args := utils.RPCCDRsFilterWithArgDispatcher{RPCCDRsFilter: &utils.RPCCDRsFilter{RunIDs: []string{utils.MetaDefault}}} + if err := brodcastRPC.Call(utils.CDRsV1GetCDRs, args, &cdrs); err != nil { + t.Fatal("Unexpected error: ", err.Error()) + } else if len(cdrs) != 1 { + t.Fatal("Unexpected number of CDRs returned: ", len(cdrs)) + } + cdrs[0].OrderID = 0 // reset the OrderID + if !reflect.DeepEqual(eCDR, cdrs[0]) { + t.Errorf("Expected: %s ,received: %s", utils.ToJSON(eCDR), utils.ToJSON(cdrs[0])) + } + + if err := brodcastInternalRPC.Call(utils.CDRsV1GetCDRs, args, &cdrs); err != nil { + t.Fatal("Unexpected error: ", err.Error()) + } else if len(cdrs) != 1 { + t.Error("Unexpected number of CDRs returned: ", len(cdrs)) + } + cdrs[0].OrderID = 0 // reset the OrderID + if !reflect.DeepEqual(eCDR, cdrs[0]) { + t.Errorf("Expected: %s ,received: %s", utils.ToJSON(eCDR), utils.ToJSON(cdrs[0])) + } +} + +func testbrodcastItStopCgrEngine(t *testing.T) { + if err := engine.KillEngine(100); err != nil { + t.Error(err) + } +}