From c878c7b6e971b6c5eb1e85e5cfd09660d48b2ebb Mon Sep 17 00:00:00 2001 From: TeoV Date: Sun, 7 Apr 2019 07:34:37 -0400 Subject: [PATCH] Consider ArgDispatcher for Agents and subsystems --- agents/asterisk_event.go | 54 +++++++++++ agents/fsevent.go | 54 +++++++++++ agents/kamevent.go | 54 +++++++++++ engine/chargers.go | 28 +++++- engine/resources.go | 10 +- engine/stats.go | 4 + engine/suppliers.go | 4 + sessions/sessions.go | 191 +++++++++++++++++++++++++++++++++++--- sessions/sessions_test.go | 88 +++++++++++++++++- utils/consts.go | 2 + 10 files changed, 469 insertions(+), 20 deletions(-) diff --git a/agents/asterisk_event.go b/agents/asterisk_event.go index 735f2e430..90c134bbf 100644 --- a/agents/asterisk_event.go +++ b/agents/asterisk_event.go @@ -290,6 +290,24 @@ func (smaEv *SMAsteriskEvent) V1AuthorizeArgs() (args *sessions.V1AuthorizeArgs) args.GetAttributes = strings.Index(smaEv.Subsystems(), utils.MetaAttributes) != -1 args.ProcessThresholds = strings.Index(smaEv.Subsystems(), utils.MetaThresholds) != -1 args.ProcessStats = strings.Index(smaEv.Subsystems(), utils.MetaStats) != -1 + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey] + if hasApiKey { + args.ArgDispatcher = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface.(string)), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID] + if hasRouteID { + if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + args.ArgDispatcher = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface.(string)), + } + } else { + args.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string)) + } + } return } @@ -309,6 +327,24 @@ func (smaEv *SMAsteriskEvent) V1InitSessionArgs(cgrEv utils.CGREvent) (args *ses args.GetAttributes = strings.Index(subsystems, utils.MetaAttributes) != -1 args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey] + if hasApiKey { + args.ArgDispatcher = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface.(string)), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID] + if hasRouteID { + if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + args.ArgDispatcher = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface.(string)), + } + } else { + args.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string)) + } + } return } @@ -327,5 +363,23 @@ func (smaEv *SMAsteriskEvent) V1TerminateSessionArgs(cgrEv utils.CGREvent) (args args.ReleaseResources = strings.Index(subsystems, utils.MetaResources) != -1 args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey] + if hasApiKey { + args.ArgDispatcher = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface.(string)), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID] + if hasRouteID { + if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + args.ArgDispatcher = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface.(string)), + } + } else { + args.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string)) + } + } return } diff --git a/agents/fsevent.go b/agents/fsevent.go index 306939c45..8038ee7e7 100644 --- a/agents/fsevent.go +++ b/agents/fsevent.go @@ -424,6 +424,24 @@ func (fsev FSEvent) V1AuthorizeArgs() (args *sessions.V1AuthorizeArgs) { args.GetAttributes = strings.Index(subsystems, utils.MetaAttributes) != -1 args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey] + if hasApiKey { + args.ArgDispatcher = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface.(string)), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID] + if hasRouteID { + if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + args.ArgDispatcher = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface.(string)), + } + } else { + args.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string)) + } + } return } @@ -446,6 +464,24 @@ func (fsev FSEvent) V1InitSessionArgs() (args *sessions.V1InitSessionArgs) { args.GetAttributes = strings.Index(subsystems, utils.MetaAttributes) != -1 args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey] + if hasApiKey { + args.ArgDispatcher = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface.(string)), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID] + if hasRouteID { + if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + args.ArgDispatcher = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface.(string)), + } + } else { + args.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string)) + } + } return } @@ -467,6 +503,24 @@ func (fsev FSEvent) V1TerminateSessionArgs() (args *sessions.V1TerminateSessionA args.ReleaseResources = strings.Index(subsystems, utils.MetaResources) != -1 args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey] + if hasApiKey { + args.ArgDispatcher = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface.(string)), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID] + if hasRouteID { + if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + args.ArgDispatcher = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface.(string)), + } + } else { + args.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string)) + } + } return } diff --git a/agents/kamevent.go b/agents/kamevent.go index 1e96d0cfa..9612b580e 100644 --- a/agents/kamevent.go +++ b/agents/kamevent.go @@ -191,6 +191,24 @@ func (kev KamEvent) V1AuthorizeArgs() (args *sessions.V1AuthorizeArgs) { args.GetAttributes = strings.Index(subsystems, utils.MetaAttributes) != -1 args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey] + if hasApiKey { + args.ArgDispatcher = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface.(string)), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID] + if hasRouteID { + if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + args.ArgDispatcher = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface.(string)), + } + } else { + args.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string)) + } + } return } @@ -254,6 +272,24 @@ func (kev KamEvent) V1InitSessionArgs() (args *sessions.V1InitSessionArgs) { args.GetAttributes = strings.Index(subsystems, utils.MetaAttributes) != -1 args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey] + if hasApiKey { + args.ArgDispatcher = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface.(string)), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, hasRouteID := args.CGREvent.Event[utils.MetaRouteID] + if hasRouteID { + if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + args.ArgDispatcher = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface.(string)), + } + } else { + args.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string)) + } + } return } @@ -275,6 +311,24 @@ func (kev KamEvent) V1TerminateSessionArgs() (args *sessions.V1TerminateSessionA args.ReleaseResources = strings.Index(subsystems, utils.MetaResources) != -1 args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey] + if hasApiKey { + args.ArgDispatcher = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface.(string)), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID] + if hasRouteID { + if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + args.ArgDispatcher = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface.(string)), + } + } else { + args.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string)) + } + } return } diff --git a/engine/chargers.go b/engine/chargers.go index 479185dc6..b3762acd1 100644 --- a/engine/chargers.go +++ b/engine/chargers.go @@ -128,11 +128,33 @@ func (cS *ChargerService) processEvent(cgrEv *utils.CGREvent) (rply []*ChrgSProc return nil, errors.New("no connection to AttributeS") } + args := &AttrArgsProcessEvent{ + AttributeIDs: cP.AttributeIDs, + Context: utils.StringPointer(utils.MetaChargers), + ProcessRuns: nil, + CGREvent: *clonedEv} + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, hasApiKey := clonedEv.Event[utils.MetaApiKey] + if hasApiKey { + args.ArgDispatcher = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface.(string)), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, hasRouteID := clonedEv.Event[utils.MetaRouteID] + if hasRouteID { + if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + args.ArgDispatcher = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface.(string)), + } + } else { + args.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string)) + } + } + var evReply AttrSProcessEventReply if err = cS.attrS.Call(utils.AttributeSv1ProcessEvent, - &AttrArgsProcessEvent{AttributeIDs: cP.AttributeIDs, - Context: utils.StringPointer(utils.MetaChargers), ProcessRuns: nil, CGREvent: *clonedEv}, - &evReply); err != nil { + args, &evReply); err != nil { return nil, err } rply[i].AttributeSProfiles = evReply.MatchedProfiles diff --git a/engine/resources.go b/engine/resources.go index b613a3796..30d44ec7c 100644 --- a/engine/resources.go +++ b/engine/resources.go @@ -508,7 +508,7 @@ func (rS *ResourceService) matchingResourcesForEvent(ev *utils.CGREvent, usageTT } // processThresholds will pass the event for resource to ThresholdS -func (rS *ResourceService) processThresholds(r *Resource) (err error) { +func (rS *ResourceService) processThresholds(r *Resource, argDispatcher *utils.ArgDispatcher) (err error) { if rS.thdS == nil { return } @@ -527,6 +527,10 @@ func (rS *ResourceService) processThresholds(r *Resource) (err error) { utils.EventType: utils.ResourceUpdate, utils.ResourceID: r.ID, utils.Usage: r.totalUsage()}}} + // in case we receive ArgDispatcher we add it to be used by DispatcherS + if argDispatcher != nil { + thEv.ArgDispatcher = argDispatcher + } var tIDs []string if err = rS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil && err.Error() != utils.ErrNotFound.Error() { @@ -647,7 +651,7 @@ func (rS *ResourceService) V1AllocateResource(args utils.ArgRSv1ResourceUsage, r rS.storedResources[r.TenantID()] = true rS.srMux.Unlock() } - rS.processThresholds(r) + rS.processThresholds(r, args.ArgDispatcher) } *reply = alcMsg return @@ -686,7 +690,7 @@ func (rS *ResourceService) V1ReleaseResource(args utils.ArgRSv1ResourceUsage, re rS.storedResources[r.TenantID()] = true } } - rS.processThresholds(r) + rS.processThresholds(r, args.ArgDispatcher) } if rS.storeInterval != -1 { rS.srMux.Unlock() diff --git a/engine/stats.go b/engine/stats.go index 5243f680f..f2303c72d 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -282,6 +282,10 @@ func (sS *StatService) processEvent(args *StatsArgsProcessEvent) (statQueueIDs [ Event: map[string]interface{}{ utils.EventType: utils.StatUpdate, utils.StatID: sq.ID}}} + // in case we receive ArgDispatcher we add it to be used by DispatcherS + if args.ArgDispatcher != nil { + thEv.ArgDispatcher = args.ArgDispatcher + } for metricID, metric := range sq.SQMetrics { thEv.Event[metricID] = metric.GetValue() } diff --git a/engine/suppliers.go b/engine/suppliers.go index 2df89ee27..da465b6cc 100644 --- a/engine/suppliers.go +++ b/engine/suppliers.go @@ -481,6 +481,10 @@ func (spS *SupplierService) V1GetSuppliers(args *ArgsGetSuppliers, reply *Sorted Context: utils.StringPointer(utils.MetaSuppliers), CGREvent: args.CGREvent, } + // in case we receive ArgDispatcher we add it to be used by DispatcherS + if args.ArgDispatcher != nil { + attrArgs.ArgDispatcher = args.ArgDispatcher + } var rplyEv AttrSProcessEventReply if err := spS.attributeS.Call(utils.AttributeSv1ProcessEvent, attrArgs, &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 { diff --git a/sessions/sessions.go b/sessions/sessions.go index c22167904..29a87e257 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -1523,6 +1523,24 @@ func NewV1AuthorizeArgs(attrs, res, maxUsage, thrslds, if supplsEventCost { args.SuppliersMaxCost = utils.MetaSuppliersEventCost } + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey] + if hasApiKey { + args.ArgDispatcher = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface.(string)), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID] + if hasRouteID { + if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + args.ArgDispatcher = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface.(string)), + } + } else { + args.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string)) + } + } return } @@ -1625,6 +1643,10 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection, Context: utils.StringPointer(utils.MetaSessionS), CGREvent: args.CGREvent, } + // in case we receive ArgDispatcher we add it to be used by DispatcherS + if args.ArgDispatcher != nil { + attrArgs.ArgDispatcher = args.ArgDispatcher + } var rplyEv engine.AttrSProcessEventReply if err := sS.attrS.Call(utils.AttributeSv1ProcessEvent, attrArgs, &rplyEv); err == nil { @@ -1661,6 +1683,10 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection, UsageID: originID, Units: 1, } + // in case we receive ArgDispatcher we add it to be used by DispatcherS + if args.ArgDispatcher != nil { + attrRU.ArgDispatcher = args.ArgDispatcher + } if err = sS.resS.Call(utils.ResourceSv1AuthorizeResources, attrRU, &allocMsg); err != nil { return utils.NewErrResourceS(err) @@ -1682,6 +1708,10 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection, CGREvent: *cgrEv, Paginator: args.Paginator, } + // in case we receive ArgDispatcher we add it to be used by DispatcherS + if args.ArgDispatcher != nil { + sArgs.ArgDispatcher = args.ArgDispatcher + } if err = sS.splS.Call(utils.SupplierSv1GetSuppliers, sArgs, &splsReply); err != nil { return utils.NewErrSupplierS(err) @@ -1698,6 +1728,10 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection, thEv := &engine.ArgsProcessEvent{ CGREvent: args.CGREvent, } + // in case we receive ArgDispatcher we add it to be used by DispatcherS + if args.ArgDispatcher != nil { + thEv.ArgDispatcher = args.ArgDispatcher + } if err := sS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( @@ -1710,9 +1744,14 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection, if sS.statS == nil { return utils.NewErrNotConnected(utils.StatService) } + statArgs := &engine.StatsArgsProcessEvent{CGREvent: args.CGREvent} + // in case we receive ArgDispatcher we add it to be used by DispatcherS + if args.ArgDispatcher != nil { + statArgs.ArgDispatcher = args.ArgDispatcher + } var statReply []string if err := sS.statS.Call(utils.StatSv1ProcessEvent, - &engine.StatsArgsProcessEvent{CGREvent: args.CGREvent}, &statReply); err != nil && + statArgs, &statReply); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( fmt.Sprintf("<%s> error: %s processing event %+v with StatS.", @@ -1769,8 +1808,8 @@ func (sS *SessionS) BiRPCv1AuthorizeEventWithDigest(clnt rpcclient.RpcClientConn // NewV1InitSessionArgs is a constructor for V1InitSessionArgs func NewV1InitSessionArgs(attrs, resrc, acnt, thrslds, stats bool, - cgrEv utils.CGREvent) *V1InitSessionArgs { - return &V1InitSessionArgs{ + cgrEv utils.CGREvent) (args *V1InitSessionArgs) { + args = &V1InitSessionArgs{ GetAttributes: attrs, AllocateResources: resrc, InitSession: acnt, @@ -1778,6 +1817,25 @@ func NewV1InitSessionArgs(attrs, resrc, acnt, thrslds, stats bool, ProcessStats: stats, CGREvent: cgrEv, } + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey] + if hasApiKey { + args.ArgDispatcher = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface.(string)), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID] + if hasRouteID { + if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + args.ArgDispatcher = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface.(string)), + } + } else { + args.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string)) + } + } + return } // V1InitSessionArgs are options for session initialization request @@ -1872,6 +1930,10 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection, Context: utils.StringPointer(utils.MetaSessionS), CGREvent: args.CGREvent, } + // in case we receive ArgDispatcher we add it to be used by DispatcherS + if args.ArgDispatcher != nil { + attrArgs.ArgDispatcher = args.ArgDispatcher + } var rplyEv engine.AttrSProcessEventReply if err := sS.attrS.Call(utils.AttributeSv1ProcessEvent, attrArgs, &rplyEv); err == nil { @@ -1898,6 +1960,10 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection, UsageID: originID, Units: 1, } + // in case we receive ArgDispatcher we add it to be used by DispatcherS + if args.ArgDispatcher != nil { + attrRU.ArgDispatcher = args.ArgDispatcher + } var allocMessage string if err = sS.resS.Call(utils.ResourceSv1AllocateResources, attrRU, &allocMessage); err != nil { @@ -1937,6 +2003,10 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection, thEv := &engine.ArgsProcessEvent{ CGREvent: args.CGREvent, } + // in case we receive ArgDispatcher we add it to be used by DispatcherS + if args.ArgDispatcher != nil { + thEv.ArgDispatcher = args.ArgDispatcher + } if err := sS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil && err.Error() != utils.ErrNotFound.Error() { @@ -1951,9 +2021,13 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection, return utils.NewErrNotConnected(utils.StatService) } var statReply []string + statArgs := &engine.StatsArgsProcessEvent{CGREvent: args.CGREvent} + // in case we receive ArgDispatcher we add it to be used by DispatcherS + if args.ArgDispatcher != nil { + statArgs.ArgDispatcher = args.ArgDispatcher + } if err := sS.statS.Call(utils.StatSv1ProcessEvent, - &engine.StatsArgsProcessEvent{CGREvent: args.CGREvent}, - &statReply); err != nil && + statArgs, &statReply); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( fmt.Sprintf("<%s> error: %s processing event %+v with StatS.", @@ -2008,9 +2082,28 @@ func (sS *SessionS) BiRPCv1InitiateSessionWithDigest(clnt rpcclient.RpcClientCon // NewV1UpdateSessionArgs is a constructor for update session arguments func NewV1UpdateSessionArgs(attrs, acnts bool, - cgrEv utils.CGREvent) *V1UpdateSessionArgs { - return &V1UpdateSessionArgs{GetAttributes: attrs, + cgrEv utils.CGREvent) (args *V1UpdateSessionArgs) { + args = &V1UpdateSessionArgs{GetAttributes: attrs, UpdateSession: acnts, CGREvent: cgrEv} + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey] + if hasApiKey { + args.ArgDispatcher = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface.(string)), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID] + if hasRouteID { + if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + args.ArgDispatcher = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface.(string)), + } + } else { + args.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string)) + } + } + return } // V1UpdateSessionArgs contains options for session update @@ -2089,6 +2182,10 @@ func (sS *SessionS) BiRPCv1UpdateSession(clnt rpcclient.RpcClientConnection, Context: utils.StringPointer(utils.MetaSessionS), CGREvent: args.CGREvent, } + // in case we receive ArgDispatcher we add it to be used by DispatcherS + if args.ArgDispatcher != nil { + attrArgs.ArgDispatcher = args.ArgDispatcher + } var rplyEv engine.AttrSProcessEventReply if err := sS.attrS.Call(utils.AttributeSv1ProcessEvent, attrArgs, &rplyEv); err == nil { @@ -2137,13 +2234,32 @@ func (sS *SessionS) BiRPCv1UpdateSession(clnt rpcclient.RpcClientConnection, } func NewV1TerminateSessionArgs(acnts, resrc, thrds, stats bool, - cgrEv utils.CGREvent) *V1TerminateSessionArgs { - return &V1TerminateSessionArgs{ + cgrEv utils.CGREvent) (args *V1TerminateSessionArgs) { + args = &V1TerminateSessionArgs{ TerminateSession: acnts, ReleaseResources: resrc, ProcessThresholds: thrds, ProcessStats: stats, CGREvent: cgrEv} + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey] + if hasApiKey { + args.ArgDispatcher = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface.(string)), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID] + if hasRouteID { + if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + args.ArgDispatcher = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface.(string)), + } + } else { + args.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string)) + } + } + return } type V1TerminateSessionArgs struct { @@ -2235,6 +2351,10 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection, UsageID: originID, // same ID should be accepted by first group since the previous resource should be expired Units: 1, } + // in case we receive ArgDispatcher we add it to be used by DispatcherS + if args.ArgDispatcher != nil { + argsRU.ArgDispatcher = args.ArgDispatcher + } if err = sS.resS.Call(utils.ResourceSv1ReleaseResources, argsRU, &reply); err != nil { return utils.NewErrResourceS(err) @@ -2248,6 +2368,10 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection, thEv := &engine.ArgsProcessEvent{ CGREvent: args.CGREvent, } + // in case we receive ArgDispatcher we add it to be used by DispatcherS + if args.ArgDispatcher != nil { + thEv.ArgDispatcher = args.ArgDispatcher + } if err := sS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( @@ -2260,8 +2384,13 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection, return utils.NewErrNotConnected(utils.StatS) } var statReply []string + statArgs := &engine.StatsArgsProcessEvent{CGREvent: args.CGREvent} + // in case we receive ArgDispatcher we add it to be used by DispatcherS + if args.ArgDispatcher != nil { + statArgs.ArgDispatcher = args.ArgDispatcher + } if err := sS.statS.Call(utils.StatSv1ProcessEvent, - &engine.StatsArgsProcessEvent{CGREvent: args.CGREvent}, &statReply); err != nil && + statArgs, &statReply); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( fmt.Sprintf("<%s> error: %s processing event %+v with StatS.", @@ -2362,8 +2491,8 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection, // NewV1ProcessEventArgs is a constructor for EventArgs used by ProcessEvent func NewV1ProcessEventArgs(resrc, acnts, attrs, thds, stats bool, - cgrEv utils.CGREvent) *V1ProcessEventArgs { - return &V1ProcessEventArgs{ + cgrEv utils.CGREvent) (args *V1ProcessEventArgs) { + args = &V1ProcessEventArgs{ AllocateResources: resrc, Debit: acnts, GetAttributes: attrs, @@ -2371,6 +2500,25 @@ func NewV1ProcessEventArgs(resrc, acnts, attrs, thds, stats bool, ProcessStats: stats, CGREvent: cgrEv, } + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey] + if hasApiKey { + args.ArgDispatcher = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface.(string)), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID] + if hasRouteID { + if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + args.ArgDispatcher = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface.(string)), + } + } else { + args.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string)) + } + } + return } // V1ProcessEventArgs are the options passed to ProcessEvent API @@ -2456,6 +2604,10 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, Context: utils.StringPointer(utils.MetaSessionS), CGREvent: args.CGREvent, } + // in case we receive ArgDispatcher we add it to be used by DispatcherS + if args.ArgDispatcher != nil { + attrArgs.ArgDispatcher = args.ArgDispatcher + } var rplyEv engine.AttrSProcessEventReply if err := sS.attrS.Call(utils.AttributeSv1ProcessEvent, attrArgs, &rplyEv); err == nil { @@ -2482,6 +2634,10 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, UsageID: originID, Units: 1, } + // in case we receive ArgDispatcher we add it to be used by DispatcherS + if args.ArgDispatcher != nil { + attrRU.ArgDispatcher = args.ArgDispatcher + } var allocMessage string if err = sS.resS.Call(utils.ResourceSv1AllocateResources, attrRU, &allocMessage); err != nil { @@ -2505,6 +2661,10 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, thEv := &engine.ArgsProcessEvent{ CGREvent: args.CGREvent, } + // in case we receive ArgDispatcher we add it to be used by DispatcherS + if args.ArgDispatcher != nil { + thEv.ArgDispatcher = args.ArgDispatcher + } if err := sS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil && err.Error() != utils.ErrNotFound.Error() { @@ -2518,8 +2678,13 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, return utils.NewErrNotConnected(utils.StatS) } var statReply []string + statArgs := &engine.StatsArgsProcessEvent{CGREvent: args.CGREvent} + // in case we receive ArgDispatcher we add it to be used by DispatcherS + if args.ArgDispatcher != nil { + statArgs.ArgDispatcher = args.ArgDispatcher + } if err := sS.statS.Call(utils.StatSv1ProcessEvent, - &engine.StatsArgsProcessEvent{CGREvent: args.CGREvent}, &statReply); err != nil && + statArgs, &statReply); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( fmt.Sprintf("<%s> error: %s processing event %+v with StatS.", diff --git a/sessions/sessions_test.go b/sessions/sessions_test.go index 9bfa2b067..1deb6eac1 100644 --- a/sessions/sessions_test.go +++ b/sessions/sessions_test.go @@ -1259,5 +1259,91 @@ func TestSessionSrelocateSessionS(t *testing.T) { if !reflect.DeepEqual(rcvS[0], s) { t.Errorf("Expecting %+v, received: %+v", s, rcvS[0]) } - +} + +func TestSessionSNewV1AuthorizeArgsWithArgDispatcher(t *testing.T) { + cgrEv := utils.CGREvent{ + Tenant: "cgrates.org", + ID: "Event", + Event: map[string]interface{}{ + utils.Account: "1001", + utils.Destination: "1002", + utils.MetaApiKey: "testkey", + utils.MetaRouteID: "testrouteid", + }, + } + expected := &V1AuthorizeArgs{ + AuthorizeResources: true, + GetAttributes: true, + CGREvent: cgrEv, + ArgDispatcher: &utils.ArgDispatcher{ + APIKey: utils.StringPointer("testkey"), + RouteID: utils.StringPointer("testrouteid"), + }, + } + rply := NewV1AuthorizeArgs(true, true, false, false, false, false, false, false, cgrEv) + if !reflect.DeepEqual(expected, rply) { + t.Errorf("Expecting %+v, received: %+v", utils.ToJSON(expected), utils.ToJSON(rply)) + } + expected = &V1AuthorizeArgs{ + GetAttributes: true, + AuthorizeResources: false, + GetMaxUsage: true, + ProcessThresholds: false, + ProcessStats: true, + GetSuppliers: false, + SuppliersIgnoreErrors: true, + SuppliersMaxCost: utils.MetaSuppliersEventCost, + CGREvent: cgrEv, + ArgDispatcher: &utils.ArgDispatcher{ + APIKey: utils.StringPointer("testkey"), + RouteID: utils.StringPointer("testrouteid"), + }, + } + rply = NewV1AuthorizeArgs(true, false, true, false, true, false, true, true, cgrEv) + if !reflect.DeepEqual(expected, rply) { + t.Errorf("Expecting %+v, received: %+v", utils.ToJSON(expected), utils.ToJSON(rply)) + } +} + +func TestSessionSNewV1AuthorizeArgsWithArgDispatcher2(t *testing.T) { + cgrEv := utils.CGREvent{ + Tenant: "cgrates.org", + ID: "Event", + Event: map[string]interface{}{ + utils.Account: "1001", + utils.Destination: "1002", + utils.MetaRouteID: "testrouteid", + }, + } + expected := &V1AuthorizeArgs{ + AuthorizeResources: true, + GetAttributes: true, + CGREvent: cgrEv, + ArgDispatcher: &utils.ArgDispatcher{ + RouteID: utils.StringPointer("testrouteid"), + }, + } + rply := NewV1AuthorizeArgs(true, true, false, false, false, false, false, false, cgrEv) + if !reflect.DeepEqual(expected, rply) { + t.Errorf("Expecting %+v, received: %+v", utils.ToJSON(expected), utils.ToJSON(rply)) + } + expected = &V1AuthorizeArgs{ + GetAttributes: true, + AuthorizeResources: false, + GetMaxUsage: true, + ProcessThresholds: false, + ProcessStats: true, + GetSuppliers: false, + SuppliersIgnoreErrors: true, + SuppliersMaxCost: utils.MetaSuppliersEventCost, + CGREvent: cgrEv, + ArgDispatcher: &utils.ArgDispatcher{ + RouteID: utils.StringPointer("testrouteid"), + }, + } + rply = NewV1AuthorizeArgs(true, false, true, false, true, false, true, true, cgrEv) + if !reflect.DeepEqual(expected, rply) { + t.Errorf("Expecting %+v, received: %+v", utils.ToJSON(expected), utils.ToJSON(rply)) + } } diff --git a/utils/consts.go b/utils/consts.go index 4a092bea8..e16a2087e 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -564,6 +564,8 @@ const ( LoadIDs = "load_ids" DNSAgent = "DNSAgent" TLSNoCaps = "tls" + MetaRouteID = "*route_id" + MetaApiKey = "*api_key" ) // Migrator Action