diff --git a/agents/diam_it_test.go b/agents/diam_it_test.go index f7d4584f7..dd1ab88f1 100644 --- a/agents/diam_it_test.go +++ b/agents/diam_it_test.go @@ -77,7 +77,6 @@ func TestDiamItTcp(t *testing.T) { } } -// Test start here func TestDiamItDispatcher(t *testing.T) { isDispatcherActive = true engine.StartEngine(path.Join(*dataDir, "conf", "samples", "dispatchers", "all"), 200) diff --git a/agents/radagent.go b/agents/radagent.go index 5819f6ccc..18d8b37af 100644 --- a/agents/radagent.go +++ b/agents/radagent.go @@ -248,7 +248,7 @@ func (ra *RadiusAgent) processRequest(reqProcessor *config.RARequestProcessor, if reqProcessor.Flags.HasKey(utils.MetaCDRs) { var rplyCDRs string if err = ra.sessionS.Call(utils.SessionSv1ProcessCDR, - cgrEv, &rplyCDRs); err != nil { + cgrEv.AsCGREventWithArgDispatcher(), &rplyCDRs); err != nil { agReq.CGRReply.Set([]string{utils.Error}, err.Error(), false, false) } } diff --git a/agents/radagent_it_test.go b/agents/radagent_it_test.go index 3d8fb31ca..3821a8952 100644 --- a/agents/radagent_it_test.go +++ b/agents/radagent_it_test.go @@ -23,6 +23,7 @@ package agents import ( "net/rpc" "net/rpc/jsonrpc" + "os/exec" "path" "reflect" "testing" @@ -35,13 +36,51 @@ import ( "github.com/cgrates/radigo" ) -var raCfgPath string -var raCfg *config.CGRConfig -var raAuthClnt, raAcctClnt *radigo.Client -var raRPC *rpc.Client +var ( + raonfigDIR string + raCfgPath string + raCfg *config.CGRConfig + raAuthClnt, raAcctClnt *radigo.Client + raRPC *rpc.Client -func TestRAitInitCfg(t *testing.T) { - raCfgPath = path.Join(*dataDir, "conf", "samples", "radagent") + sTestsRadius = []func(t *testing.T){ + testRAitInitCfg, + testRAitResetDataDb, + testRAitResetStorDb, + testRAitStartEngine, + testRAitApierRpcConn, + testRAitTPFromFolder, + testRAitAuth, + testRAitAcctStart, + testRAitAcctStop, + testRAitStopCgrEngine, + } +) + +// Test start here +func TestRAit(t *testing.T) { + engine.KillEngine(0) + raonfigDIR = "radagent" + for _, stest := range sTestsRadius { + t.Run(diamConfigDIR, stest) + } +} + +func TestRAitDispatcher(t *testing.T) { + isDispatcherActive = true + engine.StartEngine(path.Join(*dataDir, "conf", "samples", "dispatchers", "all"), 200) + engine.StartEngine(path.Join(*dataDir, "conf", "samples", "dispatchers", "all2"), 200) + raonfigDIR = "dispatchers/radagent" + testRadiusitResetAllDB(t) + for _, stest := range sTestsRadius { + t.Run(diamConfigDIR, stest) + } + engine.KillEngine(100) + isDispatcherActive = false +} + +func testRAitInitCfg(t *testing.T) { + raCfgPath = path.Join(*dataDir, "conf", "samples", raonfigDIR) // Init config first var err error raCfg, err = config.NewCGRConfigFromPath(raCfgPath) @@ -50,31 +89,48 @@ func TestRAitInitCfg(t *testing.T) { } raCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush() config.SetCgrConfig(raCfg) + if isDispatcherActive { + raCfg.ListenCfg().RPCJSONListen = ":6012" + } +} + +func testRadiusitResetAllDB(t *testing.T) { + cfgPath1 := path.Join(*dataDir, "conf", "samples", "dispatchers", "all") + allCfg, err := config.NewCGRConfigFromPath(cfgPath1) + if err != nil { + t.Fatal(err) + } + if err := engine.InitDataDb(allCfg); err != nil { + t.Fatal(err) + } + if err := engine.InitStorDb(allCfg); err != nil { + t.Fatal(err) + } } // Remove data in both rating and accounting db -func TestRAitResetDataDb(t *testing.T) { +func testRAitResetDataDb(t *testing.T) { if err := engine.InitDataDb(raCfg); err != nil { t.Fatal(err) } } // Wipe out the cdr database -func TestRAitResetStorDb(t *testing.T) { +func testRAitResetStorDb(t *testing.T) { if err := engine.InitStorDb(raCfg); err != nil { t.Fatal(err) } } // Start CGR Engine -func TestRAitStartEngine(t *testing.T) { - if _, err := engine.StopStartEngine(raCfgPath, *waitRater); err != nil { +func testRAitStartEngine(t *testing.T) { + if _, err := engine.StartEngine(raCfgPath, *waitRater); err != nil { t.Fatal(err) } } // Connect rpc client to rater -func TestRAitApierRpcConn(t *testing.T) { +func testRAitApierRpcConn(t *testing.T) { var err error raRPC, err = jsonrpc.Dial("tcp", raCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed if err != nil { @@ -83,12 +139,15 @@ func TestRAitApierRpcConn(t *testing.T) { } // Load the tariff plan, creating accounts and their balances -func TestRAitTPFromFolder(t *testing.T) { +func testRAitTPFromFolder(t *testing.T) { attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "oldtutorial")} var loadInst utils.LoadInstance if err := raRPC.Call("ApierV2.LoadTariffPlanFromFolder", attrs, &loadInst); err != nil { t.Error(err) } + if isDispatcherActive { + testRadiusitTPLoadData(t) + } time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups //add a default charger @@ -107,7 +166,29 @@ func TestRAitTPFromFolder(t *testing.T) { } } -func TestRAitAuth(t *testing.T) { +func testRadiusitTPLoadData(t *testing.T) { + wchan := make(chan struct{}, 1) + go func() { + loaderPath, err := exec.LookPath("cgr-loader") + if err != nil { + t.Error(err) + } + loader := exec.Command(loaderPath, "-config_path", raCfgPath, "-path", path.Join(*dataDir, "tariffplans", "dispatchers")) + + if err := loader.Start(); err != nil { + t.Error(err) + } + loader.Wait() + wchan <- struct{}{} + }() + select { + case <-wchan: + case <-time.After(5 * time.Second): + t.Errorf("cgr-loader failed: ") + } +} + +func testRAitAuth(t *testing.T) { if raAuthClnt, err = radigo.NewClient("udp", "127.0.0.1:1812", "CGRateS.org", dictRad, 1, nil); err != nil { t.Fatal(err) } @@ -147,7 +228,7 @@ func TestRAitAuth(t *testing.T) { } } -func TestRAitAcctStart(t *testing.T) { +func testRAitAcctStart(t *testing.T) { if raAcctClnt, err = radigo.NewClient("udp", "127.0.0.1:1813", "CGRateS.org", dictRad, 1, nil); err != nil { t.Fatal(err) } @@ -206,6 +287,10 @@ func TestRAitAcctStart(t *testing.T) { } // Make sure the sessin is managed by SMG time.Sleep(10 * time.Millisecond) + expUsage := 10 * time.Second + if isDispatcherActive { // no debit interval set on dispatched session + expUsage = 3 * time.Hour + } var aSessions []*sessions.ActiveSession if err := raRPC.Call(utils.SessionSv1GetActiveSessions, map[string]string{utils.RunID: utils.META_DEFAULT, @@ -214,12 +299,12 @@ func TestRAitAcctStart(t *testing.T) { t.Error(err) } else if len(aSessions) != 1 { t.Errorf("Unexpected number of sessions received: %+v", aSessions) - } else if aSessions[0].Usage != 10*time.Second { - t.Errorf("Expecting 10s, received usage: %v\nAnd Session: %s ", aSessions[0].Usage, utils.ToJSON(aSessions)) + } else if aSessions[0].Usage != expUsage { + t.Errorf("Expecting %v, received usage: %v\nAnd Session: %s ", expUsage, aSessions[0].Usage, utils.ToJSON(aSessions)) } } -func TestRAitAcctStop(t *testing.T) { +func testRAitAcctStop(t *testing.T) { req := raAcctClnt.NewRequest(radigo.AccountingRequest, 3) // emulates Kamailio packet for accounting start if err := req.AddAVPWithName("Acct-Status-Type", "Stop", ""); err != nil { t.Error(err) @@ -303,7 +388,7 @@ func TestRAitAcctStop(t *testing.T) { } } -func TestRAitStopCgrEngine(t *testing.T) { +func testRAitStopCgrEngine(t *testing.T) { if err := engine.KillEngine(100); err != nil { t.Error(err) } diff --git a/data/conf/samples/dispatchers/radagent/cgrates.json b/data/conf/samples/dispatchers/radagent/cgrates.json new file mode 100644 index 000000000..22297e33e --- /dev/null +++ b/data/conf/samples/dispatchers/radagent/cgrates.json @@ -0,0 +1,145 @@ +{ +// CGRateS Configuration file +// + +"general": { + "node_id": "DispatcherS1", + "log_level": 7, + "reconnects": 1, +}, + + +"listen": { + "rpc_json": ":2012", // RPC JSON listening address + "rpc_gob": ":2013", // RPC GOB listening address + "http": ":2080", // HTTP listening address +}, + +"data_db": { // database used to store runtime data (eg: accounts, cdr stats) + "db_type": "mongo", // stor database type to use: + "db_port": 27017, // the port to reach the stordb + "db_name": "datadb", + "db_password": "", +}, + +"stor_db": { + "db_type": "mongo", // stor database type to use: + "db_port": 27017, // the port to reach the stordb + "db_name": "stordb", + "db_password": "", +}, + +"attributes": { + "enabled": true +}, + +"scheduler": { + "enabled": true, +}, + +"rals": { + "enabled": true, +}, + +"dispatchers":{ + "enabled": true, + "attributes_conns": [ + {"address": "*internal"}, + ], +}, + +"radius_agent": { + "enabled": true, + "request_processors": [ + { + "id": "KamailioAuth", + "filters": ["*string:~*vars.*radReqType:*radAuth"], + "flags": ["*auth", "*accounts"], + "continue_on_success": false, + "request_fields":[ + {"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call"}, + {"tag": "*api_key", "field_id": "*api_key", "type": "*constant", "value": "ses12345"}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", + "value": "*prepaid", "mandatory": true}, + {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", + "value": "~*req.Acct-Session-Id;-;~*req.Sip-From-Tag", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*composed", + "value": "~*req.User-Name", "mandatory": true}, + {"tag": "Subject", "field_id": "Subject", "type": "*variable", + "value": "~*req.User-Name", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*composed", + "value": "~*req.Called-Station-Id", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", + "value": "~*req.Event-Timestamp", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", + "value": "~*req.Event-Timestamp", "mandatory": true}, + ], + "reply_fields":[ + {"tag": "MaxUsage", "field_id": "SIP-AVP", "type": "*composed", + "value": "session_max_time#;~*cgrep.MaxUsage{*duration_seconds}", "mandatory": true}, + ], + }, + { + "id": "KamailioAccountingStart", + "filters": ["*string:~*req.Acct-Status-Type:Start"], + "flags": ["*initiate", "*attributes", "*resources", "*accounts"], + "continue_on_success": false, + "request_fields":[ + {"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call"}, + {"tag": "*api_key", "field_id": "*api_key", "type": "*constant", "value": "ses12345"}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", + "value": "*prepaid", "mandatory": true}, + {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", + "value": "~*req.Acct-Session-Id;-;~*req.Sip-From-Tag;-;~*req.Sip-To-Tag", "mandatory": true}, + {"tag": "OriginHost", "field_id": "OriginHost", "type": "*composed", + "value": "~*req.NAS-IP-Address", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*composed", + "value": "~*req.User-Name", "mandatory": true}, + {"tag": "Subject", "field_id": "Subject", "type": "*variable", + "value": "~*req.User-Name", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*composed", + "value": "~*req.Called-Station-Id", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", + "value": "~*req.Ascend-User-Acct-Time", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", + "value": "~*req.Ascend-User-Acct-Time", "mandatory": true}, + {"tag": "RemoteAddr" , "field_id": "RemoteAddr", "type": "*remote_host"}, + ], + "reply_fields":[], + }, + { + "id": "KamailioAccountingStop", + "filters": ["*string:~*req.Acct-Status-Type:Stop"], + "flags": ["*terminate", "*resources", "*accounts", "*cdrs"], + "continue_on_success": false, + "request_fields":[ + {"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call"}, + {"tag": "*api_key", "field_id": "*api_key", "type": "*constant", "value": "ses12345"}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", + "value": "*prepaid", "mandatory": true}, + {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", + "value": "~*req.Acct-Session-Id;-;~*req.Sip-From-Tag;-;~*req.Sip-To-Tag", "mandatory": true}, + {"tag": "OriginHost", "field_id": "OriginHost", "type": "*composed", + "value": "~*req.NAS-IP-Address", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*composed", + "value": "~*req.User-Name", "mandatory": true}, + {"tag": "Subject", "field_id": "Subject", "type": "*variable", + "value": "~*req.User-Name", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*composed", + "value": "~*req.Called-Station-Id", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", + "value": "~*req.Ascend-User-Acct-Time", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", + "value": "~*req.Ascend-User-Acct-Time", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*usage_difference", + "value": "~*req.Event-Timestamp;~*req.Ascend-User-Acct-Time", "mandatory": true}, + {"tag": "RemoteAddr" , "field_id": "RemoteAddr", "type": "*remote_host"}, + ], + "reply_fields":[], + }, + + ], + +}, + +} diff --git a/utils/cgrevent.go b/utils/cgrevent.go index 374d0e8d4..b468a6fbe 100644 --- a/utils/cgrevent.go +++ b/utils/cgrevent.go @@ -172,6 +172,32 @@ func (ev *CGREvent) RemFldsWithPrefix(prfx string) { } } +// RemFldsWithPrefix will remove fields starting with prefix from event +func (ev *CGREvent) AsCGREventWithArgDispatcher() (arg *CGREventWithArgDispatcher) { + arg = &CGREventWithArgDispatcher{ + CGREvent: ev, + } + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, hasApiKey := ev.Event[MetaApiKey] + if hasApiKey { + arg.ArgDispatcher = &ArgDispatcher{ + APIKey: StringPointer(apiKeyIface.(string)), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, hasRouteID := ev.Event[MetaRouteID] + if hasRouteID { + if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + arg.ArgDispatcher = &ArgDispatcher{ + RouteID: StringPointer(routeIDIface.(string)), + } + } else { + arg.ArgDispatcher.RouteID = StringPointer(routeIDIface.(string)) + } + } + return +} + // CGREvents is a group of generic events processed by CGR services // ie: derived CDRs type CGREvents struct {