mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
ERService.processEvent implementation
This commit is contained in:
340
ers/ers.go
340
ers/ers.go
@@ -24,6 +24,7 @@ import (
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/sessions"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
@@ -44,10 +45,11 @@ func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS,
|
||||
// ERService is managing the EventReaders
|
||||
type ERService struct {
|
||||
sync.RWMutex
|
||||
cfg *config.CGRConfig
|
||||
rdrs map[string]EventReader // map[rdrID]EventReader
|
||||
rdrPaths map[string]string // used for reloads in case of path changes
|
||||
stopLsn map[string]chan struct{} // map[rdrID] chan struct{}
|
||||
cfg *config.CGRConfig
|
||||
rdrs map[string]EventReader // map[rdrID]EventReader
|
||||
rdrPaths map[string]string // used for reloads in case of path changes
|
||||
stopLsn map[string]chan struct{} // map[rdrID] chan struct{}
|
||||
rdrEvents chan *erEvent // receive here the events from readers
|
||||
|
||||
filterS *engine.FilterS
|
||||
sS rpcclient.RpcClientConnection // connection towards SessionS
|
||||
@@ -65,8 +67,19 @@ func (erS *ERService) ListenAndServe(cfgRldChan chan struct{}) (err error) {
|
||||
}
|
||||
}
|
||||
go erS.handleReloads(cfgRldChan)
|
||||
e := <-erS.exitChan
|
||||
erS.exitChan <- e // put back for the others listening for shutdown request
|
||||
for {
|
||||
select {
|
||||
case e := <-erS.exitChan:
|
||||
erS.exitChan <- e // put back for the others listening for shutdown request
|
||||
break
|
||||
case erEv := <-erS.rdrEvents:
|
||||
if err := erS.processEvent(erEv.cgrEvent, erEv.rdrCfg); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reading event: <%s> got error: <%s>",
|
||||
utils.ERs, utils.ToIJSON(erEv.cgrEvent), err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -81,21 +94,6 @@ func (erS *ERService) addReader(rdrCfg *config.EventReaderCfg) (err error) {
|
||||
return rdr.Subscribe()
|
||||
}
|
||||
|
||||
// setDirWatchers sets up directory watchers
|
||||
/*func (erS *ERService) setDirWatchers(dirPaths []string) {
|
||||
for _, dirPath := range dirPaths {
|
||||
go func() {
|
||||
if err := erS.watchDir(dirPath); err != nil {
|
||||
utils.Logger.Crit(
|
||||
fmt.Sprintf("<%s> watching directory <%s> got error: <%s>",
|
||||
utils.ERs, dirPath, err.Error()))
|
||||
erS.exitChan <- true
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
// handleReloads will handle the config reloads which are signaled over cfgRldChan
|
||||
func (erS *ERService) handleReloads(cfgRldChan chan struct{}) {
|
||||
for {
|
||||
@@ -141,176 +139,138 @@ func (erS *ERService) handleReloads(cfgRldChan chan struct{}) {
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
// processPath will be called each time a new run should be triggered
|
||||
func (erS *ERService) processPath(itmPath string, itmID string) error {
|
||||
rdrs, has := erS.rdrs[itmPath]
|
||||
if !has {
|
||||
return fmt.Errorf("no reader for path: <%s>", itmPath)
|
||||
// processEvent will be called each time a new event is received from readers
|
||||
func (erS *ERService) processEvent(cgrEv *utils.CGREvent, rdrCfg *config.EventReaderCfg) (err error) {
|
||||
// log the event created if requested by flags
|
||||
if rdrCfg.Flags.HasKey(utils.MetaLog) {
|
||||
utils.Logger.Info(
|
||||
fmt.Sprintf("<%s> LOG, reader: <%s>, message: %s",
|
||||
utils.ERs, rdrCfg.ID, utils.ToIJSON(cgrEv)))
|
||||
}
|
||||
for _, rdr := range rdrs {
|
||||
rdrCfg := rdr.Config()
|
||||
if err := rdr.Init(itmPath, itmID); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> init reader <%s>, error: <%s>",
|
||||
utils.ERs, rdrCfg.ID, err.Error()))
|
||||
continue
|
||||
// find out reqType
|
||||
var reqType string
|
||||
for _, typ := range []string{
|
||||
utils.MetaDryRun, utils.MetaAuth,
|
||||
utils.MetaInitiate, utils.MetaUpdate,
|
||||
utils.MetaTerminate, utils.MetaMessage,
|
||||
utils.MetaCDRs, utils.MetaEvent, utils.META_NONE} {
|
||||
if rdrCfg.Flags.HasKey(typ) { // request type is identified through flags
|
||||
reqType = typ
|
||||
break
|
||||
}
|
||||
for { // reads until no more events are produced
|
||||
cgrEv, err := rdr.Read()
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> processing reader <%s>, error: <%s>",
|
||||
utils.ERs, rdrCfg.ID, err.Error()))
|
||||
continue
|
||||
} else if cgrEv == nil {
|
||||
break // no more events
|
||||
}
|
||||
// log the event created if requested by flags
|
||||
if rdrCfg.Flags.HasKey(utils.MetaLog) {
|
||||
utils.Logger.Info(
|
||||
fmt.Sprintf("<%s> LOG, reader: <%s>, message: %s",
|
||||
utils.ERs, rdrCfg.ID, utils.ToIJSON(cgrEv)))
|
||||
}
|
||||
// find out reqType
|
||||
var reqType string
|
||||
for _, typ := range []string{
|
||||
utils.MetaDryRun, utils.MetaAuth,
|
||||
utils.MetaInitiate, utils.MetaUpdate,
|
||||
utils.MetaTerminate, utils.MetaMessage,
|
||||
utils.MetaCDRs, utils.MetaEvent, utils.META_NONE} {
|
||||
if rdrCfg.Flags.HasKey(typ) { // request type is identified through flags
|
||||
reqType = typ
|
||||
break
|
||||
}
|
||||
}
|
||||
// execute the action based on reqType
|
||||
cgrArgs := cgrEv.ConsumeArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaDispatchers),
|
||||
reqType == utils.MetaAuth ||
|
||||
reqType == utils.MetaMessage ||
|
||||
reqType == utils.MetaEvent)
|
||||
switch reqType {
|
||||
default:
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> processing reader <%s>, unsupported reqType: <%s>",
|
||||
utils.ERs, rdrCfg.ID, err.Error()))
|
||||
continue
|
||||
case utils.META_NONE: // do nothing on CGRateS side
|
||||
case utils.MetaDryRun:
|
||||
utils.Logger.Info(
|
||||
fmt.Sprintf("<%s> DRYRUN, reader: <%s>, CGREvent: <%s>",
|
||||
utils.DNSAgent, rdrCfg.ID, utils.ToJSON(cgrEv)))
|
||||
case utils.MetaAuth:
|
||||
authArgs := sessions.NewV1AuthorizeArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaAttributes),
|
||||
rdrCfg.Flags.HasKey(utils.MetaThresholds),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaThresholds),
|
||||
rdrCfg.Flags.HasKey(utils.MetaStats),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaStats),
|
||||
rdrCfg.Flags.HasKey(utils.MetaResources),
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliers),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliersIgnoreErrors),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost),
|
||||
cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator,
|
||||
)
|
||||
rply := new(sessions.V1AuthorizeReply)
|
||||
err = erS.sS.Call(utils.SessionSv1AuthorizeEvent,
|
||||
authArgs, rply)
|
||||
case utils.MetaInitiate:
|
||||
initArgs := sessions.NewV1InitSessionArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaAttributes),
|
||||
rdrCfg.Flags.HasKey(utils.MetaThresholds),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaThresholds),
|
||||
rdrCfg.Flags.HasKey(utils.MetaStats),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaStats),
|
||||
rdrCfg.Flags.HasKey(utils.MetaResources),
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
cgrEv, cgrArgs.ArgDispatcher)
|
||||
rply := new(sessions.V1InitSessionReply)
|
||||
err = erS.sS.Call(utils.SessionSv1InitiateSession,
|
||||
initArgs, rply)
|
||||
case utils.MetaUpdate:
|
||||
updateArgs := sessions.NewV1UpdateSessionArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaAttributes),
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
cgrEv, cgrArgs.ArgDispatcher)
|
||||
rply := new(sessions.V1UpdateSessionReply)
|
||||
err = erS.sS.Call(utils.SessionSv1UpdateSession,
|
||||
updateArgs, rply)
|
||||
case utils.MetaTerminate:
|
||||
terminateArgs := sessions.NewV1TerminateSessionArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
rdrCfg.Flags.HasKey(utils.MetaResources),
|
||||
rdrCfg.Flags.HasKey(utils.MetaThresholds),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaThresholds),
|
||||
rdrCfg.Flags.HasKey(utils.MetaStats),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaStats),
|
||||
cgrEv, cgrArgs.ArgDispatcher)
|
||||
rply := utils.StringPointer("")
|
||||
err = erS.sS.Call(utils.SessionSv1TerminateSession,
|
||||
terminateArgs, rply)
|
||||
case utils.MetaMessage:
|
||||
evArgs := sessions.NewV1ProcessMessageArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaAttributes),
|
||||
rdrCfg.Flags.HasKey(utils.MetaThresholds),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaThresholds),
|
||||
rdrCfg.Flags.HasKey(utils.MetaStats),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaStats),
|
||||
rdrCfg.Flags.HasKey(utils.MetaResources),
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliers),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliersIgnoreErrors),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost),
|
||||
cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator)
|
||||
rply := new(sessions.V1ProcessMessageReply) // need it so rpcclient can clone
|
||||
err = erS.sS.Call(utils.SessionSv1ProcessMessage,
|
||||
evArgs, rply)
|
||||
if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
|
||||
cgrEv.Event[utils.Usage] = 0 // avoid further debits
|
||||
} else if rply.MaxUsage != nil {
|
||||
cgrEv.Event[utils.Usage] = *rply.MaxUsage // make sure the CDR reflects the debit
|
||||
}
|
||||
case utils.MetaEvent:
|
||||
evArgs := &sessions.V1ProcessEventArgs{
|
||||
Flags: rdrCfg.Flags.SliceFlags(),
|
||||
CGREvent: cgrEv,
|
||||
ArgDispatcher: cgrArgs.ArgDispatcher,
|
||||
Paginator: *cgrArgs.SupplierPaginator,
|
||||
}
|
||||
rply := new(sessions.V1ProcessEventReply)
|
||||
err = erS.sS.Call(utils.SessionSv1ProcessEvent,
|
||||
evArgs, rply)
|
||||
case utils.MetaCDRs: // allow CDR processing
|
||||
}
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reader <%s>, error: <%s> posting event",
|
||||
utils.ERs, rdrCfg.ID, err.Error()))
|
||||
}
|
||||
// separate request so we can capture the Terminate/Event also here
|
||||
if rdrCfg.Flags.HasKey(utils.MetaCDRs) &&
|
||||
!rdrCfg.Flags.HasKey(utils.MetaDryRun) {
|
||||
rplyCDRs := utils.StringPointer("")
|
||||
if err = erS.sS.Call(utils.SessionSv1ProcessCDR,
|
||||
&utils.CGREventWithArgDispatcher{CGREvent: cgrEv,
|
||||
ArgDispatcher: cgrArgs.ArgDispatcher}, &rplyCDRs); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reader <%s>, error: <%s> posting event",
|
||||
utils.ERs, rdrCfg.ID, err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
// execute the action based on reqType
|
||||
cgrArgs := cgrEv.ConsumeArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaDispatchers),
|
||||
reqType == utils.MetaAuth ||
|
||||
reqType == utils.MetaMessage ||
|
||||
reqType == utils.MetaEvent)
|
||||
switch reqType {
|
||||
default:
|
||||
return fmt.Errorf("unsupported reqType: <%s>", reqType)
|
||||
case utils.META_NONE: // do nothing on CGRateS side
|
||||
case utils.MetaDryRun:
|
||||
utils.Logger.Info(
|
||||
fmt.Sprintf("<%s> DRYRUN, reader: <%s>, CGREvent: <%s>",
|
||||
utils.DNSAgent, rdrCfg.ID, utils.ToJSON(cgrEv)))
|
||||
case utils.MetaAuth:
|
||||
authArgs := sessions.NewV1AuthorizeArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaAttributes),
|
||||
rdrCfg.Flags.HasKey(utils.MetaThresholds),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaThresholds),
|
||||
rdrCfg.Flags.HasKey(utils.MetaStats),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaStats),
|
||||
rdrCfg.Flags.HasKey(utils.MetaResources),
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliers),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliersIgnoreErrors),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost),
|
||||
cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator,
|
||||
)
|
||||
rply := new(sessions.V1AuthorizeReply)
|
||||
err = erS.sS.Call(utils.SessionSv1AuthorizeEvent,
|
||||
authArgs, rply)
|
||||
case utils.MetaInitiate:
|
||||
initArgs := sessions.NewV1InitSessionArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaAttributes),
|
||||
rdrCfg.Flags.HasKey(utils.MetaThresholds),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaThresholds),
|
||||
rdrCfg.Flags.HasKey(utils.MetaStats),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaStats),
|
||||
rdrCfg.Flags.HasKey(utils.MetaResources),
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
cgrEv, cgrArgs.ArgDispatcher)
|
||||
rply := new(sessions.V1InitSessionReply)
|
||||
err = erS.sS.Call(utils.SessionSv1InitiateSession,
|
||||
initArgs, rply)
|
||||
case utils.MetaUpdate:
|
||||
updateArgs := sessions.NewV1UpdateSessionArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaAttributes),
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
cgrEv, cgrArgs.ArgDispatcher)
|
||||
rply := new(sessions.V1UpdateSessionReply)
|
||||
err = erS.sS.Call(utils.SessionSv1UpdateSession,
|
||||
updateArgs, rply)
|
||||
case utils.MetaTerminate:
|
||||
terminateArgs := sessions.NewV1TerminateSessionArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
rdrCfg.Flags.HasKey(utils.MetaResources),
|
||||
rdrCfg.Flags.HasKey(utils.MetaThresholds),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaThresholds),
|
||||
rdrCfg.Flags.HasKey(utils.MetaStats),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaStats),
|
||||
cgrEv, cgrArgs.ArgDispatcher)
|
||||
rply := utils.StringPointer("")
|
||||
err = erS.sS.Call(utils.SessionSv1TerminateSession,
|
||||
terminateArgs, rply)
|
||||
case utils.MetaMessage:
|
||||
evArgs := sessions.NewV1ProcessMessageArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaAttributes),
|
||||
rdrCfg.Flags.HasKey(utils.MetaThresholds),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaThresholds),
|
||||
rdrCfg.Flags.HasKey(utils.MetaStats),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaStats),
|
||||
rdrCfg.Flags.HasKey(utils.MetaResources),
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliers),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliersIgnoreErrors),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost),
|
||||
cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator)
|
||||
rply := new(sessions.V1ProcessMessageReply) // need it so rpcclient can clone
|
||||
err = erS.sS.Call(utils.SessionSv1ProcessMessage,
|
||||
evArgs, rply)
|
||||
if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
|
||||
cgrEv.Event[utils.Usage] = 0 // avoid further debits
|
||||
} else if rply.MaxUsage != nil {
|
||||
cgrEv.Event[utils.Usage] = *rply.MaxUsage // make sure the CDR reflects the debit
|
||||
}
|
||||
case utils.MetaEvent:
|
||||
evArgs := &sessions.V1ProcessEventArgs{
|
||||
Flags: rdrCfg.Flags.SliceFlags(),
|
||||
CGREvent: cgrEv,
|
||||
ArgDispatcher: cgrArgs.ArgDispatcher,
|
||||
Paginator: *cgrArgs.SupplierPaginator,
|
||||
}
|
||||
rply := new(sessions.V1ProcessEventReply)
|
||||
err = erS.sS.Call(utils.SessionSv1ProcessEvent,
|
||||
evArgs, rply)
|
||||
case utils.MetaCDRs: // allow CDR processing
|
||||
}
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// separate request so we can capture the Terminate/Event also here
|
||||
if rdrCfg.Flags.HasKey(utils.MetaCDRs) &&
|
||||
!rdrCfg.Flags.HasKey(utils.MetaDryRun) {
|
||||
rplyCDRs := utils.StringPointer("")
|
||||
err = erS.sS.Call(utils.SessionSv1ProcessCDR,
|
||||
&utils.CGREventWithArgDispatcher{CGREvent: cgrEv,
|
||||
ArgDispatcher: cgrArgs.ArgDispatcher}, &rplyCDRs)
|
||||
}
|
||||
|
||||
}
|
||||
if err := rdr.Close(); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> closing reader <%s>, error: <%s>",
|
||||
utils.ERs, rdr.Config().ID, err.Error()))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return
|
||||
}
|
||||
*/
|
||||
|
||||
166
ers/libers.go
166
ers/libers.go
@@ -22,9 +22,8 @@ import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/cgrates/cgrates/sessions"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
)
|
||||
|
||||
@@ -61,163 +60,8 @@ func watchDir(dirPath string, f func(itmPath, itmID string) error,
|
||||
}
|
||||
}
|
||||
|
||||
// processReader will process events from reader and publish them to SessionS
|
||||
func processReader(rdr EventReader, sS rpcclient.RpcClient, rdrExit chan struct{}) (err error) {
|
||||
for { // reads until no more events are produced or exit is signaled
|
||||
select {
|
||||
case <-rdrExit:
|
||||
return
|
||||
default:
|
||||
}
|
||||
rdrCfg := rdr.Config()
|
||||
cgrEv, err := rdr.Read()
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> processing reader <%s>, error: <%s>",
|
||||
utils.ERs, rdrCfg.ID, err.Error()))
|
||||
continue
|
||||
} else if cgrEv == nil {
|
||||
break // no more events
|
||||
}
|
||||
// log the event created if requested by flags
|
||||
if rdrCfg.Flags.HasKey(utils.MetaLog) {
|
||||
utils.Logger.Info(
|
||||
fmt.Sprintf("<%s> LOG, reader: <%s>, message: %s",
|
||||
utils.ERs, rdrCfg.ID, utils.ToIJSON(cgrEv)))
|
||||
}
|
||||
// find out reqType
|
||||
var reqType string
|
||||
for _, typ := range []string{
|
||||
utils.MetaDryRun, utils.MetaAuth,
|
||||
utils.MetaInitiate, utils.MetaUpdate,
|
||||
utils.MetaTerminate, utils.MetaMessage,
|
||||
utils.MetaCDRs, utils.MetaEvent, utils.META_NONE} {
|
||||
if rdrCfg.Flags.HasKey(typ) { // request type is identified through flags
|
||||
reqType = typ
|
||||
break
|
||||
}
|
||||
}
|
||||
// execute the action based on reqType
|
||||
cgrArgs := cgrEv.ConsumeArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaDispatchers),
|
||||
reqType == utils.MetaAuth ||
|
||||
reqType == utils.MetaMessage ||
|
||||
reqType == utils.MetaEvent)
|
||||
switch reqType {
|
||||
default:
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> processing reader <%s>, unsupported reqType: <%s>",
|
||||
utils.ERs, rdrCfg.ID, err.Error()))
|
||||
continue
|
||||
case utils.META_NONE: // do nothing on CGRateS side
|
||||
case utils.MetaDryRun:
|
||||
utils.Logger.Info(
|
||||
fmt.Sprintf("<%s> DRYRUN, reader: <%s>, CGREvent: <%s>",
|
||||
utils.DNSAgent, rdrCfg.ID, utils.ToJSON(cgrEv)))
|
||||
case utils.MetaAuth:
|
||||
authArgs := sessions.NewV1AuthorizeArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaAttributes),
|
||||
rdrCfg.Flags.HasKey(utils.MetaThresholds),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaThresholds),
|
||||
rdrCfg.Flags.HasKey(utils.MetaStats),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaStats),
|
||||
rdrCfg.Flags.HasKey(utils.MetaResources),
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliers),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliersIgnoreErrors),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost),
|
||||
cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator,
|
||||
)
|
||||
rply := new(sessions.V1AuthorizeReply)
|
||||
err = sS.Call(utils.SessionSv1AuthorizeEvent,
|
||||
authArgs, rply)
|
||||
case utils.MetaInitiate:
|
||||
initArgs := sessions.NewV1InitSessionArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaAttributes),
|
||||
rdrCfg.Flags.HasKey(utils.MetaThresholds),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaThresholds),
|
||||
rdrCfg.Flags.HasKey(utils.MetaStats),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaStats),
|
||||
rdrCfg.Flags.HasKey(utils.MetaResources),
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
cgrEv, cgrArgs.ArgDispatcher)
|
||||
rply := new(sessions.V1InitSessionReply)
|
||||
err = sS.Call(utils.SessionSv1InitiateSession,
|
||||
initArgs, rply)
|
||||
case utils.MetaUpdate:
|
||||
updateArgs := sessions.NewV1UpdateSessionArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaAttributes),
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
cgrEv, cgrArgs.ArgDispatcher)
|
||||
rply := new(sessions.V1UpdateSessionReply)
|
||||
err = sS.Call(utils.SessionSv1UpdateSession,
|
||||
updateArgs, rply)
|
||||
case utils.MetaTerminate:
|
||||
terminateArgs := sessions.NewV1TerminateSessionArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
rdrCfg.Flags.HasKey(utils.MetaResources),
|
||||
rdrCfg.Flags.HasKey(utils.MetaThresholds),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaThresholds),
|
||||
rdrCfg.Flags.HasKey(utils.MetaStats),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaStats),
|
||||
cgrEv, cgrArgs.ArgDispatcher)
|
||||
rply := utils.StringPointer("")
|
||||
err = sS.Call(utils.SessionSv1TerminateSession,
|
||||
terminateArgs, rply)
|
||||
case utils.MetaMessage:
|
||||
evArgs := sessions.NewV1ProcessMessageArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaAttributes),
|
||||
rdrCfg.Flags.HasKey(utils.MetaThresholds),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaThresholds),
|
||||
rdrCfg.Flags.HasKey(utils.MetaStats),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaStats),
|
||||
rdrCfg.Flags.HasKey(utils.MetaResources),
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliers),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliersIgnoreErrors),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost),
|
||||
cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator)
|
||||
rply := new(sessions.V1ProcessMessageReply) // need it so rpcclient can clone
|
||||
err = sS.Call(utils.SessionSv1ProcessMessage,
|
||||
evArgs, rply)
|
||||
if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
|
||||
cgrEv.Event[utils.Usage] = 0 // avoid further debits
|
||||
} else if rply.MaxUsage != nil {
|
||||
cgrEv.Event[utils.Usage] = *rply.MaxUsage // make sure the CDR reflects the debit
|
||||
}
|
||||
case utils.MetaEvent:
|
||||
evArgs := &sessions.V1ProcessEventArgs{
|
||||
Flags: rdrCfg.Flags.SliceFlags(),
|
||||
CGREvent: cgrEv,
|
||||
ArgDispatcher: cgrArgs.ArgDispatcher,
|
||||
Paginator: *cgrArgs.SupplierPaginator,
|
||||
}
|
||||
rply := new(sessions.V1ProcessEventReply)
|
||||
err = sS.Call(utils.SessionSv1ProcessEvent,
|
||||
evArgs, rply)
|
||||
case utils.MetaCDRs: // allow CDR processing
|
||||
}
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reader <%s>, error: <%s> posting event",
|
||||
utils.ERs, rdrCfg.ID, err.Error()))
|
||||
}
|
||||
// separate request so we can capture the Terminate/Event also here
|
||||
if rdrCfg.Flags.HasKey(utils.MetaCDRs) &&
|
||||
!rdrCfg.Flags.HasKey(utils.MetaDryRun) {
|
||||
rplyCDRs := utils.StringPointer("")
|
||||
if err = sS.Call(utils.SessionSv1ProcessCDR,
|
||||
&utils.CGREventWithArgDispatcher{CGREvent: cgrEv,
|
||||
ArgDispatcher: cgrArgs.ArgDispatcher}, &rplyCDRs); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reader <%s>, error: <%s> posting event",
|
||||
utils.ERs, rdrCfg.ID, err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
// erEvent is passed from reader to ERs
|
||||
type erEvent struct {
|
||||
cgrEvent *utils.CGREvent
|
||||
rdrCfg *config.EventReaderCfg
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user