diff --git a/engine/attributes.go b/engine/attributes.go index 07b16e17f..55dbc5432 100644 --- a/engine/attributes.go +++ b/engine/attributes.go @@ -162,6 +162,46 @@ type AttrArgsProcessEvent struct { Context *string // attach the event to a context ProcessRuns *int // number of loops for ProcessEvent *utils.CGREventWithOpts + clnb bool //rpcclonable +} + +// SetCloneable sets if the args should be clonned on internal connections +func (attr *AttrArgsProcessEvent) SetCloneable(rpcCloneable bool) { + attr.clnb = rpcCloneable +} + +// RPCClone implements rpcclient.RPCCloner interface +func (attr *AttrArgsProcessEvent) RPCClone() (interface{}, error) { + if !attr.clnb { + return attr, nil + } + return attr.Clone(), nil +} + +// Clone creates a clone of the object +func (attr *AttrArgsProcessEvent) Clone() *AttrArgsProcessEvent { + var attrIDs []string + if attr.AttributeIDs != nil { + attrIDs = make([]string, len(attr.AttributeIDs)) + for i, id := range attr.AttributeIDs { + attrIDs[i] = id + } + } + var ctx *string + if attr.Context != nil { + ctx = utils.StringPointer(*attr.Context) + } + var procRuns *int + if attr.ProcessRuns != nil { + procRuns = new(int) + *procRuns = *attr.ProcessRuns + } + return &AttrArgsProcessEvent{ + AttributeIDs: attrIDs, + Context: ctx, + ProcessRuns: procRuns, + CGREventWithOpts: attr.CGREventWithOpts.Clone(), + } } // processEvent will match event with attribute profile and do the necessary replacements diff --git a/engine/cdrs.go b/engine/cdrs.go index 1347bed61..569508e6b 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -720,6 +720,35 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDRWithOpts, reply *string) (err error) type ArgV1ProcessEvent struct { Flags []string utils.CGREventWithOpts + clnb bool //rpcclonable +} + +// SetCloneable sets if the args should be clonned on internal connections +func (attr *ArgV1ProcessEvent) SetCloneable(rpcCloneable bool) { + attr.clnb = rpcCloneable +} + +// RPCClone implements rpcclient.RPCCloner interface +func (attr *ArgV1ProcessEvent) RPCClone() (interface{}, error) { + if !attr.clnb { + return attr, nil + } + return attr.Clone(), nil +} + +// Clone creates a clone of the object +func (attr *ArgV1ProcessEvent) Clone() *ArgV1ProcessEvent { + var flags []string + if attr.Flags != nil { + flags = make([]string, len(attr.Flags)) + for i, id := range attr.Flags { + flags[i] = id + } + } + return &ArgV1ProcessEvent{ + Flags: flags, + CGREventWithOpts: *attr.CGREventWithOpts.Clone(), + } } // V1ProcessEvent will process the CGREvent diff --git a/engine/routes.go b/engine/routes.go index 084593c22..4564e19d4 100644 --- a/engine/routes.go +++ b/engine/routes.go @@ -573,6 +573,30 @@ type ArgsGetRoutes struct { MaxCost string // toDo: try with interface{} here *utils.CGREventWithOpts utils.Paginator + clnb bool //rpcclonable +} + +// SetCloneable sets if the args should be clonned on internal connections +func (attr *ArgsGetRoutes) SetCloneable(rpcCloneable bool) { + attr.clnb = rpcCloneable +} + +// RPCClone implements rpcclient.RPCCloner interface +func (attr *ArgsGetRoutes) RPCClone() (interface{}, error) { + if !attr.clnb { + return attr, nil + } + return attr.Clone(), nil +} + +// Clone creates a clone of the object +func (attr *ArgsGetRoutes) Clone() *ArgsGetRoutes { + return &ArgsGetRoutes{ + IgnoreErrors: attr.IgnoreErrors, + MaxCost: attr.MaxCost, + Paginator: attr.Paginator.Clone(), + CGREventWithOpts: attr.CGREventWithOpts.Clone(), + } } func (args *ArgsGetRoutes) asOptsGetRoutes() (opts *optsGetRoutes, err error) { diff --git a/engine/stats.go b/engine/stats.go index 56d90f602..8717b73bb 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -227,6 +227,35 @@ func (sS *StatService) Call(serviceMethod string, args interface{}, reply interf type StatsArgsProcessEvent struct { StatIDs []string *utils.CGREventWithOpts + clnb bool //rpcclonable +} + +// SetCloneable sets if the args should be clonned on internal connections +func (attr *StatsArgsProcessEvent) SetCloneable(rpcCloneable bool) { + attr.clnb = rpcCloneable +} + +// RPCClone implements rpcclient.RPCCloner interface +func (attr *StatsArgsProcessEvent) RPCClone() (interface{}, error) { + if !attr.clnb { + return attr, nil + } + return attr.Clone(), nil +} + +// Clone creates a clone of the object +func (attr *StatsArgsProcessEvent) Clone() *StatsArgsProcessEvent { + var statsIDs []string + if attr.StatIDs != nil { + statsIDs = make([]string, len(attr.StatIDs)) + for i, id := range attr.StatIDs { + statsIDs[i] = id + } + } + return &StatsArgsProcessEvent{ + StatIDs: statsIDs, + CGREventWithOpts: attr.CGREventWithOpts.Clone(), + } } // processEvent processes a new event, dispatching to matching queues diff --git a/engine/thresholds.go b/engine/thresholds.go index 00e6dd8d2..e4b0e01b7 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -302,6 +302,35 @@ func (tS *ThresholdService) matchingThresholdsForEvent(args *ThresholdsArgsProce type ThresholdsArgsProcessEvent struct { ThresholdIDs []string *utils.CGREventWithOpts + clnb bool //rpcclonable +} + +// SetCloneable sets if the args should be clonned on internal connections +func (attr *ThresholdsArgsProcessEvent) SetCloneable(rpcCloneable bool) { + attr.clnb = rpcCloneable +} + +// RPCClone implements rpcclient.RPCCloner interface +func (attr *ThresholdsArgsProcessEvent) RPCClone() (interface{}, error) { + if !attr.clnb { + return attr, nil + } + return attr.Clone(), nil +} + +// Clone creates a clone of the object +func (attr *ThresholdsArgsProcessEvent) Clone() *ThresholdsArgsProcessEvent { + var thIDs []string + if attr.ThresholdIDs != nil { + thIDs = make([]string, len(attr.ThresholdIDs)) + for i, id := range attr.ThresholdIDs { + thIDs[i] = id + } + } + return &ThresholdsArgsProcessEvent{ + ThresholdIDs: thIDs, + CGREventWithOpts: attr.CGREventWithOpts.Clone(), + } } // processEvent processes a new event, dispatching to matching thresholds diff --git a/sessions/sessions.go b/sessions/sessions.go index 0143a57b1..529670b5a 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -376,6 +376,7 @@ func (sS *SessionS) forceSTerminate(s *Session, extraUsage time.Duration, tUsage engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RequestType)) { argsProc.Flags = append(argsProc.Flags, utils.MetaRALs) } + argsProc.SetCloneable(true) if err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().CDRsConns, nil, utils.CDRsV1ProcessEvent, argsProc, &reply); err != nil { utils.Logger.Warning( @@ -401,6 +402,7 @@ func (sS *SessionS) forceSTerminate(s *Session, extraUsage time.Duration, tUsage UsageID: s.ResourceID, Units: 1, } + argsRU.SetCloneable(true) if err := sS.connMgr.Call(sS.cgrCfg.SessionSCfg().ResSConns, nil, utils.ResourceSv1ReleaseResources, argsRU, &reply); err != nil { @@ -1909,7 +1911,7 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.ClientConnector, return utils.NewErrMandatoryIeMissing("subsystems") } if args.GetAttributes { - rplyAttr, err := sS.processAttributes(args.CGREvent, args.AttributeIDs, args.Opts) + rplyAttr, err := sS.processAttributes(args.CGREvent, args.AttributeIDs, args.Opts, false) if err == nil { args.CGREvent = rplyAttr.CGREvent args.Opts = rplyAttr.Opts @@ -1962,7 +1964,7 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.ClientConnector, } if args.GetRoutes { routesReply, err := sS.getRoutes(args.CGREvent.Clone(), args.Paginator, - args.RoutesIgnoreErrors, args.RoutesMaxCost, args.Opts) + args.RoutesIgnoreErrors, args.RoutesMaxCost, args.Opts, false) if err != nil { return err } @@ -1971,7 +1973,7 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.ClientConnector, } } if args.ProcessThresholds { - tIDs, err := sS.processThreshold(args.CGREvent, args.ThresholdIDs, args.Opts) + tIDs, err := sS.processThreshold(args.CGREvent, args.ThresholdIDs, args.Opts, true) if err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.", @@ -1981,7 +1983,7 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.ClientConnector, authReply.ThresholdIDs = &tIDs } if args.ProcessStats { - sIDs, err := sS.processStats(args.CGREvent, args.StatIDs, args.Opts) + sIDs, err := sS.processStats(args.CGREvent, args.StatIDs, args.Opts, false) if err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( @@ -2194,7 +2196,7 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.ClientConnector, } originID, _ := args.CGREvent.FieldAsString(utils.OriginID) if args.GetAttributes { - rplyAttr, err := sS.processAttributes(args.CGREvent, args.AttributeIDs, args.Opts) + rplyAttr, err := sS.processAttributes(args.CGREvent, args.AttributeIDs, args.Opts, false) if err == nil { args.CGREvent = rplyAttr.CGREvent args.Opts = rplyAttr.Opts @@ -2265,7 +2267,7 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.ClientConnector, } } if args.ProcessThresholds { - tIDs, err := sS.processThreshold(args.CGREvent, args.ThresholdIDs, args.Opts) + tIDs, err := sS.processThreshold(args.CGREvent, args.ThresholdIDs, args.Opts, true) if err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.", @@ -2275,7 +2277,7 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.ClientConnector, rply.ThresholdIDs = &tIDs } if args.ProcessStats { - sIDs, err := sS.processStats(args.CGREvent, args.StatIDs, args.Opts) + sIDs, err := sS.processStats(args.CGREvent, args.StatIDs, args.Opts, false) if err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( @@ -2424,7 +2426,7 @@ func (sS *SessionS) BiRPCv1UpdateSession(clnt rpcclient.ClientConnector, args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant } if args.GetAttributes { - rplyAttr, err := sS.processAttributes(args.CGREvent, args.AttributeIDs, args.Opts) + rplyAttr, err := sS.processAttributes(args.CGREvent, args.AttributeIDs, args.Opts, false) if err == nil { args.CGREvent = rplyAttr.CGREvent args.Opts = rplyAttr.Opts @@ -2633,7 +2635,7 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.ClientConnector, } } if args.ProcessThresholds { - _, err := sS.processThreshold(args.CGREvent, args.ThresholdIDs, args.Opts) + _, err := sS.processThreshold(args.CGREvent, args.ThresholdIDs, args.Opts, true) if err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( @@ -2643,7 +2645,7 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.ClientConnector, } } if args.ProcessStats { - _, err := sS.processStats(args.CGREvent, args.StatIDs, args.Opts) + _, err := sS.processStats(args.CGREvent, args.StatIDs, args.Opts, false) if err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( @@ -2690,7 +2692,7 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.ClientConnector, cgrEvWithArgDisp.Event[utils.Source] = utils.MetaSessionS } - return sS.processCDR(cgrEvWithArgDisp, []string{utils.MetaRALs}, rply) + return sS.processCDR(cgrEvWithArgDisp, []string{utils.MetaRALs}, rply, false) } // NewV1ProcessMessageArgs is a constructor for MessageArgs used by ProcessMessage @@ -2867,7 +2869,7 @@ func (sS *SessionS) BiRPCv1ProcessMessage(clnt rpcclient.ClientConnector, originID := me.GetStringIgnoreErrors(utils.OriginID) if args.GetAttributes { - rplyAttr, err := sS.processAttributes(args.CGREvent, args.AttributeIDs, args.Opts) + rplyAttr, err := sS.processAttributes(args.CGREvent, args.AttributeIDs, args.Opts, false) if err == nil { args.CGREvent = rplyAttr.CGREvent args.Opts = rplyAttr.Opts @@ -2900,7 +2902,7 @@ func (sS *SessionS) BiRPCv1ProcessMessage(clnt rpcclient.ClientConnector, } if args.GetRoutes { routesReply, err := sS.getRoutes(args.CGREvent.Clone(), args.Paginator, - args.RoutesIgnoreErrors, args.RoutesMaxCost, args.Opts) + args.RoutesIgnoreErrors, args.RoutesMaxCost, args.Opts, false) if err != nil { return err } @@ -2919,7 +2921,7 @@ func (sS *SessionS) BiRPCv1ProcessMessage(clnt rpcclient.ClientConnector, rply.MaxUsage = &maxUsage } if args.ProcessThresholds { - tIDs, err := sS.processThreshold(args.CGREvent, args.ThresholdIDs, args.Opts) + tIDs, err := sS.processThreshold(args.CGREvent, args.ThresholdIDs, args.Opts, true) if err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.", @@ -2929,7 +2931,7 @@ func (sS *SessionS) BiRPCv1ProcessMessage(clnt rpcclient.ClientConnector, rply.ThresholdIDs = &tIDs } if args.ProcessStats { - sIDs, err := sS.processStats(args.CGREvent, args.StatIDs, args.Opts) + sIDs, err := sS.processStats(args.CGREvent, args.StatIDs, args.Opts, false) if err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( @@ -3110,7 +3112,7 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector, rply.Attributes = make(map[string]*engine.AttrSProcessEventReply) for runID, cgrEv := range getDerivedEvents(events, argsFlagsWithParams[utils.MetaAttributes].Has(utils.MetaDerivedReply)) { - rplyAttr, err := sS.processAttributes(cgrEv.CGREvent, attrIDs, cgrEv.Opts) + rplyAttr, err := sS.processAttributes(cgrEv.CGREvent, attrIDs, cgrEv.Opts, false) if err != nil { if err.Error() != utils.ErrNotFound.Error() { return utils.NewErrAttributeS(err) @@ -3137,7 +3139,7 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector, maxCost = utils.MetaRoutesEventCost } for runID, cgrEv := range getDerivedEvents(events, flags.Has(utils.MetaDerivedReply)) { - routesReply, err := sS.getRoutes(cgrEv.CGREvent.Clone(), args.Paginator, ignoreErrors, maxCost, cgrEv.Opts) + routesReply, err := sS.getRoutes(cgrEv.CGREvent.Clone(), args.Paginator, ignoreErrors, maxCost, cgrEv.Opts, false) if err != nil { return err } @@ -3152,7 +3154,7 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector, rply.ThresholdIDs = make(map[string][]string) thIDs := argsFlagsWithParams.ParamsSlice(utils.MetaThresholds, utils.MetaIDs) for runID, cgrEv := range getDerivedEvents(events, argsFlagsWithParams[utils.MetaThresholds].Has(utils.MetaDerivedReply)) { - tIDs, err := sS.processThreshold(cgrEv.CGREvent, thIDs, args.Opts) + tIDs, err := sS.processThreshold(cgrEv.CGREvent, thIDs, args.Opts, true) if err != nil && err.Error() != utils.ErrNotFound.Error() { if blockError { return utils.NewErrThresholdS(err) @@ -3171,7 +3173,7 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector, rply.StatQueueIDs = make(map[string][]string) stIDs := argsFlagsWithParams.ParamsSlice(utils.MetaStats, utils.MetaIDs) for runID, cgrEv := range getDerivedEvents(events, argsFlagsWithParams[utils.MetaStats].Has(utils.MetaDerivedReply)) { - sIDs, err := sS.processStats(cgrEv.CGREvent, stIDs, cgrEv.Opts) + sIDs, err := sS.processStats(cgrEv.CGREvent, stIDs, cgrEv.Opts, true) if err != nil && err.Error() != utils.ErrNotFound.Error() { if blockError { @@ -3263,6 +3265,7 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector, UsageID: originID, Units: 1, } + attrRU.SetCloneable(true) var resMessage string // check what we need to do for resources (*authorization/*allocation) //check for subflags and convert them into utils.FlagsWithParams @@ -3461,7 +3464,7 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector, flgs := argsFlagsWithParams[utils.MetaCDRs].SliceFlags() var cdrRply string for _, cgrEv := range getDerivedEvents(events, argsFlagsWithParams[utils.MetaCDRs].Has(utils.MetaDerivedReply)) { - if err := sS.processCDR(cgrEv, flgs, &cdrRply); err != nil { + if err := sS.processCDR(cgrEv, flgs, &cdrRply, false); err != nil { if blockError { return utils.NewErrCDRS(err) } @@ -3524,7 +3527,7 @@ func (sS *SessionS) BiRPCv1GetCost(clnt rpcclient.ClientConnector, // check for *attribute if argsFlagsWithParams.Has(utils.MetaAttributes) { rplyAttr, err := sS.processAttributes(args.CGREvent, - argsFlagsWithParams.ParamsSlice(utils.MetaAttributes, utils.MetaIDs), args.Opts) + argsFlagsWithParams.ParamsSlice(utils.MetaAttributes, utils.MetaIDs), args.Opts, false) if err == nil { args.CGREvent = rplyAttr.CGREvent args.Opts = rplyAttr.Opts @@ -3684,7 +3687,7 @@ func (sS *SessionS) BiRPCv1DeactivateSessions(clnt rpcclient.ClientConnector, return } -func (sS *SessionS) processCDR(cgrEv *utils.CGREventWithOpts, flags []string, rply *string) (err error) { +func (sS *SessionS) processCDR(cgrEv *utils.CGREventWithOpts, flags []string, rply *string, clnb bool) (err error) { if cgrEv.Tenant == "" { cgrEv.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant } @@ -3704,11 +3707,13 @@ func (sS *SessionS) processCDR(cgrEv *utils.CGREventWithOpts, flags []string, rp // found in cache s = sIface.(*Session) } else { // no cached session, CDR will be handled by CDRs + argsProc := &engine.ArgV1ProcessEvent{ + Flags: flags, + CGREventWithOpts: *cgrEv, + } + argsProc.SetCloneable(clnb) return sS.connMgr.Call(sS.cgrCfg.SessionSCfg().CDRsConns, nil, utils.CDRsV1ProcessEvent, - &engine.ArgV1ProcessEvent{ - Flags: flags, - CGREventWithOpts: *cgrEv, - }, rply) + argsProc, rply) } // Use previously stored Session to generate CDRs @@ -3725,6 +3730,7 @@ func (sS *SessionS) processCDR(cgrEv *utils.CGREventWithOpts, flags []string, rp fmt.Sprintf("%s:false", utils.MetaAttributes)}, CGREventWithOpts: *cgrEv, } + argsProc.SetCloneable(clnb) if mp := engine.MapEvent(cgrEv.Event); unratedReqs.HasField(mp.GetStringIgnoreErrors(utils.RequestType)) { // order additional rating for unrated request types argsProc.Flags = append(argsProc.Flags, fmt.Sprintf("%s:true", utils.MetaRALs)) } @@ -3744,7 +3750,7 @@ func (sS *SessionS) processCDR(cgrEv *utils.CGREventWithOpts, flags []string, rp } // processThreshold will receive the event and send it to ThresholdS to be processed -func (sS *SessionS) processThreshold(cgrEv *utils.CGREvent, thIDs []string, opts map[string]interface{}) (tIDs []string, err error) { +func (sS *SessionS) processThreshold(cgrEv *utils.CGREvent, thIDs []string, opts map[string]interface{}, clnb bool) (tIDs []string, err error) { if len(sS.cgrCfg.SessionSCfg().ThreshSConns) == 0 { return tIDs, utils.NewErrNotConnected(utils.ThresholdS) } @@ -3758,13 +3764,14 @@ func (sS *SessionS) processThreshold(cgrEv *utils.CGREvent, thIDs []string, opts if len(thIDs) != 0 { thEv.ThresholdIDs = thIDs } + thEv.SetCloneable(clnb) //initialize the returned variable err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().ThreshSConns, nil, utils.ThresholdSv1ProcessEvent, thEv, &tIDs) return } // processStats will receive the event and send it to StatS to be processed -func (sS *SessionS) processStats(cgrEv *utils.CGREvent, stsIDs []string, opts map[string]interface{}) (sIDs []string, err error) { +func (sS *SessionS) processStats(cgrEv *utils.CGREvent, stsIDs []string, opts map[string]interface{}, clnb bool) (sIDs []string, err error) { if len(sS.cgrCfg.SessionSCfg().StatSConns) == 0 { return sIDs, utils.NewErrNotConnected(utils.StatS) } @@ -3779,6 +3786,7 @@ func (sS *SessionS) processStats(cgrEv *utils.CGREvent, stsIDs []string, opts ma if len(stsIDs) != 0 { statArgs.StatIDs = stsIDs } + statArgs.SetCloneable(clnb) //initialize the returned variable err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().StatSConns, nil, utils.StatSv1ProcessEvent, statArgs, &sIDs) return @@ -3786,7 +3794,7 @@ func (sS *SessionS) processStats(cgrEv *utils.CGREvent, stsIDs []string, opts ma // getRoutes will receive the event and send it to SupplierS to find the suppliers func (sS *SessionS) getRoutes(cgrEv *utils.CGREvent, pag utils.Paginator, ignoreErrors bool, - maxCost string, opts map[string]interface{}) (routesReply engine.SortedRoutes, err error) { + maxCost string, opts map[string]interface{}, clnb bool) (routesReply engine.SortedRoutes, err error) { if len(sS.cgrCfg.SessionSCfg().RouteSConns) == 0 { return routesReply, utils.NewErrNotConnected(utils.RouteS) } @@ -3802,6 +3810,7 @@ func (sS *SessionS) getRoutes(cgrEv *utils.CGREvent, pag utils.Paginator, ignore IgnoreErrors: ignoreErrors, MaxCost: maxCost, } + sArgs.SetCloneable(clnb) if err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().RouteSConns, nil, utils.RouteSv1GetRoutes, sArgs, &routesReply); err != nil { return routesReply, utils.NewErrRouteS(err) @@ -3811,7 +3820,7 @@ func (sS *SessionS) getRoutes(cgrEv *utils.CGREvent, pag utils.Paginator, ignore // processAttributes will receive the event and send it to AttributeS to be processed func (sS *SessionS) processAttributes(cgrEv *utils.CGREvent, attrIDs []string, - opts engine.MapEvent) (rplyEv engine.AttrSProcessEventReply, err error) { + opts engine.MapEvent, clnb bool) (rplyEv engine.AttrSProcessEventReply, err error) { if len(sS.cgrCfg.SessionSCfg().AttrSConns) == 0 { return rplyEv, utils.NewErrNotConnected(utils.AttributeS) } @@ -3836,6 +3845,7 @@ func (sS *SessionS) processAttributes(cgrEv *utils.CGREvent, attrIDs []string, AttributeIDs: attrIDs, ProcessRuns: processRuns, } + attrArgs.SetCloneable(clnb) err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().AttrSConns, nil, utils.AttributeSv1ProcessEvent, attrArgs, &rplyEv) return diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 2d167344c..49f0aba4c 100755 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -41,7 +41,6 @@ type PaginatorWithSearch struct { type Paginator struct { Limit *int // Limit the number of items returned Offset *int // Offset of the first item returned (eg: use Limit*Page in case of PerPage items) - } func (pgnt *Paginator) PaginateStringSlice(in []string) (out []string) { @@ -75,6 +74,25 @@ func (pgnt *Paginator) PaginateStringSlice(in []string) (out []string) { return } +// Clone creates a clone of the object +func (pgnt Paginator) Clone() Paginator { + var limit *int + if pgnt.Limit != nil { + limit = new(int) + *limit = *pgnt.Limit + } + + var offset *int + if pgnt.Offset != nil { + offset = new(int) + *offset = *pgnt.Offset + } + return Paginator{ + Limit: limit, + Offset: offset, + } +} + // TPDestination represents one destination in storDB type TPDestination struct { TPid string // Tariff plan id @@ -963,6 +981,34 @@ type ArgRSv1ResourceUsage struct { UsageID string // ResourceUsage Identifier UsageTTL *time.Duration Units float64 + clnb bool //rpcclonable +} + +// SetCloneable sets if the args should be clonned on internal connections +func (attr *ArgRSv1ResourceUsage) SetCloneable(rpcCloneable bool) { + attr.clnb = rpcCloneable +} + +// RPCClone implements rpcclient.RPCCloner interface +func (attr *ArgRSv1ResourceUsage) RPCClone() (interface{}, error) { + if !attr.clnb { + return attr, nil + } + return attr.Clone(), nil +} + +// Clone creates a clone of the object +func (attr *ArgRSv1ResourceUsage) Clone() *ArgRSv1ResourceUsage { + var usageTTL *time.Duration + if attr.UsageTTL != nil { + usageTTL = DurationPointer(*attr.UsageTTL) + } + return &ArgRSv1ResourceUsage{ + UsageID: attr.UsageID, + UsageTTL: usageTTL, + Units: attr.Units, + CGREventWithOpts: attr.CGREventWithOpts.Clone(), + } } type ArgsComputeFilterIndexIDs struct {