diff --git a/apier/v1/ers.go b/apier/v1/ers.go index 7e9cbd5aa..5f99ce36c 100644 --- a/apier/v1/ers.go +++ b/apier/v1/ers.go @@ -43,7 +43,6 @@ func (eeSv1 *ErSv1) Ping(ctx *context.Context, ign *utils.CGREvent, reply *strin // // Note: This API is not safe to call concurrently for the same reader. Ensure the current files finish being // processed before calling again. -func (eeSv1 *ErSv1) RunReader(ctx *context.Context, args utils.StringWithAPIOpts, - reply *string) error { - return eeSv1.erS.V1RunReader(ctx, args, reply) +func (eeSv1 *ErSv1) RunReader(ctx *context.Context, params ers.V1RunReaderParams, reply *string) error { + return eeSv1.erS.V1RunReader(ctx, params, reply) } diff --git a/ers/ers.go b/ers/ers.go index 28266dd99..664369c92 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -81,15 +81,23 @@ type ERService struct { partialCache *ltcache.Cache } +// V1RunReaderParams contains required parameters for an ErSv1.RunReader request. +type V1RunReaderParams struct { + Tenant string + ID string // unique identifier of the request + ReaderID string + APIOpts map[string]any +} + // V1RunReader processes files in the configured directory for the given reader. This function handles files // based on the reader's type and configuration. Only available for readers that are not processing files // automatically (RunDelay should equal 0). // // Note: This API is not safe to call concurrently for the same reader. Ensure the current files finish being // processed before calling again. -func (erS *ERService) V1RunReader(ctx *context.Context, rdrID utils.StringWithAPIOpts, reply *string) error { - rdrCfg := erS.cfg.ERsCfg().ReaderCfg(rdrID.Arg) - er, has := erS.rdrs[rdrID.Arg] +func (erS *ERService) V1RunReader(ctx *context.Context, params V1RunReaderParams, reply *string) error { + rdrCfg := erS.cfg.ERsCfg().ReaderCfg(params.ReaderID) + er, has := erS.rdrs[params.ReaderID] if !has || rdrCfg == nil { return utils.ErrNotFound } @@ -98,13 +106,13 @@ func (erS *ERService) V1RunReader(ctx *context.Context, rdrID utils.StringWithAP } switch rdr := er.(type) { case *CSVFileER: - processReaderDir(rdr.dir, utils.CSVSuffix, rdr.processFile) + processReaderDir(rdr.sourceDir, utils.CSVSuffix, rdr.processFile) case *XMLFileER: - processReaderDir(rdr.dir, utils.XMLSuffix, rdr.processFile) + processReaderDir(rdr.sourceDir, utils.XMLSuffix, rdr.processFile) case *FWVFileER: - processReaderDir(rdr.dir, utils.FWVSuffix, rdr.processFile) + processReaderDir(rdr.sourceDir, utils.FWVSuffix, rdr.processFile) case *JSONFileER: - processReaderDir(rdr.dir, utils.JSNSuffix, rdr.processFile) + processReaderDir(rdr.sourceDir, utils.JSNSuffix, rdr.processFile) default: return errors.New("reader type does not yet support manual processing") } diff --git a/ers/ers_it_test.go b/ers/ers_it_test.go index 36a9c0658..0be185359 100644 --- a/ers/ers_it_test.go +++ b/ers/ers_it_test.go @@ -86,10 +86,7 @@ func TestERsAddReader(t *testing.T) { func TestERsListenAndServeErr(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.ERsCfg().Readers = []*config.EventReaderCfg{ - { - ID: "", - Type: "", - }, + {}, } fltrS := &engine.FilterS{} srv := NewERService(cfg, fltrS, nil) @@ -141,7 +138,6 @@ func TestERsListenAndServeRdrErr(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.ERsCfg().Readers = []*config.EventReaderCfg{ { - ID: "", Type: utils.MetaNone, }, } @@ -162,7 +158,6 @@ func TestERsListenAndServeStopchan(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.ERsCfg().Readers = []*config.EventReaderCfg{ { - ID: "", Type: utils.MetaNone, }, } @@ -182,7 +177,6 @@ func TestERsListenAndServeRdrEvents(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.ERsCfg().Readers = []*config.EventReaderCfg{ { - ID: "", Type: utils.MetaNone, }, } @@ -193,16 +187,8 @@ func TestERsListenAndServeRdrEvents(t *testing.T) { srv.rdrErr = make(chan error, 1) srv.rdrEvents = make(chan *erEvent, 1) srv.rdrEvents <- &erEvent{ - cgrEvent: &utils.CGREvent{ - Tenant: "", - ID: "", - Time: nil, - Event: nil, - APIOpts: nil, - }, - rdrCfg: &config.EventReaderCfg{ - ID: "", - }, + cgrEvent: &utils.CGREvent{}, + rdrCfg: &config.EventReaderCfg{}, } go func() { time.Sleep(10 * time.Millisecond) @@ -218,7 +204,6 @@ func TestERsListenAndServeCfgRldChan(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.ERsCfg().Readers = []*config.EventReaderCfg{ { - ID: "", Type: utils.MetaNone, }, } @@ -351,14 +336,7 @@ func TestERsListenAndServeCfgRldChan5(t *testing.T) { fltrS := &engine.FilterS{} srv := NewERService(cfg, fltrS, nil) exp := &CSVFileER{ - cgrCfg: cfg, - cfgIdx: 0, - fltrS: nil, - dir: "", - rdrEvents: nil, - rdrError: nil, - rdrExit: nil, - conReqs: nil, + cgrCfg: cfg, } var evRdr EventReader = exp srv.rdrs = map[string]EventReader{ @@ -438,13 +416,7 @@ func TestERsProcessEvent(t *testing.T) { }, }, } - cgrEvent := &utils.CGREvent{ - Tenant: "", - ID: "", - Time: nil, - Event: nil, - APIOpts: nil, - } + cgrEvent := &utils.CGREvent{} err := srv.processEvent(cgrEvent, rdrCfg) if err == nil || err.Error() != "unsupported reqType: <>" { t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", "unsupported reqType: <>", err) @@ -467,13 +439,7 @@ func TestERsProcessEvent2(t *testing.T) { }, }, } - cgrEvent := &utils.CGREvent{ - Tenant: "", - ID: "", - Time: nil, - Event: nil, - APIOpts: nil, - } + cgrEvent := &utils.CGREvent{} err := srv.processEvent(cgrEvent, rdrCfg) if err != nil { t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", nil, err) @@ -550,10 +516,6 @@ func TestERsProcessEvent5(t *testing.T) { }, } cgrEvent := &utils.CGREvent{ - Tenant: "", - ID: "", - Time: nil, - Event: nil, APIOpts: map[string]any{ utils.OptsRoutesLimit: true, }, @@ -633,10 +595,6 @@ func TestERsProcessEvent8(t *testing.T) { }, } cgrEvent := &utils.CGREvent{ - Tenant: "", - ID: "", - Time: nil, - Event: nil, APIOpts: map[string]any{ utils.OptsRoutesLimit: true, }, @@ -664,10 +622,6 @@ func TestERsProcessEvent9(t *testing.T) { }, } cgrEvent := &utils.CGREvent{ - Tenant: "", - ID: "", - Time: nil, - Event: nil, APIOpts: map[string]any{ utils.OptsRoutesLimit: true, }, @@ -696,9 +650,6 @@ func TestERsProcessEvent10(t *testing.T) { }, } cgrEvent := &utils.CGREvent{ - Tenant: "", - ID: "", - Time: nil, Event: map[string]any{ utils.Usage: time.Second, }, @@ -753,9 +704,6 @@ func TestERsProcessEvent11(t *testing.T) { }, } cgrEvent := &utils.CGREvent{ - Tenant: "", - ID: "", - Time: nil, Event: map[string]any{ utils.Usage: 0, }, diff --git a/ers/filecsv.go b/ers/filecsv.go index 8b140e5cf..4098e6273 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -44,7 +44,7 @@ func NewCSVFileER(cfg *config.CGRConfig, cfgIdx int, cgrCfg: cfg, cfgIdx: cfgIdx, fltrS: fltrS, - dir: srcPath, + sourceDir: srcPath, rdrEvents: rdrEvents, partialEvents: partialEvents, rdrError: rdrErr, @@ -62,13 +62,12 @@ type CSVFileER struct { cgrCfg *config.CGRConfig cfgIdx int // index of config instance within ERsCfg.Readers fltrS *engine.FilterS - dir string + sourceDir string // path to the directory monitored by the reader for new events rdrEvents chan *erEvent // channel to dispatch the events created to partialEvents chan *erEvent // channel to dispatch the partial events created to rdrError chan error rdrExit chan struct{} conReqs chan struct{} // limit number of opened files - } func (rdr *CSVFileER) Config() *config.EventReaderCfg { @@ -84,11 +83,11 @@ func (rdr *CSVFileER) serveDefault() { tm.Stop() utils.Logger.Info( fmt.Sprintf("<%s> stop monitoring path <%s>", - utils.ERs, rdr.dir)) + utils.ERs, rdr.sourceDir)) return case <-tm.C: } - processReaderDir(rdr.dir, utils.CSVSuffix, rdr.processFile) + processReaderDir(rdr.sourceDir, utils.CSVSuffix, rdr.processFile) tm.Reset(rdr.Config().RunDelay) } } @@ -98,7 +97,7 @@ func (rdr *CSVFileER) Serve() (err error) { case time.Duration(0): // 0 disables the automatic read, maybe done per API return case time.Duration(-1): - return utils.WatchDir(rdr.dir, rdr.processFile, + return utils.WatchDir(rdr.sourceDir, rdr.processFile, utils.ERs, rdr.rdrExit) default: go rdr.serveDefault() @@ -112,7 +111,7 @@ func (rdr *CSVFileER) processFile(fName string) (err error) { processFile := <-rdr.conReqs // Queue here for maxOpenFiles defer func() { rdr.conReqs <- processFile }() } - absPath := path.Join(rdr.dir, fName) + absPath := path.Join(rdr.sourceDir, fName) utils.Logger.Info( fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath)) var file *os.File diff --git a/ers/filecsv_it_test.go b/ers/filecsv_it_test.go index 241ecfd71..ff7ebd8c7 100644 --- a/ers/filecsv_it_test.go +++ b/ers/filecsv_it_test.go @@ -395,7 +395,7 @@ func TestFileCSVProcessEvent(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: filePath, + sourceDir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -532,7 +532,7 @@ func TestFileCSVProcessEventError(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: filePath, + sourceDir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -565,7 +565,7 @@ func TestFileCSVProcessEventError2(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: filePath, + sourceDir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -609,7 +609,7 @@ func TestFileCSVProcessEventError3(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: filePath, + sourceDir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -642,7 +642,7 @@ func TestFileCSVDirErr(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/ers/out/", + sourceDir: "/tmp/ers/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -662,7 +662,7 @@ func TestFileCSV(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/ers/out/", + sourceDir: "/tmp/ers/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -682,7 +682,7 @@ func TestFileCSVServeDefault(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/ers/out/", + sourceDir: "/tmp/ers/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -719,7 +719,7 @@ func TestFileCSVExit(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/ers/out/", + sourceDir: "/tmp/ers/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), diff --git a/ers/filefwv.go b/ers/filefwv.go index 0cfcc81cc..27cb2a281 100644 --- a/ers/filefwv.go +++ b/ers/filefwv.go @@ -45,7 +45,7 @@ func NewFWVFileER(cfg *config.CGRConfig, cfgIdx int, cgrCfg: cfg, cfgIdx: cfgIdx, fltrS: fltrS, - dir: srcPath, + sourceDir: srcPath, rdrEvents: rdrEvents, partialEvents: partialEvents, rdrError: rdrErr, @@ -64,7 +64,7 @@ type FWVFileER struct { cgrCfg *config.CGRConfig cfgIdx int // index of config instance within ERsCfg.Readers fltrS *engine.FilterS - dir string + sourceDir string // path to the directory monitored by the reader for new events rdrEvents chan *erEvent // channel to dispatch the events created to partialEvents chan *erEvent // channel to dispatch the partial events created to rdrError chan error @@ -92,11 +92,11 @@ func (rdr *FWVFileER) serveDefault() { tm.Stop() utils.Logger.Info( fmt.Sprintf("<%s> stop monitoring path <%s>", - utils.ERs, rdr.dir)) + utils.ERs, rdr.sourceDir)) return case <-tm.C: } - processReaderDir(rdr.dir, utils.FWVSuffix, rdr.processFile) + processReaderDir(rdr.sourceDir, utils.FWVSuffix, rdr.processFile) tm.Reset(rdr.Config().RunDelay) } } @@ -106,7 +106,7 @@ func (rdr *FWVFileER) Serve() (err error) { case time.Duration(0): // 0 disables the automatic read, maybe done per API return case time.Duration(-1): - return utils.WatchDir(rdr.dir, rdr.processFile, + return utils.WatchDir(rdr.sourceDir, rdr.processFile, utils.ERs, rdr.rdrExit) default: go rdr.serveDefault() @@ -120,7 +120,7 @@ func (rdr *FWVFileER) processFile(fName string) (err error) { processFile := <-rdr.conReqs // Queue here for maxOpenFiles defer func() { rdr.conReqs <- processFile }() } - absPath := path.Join(rdr.dir, fName) + absPath := path.Join(rdr.sourceDir, fName) utils.Logger.Info( fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath)) var file *os.File diff --git a/ers/filefwv_it_test.go b/ers/filefwv_it_test.go index eec5787f8..76b87d5b0 100644 --- a/ers/filefwv_it_test.go +++ b/ers/filefwv_it_test.go @@ -211,9 +211,9 @@ func TestNewFWVFileER(t *testing.T) { cfgIdx := 0 cfg.ERsCfg().Readers[cfgIdx].SourcePath = "/" expected := &FWVFileER{ - cgrCfg: cfg, - cfgIdx: cfgIdx, - dir: "", + cgrCfg: cfg, + cfgIdx: cfgIdx, + sourceDir: "", } cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs = 1 result, err := NewFWVFileER(cfg, cfgIdx, nil, nil, nil, nil, nil) @@ -285,7 +285,7 @@ func TestFileFWVProcessEvent(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: filePath, + sourceDir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -350,7 +350,7 @@ func TestFileFWV(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/fwvErs/out", + sourceDir: "/tmp/fwvErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -385,7 +385,7 @@ func TestFileFWVServeDefault(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/fwvErs/out", + sourceDir: "/tmp/fwvErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -421,7 +421,7 @@ func TestFileFWVExit(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/fwvErs/out", + sourceDir: "/tmp/fwvErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -445,7 +445,7 @@ func TestFileFWVProcessTrailer(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/fwvErs/out", + sourceDir: "/tmp/fwvErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -505,7 +505,7 @@ func TestFileFWVProcessTrailerError1(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/fwvErs/out", + sourceDir: "/tmp/fwvErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -541,7 +541,7 @@ func TestFileFWVProcessTrailerError2(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/fwvErs/out", + sourceDir: "/tmp/fwvErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -588,7 +588,7 @@ func TestFileFWVProcessTrailerError3(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/fwvErs/out", + sourceDir: "/tmp/fwvErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -619,7 +619,7 @@ func TestFileFWVCreateHeaderMap(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/fwvErs/out", + sourceDir: "/tmp/fwvErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -668,7 +668,7 @@ func TestFileFWVCreateHeaderMapError1(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/fwvErs/out", + sourceDir: "/tmp/fwvErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -694,7 +694,7 @@ func TestFileFWVCreateHeaderMapError2(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/fwvErs/out", + sourceDir: "/tmp/fwvErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), diff --git a/ers/filejson.go b/ers/filejson.go index b0ffee8c6..d81bbf548 100644 --- a/ers/filejson.go +++ b/ers/filejson.go @@ -46,7 +46,7 @@ func NewJSONFileER(cfg *config.CGRConfig, cfgIdx int, cgrCfg: cfg, cfgIdx: cfgIdx, fltrS: fltrS, - dir: srcPath, + sourceDir: srcPath, rdrEvents: rdrEvents, partialEvents: partialEvents, rdrError: rdrErr, @@ -65,7 +65,7 @@ type JSONFileER struct { cgrCfg *config.CGRConfig cfgIdx int // index of config instance within ERsCfg.Readers fltrS *engine.FilterS - dir string + sourceDir string // path to the directory monitored by the reader for new events rdrEvents chan *erEvent // channel to dispatch the events created to partialEvents chan *erEvent // channel to dispatch the partial events created to rdrError chan error @@ -86,11 +86,11 @@ func (rdr *JSONFileER) serveDefault() { tm.Stop() utils.Logger.Info( fmt.Sprintf("<%s> stop monitoring path <%s>", - utils.ERs, rdr.dir)) + utils.ERs, rdr.sourceDir)) return case <-tm.C: } - processReaderDir(rdr.dir, utils.JSNSuffix, rdr.processFile) + processReaderDir(rdr.sourceDir, utils.JSNSuffix, rdr.processFile) tm.Reset(rdr.Config().RunDelay) } } @@ -100,7 +100,7 @@ func (rdr *JSONFileER) Serve() (err error) { case time.Duration(0): // 0 disables the automatic read, maybe done per API return case time.Duration(-1): - return utils.WatchDir(rdr.dir, rdr.processFile, + return utils.WatchDir(rdr.sourceDir, rdr.processFile, utils.ERs, rdr.rdrExit) default: go rdr.serveDefault() @@ -114,7 +114,7 @@ func (rdr *JSONFileER) processFile(fName string) (err error) { processFile := <-rdr.conReqs // Queue here for maxOpenFiles defer func() { rdr.conReqs <- processFile }() } - absPath := path.Join(rdr.dir, fName) + absPath := path.Join(rdr.sourceDir, fName) utils.Logger.Info( fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath)) var file *os.File diff --git a/ers/filejson_it_test.go b/ers/filejson_it_test.go index f7bb00553..5ac9faaff 100644 --- a/ers/filejson_it_test.go +++ b/ers/filejson_it_test.go @@ -338,7 +338,7 @@ func TestFileJSONProcessEvent(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: filePath, + sourceDir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -391,7 +391,7 @@ func TestFileJSONProcessEventReadError(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: filePath, + sourceDir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -439,7 +439,7 @@ func TestFileJSONProcessEventError2(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: filePath, + sourceDir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -498,7 +498,7 @@ func TestFileJSONProcessEventError3(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: filePath, + sourceDir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -531,7 +531,7 @@ func TestFileJSON(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/ErsJSON/out/", + sourceDir: "/tmp/ErsJSON/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -550,7 +550,7 @@ func TestFileJSONServeDefault(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/ErsJSON/out/", + sourceDir: "/tmp/ErsJSON/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -586,7 +586,7 @@ func TestFileJSONExit(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/ErsJSON/out/", + sourceDir: "/tmp/ErsJSON/out/", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), diff --git a/ers/filexml.go b/ers/filexml.go index 09a363bb7..9c7514a52 100644 --- a/ers/filexml.go +++ b/ers/filexml.go @@ -46,7 +46,7 @@ func NewXMLFileER(cfg *config.CGRConfig, cfgIdx int, cgrCfg: cfg, cfgIdx: cfgIdx, fltrS: fltrS, - dir: srcPath, + sourceDir: srcPath, rdrEvents: rdrEvents, partialEvents: partialEvents, rdrError: rdrErr, @@ -65,7 +65,7 @@ type XMLFileER struct { cgrCfg *config.CGRConfig cfgIdx int // index of config instance within ERsCfg.Readers fltrS *engine.FilterS - dir string + sourceDir string // path to the directory monitored by the reader for new events rdrEvents chan *erEvent // channel to dispatch the events created to partialEvents chan *erEvent // channel to dispatch the partial events created to rdrError chan error @@ -82,7 +82,7 @@ func (rdr *XMLFileER) Serve() (err error) { case time.Duration(0): // 0 disables the automatic read, maybe done per API return case time.Duration(-1): - return utils.WatchDir(rdr.dir, rdr.processFile, + return utils.WatchDir(rdr.sourceDir, rdr.processFile, utils.ERs, rdr.rdrExit) default: go func() { @@ -94,11 +94,11 @@ func (rdr *XMLFileER) Serve() (err error) { tm.Stop() utils.Logger.Info( fmt.Sprintf("<%s> stop monitoring path <%s>", - utils.ERs, rdr.dir)) + utils.ERs, rdr.sourceDir)) return case <-tm.C: } - processReaderDir(rdr.dir, utils.XMLSuffix, rdr.processFile) + processReaderDir(rdr.sourceDir, utils.XMLSuffix, rdr.processFile) tm.Reset(rdr.Config().RunDelay) } }() @@ -132,7 +132,7 @@ func (rdr *XMLFileER) processFile(fName string) error { processFile := <-rdr.conReqs // Queue here for maxOpenFiles defer func() { rdr.conReqs <- processFile }() } - absPath := path.Join(rdr.dir, fName) + absPath := path.Join(rdr.sourceDir, fName) utils.Logger.Info( fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath)) file, err := os.Open(absPath) diff --git a/ers/filexml_it_test.go b/ers/filexml_it_test.go index 930021a03..d4395db39 100644 --- a/ers/filexml_it_test.go +++ b/ers/filexml_it_test.go @@ -349,7 +349,7 @@ func TestNewXMLFileER(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/xmlErs/out", + sourceDir: "/tmp/xmlErs/out", rdrEvents: nil, rdrError: nil, rdrExit: nil, @@ -402,7 +402,7 @@ func TestFileXMLProcessEvent(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: filePath, + sourceDir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -458,7 +458,7 @@ func TestFileXMLProcessEventError1(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: filePath, + sourceDir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -504,7 +504,7 @@ func TestFileXMLProcessEVentError2(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: filePath, + sourceDir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -554,7 +554,7 @@ func TestFileXMLProcessEventParseError(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: filePath, + sourceDir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), @@ -579,14 +579,14 @@ func TestFileXML(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/xmlErs/out", + sourceDir: "/tmp/xmlErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } eR.conReqs <- struct{}{} - err := os.MkdirAll(eR.dir, 0777) + err := os.MkdirAll(eR.sourceDir, 0777) if err != nil { t.Error(err) } @@ -604,7 +604,7 @@ func TestFileXML(t *testing.T) { eR.Config().Fields[0].ComputePath() for i := 1; i < 4; i++ { - if _, err := os.Create(path.Join(eR.dir, fmt.Sprintf("file%d.xml", i))); err != nil { + if _, err := os.Create(path.Join(eR.sourceDir, fmt.Sprintf("file%d.xml", i))); err != nil { t.Error(err) } } @@ -618,7 +618,7 @@ func TestFileXML(t *testing.T) { if err := eR.Serve(); err != nil { t.Error(err) } - os.Create(path.Join(eR.dir, "file1.txt")) + os.Create(path.Join(eR.sourceDir, "file1.txt")) eR.Config().RunDelay = 1 * time.Millisecond if err := eR.Serve(); err != nil { t.Error(err) @@ -632,14 +632,14 @@ func TestFileXMLError(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/xmlErsError/out", + sourceDir: "/tmp/xmlErsError/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } eR.conReqs <- struct{}{} - err := os.MkdirAll(eR.dir, 0777) + err := os.MkdirAll(eR.sourceDir, 0777) if err != nil { t.Error(err) } @@ -657,11 +657,11 @@ func TestFileXMLError(t *testing.T) { eR.Config().Fields[0].ComputePath() for i := 1; i < 4; i++ { - if _, err := os.Create(path.Join(eR.dir, fmt.Sprintf("file%d.xml", i))); err != nil { + if _, err := os.Create(path.Join(eR.sourceDir, fmt.Sprintf("file%d.xml", i))); err != nil { t.Error(err) } } - os.Create(path.Join(eR.dir, "file1.txt")) + os.Create(path.Join(eR.sourceDir, "file1.txt")) eR.Config().RunDelay = 1 * time.Millisecond if err := eR.Serve(); err != nil { t.Error(err) @@ -675,7 +675,7 @@ func TestFileXMLExit(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, - dir: "/tmp/xmlErs/out", + sourceDir: "/tmp/xmlErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), diff --git a/ers/filexml_test.go b/ers/filexml_test.go index 42a883b37..80ab66ed3 100644 --- a/ers/filexml_test.go +++ b/ers/filexml_test.go @@ -32,7 +32,7 @@ func TestERSNewXMLFileER(t *testing.T) { cgrCfg: cfg, cfgIdx: 0, fltrS: nil, - dir: "/var/spool/cgrates/ers/in", + sourceDir: "/var/spool/cgrates/ers/in", rdrEvents: nil, rdrError: nil, rdrExit: nil, diff --git a/ers/readers_test.go b/ers/readers_test.go index 0d68a2aaa..7d4e671e4 100644 --- a/ers/readers_test.go +++ b/ers/readers_test.go @@ -55,7 +55,7 @@ func TestNewCsvReader(t *testing.T) { cgrCfg: cfg, cfgIdx: 1, fltrS: fltr, - dir: cfg.ERsCfg().Readers[1].SourcePath, + sourceDir: cfg.ERsCfg().Readers[1].SourcePath, rdrEvents: nil, rdrError: nil, rdrExit: nil,