diff --git a/apier/v1/sessions_process_event_it_test.go b/apier/v1/sessions_process_event_it_test.go index ba7227aaf..b6163d648 100644 --- a/apier/v1/sessions_process_event_it_test.go +++ b/apier/v1/sessions_process_event_it_test.go @@ -58,6 +58,8 @@ var sTestSessionSv1ProcessEvent = []func(t *testing.T){ testSSv1ItGetCDRsFromProcessEvent, testSSv1ItProcessEventWithCDRResourceError, testSSv1ItGetCDRsFromProcessEventResourceError, + testSSv1ItProcessEventWithCDRResourceErrorBlockError, + testSSv1ItGetCDRsFromProcessEventResourceErrorBlockError, testSSv1ItStopCgrEngine, } @@ -840,3 +842,45 @@ func testSSv1ItGetCDRsFromProcessEventResourceError(t *testing.T) { } } } + +func testSSv1ItProcessEventWithCDRResourceErrorBlockError(t *testing.T) { + args := &sessions.V1ProcessEventArgs{ + Flags: []string{utils.MetaCDRs + utils.InInFieldSep + utils.MetaRALs, + utils.ConcatenatedKey(utils.MetaResources, utils.MetaRelease), + utils.MetaBlockerError}, // expended to stop the processing because we have error at resource + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testSSv1ItProcessEventWithCDRResourceErrorBlockError", + Event: map[string]interface{}{ + utils.Tenant: "cgrates.org", + utils.ToR: utils.VOICE, + utils.OriginID: "testSSv1ItProcessEventWithCDRResourceErrorBlockError", + utils.RequestType: sSV1RequestType, + utils.Account: "1001", + utils.Subject: "ANY2CNT", + utils.Destination: "1002", + utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), + utils.Usage: 10 * time.Minute, + }, + }, + } + var rply sessions.V1ProcessEventReply + if err := sSv1BiRpc.Call(utils.SessionSv1ProcessEvent, + args, &rply); err == nil || err.Error() != "RESOURCES_ERROR:cannot find usage record with id: testSSv1ItProcessEventWithCDRResourceErrorBlockError" { + t.Error(err) + } + time.Sleep(100 * time.Millisecond) +} + +func testSSv1ItGetCDRsFromProcessEventResourceErrorBlockError(t *testing.T) { + var cdrCnt int64 + req := &utils.RPCCDRsFilterWithArgDispatcher{RPCCDRsFilter: &utils.RPCCDRsFilter{ + OriginIDs: []string{"testSSv1ItProcessEventWithCDRResourceErrorBlockError"}}} + if err := sSApierRpc.Call(utils.CDRsV1GetCDRsCount, req, &cdrCnt); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if cdrCnt != 0 { + t.Error("Unexpected number of CDRs returned: ", cdrCnt) + } + +} diff --git a/sessions/sessions.go b/sessions/sessions.go index 802a44951..458031c7a 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -3099,6 +3099,7 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector, //convert from Flags []string to utils.FlagsWithParams argsFlagsWithParams := utils.FlagsWithParamsFromSlice(args.Flags) + blockError := argsFlagsWithParams.GetBool(utils.MetaBlockerError) events := map[string]*utils.CGREventWithOpts{ utils.MetaRaw: { CGREvent: args.CGREvent, @@ -3176,6 +3177,9 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector, for runID, cgrEv := range getDerivedEvents(events, argsFlagsWithParams[utils.MetaThresholds].Has(utils.MetaDerivedReply)) { tIDs, err := sS.processThreshold(cgrEv.CGREvent, cgrEv.ArgDispatcher, thIDs) if err != nil && err.Error() != utils.ErrNotFound.Error() { + if blockError { + return utils.NewErrThresholdS(err) + } utils.Logger.Warning( fmt.Sprintf("<%s> error: %s processing event %+v for RunID <%s> with ThresholdS.", utils.SessionS, err.Error(), cgrEv.CGREvent, runID)) @@ -3193,6 +3197,9 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector, sIDs, err := sS.processStats(cgrEv.CGREvent, cgrEv.ArgDispatcher, stIDs) if err != nil && err.Error() != utils.ErrNotFound.Error() { + if blockError { + return utils.NewErrStatS(err) + } utils.Logger.Warning( fmt.Sprintf("<%s> error: %s processing event %+v for RunID <%s> with StatS.", utils.SessionS, err.Error(), cgrEv.CGREvent, runID)) @@ -3284,6 +3291,9 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector, case resOpt.Has(utils.MetaAuthorize): if err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().ResSConns, nil, utils.ResourceSv1AuthorizeResources, attrRU, &resMessage); err != nil { + if blockError { + return utils.NewErrResourceS(err) + } utils.Logger.Warning( fmt.Sprintf("<%s> error: <%s> processing event %+v for RunID <%s> with ResourceS.", utils.SessionS, err.Error(), cgrEv.CGREvent, runID)) @@ -3292,6 +3302,9 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector, case resOpt.Has(utils.MetaAllocate): if err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().ResSConns, nil, utils.ResourceSv1AllocateResources, attrRU, &resMessage); err != nil { + if blockError { + return utils.NewErrResourceS(err) + } utils.Logger.Warning( fmt.Sprintf("<%s> error: <%s> processing event %+v for RunID <%s> with ResourceS.", utils.SessionS, err.Error(), cgrEv.CGREvent, runID)) @@ -3300,6 +3313,9 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector, case resOpt.Has(utils.MetaRelease): if err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().ResSConns, nil, utils.ResourceSv1ReleaseResources, attrRU, &resMessage); err != nil { + if blockError { + return utils.NewErrResourceS(err) + } utils.Logger.Warning( fmt.Sprintf("<%s> error: <%s> processing event %+v for RunID <%s> with ResourceS.", utils.SessionS, err.Error(), cgrEv.CGREvent, runID)) @@ -3466,6 +3482,9 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector, var cdrRply string for _, cgrEv := range getDerivedEvents(events, argsFlagsWithParams[utils.MetaCDRs].Has(utils.MetaDerivedReply)) { if err := sS.processCDR(cgrEv.CGREvent, cgrEv.ArgDispatcher, flgs, &cdrRply); err != nil { + if blockError { + return utils.NewErrCDRS(err) + } utils.Logger.Warning( fmt.Sprintf("<%s> error: <%s> processing event %+v with CDRs.", utils.SessionS, err.Error(), cgrEv.CGREvent)) diff --git a/utils/consts.go b/utils/consts.go index 9571ce72c..46859d36b 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -374,6 +374,7 @@ const ( MetaCore = "*core" MetaServiceManager = "*servicemanager" MetaChargers = "*chargers" + MetaBlockerError = "*blocker_error" MetaConfig = "*config" MetaDispatchers = "*dispatchers" MetaDispatcherHosts = "*dispatcher_hosts" diff --git a/utils/errors.go b/utils/errors.go index 05dd09b7e..a621774dd 100644 --- a/utils/errors.go +++ b/utils/errors.go @@ -184,6 +184,18 @@ func NewErrChargerS(err error) error { return fmt.Errorf("CHARGERS_ERROR:%s", err) } +func NewErrStatS(err error) error { + return fmt.Errorf("STATS_ERROR:%s", err) +} + +func NewErrCDRS(err error) error { + return fmt.Errorf("CDRS_ERROR:%s", err) +} + +func NewErrThresholdS(err error) error { + return fmt.Errorf("THRESHOLDS_ERROR:%s", err) +} + func NewErrDispatcherS(err error) error { return fmt.Errorf("%s:%s", DispatcherErrorPrefix, err.Error()) }