diff --git a/ers/flatstore_it_test.go b/ers/flatstore_it_test.go
index 63b3d6720..137d90457 100644
--- a/ers/flatstore_it_test.go
+++ b/ers/flatstore_it_test.go
@@ -21,9 +21,12 @@ along with this program. If not, see
package ers
import (
+ "fmt"
+ "io"
"net/rpc"
"os"
"path"
+ "reflect"
"testing"
"time"
@@ -208,3 +211,184 @@ func testFlatstoreITKillEngine(t *testing.T) {
t.Error(err)
}
}
+
+func TestFlatstoreProcessEventError(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ fltrs := &engine.FilterS{}
+ filePath := "/tmp/invalid/"
+ fname := "file1.csv"
+ eR := &CSVFileER{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: fltrs,
+ rdrDir: "/tmp/flatstoreErs/out",
+ rdrEvents: make(chan *erEvent, 1),
+ rdrError: make(chan error, 1),
+ rdrExit: make(chan struct{}),
+ conReqs: make(chan struct{}, 1),
+ }
+ eR.conReqs <- struct{}{}
+ errExpect := "open /tmp/invalid/file1.csv: no such file or directory"
+ if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect {
+ t.Errorf("Expected %v but received %v", errExpect, err)
+ }
+}
+
+func TestFlatstoreProcessEvent(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ cfg.ERsCfg().Readers[0].ProcessedPath = ""
+ fltrs := &engine.FilterS{}
+ filePath := "/tmp/TestFlatstoreProcessEvent/"
+ if err := os.MkdirAll(filePath, 0777); err != nil {
+ t.Error(err)
+ }
+ file, err := os.Create(path.Join(filePath, "file1.csv"))
+ if err != nil {
+ t.Error(err)
+ }
+ file.Write([]byte(",a,ToR,b,c,d,e,f,g,h,i,j,k,l"))
+ file.Close()
+ eR := &FlatstoreER{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: fltrs,
+ rdrDir: "/tmp/flatstoreErs/out",
+ rdrEvents: make(chan *erEvent, 1),
+ rdrError: make(chan error, 1),
+ rdrExit: make(chan struct{}),
+ conReqs: make(chan struct{}, 1),
+ }
+ expEvent := &utils.CGREvent{
+ Tenant: "cgrates.org",
+ Event: map[string]interface{}{
+ utils.AccountField: "g",
+ utils.AnswerTime: "k",
+ utils.Category: "f",
+ utils.Destination: "i",
+ utils.OriginID: "b",
+ utils.RequestType: "c",
+ utils.SetupTime: "j",
+ utils.Subject: "h",
+ utils.Tenant: "e",
+ utils.ToR: "ToR",
+ utils.Usage: "0",
+ },
+ APIOpts: map[string]interface{}{},
+ }
+ eR.conReqs <- struct{}{}
+ fname := "file1.csv"
+ errExpect := io.EOF
+ // eR.Config().FailedCallsPrefix = "randomPrefix"
+ if err := eR.processFile(filePath, fname); err == nil || err != errExpect {
+ t.Errorf("Expected %v but received %v", errExpect, err)
+ }
+ select {
+ case data := <-eR.rdrEvents:
+ expEvent.ID = data.cgrEvent.ID
+ expEvent.Time = data.cgrEvent.Time
+ if !reflect.DeepEqual(data.cgrEvent, expEvent) {
+ t.Errorf("Expected %v but received %v", utils.ToJSON(expEvent), utils.ToJSON(data.cgrEvent))
+ }
+ case <-time.After(50 * time.Millisecond):
+ t.Error("Time limit exceeded")
+ }
+ if err := os.RemoveAll(filePath); err != nil {
+ t.Error(err)
+ }
+}
+
+func TestFlatstore(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ fltrs := &engine.FilterS{}
+ eR := &FlatstoreER{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: fltrs,
+ rdrDir: "/tmp/flatstoreErs/out",
+ rdrEvents: make(chan *erEvent, 1),
+ rdrError: make(chan error, 1),
+ rdrExit: make(chan struct{}),
+ conReqs: make(chan struct{}, 1),
+ }
+ eR.conReqs <- struct{}{}
+ filePath := "/tmp/flatstoreErs/out"
+ err := os.MkdirAll(filePath, 0777)
+ if err != nil {
+ t.Error(err)
+ }
+ for i := 1; i < 4; i++ {
+ if _, err := os.Create(path.Join(filePath, fmt.Sprintf("file%d.csv", i))); err != nil {
+ t.Error(err)
+ }
+ }
+ eR.Config().RunDelay = 1 * time.Millisecond
+ if err := eR.Serve(); err != nil {
+ t.Error(err)
+ }
+ os.Create(path.Join(filePath, "file1.txt"))
+ eR.Config().RunDelay = 1 * time.Millisecond
+ if err := eR.Serve(); err != nil {
+ t.Error(err)
+ }
+}
+
+func TestFileFlatstoreExit(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ fltrs := &engine.FilterS{}
+ eR := &FlatstoreER{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: fltrs,
+ rdrDir: "/tmp/flatstoreErs/out",
+ rdrEvents: make(chan *erEvent, 1),
+ rdrError: make(chan error, 1),
+ rdrExit: make(chan struct{}),
+ conReqs: make(chan struct{}, 1),
+ }
+ eR.conReqs <- struct{}{}
+ eR.Config().RunDelay = 1 * time.Millisecond
+ if err := eR.Serve(); err != nil {
+ t.Error(err)
+ }
+ eR.rdrExit <- struct{}{}
+}
+
+func TestFlatstoreServeErrTimeDurationNeg1(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ cfgIdx := 0
+ rdr, err := NewFlatstoreER(cfg, cfgIdx, nil, nil, nil, nil)
+ if err != nil {
+ t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
+ }
+ rdr.Config().RunDelay = time.Duration(-1)
+ expected := "no such file or directory"
+ err = rdr.Serve()
+ if err == nil || err.Error() != expected {
+ t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
+ }
+}
+func TestDumpToFileErr(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ fltrs := &engine.FilterS{}
+ eR := &FlatstoreER{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: fltrs,
+ rdrDir: "/tmp/flatstoreErs/out",
+ rdrEvents: make(chan *erEvent, 1),
+ rdrError: make(chan error, 1),
+ rdrExit: make(chan struct{}),
+ conReqs: make(chan struct{}, 1),
+ }
+ part1 := &UnpairedRecord{
+ Method: "BYE",
+ Values: []string{"value1", "value2", "value3", "value4", "value5"},
+ }
+ eR.conReqs <- struct{}{}
+ eR.dumpToFile("ID1", part1)
+ errExpect := "open /var/spool/cgrates/ers/out/.tmp: no such file or directory"
+ _, err := os.Open(path.Join(eR.Config().ProcessedPath, part1.FileName+utils.TmpSuffix))
+ if err == nil || err.Error() != errExpect {
+ t.Errorf("Expected %v but received %v", errExpect, err)
+ }
+}
diff --git a/ers/flatstore_test.go b/ers/flatstore_test.go
index 9c2535691..8425296f2 100644
--- a/ers/flatstore_test.go
+++ b/ers/flatstore_test.go
@@ -23,6 +23,7 @@ import (
"testing"
"time"
+ "github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/cgrates/config"
@@ -120,3 +121,116 @@ func TestFlatstoreServeNil(t *testing.T) {
t.Errorf("\nExpected: <%+v>, \nreceived: <%+v>", nil, err)
}
}
+
+func TestNewUnpairedRecordErrTimezone(t *testing.T) {
+ record := []string{"TEST1", "TEST2", "TEST3", "TEST4", "TEST5", "TEST6", "TEST7"}
+ timezone, _ := time.Time.Zone(time.Now())
+ fileName := "testfile.csv"
+ errExpect := "unknown time zone EEST"
+ _, err := NewUnpairedRecord(record, timezone, fileName)
+ if err == nil || err.Error() != errExpect {
+ t.Errorf("Expected %v but received %v", errExpect, err)
+ }
+}
+
+func TestNewUnpairedRecordErr(t *testing.T) {
+ record := []string{"invalid"}
+ timezone, _ := time.Time.Zone(time.Now())
+ fileName := "testfile.csv"
+ errExpect := "MISSING_IE"
+ _, err := NewUnpairedRecord(record, timezone, fileName)
+ if err == nil || err.Error() != errExpect {
+ t.Errorf("Expected %v but received %v", errExpect, err)
+ }
+}
+
+func TestPairToRecord(t *testing.T) {
+ part1 := &UnpairedRecord{
+ Method: "INVITE",
+ Values: []string{"value1", "value2", "value3", "value4", "value5", "value6"},
+ }
+ part2 := &UnpairedRecord{
+ Method: "BYE",
+ Values: []string{"value1", "value2", "value3", "value4", "value5", "value6"},
+ }
+ rcv, err := pairToRecord(part1, part2)
+ rcvExpect := append(part1.Values, "0")
+ if err != nil {
+ t.Error(err)
+ } else if !reflect.DeepEqual(rcv, rcvExpect) {
+ t.Errorf("Expected %v but received %v", rcvExpect, rcv)
+ }
+
+ part1.Values = append(part1.Values, "value7", "value8")
+ part2.Values = append(part2.Values, "value7", "value8")
+ _, err = pairToRecord(part1, part2)
+ if err != nil {
+ t.Error(err)
+ } else if !reflect.DeepEqual(part1.Values[7], part2.Values[7]) {
+ t.Errorf("Last INVITE value does not match last BYE value")
+ }
+
+ cfg := config.NewDefaultCGRConfig()
+ fltrs := &engine.FilterS{}
+ eR := &FlatstoreER{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: fltrs,
+ rdrDir: "/tmp/flatstoreErs/out",
+ rdrEvents: make(chan *erEvent, 1),
+ rdrError: make(chan error, 1),
+ rdrExit: make(chan struct{}),
+ conReqs: make(chan struct{}, 1),
+ }
+ eR.conReqs <- struct{}{}
+ eR.Config().ProcessedPath = "/tmp"
+ part1.FileName = "testfile"
+ eR.dumpToFile("ID1", nil)
+ eR.dumpToFile("ID1", part1)
+ part1.Values = []string{"\n"}
+ eR.dumpToFile("ID1", part1)
+}
+
+func TestPairToRecordReverse(t *testing.T) {
+ part1 := &UnpairedRecord{
+ Method: "BYE",
+ Values: []string{"value1", "value2", "value3", "value4", "value5"},
+ }
+ part2 := &UnpairedRecord{
+ Method: "INVITE",
+ Values: []string{"value1", "value2", "value3", "value4", "value5"},
+ }
+ rcv, err := pairToRecord(part1, part2)
+ rcvExpect := append(part1.Values, "0")
+ if err != nil {
+ t.Error(err)
+ } else if !reflect.DeepEqual(rcv, rcvExpect) {
+ t.Errorf("Expected %v but received %v", rcvExpect, rcv)
+ }
+}
+
+func TestPairToRecordErrors(t *testing.T) {
+ part1 := &UnpairedRecord{
+ Method: "INVITE",
+ Values: []string{"value1", "value2", "value3", "value4", "value5"},
+ }
+ part2 := &UnpairedRecord{
+ Method: "INVITE",
+ }
+ errExpect := "MISSING_BYE"
+ if _, err := pairToRecord(part1, part2); err == nil || err.Error() != errExpect {
+ t.Errorf("Expected %v but received %v", errExpect, err)
+ }
+
+ errExpect = "INCONSISTENT_VALUES_LENGTH"
+ part2.Method = "BYE"
+ if _, err := pairToRecord(part1, part2); err == nil || err.Error() != errExpect {
+ t.Errorf("Expected %v but received %v", errExpect, err)
+ }
+
+ part1.Method = "BYE"
+ errExpect = "MISSING_INVITE"
+ if _, err := pairToRecord(part1, part2); err == nil || err.Error() != errExpect {
+ t.Errorf("Expected %v but received %v", errExpect, err)
+ }
+}
diff --git a/ers/readers_test.go b/ers/readers_test.go
index cf4f9b07e..f1849dfca 100644
--- a/ers/readers_test.go
+++ b/ers/readers_test.go
@@ -25,6 +25,7 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
+ "github.com/cgrates/ltcache"
)
func TestNewInvalidReader(t *testing.T) {
@@ -136,3 +137,109 @@ func TestNewSQLReaderError(t *testing.T) {
t.Errorf("Expecting: <%+v>, received: <%+v>", expected, err)
}
}
+
+func TestNewPartialCSVReader(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ fltr := &engine.FilterS{}
+ cfg.ERsCfg().Readers[0].Type = utils.MetaPartialCSV
+ cfg.ERsCfg().Readers[0].SourcePath = "/tmp/ers/in"
+ cfg.ERsCfg().Readers[0].ConcurrentReqs = 1024
+ expected, err := NewPartialCSVFileER(cfg, 0, nil, nil, fltr, nil)
+ if err != nil {
+ t.Errorf("Expecting: , received: <%+v>", err)
+ }
+ rcv, err := NewEventReader(cfg, 0, nil, nil, fltr, nil)
+ if err != nil {
+ t.Error(err)
+ } else {
+ rcv.(*PartialCSVFileER).conReqs = nil
+ rcv.(*PartialCSVFileER).cache = ltcache.NewCache(ltcache.UnlimitedCaching, 0, false, nil)
+ expected.(*PartialCSVFileER).conReqs = nil
+ expected.(*PartialCSVFileER).cache = ltcache.NewCache(ltcache.UnlimitedCaching, 0, false, nil)
+ if !reflect.DeepEqual(expected, rcv) {
+ t.Errorf("Expecting %v but received %v", expected, rcv)
+ }
+ }
+}
+
+func TestNewFileXMLReader(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ fltr := &engine.FilterS{}
+ cfg.ERsCfg().Readers[0].Type = utils.MetaFileXML
+ expected, err := NewXMLFileER(cfg, 0, nil, nil, fltr, nil)
+ if err != nil {
+ t.Error(err)
+ }
+ rcv, err := NewEventReader(cfg, 0, nil, nil, fltr, nil)
+ if err != nil {
+ t.Error(err)
+ } else {
+ rcv.(*XMLFileER).conReqs = nil
+ expected.(*XMLFileER).conReqs = nil
+ if !reflect.DeepEqual(expected, rcv) {
+ t.Errorf("Expecting %v but received %v", expected, rcv)
+ }
+ }
+}
+
+func TestNewFileFWVReader(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ fltr := &engine.FilterS{}
+ cfg.ERsCfg().Readers[0].Type = utils.MetaFileFWV
+ expected, err := NewFWVFileER(cfg, 0, nil, nil, fltr, nil)
+ if err != nil {
+ t.Error(err)
+ }
+ rcv, err := NewEventReader(cfg, 0, nil, nil, fltr, nil)
+ if err != nil {
+ t.Error(nil)
+ } else {
+ rcv.(*FWVFileER).conReqs = nil
+ expected.(*FWVFileER).conReqs = nil
+ if !reflect.DeepEqual(expected, rcv) {
+ t.Errorf("Expecting %v but received %v", expected, rcv)
+ }
+ }
+}
+
+func TestNewFlatstoreReader(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ fltr := &engine.FilterS{}
+ cfg.ERsCfg().Readers[0].Type = utils.MetaFlatstore
+ expected, err := NewFlatstoreER(cfg, 0, nil, nil, fltr, nil)
+ if err != nil {
+ t.Error(err)
+ }
+ rcv, err := NewEventReader(cfg, 0, nil, nil, fltr, nil)
+ if err != nil {
+ t.Error(err)
+ } else {
+ rcv.(*FlatstoreER).conReqs = nil
+ expected.(*FlatstoreER).conReqs = nil
+ rcv.(*FlatstoreER).cache = ltcache.NewCache(ltcache.UnlimitedCaching, 0, false, nil)
+ expected.(*FlatstoreER).cache = ltcache.NewCache(ltcache.UnlimitedCaching, 0, false, nil)
+ if !reflect.DeepEqual(expected, rcv) {
+ t.Errorf("Expecting %v but received %v", expected, rcv)
+ }
+ }
+}
+
+func TestNewJSONReader(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ fltr := &engine.FilterS{}
+ cfg.ERsCfg().Readers[0].Type = utils.MetaFileJSON
+ expected, err := NewJSONFileER(cfg, 0, nil, nil, fltr, nil)
+ if err != nil {
+ t.Error(err)
+ }
+ rcv, err := NewEventReader(cfg, 0, nil, nil, fltr, nil)
+ if err != nil {
+ t.Error(err)
+ } else {
+ rcv.(*JSONFileER).conReqs = nil
+ expected.(*JSONFileER).conReqs = nil
+ if !reflect.DeepEqual(expected, rcv) {
+ t.Errorf("Expecting %v but received %v", expected, rcv)
+ }
+ }
+}