From d93fc46fc2ac66e501dce76987dcd5907e32b599 Mon Sep 17 00:00:00 2001 From: Tripon Alexandru-Ionut Date: Fri, 19 Apr 2019 14:00:44 +0300 Subject: [PATCH] Added *dispatchers to agents flags/subsystems --- agents/asterisk_event.go | 3 +++ agents/diamagent.go | 3 +++ agents/fsagent.go | 7 ++++++- agents/fsevent.go | 9 +++++++++ agents/httpagent.go | 3 +++ agents/kamagent.go | 6 +++++- agents/kamevent.go | 9 +++++++++ agents/radagent.go | 3 +++ .../samples/dispatchers/radagent/cgrates.json | 18 +++++++++--------- 9 files changed, 50 insertions(+), 11 deletions(-) diff --git a/agents/asterisk_event.go b/agents/asterisk_event.go index 047c5e815..c0f8fdcbe 100644 --- a/agents/asterisk_event.go +++ b/agents/asterisk_event.go @@ -292,6 +292,9 @@ func (smaEv *SMAsteriskEvent) V1AuthorizeArgs() (args *sessions.V1AuthorizeArgs) args.ProcessStats = strings.Index(smaEv.Subsystems(), utils.MetaStats) != -1 args.ArgDispatcher = cgrEv.ConsumeArgDispatcher() + if strings.Index(smaEv.Subsystems(), utils.MetaDispatchers) != -1 && args.ArgDispatcher == nil { + args.ArgDispatcher = new(utils.ArgDispatcher) + } return } diff --git a/agents/diamagent.go b/agents/diamagent.go index 65e3cd976..736aea040 100644 --- a/agents/diamagent.go +++ b/agents/diamagent.go @@ -255,6 +255,9 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.RequestProcessor, } cgrEv := agReq.CGRRequest.AsCGREvent(agReq.tenant, utils.NestingSep) argDisp := cgrEv.ConsumeArgDispatcher() + if reqProcessor.Flags.HasKey(utils.MetaDispatchers) && argDisp == nil { + argDisp = new(utils.ArgDispatcher) + } var reqType string for _, typ := range []string{ utils.MetaDryRun, utils.MetaAuth, diff --git a/agents/fsagent.go b/agents/fsagent.go index 83e114007..c2290feed 100644 --- a/agents/fsagent.go +++ b/agents/fsagent.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "reflect" + "strings" "time" "github.com/cgrates/cgrates/config" @@ -265,8 +266,12 @@ func (sm *FSsessions) onChannelHangupComplete(fsev FSEvent, connId string) { if err != nil { return } + argsDisp := cgrEv.ConsumeArgDispatcher() + if strings.Index(fsev[VarCGRSubsystems], utils.MetaDispatchers) != -1 && argsDisp == nil { + argsDisp = new(utils.ArgDispatcher) + } if err := sm.sS.Call(utils.SessionSv1ProcessCDR, - &utils.CGREventWithArgDispatcher{CGREvent: cgrEv, ArgDispatcher: cgrEv.ConsumeArgDispatcher()}, &reply); err != nil { + &utils.CGREventWithArgDispatcher{CGREvent: cgrEv, ArgDispatcher: argsDisp}, &reply); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Failed processing CGREvent: %s, error: <%s>", utils.FreeSWITCHAgent, utils.ToJSON(cgrEv), err.Error())) } diff --git a/agents/fsevent.go b/agents/fsevent.go index 57f573739..cef0d466e 100644 --- a/agents/fsevent.go +++ b/agents/fsevent.go @@ -425,6 +425,9 @@ func (fsev FSEvent) V1AuthorizeArgs() (args *sessions.V1AuthorizeArgs) { args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 args.ArgDispatcher = cgrEv.ConsumeArgDispatcher() + if strings.Index(subsystems, utils.MetaDispatchers) != -1 && args.ArgDispatcher == nil { + args.ArgDispatcher = new(utils.ArgDispatcher) + } return } @@ -448,6 +451,9 @@ func (fsev FSEvent) V1InitSessionArgs() (args *sessions.V1InitSessionArgs) { args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 args.ArgDispatcher = cgrEv.ConsumeArgDispatcher() + if strings.Index(subsystems, utils.MetaDispatchers) != -1 && args.ArgDispatcher == nil { + args.ArgDispatcher = new(utils.ArgDispatcher) + } return } @@ -470,6 +476,9 @@ func (fsev FSEvent) V1TerminateSessionArgs() (args *sessions.V1TerminateSessionA args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 args.ArgDispatcher = cgrEv.ConsumeArgDispatcher() + if strings.Index(subsystems, utils.MetaDispatchers) != -1 && args.ArgDispatcher == nil { + args.ArgDispatcher = new(utils.ArgDispatcher) + } return } diff --git a/agents/httpagent.go b/agents/httpagent.go index 2b2b4cd06..345bd0be2 100644 --- a/agents/httpagent.go +++ b/agents/httpagent.go @@ -105,6 +105,9 @@ func (ha *HTTPAgent) processRequest(reqProcessor *config.RequestProcessor, } cgrEv := agReq.CGRRequest.AsCGREvent(agReq.tenant, utils.NestingSep) argDisp := cgrEv.ConsumeArgDispatcher() + if reqProcessor.Flags.HasKey(utils.MetaDispatchers) && argDisp == nil { + argDisp = new(utils.ArgDispatcher) + } var reqType string for _, typ := range []string{ utils.MetaDryRun, utils.MetaAuth, diff --git a/agents/kamagent.go b/agents/kamagent.go index 160f58f7a..b565fa315 100644 --- a/agents/kamagent.go +++ b/agents/kamagent.go @@ -203,8 +203,12 @@ func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) { return } cgrEv.Event[utils.OriginHost] = ka.conns[connID].RemoteAddr().String() + argsDisp := cgrEv.ConsumeArgDispatcher() + if strings.Index(kev[utils.CGRSubsystems], utils.MetaDispatchers) != -1 && argsDisp == nil { + argsDisp = new(utils.ArgDispatcher) + } if err := ka.sessionS.Call(utils.SessionSv1ProcessCDR, - &utils.CGREventWithArgDispatcher{CGREvent: cgrEv, ArgDispatcher: cgrEv.ConsumeArgDispatcher()}, &reply); err != nil { + &utils.CGREventWithArgDispatcher{CGREvent: cgrEv, ArgDispatcher: argsDisp}, &reply); err != nil { utils.Logger.Err(fmt.Sprintf("%s> failed processing CGREvent: %s, error: %s", utils.KamailioAgent, utils.ToJSON(cgrEv), err.Error())) } diff --git a/agents/kamevent.go b/agents/kamevent.go index 1f97f67ec..0c1e3fdfb 100644 --- a/agents/kamevent.go +++ b/agents/kamevent.go @@ -192,6 +192,9 @@ func (kev KamEvent) V1AuthorizeArgs() (args *sessions.V1AuthorizeArgs) { args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 args.ArgDispatcher = cgrEv.ConsumeArgDispatcher() + if strings.Index(subsystems, utils.MetaDispatchers) != -1 && args.ArgDispatcher == nil { + args.ArgDispatcher = new(utils.ArgDispatcher) + } return } @@ -256,6 +259,9 @@ func (kev KamEvent) V1InitSessionArgs() (args *sessions.V1InitSessionArgs) { args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 args.ArgDispatcher = cgrEv.ConsumeArgDispatcher() + if strings.Index(subsystems, utils.MetaDispatchers) != -1 && args.ArgDispatcher == nil { + args.ArgDispatcher = new(utils.ArgDispatcher) + } return } @@ -278,6 +284,9 @@ func (kev KamEvent) V1TerminateSessionArgs() (args *sessions.V1TerminateSessionA args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 args.ArgDispatcher = cgrEv.ConsumeArgDispatcher() + if strings.Index(subsystems, utils.MetaDispatchers) != -1 && args.ArgDispatcher == nil { + args.ArgDispatcher = new(utils.ArgDispatcher) + } return } diff --git a/agents/radagent.go b/agents/radagent.go index 018a4df57..350b14d48 100644 --- a/agents/radagent.go +++ b/agents/radagent.go @@ -149,6 +149,9 @@ func (ra *RadiusAgent) processRequest(reqProcessor *config.RequestProcessor, } cgrEv := agReq.CGRRequest.AsCGREvent(agReq.tenant, utils.NestingSep) argDisp := cgrEv.ConsumeArgDispatcher() + if reqProcessor.Flags.HasKey(utils.MetaDispatchers) && argDisp == nil { + argDisp = new(utils.ArgDispatcher) + } var reqType string for _, typ := range []string{ utils.MetaDryRun, utils.MetaAuth, diff --git a/data/conf/samples/dispatchers/radagent/cgrates.json b/data/conf/samples/dispatchers/radagent/cgrates.json index 22297e33e..9af1a7a98 100644 --- a/data/conf/samples/dispatchers/radagent/cgrates.json +++ b/data/conf/samples/dispatchers/radagent/cgrates.json @@ -43,9 +43,9 @@ "dispatchers":{ "enabled": true, - "attributes_conns": [ - {"address": "*internal"}, - ], + // "attributes_conns": [ + // {"address": "*internal"}, + // ], }, "radius_agent": { @@ -54,11 +54,11 @@ { "id": "KamailioAuth", "filters": ["*string:~*vars.*radReqType:*radAuth"], - "flags": ["*auth", "*accounts"], + "flags": ["*auth", "*accounts","*dispatchers"], "continue_on_success": false, "request_fields":[ {"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call"}, - {"tag": "*api_key", "field_id": "*api_key", "type": "*constant", "value": "ses12345"}, + // {"tag": "*api_key", "field_id": "*api_key", "type": "*constant", "value": "ses12345"}, {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*prepaid", "mandatory": true}, {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", @@ -82,11 +82,11 @@ { "id": "KamailioAccountingStart", "filters": ["*string:~*req.Acct-Status-Type:Start"], - "flags": ["*initiate", "*attributes", "*resources", "*accounts"], + "flags": ["*initiate", "*attributes", "*resources", "*accounts","*dispatchers"], "continue_on_success": false, "request_fields":[ {"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call"}, - {"tag": "*api_key", "field_id": "*api_key", "type": "*constant", "value": "ses12345"}, + // {"tag": "*api_key", "field_id": "*api_key", "type": "*constant", "value": "ses12345"}, {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*prepaid", "mandatory": true}, {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", @@ -110,11 +110,11 @@ { "id": "KamailioAccountingStop", "filters": ["*string:~*req.Acct-Status-Type:Stop"], - "flags": ["*terminate", "*resources", "*accounts", "*cdrs"], + "flags": ["*terminate", "*resources", "*accounts", "*cdrs","*dispatchers"], "continue_on_success": false, "request_fields":[ {"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call"}, - {"tag": "*api_key", "field_id": "*api_key", "type": "*constant", "value": "ses12345"}, + // {"tag": "*api_key", "field_id": "*api_key", "type": "*constant", "value": "ses12345"}, {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*prepaid", "mandatory": true}, {"tag": "OriginID", "field_id": "OriginID", "type": "*composed",