diff --git a/ers/ers.go b/ers/ers.go index adc0cd57e..ebe20db1e 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -28,6 +28,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" "github.com/fsnotify/fsnotify" @@ -123,7 +124,7 @@ func (erS *ERService) handleReloads(cfgRldChan chan struct{}) { // index in use IDs for path, rdrs := range erS.rdrs { for i, rdr := range rdrs { - inUseIDs[rdr.ID()] = &erCfgRef{path: path, idx: i} + inUseIDs[rdr.Config().ID] = &erCfgRef{path: path, idx: i} } } // find out removed ids @@ -204,7 +205,7 @@ func (erS *ERService) watchDir(dirPath string) (err error) { if ev.Op&fsnotify.Create == fsnotify.Create && path.Ext(ev.Name) == utils.CSVSuffix { go func() { //Enable async processing here - if err = erS.processFPath(filepath.Dir(ev.Name), filepath.Base(ev.Name)); err != nil { + if err = erS.processPath(filepath.Dir(ev.Name), filepath.Base(ev.Name)); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> processing path <%s>, error: <%s>", utils.ERs, ev.Name, err.Error())) } @@ -216,29 +217,174 @@ func (erS *ERService) watchDir(dirPath string) (err error) { } } -// processFPath is called by inotify or manual handler to process a file with path -func (erS *ERService) processFPath(dirPath string, fName string) (err error) { - rdrs, has := erS.rdrs[dirPath] +// processPath will be called each time a new run should be triggered +func (erS *ERService) processPath(itmPath string, itmID string) error { + rdrs, has := erS.rdrs[itmPath] if !has { - return fmt.Errorf("no reader for path: <%s>", dirPath) + return fmt.Errorf("no reader for path: <%s>", itmPath) } - fPath := path.Join(dirPath, fName) for _, rdr := range rdrs { - if errRdr := rdr.Init(fPath); errRdr != nil { - utils.Logger.Warning(fmt.Sprintf("<%s> processing filePath <%s>, error: <%s>", - utils.ERs, fPath, errRdr.Error())) + rdrCfg := rdr.Config() + if err := rdr.Init(itmPath, itmID); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> init reader <%s>, error: <%s>", + utils.ERs, rdrCfg.ID, err.Error())) continue } - /*for { - if cdr, err := rdr.Read(); err != nil { - + for { // reads until no more events are produced + cgrEv, err := rdr.Read() + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> processing reader <%s>, error: <%s>", + utils.ERs, rdrCfg.ID, err.Error())) + continue + } else if cgrEv == nil { + break // no more events } + // log the event created if requested by flags + if rdrCfg.Flags.HasKey(utils.MetaLog) { + utils.Logger.Info( + fmt.Sprintf("<%s> LOG, reader: <%s>, message: %s", + utils.ERs, rdrCfg.ID, utils.ToIJSON(cgrEv))) + } + // find out reqType + var reqType string + for _, typ := range []string{ + utils.MetaDryRun, utils.MetaAuth, + utils.MetaInitiate, utils.MetaUpdate, + utils.MetaTerminate, utils.MetaMessage, + utils.MetaCDRs, utils.MetaEvent, utils.META_NONE} { + if rdrCfg.Flags.HasKey(typ) { // request type is identified through flags + reqType = typ + break + } + } + // execute the action based on reqType + cgrArgs := cgrEv.ConsumeArgs( + rdrCfg.Flags.HasKey(utils.MetaDispatchers), + reqType == utils.MetaAuth || + reqType == utils.MetaMessage || + reqType == utils.MetaEvent) + switch reqType { + default: + utils.Logger.Warning( + fmt.Sprintf("<%s> processing reader <%s>, unsupported reqType: <%s>", + utils.ERs, rdrCfg.ID, err.Error())) + continue + case utils.META_NONE: // do nothing on CGRateS side + case utils.MetaDryRun: + utils.Logger.Info( + fmt.Sprintf("<%s> DRYRUN, reader: <%s>, CGREvent: <%s>", + utils.DNSAgent, rdrCfg.ID, utils.ToJSON(cgrEv))) + case utils.MetaAuth: + authArgs := sessions.NewV1AuthorizeArgs( + rdrCfg.Flags.HasKey(utils.MetaAttributes), + rdrCfg.Flags.ParamsSlice(utils.MetaAttributes), + rdrCfg.Flags.HasKey(utils.MetaThresholds), + rdrCfg.Flags.ParamsSlice(utils.MetaThresholds), + rdrCfg.Flags.HasKey(utils.MetaStats), + rdrCfg.Flags.ParamsSlice(utils.MetaStats), + rdrCfg.Flags.HasKey(utils.MetaResources), + rdrCfg.Flags.HasKey(utils.MetaAccounts), + rdrCfg.Flags.HasKey(utils.MetaSuppliers), + rdrCfg.Flags.HasKey(utils.MetaSuppliersIgnoreErrors), + rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost), + cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator, + ) + rply := new(sessions.V1AuthorizeReply) + err = erS.sS.Call(utils.SessionSv1AuthorizeEvent, + authArgs, rply) + case utils.MetaInitiate: + initArgs := sessions.NewV1InitSessionArgs( + rdrCfg.Flags.HasKey(utils.MetaAttributes), + rdrCfg.Flags.ParamsSlice(utils.MetaAttributes), + rdrCfg.Flags.HasKey(utils.MetaThresholds), + rdrCfg.Flags.ParamsSlice(utils.MetaThresholds), + rdrCfg.Flags.HasKey(utils.MetaStats), + rdrCfg.Flags.ParamsSlice(utils.MetaStats), + rdrCfg.Flags.HasKey(utils.MetaResources), + rdrCfg.Flags.HasKey(utils.MetaAccounts), + cgrEv, cgrArgs.ArgDispatcher) + rply := new(sessions.V1InitSessionReply) + err = erS.sS.Call(utils.SessionSv1InitiateSession, + initArgs, rply) + case utils.MetaUpdate: + updateArgs := sessions.NewV1UpdateSessionArgs( + rdrCfg.Flags.HasKey(utils.MetaAttributes), + rdrCfg.Flags.ParamsSlice(utils.MetaAttributes), + rdrCfg.Flags.HasKey(utils.MetaAccounts), + cgrEv, cgrArgs.ArgDispatcher) + rply := new(sessions.V1UpdateSessionReply) + err = erS.sS.Call(utils.SessionSv1UpdateSession, + updateArgs, rply) + case utils.MetaTerminate: + terminateArgs := sessions.NewV1TerminateSessionArgs( + rdrCfg.Flags.HasKey(utils.MetaAccounts), + rdrCfg.Flags.HasKey(utils.MetaResources), + rdrCfg.Flags.HasKey(utils.MetaThresholds), + rdrCfg.Flags.ParamsSlice(utils.MetaThresholds), + rdrCfg.Flags.HasKey(utils.MetaStats), + rdrCfg.Flags.ParamsSlice(utils.MetaStats), + cgrEv, cgrArgs.ArgDispatcher) + rply := utils.StringPointer("") + err = erS.sS.Call(utils.SessionSv1TerminateSession, + terminateArgs, rply) + case utils.MetaMessage: + evArgs := sessions.NewV1ProcessMessageArgs( + rdrCfg.Flags.HasKey(utils.MetaAttributes), + rdrCfg.Flags.ParamsSlice(utils.MetaAttributes), + rdrCfg.Flags.HasKey(utils.MetaThresholds), + rdrCfg.Flags.ParamsSlice(utils.MetaThresholds), + rdrCfg.Flags.HasKey(utils.MetaStats), + rdrCfg.Flags.ParamsSlice(utils.MetaStats), + rdrCfg.Flags.HasKey(utils.MetaResources), + rdrCfg.Flags.HasKey(utils.MetaAccounts), + rdrCfg.Flags.HasKey(utils.MetaSuppliers), + rdrCfg.Flags.HasKey(utils.MetaSuppliersIgnoreErrors), + rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost), + cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator) + rply := new(sessions.V1ProcessMessageReply) // need it so rpcclient can clone + err = erS.sS.Call(utils.SessionSv1ProcessMessage, + evArgs, rply) + if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) { + cgrEv.Event[utils.Usage] = 0 // avoid further debits + } else if rply.MaxUsage != nil { + cgrEv.Event[utils.Usage] = *rply.MaxUsage // make sure the CDR reflects the debit + } + case utils.MetaEvent: + evArgs := &sessions.V1ProcessEventArgs{ + Flags: rdrCfg.Flags.SliceFlags(), + CGREvent: cgrEv, + ArgDispatcher: cgrArgs.ArgDispatcher, + Paginator: *cgrArgs.SupplierPaginator, + } + rply := new(sessions.V1ProcessEventReply) + err = erS.sS.Call(utils.SessionSv1ProcessEvent, + evArgs, rply) + case utils.MetaCDRs: // allow CDR processing + } + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> reader <%s>, error: <%s> posting event", + utils.ERs, rdrCfg.ID, err.Error())) + } + // separate request so we can capture the Terminate/Event also here + if rdrCfg.Flags.HasKey(utils.MetaCDRs) && + !rdrCfg.Flags.HasKey(utils.MetaDryRun) { + rplyCDRs := utils.StringPointer("") + if err = erS.sS.Call(utils.SessionSv1ProcessCDR, + &utils.CGREventWithArgDispatcher{CGREvent: cgrEv, + ArgDispatcher: cgrArgs.ArgDispatcher}, &rplyCDRs); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> reader <%s>, error: <%s> posting event", + utils.ERs, rdrCfg.ID, err.Error())) + } + } + } - */ - if errRdr := rdr.Close(); errRdr != nil { - utils.Logger.Warning(fmt.Sprintf("<%s> closing filePath <%s>, error: <%s>", - utils.ERs, fPath, errRdr.Error())) + if err := rdr.Close(); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> closing reader <%s>, error: <%s>", + utils.ERs, rdr.Config().ID, err.Error())) } } - return + return nil } diff --git a/ers/filecsv.go b/ers/filecsv.go index dc8959ad2..4629ae368 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -31,15 +31,11 @@ func NewCSVFileER(cfg *config.EventReaderCfg) (er EventReader, err error) { type CSVFileER struct { } -func (csv *CSVFileER) ID() (id string) { - return -} - func (csv *CSVFileER) Config() (rdrCfg *config.EventReaderCfg) { return } -func (csv *CSVFileER) Init(args interface{}) (err error) { +func (csv *CSVFileER) Init(itmPath, itmID string) (err error) { return } diff --git a/ers/reader.go b/ers/reader.go index fc21e01ac..3c81d5852 100644 --- a/ers/reader.go +++ b/ers/reader.go @@ -26,12 +26,11 @@ import ( ) type EventReader interface { - ID() string // configuration identifier - Config() *config.EventReaderCfg // reader configuration - Init(args interface{}) error // init will initialize the Reader, ie: open the file to read or http connection - Read() (*utils.CGREvent, error) // process a single record in the events file - Processed() int64 // number of records processed - Close() error // called when the reader should release resources + Config() *config.EventReaderCfg // reader configuration + Init(itmPath, itmName string) error // init will initialize the Reader, ie: open the file to read or http connection + Read() (*utils.CGREvent, error) // process a single record in the events file + Processed() int64 // number of records processed + Close() error // called when the reader should release resources } // NewEventReader instantiates the event reader based on configuration at index