Added *dispatchers to agents flags/subsystems

This commit is contained in:
Tripon Alexandru-Ionut
2019-04-19 14:00:44 +03:00
committed by Trial97
parent 6b6191706d
commit d93fc46fc2
9 changed files with 50 additions and 11 deletions

View File

@@ -292,6 +292,9 @@ func (smaEv *SMAsteriskEvent) V1AuthorizeArgs() (args *sessions.V1AuthorizeArgs)
args.ProcessStats = strings.Index(smaEv.Subsystems(), utils.MetaStats) != -1
args.ArgDispatcher = cgrEv.ConsumeArgDispatcher()
if strings.Index(smaEv.Subsystems(), utils.MetaDispatchers) != -1 && args.ArgDispatcher == nil {
args.ArgDispatcher = new(utils.ArgDispatcher)
}
return
}

View File

@@ -255,6 +255,9 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.RequestProcessor,
}
cgrEv := agReq.CGRRequest.AsCGREvent(agReq.tenant, utils.NestingSep)
argDisp := cgrEv.ConsumeArgDispatcher()
if reqProcessor.Flags.HasKey(utils.MetaDispatchers) && argDisp == nil {
argDisp = new(utils.ArgDispatcher)
}
var reqType string
for _, typ := range []string{
utils.MetaDryRun, utils.MetaAuth,

View File

@@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"reflect"
"strings"
"time"
"github.com/cgrates/cgrates/config"
@@ -265,8 +266,12 @@ func (sm *FSsessions) onChannelHangupComplete(fsev FSEvent, connId string) {
if err != nil {
return
}
argsDisp := cgrEv.ConsumeArgDispatcher()
if strings.Index(fsev[VarCGRSubsystems], utils.MetaDispatchers) != -1 && argsDisp == nil {
argsDisp = new(utils.ArgDispatcher)
}
if err := sm.sS.Call(utils.SessionSv1ProcessCDR,
&utils.CGREventWithArgDispatcher{CGREvent: cgrEv, ArgDispatcher: cgrEv.ConsumeArgDispatcher()}, &reply); err != nil {
&utils.CGREventWithArgDispatcher{CGREvent: cgrEv, ArgDispatcher: argsDisp}, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Failed processing CGREvent: %s, error: <%s>",
utils.FreeSWITCHAgent, utils.ToJSON(cgrEv), err.Error()))
}

View File

@@ -425,6 +425,9 @@ func (fsev FSEvent) V1AuthorizeArgs() (args *sessions.V1AuthorizeArgs) {
args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1
args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1
args.ArgDispatcher = cgrEv.ConsumeArgDispatcher()
if strings.Index(subsystems, utils.MetaDispatchers) != -1 && args.ArgDispatcher == nil {
args.ArgDispatcher = new(utils.ArgDispatcher)
}
return
}
@@ -448,6 +451,9 @@ func (fsev FSEvent) V1InitSessionArgs() (args *sessions.V1InitSessionArgs) {
args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1
args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1
args.ArgDispatcher = cgrEv.ConsumeArgDispatcher()
if strings.Index(subsystems, utils.MetaDispatchers) != -1 && args.ArgDispatcher == nil {
args.ArgDispatcher = new(utils.ArgDispatcher)
}
return
}
@@ -470,6 +476,9 @@ func (fsev FSEvent) V1TerminateSessionArgs() (args *sessions.V1TerminateSessionA
args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1
args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1
args.ArgDispatcher = cgrEv.ConsumeArgDispatcher()
if strings.Index(subsystems, utils.MetaDispatchers) != -1 && args.ArgDispatcher == nil {
args.ArgDispatcher = new(utils.ArgDispatcher)
}
return
}

View File

@@ -105,6 +105,9 @@ func (ha *HTTPAgent) processRequest(reqProcessor *config.RequestProcessor,
}
cgrEv := agReq.CGRRequest.AsCGREvent(agReq.tenant, utils.NestingSep)
argDisp := cgrEv.ConsumeArgDispatcher()
if reqProcessor.Flags.HasKey(utils.MetaDispatchers) && argDisp == nil {
argDisp = new(utils.ArgDispatcher)
}
var reqType string
for _, typ := range []string{
utils.MetaDryRun, utils.MetaAuth,

View File

@@ -203,8 +203,12 @@ func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) {
return
}
cgrEv.Event[utils.OriginHost] = ka.conns[connID].RemoteAddr().String()
argsDisp := cgrEv.ConsumeArgDispatcher()
if strings.Index(kev[utils.CGRSubsystems], utils.MetaDispatchers) != -1 && argsDisp == nil {
argsDisp = new(utils.ArgDispatcher)
}
if err := ka.sessionS.Call(utils.SessionSv1ProcessCDR,
&utils.CGREventWithArgDispatcher{CGREvent: cgrEv, ArgDispatcher: cgrEv.ConsumeArgDispatcher()}, &reply); err != nil {
&utils.CGREventWithArgDispatcher{CGREvent: cgrEv, ArgDispatcher: argsDisp}, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("%s> failed processing CGREvent: %s, error: %s",
utils.KamailioAgent, utils.ToJSON(cgrEv), err.Error()))
}

View File

@@ -192,6 +192,9 @@ func (kev KamEvent) V1AuthorizeArgs() (args *sessions.V1AuthorizeArgs) {
args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1
args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1
args.ArgDispatcher = cgrEv.ConsumeArgDispatcher()
if strings.Index(subsystems, utils.MetaDispatchers) != -1 && args.ArgDispatcher == nil {
args.ArgDispatcher = new(utils.ArgDispatcher)
}
return
}
@@ -256,6 +259,9 @@ func (kev KamEvent) V1InitSessionArgs() (args *sessions.V1InitSessionArgs) {
args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1
args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1
args.ArgDispatcher = cgrEv.ConsumeArgDispatcher()
if strings.Index(subsystems, utils.MetaDispatchers) != -1 && args.ArgDispatcher == nil {
args.ArgDispatcher = new(utils.ArgDispatcher)
}
return
}
@@ -278,6 +284,9 @@ func (kev KamEvent) V1TerminateSessionArgs() (args *sessions.V1TerminateSessionA
args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1
args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1
args.ArgDispatcher = cgrEv.ConsumeArgDispatcher()
if strings.Index(subsystems, utils.MetaDispatchers) != -1 && args.ArgDispatcher == nil {
args.ArgDispatcher = new(utils.ArgDispatcher)
}
return
}

View File

@@ -149,6 +149,9 @@ func (ra *RadiusAgent) processRequest(reqProcessor *config.RequestProcessor,
}
cgrEv := agReq.CGRRequest.AsCGREvent(agReq.tenant, utils.NestingSep)
argDisp := cgrEv.ConsumeArgDispatcher()
if reqProcessor.Flags.HasKey(utils.MetaDispatchers) && argDisp == nil {
argDisp = new(utils.ArgDispatcher)
}
var reqType string
for _, typ := range []string{
utils.MetaDryRun, utils.MetaAuth,

View File

@@ -43,9 +43,9 @@
"dispatchers":{
"enabled": true,
"attributes_conns": [
{"address": "*internal"},
],
// "attributes_conns": [
// {"address": "*internal"},
// ],
},
"radius_agent": {
@@ -54,11 +54,11 @@
{
"id": "KamailioAuth",
"filters": ["*string:~*vars.*radReqType:*radAuth"],
"flags": ["*auth", "*accounts"],
"flags": ["*auth", "*accounts","*dispatchers"],
"continue_on_success": false,
"request_fields":[
{"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call"},
{"tag": "*api_key", "field_id": "*api_key", "type": "*constant", "value": "ses12345"},
// {"tag": "*api_key", "field_id": "*api_key", "type": "*constant", "value": "ses12345"},
{"tag": "RequestType", "field_id": "RequestType", "type": "*constant",
"value": "*prepaid", "mandatory": true},
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed",
@@ -82,11 +82,11 @@
{
"id": "KamailioAccountingStart",
"filters": ["*string:~*req.Acct-Status-Type:Start"],
"flags": ["*initiate", "*attributes", "*resources", "*accounts"],
"flags": ["*initiate", "*attributes", "*resources", "*accounts","*dispatchers"],
"continue_on_success": false,
"request_fields":[
{"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call"},
{"tag": "*api_key", "field_id": "*api_key", "type": "*constant", "value": "ses12345"},
// {"tag": "*api_key", "field_id": "*api_key", "type": "*constant", "value": "ses12345"},
{"tag": "RequestType", "field_id": "RequestType", "type": "*constant",
"value": "*prepaid", "mandatory": true},
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed",
@@ -110,11 +110,11 @@
{
"id": "KamailioAccountingStop",
"filters": ["*string:~*req.Acct-Status-Type:Stop"],
"flags": ["*terminate", "*resources", "*accounts", "*cdrs"],
"flags": ["*terminate", "*resources", "*accounts", "*cdrs","*dispatchers"],
"continue_on_success": false,
"request_fields":[
{"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call"},
{"tag": "*api_key", "field_id": "*api_key", "type": "*constant", "value": "ses12345"},
// {"tag": "*api_key", "field_id": "*api_key", "type": "*constant", "value": "ses12345"},
{"tag": "RequestType", "field_id": "RequestType", "type": "*constant",
"value": "*prepaid", "mandatory": true},
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed",