Updated arguments for sessions RPC

This commit is contained in:
Trial97
2020-10-02 15:38:46 +03:00
committed by Dan Christian Bogos
parent 863e9b719d
commit a289fc0f59
7 changed files with 238 additions and 31 deletions

View File

@@ -162,6 +162,46 @@ type AttrArgsProcessEvent struct {
Context *string // attach the event to a context
ProcessRuns *int // number of loops for ProcessEvent
*utils.CGREventWithOpts
clnb bool //rpcclonable
}
// SetCloneable sets if the args should be clonned on internal connections
func (attr *AttrArgsProcessEvent) SetCloneable(rpcCloneable bool) {
attr.clnb = rpcCloneable
}
// RPCClone implements rpcclient.RPCCloner interface
func (attr *AttrArgsProcessEvent) RPCClone() (interface{}, error) {
if !attr.clnb {
return attr, nil
}
return attr.Clone(), nil
}
// Clone creates a clone of the object
func (attr *AttrArgsProcessEvent) Clone() *AttrArgsProcessEvent {
var attrIDs []string
if attr.AttributeIDs != nil {
attrIDs = make([]string, len(attr.AttributeIDs))
for i, id := range attr.AttributeIDs {
attrIDs[i] = id
}
}
var ctx *string
if attr.Context != nil {
ctx = utils.StringPointer(*attr.Context)
}
var procRuns *int
if attr.ProcessRuns != nil {
procRuns = new(int)
*procRuns = *attr.ProcessRuns
}
return &AttrArgsProcessEvent{
AttributeIDs: attrIDs,
Context: ctx,
ProcessRuns: procRuns,
CGREventWithOpts: attr.CGREventWithOpts.Clone(),
}
}
// processEvent will match event with attribute profile and do the necessary replacements

View File

@@ -720,6 +720,35 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDRWithOpts, reply *string) (err error)
type ArgV1ProcessEvent struct {
Flags []string
utils.CGREventWithOpts
clnb bool //rpcclonable
}
// SetCloneable sets if the args should be clonned on internal connections
func (attr *ArgV1ProcessEvent) SetCloneable(rpcCloneable bool) {
attr.clnb = rpcCloneable
}
// RPCClone implements rpcclient.RPCCloner interface
func (attr *ArgV1ProcessEvent) RPCClone() (interface{}, error) {
if !attr.clnb {
return attr, nil
}
return attr.Clone(), nil
}
// Clone creates a clone of the object
func (attr *ArgV1ProcessEvent) Clone() *ArgV1ProcessEvent {
var flags []string
if attr.Flags != nil {
flags = make([]string, len(attr.Flags))
for i, id := range attr.Flags {
flags[i] = id
}
}
return &ArgV1ProcessEvent{
Flags: flags,
CGREventWithOpts: *attr.CGREventWithOpts.Clone(),
}
}
// V1ProcessEvent will process the CGREvent

View File

@@ -573,6 +573,30 @@ type ArgsGetRoutes struct {
MaxCost string // toDo: try with interface{} here
*utils.CGREventWithOpts
utils.Paginator
clnb bool //rpcclonable
}
// SetCloneable sets if the args should be clonned on internal connections
func (attr *ArgsGetRoutes) SetCloneable(rpcCloneable bool) {
attr.clnb = rpcCloneable
}
// RPCClone implements rpcclient.RPCCloner interface
func (attr *ArgsGetRoutes) RPCClone() (interface{}, error) {
if !attr.clnb {
return attr, nil
}
return attr.Clone(), nil
}
// Clone creates a clone of the object
func (attr *ArgsGetRoutes) Clone() *ArgsGetRoutes {
return &ArgsGetRoutes{
IgnoreErrors: attr.IgnoreErrors,
MaxCost: attr.MaxCost,
Paginator: attr.Paginator.Clone(),
CGREventWithOpts: attr.CGREventWithOpts.Clone(),
}
}
func (args *ArgsGetRoutes) asOptsGetRoutes() (opts *optsGetRoutes, err error) {

View File

@@ -227,6 +227,35 @@ func (sS *StatService) Call(serviceMethod string, args interface{}, reply interf
type StatsArgsProcessEvent struct {
StatIDs []string
*utils.CGREventWithOpts
clnb bool //rpcclonable
}
// SetCloneable sets if the args should be clonned on internal connections
func (attr *StatsArgsProcessEvent) SetCloneable(rpcCloneable bool) {
attr.clnb = rpcCloneable
}
// RPCClone implements rpcclient.RPCCloner interface
func (attr *StatsArgsProcessEvent) RPCClone() (interface{}, error) {
if !attr.clnb {
return attr, nil
}
return attr.Clone(), nil
}
// Clone creates a clone of the object
func (attr *StatsArgsProcessEvent) Clone() *StatsArgsProcessEvent {
var statsIDs []string
if attr.StatIDs != nil {
statsIDs = make([]string, len(attr.StatIDs))
for i, id := range attr.StatIDs {
statsIDs[i] = id
}
}
return &StatsArgsProcessEvent{
StatIDs: statsIDs,
CGREventWithOpts: attr.CGREventWithOpts.Clone(),
}
}
// processEvent processes a new event, dispatching to matching queues

View File

@@ -302,6 +302,35 @@ func (tS *ThresholdService) matchingThresholdsForEvent(args *ThresholdsArgsProce
type ThresholdsArgsProcessEvent struct {
ThresholdIDs []string
*utils.CGREventWithOpts
clnb bool //rpcclonable
}
// SetCloneable sets if the args should be clonned on internal connections
func (attr *ThresholdsArgsProcessEvent) SetCloneable(rpcCloneable bool) {
attr.clnb = rpcCloneable
}
// RPCClone implements rpcclient.RPCCloner interface
func (attr *ThresholdsArgsProcessEvent) RPCClone() (interface{}, error) {
if !attr.clnb {
return attr, nil
}
return attr.Clone(), nil
}
// Clone creates a clone of the object
func (attr *ThresholdsArgsProcessEvent) Clone() *ThresholdsArgsProcessEvent {
var thIDs []string
if attr.ThresholdIDs != nil {
thIDs = make([]string, len(attr.ThresholdIDs))
for i, id := range attr.ThresholdIDs {
thIDs[i] = id
}
}
return &ThresholdsArgsProcessEvent{
ThresholdIDs: thIDs,
CGREventWithOpts: attr.CGREventWithOpts.Clone(),
}
}
// processEvent processes a new event, dispatching to matching thresholds

View File

@@ -376,6 +376,7 @@ func (sS *SessionS) forceSTerminate(s *Session, extraUsage time.Duration, tUsage
engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RequestType)) {
argsProc.Flags = append(argsProc.Flags, utils.MetaRALs)
}
argsProc.SetCloneable(true)
if err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().CDRsConns, nil,
utils.CDRsV1ProcessEvent, argsProc, &reply); err != nil {
utils.Logger.Warning(
@@ -401,6 +402,7 @@ func (sS *SessionS) forceSTerminate(s *Session, extraUsage time.Duration, tUsage
UsageID: s.ResourceID,
Units: 1,
}
argsRU.SetCloneable(true)
if err := sS.connMgr.Call(sS.cgrCfg.SessionSCfg().ResSConns, nil,
utils.ResourceSv1ReleaseResources,
argsRU, &reply); err != nil {
@@ -1909,7 +1911,7 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.ClientConnector,
return utils.NewErrMandatoryIeMissing("subsystems")
}
if args.GetAttributes {
rplyAttr, err := sS.processAttributes(args.CGREvent, args.AttributeIDs, args.Opts)
rplyAttr, err := sS.processAttributes(args.CGREvent, args.AttributeIDs, args.Opts, false)
if err == nil {
args.CGREvent = rplyAttr.CGREvent
args.Opts = rplyAttr.Opts
@@ -1962,7 +1964,7 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.ClientConnector,
}
if args.GetRoutes {
routesReply, err := sS.getRoutes(args.CGREvent.Clone(), args.Paginator,
args.RoutesIgnoreErrors, args.RoutesMaxCost, args.Opts)
args.RoutesIgnoreErrors, args.RoutesMaxCost, args.Opts, false)
if err != nil {
return err
}
@@ -1971,7 +1973,7 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.ClientConnector,
}
}
if args.ProcessThresholds {
tIDs, err := sS.processThreshold(args.CGREvent, args.ThresholdIDs, args.Opts)
tIDs, err := sS.processThreshold(args.CGREvent, args.ThresholdIDs, args.Opts, true)
if err != nil && err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.",
@@ -1981,7 +1983,7 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.ClientConnector,
authReply.ThresholdIDs = &tIDs
}
if args.ProcessStats {
sIDs, err := sS.processStats(args.CGREvent, args.StatIDs, args.Opts)
sIDs, err := sS.processStats(args.CGREvent, args.StatIDs, args.Opts, false)
if err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
@@ -2194,7 +2196,7 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.ClientConnector,
}
originID, _ := args.CGREvent.FieldAsString(utils.OriginID)
if args.GetAttributes {
rplyAttr, err := sS.processAttributes(args.CGREvent, args.AttributeIDs, args.Opts)
rplyAttr, err := sS.processAttributes(args.CGREvent, args.AttributeIDs, args.Opts, false)
if err == nil {
args.CGREvent = rplyAttr.CGREvent
args.Opts = rplyAttr.Opts
@@ -2265,7 +2267,7 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.ClientConnector,
}
}
if args.ProcessThresholds {
tIDs, err := sS.processThreshold(args.CGREvent, args.ThresholdIDs, args.Opts)
tIDs, err := sS.processThreshold(args.CGREvent, args.ThresholdIDs, args.Opts, true)
if err != nil && err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.",
@@ -2275,7 +2277,7 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.ClientConnector,
rply.ThresholdIDs = &tIDs
}
if args.ProcessStats {
sIDs, err := sS.processStats(args.CGREvent, args.StatIDs, args.Opts)
sIDs, err := sS.processStats(args.CGREvent, args.StatIDs, args.Opts, false)
if err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
@@ -2424,7 +2426,7 @@ func (sS *SessionS) BiRPCv1UpdateSession(clnt rpcclient.ClientConnector,
args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant
}
if args.GetAttributes {
rplyAttr, err := sS.processAttributes(args.CGREvent, args.AttributeIDs, args.Opts)
rplyAttr, err := sS.processAttributes(args.CGREvent, args.AttributeIDs, args.Opts, false)
if err == nil {
args.CGREvent = rplyAttr.CGREvent
args.Opts = rplyAttr.Opts
@@ -2633,7 +2635,7 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.ClientConnector,
}
}
if args.ProcessThresholds {
_, err := sS.processThreshold(args.CGREvent, args.ThresholdIDs, args.Opts)
_, err := sS.processThreshold(args.CGREvent, args.ThresholdIDs, args.Opts, true)
if err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
@@ -2643,7 +2645,7 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.ClientConnector,
}
}
if args.ProcessStats {
_, err := sS.processStats(args.CGREvent, args.StatIDs, args.Opts)
_, err := sS.processStats(args.CGREvent, args.StatIDs, args.Opts, false)
if err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
@@ -2690,7 +2692,7 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.ClientConnector,
cgrEvWithArgDisp.Event[utils.Source] = utils.MetaSessionS
}
return sS.processCDR(cgrEvWithArgDisp, []string{utils.MetaRALs}, rply)
return sS.processCDR(cgrEvWithArgDisp, []string{utils.MetaRALs}, rply, false)
}
// NewV1ProcessMessageArgs is a constructor for MessageArgs used by ProcessMessage
@@ -2867,7 +2869,7 @@ func (sS *SessionS) BiRPCv1ProcessMessage(clnt rpcclient.ClientConnector,
originID := me.GetStringIgnoreErrors(utils.OriginID)
if args.GetAttributes {
rplyAttr, err := sS.processAttributes(args.CGREvent, args.AttributeIDs, args.Opts)
rplyAttr, err := sS.processAttributes(args.CGREvent, args.AttributeIDs, args.Opts, false)
if err == nil {
args.CGREvent = rplyAttr.CGREvent
args.Opts = rplyAttr.Opts
@@ -2900,7 +2902,7 @@ func (sS *SessionS) BiRPCv1ProcessMessage(clnt rpcclient.ClientConnector,
}
if args.GetRoutes {
routesReply, err := sS.getRoutes(args.CGREvent.Clone(), args.Paginator,
args.RoutesIgnoreErrors, args.RoutesMaxCost, args.Opts)
args.RoutesIgnoreErrors, args.RoutesMaxCost, args.Opts, false)
if err != nil {
return err
}
@@ -2919,7 +2921,7 @@ func (sS *SessionS) BiRPCv1ProcessMessage(clnt rpcclient.ClientConnector,
rply.MaxUsage = &maxUsage
}
if args.ProcessThresholds {
tIDs, err := sS.processThreshold(args.CGREvent, args.ThresholdIDs, args.Opts)
tIDs, err := sS.processThreshold(args.CGREvent, args.ThresholdIDs, args.Opts, true)
if err != nil && err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.",
@@ -2929,7 +2931,7 @@ func (sS *SessionS) BiRPCv1ProcessMessage(clnt rpcclient.ClientConnector,
rply.ThresholdIDs = &tIDs
}
if args.ProcessStats {
sIDs, err := sS.processStats(args.CGREvent, args.StatIDs, args.Opts)
sIDs, err := sS.processStats(args.CGREvent, args.StatIDs, args.Opts, false)
if err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
@@ -3110,7 +3112,7 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector,
rply.Attributes = make(map[string]*engine.AttrSProcessEventReply)
for runID, cgrEv := range getDerivedEvents(events, argsFlagsWithParams[utils.MetaAttributes].Has(utils.MetaDerivedReply)) {
rplyAttr, err := sS.processAttributes(cgrEv.CGREvent, attrIDs, cgrEv.Opts)
rplyAttr, err := sS.processAttributes(cgrEv.CGREvent, attrIDs, cgrEv.Opts, false)
if err != nil {
if err.Error() != utils.ErrNotFound.Error() {
return utils.NewErrAttributeS(err)
@@ -3137,7 +3139,7 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector,
maxCost = utils.MetaRoutesEventCost
}
for runID, cgrEv := range getDerivedEvents(events, flags.Has(utils.MetaDerivedReply)) {
routesReply, err := sS.getRoutes(cgrEv.CGREvent.Clone(), args.Paginator, ignoreErrors, maxCost, cgrEv.Opts)
routesReply, err := sS.getRoutes(cgrEv.CGREvent.Clone(), args.Paginator, ignoreErrors, maxCost, cgrEv.Opts, false)
if err != nil {
return err
}
@@ -3152,7 +3154,7 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector,
rply.ThresholdIDs = make(map[string][]string)
thIDs := argsFlagsWithParams.ParamsSlice(utils.MetaThresholds, utils.MetaIDs)
for runID, cgrEv := range getDerivedEvents(events, argsFlagsWithParams[utils.MetaThresholds].Has(utils.MetaDerivedReply)) {
tIDs, err := sS.processThreshold(cgrEv.CGREvent, thIDs, args.Opts)
tIDs, err := sS.processThreshold(cgrEv.CGREvent, thIDs, args.Opts, true)
if err != nil && err.Error() != utils.ErrNotFound.Error() {
if blockError {
return utils.NewErrThresholdS(err)
@@ -3171,7 +3173,7 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector,
rply.StatQueueIDs = make(map[string][]string)
stIDs := argsFlagsWithParams.ParamsSlice(utils.MetaStats, utils.MetaIDs)
for runID, cgrEv := range getDerivedEvents(events, argsFlagsWithParams[utils.MetaStats].Has(utils.MetaDerivedReply)) {
sIDs, err := sS.processStats(cgrEv.CGREvent, stIDs, cgrEv.Opts)
sIDs, err := sS.processStats(cgrEv.CGREvent, stIDs, cgrEv.Opts, true)
if err != nil &&
err.Error() != utils.ErrNotFound.Error() {
if blockError {
@@ -3263,6 +3265,7 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector,
UsageID: originID,
Units: 1,
}
attrRU.SetCloneable(true)
var resMessage string
// check what we need to do for resources (*authorization/*allocation)
//check for subflags and convert them into utils.FlagsWithParams
@@ -3461,7 +3464,7 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector,
flgs := argsFlagsWithParams[utils.MetaCDRs].SliceFlags()
var cdrRply string
for _, cgrEv := range getDerivedEvents(events, argsFlagsWithParams[utils.MetaCDRs].Has(utils.MetaDerivedReply)) {
if err := sS.processCDR(cgrEv, flgs, &cdrRply); err != nil {
if err := sS.processCDR(cgrEv, flgs, &cdrRply, false); err != nil {
if blockError {
return utils.NewErrCDRS(err)
}
@@ -3524,7 +3527,7 @@ func (sS *SessionS) BiRPCv1GetCost(clnt rpcclient.ClientConnector,
// check for *attribute
if argsFlagsWithParams.Has(utils.MetaAttributes) {
rplyAttr, err := sS.processAttributes(args.CGREvent,
argsFlagsWithParams.ParamsSlice(utils.MetaAttributes, utils.MetaIDs), args.Opts)
argsFlagsWithParams.ParamsSlice(utils.MetaAttributes, utils.MetaIDs), args.Opts, false)
if err == nil {
args.CGREvent = rplyAttr.CGREvent
args.Opts = rplyAttr.Opts
@@ -3684,7 +3687,7 @@ func (sS *SessionS) BiRPCv1DeactivateSessions(clnt rpcclient.ClientConnector,
return
}
func (sS *SessionS) processCDR(cgrEv *utils.CGREventWithOpts, flags []string, rply *string) (err error) {
func (sS *SessionS) processCDR(cgrEv *utils.CGREventWithOpts, flags []string, rply *string, clnb bool) (err error) {
if cgrEv.Tenant == "" {
cgrEv.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant
}
@@ -3704,11 +3707,13 @@ func (sS *SessionS) processCDR(cgrEv *utils.CGREventWithOpts, flags []string, rp
// found in cache
s = sIface.(*Session)
} else { // no cached session, CDR will be handled by CDRs
argsProc := &engine.ArgV1ProcessEvent{
Flags: flags,
CGREventWithOpts: *cgrEv,
}
argsProc.SetCloneable(clnb)
return sS.connMgr.Call(sS.cgrCfg.SessionSCfg().CDRsConns, nil, utils.CDRsV1ProcessEvent,
&engine.ArgV1ProcessEvent{
Flags: flags,
CGREventWithOpts: *cgrEv,
}, rply)
argsProc, rply)
}
// Use previously stored Session to generate CDRs
@@ -3725,6 +3730,7 @@ func (sS *SessionS) processCDR(cgrEv *utils.CGREventWithOpts, flags []string, rp
fmt.Sprintf("%s:false", utils.MetaAttributes)},
CGREventWithOpts: *cgrEv,
}
argsProc.SetCloneable(clnb)
if mp := engine.MapEvent(cgrEv.Event); unratedReqs.HasField(mp.GetStringIgnoreErrors(utils.RequestType)) { // order additional rating for unrated request types
argsProc.Flags = append(argsProc.Flags, fmt.Sprintf("%s:true", utils.MetaRALs))
}
@@ -3744,7 +3750,7 @@ func (sS *SessionS) processCDR(cgrEv *utils.CGREventWithOpts, flags []string, rp
}
// processThreshold will receive the event and send it to ThresholdS to be processed
func (sS *SessionS) processThreshold(cgrEv *utils.CGREvent, thIDs []string, opts map[string]interface{}) (tIDs []string, err error) {
func (sS *SessionS) processThreshold(cgrEv *utils.CGREvent, thIDs []string, opts map[string]interface{}, clnb bool) (tIDs []string, err error) {
if len(sS.cgrCfg.SessionSCfg().ThreshSConns) == 0 {
return tIDs, utils.NewErrNotConnected(utils.ThresholdS)
}
@@ -3758,13 +3764,14 @@ func (sS *SessionS) processThreshold(cgrEv *utils.CGREvent, thIDs []string, opts
if len(thIDs) != 0 {
thEv.ThresholdIDs = thIDs
}
thEv.SetCloneable(clnb)
//initialize the returned variable
err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().ThreshSConns, nil, utils.ThresholdSv1ProcessEvent, thEv, &tIDs)
return
}
// processStats will receive the event and send it to StatS to be processed
func (sS *SessionS) processStats(cgrEv *utils.CGREvent, stsIDs []string, opts map[string]interface{}) (sIDs []string, err error) {
func (sS *SessionS) processStats(cgrEv *utils.CGREvent, stsIDs []string, opts map[string]interface{}, clnb bool) (sIDs []string, err error) {
if len(sS.cgrCfg.SessionSCfg().StatSConns) == 0 {
return sIDs, utils.NewErrNotConnected(utils.StatS)
}
@@ -3779,6 +3786,7 @@ func (sS *SessionS) processStats(cgrEv *utils.CGREvent, stsIDs []string, opts ma
if len(stsIDs) != 0 {
statArgs.StatIDs = stsIDs
}
statArgs.SetCloneable(clnb)
//initialize the returned variable
err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().StatSConns, nil, utils.StatSv1ProcessEvent, statArgs, &sIDs)
return
@@ -3786,7 +3794,7 @@ func (sS *SessionS) processStats(cgrEv *utils.CGREvent, stsIDs []string, opts ma
// getRoutes will receive the event and send it to SupplierS to find the suppliers
func (sS *SessionS) getRoutes(cgrEv *utils.CGREvent, pag utils.Paginator, ignoreErrors bool,
maxCost string, opts map[string]interface{}) (routesReply engine.SortedRoutes, err error) {
maxCost string, opts map[string]interface{}, clnb bool) (routesReply engine.SortedRoutes, err error) {
if len(sS.cgrCfg.SessionSCfg().RouteSConns) == 0 {
return routesReply, utils.NewErrNotConnected(utils.RouteS)
}
@@ -3802,6 +3810,7 @@ func (sS *SessionS) getRoutes(cgrEv *utils.CGREvent, pag utils.Paginator, ignore
IgnoreErrors: ignoreErrors,
MaxCost: maxCost,
}
sArgs.SetCloneable(clnb)
if err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().RouteSConns, nil, utils.RouteSv1GetRoutes,
sArgs, &routesReply); err != nil {
return routesReply, utils.NewErrRouteS(err)
@@ -3811,7 +3820,7 @@ func (sS *SessionS) getRoutes(cgrEv *utils.CGREvent, pag utils.Paginator, ignore
// processAttributes will receive the event and send it to AttributeS to be processed
func (sS *SessionS) processAttributes(cgrEv *utils.CGREvent, attrIDs []string,
opts engine.MapEvent) (rplyEv engine.AttrSProcessEventReply, err error) {
opts engine.MapEvent, clnb bool) (rplyEv engine.AttrSProcessEventReply, err error) {
if len(sS.cgrCfg.SessionSCfg().AttrSConns) == 0 {
return rplyEv, utils.NewErrNotConnected(utils.AttributeS)
}
@@ -3836,6 +3845,7 @@ func (sS *SessionS) processAttributes(cgrEv *utils.CGREvent, attrIDs []string,
AttributeIDs: attrIDs,
ProcessRuns: processRuns,
}
attrArgs.SetCloneable(clnb)
err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().AttrSConns, nil, utils.AttributeSv1ProcessEvent,
attrArgs, &rplyEv)
return

View File

@@ -41,7 +41,6 @@ type PaginatorWithSearch struct {
type Paginator struct {
Limit *int // Limit the number of items returned
Offset *int // Offset of the first item returned (eg: use Limit*Page in case of PerPage items)
}
func (pgnt *Paginator) PaginateStringSlice(in []string) (out []string) {
@@ -75,6 +74,25 @@ func (pgnt *Paginator) PaginateStringSlice(in []string) (out []string) {
return
}
// Clone creates a clone of the object
func (pgnt Paginator) Clone() Paginator {
var limit *int
if pgnt.Limit != nil {
limit = new(int)
*limit = *pgnt.Limit
}
var offset *int
if pgnt.Offset != nil {
offset = new(int)
*offset = *pgnt.Offset
}
return Paginator{
Limit: limit,
Offset: offset,
}
}
// TPDestination represents one destination in storDB
type TPDestination struct {
TPid string // Tariff plan id
@@ -963,6 +981,34 @@ type ArgRSv1ResourceUsage struct {
UsageID string // ResourceUsage Identifier
UsageTTL *time.Duration
Units float64
clnb bool //rpcclonable
}
// SetCloneable sets if the args should be clonned on internal connections
func (attr *ArgRSv1ResourceUsage) SetCloneable(rpcCloneable bool) {
attr.clnb = rpcCloneable
}
// RPCClone implements rpcclient.RPCCloner interface
func (attr *ArgRSv1ResourceUsage) RPCClone() (interface{}, error) {
if !attr.clnb {
return attr, nil
}
return attr.Clone(), nil
}
// Clone creates a clone of the object
func (attr *ArgRSv1ResourceUsage) Clone() *ArgRSv1ResourceUsage {
var usageTTL *time.Duration
if attr.UsageTTL != nil {
usageTTL = DurationPointer(*attr.UsageTTL)
}
return &ArgRSv1ResourceUsage{
UsageID: attr.UsageID,
UsageTTL: usageTTL,
Units: attr.Units,
CGREventWithOpts: attr.CGREventWithOpts.Clone(),
}
}
type ArgsComputeFilterIndexIDs struct {