From 22250fe9ccf2750fb07c9276a98e9a9f3cdae0b1 Mon Sep 17 00:00:00 2001 From: TeoV Date: Mon, 8 Apr 2019 11:33:04 -0400 Subject: [PATCH] Add Wrapper for CDR and ExternalCDR and use it in methods --- agents/diamagent.go | 24 +- apier/v1/cdrs.go | 8 +- apier/v1/dispatcher.go | 8 +- apier/v1/sessions.go | 2 +- apier/v1/sessionsbirpc.go | 2 +- data/tariffplans/dispatchers/Attributes.csv | 1 + dispatchers/cdrs.go | 10 +- dispatchers/cdrs_it_test.go | 61 +++++ dispatchers/libdispatcher.go | 6 +- engine/cdr.go | 12 +- engine/cdrs.go | 48 ++-- engine/chargers.go | 11 +- engine/resources.go | 8 +- engine/stats.go | 8 +- engine/storage_utils.go | 6 +- engine/suppliers.go | 9 +- sessions/sessions.go | 247 ++++++++------------ utils/apitpdata.go | 6 +- 18 files changed, 264 insertions(+), 213 deletions(-) create mode 100644 dispatchers/cdrs_it_test.go diff --git a/agents/diamagent.go b/agents/diamagent.go index eed3564ff..64428474c 100644 --- a/agents/diamagent.go +++ b/agents/diamagent.go @@ -354,8 +354,30 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.DARequestProcessor, if reqProcessor.Flags.HasKey(utils.MetaCDRs) && !reqProcessor.Flags.HasKey(utils.MetaDryRun) { var rplyCDRs string + //compose the arguments for SessionSv1ProcessCDR + argProcessCDR := &utils.CGREventWithArgDispatcher{ + CGREvent: cgrEv, + } + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey] + if hasApiKey { + argProcessCDR.ArgDispatcher = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface.(string)), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID] + if hasRouteID { + if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + argProcessCDR.ArgDispatcher = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface.(string)), + } + } else { + argProcessCDR.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string)) + } + } if err = da.sS.Call(utils.SessionSv1ProcessCDR, - cgrEv, &rplyCDRs); err != nil { + argProcessCDR, &rplyCDRs); err != nil { agReq.CGRReply.Set([]string{utils.Error}, err.Error(), false, false) } } diff --git a/apier/v1/cdrs.go b/apier/v1/cdrs.go index e14bf73c3..0b8f19359 100644 --- a/apier/v1/cdrs.go +++ b/apier/v1/cdrs.go @@ -86,7 +86,7 @@ type CDRsV1 struct { } // ProcessCDR will process a CDR in CGRateS internal format -func (cdrSv1 *CDRsV1) ProcessCDR(cdr *engine.CDR, reply *string) error { +func (cdrSv1 *CDRsV1) ProcessCDR(cdr *engine.CDRWithArgDispatcher, reply *string) error { return cdrSv1.CDRs.V1ProcessCDR(cdr, reply) } @@ -96,7 +96,7 @@ func (cdrSv1 *CDRsV1) ProcessEvent(arg *engine.ArgV1ProcessEvent, reply *string) } // ProcessExternalCDR will process a CDR in external format -func (cdrSv1 *CDRsV1) ProcessExternalCDR(cdr *engine.ExternalCDR, reply *string) error { +func (cdrSv1 *CDRsV1) ProcessExternalCDR(cdr *engine.ExternalCDRWithArgDispatcher, reply *string) error { return cdrSv1.CDRs.V1ProcessExternalCDR(cdr, reply) } @@ -110,11 +110,11 @@ func (cdrSv1 *CDRsV1) StoreSessionCost(attr *engine.AttrCDRSStoreSMCost, reply * return cdrSv1.CDRs.V1StoreSessionCost(attr, reply) } -func (cdrSv1 *CDRsV1) CountCDRs(args *utils.RPCCDRsFilter, reply *int64) error { +func (cdrSv1 *CDRsV1) CountCDRs(args *utils.RPCCDRsFilterWithArgDispatcher, reply *int64) error { return cdrSv1.CDRs.V1CountCDRs(args, reply) } -func (cdrSv1 *CDRsV1) GetCDRs(args utils.RPCCDRsFilter, reply *[]*engine.CDR) error { +func (cdrSv1 *CDRsV1) GetCDRs(args utils.RPCCDRsFilterWithArgDispatcher, reply *[]*engine.CDR) error { return cdrSv1.CDRs.V1GetCDRs(args, reply) } diff --git a/apier/v1/dispatcher.go b/apier/v1/dispatcher.go index 428845209..d67b13747 100755 --- a/apier/v1/dispatcher.go +++ b/apier/v1/dispatcher.go @@ -686,11 +686,11 @@ func (dS *DispatcherSCDRsV1) Ping(args *utils.CGREventWithArgDispatcher, reply * return dS.dS.CDRsV1Ping(args, reply) } -func (dS *DispatcherSCDRsV1) GetCDRs(args utils.RPCCDRsFilter, reply *[]*engine.CDR) error { +func (dS *DispatcherSCDRsV1) GetCDRs(args utils.RPCCDRsFilterWithArgDispatcher, reply *[]*engine.CDR) error { return dS.dS.CDRsV1GetCDRs(args, reply) } -func (dS *DispatcherSCDRsV1) CountCDRs(args *utils.RPCCDRsFilter, reply *int64) error { +func (dS *DispatcherSCDRsV1) CountCDRs(args *utils.RPCCDRsFilterWithArgDispatcher, reply *int64) error { return dS.dS.CDRsV1CountCDRs(args, reply) } @@ -702,7 +702,7 @@ func (dS *DispatcherSCDRsV1) RateCDRs(args *engine.ArgRateCDRs, reply *string) e return dS.dS.CDRsV1RateCDRs(args, reply) } -func (dS *DispatcherSCDRsV1) ProcessExternalCDR(args *engine.ExternalCDR, reply *string) error { +func (dS *DispatcherSCDRsV1) ProcessExternalCDR(args *engine.ExternalCDRWithArgDispatcher, reply *string) error { return dS.dS.CDRsV1ProcessExternalCDR(args, reply) } @@ -710,6 +710,6 @@ func (dS *DispatcherSCDRsV1) ProcessEvent(args *engine.ArgV1ProcessEvent, reply return dS.dS.CDRsV1ProcessEvent(args, reply) } -func (dS *DispatcherSCDRsV1) ProcessCDR(args *engine.CDR, reply *string) error { +func (dS *DispatcherSCDRsV1) ProcessCDR(args *engine.CDRWithArgDispatcher, reply *string) error { return dS.dS.CDRsV1ProcessCDR(args, reply) } diff --git a/apier/v1/sessions.go b/apier/v1/sessions.go index 2d94de3e7..4e5a48c6a 100644 --- a/apier/v1/sessions.go +++ b/apier/v1/sessions.go @@ -67,7 +67,7 @@ func (ssv1 *SessionSv1) TerminateSession(args *sessions.V1TerminateSessionArgs, return ssv1.Ss.BiRPCv1TerminateSession(nil, args, rply) } -func (ssv1 *SessionSv1) ProcessCDR(cgrEv *utils.CGREvent, rply *string) error { +func (ssv1 *SessionSv1) ProcessCDR(cgrEv *utils.CGREventWithArgDispatcher, rply *string) error { return ssv1.Ss.BiRPCv1ProcessCDR(nil, cgrEv, rply) } diff --git a/apier/v1/sessionsbirpc.go b/apier/v1/sessionsbirpc.go index b4596c24c..bd5992bd5 100644 --- a/apier/v1/sessionsbirpc.go +++ b/apier/v1/sessionsbirpc.go @@ -86,7 +86,7 @@ func (ssv1 *SessionSv1) BiRPCv1TerminateSession(clnt *rpc2.Client, args *session return ssv1.Ss.BiRPCv1TerminateSession(clnt, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1ProcessCDR(clnt *rpc2.Client, cgrEv *utils.CGREvent, rply *string) error { +func (ssv1 *SessionSv1) BiRPCv1ProcessCDR(clnt *rpc2.Client, cgrEv *utils.CGREventWithArgDispatcher, rply *string) error { return ssv1.Ss.BiRPCv1ProcessCDR(clnt, cgrEv, rply) } diff --git a/data/tariffplans/dispatchers/Attributes.csv b/data/tariffplans/dispatchers/Attributes.csv index 34e5de7a9..1bb7922c2 100644 --- a/data/tariffplans/dispatchers/Attributes.csv +++ b/data/tariffplans/dispatchers/Attributes.csv @@ -12,3 +12,4 @@ cgrates.org,ATTR_API_RSP_AUTH,*auth,*string:~APIKey:rsp12345,,,APIMethods,*const cgrates.org,ATTR_API_CHC_AUTH,*auth,*string:~APIKey:chc12345,,,APIMethods,*constant,CacheSv1.Ping&CacheSv1.GetCacheStats&CacheSv1.LoadCache&CacheSv1.PrecacheStatus&CacheSv1.GetItemIDs&CacheSv1.HasItem&CacheSv1.GetItemExpiryTime&CacheSv1.ReloadCache&CacheSv1.RemoveItem&CacheSv1.FlushCache&CacheSv1.Clear,false,20 cgrates.org,ATTR_API_GRD_AUTH,*auth,*string:~APIKey:grd12345,,,APIMethods,*constant,GuardianSv1.Ping&GuardianSv1.RemoteLock&GuardianSv1.RemoteUnlock,false,20 cgrates.org,ATTR_API_SCHD_AUTH,*auth,*string:~APIKey:sched12345,,,APIMethods,*constant,SchedulerSv1.Ping,false,20 +cgrates.org,ATTR_API_CDRS_AUTH,*auth,*string:~APIKey:cdrs12345,,,APIMethods,*constant,CDRsV1.Ping,false,20 \ No newline at end of file diff --git a/dispatchers/cdrs.go b/dispatchers/cdrs.go index fe3b52ee3..f86f2a742 100644 --- a/dispatchers/cdrs.go +++ b/dispatchers/cdrs.go @@ -42,7 +42,7 @@ func (dS *DispatcherService) CDRsV1Ping(args *utils.CGREventWithArgDispatcher, utils.CDRsV1Ping, args.CGREvent, reply) } -func (dS *DispatcherService) CDRsV1GetCDRs(args utils.RPCCDRsFilter, reply *[]*engine.CDR) (err error) { +func (dS *DispatcherService) CDRsV1GetCDRs(args utils.RPCCDRsFilterWithArgDispatcher, reply *[]*engine.CDR) (err error) { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing("ArgDispatcher") } @@ -57,7 +57,7 @@ func (dS *DispatcherService) CDRsV1GetCDRs(args utils.RPCCDRsFilter, reply *[]*e utils.CDRsV1GetCDRs, args, reply) } -func (dS *DispatcherService) CDRsV1CountCDRs(args *utils.RPCCDRsFilter, reply *int64) (err error) { +func (dS *DispatcherService) CDRsV1CountCDRs(args *utils.RPCCDRsFilterWithArgDispatcher, reply *int64) (err error) { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing("ArgDispatcher") } @@ -93,7 +93,7 @@ func (dS *DispatcherService) CDRsV1RateCDRs(args *engine.ArgRateCDRs, reply *str } if dS.attrS != nil { if err = dS.authorize(utils.CDRsV1RateCDRs, - args.Tenant, + args.TenantArg.Tenant, args.APIKey, utils.TimePointer(time.Now())); err != nil { return } @@ -102,7 +102,7 @@ func (dS *DispatcherService) CDRsV1RateCDRs(args *engine.ArgRateCDRs, reply *str utils.CDRsV1RateCDRs, args, reply) } -func (dS *DispatcherService) CDRsV1ProcessExternalCDR(args *engine.ExternalCDR, reply *string) (err error) { +func (dS *DispatcherService) CDRsV1ProcessExternalCDR(args *engine.ExternalCDRWithArgDispatcher, reply *string) (err error) { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing("ArgDispatcher") } @@ -132,7 +132,7 @@ func (dS *DispatcherService) CDRsV1ProcessEvent(args *engine.ArgV1ProcessEvent, utils.CDRsV1ProcessEvent, args, reply) } -func (dS *DispatcherService) CDRsV1ProcessCDR(args *engine.CDR, reply *string) (err error) { +func (dS *DispatcherService) CDRsV1ProcessCDR(args *engine.CDRWithArgDispatcher, reply *string) (err error) { if args.ArgDispatcher == nil { return utils.NewErrMandatoryIeMissing("ArgDispatcher") } diff --git a/dispatchers/cdrs_it_test.go b/dispatchers/cdrs_it_test.go new file mode 100644 index 000000000..9e1692c21 --- /dev/null +++ b/dispatchers/cdrs_it_test.go @@ -0,0 +1,61 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package dispatchers + +import ( + "testing" + + "github.com/cgrates/cgrates/utils" +) + +var sTestsDspCDRs = []func(t *testing.T){ + testDspCDRsPing, +} + +//Test start here +func TestDspCDRsITMySQL(t *testing.T) { + testDsp(t, sTestsDspCDRs, "TestDspCDRs", "all", "all2", "attributes", "dispatchers", "tutorial", "oldtutorial", "dispatchers") +} + +func TestDspCDRsITMongo(t *testing.T) { + testDsp(t, sTestsDspCDRs, "TestDspCDRs", "all", "all2", "attributes_mongo", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers") +} + +func testDspCDRsPing(t *testing.T) { + var reply string + if err := allEngine.RCP.Call(utils.CDRsV1Ping, new(utils.CGREvent), &reply); err != nil { + t.Error(err) + } else if reply != utils.Pong { + t.Errorf("Received: %s", reply) + } + if err := dispEngine.RCP.Call(utils.CDRsV1Ping, &utils.CGREventWithArgDispatcher{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + }, + ArgDispatcher: &utils.ArgDispatcher{ + APIKey: utils.StringPointer("cdrs12345"), + }, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.Pong { + t.Errorf("Received: %s", reply) + } +} diff --git a/dispatchers/libdispatcher.go b/dispatchers/libdispatcher.go index 78196d0dc..edb10a095 100644 --- a/dispatchers/libdispatcher.go +++ b/dispatchers/libdispatcher.go @@ -215,9 +215,11 @@ func (_ *singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeI hostIDs []string, serviceMethod string, args interface{}, reply interface{}) (err error) { var dH *engine.DispatcherHost if routeID != nil && *routeID != "" { + // overwrite routeID with RouteID:Subsystem + *routeID = utils.ConcatenatedKey(*routeID, subsystem) // use previously discovered route if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes, - utils.ConcatenatedKey(subsystem, *routeID)); ok && x != nil { + *routeID); ok && x != nil { dH = x.(*engine.DispatcherHost) if err = dH.Call(serviceMethod, args, reply); !utils.IsNetworkError(err) { return @@ -233,7 +235,7 @@ func (_ *singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeI continue } if routeID != nil && *routeID != "" { // cache the discovered route - engine.Cache.Set(utils.CacheDispatcherRoutes, utils.ConcatenatedKey(subsystem, *routeID), dH, + engine.Cache.Set(utils.CacheDispatcherRoutes, *routeID, dH, nil, true, utils.EmptyString) } break diff --git a/engine/cdr.go b/engine/cdr.go index 941c05e71..1f9a4830b 100644 --- a/engine/cdr.go +++ b/engine/cdr.go @@ -104,7 +104,6 @@ type CDR struct { CostSource string // The source of this cost Cost float64 // CostDetails *EventCost // Attach the cost details to CDR when possible - *utils.ArgDispatcher } // AddDefaults will add missing information based on other fields @@ -805,7 +804,6 @@ type ExternalCDR struct { CostDetails string ExtraInfo string PreRated bool // Mark the CDR as rated so we do not process it during mediation - *utils.ArgDispatcher } // Used when authorizing requests from outside, eg ApierV1.GetMaxUsage @@ -881,3 +879,13 @@ func (self *UsageRecord) AsCallDescriptor(timezone string, denyNegative bool) (* func (self *UsageRecord) GetId() string { return utils.Sha1(self.ToR, self.RequestType, self.Tenant, self.Category, self.Account, self.Subject, self.Destination, self.SetupTime, self.AnswerTime, self.Usage) } + +type CDRWithArgDispatcher struct { + *CDR + *utils.ArgDispatcher +} + +type ExternalCDRWithArgDispatcher struct { + *ExternalCDR + *utils.ArgDispatcher +} diff --git a/engine/cdrs.go b/engine/cdrs.go index 0b504b6b6..a7e8f8311 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -43,7 +43,7 @@ func cgrCdrHandler(w http.ResponseWriter, r *http.Request) { } cdr := cgrCdr.AsCDR(cdrServer.cgrCfg.GeneralCfg().DefaultTimezone) var ignored string - if err := cdrServer.V1ProcessCDR(cdr, &ignored); err != nil { + if err := cdrServer.V1ProcessCDR(&CDRWithArgDispatcher{CDR: cdr}, &ignored); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> processing CDR: %s, err: <%s>", utils.CDRs, cdr, err.Error())) @@ -60,7 +60,7 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) { } cdr := fsCdr.AsCDR(cdrServer.cgrCfg.GeneralCfg().DefaultTimezone) var ignored string - if err := cdrServer.V1ProcessCDR(cdr, &ignored); err != nil { + if err := cdrServer.V1ProcessCDR(&CDRWithArgDispatcher{CDR: cdr}, &ignored); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> processing CDR: %s, err: <%s>", utils.CDRs, cdr, err.Error())) @@ -137,7 +137,7 @@ func (cdrS *CDRServer) storeSMCost(smCost *SMCost, checkDuplicate bool) error { // rateCDR will populate cost field // Returns more than one rated CDR in case of SMCost retrieved based on prefix -func (cdrS *CDRServer) rateCDR(cdr *CDR) ([]*CDR, error) { +func (cdrS *CDRServer) rateCDR(cdr *CDRWithArgDispatcher) ([]*CDR, error) { var qryCC *CallCost var err error if cdr.RequestType == utils.META_NONE { @@ -197,11 +197,11 @@ func (cdrS *CDRServer) rateCDR(cdr *CDR) ([]*CDR, error) { cdr.CostDetails = NewEventCostFromCallCost(qryCC, cdr.CGRID, cdr.RunID) } cdr.CostDetails.Compute() - return []*CDR{cdr}, nil + return []*CDR{cdr.CDR}, nil } // getCostFromRater will retrieve the cost from RALs -func (cdrS *CDRServer) getCostFromRater(cdr *CDR) (*CallCost, error) { +func (cdrS *CDRServer) getCostFromRater(cdr *CDRWithArgDispatcher) (*CallCost, error) { cc := new(CallCost) var err error timeStart := cdr.AnswerTime @@ -266,13 +266,13 @@ func (cdrS *CDRServer) attrStoExpThdStat(cgrEv *utils.CGREventWithArgDispatcher, return } -func (cdrS *CDRServer) rateCDRWithErr(cdr *CDR) (ratedCDRs []*CDR) { +func (cdrS *CDRServer) rateCDRWithErr(cdr *CDRWithArgDispatcher) (ratedCDRs []*CDR) { var err error ratedCDRs, err = cdrS.rateCDR(cdr) if err != nil { cdr.Cost = -1.0 // If there was an error, mark the CDR cdr.ExtraInfo = err.Error() - ratedCDRs = []*CDR{cdr} + ratedCDRs = []*CDR{cdr.CDR} } return } @@ -300,12 +300,10 @@ func (cdrS *CDRServer) chrgProcessEvent(cgrEv *utils.CGREventWithArgDispatcher, partExec = true continue } - for _, rtCDR := range cdrS.rateCDRWithErr(cdr) { + for _, rtCDR := range cdrS.rateCDRWithErr(&CDRWithArgDispatcher{CDR: cdr, ArgDispatcher: cgrEv.ArgDispatcher}) { arg := &utils.CGREventWithArgDispatcher{ - CGREvent: rtCDR.AsCGREvent(), - } - if cgrEv.ArgDispatcher != nil { - arg.ArgDispatcher = cgrEv.ArgDispatcher + CGREvent: rtCDR.AsCGREvent(), + ArgDispatcher: cgrEv.ArgDispatcher, } if errProc := cdrS.attrStoExpThdStat(arg, attrS, store, export, thdS, statS); errProc != nil { @@ -426,7 +424,7 @@ func (cdrS *CDRServer) Call(serviceMethod string, args interface{}, reply interf } // V1ProcessCDR processes a CDR -func (cdrS *CDRServer) V1ProcessCDR(cdr *CDR, reply *string) (err error) { +func (cdrS *CDRServer) V1ProcessCDR(cdr *CDRWithArgDispatcher, reply *string) (err error) { if cdr.CGRID == utils.EmptyString { // Populate CGRID if not present cdr.ComputeCGRID() } @@ -473,9 +471,7 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDR, reply *string) (err error) { ID: utils.UUIDSha1Prefix(), Event: cdr.AsMapStringIface(), }, - } - if cdr.ArgDispatcher != nil { - cgrEv.ArgDispatcher = cdr.ArgDispatcher + ArgDispatcher: cdr.ArgDispatcher, } if cdrS.attrS != nil { if err = cdrS.attrSProcessEvent(cgrEv); err != nil { @@ -484,7 +480,7 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDR, reply *string) (err error) { } } if cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs { // Store *raw CDR - if err = cdrS.cdrDb.SetCDR(cdr, false); err != nil { + if err = cdrS.cdrDb.SetCDR(cdr.CDR, false); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> storing primary CDR %+v, got error: %s", utils.CDRs, cdr, err.Error())) @@ -493,7 +489,7 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDR, reply *string) (err error) { } } if len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0 { - cdrS.exportCDRs([]*CDR{cdr}) // Replicate raw CDR + cdrS.exportCDRs([]*CDR{cdr.CDR}) // Replicate raw CDR } if cdrS.thdS != nil { go cdrS.thdSProcessEvent(cgrEv) @@ -600,7 +596,8 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er err = utils.ErrPartiallyExecuted return } - for _, rtCDR := range cdrS.rateCDRWithErr(cdr) { + for _, rtCDR := range cdrS.rateCDRWithErr(&CDRWithArgDispatcher{CDR: cdr, + ArgDispatcher: arg.ArgDispatcher}) { cgrEv := &utils.CGREventWithArgDispatcher{ CGREvent: rtCDR.AsCGREvent(), ArgDispatcher: arg.ArgDispatcher, @@ -728,6 +725,7 @@ type ArgRateCDRs struct { ThresholdS *bool StatS *bool // Set to true if the CDRs should be sent to stats server *utils.ArgDispatcher + *utils.TenantArg } // V1RateCDRs is used for re-/rate CDRs which are already stored within StorDB @@ -777,16 +775,18 @@ func (cdrS *CDRServer) V1RateCDRs(arg *ArgRateCDRs, reply *string) (err error) { } // Used to process external CDRs -func (cdrS *CDRServer) V1ProcessExternalCDR(eCDR *ExternalCDR, reply *string) error { - cdr, err := NewCDRFromExternalCDR(eCDR, cdrS.cgrCfg.GeneralCfg().DefaultTimezone) +func (cdrS *CDRServer) V1ProcessExternalCDR(eCDR *ExternalCDRWithArgDispatcher, reply *string) error { + cdr, err := NewCDRFromExternalCDR(eCDR.ExternalCDR, + cdrS.cgrCfg.GeneralCfg().DefaultTimezone) if err != nil { return err } - return cdrS.V1ProcessCDR(cdr, reply) + return cdrS.V1ProcessCDR(&CDRWithArgDispatcher{CDR: cdr, + ArgDispatcher: eCDR.ArgDispatcher}, reply) } // V1GetCDRs returns CDRs from DB -func (cdrS *CDRServer) V1GetCDRs(args utils.RPCCDRsFilter, cdrs *[]*CDR) error { +func (cdrS *CDRServer) V1GetCDRs(args utils.RPCCDRsFilterWithArgDispatcher, cdrs *[]*CDR) error { cdrsFltr, err := args.AsCDRsFilter(cdrS.cgrCfg.GeneralCfg().DefaultTimezone) if err != nil { if err.Error() != utils.NotFoundCaps { @@ -803,7 +803,7 @@ func (cdrS *CDRServer) V1GetCDRs(args utils.RPCCDRsFilter, cdrs *[]*CDR) error { } // V1CountCDRs counts CDRs from DB -func (cdrS *CDRServer) V1CountCDRs(args *utils.RPCCDRsFilter, cnt *int64) error { +func (cdrS *CDRServer) V1CountCDRs(args *utils.RPCCDRsFilterWithArgDispatcher, cnt *int64) error { cdrsFltr, err := args.AsCDRsFilter(cdrS.cgrCfg.GeneralCfg().DefaultTimezone) if err != nil { if err.Error() != utils.NotFoundCaps { diff --git a/engine/chargers.go b/engine/chargers.go index 6ee288c9e..fa43748da 100644 --- a/engine/chargers.go +++ b/engine/chargers.go @@ -129,12 +129,11 @@ func (cS *ChargerService) processEvent(cgrEv *utils.CGREventWithArgDispatcher) ( } args := &AttrArgsProcessEvent{ - AttributeIDs: cP.AttributeIDs, - Context: utils.StringPointer(utils.MetaChargers), - ProcessRuns: nil, - CGREvent: *clonedEv.CGREvent} - if clonedEv.ArgDispatcher != nil { - args.ArgDispatcher = clonedEv.ArgDispatcher + AttributeIDs: cP.AttributeIDs, + Context: utils.StringPointer(utils.MetaChargers), + ProcessRuns: nil, + CGREvent: *clonedEv.CGREvent, + ArgDispatcher: clonedEv.ArgDispatcher, } var evReply AttrSProcessEventReply if err = cS.attrS.Call(utils.AttributeSv1ProcessEvent, diff --git a/engine/resources.go b/engine/resources.go index 30d44ec7c..7b7d88dbb 100644 --- a/engine/resources.go +++ b/engine/resources.go @@ -526,10 +526,10 @@ func (rS *ResourceService) processThresholds(r *Resource, argDispatcher *utils.A Event: map[string]interface{}{ utils.EventType: utils.ResourceUpdate, utils.ResourceID: r.ID, - utils.Usage: r.totalUsage()}}} - // in case we receive ArgDispatcher we add it to be used by DispatcherS - if argDispatcher != nil { - thEv.ArgDispatcher = argDispatcher + utils.Usage: r.totalUsage(), + }, + }, + ArgDispatcher: argDispatcher, } var tIDs []string if err = rS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil && diff --git a/engine/stats.go b/engine/stats.go index f2303c72d..92dca3679 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -281,10 +281,10 @@ func (sS *StatService) processEvent(args *StatsArgsProcessEvent) (statQueueIDs [ ID: utils.GenUUID(), Event: map[string]interface{}{ utils.EventType: utils.StatUpdate, - utils.StatID: sq.ID}}} - // in case we receive ArgDispatcher we add it to be used by DispatcherS - if args.ArgDispatcher != nil { - thEv.ArgDispatcher = args.ArgDispatcher + utils.StatID: sq.ID, + }, + }, + ArgDispatcher: args.ArgDispatcher, } for metricID, metric := range sq.SQMetrics { thEv.Event[metricID] = metric.GetValue() diff --git a/engine/storage_utils.go b/engine/storage_utils.go index cb0641667..388f9ae9e 100644 --- a/engine/storage_utils.go +++ b/engine/storage_utils.go @@ -169,13 +169,15 @@ type SMCost struct { type AttrCDRSStoreSMCost struct { Cost *SMCost CheckDuplicate bool - *utils.TenantWithArgDispatcher + *utils.ArgDispatcher + *utils.TenantArg } type ArgsV2CDRSStoreSMCost struct { Cost *V2SMCost CheckDuplicate bool - *utils.TenantWithArgDispatcher + *utils.ArgDispatcher + *utils.TenantArg } type V2SMCost struct { diff --git a/engine/suppliers.go b/engine/suppliers.go index da465b6cc..d1f64d0a0 100644 --- a/engine/suppliers.go +++ b/engine/suppliers.go @@ -478,12 +478,9 @@ func (spS *SupplierService) V1GetSuppliers(args *ArgsGetSuppliers, reply *Sorted } if spS.attributeS != nil { attrArgs := &AttrArgsProcessEvent{ - Context: utils.StringPointer(utils.MetaSuppliers), - CGREvent: args.CGREvent, - } - // in case we receive ArgDispatcher we add it to be used by DispatcherS - if args.ArgDispatcher != nil { - attrArgs.ArgDispatcher = args.ArgDispatcher + Context: utils.StringPointer(utils.MetaSuppliers), + CGREvent: args.CGREvent, + ArgDispatcher: args.ArgDispatcher, } var rplyEv AttrSProcessEventReply if err := spS.attributeS.Call(utils.AttributeSv1ProcessEvent, diff --git a/sessions/sessions.go b/sessions/sessions.go index 262347bc5..e9a1be476 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -391,17 +391,15 @@ func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUs var reply string for _, cgrEv := range cgrEvs { argsProc := &engine.ArgV1ProcessEvent{ - CGREvent: *cgrEv, - ChargerS: utils.BoolPointer(false), - AttributeS: utils.BoolPointer(false)} - + CGREvent: *cgrEv, + ChargerS: utils.BoolPointer(false), + AttributeS: utils.BoolPointer(false), + ArgDispatcher: s.ArgDispatcher, + } if unratedReqs.HasField( // order additional rating for unrated request types engine.NewMapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RequestType)) { argsProc.RALs = utils.BoolPointer(true) } - if s.ArgDispatcher != nil { - argsProc.ArgDispatcher = s.ArgDispatcher - } if err = sS.cdrS.Call(utils.CDRsV1ProcessEvent, argsProc, &reply); err != nil { utils.Logger.Warning( fmt.Sprintf( @@ -415,12 +413,10 @@ func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUs if sS.resS != nil && s.ResourceID != "" { var reply string argsRU := utils.ArgRSv1ResourceUsage{ - CGREvent: cgrEv, - UsageID: s.ResourceID, - Units: 1, - } - if s.ArgDispatcher != nil { - argsRU.ArgDispatcher = s.ArgDispatcher + CGREvent: cgrEv, + UsageID: s.ResourceID, + Units: 1, + ArgDispatcher: s.ArgDispatcher, } if err := sS.resS.Call(utils.ResourceSv1ReleaseResources, argsRU, &reply); err != nil { @@ -600,18 +596,16 @@ func (sS *SessionS) refundSession(s *Session, sRunIdx int, rUsage time.Duration) } } cd := &engine.CallDescriptor{ - CgrID: s.CGRID, - RunID: sr.Event.GetStringIgnoreErrors(utils.RunID), - Category: sr.CD.Category, - Tenant: sr.CD.Tenant, - Subject: sr.CD.Subject, - Account: sr.CD.Account, - Destination: sr.CD.Destination, - TOR: sr.CD.TOR, - Increments: incrmts, - } - if s.ArgDispatcher != nil { - cd.ArgDispatcher = s.ArgDispatcher + CgrID: s.CGRID, + RunID: sr.Event.GetStringIgnoreErrors(utils.RunID), + Category: sr.CD.Category, + Tenant: sr.CD.Tenant, + Subject: sr.CD.Subject, + Account: sr.CD.Account, + Destination: sr.CD.Destination, + TOR: sr.CD.TOR, + Increments: incrmts, + ArgDispatcher: s.ArgDispatcher, } var acnt engine.Account if err = sS.ralS.Call(utils.ResponderRefundIncrements, cd, &acnt); err != nil { @@ -645,9 +639,10 @@ func (sS *SessionS) storeSCost(s *Session, sRunIdx int) (err error) { argSmCost := &engine.ArgsV2CDRSStoreSMCost{ Cost: smCost, CheckDuplicate: true, - } - if s.ArgDispatcher != nil { - argSmCost.ArgDispatcher = s.ArgDispatcher + ArgDispatcher: s.ArgDispatcher, + TenantArg: &utils.TenantArg{ + Tenant: s.Tenant, + }, } var reply string if err := sS.cdrS.Call(utils.CDRsV2StoreSessionCost, @@ -974,11 +969,7 @@ func (sS *SessionS) forkSession(s *Session) (err error) { ID: utils.UUIDSha1Prefix(), Event: s.EventStart.AsMapInterface(), }, - } - // in case we have ArgDispatcher in session we populate CGREvent - // so DispatcherS can verify the APIKey/RouteID if it's necessary - if s.ArgDispatcher != nil { - cgrEv.ArgDispatcher = s.ArgDispatcher + ArgDispatcher: s.ArgDispatcher, } var chrgrs []*engine.ChrgSProcessEventReply if err = sS.chargerS.Call(utils.ChargerSv1ProcessEvent, @@ -1710,12 +1701,9 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection, return utils.NewErrNotConnected(utils.AttributeS) } attrArgs := &engine.AttrArgsProcessEvent{ - Context: utils.StringPointer(utils.MetaSessionS), - CGREvent: args.CGREvent, - } - // in case we receive ArgDispatcher we add it to be used by DispatcherS - if args.ArgDispatcher != nil { - attrArgs.ArgDispatcher = args.ArgDispatcher + Context: utils.StringPointer(utils.MetaSessionS), + CGREvent: args.CGREvent, + ArgDispatcher: args.ArgDispatcher, } var rplyEv engine.AttrSProcessEventReply if err := sS.attrS.Call(utils.AttributeSv1ProcessEvent, @@ -1749,13 +1737,10 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection, } var allocMsg string attrRU := utils.ArgRSv1ResourceUsage{ - CGREvent: args.CGREvent, - UsageID: originID, - Units: 1, - } - // in case we receive ArgDispatcher we add it to be used by DispatcherS - if args.ArgDispatcher != nil { - attrRU.ArgDispatcher = args.ArgDispatcher + CGREvent: args.CGREvent, + UsageID: originID, + Units: 1, + ArgDispatcher: args.ArgDispatcher, } if err = sS.resS.Call(utils.ResourceSv1AuthorizeResources, attrRU, &allocMsg); err != nil { @@ -1773,14 +1758,11 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection, } var splsReply engine.SortedSuppliers sArgs := &engine.ArgsGetSuppliers{ - IgnoreErrors: args.SuppliersIgnoreErrors, - MaxCost: args.SuppliersMaxCost, - CGREvent: *cgrEv, - Paginator: args.Paginator, - } - // in case we receive ArgDispatcher we add it to be used by DispatcherS - if args.ArgDispatcher != nil { - sArgs.ArgDispatcher = args.ArgDispatcher + IgnoreErrors: args.SuppliersIgnoreErrors, + MaxCost: args.SuppliersMaxCost, + CGREvent: *cgrEv, + Paginator: args.Paginator, + ArgDispatcher: args.ArgDispatcher, } if err = sS.splS.Call(utils.SupplierSv1GetSuppliers, sArgs, &splsReply); err != nil { @@ -1796,11 +1778,8 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection, } var tIDs []string thEv := &engine.ArgsProcessEvent{ - CGREvent: args.CGREvent, - } - // in case we receive ArgDispatcher we add it to be used by DispatcherS - if args.ArgDispatcher != nil { - thEv.ArgDispatcher = args.ArgDispatcher + CGREvent: args.CGREvent, + ArgDispatcher: args.ArgDispatcher, } if err := sS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil && err.Error() != utils.ErrNotFound.Error() { @@ -1814,10 +1793,9 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection, if sS.statS == nil { return utils.NewErrNotConnected(utils.StatService) } - statArgs := &engine.StatsArgsProcessEvent{CGREvent: args.CGREvent} - // in case we receive ArgDispatcher we add it to be used by DispatcherS - if args.ArgDispatcher != nil { - statArgs.ArgDispatcher = args.ArgDispatcher + statArgs := &engine.StatsArgsProcessEvent{ + CGREvent: args.CGREvent, + ArgDispatcher: args.ArgDispatcher, } var statReply []string if err := sS.statS.Call(utils.StatSv1ProcessEvent, @@ -1997,12 +1975,9 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection, return utils.NewErrNotConnected(utils.AttributeS) } attrArgs := &engine.AttrArgsProcessEvent{ - Context: utils.StringPointer(utils.MetaSessionS), - CGREvent: args.CGREvent, - } - // in case we receive ArgDispatcher we add it to be used by DispatcherS - if args.ArgDispatcher != nil { - attrArgs.ArgDispatcher = args.ArgDispatcher + Context: utils.StringPointer(utils.MetaSessionS), + CGREvent: args.CGREvent, + ArgDispatcher: args.ArgDispatcher, } var rplyEv engine.AttrSProcessEventReply if err := sS.attrS.Call(utils.AttributeSv1ProcessEvent, @@ -2026,13 +2001,10 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection, return utils.NewErrMandatoryIeMissing(utils.OriginID) } attrRU := utils.ArgRSv1ResourceUsage{ - CGREvent: args.CGREvent, - UsageID: originID, - Units: 1, - } - // in case we receive ArgDispatcher we add it to be used by DispatcherS - if args.ArgDispatcher != nil { - attrRU.ArgDispatcher = args.ArgDispatcher + CGREvent: args.CGREvent, + UsageID: originID, + Units: 1, + ArgDispatcher: args.ArgDispatcher, } var allocMessage string if err = sS.resS.Call(utils.ResourceSv1AllocateResources, @@ -2071,11 +2043,8 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection, } var tIDs []string thEv := &engine.ArgsProcessEvent{ - CGREvent: args.CGREvent, - } - // in case we receive ArgDispatcher we add it to be used by DispatcherS - if args.ArgDispatcher != nil { - thEv.ArgDispatcher = args.ArgDispatcher + CGREvent: args.CGREvent, + ArgDispatcher: args.ArgDispatcher, } if err := sS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil && @@ -2091,10 +2060,9 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection, return utils.NewErrNotConnected(utils.StatService) } var statReply []string - statArgs := &engine.StatsArgsProcessEvent{CGREvent: args.CGREvent} - // in case we receive ArgDispatcher we add it to be used by DispatcherS - if args.ArgDispatcher != nil { - statArgs.ArgDispatcher = args.ArgDispatcher + statArgs := &engine.StatsArgsProcessEvent{ + CGREvent: args.CGREvent, + ArgDispatcher: args.ArgDispatcher, } if err := sS.statS.Call(utils.StatSv1ProcessEvent, statArgs, &statReply); err != nil && @@ -2153,8 +2121,11 @@ func (sS *SessionS) BiRPCv1InitiateSessionWithDigest(clnt rpcclient.RpcClientCon // NewV1UpdateSessionArgs is a constructor for update session arguments func NewV1UpdateSessionArgs(attrs, acnts bool, cgrEv utils.CGREvent) (args *V1UpdateSessionArgs) { - args = &V1UpdateSessionArgs{GetAttributes: attrs, - UpdateSession: acnts, CGREvent: cgrEv} + args = &V1UpdateSessionArgs{ + GetAttributes: attrs, + UpdateSession: acnts, + CGREvent: cgrEv, + } //check if we have APIKey in event and in case it has add it in ArgDispatcher apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey] if hasApiKey { @@ -2249,12 +2220,9 @@ func (sS *SessionS) BiRPCv1UpdateSession(clnt rpcclient.RpcClientConnection, return utils.NewErrNotConnected(utils.AttributeS) } attrArgs := &engine.AttrArgsProcessEvent{ - Context: utils.StringPointer(utils.MetaSessionS), - CGREvent: args.CGREvent, - } - // in case we receive ArgDispatcher we add it to be used by DispatcherS - if args.ArgDispatcher != nil { - attrArgs.ArgDispatcher = args.ArgDispatcher + Context: utils.StringPointer(utils.MetaSessionS), + CGREvent: args.CGREvent, + ArgDispatcher: args.ArgDispatcher, } var rplyEv engine.AttrSProcessEventReply if err := sS.attrS.Call(utils.AttributeSv1ProcessEvent, @@ -2417,13 +2385,10 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection, } var reply string argsRU := utils.ArgRSv1ResourceUsage{ - CGREvent: args.CGREvent, - UsageID: originID, // same ID should be accepted by first group since the previous resource should be expired - Units: 1, - } - // in case we receive ArgDispatcher we add it to be used by DispatcherS - if args.ArgDispatcher != nil { - argsRU.ArgDispatcher = args.ArgDispatcher + CGREvent: args.CGREvent, + UsageID: originID, // same ID should be accepted by first group since the previous resource should be expired + Units: 1, + ArgDispatcher: args.ArgDispatcher, } if err = sS.resS.Call(utils.ResourceSv1ReleaseResources, argsRU, &reply); err != nil { @@ -2436,11 +2401,8 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection, } var tIDs []string thEv := &engine.ArgsProcessEvent{ - CGREvent: args.CGREvent, - } - // in case we receive ArgDispatcher we add it to be used by DispatcherS - if args.ArgDispatcher != nil { - thEv.ArgDispatcher = args.ArgDispatcher + CGREvent: args.CGREvent, + ArgDispatcher: args.ArgDispatcher, } if err := sS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil && err.Error() != utils.ErrNotFound.Error() { @@ -2454,10 +2416,9 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection, return utils.NewErrNotConnected(utils.StatS) } var statReply []string - statArgs := &engine.StatsArgsProcessEvent{CGREvent: args.CGREvent} - // in case we receive ArgDispatcher we add it to be used by DispatcherS - if args.ArgDispatcher != nil { - statArgs.ArgDispatcher = args.ArgDispatcher + statArgs := &engine.StatsArgsProcessEvent{ + CGREvent: args.CGREvent, + ArgDispatcher: args.ArgDispatcher, } if err := sS.statS.Call(utils.StatSv1ProcessEvent, statArgs, &statReply); err != nil && @@ -2473,14 +2434,14 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection, // BiRPCv1ProcessCDR sends the CDR to CDRs func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection, - cgrEv *utils.CGREvent, rply *string) (err error) { - if cgrEv.ID == "" { - cgrEv.ID = utils.GenUUID() + cgrEvWithArgDisp *utils.CGREventWithArgDispatcher, rply *string) (err error) { + if cgrEvWithArgDisp.ID == "" { + cgrEvWithArgDisp.ID = utils.GenUUID() } // RPC caching if sS.cgrCfg.CacheCfg()[utils.CacheRPCResponses].Limit != 0 { - cacheKey := utils.ConcatenatedKey(utils.SessionSv1ProcessCDR, cgrEv.ID) + cacheKey := utils.ConcatenatedKey(utils.SessionSv1ProcessCDR, cgrEvWithArgDisp.ID) refID := guardian.Guardian.GuardIDs("", sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic defer guardian.Guardian.UnguardIDs(refID) @@ -2498,7 +2459,7 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection, } // end of RPC caching - ev := engine.NewSafEvent(cgrEv.Event) + ev := engine.NewSafEvent(cgrEvWithArgDisp.Event) cgrID := GetSetCGRID(ev) ss := sS.getRelocateSessions(cgrID, ev.GetStringIgnoreErrors(utils.InitialOriginID), @@ -2517,7 +2478,8 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection, } if s == nil { // no cached session, CDR will be handled by CDRs return sS.cdrS.Call(utils.CDRsV1ProcessEvent, - &engine.ArgV1ProcessEvent{CGREvent: *cgrEv}, rply) + &engine.ArgV1ProcessEvent{CGREvent: *cgrEvWithArgDisp.CGREvent, + ArgDispatcher: cgrEvWithArgDisp.ArgDispatcher}, rply) } // Use previously stored Session to generate CDRs @@ -2538,9 +2500,11 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection, var withErrors bool for _, cgrEv := range cgrEvs { argsProc := &engine.ArgV1ProcessEvent{ - CGREvent: *cgrEv, - ChargerS: utils.BoolPointer(false), - AttributeS: utils.BoolPointer(false)} + CGREvent: *cgrEv, + ChargerS: utils.BoolPointer(false), + AttributeS: utils.BoolPointer(false), + ArgDispatcher: cgrEvWithArgDisp.ArgDispatcher, + } if unratedReqs.HasField( // order additional rating for unrated request types engine.NewMapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RequestType)) { argsProc.RALs = utils.BoolPointer(true) @@ -2671,12 +2635,9 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, return utils.NewErrNotConnected(utils.AttributeS) } attrArgs := &engine.AttrArgsProcessEvent{ - Context: utils.StringPointer(utils.MetaSessionS), - CGREvent: args.CGREvent, - } - // in case we receive ArgDispatcher we add it to be used by DispatcherS - if args.ArgDispatcher != nil { - attrArgs.ArgDispatcher = args.ArgDispatcher + Context: utils.StringPointer(utils.MetaSessionS), + CGREvent: args.CGREvent, + ArgDispatcher: args.ArgDispatcher, } var rplyEv engine.AttrSProcessEventReply if err := sS.attrS.Call(utils.AttributeSv1ProcessEvent, @@ -2700,13 +2661,10 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, return utils.NewErrMandatoryIeMissing(utils.OriginID) } attrRU := utils.ArgRSv1ResourceUsage{ - CGREvent: args.CGREvent, - UsageID: originID, - Units: 1, - } - // in case we receive ArgDispatcher we add it to be used by DispatcherS - if args.ArgDispatcher != nil { - attrRU.ArgDispatcher = args.ArgDispatcher + CGREvent: args.CGREvent, + UsageID: originID, + Units: 1, + ArgDispatcher: args.ArgDispatcher, } var allocMessage string if err = sS.resS.Call(utils.ResourceSv1AllocateResources, @@ -2729,11 +2687,8 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, } var tIDs []string thEv := &engine.ArgsProcessEvent{ - CGREvent: args.CGREvent, - } - // in case we receive ArgDispatcher we add it to be used by DispatcherS - if args.ArgDispatcher != nil { - thEv.ArgDispatcher = args.ArgDispatcher + CGREvent: args.CGREvent, + ArgDispatcher: args.ArgDispatcher, } if err := sS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil && @@ -2748,10 +2703,9 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, return utils.NewErrNotConnected(utils.StatS) } var statReply []string - statArgs := &engine.StatsArgsProcessEvent{CGREvent: args.CGREvent} - // in case we receive ArgDispatcher we add it to be used by DispatcherS - if args.ArgDispatcher != nil { - statArgs.ArgDispatcher = args.ArgDispatcher + statArgs := &engine.StatsArgsProcessEvent{ + CGREvent: args.CGREvent, + ArgDispatcher: args.ArgDispatcher, } if err := sS.statS.Call(utils.StatSv1ProcessEvent, statArgs, &statReply); err != nil && @@ -2919,11 +2873,12 @@ func (sS *SessionS) BiRPCV1ProcessCDR(clnt rpcclient.RpcClientConnection, ev engine.MapEvent, rply *string) (err error) { return sS.BiRPCv1ProcessCDR( clnt, - &utils.CGREvent{ - Tenant: utils.FirstNonEmpty( - ev.GetStringIgnoreErrors(utils.Tenant), - sS.cgrCfg.GeneralCfg().DefaultTenant), - ID: utils.UUIDSha1Prefix(), - Event: ev}, + &utils.CGREventWithArgDispatcher{ + CGREvent: &utils.CGREvent{ + Tenant: utils.FirstNonEmpty( + ev.GetStringIgnoreErrors(utils.Tenant), + sS.cgrCfg.GeneralCfg().DefaultTenant), + ID: utils.UUIDSha1Prefix(), + Event: ev}}, rply) } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 54cdac8b1..5d39a002e 100755 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -828,7 +828,6 @@ type RPCCDRsFilter struct { MaxCost *float64 // End of the usage interval (<) OrderBy string // Ascendent/Descendent Paginator // Add pagination - *TenantWithArgDispatcher } func (self *RPCCDRsFilter) AsCDRsFilter(timezone string) (*CDRsFilter, error) { @@ -1358,3 +1357,8 @@ func AppendToSMCostFilter(smcFilter *SMCostFilter, fieldType, fieldName string, } return smcFilter, err } + +type RPCCDRsFilterWithArgDispatcher struct { + *RPCCDRsFilter + *TenantWithArgDispatcher +}