Removed Routes Paginator from SessionS APIs

This commit is contained in:
Trial97
2021-08-24 16:22:53 +03:00
committed by Dan Christian Bogos
parent f823bb9805
commit 3c5f32aeeb
17 changed files with 112 additions and 279 deletions

View File

@@ -334,14 +334,6 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.RequestProcessor,
break
}
}
var cgrArgs utils.Paginator
if reqType == utils.MetaAuthorize || reqType == utils.MetaMessage || reqType == utils.MetaEvent {
if cgrArgs, err = utils.GetRoutePaginatorFromOpts(cgrEv.APIOpts); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> args extraction failed because <%s>",
utils.DiameterAgent, err.Error()))
err = nil // reset the error and continue the processing
}
}
if reqProcessor.Flags.Has(utils.MetaLog) {
utils.Logger.Info(
@@ -369,8 +361,7 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.RequestProcessor,
reqProcessor.Flags.GetBool(utils.MetaRoutes),
reqProcessor.Flags.Has(utils.MetaRoutesIgnoreErrors),
reqProcessor.Flags.Has(utils.MetaRoutesEventCost),
cgrEv, cgrArgs,
reqProcessor.Flags.Has(utils.MetaFD),
cgrEv, reqProcessor.Flags.Has(utils.MetaFD),
reqProcessor.Flags.ParamValue(utils.MetaRoutesMaxCost),
)
rply := new(sessions.V1AuthorizeReply)
@@ -396,10 +387,9 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.RequestProcessor,
cgrEv, &rply)
agReq.setCGRReply(nil, err)
case utils.MetaMessage:
msgArgs := sessions.NewV1ProcessMessageArgs(cgrEv, cgrArgs)
rply := new(sessions.V1ProcessMessageReply)
err = da.connMgr.Call(da.ctx, da.cgrCfg.DiameterAgentCfg().SessionSConns, utils.SessionSv1ProcessMessage,
msgArgs, rply)
cgrEv, rply)
// if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
// cgrEv.Event[utils.Usage] = 0 // avoid further debits
// } else
@@ -410,13 +400,9 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.RequestProcessor,
rply.SetMaxUsageNeeded(messageS)
agReq.setCGRReply(rply, err)
case utils.MetaEvent:
evArgs := &sessions.V1ProcessEventArgs{
Paginator: cgrArgs,
CGREvent: cgrEv,
}
rply := new(sessions.V1ProcessEventReply)
err = da.connMgr.Call(da.ctx, da.cgrCfg.DiameterAgentCfg().SessionSConns, utils.SessionSv1ProcessEvent,
evArgs, rply)
cgrEv, rply)
// if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
// cgrEv.Event[utils.Usage] = 0 // avoid further debits
// } else

View File

@@ -361,28 +361,26 @@ func TestProcessRequest(t *testing.T) {
var id string
if arg == nil {
t.Errorf("args is nil")
} else if rargs, can := arg.(*sessions.V1ProcessMessageArgs); !can {
} else if rargs, can := arg.(*utils.CGREvent); !can {
t.Errorf("args is not of sessions.V1ProcessMessageArgs type")
} else {
id = rargs.ID
}
expargs := &sessions.V1ProcessMessageArgs{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: id,
Event: map[string]interface{}{
"Account": "1001",
"Category": "call",
"Destination": "1003",
"OriginHost": "local",
"OriginID": "123456",
"ToR": "*voice",
"Usage": "10s",
},
APIOpts: map[string]interface{}{
utils.OptsSesAttributeS: "true",
utils.OptsSesMessage: "true",
},
expargs := &utils.CGREvent{
Tenant: "cgrates.org",
ID: id,
Event: map[string]interface{}{
"Account": "1001",
"Category": "call",
"Destination": "1003",
"OriginHost": "local",
"OriginID": "123456",
"ToR": "*voice",
"Usage": "10s",
},
APIOpts: map[string]interface{}{
utils.OptsSesAttributeS: "true",
utils.OptsSesMessage: "true",
},
}
if !reflect.DeepEqual(expargs, arg) {

View File

@@ -192,16 +192,7 @@ func (da *DNSAgent) processRequest(reqProcessor *config.RequestProcessor,
break
}
}
var cgrArgs utils.Paginator
if reqType == utils.MetaAuthorize ||
reqType == utils.MetaMessage ||
reqType == utils.MetaEvent {
if cgrArgs, err = utils.GetRoutePaginatorFromOpts(cgrEv.APIOpts); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> args extraction failed because <%s>",
utils.DNSAgent, err.Error()))
err = nil // reset the error and continue the processing
}
}
if reqProcessor.Flags.Has(utils.MetaLog) {
utils.Logger.Info(
fmt.Sprintf("<%s> LOG, processorID: <%s>, message: %s",
@@ -228,7 +219,7 @@ func (da *DNSAgent) processRequest(reqProcessor *config.RequestProcessor,
reqProcessor.Flags.GetBool(utils.MetaRoutes),
reqProcessor.Flags.Has(utils.MetaRoutesIgnoreErrors),
reqProcessor.Flags.Has(utils.MetaRoutesEventCost),
cgrEv, cgrArgs, reqProcessor.Flags.Has(utils.MetaFD),
cgrEv, reqProcessor.Flags.Has(utils.MetaFD),
reqProcessor.Flags.ParamValue(utils.MetaRoutesMaxCost),
)
rply := new(sessions.V1AuthorizeReply)
@@ -258,11 +249,10 @@ func (da *DNSAgent) processRequest(reqProcessor *config.RequestProcessor,
cgrEv, &rply)
agReq.setCGRReply(nil, err)
case utils.MetaMessage:
evArgs := sessions.NewV1ProcessMessageArgs(cgrEv, cgrArgs)
rply := new(sessions.V1ProcessMessageReply) // need it so rpcclient can clone
err = da.connMgr.Call(context.TODO(), da.cgrCfg.DNSAgentCfg().SessionSConns,
utils.SessionSv1ProcessMessage,
evArgs, rply)
cgrEv, rply)
// if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
// cgrEv.Event[utils.Usage] = 0 // avoid further debits
// } else
@@ -273,14 +263,10 @@ func (da *DNSAgent) processRequest(reqProcessor *config.RequestProcessor,
rply.SetMaxUsageNeeded(messageS)
agReq.setCGRReply(rply, err)
case utils.MetaEvent:
evArgs := &sessions.V1ProcessEventArgs{
CGREvent: cgrEv,
Paginator: cgrArgs,
}
rply := new(sessions.V1ProcessEventReply)
err = da.connMgr.Call(context.TODO(), da.cgrCfg.DNSAgentCfg().SessionSConns,
utils.SessionSv1ProcessEvent,
evArgs, rply)
cgrEv, rply)
// if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
// cgrEv.Event[utils.Usage] = 0 // avoid further debits
// } else

View File

@@ -125,16 +125,7 @@ func (ha *HTTPAgent) processRequest(reqProcessor *config.RequestProcessor,
break
}
}
var cgrArgs utils.Paginator
if reqType == utils.MetaAuthorize ||
reqType == utils.MetaMessage ||
reqType == utils.MetaEvent {
if cgrArgs, err = utils.GetRoutePaginatorFromOpts(cgrEv.APIOpts); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> args extraction failed because <%s>",
utils.HTTPAgent, err.Error()))
err = nil // reset the error and continue the processing
}
}
if reqProcessor.Flags.Has(utils.MetaLog) {
utils.Logger.Info(
fmt.Sprintf("<%s> LOG, processorID: %s, http message: %s",
@@ -161,7 +152,7 @@ func (ha *HTTPAgent) processRequest(reqProcessor *config.RequestProcessor,
reqProcessor.Flags.GetBool(utils.MetaRoutes),
reqProcessor.Flags.Has(utils.MetaRoutesIgnoreErrors),
reqProcessor.Flags.Has(utils.MetaRoutesEventCost),
cgrEv, cgrArgs, reqProcessor.Flags.Has(utils.MetaFD),
cgrEv, reqProcessor.Flags.Has(utils.MetaFD),
reqProcessor.Flags.ParamValue(utils.MetaRoutesMaxCost),
)
rply := new(sessions.V1AuthorizeReply)
@@ -187,10 +178,9 @@ func (ha *HTTPAgent) processRequest(reqProcessor *config.RequestProcessor,
cgrEv, &rply)
agReq.setCGRReply(nil, err)
case utils.MetaMessage:
evArgs := sessions.NewV1ProcessMessageArgs(cgrEv, cgrArgs)
rply := new(sessions.V1ProcessMessageReply)
err = ha.connMgr.Call(context.TODO(), ha.sessionConns, utils.SessionSv1ProcessMessage,
evArgs, rply)
cgrEv, rply)
// if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
// cgrEv.Event[utils.Usage] = 0 // avoid further debits
// } else
@@ -201,13 +191,9 @@ func (ha *HTTPAgent) processRequest(reqProcessor *config.RequestProcessor,
rply.SetMaxUsageNeeded(messageS)
agReq.setCGRReply(nil, err)
case utils.MetaEvent:
evArgs := &sessions.V1ProcessEventArgs{
CGREvent: cgrEv,
Paginator: cgrArgs,
}
rply := new(sessions.V1ProcessEventReply)
err = ha.connMgr.Call(context.TODO(), ha.sessionConns, utils.SessionSv1ProcessEvent,
evArgs, rply)
cgrEv, rply)
// if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
// cgrEv.Event[utils.Usage] = 0 // avoid further debits
// } else

View File

@@ -287,13 +287,13 @@ func (ka *KamailioAgent) onCgrProcessMessage(evData []byte, connIdx int) {
}
}
procEvArgs := kev.V1ProcessMessageArgs()
procEvArgs := kev.AsCGREvent(config.CgrConfig().GeneralCfg().DefaultTimezone)
if procEvArgs == nil {
utils.Logger.Err(fmt.Sprintf("<%s> event: %s cannot generate process message session arguments",
utils.KamailioAgent, kev[utils.OriginID]))
return
}
procEvArgs.CGREvent.Event[EvapiConnID] = connIdx // Attach the connection ID
procEvArgs.Event[EvapiConnID] = connIdx // Attach the connection ID
var processReply sessions.V1ProcessMessageReply
err = ka.connMgr.Call(ka.ctx, ka.cfg.SessionSConns, utils.SessionSv1ProcessMessage, procEvArgs, &processReply)

View File

@@ -227,18 +227,8 @@ func (kev KamEvent) AsKamAuthReply(authArgs *sessions.V1AuthorizeArgs,
return
}
// V1ProcessMessageArgs returns the arguments used in SessionSv1.ProcessMessage
func (kev KamEvent) V1ProcessMessageArgs() (args *sessions.V1ProcessMessageArgs) {
cgrEv := kev.AsCGREvent(config.CgrConfig().GeneralCfg().DefaultTimezone)
cgrArgs, _ := utils.GetRoutePaginatorFromOpts(cgrEv.APIOpts)
return &sessions.V1ProcessMessageArgs{ // defaults
CGREvent: cgrEv,
Paginator: cgrArgs,
}
}
// AsKamProcessMessageReply builds up a Kamailio ProcessEvent based on arguments and reply from SessionS
func (kev KamEvent) AsKamProcessMessageReply(procEvArgs *sessions.V1ProcessMessageArgs,
func (kev KamEvent) AsKamProcessMessageReply(procEvArgs *utils.CGREvent,
procEvReply *sessions.V1ProcessMessageReply, rplyErr error) (kar *KamReply, err error) {
evName := CGR_PROCESS_MESSAGE
if kamRouReply, has := kev[KamReplyRoute]; has {

View File

@@ -286,33 +286,6 @@ func TestKamEvAsKamAuthReply(t *testing.T) {
}
}
func TestKamEvV1ProcessMessageArgs(t *testing.T) {
kamEv := KamEvent{"event": "CGR_PROCESS_MESSAGE",
"callid": "46c01a5c249b469e76333fc6bfa87f6a@0:0:0:0:0:0:0:0",
"from_tag": "bf71ad59", "to_tag": "7351fecf",
"cgr_reqtype": utils.MetaPostpaid, "cgr_account": "1001",
"cgr_destination": "1002", "cgr_answertime": "1419839310",
"cgr_duration": "3", "cgr_pdd": "4",
utils.CGRRoute: "supplier2",
utils.CGRDisconnectCause: "200"}
expected := &sessions.V1ProcessMessageArgs{
CGREvent: &utils.CGREvent{
Tenant: utils.FirstNonEmpty(kamEv[utils.Tenant],
config.CgrConfig().GeneralCfg().DefaultTenant),
ID: utils.UUIDSha1Prefix(),
Event: kamEv.AsMapStringInterface(),
},
}
rcv := kamEv.V1ProcessMessageArgs()
if !reflect.DeepEqual(expected.CGREvent.Tenant, rcv.CGREvent.Tenant) {
t.Errorf("Expecting: %+v, received: %+v", expected.CGREvent.Tenant, rcv.CGREvent.Tenant)
} else if !reflect.DeepEqual(expected.CGREvent.Event, rcv.CGREvent.Event) {
t.Errorf("Expecting: %+v, received: %+v", expected.CGREvent.Event, rcv.CGREvent.Event)
} else if !reflect.DeepEqual(expected.CGREvent.Event, rcv.CGREvent.Event) {
t.Errorf("Expecting: %+v, received: %+v", expected.CGREvent.Event, rcv.CGREvent.Event)
}
}
func TestKamEvAsKamProcessEventReply(t *testing.T) {
kamEv := KamEvent{"event": "CGR_PROCESS_MESSAGE",
"callid": "46c01a5c249b469e76333fc6bfa87f6a@0:0:0:0:0:0:0:0",
@@ -324,14 +297,12 @@ func TestKamEvAsKamProcessEventReply(t *testing.T) {
utils.CGRDisconnectCause: "200",
utils.OptsSesMessage: "true",
}
procEvArgs := &sessions.V1ProcessMessageArgs{
CGREvent: &utils.CGREvent{
Tenant: utils.FirstNonEmpty(kamEv[utils.Tenant],
config.CgrConfig().GeneralCfg().DefaultTenant),
ID: utils.UUIDSha1Prefix(),
Event: kamEv.AsMapStringInterface(),
APIOpts: kamEv.GetOptions(),
},
procEvArgs := &utils.CGREvent{
Tenant: utils.FirstNonEmpty(kamEv[utils.Tenant],
config.CgrConfig().GeneralCfg().DefaultTenant),
ID: utils.UUIDSha1Prefix(),
Event: kamEv.AsMapStringInterface(),
APIOpts: kamEv.GetOptions(),
}
procEvhRply := &sessions.V1ProcessMessageReply{
MaxUsage: utils.DurationPointer(5 * time.Second),
@@ -350,14 +321,12 @@ func TestKamEvAsKamProcessEventReply(t *testing.T) {
KamReplyRoute: "CGR_PROFILE_REPLY",
utils.OptsSesAttributeS: "true",
}
procEvArgs = &sessions.V1ProcessMessageArgs{
CGREvent: &utils.CGREvent{
Tenant: utils.FirstNonEmpty(kamEv[utils.Tenant],
config.CgrConfig().GeneralCfg().DefaultTenant),
ID: utils.UUIDSha1Prefix(),
Event: kamEv.AsMapStringInterface(),
APIOpts: kamEv.GetOptions(),
},
procEvArgs = &utils.CGREvent{
Tenant: utils.FirstNonEmpty(kamEv[utils.Tenant],
config.CgrConfig().GeneralCfg().DefaultTenant),
ID: utils.UUIDSha1Prefix(),
Event: kamEv.AsMapStringInterface(),
APIOpts: kamEv.GetOptions(),
}
procEvhRply = &sessions.V1ProcessMessageReply{
Attributes: &engine.AttrSProcessEventReply{

View File

@@ -183,16 +183,7 @@ func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.R
break
}
}
var cgrArgs utils.Paginator
if reqType == utils.MetaAuthorize ||
reqType == utils.MetaMessage ||
reqType == utils.MetaEvent {
if cgrArgs, err = utils.GetRoutePaginatorFromOpts(cgrEv.APIOpts); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> args extraction failed because <%s>",
utils.RadiusAgent, err.Error()))
err = nil // reset the error and continue the processing
}
}
if reqProcessor.Flags.Has(utils.MetaLog) {
utils.Logger.Info(
fmt.Sprintf("<%s> LOG, processorID: %s, radius message: %s",
@@ -219,7 +210,7 @@ func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.R
reqProcessor.Flags.GetBool(utils.MetaRoutes),
reqProcessor.Flags.Has(utils.MetaRoutesIgnoreErrors),
reqProcessor.Flags.Has(utils.MetaRoutesEventCost),
cgrEv, cgrArgs, reqProcessor.Flags.Has(utils.MetaFD),
cgrEv, reqProcessor.Flags.Has(utils.MetaFD),
reqProcessor.Flags.ParamValue(utils.MetaRoutesMaxCost),
)
rply := new(sessions.V1AuthorizeReply)
@@ -245,9 +236,8 @@ func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.R
cgrEv, &rply)
agReq.setCGRReply(nil, err)
case utils.MetaMessage:
evArgs := sessions.NewV1ProcessMessageArgs(cgrEv, cgrArgs)
rply := new(sessions.V1ProcessMessageReply)
err = ra.connMgr.Call(context.TODO(), ra.cgrCfg.RadiusAgentCfg().SessionSConns, utils.SessionSv1ProcessMessage, evArgs, rply)
err = ra.connMgr.Call(context.TODO(), ra.cgrCfg.RadiusAgentCfg().SessionSConns, utils.SessionSv1ProcessMessage, cgrEv, rply)
// if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
// cgrEv.Event[utils.Usage] = 0 // avoid further debits
// } else
@@ -258,13 +248,9 @@ func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.R
rply.SetMaxUsageNeeded(messageS)
agReq.setCGRReply(rply, err)
case utils.MetaEvent:
evArgs := &sessions.V1ProcessEventArgs{
CGREvent: cgrEv,
Paginator: cgrArgs,
}
rply := new(sessions.V1ProcessEventReply)
err = ra.connMgr.Call(context.TODO(), ra.cgrCfg.RadiusAgentCfg().SessionSConns, utils.SessionSv1ProcessEvent,
evArgs, rply)
cgrEv, rply)
// if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
// cgrEv.Event[utils.Usage] = 0 // avoid further debits
// } else

View File

@@ -396,16 +396,7 @@ func (sa *SIPAgent) processRequest(reqProcessor *config.RequestProcessor,
break
}
}
var cgrArgs utils.Paginator
if reqType == utils.MetaAuthorize ||
reqType == utils.MetaMessage ||
reqType == utils.MetaEvent {
if cgrArgs, err = utils.GetRoutePaginatorFromOpts(cgrEv.APIOpts); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> args extraction failed because <%s>",
utils.SIPAgent, err.Error()))
err = nil // reset the error and continue the processing
}
}
if reqProcessor.Flags.Has(utils.MetaLog) {
utils.Logger.Info(
fmt.Sprintf("<%s> LOG, processorID: %s, SIP message: %s",
@@ -432,7 +423,7 @@ func (sa *SIPAgent) processRequest(reqProcessor *config.RequestProcessor,
reqProcessor.Flags.GetBool(utils.MetaRoutes),
reqProcessor.Flags.Has(utils.MetaRoutesIgnoreErrors),
reqProcessor.Flags.Has(utils.MetaRoutesEventCost),
cgrEv, cgrArgs, reqProcessor.Flags.Has(utils.MetaFD),
cgrEv, reqProcessor.Flags.Has(utils.MetaFD),
reqProcessor.Flags.ParamValue(utils.MetaRoutesMaxCost),
)
rply := new(sessions.V1AuthorizeReply)
@@ -441,14 +432,9 @@ func (sa *SIPAgent) processRequest(reqProcessor *config.RequestProcessor,
rply.SetMaxUsageNeeded(authArgs.GetMaxUsage)
agReq.setCGRReply(rply, err)
case utils.MetaEvent:
evArgs := &sessions.V1ProcessEventArgs{
CGREvent: cgrEv,
Paginator: cgrArgs,
}
rply := new(sessions.V1ProcessEventReply)
err = sa.connMgr.Call(context.TODO(), sa.cfg.SIPAgentCfg().SessionSConns, utils.SessionSv1ProcessEvent,
evArgs, rply)
cgrEv, rply)
// if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
// cgrEv.Event[utils.Usage] = 0 // avoid further debits
// } else

View File

@@ -76,12 +76,12 @@ func (ssv1 *SessionSv1) ProcessCDR(ctx *context.Context, cgrEv *utils.CGREvent,
return ssv1.sS.BiRPCv1ProcessCDR(ctx, cgrEv, rply)
}
func (ssv1 *SessionSv1) ProcessMessage(ctx *context.Context, args *sessions.V1ProcessMessageArgs,
func (ssv1 *SessionSv1) ProcessMessage(ctx *context.Context, args *utils.CGREvent,
rply *sessions.V1ProcessMessageReply) error {
return ssv1.sS.BiRPCv1ProcessMessage(ctx, args, rply)
}
func (ssv1 *SessionSv1) ProcessEvent(ctx *context.Context, args *sessions.V1ProcessEventArgs,
func (ssv1 *SessionSv1) ProcessEvent(ctx *context.Context, args *utils.CGREvent,
rply *sessions.V1ProcessEventReply) error {
return ssv1.sS.BiRPCv1ProcessEvent(ctx, args, rply)
}

View File

@@ -27,7 +27,7 @@ func init() {
c := &CmdSessionsProcessEvent{
name: "session_process_message",
rpcMethod: utils.SessionSv1ProcessMessage,
rpcParams: &sessions.V1ProcessMessageArgs{},
rpcParams: &utils.CGREvent{},
}
commands[c.Name()] = c
c.CommandExecuter = &CommandExecuter{c}
@@ -36,7 +36,7 @@ func init() {
type CmdSessionsProcessEvent struct {
name string
rpcMethod string
rpcParams *sessions.V1ProcessMessageArgs
rpcParams *utils.CGREvent
*CommandExecuter
}
@@ -50,9 +50,7 @@ func (self *CmdSessionsProcessEvent) RpcMethod() string {
func (self *CmdSessionsProcessEvent) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
self.rpcParams = &sessions.V1ProcessMessageArgs{
CGREvent: new(utils.CGREvent),
}
self.rpcParams = new(utils.CGREvent)
}
return self.rpcParams
}

View File

@@ -137,28 +137,28 @@ func (dS *DispatcherService) SessionSv1ProcessCDR(args *utils.CGREvent,
return dS.Dispatch(context.TODO(), args, utils.MetaSessionS, utils.SessionSv1ProcessCDR, args, reply)
}
func (dS *DispatcherService) SessionSv1ProcessMessage(args *sessions.V1ProcessMessageArgs,
func (dS *DispatcherService) SessionSv1ProcessMessage(args *utils.CGREvent,
reply *sessions.V1ProcessMessageReply) (err error) {
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
args.Tenant = utils.FirstNonEmpty(args.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if err = dS.authorize(utils.SessionSv1ProcessMessage, args.CGREvent.Tenant,
if err = dS.authorize(utils.SessionSv1ProcessMessage, args.Tenant,
utils.IfaceAsString(args.APIOpts[utils.OptsAPIKey])); err != nil {
return
}
}
return dS.Dispatch(context.TODO(), args.CGREvent, utils.MetaSessionS, utils.SessionSv1ProcessMessage, args, reply)
return dS.Dispatch(context.TODO(), args, utils.MetaSessionS, utils.SessionSv1ProcessMessage, args, reply)
}
func (dS *DispatcherService) SessionSv1ProcessEvent(args *sessions.V1ProcessEventArgs,
func (dS *DispatcherService) SessionSv1ProcessEvent(args *utils.CGREvent,
reply *sessions.V1ProcessEventReply) (err error) {
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
args.Tenant = utils.FirstNonEmpty(args.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if err = dS.authorize(utils.SessionSv1ProcessEvent, args.CGREvent.Tenant,
if err = dS.authorize(utils.SessionSv1ProcessEvent, args.Tenant,
utils.IfaceAsString(args.APIOpts[utils.OptsAPIKey])); err != nil {
return
}
}
return dS.Dispatch(context.TODO(), args.CGREvent, utils.MetaSessionS, utils.SessionSv1ProcessEvent, args, reply)
return dS.Dispatch(context.TODO(), args, utils.MetaSessionS, utils.SessionSv1ProcessEvent, args, reply)
}
func (dS *DispatcherService) SessionSv1GetActiveSessions(args *utils.SessionFilter,

View File

@@ -182,10 +182,8 @@ func TestDspSessionSv1ProcessCDRErrorNil(t *testing.T) {
func TestDspSessionSv1ProcessMessageNil(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
CGREvent := &sessions.V1ProcessMessageArgs{
CGREvent: &utils.CGREvent{
Tenant: "tenant",
},
CGREvent := &utils.CGREvent{
Tenant: "tenant",
}
var reply *sessions.V1ProcessMessageReply
result := dspSrv.SessionSv1ProcessMessage(CGREvent, reply)
@@ -199,10 +197,8 @@ func TestDspSessionSv1ProcessMessageErrorNil(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
cgrCfg.DispatcherSCfg().AttributeSConns = []string{"test"}
CGREvent := &sessions.V1ProcessMessageArgs{
CGREvent: &utils.CGREvent{
Tenant: "tenant",
},
CGREvent := &utils.CGREvent{
Tenant: "tenant",
}
var reply *sessions.V1ProcessMessageReply
result := dspSrv.SessionSv1ProcessMessage(CGREvent, reply)
@@ -215,10 +211,8 @@ func TestDspSessionSv1ProcessMessageErrorNil(t *testing.T) {
func TestDspSessionSv1ProcessEventNil(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
CGREvent := &sessions.V1ProcessEventArgs{
CGREvent: &utils.CGREvent{
Tenant: "tenant",
},
CGREvent := &utils.CGREvent{
Tenant: "tenant",
}
var reply *sessions.V1ProcessEventReply
result := dspSrv.SessionSv1ProcessEvent(CGREvent, reply)
@@ -232,10 +226,8 @@ func TestDspSessionSv1ProcessEventErrorNil(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
cgrCfg.DispatcherSCfg().AttributeSConns = []string{"test"}
CGREvent := &sessions.V1ProcessEventArgs{
CGREvent: &utils.CGREvent{
Tenant: "tenant",
},
CGREvent := &utils.CGREvent{
Tenant: "tenant",
}
var reply *sessions.V1ProcessEventReply
result := dspSrv.SessionSv1ProcessEvent(CGREvent, reply)

View File

@@ -189,16 +189,6 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent,
break
}
}
var cgrArgs utils.Paginator
if reqType == utils.MetaAuthorize ||
reqType == utils.MetaMessage ||
reqType == utils.MetaEvent {
if cgrArgs, err = utils.GetRoutePaginatorFromOpts(cgrEv.APIOpts); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> args extraction for reader <%s> failed because <%s>",
utils.ERs, rdrCfg.ID, err.Error()))
err = nil // reset the error and continue the processing
}
}
// execute the action based on reqType
switch reqType {
default:
@@ -221,8 +211,7 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent,
rdrCfg.Flags.Has(utils.MetaRoutes),
rdrCfg.Flags.Has(utils.MetaRoutesIgnoreErrors),
rdrCfg.Flags.Has(utils.MetaRoutesEventCost),
cgrEv, cgrArgs,
rdrCfg.Flags.Has(utils.MetaFD),
cgrEv, rdrCfg.Flags.Has(utils.MetaFD),
rdrCfg.Flags.ParamValue(utils.MetaRoutesMaxCost),
)
rply := new(sessions.V1AuthorizeReply)
@@ -241,10 +230,9 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent,
err = erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1TerminateSession,
cgrEv, rply)
case utils.MetaMessage:
evArgs := sessions.NewV1ProcessMessageArgs(cgrEv, cgrArgs)
rply := new(sessions.V1ProcessMessageReply) // need it so rpcclient can clone
err = erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1ProcessMessage,
evArgs, rply)
cgrEv, rply)
// if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
// cgrEv.Event[utils.Usage] = 0 // avoid further debits
// } else
@@ -252,13 +240,9 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent,
cgrEv.Event[utils.Usage] = rply.MaxUsage // make sure the CDR reflects the debit
}
case utils.MetaEvent:
evArgs := &sessions.V1ProcessEventArgs{
CGREvent: cgrEv,
Paginator: cgrArgs,
}
rply := new(sessions.V1ProcessEventReply)
err = erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1ProcessEvent,
evArgs, rply)
cgrEv, rply)
case utils.MetaCDRs: // allow CDR processing
}
if err != nil {

View File

@@ -274,12 +274,6 @@ func getDerivedEvents(events map[string]*utils.CGREvent, derivedReply bool) map[
}
}
// V1ProcessEventArgs are the options passed to ProcessEvent API
type V1ProcessEventArgs struct {
*utils.CGREvent
utils.Paginator // for routes
}
// V1ProcessEventReply is the reply for the ProcessEvent API
type V1ProcessEventReply struct {
MaxUsage map[string]time.Duration `json:",omitempty"`
@@ -368,20 +362,6 @@ func (v1Rply *V1ProcessEventReply) AsNavigableMap() map[string]*utils.DataNode {
return cgrReply
}
// NewV1ProcessMessageArgs is a constructor for MessageArgs used by ProcessMessage
func NewV1ProcessMessageArgs(cgrEv *utils.CGREvent, routePaginator utils.Paginator) *V1ProcessMessageArgs {
return &V1ProcessMessageArgs{
CGREvent: cgrEv,
Paginator: routePaginator,
}
}
// V1ProcessMessageArgs are the options passed to ProcessMessage API
type V1ProcessMessageArgs struct {
*utils.CGREvent
utils.Paginator
}
// V1ProcessMessageReply is the reply for the ProcessMessage API
type V1ProcessMessageReply struct {
MaxUsage *time.Duration `json:",omitempty"`
@@ -448,8 +428,7 @@ func (v1Rply *V1ProcessMessageReply) AsNavigableMap() map[string]*utils.DataNode
func NewV1AuthorizeArgs(attrs bool, attributeIDs []string,
thrslds bool, thresholdIDs []string, statQueues bool, statIDs []string,
res, maxUsage, routes, routesIgnoreErrs, routesEventCost bool,
cgrEv *utils.CGREvent, routePaginator utils.Paginator,
forceDuration bool, routesMaxCost string) (args *V1AuthorizeArgs) {
cgrEv *utils.CGREvent, forceDuration bool, routesMaxCost string) (args *V1AuthorizeArgs) {
args = &V1AuthorizeArgs{
GetAttributes: attrs,
AuthorizeResources: res,
@@ -466,7 +445,6 @@ func NewV1AuthorizeArgs(attrs bool, attributeIDs []string,
} else {
args.RoutesMaxCost = routesMaxCost
}
args.Paginator = routePaginator
if len(attributeIDs) != 0 {
args.AttributeIDs = attributeIDs
}
@@ -495,7 +473,6 @@ type V1AuthorizeArgs struct {
ThresholdIDs []string
StatIDs []string
*utils.CGREvent
utils.Paginator
}
// ParseFlags will populate the V1AuthorizeArgs flags
@@ -527,7 +504,6 @@ func (args *V1AuthorizeArgs) ParseFlags(flags, sep string) {
args.ForceDuration = true
}
}
args.Paginator, _ = utils.GetRoutePaginatorFromOpts(args.APIOpts)
}
// V1AuthorizeReply are options available in auth reply

View File

@@ -1616,7 +1616,8 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(ctx *context.Context,
}
if args.GetRoutes ||
utils.OptAsBool(args.APIOpts, utils.OptsSesRouteS) {
routesReply, err := sS.getRoutes(ctx, args.CGREvent.Clone(), args.Paginator,
cgrArgs, _ := utils.GetRoutePaginatorFromOpts(args.APIOpts)
routesReply, err := sS.getRoutes(ctx, args.CGREvent.Clone(), cgrArgs,
args.RoutesIgnoreErrors || utils.OptAsBool(args.APIOpts, utils.OptsSesRouteSIgnoreErrors),
utils.FirstNonEmpty(args.RoutesMaxCost, utils.IfaceAsString(args.APIOpts[utils.OptsSesRouteSMaxCost])),
false)
@@ -2150,21 +2151,21 @@ func (sS *SessionS) BiRPCv1ProcessCDR(ctx *context.Context,
// BiRPCv1ProcessMessage processes one event with the right subsystems based on arguments received
func (sS *SessionS) BiRPCv1ProcessMessage(ctx *context.Context,
args *V1ProcessMessageArgs, rply *V1ProcessMessageReply) (err error) {
if args.CGREvent == nil {
args *utils.CGREvent, rply *V1ProcessMessageReply) (err error) {
if args == nil {
return utils.NewErrMandatoryIeMissing(utils.CGREventString)
}
var withErrors bool
if args.CGREvent.ID == utils.EmptyString {
args.CGREvent.ID = utils.GenUUID()
if args.ID == utils.EmptyString {
args.ID = utils.GenUUID()
}
if args.CGREvent.Tenant == utils.EmptyString {
args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant
if args.Tenant == utils.EmptyString {
args.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant
}
// RPC caching
if sS.cgrCfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.SessionSv1ProcessMessage, args.CGREvent.ID)
cacheKey := utils.ConcatenatedKey(utils.SessionSv1ProcessMessage, args.ID)
refID := guardian.Guardian.GuardIDs("",
sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
@@ -2182,7 +2183,7 @@ func (sS *SessionS) BiRPCv1ProcessMessage(ctx *context.Context,
}
// end of RPC caching
me := engine.MapEvent(args.CGREvent.Event)
me := engine.MapEvent(args.Event)
originID := me.GetStringIgnoreErrors(utils.OriginID)
if utils.OptAsBool(args.APIOpts, utils.OptsSesAttributeS) {
@@ -2190,9 +2191,9 @@ func (sS *SessionS) BiRPCv1ProcessMessage(ctx *context.Context,
if atrsIDs, err = utils.OptAsStringSlice(args.APIOpts, utils.OptsSesAttributeIDs); err != nil {
return
}
rplyAttr, err := sS.processAttributes(ctx, args.CGREvent, atrsIDs, false)
rplyAttr, err := sS.processAttributes(ctx, args, atrsIDs, false)
if err == nil {
args.CGREvent = rplyAttr.CGREvent
args = rplyAttr.CGREvent
rply.Attributes = &rplyAttr
} else if err.Error() != utils.ErrNotFound.Error() {
return utils.NewErrAttributeS(err)
@@ -2206,7 +2207,7 @@ func (sS *SessionS) BiRPCv1ProcessMessage(ctx *context.Context,
return utils.NewErrMandatoryIeMissing(utils.OriginID)
}
attrRU := &utils.ArgRSv1ResourceUsage{
CGREvent: args.CGREvent,
CGREvent: args,
UsageID: originID,
Units: 1,
}
@@ -2218,7 +2219,8 @@ func (sS *SessionS) BiRPCv1ProcessMessage(ctx *context.Context,
rply.ResourceAllocation = &allocMessage
}
if utils.OptAsBool(args.APIOpts, utils.OptsSesRouteS) {
routesReply, err := sS.getRoutes(ctx, args.CGREvent.Clone(), args.Paginator,
cgrArgs, _ := utils.GetRoutePaginatorFromOpts(args.APIOpts)
routesReply, err := sS.getRoutes(ctx, args.Clone(), cgrArgs,
utils.OptAsBool(args.APIOpts, utils.OptsSesRouteSIgnoreErrors),
utils.IfaceAsString(args.APIOpts[utils.OptsSesRouteSMaxCost]), false)
if err != nil {
@@ -2230,7 +2232,7 @@ func (sS *SessionS) BiRPCv1ProcessMessage(ctx *context.Context,
}
if utils.OptAsBool(args.APIOpts, utils.OptsSesMessage) {
var maxUsage time.Duration
if maxUsage, err = sS.chargeEvent(ctx, args.CGREvent, utils.OptAsBool(args.APIOpts, utils.OptsSesForceDuration)); err != nil {
if maxUsage, err = sS.chargeEvent(ctx, args, utils.OptAsBool(args.APIOpts, utils.OptsSesForceDuration)); err != nil {
return err
}
rply.MaxUsage = &maxUsage
@@ -2240,11 +2242,11 @@ func (sS *SessionS) BiRPCv1ProcessMessage(ctx *context.Context,
if thIDs, err = utils.OptAsStringSlice(args.APIOpts, utils.OptsSesThresholdIDs); err != nil {
return
}
tIDs, err := sS.processThreshold(ctx, args.CGREvent, thIDs, true)
tIDs, err := sS.processThreshold(ctx, args, thIDs, true)
if err != nil && err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.",
utils.SessionS, err.Error(), args.CGREvent))
utils.SessionS, err.Error(), args))
withErrors = true
}
rply.ThresholdIDs = &tIDs
@@ -2254,12 +2256,12 @@ func (sS *SessionS) BiRPCv1ProcessMessage(ctx *context.Context,
if stIDs, err = utils.OptAsStringSlice(args.APIOpts, utils.OptsSesStatIDs); err != nil {
return
}
sIDs, err := sS.processStats(ctx, args.CGREvent, stIDs, false)
sIDs, err := sS.processStats(ctx, args, stIDs, false)
if err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing event %+v with StatS.",
utils.SessionS, err.Error(), args.CGREvent))
utils.SessionS, err.Error(), args))
withErrors = true
}
rply.StatQueueIDs = &sIDs
@@ -2273,21 +2275,21 @@ func (sS *SessionS) BiRPCv1ProcessMessage(ctx *context.Context,
// BiRPCv1ProcessEvent processes one event with the right subsystems based on arguments received
func (sS *SessionS) BiRPCv1ProcessEvent(ctx *context.Context,
args *V1ProcessEventArgs, rply *V1ProcessEventReply) (err error) {
if args.CGREvent == nil {
args *utils.CGREvent, rply *V1ProcessEventReply) (err error) {
if args == nil {
return utils.NewErrMandatoryIeMissing(utils.CGREventString)
}
var withErrors bool
if args.CGREvent.ID == "" {
args.CGREvent.ID = utils.GenUUID()
if args.ID == "" {
args.ID = utils.GenUUID()
}
if args.CGREvent.Tenant == "" {
args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant
if args.Tenant == "" {
args.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant
}
// RPC caching
if sS.cgrCfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.SessionSv1ProcessEvent, args.CGREvent.ID)
cacheKey := utils.ConcatenatedKey(utils.SessionSv1ProcessEvent, args.ID)
refID := guardian.Guardian.GuardIDs("",
sS.cgrCfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
@@ -2307,12 +2309,12 @@ func (sS *SessionS) BiRPCv1ProcessEvent(ctx *context.Context,
blockError := utils.OptAsBool(args.APIOpts, utils.OptsSesBlockerError)
events := map[string]*utils.CGREvent{
utils.MetaRaw: args.CGREvent,
utils.MetaRaw: args,
}
if utils.OptAsBool(args.APIOpts, utils.OptsSesChargerS) {
var chrgrs []*engine.ChrgSProcessEventReply
if chrgrs, err = sS.processChargerS(ctx, args.CGREvent); err != nil {
if chrgrs, err = sS.processChargerS(ctx, args); err != nil {
return
}
for _, chrgr := range chrgrs {
@@ -2339,17 +2341,18 @@ func (sS *SessionS) BiRPCv1ProcessEvent(ctx *context.Context,
rply.Attributes[runID] = &rplyAttr
}
}
args.CGREvent = events[utils.MetaRaw]
args = events[utils.MetaRaw]
}
// get routes if required
if utils.OptAsBool(args.APIOpts, utils.OptsSesRouteS) {
rply.RouteProfiles = make(map[string]engine.SortedRoutesList)
// check in case we have options for suppliers
ignoreErrors := utils.OptAsBool(args.APIOpts, utils.OptsSesRouteSIgnoreErrors)
maxCost := utils.IfaceAsString(args.APIOpts[utils.OptsSesRouteSMaxCost])
for runID, cgrEv := range getDerivedEvents(events, utils.OptAsBool(args.APIOpts, utils.OptsSesRouteSDerivedReply)) {
routesReply, err := sS.getRoutes(ctx, cgrEv.Clone(), args.Paginator, ignoreErrors, maxCost, false)
cgrArgs, _ := utils.GetRoutePaginatorFromOpts(cgrEv.APIOpts)
routesReply, err := sS.getRoutes(ctx, cgrEv.Clone(), cgrArgs,
utils.OptAsBool(cgrEv.APIOpts, utils.OptsSesRouteSIgnoreErrors),
utils.IfaceAsString(cgrEv.APIOpts[utils.OptsSesRouteSMaxCost]), false)
if err != nil {
return err
}

View File

@@ -954,7 +954,6 @@ func TestV1AuthorizeArgsParseFlags1(t *testing.T) {
t.Errorf("Expecting %+v,\n received: %+v", eOut, v1authArgs)
}
//normal check -> without *dispatchers
cgrArgs, _ := utils.GetRoutePaginatorFromOpts(v1authArgs.APIOpts)
eOut = &V1AuthorizeArgs{
GetMaxUsage: true,
AuthorizeResources: true,
@@ -967,7 +966,6 @@ func TestV1AuthorizeArgsParseFlags1(t *testing.T) {
ThresholdIDs: []string{"tr1", "tr2", "tr3"},
ProcessStats: true,
StatIDs: []string{"st1", "st2", "st3"},
Paginator: cgrArgs,
CGREvent: eOut.CGREvent,
ForceDuration: true,
}
@@ -980,7 +978,6 @@ func TestV1AuthorizeArgsParseFlags1(t *testing.T) {
t.Errorf("Expecting %+v,\n received: %+v\n", utils.ToJSON(eOut), utils.ToJSON(v1authArgs))
}
// //normal check -> with *dispatchers
cgrArgs, _ = utils.GetRoutePaginatorFromOpts(v1authArgs.APIOpts)
eOut = &V1AuthorizeArgs{
GetMaxUsage: true,
AuthorizeResources: true,
@@ -993,7 +990,6 @@ func TestV1AuthorizeArgsParseFlags1(t *testing.T) {
ThresholdIDs: []string{"tr1", "tr2", "tr3"},
ProcessStats: true,
StatIDs: []string{"st1", "st2", "st3"},
Paginator: cgrArgs,
CGREvent: eOut.CGREvent,
ForceDuration: true,
}
@@ -1017,7 +1013,6 @@ func TestV1AuthorizeArgsParseFlags1(t *testing.T) {
ThresholdIDs: []string{"tr1", "tr2", "tr3"},
ProcessStats: true,
StatIDs: []string{"st1", "st2", "st3"},
Paginator: cgrArgs,
CGREvent: eOut.CGREvent,
ForceDuration: true,
}
@@ -1451,9 +1446,8 @@ func TestSessionSNewV1AuthorizeArgsWithOpts(t *testing.T) {
CGREvent: cgrEv,
ForceDuration: true,
}
cgrArgs, _ := utils.GetRoutePaginatorFromOpts(cgrEv.APIOpts)
rply := NewV1AuthorizeArgs(true, nil, false, nil, false, nil, true, false,
false, false, false, cgrEv, cgrArgs, true, "")
false, false, false, cgrEv, true, "")
if !reflect.DeepEqual(expected, rply) {
t.Errorf("Expecting %+v, received: %+v", utils.ToJSON(expected), utils.ToJSON(rply))
}
@@ -1470,7 +1464,7 @@ func TestSessionSNewV1AuthorizeArgsWithOpts(t *testing.T) {
ForceDuration: true,
}
rply = NewV1AuthorizeArgs(true, nil, false, nil, true, nil, false, true,
false, true, true, cgrEv, cgrArgs, true, "")
false, true, true, cgrEv, true, "")
if !reflect.DeepEqual(expected, rply) {
t.Errorf("Expecting %+v, received: %+v", utils.ToJSON(expected), utils.ToJSON(rply))
}
@@ -1495,9 +1489,8 @@ func TestSessionSNewV1AuthorizeArgsWithOpts2(t *testing.T) {
CGREvent: cgrEv,
ForceDuration: true,
}
cgrArgs, _ := utils.GetRoutePaginatorFromOpts(cgrEv.APIOpts)
rply := NewV1AuthorizeArgs(true, nil, false, nil, false, nil, true, false, false,
false, false, cgrEv, cgrArgs, true, "")
false, false, cgrEv, true, "")
if !reflect.DeepEqual(expected, rply) {
t.Errorf("Expecting %+v, received: %+v", utils.ToJSON(expected), utils.ToJSON(rply))
}
@@ -1514,7 +1507,7 @@ func TestSessionSNewV1AuthorizeArgsWithOpts2(t *testing.T) {
ForceDuration: true,
}
rply = NewV1AuthorizeArgs(true, nil, false, nil, true, nil, false, true, false,
true, true, cgrEv, cgrArgs, true, "")
true, true, cgrEv, true, "")
if !reflect.DeepEqual(expected, rply) {
t.Errorf("Expecting %+v, received: %+v", utils.ToJSON(expected), utils.ToJSON(rply))
}