diff --git a/ers/partial_csv_it_test.go b/ers/partial_csv_it_test.go
index a22fb627e..7ec34b67c 100644
--- a/ers/partial_csv_it_test.go
+++ b/ers/partial_csv_it_test.go
@@ -21,6 +21,10 @@ along with this program. If not, see
package ers
import (
+ "bytes"
+ "fmt"
+ "io"
+ "log"
"net/rpc"
"os"
"path"
@@ -566,3 +570,266 @@ func TestPartialCSVServe2(t *testing.T) {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "no such file or directory", err)
}
}
+
+func TestPartialCSVServe5(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ fltrs := &engine.FilterS{}
+ eR := &PartialCSVFileER{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: fltrs,
+ rdrDir: "/tmp/partErs1/out",
+ rdrEvents: make(chan *erEvent, 1),
+ rdrError: make(chan error, 1),
+ rdrExit: make(chan struct{}),
+ conReqs: make(chan struct{}, 1),
+ }
+ eR.conReqs <- struct{}{}
+ filePath := "/tmp/partErs1/out"
+ err := os.MkdirAll(filePath, 0777)
+ if err != nil {
+ t.Error(err)
+ }
+ for i := 1; i < 4; i++ {
+ if _, err := os.Create(path.Join(filePath, fmt.Sprintf("file%d.csv", i))); err != nil {
+ t.Error(err)
+ }
+ }
+ eR.Config().RunDelay = 1 * time.Millisecond
+ if err := eR.Serve(); err != nil {
+ t.Error(err)
+ }
+ os.Create(path.Join(filePath, "file1.txt"))
+ eR.Config().RunDelay = 1 * time.Millisecond
+ if err := eR.Serve(); err != nil {
+ t.Error(err)
+ }
+}
+
+func TestPartialCSVProcessEvent(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ cfg.ERsCfg().Readers[0].ProcessedPath = ""
+ fltrs := &engine.FilterS{}
+ filePath := "/tmp/TestPartialCSVProcessEvent/"
+ if err := os.MkdirAll(filePath, 0777); err != nil {
+ t.Error(err)
+ }
+ file, err := os.Create(path.Join(filePath, "file1.csv"))
+ if err != nil {
+ t.Error(err)
+ }
+ file.Write([]byte(",a,ToR,b,c,d,e,f,g,h,i,j,k,l"))
+ file.Close()
+ eR := &PartialCSVFileER{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: fltrs,
+ rdrDir: "/tmp/partErs1/out",
+ rdrEvents: make(chan *erEvent, 1),
+ rdrError: make(chan error, 1),
+ rdrExit: make(chan struct{}),
+ conReqs: make(chan struct{}, 1),
+ }
+ eR.conReqs <- struct{}{}
+ fname := "file1.csv"
+ errExpect := io.EOF
+ if err := eR.processFile(filePath, fname); err == nil || err != errExpect {
+ t.Errorf("Expected %v but received %v", errExpect, err)
+ }
+ if err := os.RemoveAll(filePath); err != nil {
+ t.Error(err)
+ }
+ value := []*utils.CGREvent{
+ {
+ Tenant: "cgrates.org",
+ Event: map[string]interface{}{
+ "Partial": true,
+ },
+ },
+ {
+ Tenant: "cgrates2.org",
+ Event: map[string]interface{}{
+ "Partial": true,
+ },
+ },
+ }
+ eR.Config().ProcessedPath = "/tmp"
+ eR.dumpToFile("ID1", value)
+}
+
+func TestPartialCSVDumpToFileErr1(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ fltrs := &engine.FilterS{}
+ var err error
+ utils.Logger, err = utils.Newlogger(utils.MetaStdLog, utils.EmptyString)
+ if err != nil {
+ t.Error(err)
+ }
+ utils.Logger.SetLogLevel(7)
+ buf := new(bytes.Buffer)
+ log.SetOutput(buf)
+ eR := &PartialCSVFileER{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: fltrs,
+ rdrDir: "/tmp/partErs1/out",
+ rdrEvents: make(chan *erEvent, 1),
+ rdrError: make(chan error, 1),
+ rdrExit: make(chan struct{}),
+ conReqs: make(chan struct{}, 1),
+ }
+ eR.conReqs <- struct{}{}
+ value := []*utils.CGREvent{
+ {
+ Event: map[string]interface{}{
+ "Partial": true,
+ },
+ },
+ }
+ //ProcessedPath is not declared in order to trigger the
+ //file creation error
+ eR.dumpToFile("ID1", value)
+ errExpect := "[ERROR] Failed creating /var/spool/cgrates/ers/out/.tmp."
+ if rcv := buf.String(); !strings.Contains(rcv, errExpect) {
+ t.Errorf("\nExpected %v but \nreceived %v", errExpect, rcv)
+ }
+ value = []*utils.CGREvent{
+ {
+ Event: map[string]interface{}{
+ //Value is false in order to stop
+ //further execution
+ "Partial": false,
+ },
+ },
+ }
+ eR.dumpToFile("ID1", value)
+ eR.postCDR("ID1", value)
+}
+
+func TestPartialCSVDumpToFileErr2(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ fltrs := &engine.FilterS{}
+ var err error
+ utils.Logger, err = utils.Newlogger(utils.MetaStdLog, utils.EmptyString)
+ if err != nil {
+ t.Error(err)
+ }
+ utils.Logger.SetLogLevel(7)
+ buf := new(bytes.Buffer)
+ log.SetOutput(buf)
+ eR := &PartialCSVFileER{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: fltrs,
+ rdrDir: "/tmp/partErs1/out",
+ rdrEvents: make(chan *erEvent, 1),
+ rdrError: make(chan error, 1),
+ rdrExit: make(chan struct{}),
+ conReqs: make(chan struct{}, 1),
+ }
+ eR.conReqs <- struct{}{}
+ value := []*utils.CGREvent{
+ {
+ Event: map[string]interface{}{
+ //Value of field is string in order to trigger
+ //the converting error
+ "Partial": "notBool",
+ },
+ },
+ }
+ eR.dumpToFile("ID1", value)
+ errExpect := `[WARNING] Converting Event : <{"Partial":"notBool"}> to cdr , ignoring due to error: `
+ if rcv := buf.String(); !strings.Contains(rcv, errExpect) {
+ t.Errorf("\nExpected %v but \nreceived %v", errExpect, rcv)
+ }
+}
+
+func TestPartialCSVDumpToFileErr3(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ fltrs := &engine.FilterS{}
+ var err error
+ utils.Logger, err = utils.Newlogger(utils.MetaStdLog, utils.EmptyString)
+ if err != nil {
+ t.Error(err)
+ }
+ utils.Logger.SetLogLevel(7)
+ buf := new(bytes.Buffer)
+ log.SetOutput(buf)
+ eR := &PartialCSVFileER{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: fltrs,
+ rdrDir: "/tmp/partErs1/out",
+ rdrEvents: make(chan *erEvent, 1),
+ rdrError: make(chan error, 1),
+ rdrExit: make(chan struct{}),
+ conReqs: make(chan struct{}, 1),
+ }
+ eR.conReqs <- struct{}{}
+ value := []*utils.CGREvent{
+ {
+ Tenant: "cgrates.org",
+ Event: map[string]interface{}{
+ "Partial": true,
+ },
+ },
+ //Added a second event in order to pass the length check
+ {
+ Tenant: "cgrates2.org",
+ Event: map[string]interface{}{
+ "Partial": "notBool",
+ },
+ },
+ }
+ eR.Config().ProcessedPath = "/tmp"
+ eR.dumpToFile("ID1", value)
+ errExpect := `[WARNING] Converting Event : <{"Partial":"notBool"}> to cdr , ignoring due to error: `
+ if rcv := buf.String(); !strings.Contains(rcv, errExpect) {
+ t.Errorf("\nExpected %v but \nreceived %v", errExpect, rcv)
+ }
+}
+
+func TestPartialCSVPostCDR(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ fltrs := &engine.FilterS{}
+ eR := &PartialCSVFileER{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: fltrs,
+ rdrDir: "/tmp/partErs1/out",
+ rdrEvents: make(chan *erEvent, 1),
+ rdrError: make(chan error, 1),
+ rdrExit: make(chan struct{}),
+ conReqs: make(chan struct{}, 1),
+ }
+ eR.conReqs <- struct{}{}
+
+ value := []*utils.CGREvent{
+ {
+ Tenant: "cgrates.org",
+ Event: map[string]interface{}{
+ "Partial": true,
+ },
+ APIOpts: map[string]interface{}{
+ "Opt1": "testOpt",
+ },
+ },
+ }
+ expEvent := &utils.CGREvent{
+ Tenant: "cgrates.org",
+ Event: value[0].Event,
+ APIOpts: value[0].APIOpts,
+ }
+ eR.postCDR("ID1", nil)
+ eR.postCDR("ID1", value)
+ select {
+ case data := <-eR.rdrEvents:
+ expEvent.ID = data.cgrEvent.ID
+ expEvent.Time = data.cgrEvent.Time
+ if !reflect.DeepEqual(expEvent, data.cgrEvent) {
+ t.Errorf("\nExpected %v but \nreceived %v", expEvent, data.cgrEvent)
+ }
+ case <-time.After(50 * time.Millisecond):
+ t.Error("Time limit exceeded")
+ }
+}
diff --git a/ers/readers_test.go b/ers/readers_test.go
index f1849dfca..7542834db 100644
--- a/ers/readers_test.go
+++ b/ers/readers_test.go
@@ -243,3 +243,54 @@ func TestNewJSONReader(t *testing.T) {
}
}
}
+
+func TestNewAMQPReader(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ fltr := &engine.FilterS{}
+ cfg.ERsCfg().Readers[0].Type = utils.MetaAMQPjsonMap
+ cfg.ERsCfg().Readers[0].ConcurrentReqs = -1
+ exp := &AMQPER{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: fltr,
+ rdrEvents: nil,
+ rdrExit: nil,
+ rdrErr: nil,
+ }
+ exp.dialURL = exp.Config().SourcePath
+ exp.Config().ProcessedPath = ""
+ exp.setOpts(map[string]interface{}{})
+ exp.createPoster()
+ var expected EventReader = exp
+ rcv, err := NewEventReader(cfg, 0, nil, nil, fltr, nil)
+ if err != nil {
+ t.Error(err)
+ } else if !reflect.DeepEqual(expected, rcv) {
+ t.Errorf("Expected %v but received %v", expected, rcv)
+ }
+}
+
+func TestNewAMQPv1Reader(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ fltr := &engine.FilterS{}
+ cfg.ERsCfg().Readers[0].Type = utils.MetaAMQPV1jsonMap
+ cfg.ERsCfg().Readers[0].ConcurrentReqs = -1
+ exp := &AMQPv1ER{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: fltr,
+ rdrEvents: nil,
+ rdrExit: nil,
+ rdrErr: nil,
+ }
+ exp.Config().ProcessedPath = ""
+ exp.Config().Opts = map[string]interface{}{}
+ exp.createPoster()
+ var expected EventReader = exp
+ rcv, err := NewEventReader(cfg, 0, nil, nil, fltr, nil)
+ if err != nil {
+ t.Error(err)
+ } else if !reflect.DeepEqual(expected, rcv) {
+ t.Errorf("Expected \n%v but received \n%v", expected, rcv)
+ }
+}