Add support for *blocker_error in SessionSv1.ProcessEvent ( + integration test )

This commit is contained in:
TeoV
2020-07-23 09:52:14 +03:00
committed by Dan Christian Bogos
parent a2e531c58d
commit 51b4b7f6d1
4 changed files with 76 additions and 0 deletions

View File

@@ -58,6 +58,8 @@ var sTestSessionSv1ProcessEvent = []func(t *testing.T){
testSSv1ItGetCDRsFromProcessEvent,
testSSv1ItProcessEventWithCDRResourceError,
testSSv1ItGetCDRsFromProcessEventResourceError,
testSSv1ItProcessEventWithCDRResourceErrorBlockError,
testSSv1ItGetCDRsFromProcessEventResourceErrorBlockError,
testSSv1ItStopCgrEngine,
}
@@ -840,3 +842,45 @@ func testSSv1ItGetCDRsFromProcessEventResourceError(t *testing.T) {
}
}
}
func testSSv1ItProcessEventWithCDRResourceErrorBlockError(t *testing.T) {
args := &sessions.V1ProcessEventArgs{
Flags: []string{utils.MetaCDRs + utils.InInFieldSep + utils.MetaRALs,
utils.ConcatenatedKey(utils.MetaResources, utils.MetaRelease),
utils.MetaBlockerError}, // expended to stop the processing because we have error at resource
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testSSv1ItProcessEventWithCDRResourceErrorBlockError",
Event: map[string]interface{}{
utils.Tenant: "cgrates.org",
utils.ToR: utils.VOICE,
utils.OriginID: "testSSv1ItProcessEventWithCDRResourceErrorBlockError",
utils.RequestType: sSV1RequestType,
utils.Account: "1001",
utils.Subject: "ANY2CNT",
utils.Destination: "1002",
utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC),
utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC),
utils.Usage: 10 * time.Minute,
},
},
}
var rply sessions.V1ProcessEventReply
if err := sSv1BiRpc.Call(utils.SessionSv1ProcessEvent,
args, &rply); err == nil || err.Error() != "RESOURCES_ERROR:cannot find usage record with id: testSSv1ItProcessEventWithCDRResourceErrorBlockError" {
t.Error(err)
}
time.Sleep(100 * time.Millisecond)
}
func testSSv1ItGetCDRsFromProcessEventResourceErrorBlockError(t *testing.T) {
var cdrCnt int64
req := &utils.RPCCDRsFilterWithArgDispatcher{RPCCDRsFilter: &utils.RPCCDRsFilter{
OriginIDs: []string{"testSSv1ItProcessEventWithCDRResourceErrorBlockError"}}}
if err := sSApierRpc.Call(utils.CDRsV1GetCDRsCount, req, &cdrCnt); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if cdrCnt != 0 {
t.Error("Unexpected number of CDRs returned: ", cdrCnt)
}
}

View File

@@ -3099,6 +3099,7 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector,
//convert from Flags []string to utils.FlagsWithParams
argsFlagsWithParams := utils.FlagsWithParamsFromSlice(args.Flags)
blockError := argsFlagsWithParams.GetBool(utils.MetaBlockerError)
events := map[string]*utils.CGREventWithOpts{
utils.MetaRaw: {
CGREvent: args.CGREvent,
@@ -3176,6 +3177,9 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector,
for runID, cgrEv := range getDerivedEvents(events, argsFlagsWithParams[utils.MetaThresholds].Has(utils.MetaDerivedReply)) {
tIDs, err := sS.processThreshold(cgrEv.CGREvent, cgrEv.ArgDispatcher, thIDs)
if err != nil && err.Error() != utils.ErrNotFound.Error() {
if blockError {
return utils.NewErrThresholdS(err)
}
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing event %+v for RunID <%s> with ThresholdS.",
utils.SessionS, err.Error(), cgrEv.CGREvent, runID))
@@ -3193,6 +3197,9 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector,
sIDs, err := sS.processStats(cgrEv.CGREvent, cgrEv.ArgDispatcher, stIDs)
if err != nil &&
err.Error() != utils.ErrNotFound.Error() {
if blockError {
return utils.NewErrStatS(err)
}
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing event %+v for RunID <%s> with StatS.",
utils.SessionS, err.Error(), cgrEv.CGREvent, runID))
@@ -3284,6 +3291,9 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector,
case resOpt.Has(utils.MetaAuthorize):
if err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().ResSConns, nil, utils.ResourceSv1AuthorizeResources,
attrRU, &resMessage); err != nil {
if blockError {
return utils.NewErrResourceS(err)
}
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v for RunID <%s> with ResourceS.",
utils.SessionS, err.Error(), cgrEv.CGREvent, runID))
@@ -3292,6 +3302,9 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector,
case resOpt.Has(utils.MetaAllocate):
if err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().ResSConns, nil, utils.ResourceSv1AllocateResources,
attrRU, &resMessage); err != nil {
if blockError {
return utils.NewErrResourceS(err)
}
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v for RunID <%s> with ResourceS.",
utils.SessionS, err.Error(), cgrEv.CGREvent, runID))
@@ -3300,6 +3313,9 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector,
case resOpt.Has(utils.MetaRelease):
if err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().ResSConns, nil, utils.ResourceSv1ReleaseResources,
attrRU, &resMessage); err != nil {
if blockError {
return utils.NewErrResourceS(err)
}
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v for RunID <%s> with ResourceS.",
utils.SessionS, err.Error(), cgrEv.CGREvent, runID))
@@ -3466,6 +3482,9 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector,
var cdrRply string
for _, cgrEv := range getDerivedEvents(events, argsFlagsWithParams[utils.MetaCDRs].Has(utils.MetaDerivedReply)) {
if err := sS.processCDR(cgrEv.CGREvent, cgrEv.ArgDispatcher, flgs, &cdrRply); err != nil {
if blockError {
return utils.NewErrCDRS(err)
}
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v with CDRs.",
utils.SessionS, err.Error(), cgrEv.CGREvent))

View File

@@ -374,6 +374,7 @@ const (
MetaCore = "*core"
MetaServiceManager = "*servicemanager"
MetaChargers = "*chargers"
MetaBlockerError = "*blocker_error"
MetaConfig = "*config"
MetaDispatchers = "*dispatchers"
MetaDispatcherHosts = "*dispatcher_hosts"

View File

@@ -184,6 +184,18 @@ func NewErrChargerS(err error) error {
return fmt.Errorf("CHARGERS_ERROR:%s", err)
}
func NewErrStatS(err error) error {
return fmt.Errorf("STATS_ERROR:%s", err)
}
func NewErrCDRS(err error) error {
return fmt.Errorf("CDRS_ERROR:%s", err)
}
func NewErrThresholdS(err error) error {
return fmt.Errorf("THRESHOLDS_ERROR:%s", err)
}
func NewErrDispatcherS(err error) error {
return fmt.Errorf("%s:%s", DispatcherErrorPrefix, err.Error())
}