diff --git a/ers/ers.go b/ers/ers.go index f6d475bef..91653da61 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -24,6 +24,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) @@ -44,10 +45,11 @@ func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, // ERService is managing the EventReaders type ERService struct { sync.RWMutex - cfg *config.CGRConfig - rdrs map[string]EventReader // map[rdrID]EventReader - rdrPaths map[string]string // used for reloads in case of path changes - stopLsn map[string]chan struct{} // map[rdrID] chan struct{} + cfg *config.CGRConfig + rdrs map[string]EventReader // map[rdrID]EventReader + rdrPaths map[string]string // used for reloads in case of path changes + stopLsn map[string]chan struct{} // map[rdrID] chan struct{} + rdrEvents chan *erEvent // receive here the events from readers filterS *engine.FilterS sS rpcclient.RpcClientConnection // connection towards SessionS @@ -65,8 +67,19 @@ func (erS *ERService) ListenAndServe(cfgRldChan chan struct{}) (err error) { } } go erS.handleReloads(cfgRldChan) - e := <-erS.exitChan - erS.exitChan <- e // put back for the others listening for shutdown request + for { + select { + case e := <-erS.exitChan: + erS.exitChan <- e // put back for the others listening for shutdown request + break + case erEv := <-erS.rdrEvents: + if err := erS.processEvent(erEv.cgrEvent, erEv.rdrCfg); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> reading event: <%s> got error: <%s>", + utils.ERs, utils.ToIJSON(erEv.cgrEvent), err.Error())) + } + } + } return } @@ -81,21 +94,6 @@ func (erS *ERService) addReader(rdrCfg *config.EventReaderCfg) (err error) { return rdr.Subscribe() } -// setDirWatchers sets up directory watchers -/*func (erS *ERService) setDirWatchers(dirPaths []string) { - for _, dirPath := range dirPaths { - go func() { - if err := erS.watchDir(dirPath); err != nil { - utils.Logger.Crit( - fmt.Sprintf("<%s> watching directory <%s> got error: <%s>", - utils.ERs, dirPath, err.Error())) - erS.exitChan <- true - } - }() - } -} -*/ - // handleReloads will handle the config reloads which are signaled over cfgRldChan func (erS *ERService) handleReloads(cfgRldChan chan struct{}) { for { @@ -141,176 +139,138 @@ func (erS *ERService) handleReloads(cfgRldChan chan struct{}) { } } -/* -// processPath will be called each time a new run should be triggered -func (erS *ERService) processPath(itmPath string, itmID string) error { - rdrs, has := erS.rdrs[itmPath] - if !has { - return fmt.Errorf("no reader for path: <%s>", itmPath) +// processEvent will be called each time a new event is received from readers +func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventReaderCfg) (err error) { + // log the event created if requested by flags + if rdrCfg.Flags.HasKey(utils.MetaLog) { + utils.Logger.Info( + fmt.Sprintf("<%s> LOG, reader: <%s>, message: %s", + utils.ERs, rdrCfg.ID, utils.ToIJSON(cgrEv))) } - for _, rdr := range rdrs { - rdrCfg := rdr.Config() - if err := rdr.Init(itmPath, itmID); err != nil { - utils.Logger.Warning(fmt.Sprintf("<%s> init reader <%s>, error: <%s>", - utils.ERs, rdrCfg.ID, err.Error())) - continue + // find out reqType + var reqType string + for _, typ := range []string{ + utils.MetaDryRun, utils.MetaAuth, + utils.MetaInitiate, utils.MetaUpdate, + utils.MetaTerminate, utils.MetaMessage, + utils.MetaCDRs, utils.MetaEvent, utils.META_NONE} { + if rdrCfg.Flags.HasKey(typ) { // request type is identified through flags + reqType = typ + break } - for { // reads until no more events are produced - cgrEv, err := rdr.Read() - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> processing reader <%s>, error: <%s>", - utils.ERs, rdrCfg.ID, err.Error())) - continue - } else if cgrEv == nil { - break // no more events - } - // log the event created if requested by flags - if rdrCfg.Flags.HasKey(utils.MetaLog) { - utils.Logger.Info( - fmt.Sprintf("<%s> LOG, reader: <%s>, message: %s", - utils.ERs, rdrCfg.ID, utils.ToIJSON(cgrEv))) - } - // find out reqType - var reqType string - for _, typ := range []string{ - utils.MetaDryRun, utils.MetaAuth, - utils.MetaInitiate, utils.MetaUpdate, - utils.MetaTerminate, utils.MetaMessage, - utils.MetaCDRs, utils.MetaEvent, utils.META_NONE} { - if rdrCfg.Flags.HasKey(typ) { // request type is identified through flags - reqType = typ - break - } - } - // execute the action based on reqType - cgrArgs := cgrEv.ConsumeArgs( - rdrCfg.Flags.HasKey(utils.MetaDispatchers), - reqType == utils.MetaAuth || - reqType == utils.MetaMessage || - reqType == utils.MetaEvent) - switch reqType { - default: - utils.Logger.Warning( - fmt.Sprintf("<%s> processing reader <%s>, unsupported reqType: <%s>", - utils.ERs, rdrCfg.ID, err.Error())) - continue - case utils.META_NONE: // do nothing on CGRateS side - case utils.MetaDryRun: - utils.Logger.Info( - fmt.Sprintf("<%s> DRYRUN, reader: <%s>, CGREvent: <%s>", - utils.DNSAgent, rdrCfg.ID, utils.ToJSON(cgrEv))) - case utils.MetaAuth: - authArgs := sessions.NewV1AuthorizeArgs( - rdrCfg.Flags.HasKey(utils.MetaAttributes), - rdrCfg.Flags.ParamsSlice(utils.MetaAttributes), - rdrCfg.Flags.HasKey(utils.MetaThresholds), - rdrCfg.Flags.ParamsSlice(utils.MetaThresholds), - rdrCfg.Flags.HasKey(utils.MetaStats), - rdrCfg.Flags.ParamsSlice(utils.MetaStats), - rdrCfg.Flags.HasKey(utils.MetaResources), - rdrCfg.Flags.HasKey(utils.MetaAccounts), - rdrCfg.Flags.HasKey(utils.MetaSuppliers), - rdrCfg.Flags.HasKey(utils.MetaSuppliersIgnoreErrors), - rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost), - cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator, - ) - rply := new(sessions.V1AuthorizeReply) - err = erS.sS.Call(utils.SessionSv1AuthorizeEvent, - authArgs, rply) - case utils.MetaInitiate: - initArgs := sessions.NewV1InitSessionArgs( - rdrCfg.Flags.HasKey(utils.MetaAttributes), - rdrCfg.Flags.ParamsSlice(utils.MetaAttributes), - rdrCfg.Flags.HasKey(utils.MetaThresholds), - rdrCfg.Flags.ParamsSlice(utils.MetaThresholds), - rdrCfg.Flags.HasKey(utils.MetaStats), - rdrCfg.Flags.ParamsSlice(utils.MetaStats), - rdrCfg.Flags.HasKey(utils.MetaResources), - rdrCfg.Flags.HasKey(utils.MetaAccounts), - cgrEv, cgrArgs.ArgDispatcher) - rply := new(sessions.V1InitSessionReply) - err = erS.sS.Call(utils.SessionSv1InitiateSession, - initArgs, rply) - case utils.MetaUpdate: - updateArgs := sessions.NewV1UpdateSessionArgs( - rdrCfg.Flags.HasKey(utils.MetaAttributes), - rdrCfg.Flags.ParamsSlice(utils.MetaAttributes), - rdrCfg.Flags.HasKey(utils.MetaAccounts), - cgrEv, cgrArgs.ArgDispatcher) - rply := new(sessions.V1UpdateSessionReply) - err = erS.sS.Call(utils.SessionSv1UpdateSession, - updateArgs, rply) - case utils.MetaTerminate: - terminateArgs := sessions.NewV1TerminateSessionArgs( - rdrCfg.Flags.HasKey(utils.MetaAccounts), - rdrCfg.Flags.HasKey(utils.MetaResources), - rdrCfg.Flags.HasKey(utils.MetaThresholds), - rdrCfg.Flags.ParamsSlice(utils.MetaThresholds), - rdrCfg.Flags.HasKey(utils.MetaStats), - rdrCfg.Flags.ParamsSlice(utils.MetaStats), - cgrEv, cgrArgs.ArgDispatcher) - rply := utils.StringPointer("") - err = erS.sS.Call(utils.SessionSv1TerminateSession, - terminateArgs, rply) - case utils.MetaMessage: - evArgs := sessions.NewV1ProcessMessageArgs( - rdrCfg.Flags.HasKey(utils.MetaAttributes), - rdrCfg.Flags.ParamsSlice(utils.MetaAttributes), - rdrCfg.Flags.HasKey(utils.MetaThresholds), - rdrCfg.Flags.ParamsSlice(utils.MetaThresholds), - rdrCfg.Flags.HasKey(utils.MetaStats), - rdrCfg.Flags.ParamsSlice(utils.MetaStats), - rdrCfg.Flags.HasKey(utils.MetaResources), - rdrCfg.Flags.HasKey(utils.MetaAccounts), - rdrCfg.Flags.HasKey(utils.MetaSuppliers), - rdrCfg.Flags.HasKey(utils.MetaSuppliersIgnoreErrors), - rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost), - cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator) - rply := new(sessions.V1ProcessMessageReply) // need it so rpcclient can clone - err = erS.sS.Call(utils.SessionSv1ProcessMessage, - evArgs, rply) - if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) { - cgrEv.Event[utils.Usage] = 0 // avoid further debits - } else if rply.MaxUsage != nil { - cgrEv.Event[utils.Usage] = *rply.MaxUsage // make sure the CDR reflects the debit - } - case utils.MetaEvent: - evArgs := &sessions.V1ProcessEventArgs{ - Flags: rdrCfg.Flags.SliceFlags(), - CGREvent: cgrEv, - ArgDispatcher: cgrArgs.ArgDispatcher, - Paginator: *cgrArgs.SupplierPaginator, - } - rply := new(sessions.V1ProcessEventReply) - err = erS.sS.Call(utils.SessionSv1ProcessEvent, - evArgs, rply) - case utils.MetaCDRs: // allow CDR processing - } - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> reader <%s>, error: <%s> posting event", - utils.ERs, rdrCfg.ID, err.Error())) - } - // separate request so we can capture the Terminate/Event also here - if rdrCfg.Flags.HasKey(utils.MetaCDRs) && - !rdrCfg.Flags.HasKey(utils.MetaDryRun) { - rplyCDRs := utils.StringPointer("") - if err = erS.sS.Call(utils.SessionSv1ProcessCDR, - &utils.CGREventWithArgDispatcher{CGREvent: cgrEv, - ArgDispatcher: cgrArgs.ArgDispatcher}, &rplyCDRs); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> reader <%s>, error: <%s> posting event", - utils.ERs, rdrCfg.ID, err.Error())) - } - } + } + // execute the action based on reqType + cgrArgs := cgrEv.ConsumeArgs( + rdrCfg.Flags.HasKey(utils.MetaDispatchers), + reqType == utils.MetaAuth || + reqType == utils.MetaMessage || + reqType == utils.MetaEvent) + switch reqType { + default: + return fmt.Errorf("unsupported reqType: <%s>", reqType) + case utils.META_NONE: // do nothing on CGRateS side + case utils.MetaDryRun: + utils.Logger.Info( + fmt.Sprintf("<%s> DRYRUN, reader: <%s>, CGREvent: <%s>", + utils.DNSAgent, rdrCfg.ID, utils.ToJSON(cgrEv))) + case utils.MetaAuth: + authArgs := sessions.NewV1AuthorizeArgs( + rdrCfg.Flags.HasKey(utils.MetaAttributes), + rdrCfg.Flags.ParamsSlice(utils.MetaAttributes), + rdrCfg.Flags.HasKey(utils.MetaThresholds), + rdrCfg.Flags.ParamsSlice(utils.MetaThresholds), + rdrCfg.Flags.HasKey(utils.MetaStats), + rdrCfg.Flags.ParamsSlice(utils.MetaStats), + rdrCfg.Flags.HasKey(utils.MetaResources), + rdrCfg.Flags.HasKey(utils.MetaAccounts), + rdrCfg.Flags.HasKey(utils.MetaSuppliers), + rdrCfg.Flags.HasKey(utils.MetaSuppliersIgnoreErrors), + rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost), + cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator, + ) + rply := new(sessions.V1AuthorizeReply) + err = erS.sS.Call(utils.SessionSv1AuthorizeEvent, + authArgs, rply) + case utils.MetaInitiate: + initArgs := sessions.NewV1InitSessionArgs( + rdrCfg.Flags.HasKey(utils.MetaAttributes), + rdrCfg.Flags.ParamsSlice(utils.MetaAttributes), + rdrCfg.Flags.HasKey(utils.MetaThresholds), + rdrCfg.Flags.ParamsSlice(utils.MetaThresholds), + rdrCfg.Flags.HasKey(utils.MetaStats), + rdrCfg.Flags.ParamsSlice(utils.MetaStats), + rdrCfg.Flags.HasKey(utils.MetaResources), + rdrCfg.Flags.HasKey(utils.MetaAccounts), + cgrEv, cgrArgs.ArgDispatcher) + rply := new(sessions.V1InitSessionReply) + err = erS.sS.Call(utils.SessionSv1InitiateSession, + initArgs, rply) + case utils.MetaUpdate: + updateArgs := sessions.NewV1UpdateSessionArgs( + rdrCfg.Flags.HasKey(utils.MetaAttributes), + rdrCfg.Flags.ParamsSlice(utils.MetaAttributes), + rdrCfg.Flags.HasKey(utils.MetaAccounts), + cgrEv, cgrArgs.ArgDispatcher) + rply := new(sessions.V1UpdateSessionReply) + err = erS.sS.Call(utils.SessionSv1UpdateSession, + updateArgs, rply) + case utils.MetaTerminate: + terminateArgs := sessions.NewV1TerminateSessionArgs( + rdrCfg.Flags.HasKey(utils.MetaAccounts), + rdrCfg.Flags.HasKey(utils.MetaResources), + rdrCfg.Flags.HasKey(utils.MetaThresholds), + rdrCfg.Flags.ParamsSlice(utils.MetaThresholds), + rdrCfg.Flags.HasKey(utils.MetaStats), + rdrCfg.Flags.ParamsSlice(utils.MetaStats), + cgrEv, cgrArgs.ArgDispatcher) + rply := utils.StringPointer("") + err = erS.sS.Call(utils.SessionSv1TerminateSession, + terminateArgs, rply) + case utils.MetaMessage: + evArgs := sessions.NewV1ProcessMessageArgs( + rdrCfg.Flags.HasKey(utils.MetaAttributes), + rdrCfg.Flags.ParamsSlice(utils.MetaAttributes), + rdrCfg.Flags.HasKey(utils.MetaThresholds), + rdrCfg.Flags.ParamsSlice(utils.MetaThresholds), + rdrCfg.Flags.HasKey(utils.MetaStats), + rdrCfg.Flags.ParamsSlice(utils.MetaStats), + rdrCfg.Flags.HasKey(utils.MetaResources), + rdrCfg.Flags.HasKey(utils.MetaAccounts), + rdrCfg.Flags.HasKey(utils.MetaSuppliers), + rdrCfg.Flags.HasKey(utils.MetaSuppliersIgnoreErrors), + rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost), + cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator) + rply := new(sessions.V1ProcessMessageReply) // need it so rpcclient can clone + err = erS.sS.Call(utils.SessionSv1ProcessMessage, + evArgs, rply) + if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) { + cgrEv.Event[utils.Usage] = 0 // avoid further debits + } else if rply.MaxUsage != nil { + cgrEv.Event[utils.Usage] = *rply.MaxUsage // make sure the CDR reflects the debit + } + case utils.MetaEvent: + evArgs := &sessions.V1ProcessEventArgs{ + Flags: rdrCfg.Flags.SliceFlags(), + CGREvent: cgrEv, + ArgDispatcher: cgrArgs.ArgDispatcher, + Paginator: *cgrArgs.SupplierPaginator, + } + rply := new(sessions.V1ProcessEventReply) + err = erS.sS.Call(utils.SessionSv1ProcessEvent, + evArgs, rply) + case utils.MetaCDRs: // allow CDR processing + } + if err != nil { + return + } + // separate request so we can capture the Terminate/Event also here + if rdrCfg.Flags.HasKey(utils.MetaCDRs) && + !rdrCfg.Flags.HasKey(utils.MetaDryRun) { + rplyCDRs := utils.StringPointer("") + err = erS.sS.Call(utils.SessionSv1ProcessCDR, + &utils.CGREventWithArgDispatcher{CGREvent: cgrEv, + ArgDispatcher: cgrArgs.ArgDispatcher}, &rplyCDRs) + } - } - if err := rdr.Close(); err != nil { - utils.Logger.Warning(fmt.Sprintf("<%s> closing reader <%s>, error: <%s>", - utils.ERs, rdr.Config().ID, err.Error())) - } - } - return nil + return } -*/ diff --git a/ers/libers.go b/ers/libers.go index 40800ba36..8177bba5f 100644 --- a/ers/libers.go +++ b/ers/libers.go @@ -22,9 +22,8 @@ import ( "fmt" "path/filepath" - "github.com/cgrates/cgrates/sessions" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/rpcclient" "github.com/fsnotify/fsnotify" ) @@ -61,163 +60,8 @@ func watchDir(dirPath string, f func(itmPath, itmID string) error, } } -// processReader will process events from reader and publish them to SessionS -func processReader(rdr EventReader, sS rpcclient.RpcClient, rdrExit chan struct{}) (err error) { - for { // reads until no more events are produced or exit is signaled - select { - case <-rdrExit: - return - default: - } - rdrCfg := rdr.Config() - cgrEv, err := rdr.Read() - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> processing reader <%s>, error: <%s>", - utils.ERs, rdrCfg.ID, err.Error())) - continue - } else if cgrEv == nil { - break // no more events - } - // log the event created if requested by flags - if rdrCfg.Flags.HasKey(utils.MetaLog) { - utils.Logger.Info( - fmt.Sprintf("<%s> LOG, reader: <%s>, message: %s", - utils.ERs, rdrCfg.ID, utils.ToIJSON(cgrEv))) - } - // find out reqType - var reqType string - for _, typ := range []string{ - utils.MetaDryRun, utils.MetaAuth, - utils.MetaInitiate, utils.MetaUpdate, - utils.MetaTerminate, utils.MetaMessage, - utils.MetaCDRs, utils.MetaEvent, utils.META_NONE} { - if rdrCfg.Flags.HasKey(typ) { // request type is identified through flags - reqType = typ - break - } - } - // execute the action based on reqType - cgrArgs := cgrEv.ConsumeArgs( - rdrCfg.Flags.HasKey(utils.MetaDispatchers), - reqType == utils.MetaAuth || - reqType == utils.MetaMessage || - reqType == utils.MetaEvent) - switch reqType { - default: - utils.Logger.Warning( - fmt.Sprintf("<%s> processing reader <%s>, unsupported reqType: <%s>", - utils.ERs, rdrCfg.ID, err.Error())) - continue - case utils.META_NONE: // do nothing on CGRateS side - case utils.MetaDryRun: - utils.Logger.Info( - fmt.Sprintf("<%s> DRYRUN, reader: <%s>, CGREvent: <%s>", - utils.DNSAgent, rdrCfg.ID, utils.ToJSON(cgrEv))) - case utils.MetaAuth: - authArgs := sessions.NewV1AuthorizeArgs( - rdrCfg.Flags.HasKey(utils.MetaAttributes), - rdrCfg.Flags.ParamsSlice(utils.MetaAttributes), - rdrCfg.Flags.HasKey(utils.MetaThresholds), - rdrCfg.Flags.ParamsSlice(utils.MetaThresholds), - rdrCfg.Flags.HasKey(utils.MetaStats), - rdrCfg.Flags.ParamsSlice(utils.MetaStats), - rdrCfg.Flags.HasKey(utils.MetaResources), - rdrCfg.Flags.HasKey(utils.MetaAccounts), - rdrCfg.Flags.HasKey(utils.MetaSuppliers), - rdrCfg.Flags.HasKey(utils.MetaSuppliersIgnoreErrors), - rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost), - cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator, - ) - rply := new(sessions.V1AuthorizeReply) - err = sS.Call(utils.SessionSv1AuthorizeEvent, - authArgs, rply) - case utils.MetaInitiate: - initArgs := sessions.NewV1InitSessionArgs( - rdrCfg.Flags.HasKey(utils.MetaAttributes), - rdrCfg.Flags.ParamsSlice(utils.MetaAttributes), - rdrCfg.Flags.HasKey(utils.MetaThresholds), - rdrCfg.Flags.ParamsSlice(utils.MetaThresholds), - rdrCfg.Flags.HasKey(utils.MetaStats), - rdrCfg.Flags.ParamsSlice(utils.MetaStats), - rdrCfg.Flags.HasKey(utils.MetaResources), - rdrCfg.Flags.HasKey(utils.MetaAccounts), - cgrEv, cgrArgs.ArgDispatcher) - rply := new(sessions.V1InitSessionReply) - err = sS.Call(utils.SessionSv1InitiateSession, - initArgs, rply) - case utils.MetaUpdate: - updateArgs := sessions.NewV1UpdateSessionArgs( - rdrCfg.Flags.HasKey(utils.MetaAttributes), - rdrCfg.Flags.ParamsSlice(utils.MetaAttributes), - rdrCfg.Flags.HasKey(utils.MetaAccounts), - cgrEv, cgrArgs.ArgDispatcher) - rply := new(sessions.V1UpdateSessionReply) - err = sS.Call(utils.SessionSv1UpdateSession, - updateArgs, rply) - case utils.MetaTerminate: - terminateArgs := sessions.NewV1TerminateSessionArgs( - rdrCfg.Flags.HasKey(utils.MetaAccounts), - rdrCfg.Flags.HasKey(utils.MetaResources), - rdrCfg.Flags.HasKey(utils.MetaThresholds), - rdrCfg.Flags.ParamsSlice(utils.MetaThresholds), - rdrCfg.Flags.HasKey(utils.MetaStats), - rdrCfg.Flags.ParamsSlice(utils.MetaStats), - cgrEv, cgrArgs.ArgDispatcher) - rply := utils.StringPointer("") - err = sS.Call(utils.SessionSv1TerminateSession, - terminateArgs, rply) - case utils.MetaMessage: - evArgs := sessions.NewV1ProcessMessageArgs( - rdrCfg.Flags.HasKey(utils.MetaAttributes), - rdrCfg.Flags.ParamsSlice(utils.MetaAttributes), - rdrCfg.Flags.HasKey(utils.MetaThresholds), - rdrCfg.Flags.ParamsSlice(utils.MetaThresholds), - rdrCfg.Flags.HasKey(utils.MetaStats), - rdrCfg.Flags.ParamsSlice(utils.MetaStats), - rdrCfg.Flags.HasKey(utils.MetaResources), - rdrCfg.Flags.HasKey(utils.MetaAccounts), - rdrCfg.Flags.HasKey(utils.MetaSuppliers), - rdrCfg.Flags.HasKey(utils.MetaSuppliersIgnoreErrors), - rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost), - cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator) - rply := new(sessions.V1ProcessMessageReply) // need it so rpcclient can clone - err = sS.Call(utils.SessionSv1ProcessMessage, - evArgs, rply) - if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) { - cgrEv.Event[utils.Usage] = 0 // avoid further debits - } else if rply.MaxUsage != nil { - cgrEv.Event[utils.Usage] = *rply.MaxUsage // make sure the CDR reflects the debit - } - case utils.MetaEvent: - evArgs := &sessions.V1ProcessEventArgs{ - Flags: rdrCfg.Flags.SliceFlags(), - CGREvent: cgrEv, - ArgDispatcher: cgrArgs.ArgDispatcher, - Paginator: *cgrArgs.SupplierPaginator, - } - rply := new(sessions.V1ProcessEventReply) - err = sS.Call(utils.SessionSv1ProcessEvent, - evArgs, rply) - case utils.MetaCDRs: // allow CDR processing - } - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> reader <%s>, error: <%s> posting event", - utils.ERs, rdrCfg.ID, err.Error())) - } - // separate request so we can capture the Terminate/Event also here - if rdrCfg.Flags.HasKey(utils.MetaCDRs) && - !rdrCfg.Flags.HasKey(utils.MetaDryRun) { - rplyCDRs := utils.StringPointer("") - if err = sS.Call(utils.SessionSv1ProcessCDR, - &utils.CGREventWithArgDispatcher{CGREvent: cgrEv, - ArgDispatcher: cgrArgs.ArgDispatcher}, &rplyCDRs); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> reader <%s>, error: <%s> posting event", - utils.ERs, rdrCfg.ID, err.Error())) - } - } - } - return +// erEvent is passed from reader to ERs +type erEvent struct { + cgrEvent *utils.CGREvent + rdrCfg *config.EventReaderCfg }