diff --git a/ers/ers.go b/ers/ers.go index e746cfcca..f6d475bef 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -46,7 +46,9 @@ type ERService struct { sync.RWMutex cfg *config.CGRConfig rdrs map[string]EventReader // map[rdrID]EventReader + rdrPaths map[string]string // used for reloads in case of path changes stopLsn map[string]chan struct{} // map[rdrID] chan struct{} + filterS *engine.FilterS sS rpcclient.RpcClientConnection // connection towards SessionS exitChan chan bool @@ -70,12 +72,12 @@ func (erS *ERService) ListenAndServe(cfgRldChan chan struct{}) (err error) { // addReader will add a new reader to the service func (erS *ERService) addReader(rdrCfg *config.EventReaderCfg) (err error) { + erS.stopLsn[rdrCfg.ID] = make(chan struct{}) var rdr EventReader - if rdr, err = NewEventReader(rdrCfg); err != nil { + if rdr, err = NewEventReader(rdrCfg, erS.stopLsn[rdrCfg.ID], erS.exitChan); err != nil { return } erS.rdrs[rdrCfg.ID] = rdr - erS.stopLsn[rdrCfg.ID] = make(chan struct{}) return rdr.Subscribe() } @@ -102,6 +104,7 @@ func (erS *ERService) handleReloads(cfgRldChan chan struct{}) { return case <-cfgRldChan: cfgIDs := make(map[string]*config.EventReaderCfg) + pathReloaded := make(map[string]struct{}) // index config IDs for _, rdrCfg := range erS.cfg.ERsCfg().Readers { cfgIDs[rdrCfg.ID] = rdrCfg @@ -109,8 +112,11 @@ func (erS *ERService) handleReloads(cfgRldChan chan struct{}) { erS.Lock() // remove the necessary ids for id := range erS.rdrs { - if _, has := cfgIDs[id]; has { // still present - continue + if newCfg, has := cfgIDs[id]; has { // still present + if newCfg.SourcePath == erS.rdrPaths[id] { + continue + } + pathReloaded[id] = struct{}{} } delete(erS.rdrs, id) close(erS.stopLsn[id]) @@ -119,7 +125,9 @@ func (erS *ERService) handleReloads(cfgRldChan chan struct{}) { // add new ids for id, rdrCfg := range cfgIDs { if _, has := erS.rdrs[id]; has { - continue + if _, has := pathReloaded[id]; !has { + continue + } } if err := erS.addReader(rdrCfg); err != nil { utils.Logger.Crit( diff --git a/ers/filecsv.go b/ers/filecsv.go index 5200efb22..7c32b0898 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -19,28 +19,48 @@ along with this program. If not, see package ers import ( + "fmt" + "strings" + "sync" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) -func NewCSVFileER(cfg *config.EventReaderCfg) (er EventReader, err error) { - return new(CSVFileER), nil +func NewCSVFileER(cfg *config.EventReaderCfg, + rdrExit chan struct{}, appExit chan bool) (er EventReader, err error) { + srcPath := cfg.SourcePath + if strings.HasSuffix(srcPath, utils.Slash) { + srcPath = srcPath[:len(srcPath)-1] + } + return &CSVFileER{erCfg: cfg, rdrDir: srcPath, + rdrExit: rdrExit, appExit: appExit}, nil } // CSVFileER implements EventReader interface for .csv files type CSVFileER struct { + sync.RWMutex + erCfg *config.EventReaderCfg + rdrDir string + rdrExit chan struct{} + appExit chan bool } -func (csv *CSVFileER) Config() (rdrCfg *config.EventReaderCfg) { - return +func (csv *CSVFileER) Config() *config.EventReaderCfg { + return csv.erCfg } -func (csv *CSVFileER) Init(itmPath, itmID string) (err error) { - return -} - -func (csv *CSVFileER) Subscribe() (err error) { - return +func (csv *CSVFileER) Subscribe() error { + go func() { + if err := watchDir(csv.rdrDir, csv.processDir, + utils.ERs, csv.rdrExit); err != nil { + utils.Logger.Crit( + fmt.Sprintf("<%s> watching directory <%s> got error: <%s>", + utils.ERs, csv.rdrDir, err.Error())) + csv.appExit <- true + } + }() + return nil } func (csv *CSVFileER) Read() (ev *utils.CGREvent, err error) { @@ -54,3 +74,7 @@ func (csv *CSVFileER) Processed() (nrItms int64) { func (csv *CSVFileER) Close() (err error) { return } + +func (csv *CSVFileER) processDir(itmPath, itmID string) (err error) { + return +} diff --git a/ers/libers.go b/ers/libers.go index 00bcb5757..40800ba36 100644 --- a/ers/libers.go +++ b/ers/libers.go @@ -22,7 +22,9 @@ import ( "fmt" "path/filepath" + "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" "github.com/fsnotify/fsnotify" ) @@ -58,3 +60,164 @@ func watchDir(dirPath string, f func(itmPath, itmID string) error, } } } + +// processReader will process events from reader and publish them to SessionS +func processReader(rdr EventReader, sS rpcclient.RpcClient, rdrExit chan struct{}) (err error) { + for { // reads until no more events are produced or exit is signaled + select { + case <-rdrExit: + return + default: + } + rdrCfg := rdr.Config() + cgrEv, err := rdr.Read() + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> processing reader <%s>, error: <%s>", + utils.ERs, rdrCfg.ID, err.Error())) + continue + } else if cgrEv == nil { + break // no more events + } + // log the event created if requested by flags + if rdrCfg.Flags.HasKey(utils.MetaLog) { + utils.Logger.Info( + fmt.Sprintf("<%s> LOG, reader: <%s>, message: %s", + utils.ERs, rdrCfg.ID, utils.ToIJSON(cgrEv))) + } + // find out reqType + var reqType string + for _, typ := range []string{ + utils.MetaDryRun, utils.MetaAuth, + utils.MetaInitiate, utils.MetaUpdate, + utils.MetaTerminate, utils.MetaMessage, + utils.MetaCDRs, utils.MetaEvent, utils.META_NONE} { + if rdrCfg.Flags.HasKey(typ) { // request type is identified through flags + reqType = typ + break + } + } + // execute the action based on reqType + cgrArgs := cgrEv.ConsumeArgs( + rdrCfg.Flags.HasKey(utils.MetaDispatchers), + reqType == utils.MetaAuth || + reqType == utils.MetaMessage || + reqType == utils.MetaEvent) + switch reqType { + default: + utils.Logger.Warning( + fmt.Sprintf("<%s> processing reader <%s>, unsupported reqType: <%s>", + utils.ERs, rdrCfg.ID, err.Error())) + continue + case utils.META_NONE: // do nothing on CGRateS side + case utils.MetaDryRun: + utils.Logger.Info( + fmt.Sprintf("<%s> DRYRUN, reader: <%s>, CGREvent: <%s>", + utils.DNSAgent, rdrCfg.ID, utils.ToJSON(cgrEv))) + case utils.MetaAuth: + authArgs := sessions.NewV1AuthorizeArgs( + rdrCfg.Flags.HasKey(utils.MetaAttributes), + rdrCfg.Flags.ParamsSlice(utils.MetaAttributes), + rdrCfg.Flags.HasKey(utils.MetaThresholds), + rdrCfg.Flags.ParamsSlice(utils.MetaThresholds), + rdrCfg.Flags.HasKey(utils.MetaStats), + rdrCfg.Flags.ParamsSlice(utils.MetaStats), + rdrCfg.Flags.HasKey(utils.MetaResources), + rdrCfg.Flags.HasKey(utils.MetaAccounts), + rdrCfg.Flags.HasKey(utils.MetaSuppliers), + rdrCfg.Flags.HasKey(utils.MetaSuppliersIgnoreErrors), + rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost), + cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator, + ) + rply := new(sessions.V1AuthorizeReply) + err = sS.Call(utils.SessionSv1AuthorizeEvent, + authArgs, rply) + case utils.MetaInitiate: + initArgs := sessions.NewV1InitSessionArgs( + rdrCfg.Flags.HasKey(utils.MetaAttributes), + rdrCfg.Flags.ParamsSlice(utils.MetaAttributes), + rdrCfg.Flags.HasKey(utils.MetaThresholds), + rdrCfg.Flags.ParamsSlice(utils.MetaThresholds), + rdrCfg.Flags.HasKey(utils.MetaStats), + rdrCfg.Flags.ParamsSlice(utils.MetaStats), + rdrCfg.Flags.HasKey(utils.MetaResources), + rdrCfg.Flags.HasKey(utils.MetaAccounts), + cgrEv, cgrArgs.ArgDispatcher) + rply := new(sessions.V1InitSessionReply) + err = sS.Call(utils.SessionSv1InitiateSession, + initArgs, rply) + case utils.MetaUpdate: + updateArgs := sessions.NewV1UpdateSessionArgs( + rdrCfg.Flags.HasKey(utils.MetaAttributes), + rdrCfg.Flags.ParamsSlice(utils.MetaAttributes), + rdrCfg.Flags.HasKey(utils.MetaAccounts), + cgrEv, cgrArgs.ArgDispatcher) + rply := new(sessions.V1UpdateSessionReply) + err = sS.Call(utils.SessionSv1UpdateSession, + updateArgs, rply) + case utils.MetaTerminate: + terminateArgs := sessions.NewV1TerminateSessionArgs( + rdrCfg.Flags.HasKey(utils.MetaAccounts), + rdrCfg.Flags.HasKey(utils.MetaResources), + rdrCfg.Flags.HasKey(utils.MetaThresholds), + rdrCfg.Flags.ParamsSlice(utils.MetaThresholds), + rdrCfg.Flags.HasKey(utils.MetaStats), + rdrCfg.Flags.ParamsSlice(utils.MetaStats), + cgrEv, cgrArgs.ArgDispatcher) + rply := utils.StringPointer("") + err = sS.Call(utils.SessionSv1TerminateSession, + terminateArgs, rply) + case utils.MetaMessage: + evArgs := sessions.NewV1ProcessMessageArgs( + rdrCfg.Flags.HasKey(utils.MetaAttributes), + rdrCfg.Flags.ParamsSlice(utils.MetaAttributes), + rdrCfg.Flags.HasKey(utils.MetaThresholds), + rdrCfg.Flags.ParamsSlice(utils.MetaThresholds), + rdrCfg.Flags.HasKey(utils.MetaStats), + rdrCfg.Flags.ParamsSlice(utils.MetaStats), + rdrCfg.Flags.HasKey(utils.MetaResources), + rdrCfg.Flags.HasKey(utils.MetaAccounts), + rdrCfg.Flags.HasKey(utils.MetaSuppliers), + rdrCfg.Flags.HasKey(utils.MetaSuppliersIgnoreErrors), + rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost), + cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator) + rply := new(sessions.V1ProcessMessageReply) // need it so rpcclient can clone + err = sS.Call(utils.SessionSv1ProcessMessage, + evArgs, rply) + if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) { + cgrEv.Event[utils.Usage] = 0 // avoid further debits + } else if rply.MaxUsage != nil { + cgrEv.Event[utils.Usage] = *rply.MaxUsage // make sure the CDR reflects the debit + } + case utils.MetaEvent: + evArgs := &sessions.V1ProcessEventArgs{ + Flags: rdrCfg.Flags.SliceFlags(), + CGREvent: cgrEv, + ArgDispatcher: cgrArgs.ArgDispatcher, + Paginator: *cgrArgs.SupplierPaginator, + } + rply := new(sessions.V1ProcessEventReply) + err = sS.Call(utils.SessionSv1ProcessEvent, + evArgs, rply) + case utils.MetaCDRs: // allow CDR processing + } + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> reader <%s>, error: <%s> posting event", + utils.ERs, rdrCfg.ID, err.Error())) + } + // separate request so we can capture the Terminate/Event also here + if rdrCfg.Flags.HasKey(utils.MetaCDRs) && + !rdrCfg.Flags.HasKey(utils.MetaDryRun) { + rplyCDRs := utils.StringPointer("") + if err = sS.Call(utils.SessionSv1ProcessCDR, + &utils.CGREventWithArgDispatcher{CGREvent: cgrEv, + ArgDispatcher: cgrArgs.ArgDispatcher}, &rplyCDRs); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> reader <%s>, error: <%s> posting event", + utils.ERs, rdrCfg.ID, err.Error())) + } + } + } + return +} diff --git a/ers/reader.go b/ers/reader.go index 4230c3d0b..fb52ecc8c 100644 --- a/ers/reader.go +++ b/ers/reader.go @@ -26,21 +26,20 @@ import ( ) type EventReader interface { - Config() *config.EventReaderCfg // reader configuration - Init(itmPath, itmName string) error // init will initialize the Reader, ie: open the file to read or http connection - Subscribe() error // subscribe the reader on the path - Read() (*utils.CGREvent, error) // process a single record in the events file - Processed() int64 // number of records processed - Close() error // called when the reader should release resources + Config() *config.EventReaderCfg + Subscribe() error // subscribe the reader on the path + Read() (*utils.CGREvent, error) // process a single record in the events file + Processed() int64 // number of records processed } // NewEventReader instantiates the event reader based on configuration at index -func NewEventReader(rdrCfg *config.EventReaderCfg) (er EventReader, err error) { +func NewEventReader(rdrCfg *config.EventReaderCfg, + rdrExit chan struct{}, appExit chan bool) (er EventReader, err error) { switch rdrCfg.Type { default: err = fmt.Errorf("unsupported reader type: <%s>", rdrCfg.Type) case utils.MetaFileCSV: - return NewCSVFileER(rdrCfg) + return NewCSVFileER(rdrCfg, rdrExit, appExit) } return }