mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-12 02:26:26 +05:00
ERService with processing logic for the generated CGREvent
This commit is contained in:
182
ers/ers.go
182
ers/ers.go
@@ -28,6 +28,7 @@ import (
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/sessions"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
@@ -123,7 +124,7 @@ func (erS *ERService) handleReloads(cfgRldChan chan struct{}) {
|
||||
// index in use IDs
|
||||
for path, rdrs := range erS.rdrs {
|
||||
for i, rdr := range rdrs {
|
||||
inUseIDs[rdr.ID()] = &erCfgRef{path: path, idx: i}
|
||||
inUseIDs[rdr.Config().ID] = &erCfgRef{path: path, idx: i}
|
||||
}
|
||||
}
|
||||
// find out removed ids
|
||||
@@ -204,7 +205,7 @@ func (erS *ERService) watchDir(dirPath string) (err error) {
|
||||
if ev.Op&fsnotify.Create == fsnotify.Create &&
|
||||
path.Ext(ev.Name) == utils.CSVSuffix {
|
||||
go func() { //Enable async processing here
|
||||
if err = erS.processFPath(filepath.Dir(ev.Name), filepath.Base(ev.Name)); err != nil {
|
||||
if err = erS.processPath(filepath.Dir(ev.Name), filepath.Base(ev.Name)); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> processing path <%s>, error: <%s>",
|
||||
utils.ERs, ev.Name, err.Error()))
|
||||
}
|
||||
@@ -216,29 +217,174 @@ func (erS *ERService) watchDir(dirPath string) (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
// processFPath is called by inotify or manual handler to process a file with path
|
||||
func (erS *ERService) processFPath(dirPath string, fName string) (err error) {
|
||||
rdrs, has := erS.rdrs[dirPath]
|
||||
// processPath will be called each time a new run should be triggered
|
||||
func (erS *ERService) processPath(itmPath string, itmID string) error {
|
||||
rdrs, has := erS.rdrs[itmPath]
|
||||
if !has {
|
||||
return fmt.Errorf("no reader for path: <%s>", dirPath)
|
||||
return fmt.Errorf("no reader for path: <%s>", itmPath)
|
||||
}
|
||||
fPath := path.Join(dirPath, fName)
|
||||
for _, rdr := range rdrs {
|
||||
if errRdr := rdr.Init(fPath); errRdr != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> processing filePath <%s>, error: <%s>",
|
||||
utils.ERs, fPath, errRdr.Error()))
|
||||
rdrCfg := rdr.Config()
|
||||
if err := rdr.Init(itmPath, itmID); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> init reader <%s>, error: <%s>",
|
||||
utils.ERs, rdrCfg.ID, err.Error()))
|
||||
continue
|
||||
}
|
||||
/*for {
|
||||
if cdr, err := rdr.Read(); err != nil {
|
||||
|
||||
for { // reads until no more events are produced
|
||||
cgrEv, err := rdr.Read()
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> processing reader <%s>, error: <%s>",
|
||||
utils.ERs, rdrCfg.ID, err.Error()))
|
||||
continue
|
||||
} else if cgrEv == nil {
|
||||
break // no more events
|
||||
}
|
||||
// log the event created if requested by flags
|
||||
if rdrCfg.Flags.HasKey(utils.MetaLog) {
|
||||
utils.Logger.Info(
|
||||
fmt.Sprintf("<%s> LOG, reader: <%s>, message: %s",
|
||||
utils.ERs, rdrCfg.ID, utils.ToIJSON(cgrEv)))
|
||||
}
|
||||
// find out reqType
|
||||
var reqType string
|
||||
for _, typ := range []string{
|
||||
utils.MetaDryRun, utils.MetaAuth,
|
||||
utils.MetaInitiate, utils.MetaUpdate,
|
||||
utils.MetaTerminate, utils.MetaMessage,
|
||||
utils.MetaCDRs, utils.MetaEvent, utils.META_NONE} {
|
||||
if rdrCfg.Flags.HasKey(typ) { // request type is identified through flags
|
||||
reqType = typ
|
||||
break
|
||||
}
|
||||
}
|
||||
// execute the action based on reqType
|
||||
cgrArgs := cgrEv.ConsumeArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaDispatchers),
|
||||
reqType == utils.MetaAuth ||
|
||||
reqType == utils.MetaMessage ||
|
||||
reqType == utils.MetaEvent)
|
||||
switch reqType {
|
||||
default:
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> processing reader <%s>, unsupported reqType: <%s>",
|
||||
utils.ERs, rdrCfg.ID, err.Error()))
|
||||
continue
|
||||
case utils.META_NONE: // do nothing on CGRateS side
|
||||
case utils.MetaDryRun:
|
||||
utils.Logger.Info(
|
||||
fmt.Sprintf("<%s> DRYRUN, reader: <%s>, CGREvent: <%s>",
|
||||
utils.DNSAgent, rdrCfg.ID, utils.ToJSON(cgrEv)))
|
||||
case utils.MetaAuth:
|
||||
authArgs := sessions.NewV1AuthorizeArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaAttributes),
|
||||
rdrCfg.Flags.HasKey(utils.MetaThresholds),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaThresholds),
|
||||
rdrCfg.Flags.HasKey(utils.MetaStats),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaStats),
|
||||
rdrCfg.Flags.HasKey(utils.MetaResources),
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliers),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliersIgnoreErrors),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost),
|
||||
cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator,
|
||||
)
|
||||
rply := new(sessions.V1AuthorizeReply)
|
||||
err = erS.sS.Call(utils.SessionSv1AuthorizeEvent,
|
||||
authArgs, rply)
|
||||
case utils.MetaInitiate:
|
||||
initArgs := sessions.NewV1InitSessionArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaAttributes),
|
||||
rdrCfg.Flags.HasKey(utils.MetaThresholds),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaThresholds),
|
||||
rdrCfg.Flags.HasKey(utils.MetaStats),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaStats),
|
||||
rdrCfg.Flags.HasKey(utils.MetaResources),
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
cgrEv, cgrArgs.ArgDispatcher)
|
||||
rply := new(sessions.V1InitSessionReply)
|
||||
err = erS.sS.Call(utils.SessionSv1InitiateSession,
|
||||
initArgs, rply)
|
||||
case utils.MetaUpdate:
|
||||
updateArgs := sessions.NewV1UpdateSessionArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaAttributes),
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
cgrEv, cgrArgs.ArgDispatcher)
|
||||
rply := new(sessions.V1UpdateSessionReply)
|
||||
err = erS.sS.Call(utils.SessionSv1UpdateSession,
|
||||
updateArgs, rply)
|
||||
case utils.MetaTerminate:
|
||||
terminateArgs := sessions.NewV1TerminateSessionArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
rdrCfg.Flags.HasKey(utils.MetaResources),
|
||||
rdrCfg.Flags.HasKey(utils.MetaThresholds),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaThresholds),
|
||||
rdrCfg.Flags.HasKey(utils.MetaStats),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaStats),
|
||||
cgrEv, cgrArgs.ArgDispatcher)
|
||||
rply := utils.StringPointer("")
|
||||
err = erS.sS.Call(utils.SessionSv1TerminateSession,
|
||||
terminateArgs, rply)
|
||||
case utils.MetaMessage:
|
||||
evArgs := sessions.NewV1ProcessMessageArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaAttributes),
|
||||
rdrCfg.Flags.HasKey(utils.MetaThresholds),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaThresholds),
|
||||
rdrCfg.Flags.HasKey(utils.MetaStats),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaStats),
|
||||
rdrCfg.Flags.HasKey(utils.MetaResources),
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliers),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliersIgnoreErrors),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost),
|
||||
cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator)
|
||||
rply := new(sessions.V1ProcessMessageReply) // need it so rpcclient can clone
|
||||
err = erS.sS.Call(utils.SessionSv1ProcessMessage,
|
||||
evArgs, rply)
|
||||
if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
|
||||
cgrEv.Event[utils.Usage] = 0 // avoid further debits
|
||||
} else if rply.MaxUsage != nil {
|
||||
cgrEv.Event[utils.Usage] = *rply.MaxUsage // make sure the CDR reflects the debit
|
||||
}
|
||||
case utils.MetaEvent:
|
||||
evArgs := &sessions.V1ProcessEventArgs{
|
||||
Flags: rdrCfg.Flags.SliceFlags(),
|
||||
CGREvent: cgrEv,
|
||||
ArgDispatcher: cgrArgs.ArgDispatcher,
|
||||
Paginator: *cgrArgs.SupplierPaginator,
|
||||
}
|
||||
rply := new(sessions.V1ProcessEventReply)
|
||||
err = erS.sS.Call(utils.SessionSv1ProcessEvent,
|
||||
evArgs, rply)
|
||||
case utils.MetaCDRs: // allow CDR processing
|
||||
}
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reader <%s>, error: <%s> posting event",
|
||||
utils.ERs, rdrCfg.ID, err.Error()))
|
||||
}
|
||||
// separate request so we can capture the Terminate/Event also here
|
||||
if rdrCfg.Flags.HasKey(utils.MetaCDRs) &&
|
||||
!rdrCfg.Flags.HasKey(utils.MetaDryRun) {
|
||||
rplyCDRs := utils.StringPointer("")
|
||||
if err = erS.sS.Call(utils.SessionSv1ProcessCDR,
|
||||
&utils.CGREventWithArgDispatcher{CGREvent: cgrEv,
|
||||
ArgDispatcher: cgrArgs.ArgDispatcher}, &rplyCDRs); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reader <%s>, error: <%s> posting event",
|
||||
utils.ERs, rdrCfg.ID, err.Error()))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
*/
|
||||
if errRdr := rdr.Close(); errRdr != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> closing filePath <%s>, error: <%s>",
|
||||
utils.ERs, fPath, errRdr.Error()))
|
||||
if err := rdr.Close(); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> closing reader <%s>, error: <%s>",
|
||||
utils.ERs, rdr.Config().ID, err.Error()))
|
||||
}
|
||||
}
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -31,15 +31,11 @@ func NewCSVFileER(cfg *config.EventReaderCfg) (er EventReader, err error) {
|
||||
type CSVFileER struct {
|
||||
}
|
||||
|
||||
func (csv *CSVFileER) ID() (id string) {
|
||||
return
|
||||
}
|
||||
|
||||
func (csv *CSVFileER) Config() (rdrCfg *config.EventReaderCfg) {
|
||||
return
|
||||
}
|
||||
|
||||
func (csv *CSVFileER) Init(args interface{}) (err error) {
|
||||
func (csv *CSVFileER) Init(itmPath, itmID string) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -26,12 +26,11 @@ import (
|
||||
)
|
||||
|
||||
type EventReader interface {
|
||||
ID() string // configuration identifier
|
||||
Config() *config.EventReaderCfg // reader configuration
|
||||
Init(args interface{}) error // init will initialize the Reader, ie: open the file to read or http connection
|
||||
Read() (*utils.CGREvent, error) // process a single record in the events file
|
||||
Processed() int64 // number of records processed
|
||||
Close() error // called when the reader should release resources
|
||||
Config() *config.EventReaderCfg // reader configuration
|
||||
Init(itmPath, itmName string) error // init will initialize the Reader, ie: open the file to read or http connection
|
||||
Read() (*utils.CGREvent, error) // process a single record in the events file
|
||||
Processed() int64 // number of records processed
|
||||
Close() error // called when the reader should release resources
|
||||
}
|
||||
|
||||
// NewEventReader instantiates the event reader based on configuration at index
|
||||
|
||||
Reference in New Issue
Block a user