diff --git a/ers/filecsv_it_test.go b/ers/filecsv_it_test.go
index 7f8d17955..be1acced2 100644
--- a/ers/filecsv_it_test.go
+++ b/ers/filecsv_it_test.go
@@ -458,7 +458,7 @@ func TestFileCSVProcessEventError(t *testing.T) {
}
}
-func TestFileCSV(t *testing.T) {
+func TestFileCSVDirErr(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
fltrs := &engine.FilterS{}
eR := &CSVFileER{
@@ -477,6 +477,21 @@ func TestFileCSV(t *testing.T) {
if err := eR.Serve(); err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
+}
+func TestFileCSV(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ fltrs := &engine.FilterS{}
+ eR := &CSVFileER{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: fltrs,
+ rdrDir: "/tmp/ers/out/",
+ rdrEvents: make(chan *erEvent, 1),
+ rdrError: make(chan error, 1),
+ rdrExit: make(chan struct{}),
+ conReqs: make(chan struct{}, 1),
+ }
+ eR.conReqs <- struct{}{}
filePath := "/tmp/ers/out/"
err := os.MkdirAll(filePath, 0777)
if err != nil {
@@ -491,6 +506,11 @@ func TestFileCSV(t *testing.T) {
if err := eR.Serve(); err != nil {
t.Error(err)
}
+ os.Create(path.Join(filePath, "file1.txt"))
+ eR.Config().RunDelay = 1 * time.Millisecond
+ if err := eR.Serve(); err != nil {
+ t.Error(err)
+ }
}
func TestFileCSVExit(t *testing.T) {
diff --git a/ers/filefwv_it_test.go b/ers/filefwv_it_test.go
index b68dfd6fd..166a67780 100644
--- a/ers/filefwv_it_test.go
+++ b/ers/filefwv_it_test.go
@@ -21,6 +21,7 @@ along with this program. If not, see
package ers
import (
+ "fmt"
"net/rpc"
"os"
"path"
@@ -268,3 +269,135 @@ func TestFWVFileConfig(t *testing.T) {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
}
}
+
+func TestFileFWVProcessEvent(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ cfg.ERsCfg().Readers[0].ProcessedPath = ""
+ fltrs := &engine.FilterS{}
+ filePath := "/tmp/TestFileFWVProcessEvent/"
+ if err := os.MkdirAll(filePath, 0777); err != nil {
+ t.Error(err)
+ }
+ file, err := os.Create(path.Join(filePath, "file1.fwv"))
+ if err != nil {
+ t.Error(err)
+ }
+ file.Write([]byte("test,test2"))
+ file.Close()
+ eR := &FWVFileER{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: fltrs,
+ rdrDir: "/tmp/fwvErs/out",
+ rdrEvents: make(chan *erEvent, 1),
+ rdrError: make(chan error, 1),
+ rdrExit: make(chan struct{}),
+ conReqs: make(chan struct{}, 1),
+ }
+ eR.conReqs <- struct{}{}
+ fname := "file1.fwv"
+ errExpect := "unsupported field prefix: <> when set fields"
+ eR.Config().Fields = []*config.FCTemplate{
+ {
+ Value: config.RSRParsers{
+ {
+ Rules: "~*hdr",
+ },
+ },
+ Type: utils.MetaRemove,
+ // Path: utils.MetaVars,
+ },
+ }
+ eR.Config().Fields[0].ComputePath()
+ if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect {
+ t.Errorf("Expected %v but received %v", errExpect, err)
+ }
+ if err := os.RemoveAll(filePath); err != nil {
+ t.Error(err)
+ }
+}
+
+func TestFileFWVServeErrTimeDuration0(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ cfgIdx := 0
+ rdr, err := NewFWVFileER(cfg, cfgIdx, nil, nil, nil, nil)
+ if err != nil {
+ t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
+ }
+ rdr.Config().RunDelay = time.Duration(0)
+ result := rdr.Serve()
+ if !reflect.DeepEqual(result, nil) {
+ t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, result)
+ }
+}
+
+func TestFileFWVServeErrTimeDurationNeg1(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ cfgIdx := 0
+ rdr, err := NewFWVFileER(cfg, cfgIdx, nil, nil, nil, nil)
+ if err != nil {
+ t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
+ }
+ rdr.Config().RunDelay = time.Duration(-1)
+ expected := "no such file or directory"
+ err = rdr.Serve()
+ if err == nil || err.Error() != expected {
+ t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
+ }
+}
+
+func TestFileFWV(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ fltrs := &engine.FilterS{}
+ eR := &FWVFileER{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: fltrs,
+ rdrDir: "/tmp/fwvErs/out",
+ rdrEvents: make(chan *erEvent, 1),
+ rdrError: make(chan error, 1),
+ rdrExit: make(chan struct{}),
+ conReqs: make(chan struct{}, 1),
+ }
+ eR.conReqs <- struct{}{}
+ filePath := "/tmp/fwvErs/out"
+ err := os.MkdirAll(filePath, 0777)
+ if err != nil {
+ t.Error(err)
+ }
+ for i := 1; i < 4; i++ {
+ if _, err := os.Create(path.Join(filePath, fmt.Sprintf("file%d.fwv", i))); err != nil {
+ t.Error(err)
+ }
+ }
+ eR.Config().RunDelay = 1 * time.Millisecond
+ if err := eR.Serve(); err != nil {
+ t.Error(err)
+ }
+ os.Create(path.Join(filePath, "file1.txt"))
+ eR.Config().RunDelay = 1 * time.Millisecond
+ if err := eR.Serve(); err != nil {
+ t.Error(err)
+ }
+}
+
+func TestFileFWVExit(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ fltrs := &engine.FilterS{}
+ eR := &FWVFileER{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: fltrs,
+ rdrDir: "/tmp/fwvErs/out",
+ rdrEvents: make(chan *erEvent, 1),
+ rdrError: make(chan error, 1),
+ rdrExit: make(chan struct{}),
+ conReqs: make(chan struct{}, 1),
+ }
+ eR.conReqs <- struct{}{}
+ eR.Config().RunDelay = 1 * time.Millisecond
+ if err := eR.Serve(); err != nil {
+ t.Error(err)
+ }
+ eR.rdrExit <- struct{}{}
+}
diff --git a/ers/filejson_it_test.go b/ers/filejson_it_test.go
index 049918550..88b7ac1e5 100644
--- a/ers/filejson_it_test.go
+++ b/ers/filejson_it_test.go
@@ -21,9 +21,12 @@ along with this program. If not, see
package ers
import (
+ "encoding/json"
+ "fmt"
"net/rpc"
"os"
"path"
+ "reflect"
"testing"
"time"
@@ -226,7 +229,6 @@ func testJSONKillEngine(t *testing.T) {
}
}
-/*
func TestFileJSONServeErrTimeDuration0(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfgIdx := 0
@@ -241,7 +243,6 @@ func TestFileJSONServeErrTimeDuration0(t *testing.T) {
}
}
-
func TestFileJSONServeErrTimeDurationNeg1(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfgIdx := 0
@@ -257,47 +258,203 @@ func TestFileJSONServeErrTimeDurationNeg1(t *testing.T) {
}
}
-func TestFileJSONServeTimeDefault(t *testing.T) {
+// func TestFileJSONServeTimeDefault(t *testing.T) {
+// cfg := config.NewDefaultCGRConfig()
+// cfgIdx := 0
+// rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil)
+// if err != nil {
+// t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
+// }
+// rdr.Config().RunDelay = time.Duration(1)
+// result := rdr.Serve()
+// if !reflect.DeepEqual(result, nil) {
+// t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, result)
+// }
+// }
+
+// func TestFileJSONServeTimeDefaultChanExit(t *testing.T) {
+// cfg := config.NewDefaultCGRConfig()
+// cfgIdx := 0
+// rdrExit := make(chan struct{}, 1)
+// rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, rdrExit)
+// if err != nil {
+// t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
+// }
+// rdrExit <- struct{}{}
+// rdr.Config().RunDelay = time.Duration(1)
+// result := rdr.Serve()
+// if !reflect.DeepEqual(result, nil) {
+// t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, result)
+// }
+// }
+
+// func TestFileJSONProcessFile(t *testing.T) {
+// cfg := config.NewDefaultCGRConfig()
+// cfgIdx := 0
+// rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil)
+// if err != nil {
+// t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
+// }
+// expected := "open : no such file or directory"
+// err2 := rdr.(*JSONFileER).processFile("", "")
+// if err2 == nil || err2.Error() != expected {
+// t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err2)
+// }
+// }
+
+func TestFileJSONProcessEvent(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
- cfgIdx := 0
- rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil)
- if err != nil {
- t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
+ cfg.ERsCfg().Readers[0].ProcessedPath = ""
+ fltrs := &engine.FilterS{}
+ filePath := "/tmp/TestFileJSONProcessEvent/"
+ if err := os.MkdirAll(filePath, 0777); err != nil {
+ t.Error(err)
}
- rdr.Config().RunDelay = time.Duration(1)
- result := rdr.Serve()
- if !reflect.DeepEqual(result, nil) {
- t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, result)
+ file, err := os.Create(path.Join(filePath, "file1.json"))
+ if err != nil {
+ t.Error(err)
+ }
+ fcTemp := map[string]interface{}{
+ "2": "tor_test",
+ "3": "originid_test",
+ "4": "requestType_test",
+ "6": "tenant_test",
+ "7": "category_test",
+ "8": "account_test",
+ "9": "subject_test",
+ "10": "destination_test",
+ "11": "setupTime_test",
+ "12": "answerTime_test",
+ "13": "usage_test",
+ }
+ rcv, err := json.Marshal(fcTemp)
+ if err != nil {
+ t.Error(err)
+ }
+ file.Write([]byte(rcv))
+ file.Close()
+ eR := &JSONFileER{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: fltrs,
+ rdrDir: "/tmp/ErsJSON/out/",
+ rdrEvents: make(chan *erEvent, 1),
+ rdrError: make(chan error, 1),
+ rdrExit: make(chan struct{}),
+ conReqs: make(chan struct{}, 1),
+ }
+ expEvent := &utils.CGREvent{
+ Tenant: "cgrates.org",
+ Event: map[string]interface{}{
+ utils.AccountField: "account_test",
+ utils.AnswerTime: "answerTime_test",
+ utils.Category: "category_test",
+ utils.Destination: "destination_test",
+ utils.OriginID: "originid_test",
+ utils.RequestType: "requestType_test",
+ utils.SetupTime: "setupTime_test",
+ utils.Subject: "subject_test",
+ utils.Tenant: "tenant_test",
+ utils.ToR: "tor_test",
+ utils.Usage: "usage_test",
+ },
+ APIOpts: map[string]interface{}{},
+ }
+ // expEvent := &utils.CGREvent{}
+ eR.conReqs <- struct{}{}
+ fname := "file1.json"
+ if err := eR.processFile(filePath, fname); err != nil {
+ t.Error(err)
+ }
+ select {
+ case data := <-eR.rdrEvents:
+ expEvent.ID = data.cgrEvent.ID
+ expEvent.Time = data.cgrEvent.Time
+ if !reflect.DeepEqual(data.cgrEvent, expEvent) {
+ t.Errorf("Expected %v but received %v", utils.ToJSON(expEvent), utils.ToJSON(data.cgrEvent))
+ }
+ case <-time.After(50 * time.Millisecond):
+ t.Error("Time limit exceeded")
+ }
+ if err := os.RemoveAll(filePath); err != nil {
+ t.Error(err)
}
}
-func TestFileJSONServeTimeDefaultChanExit(t *testing.T) {
+func TestFileJSONProcessEventReadError(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
- cfgIdx := 0
- rdrExit := make(chan struct{}, 1)
- rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, rdrExit)
- if err != nil {
- t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
+ fltrs := &engine.FilterS{}
+ filePath := "/tmp/TestFileJSONProcessEvent/"
+ fname := "file2.json"
+ eR := &CSVFileER{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: fltrs,
+ rdrDir: "/tmp/ErsJSON/out/",
+ rdrEvents: make(chan *erEvent, 1),
+ rdrError: make(chan error, 1),
+ rdrExit: make(chan struct{}),
+ conReqs: make(chan struct{}, 1),
}
- rdrExit <- struct{}{}
- rdr.Config().RunDelay = time.Duration(1)
- result := rdr.Serve()
- if !reflect.DeepEqual(result, nil) {
- t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, result)
+ eR.conReqs <- struct{}{}
+ errExpect := "open /tmp/TestFileJSONProcessEvent/file2.json: no such file or directory"
+ if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect {
+ t.Errorf("Expected %v but received %v", errExpect, err)
}
}
-func TestFileJSONProcessFile(t *testing.T) {
+func TestFileJSON(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
- cfgIdx := 0
- rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil)
- if err != nil {
- t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
+ fltrs := &engine.FilterS{}
+ eR := &JSONFileER{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: fltrs,
+ rdrDir: "/tmp/ErsJSON/out/",
+ rdrEvents: make(chan *erEvent, 1),
+ rdrError: make(chan error, 1),
+ rdrExit: make(chan struct{}),
+ conReqs: make(chan struct{}, 1),
}
- expected := "open : no such file or directory"
- err2 := rdr.(*JSONFileER).processFile("", "")
- if err2 == nil || err2.Error() != expected {
- t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err2)
+ eR.conReqs <- struct{}{}
+ filePath := "/tmp/ErsJSON/out/"
+ err := os.MkdirAll(filePath, 0777)
+ if err != nil {
+ t.Error(err)
+ }
+ for i := 1; i < 4; i++ {
+ if _, err := os.Create(path.Join(filePath, fmt.Sprintf("file%d.json", i))); err != nil {
+ t.Error(err)
+ }
+ }
+ eR.Config().RunDelay = 1 * time.Millisecond
+ if err := eR.Serve(); err != nil {
+ t.Error(err)
+ }
+ os.Create(path.Join(filePath, "file1.txt"))
+ eR.Config().RunDelay = 1 * time.Millisecond
+ if err := eR.Serve(); err != nil {
+ t.Error(err)
}
}
-*/
+
+func TestFileJSONExit(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ fltrs := &engine.FilterS{}
+ eR := &JSONFileER{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: fltrs,
+ rdrDir: "/tmp/ErsJSON/out/",
+ rdrEvents: make(chan *erEvent, 1),
+ rdrError: make(chan error, 1),
+ rdrExit: make(chan struct{}),
+ conReqs: make(chan struct{}, 1),
+ }
+ eR.conReqs <- struct{}{}
+ eR.Config().RunDelay = 1 * time.Millisecond
+ if err := eR.Serve(); err != nil {
+ t.Error(err)
+ }
+ eR.rdrExit <- struct{}{}
+}
diff --git a/ers/sql.go b/ers/sql.go
index f00db6b7e..3a3ff80ff 100644
--- a/ers/sql.go
+++ b/ers/sql.go
@@ -96,6 +96,23 @@ func (rdr *SQLEventReader) Config() *config.EventReaderCfg {
return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx]
}
+func (rdr *SQLEventReader) openDB(dialect gorm.Dialector) (err error) {
+ var db *gorm.DB
+ if db, err = gorm.Open(dialect, &gorm.Config{AllowGlobalUpdate: true}); err != nil {
+ return
+ }
+ var sqlDB *sql.DB
+ if sqlDB, err = db.DB(); err != nil {
+ return
+ }
+ sqlDB.SetMaxOpenConns(10)
+ if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API
+ return
+ }
+ go rdr.readLoop(db, sqlDB) // read until the connection is closed
+ return
+}
+
// Serve will start the gorutines needed to watch the sql topic
func (rdr *SQLEventReader) Serve() (err error) {
var dialect gorm.Dialector
@@ -107,22 +124,7 @@ func (rdr *SQLEventReader) Serve() (err error) {
default:
return fmt.Errorf("db type <%s> not supported", rdr.connType)
}
- var db *gorm.DB
- if db, err = gorm.Open(dialect, &gorm.Config{AllowGlobalUpdate: true}); err != nil {
- return
- }
- var sqlDB *sql.DB
- if sqlDB, err = db.DB(); err != nil {
- return
- }
- sqlDB.SetMaxOpenConns(10)
- if err = sqlDB.Ping(); err != nil {
- return
- }
- if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API
- return
- }
- go rdr.readLoop(db, sqlDB) // read until the connection is closed
+ err = rdr.openDB(dialect)
return
}
diff --git a/ers/sql_it_test.go b/ers/sql_it_test.go
index 2dd04192e..3b399fb45 100644
--- a/ers/sql_it_test.go
+++ b/ers/sql_it_test.go
@@ -31,7 +31,9 @@ import (
"github.com/cgrates/cgrates/utils"
"gorm.io/driver/mysql"
"gorm.io/gorm"
+ "gorm.io/gorm/clause"
"gorm.io/gorm/logger"
+ "gorm.io/gorm/schema"
)
var (
@@ -688,3 +690,72 @@ func TestErsSqlPostCDRS(t *testing.T) {
}
logger.Default = tmp
}
+
+// type mockDialector interface {
+// Name() string
+// Initialize(*gorm.DB) error
+// Migrator(db *gorm.DB) gorm.Migrator
+// DataTypeOf(*schema.Field) string
+// DefaultValueOf(*schema.Field) clause.Expression
+// BindVarTo(writer clause.Writer, stmt *Statement, v interface{})
+// QuoteTo(clause.Writer, string)
+// Explain(sql string, vars ...interface{}) string
+// }
+type mockDialect struct{}
+
+func (mockDialect) Name() string { return "" }
+func (mockDialect) Initialize(db *gorm.DB) error { return nil }
+func (mockDialect) Migrator(db *gorm.DB) gorm.Migrator { return nil }
+func (mockDialect) DataTypeOf(*schema.Field) string { return "" }
+func (mockDialect) DefaultValueOf(*schema.Field) clause.Expression { return nil }
+func (mockDialect) BindVarTo(writer clause.Writer, stmt *gorm.Statement, v interface{}) { return }
+func (mockDialect) QuoteTo(clause.Writer, string) { return }
+func (mockDialect) Explain(sql string, vars ...interface{}) string { return "" }
+
+func TestMockOpenDB(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ sqlRdr := &SQLEventReader{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: &engine.FilterS{},
+ connString: "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/cgrates?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
+ connType: utils.MySQL,
+ tableName: "testName",
+ expConnString: "",
+ expConnType: utils.Postgres,
+ expTableName: "",
+ rdrEvents: nil,
+ rdrExit: nil,
+ rdrErr: nil,
+ cap: nil,
+ }
+ var mckDlct mockDialect
+ errExpect := "invalid db"
+ if err := sqlRdr.openDB(mckDlct); err == nil || err.Error() != errExpect {
+ t.Errorf("Expected %v but received %v", errExpect, err)
+ }
+}
+
+func TestSQLServeErr1(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+ sqlRdr := &SQLEventReader{
+ cgrCfg: cfg,
+ cfgIdx: 0,
+ fltrS: &engine.FilterS{},
+ connString: "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/cgrates?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
+ connType: utils.MySQL,
+ tableName: "testName",
+ expConnString: "",
+ expConnType: utils.Postgres,
+ expTableName: "",
+ rdrEvents: nil,
+ rdrExit: nil,
+ rdrErr: nil,
+ cap: nil,
+ }
+ cfg.StorDbCfg().Password = "CGRateS.org"
+ sqlRdr.Config().RunDelay = time.Duration(0)
+ if err := sqlRdr.Serve(); err != nil {
+ t.Error(err)
+ }
+}