diff --git a/apier/v1/costs.go b/apier/v1/costs.go index c94174d7c..74c5a3460 100644 --- a/apier/v1/costs.go +++ b/apier/v1/costs.go @@ -51,7 +51,7 @@ func (apierSv1 *APIerSv1) GetCost(attrs *AttrGetCost, ec *engine.EventCost) erro cd := &engine.CallDescriptor{ Category: attrs.Category, - Tenant: attrs.Tenant, + Tenant: utils.FirstNonEmpty(attrs.Tenant, apierSv1.Config.GeneralCfg().DefaultTenant), Subject: attrs.Subject, Destination: attrs.Destination, TimeStart: aTime, @@ -91,7 +91,7 @@ func (apierSv1 *APIerSv1) GetDataCost(attrs *AttrGetDataCost, reply *engine.Data } cd := &engine.CallDescriptor{ Category: attrs.Category, - Tenant: attrs.Tenant, + Tenant: utils.FirstNonEmpty(attrs.Tenant, apierSv1.Config.GeneralCfg().DefaultTenant), Subject: attrs.Subject, TimeStart: aTime, TimeEnd: aTime.Add(attrs.Usage), diff --git a/ees/ees.go b/ees/ees.go index f20a86529..22ee5a512 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -165,7 +165,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma } if len(eeCfg.Filters) != 0 { - tnt := cgrEv.Tenant + tnt := utils.FirstNonEmpty(cgrEv.Tenant, eeS.cfg.GeneralCfg().DefaultTenant) if eeTnt, errTnt := eeCfg.Tenant.ParseDataProvider(cgrDp); errTnt == nil && eeTnt != utils.EmptyString { tnt = eeTnt } diff --git a/engine/cdrs.go b/engine/cdrs.go index 7778a5dd7..0d5de052d 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -756,9 +756,12 @@ func (attr *ArgV1ProcessEvent) Clone() *ArgV1ProcessEvent { // V1ProcessEvent will process the CGREvent func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (err error) { - if arg.CGREvent.ID == "" { + if arg.CGREvent.ID == utils.EmptyString { arg.CGREvent.ID = utils.GenUUID() } + if arg.CGREvent.Tenant == utils.EmptyString { + arg.CGREvent.Tenant = cdrS.cgrCfg.GeneralCfg().DefaultTenant + } // RPC caching if config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 { cacheKey := utils.ConcatenatedKey(utils.CDRsV1ProcessEvent, arg.CGREvent.ID) diff --git a/sessions/sessions.go b/sessions/sessions.go index 0401f31be..0fd9669f2 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -2420,9 +2420,12 @@ func (sS *SessionS) BiRPCv1UpdateSession(clnt rpcclient.ClientConnector, if args.CGREvent == nil { return utils.NewErrMandatoryIeMissing(utils.CGREventString) } - if args.CGREvent.ID == "" { + if args.CGREvent.ID == utils.EmptyString { args.CGREvent.ID = utils.GenUUID() } + if args.CGREvent.Tenant == utils.EmptyString { + args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant + } // RPC caching if sS.cgrCfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 { @@ -2447,9 +2450,7 @@ func (sS *SessionS) BiRPCv1UpdateSession(clnt rpcclient.ClientConnector, if !args.GetAttributes && !args.UpdateSession { return utils.NewErrMandatoryIeMissing("subsystems") } - if args.CGREvent.Tenant == "" { - args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant - } + if args.GetAttributes { rplyAttr, err := sS.processAttributes(args.CGREvent, args.AttributeIDs, args.Opts, false) if err == nil { @@ -2567,6 +2568,9 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.ClientConnector, if args.CGREvent.ID == "" { args.CGREvent.ID = utils.GenUUID() } + if args.CGREvent.Tenant == "" { + args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant + } // RPC caching if sS.cgrCfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 { cacheKey := utils.ConcatenatedKey(utils.SessionSv1TerminateSession, args.CGREvent.ID) @@ -2589,9 +2593,7 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.ClientConnector, if !args.TerminateSession && !args.ReleaseResources { return utils.NewErrMandatoryIeMissing("subsystems") } - if args.CGREvent.Tenant == "" { - args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant - } + ev := engine.MapEvent(args.CGREvent.Event) opts := engine.MapEvent(args.Opts) cgrID := GetSetCGRID(ev) @@ -2688,14 +2690,17 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.ClientConnector, // BiRPCv1ProcessCDR sends the CDR to CDRs func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.ClientConnector, - cgrEvWithArgDisp *utils.CGREventWithOpts, rply *string) (err error) { - if cgrEvWithArgDisp.ID == "" { - cgrEvWithArgDisp.ID = utils.GenUUID() + cgrEv *utils.CGREventWithOpts, rply *string) (err error) { + if cgrEv.ID == utils.EmptyString { + cgrEv.ID = utils.GenUUID() + } + if cgrEv.Tenant == utils.EmptyString { + cgrEv.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant } // RPC caching if sS.cgrCfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 { - cacheKey := utils.ConcatenatedKey(utils.SessionSv1ProcessCDR, cgrEvWithArgDisp.ID) + cacheKey := utils.ConcatenatedKey(utils.SessionSv1ProcessCDR, cgrEv.ID) refID := guardian.Guardian.GuardIDs("", sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic defer guardian.Guardian.UnguardIDs(refID) @@ -2713,11 +2718,11 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.ClientConnector, } // end of RPC caching // in case that source don't exist add it - if _, has := cgrEvWithArgDisp.Event[utils.Source]; !has { - cgrEvWithArgDisp.Event[utils.Source] = utils.MetaSessionS + if _, has := cgrEv.Event[utils.Source]; !has { + cgrEv.Event[utils.Source] = utils.MetaSessionS } - return sS.processCDR(cgrEvWithArgDisp, []string{utils.MetaRALs}, rply, false) + return sS.processCDR(cgrEv, []string{utils.MetaRALs}, rply, false) } // NewV1ProcessMessageArgs is a constructor for MessageArgs used by ProcessMessage @@ -2863,9 +2868,12 @@ func (sS *SessionS) BiRPCv1ProcessMessage(clnt rpcclient.ClientConnector, return utils.NewErrMandatoryIeMissing(utils.CGREventString) } var withErrors bool - if args.CGREvent.ID == "" { + if args.CGREvent.ID == utils.EmptyString { args.CGREvent.ID = utils.GenUUID() } + if args.CGREvent.Tenant == utils.EmptyString { + args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant + } // RPC caching if sS.cgrCfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 { @@ -2887,9 +2895,6 @@ func (sS *SessionS) BiRPCv1ProcessMessage(clnt rpcclient.ClientConnector, } // end of RPC caching - if args.CGREvent.Tenant == "" { - args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant - } me := engine.MapEvent(args.CGREvent.Event) originID := me.GetStringIgnoreErrors(utils.OriginID) @@ -3080,6 +3085,9 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector, if args.CGREvent.ID == "" { args.CGREvent.ID = utils.GenUUID() } + if args.CGREvent.Tenant == "" { + args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant + } // RPC caching if sS.cgrCfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 { @@ -3101,10 +3109,6 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector, } // end of RPC caching - if args.CGREvent.Tenant == "" { - args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant - } - //convert from Flags []string to utils.FlagsWithParams argsFlagsWithParams := utils.FlagsWithParamsFromSlice(args.Flags) @@ -3522,6 +3526,9 @@ func (sS *SessionS) BiRPCv1GetCost(clnt rpcclient.ClientConnector, if args.CGREvent.ID == "" { args.CGREvent.ID = utils.GenUUID() } + if args.CGREvent.Tenant == "" { + args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant + } // RPC caching if sS.cgrCfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 { @@ -3543,10 +3550,6 @@ func (sS *SessionS) BiRPCv1GetCost(clnt rpcclient.ClientConnector, } // end of RPC caching - if args.CGREvent.Tenant == "" { - args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant - } - //convert from Flags []string to utils.FlagsWithParams argsFlagsWithParams := utils.FlagsWithParamsFromSlice(args.Flags) // check for *attribute @@ -3713,9 +3716,7 @@ func (sS *SessionS) BiRPCv1DeactivateSessions(clnt rpcclient.ClientConnector, } func (sS *SessionS) processCDR(cgrEv *utils.CGREventWithOpts, flags []string, rply *string, clnb bool) (err error) { - if cgrEv.Tenant == "" { - cgrEv.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant - } + ev := engine.MapEvent(cgrEv.Event) cgrID := GetSetCGRID(ev) s := sS.getRelocateSession(cgrID,