mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 10:06:24 +05:00
Add ReplyState field to track successful/failed requests
This commit is contained in:
committed by
Ionut Boangiu
parent
2e0de027db
commit
3077544d62
116
ers/ers.go
116
ers/ers.go
@@ -234,6 +234,54 @@ func (erS *ERService) addReader(rdrID string, cfgIdx int) (err error) {
|
||||
func (erS *ERService) processEvent(cgrEv *utils.CGREvent,
|
||||
rdrCfg *config.EventReaderCfg) (err error) {
|
||||
startTime := time.Now()
|
||||
replyState := utils.OK
|
||||
|
||||
// Defer stats and thresholds processing to ensure it happens even with early returns.
|
||||
defer func() {
|
||||
endTime := time.Now()
|
||||
if rdrCfg.Flags.Has(utils.MetaDryRun) {
|
||||
return
|
||||
}
|
||||
rawStatIDs := rdrCfg.Flags.ParamValue(utils.MetaERsStats)
|
||||
rawThIDs := rdrCfg.Flags.ParamValue(utils.MetaERsThresholds)
|
||||
|
||||
// Early return if nothing to process.
|
||||
if rawStatIDs == "" && rawThIDs == "" {
|
||||
return
|
||||
}
|
||||
|
||||
// Clone is needed to prevent data races if requests are sent
|
||||
// asynchronously.
|
||||
ev := cgrEv.Clone()
|
||||
|
||||
ev.Event[utils.ReplyState] = replyState
|
||||
ev.Event[utils.StartTime] = startTime
|
||||
ev.Event[utils.EndTime] = endTime
|
||||
ev.Event[utils.ProcessingTime] = endTime.Sub(startTime)
|
||||
ev.Event[utils.Source] = utils.ERs
|
||||
ev.APIOpts[utils.MetaEventType] = utils.ProcessTime
|
||||
|
||||
if rawStatIDs != "" {
|
||||
statIDs := strings.Split(rawStatIDs, utils.ANDSep)
|
||||
ev.APIOpts[utils.OptsStatsProfileIDs] = statIDs
|
||||
var reply []string
|
||||
if err := erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().StatSConns,
|
||||
utils.StatSv1ProcessEvent, ev, &reply); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> failed to process event in %s: %v",
|
||||
utils.ERs, utils.StatS, err))
|
||||
}
|
||||
}
|
||||
if rawThIDs != "" {
|
||||
thIDs := strings.Split(rawThIDs, utils.ANDSep)
|
||||
ev.APIOpts[utils.OptsThresholdsProfileIDs] = thIDs
|
||||
var reply []string
|
||||
if err := erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().ThresholdSConns,
|
||||
utils.ThresholdSv1ProcessEvent, ev, &reply); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> failed to process event in %s: %v",
|
||||
utils.ERs, utils.ThresholdS, err))
|
||||
}
|
||||
}
|
||||
}()
|
||||
// log the event created if requested by flags
|
||||
if rdrCfg.Flags.Has(utils.MetaLog) {
|
||||
utils.Logger.Info(
|
||||
@@ -257,7 +305,8 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent,
|
||||
reqType == utils.MetaMessage ||
|
||||
reqType == utils.MetaEvent {
|
||||
if cgrArgs, err = utils.GetRoutePaginatorFromOpts(cgrEv.APIOpts); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> args extraction for reader <%s> failed because <%s>",
|
||||
utils.Logger.Warning(fmt.Sprintf(
|
||||
"<%s> args extraction for reader <%s> failed because <%s>",
|
||||
utils.ERs, rdrCfg.ID, err.Error()))
|
||||
err = nil // reset the error and continue the processing
|
||||
}
|
||||
@@ -291,6 +340,9 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent,
|
||||
rply := new(sessions.V1AuthorizeReply)
|
||||
err = erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1AuthorizeEvent,
|
||||
authArgs, rply)
|
||||
if err != nil {
|
||||
replyState = utils.ErrReplyStateAuthorize
|
||||
}
|
||||
case utils.MetaInitiate:
|
||||
initArgs := sessions.NewV1InitSessionArgs(
|
||||
rdrCfg.Flags.Has(utils.MetaAttributes),
|
||||
@@ -305,6 +357,9 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent,
|
||||
rply := new(sessions.V1InitSessionReply)
|
||||
err = erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1InitiateSession,
|
||||
initArgs, rply)
|
||||
if err != nil {
|
||||
replyState = utils.ErrReplyStateInitiate
|
||||
}
|
||||
case utils.MetaUpdate:
|
||||
updateArgs := sessions.NewV1UpdateSessionArgs(
|
||||
rdrCfg.Flags.Has(utils.MetaAttributes),
|
||||
@@ -318,6 +373,9 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent,
|
||||
rply := new(sessions.V1UpdateSessionReply)
|
||||
err = erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1UpdateSession,
|
||||
updateArgs, rply)
|
||||
if err != nil {
|
||||
replyState = utils.ErrReplyStateUpdate
|
||||
}
|
||||
case utils.MetaTerminate:
|
||||
terminateArgs := sessions.NewV1TerminateSessionArgs(
|
||||
rdrCfg.Flags.Has(utils.MetaAccounts),
|
||||
@@ -330,6 +388,9 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent,
|
||||
rply := utils.StringPointer("")
|
||||
err = erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1TerminateSession,
|
||||
terminateArgs, rply)
|
||||
if err != nil {
|
||||
replyState = utils.ErrReplyStateTerminate
|
||||
}
|
||||
case utils.MetaMessage:
|
||||
evArgs := sessions.NewV1ProcessMessageArgs(
|
||||
rdrCfg.Flags.Has(utils.MetaAttributes),
|
||||
@@ -350,6 +411,9 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent,
|
||||
rply := new(sessions.V1ProcessMessageReply) // need it so rpcclient can clone
|
||||
err = erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1ProcessMessage,
|
||||
evArgs, rply)
|
||||
if err != nil {
|
||||
replyState = utils.ErrReplyStateMessage
|
||||
}
|
||||
if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
|
||||
cgrEv.Event[utils.Usage] = 0 // avoid further debits
|
||||
} else if evArgs.Debit {
|
||||
@@ -364,6 +428,9 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent,
|
||||
rply := new(sessions.V1ProcessEventReply)
|
||||
err = erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns, utils.SessionSv1ProcessEvent,
|
||||
evArgs, rply)
|
||||
if err != nil {
|
||||
replyState = utils.ErrReplyStateEvent
|
||||
}
|
||||
case utils.MetaCDRs: // allow CDR processing
|
||||
case utils.MetaExport: // allow event exporting
|
||||
}
|
||||
@@ -376,6 +443,7 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent,
|
||||
rplyCDRs := utils.StringPointer("")
|
||||
if err := erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().SessionSConns,
|
||||
utils.SessionSv1ProcessCDR, cgrEv, rplyCDRs); err != nil {
|
||||
replyState = utils.ErrReplyStateCDRs
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -387,54 +455,10 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent,
|
||||
EeIDs: rdrCfg.EEsIDs,
|
||||
CGREvent: cgrEv,
|
||||
}, &reply); err != nil {
|
||||
replyState = utils.ErrReplyStateExport
|
||||
return err
|
||||
}
|
||||
}
|
||||
endTime := time.Now()
|
||||
if rdrCfg.Flags.Has(utils.MetaDryRun) {
|
||||
return nil
|
||||
}
|
||||
rawStatIDs := rdrCfg.Flags.ParamValue(utils.MetaERsStats)
|
||||
rawThIDs := rdrCfg.Flags.ParamValue(utils.MetaERsThresholds)
|
||||
|
||||
// Early return if nothing to process.
|
||||
if rawStatIDs == "" && rawThIDs == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Clone is needed to prevent data races if requests are sent
|
||||
// asynchronously.
|
||||
ev := cgrEv.Clone()
|
||||
|
||||
ev.Event[utils.StartTime] = startTime
|
||||
ev.Event[utils.EndTime] = endTime
|
||||
ev.Event[utils.ProcessingTime] = endTime.Sub(startTime)
|
||||
ev.Event[utils.Source] = utils.ERs
|
||||
ev.APIOpts[utils.MetaEventType] = utils.ProcessTime
|
||||
|
||||
if rawStatIDs != "" {
|
||||
statIDs := strings.Split(rawStatIDs, utils.ANDSep)
|
||||
ev.APIOpts[utils.OptsStatsProfileIDs] = statIDs
|
||||
var reply []string
|
||||
if err := erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().StatSConns,
|
||||
utils.StatSv1ProcessEvent, ev, &reply); err != nil {
|
||||
return fmt.Errorf("failed to process event in %s: %v",
|
||||
utils.StatS, err)
|
||||
}
|
||||
// NOTE: ProfileIDs APIOpts key persists for the ThresholdS request,
|
||||
// although it would be ignored. Might want to delete it.
|
||||
}
|
||||
if rawThIDs != "" {
|
||||
thIDs := strings.Split(rawThIDs, utils.ANDSep)
|
||||
ev.APIOpts[utils.OptsThresholdsProfileIDs] = thIDs
|
||||
var reply []string
|
||||
if err := erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().ThresholdSConns,
|
||||
utils.ThresholdSv1ProcessEvent, ev, &reply); err != nil {
|
||||
return fmt.Errorf("failed to process event in %s: %v",
|
||||
utils.ThresholdS, err)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user