diff --git a/ers/filecsv.go b/ers/filecsv.go index f3300125b..8bb314b54 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -146,6 +146,7 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) { var record []string if record, err = csvReader.Read(); err != nil { if err == io.EOF { + err = nil //If it reaches the end of the file, return nil break } return diff --git a/ers/filecsv_it_test.go b/ers/filecsv_it_test.go index be1acced2..16847cf28 100644 --- a/ers/filecsv_it_test.go +++ b/ers/filecsv_it_test.go @@ -21,7 +21,6 @@ package ers import ( "fmt" - "io" "net/rpc" "os" "path" @@ -377,6 +376,7 @@ func testCsvITKillEngine(t *testing.T) { func TestFileCSVProcessEvent(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.ERsCfg().Readers[0].ProcessedPath = "" + cfg.ERsCfg().Readers[0].HeaderDefineChar = ":" fltrs := &engine.FilterS{} filePath := "/tmp/TestFileCSVProcessEvent/" if err := os.MkdirAll(filePath, 0777); err != nil { @@ -386,7 +386,8 @@ func TestFileCSVProcessEvent(t *testing.T) { if err != nil { t.Error(err) } - file.Write([]byte(",a,ToR,b,c,d,e,f,g,h,i,j,k,l")) + file.Write([]byte(`:Test,,*voice,OriginCDR1,*prepaid,,cgrates.org,*call,1001,SUBJECT_TEST_1001,1002,2021-01-07 17:00:02 +0000 UTC,2021-01-07 17:00:04 +0000 UTC,1h2m + :Test2,,*voice,OriginCDR1,*prepaid,,cgrates.org,*call,1001,SUBJECT_TEST_1001,1002,2021-01-07 17:00:02 +0000 UTC,2021-01-07 17:00:04 +0000 UTC,1h2m`)) file.Close() eR := &CSVFileER{ cgrCfg: cfg, @@ -401,25 +402,109 @@ func TestFileCSVProcessEvent(t *testing.T) { expEvent := &utils.CGREvent{ Tenant: "cgrates.org", Event: map[string]interface{}{ - utils.AccountField: "g", - utils.AnswerTime: "k", - utils.Category: "f", - utils.Destination: "i", - utils.OriginID: "b", - utils.RequestType: "c", - utils.SetupTime: "j", - utils.Subject: "h", - utils.Tenant: "e", - utils.ToR: "ToR", - utils.Usage: "l", + utils.AccountField: "1001", + utils.AnswerTime: "2021-01-07 17:00:04 +0000 UTC", + utils.Category: "*call", + utils.Destination: "1002", + utils.OriginID: "OriginCDR1", + utils.RequestType: "*prepaid", + utils.SetupTime: "2021-01-07 17:00:02 +0000 UTC", + utils.Subject: "SUBJECT_TEST_1001", + utils.Tenant: "cgrates.org", + utils.ToR: "*voice", + utils.Usage: "1h2m", }, APIOpts: map[string]interface{}{}, } eR.conReqs <- struct{}{} + + eR.Config().Fields = []*config.FCTemplate{ + { + Tag: "ToR", + Type: utils.MetaVariable, + Path: "*cgreq.ToR", + Value: config.NewRSRParsersMustCompile("~*req.2", utils.InfieldSep), + Mandatory: true, + }, + { + Tag: "OriginID", + Type: utils.MetaVariable, + Path: "*cgreq.OriginID", + Value: config.NewRSRParsersMustCompile("~*req.3", utils.InfieldSep), + Mandatory: true, + }, + { + Tag: "RequestType", + Type: utils.MetaVariable, + Path: "*cgreq.RequestType", + Value: config.NewRSRParsersMustCompile("~*req.4", utils.InfieldSep), + Mandatory: true, + }, + { + Tag: "Tenant", + Type: utils.MetaVariable, + Path: "*cgreq.Tenant", + Value: config.NewRSRParsersMustCompile("~*req.6", utils.InfieldSep), + Mandatory: true, + }, + { + Tag: "Category", + Type: utils.MetaVariable, + Path: "*cgreq.Category", + Value: config.NewRSRParsersMustCompile("~*req.7", utils.InfieldSep), + Mandatory: true, + }, + { + Tag: "Account", + Type: utils.MetaVariable, + Path: "*cgreq.Account", + Value: config.NewRSRParsersMustCompile("~*req.8", utils.InfieldSep), + Mandatory: true, + }, + { + Tag: "Subject", + Type: utils.MetaVariable, + Path: "*cgreq.Subject", + Value: config.NewRSRParsersMustCompile("~*req.9", utils.InfieldSep), + Mandatory: true, + }, + { + Tag: "Destination", + Type: utils.MetaVariable, + Path: "*cgreq.Destination", + Value: config.NewRSRParsersMustCompile("~*req.10", utils.InfieldSep), + Mandatory: true, + }, + { + Tag: "SetupTime", + Type: utils.MetaVariable, + Path: "*cgreq.SetupTime", + Value: config.NewRSRParsersMustCompile("~*req.11", utils.InfieldSep), + Mandatory: true, + }, + { + Tag: "AnswerTime", + Type: utils.MetaVariable, + Path: "*cgreq.AnswerTime", + Value: config.NewRSRParsersMustCompile("~*req.12", utils.InfieldSep), + Mandatory: true, + }, + { + Tag: "Usage", + Type: utils.MetaVariable, + Path: "*cgreq.Usage", + Value: config.NewRSRParsersMustCompile("~*req.13", utils.InfieldSep), + Mandatory: true, + }, + } + + for idx := range eR.Config().Fields { + eR.Config().Fields[idx].ComputePath() + } + fname := "file1.csv" - errExpect := io.EOF - if err := eR.processFile(filePath, fname); err == nil || err != errExpect { - t.Errorf("Expected %v but received %v", errExpect, err) + if err := eR.processFile(filePath, fname); err != nil { + t.Error(err) } select { case data := <-eR.rdrEvents: @@ -458,6 +543,96 @@ func TestFileCSVProcessEventError(t *testing.T) { } } +func TestFileCSVProcessEventError2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.ERsCfg().Readers[0].ProcessedPath = "" + fltrs := &engine.FilterS{} + filePath := "/tmp/TestFileCSVProcessEvent/" + fname := "file1.csv" + if err := os.MkdirAll(filePath, 0777); err != nil { + t.Error(err) + } + file, err := os.Create(path.Join(filePath, "file1.csv")) + if err != nil { + t.Error(err) + } + file.Write([]byte(`#ToR,OriginID,RequestType,Tenant,Category,Account,Subject,Destination,SetupTime,AnswerTime,Usage + ,,*voice,OriginCDR1,*prepaid,,cgrates.org,*call,1001,SUBJECT_TEST_1001,1002,2021-01-07 17:00:02 +0000 UTC,2021-01-07 17:00:04 +0000 UTC,1h2m`)) + file.Close() + eR := &CSVFileER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/ers/out/", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + + eR.Config().Fields = []*config.FCTemplate{ + {}, + } + + errExpect := "unsupported type: <>" + if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } + if err := os.RemoveAll(filePath); err != nil { + t.Error(err) + } +} + +func TestFileCSVProcessEventError3(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.ERsCfg().Readers[0].Fields = []*config.FCTemplate{} + data := engine.NewInternalDB(nil, nil, true) + dm := engine.NewDataManager(data, cfg.CacheCfg(), nil) + cfg.ERsCfg().Readers[0].ProcessedPath = "" + fltrs := engine.NewFilterS(cfg, nil, dm) + filePath := "/tmp/TestFileCSVProcessEvent/" + fname := "file1.csv" + if err := os.MkdirAll(filePath, 0777); err != nil { + t.Error(err) + } + file, err := os.Create(path.Join(filePath, "file1.csv")) + if err != nil { + t.Error(err) + } + file.Write([]byte(`#ToR,OriginID,RequestType,Tenant,Category,Account,Subject,Destination,SetupTime,AnswerTime,Usage + ,,*voice,OriginCDR1,*prepaid,,cgrates.org,*call,1001,SUBJECT_TEST_1001,1002,2021-01-07 17:00:02 +0000 UTC,2021-01-07 17:00:04 +0000 UTC,1h2m`)) + file.Close() + eR := &CSVFileER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/ers/out/", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + + // + eR.Config().Filters = []string{"Filter1"} + errExpect := "NOT_FOUND:Filter1" + if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } + + // + eR.Config().Filters = []string{"*exists:~*req..Account:"} + errExpect = "Invalid fieldPath [ Account]" + if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } + if err := os.RemoveAll(filePath); err != nil { + t.Error(err) + } +} + func TestFileCSVDirErr(t *testing.T) { cfg := config.NewDefaultCGRConfig() fltrs := &engine.FilterS{} @@ -503,9 +678,6 @@ func TestFileCSV(t *testing.T) { } } eR.Config().RunDelay = 1 * time.Millisecond - if err := eR.Serve(); err != nil { - t.Error(err) - } os.Create(path.Join(filePath, "file1.txt")) eR.Config().RunDelay = 1 * time.Millisecond if err := eR.Serve(); err != nil { diff --git a/ers/filejson_it_test.go b/ers/filejson_it_test.go index 88b7ac1e5..deaa58d1b 100644 --- a/ers/filejson_it_test.go +++ b/ers/filejson_it_test.go @@ -403,6 +403,126 @@ func TestFileJSONProcessEventReadError(t *testing.T) { } } +func TestFileJSONProcessEventError2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.ERsCfg().Readers[0].ProcessedPath = "" + fltrs := &engine.FilterS{} + filePath := "/tmp/TestFileJSONProcessEvent/" + if err := os.MkdirAll(filePath, 0777); err != nil { + t.Error(err) + } + file, err := os.Create(path.Join(filePath, "file1.json")) + if err != nil { + t.Error(err) + } + fcTemp := map[string]interface{}{ + "2": "tor_test", + "3": "originid_test", + "4": "requestType_test", + "6": "tenant_test", + "7": "category_test", + "8": "account_test", + "9": "subject_test", + "10": "destination_test", + "11": "setupTime_test", + "12": "answerTime_test", + "13": "usage_test", + } + rcv, err := json.Marshal(fcTemp) + if err != nil { + t.Error(err) + } + file.Write([]byte(rcv)) + file.Close() + eR := &JSONFileER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/ErsJSON/out/", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + + eR.Config().Fields = []*config.FCTemplate{ + {}, + } + fname := "file1.json" + errExpect := "unsupported type: <>" + if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } + if err := os.RemoveAll(filePath); err != nil { + t.Error(err) + } +} + +func TestFileJSONProcessEventError3(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.ERsCfg().Readers[0].Fields = []*config.FCTemplate{} + data := engine.NewInternalDB(nil, nil, true) + dm := engine.NewDataManager(data, cfg.CacheCfg(), nil) + cfg.ERsCfg().Readers[0].ProcessedPath = "" + fltrs := engine.NewFilterS(cfg, nil, dm) + filePath := "/tmp/TestFileJSONProcessEvent/" + if err := os.MkdirAll(filePath, 0777); err != nil { + t.Error(err) + } + file, err := os.Create(path.Join(filePath, "file1.json")) + if err != nil { + t.Error(err) + } + fcTemp := map[string]interface{}{ + "2": "tor_test", + "3": "originid_test", + "4": "requestType_test", + "6": "tenant_test", + "7": "category_test", + "8": "account_test", + "9": "subject_test", + "10": "destination_test", + "11": "setupTime_test", + "12": "answerTime_test", + "13": "usage_test", + } + rcv, err := json.Marshal(fcTemp) + if err != nil { + t.Error(err) + } + file.Write([]byte(rcv)) + file.Close() + eR := &JSONFileER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/ErsJSON/out/", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + fname := "file1.json" + + // + eR.Config().Filters = []string{"Filter1"} + errExpect := "NOT_FOUND:Filter1" + if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } + + // + eR.Config().Filters = []string{"*exists:~*req..Account:"} + if err := eR.processFile(filePath, fname); err != nil { + t.Error(err) + } + if err := os.RemoveAll(filePath); err != nil { + t.Error(err) + } +} + func TestFileJSON(t *testing.T) { cfg := config.NewDefaultCGRConfig() fltrs := &engine.FilterS{}