Add WHERE statement availability and non-delete option to ERS SQL reader

arberkatellari
2024-11-13 18:31:11 +02:00
committed by Dan Christian Bogos
parent 204601a70f
commit 321910d181
13 changed files with 698 additions and 95 deletions

View File

@@ -627,7 +627,7 @@ func main() {
ldrs, anz, dspS, dspH, dmService, storDBService,
services.NewEventExporterService(cfg, filterSChan,
connManager, server, internalEEsChan, anz, srvDep),
services.NewEventReaderService(cfg, filterSChan,
services.NewEventReaderService(cfg, dmService, filterSChan,
shdChan, connManager, server, internalERsChan, anz, srvDep),
services.NewSIPAgent(cfg, filterSChan, shdChan, connManager, srvDep),
services.NewJanusAgent(cfg, filterSChan, server, connManager, srvDep),

View File

@@ -46,9 +46,10 @@ type erEvent struct {
}
// NewERService instantiates the ERService
func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, connMgr *engine.ConnManager) (ers *ERService) {
func NewERService(cfg *config.CGRConfig, datadb *engine.DataManager, filterS *engine.FilterS, connMgr *engine.ConnManager) (ers *ERService) {
ers = &ERService{
cfg: cfg,
dataManager: datadb,
rdrs: make(map[string]EventReader),
rdrPaths: make(map[string]string),
stopLsn: make(map[string]chan struct{}),
@@ -67,6 +68,7 @@ func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, connMgr *engin
type ERService struct {
sync.RWMutex
cfg *config.CGRConfig
dataManager *engine.DataManager
rdrs map[string]EventReader // map[rdrID]EventReader
rdrPaths map[string]string // used for reloads in case of path changes
stopLsn map[string]chan struct{} // map[rdrID] chan struct{}
@@ -220,7 +222,7 @@ func (erS *ERService) addReader(rdrID string, cfgIdx int) (err error) {
var rdr EventReader
if rdr, err = NewEventReader(erS.cfg, cfgIdx,
erS.rdrEvents, erS.partialEvents, erS.rdrErr,
erS.filterS, erS.stopLsn[rdrID]); err != nil {
erS.filterS, erS.stopLsn[rdrID], erS.dataManager); err != nil {
return
}
erS.rdrs[rdrID] = rdr

View File

@@ -53,7 +53,7 @@ func TestERsNewERService(t *testing.T) {
rdrEvents: make(chan *erEvent),
rdrErr: make(chan error),
}
rcv := NewERService(cfg, fltrS, nil)
rcv := NewERService(cfg, nil, fltrS, nil)
if !reflect.DeepEqual(expected.cfg, rcv.cfg) {
t.Errorf("Expecting: <%+v>, received: <%+v>", expected.cfg, rcv.cfg)
@@ -65,7 +65,7 @@ func TestERsNewERService(t *testing.T) {
func TestERsAddReader(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
fltrS := &engine.FilterS{}
erS := NewERService(cfg, fltrS, nil)
erS := NewERService(cfg, nil, fltrS, nil)
reader := cfg.ERsCfg().Readers[0]
reader.Type = utils.MetaFileCSV
reader.ID = "file_reader"
@@ -89,7 +89,7 @@ func TestERsListenAndServeErr(t *testing.T) {
{},
}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
srv := NewERService(cfg, nil, fltrS, nil)
stopChan := make(chan struct{}, 1)
cfgRldChan := make(chan struct{}, 1)
err := srv.ListenAndServe(stopChan, cfgRldChan)
@@ -106,7 +106,7 @@ func TestERsProcessEventErr(t *testing.T) {
},
}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
srv := NewERService(cfg, nil, fltrS, nil)
rdrCfg := &config.EventReaderCfg{
ID: "",
Type: "",
@@ -130,7 +130,7 @@ func TestERsCloseAllRdrs(t *testing.T) {
},
}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
srv := NewERService(cfg, nil, fltrS, nil)
srv.stopLsn[""] = make(chan struct{}, 1)
srv.closeAllRdrs()
}
@@ -142,7 +142,7 @@ func TestERsListenAndServeRdrErr(t *testing.T) {
},
}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
srv := NewERService(cfg, nil, fltrS, nil)
stopChan := make(chan struct{}, 1)
cfgRldChan := make(chan struct{}, 1)
srv.rdrErr = make(chan error, 1)
@@ -162,7 +162,7 @@ func TestERsListenAndServeStopchan(t *testing.T) {
},
}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
srv := NewERService(cfg, nil, fltrS, nil)
stopChan := make(chan struct{}, 1)
cfgRldChan := make(chan struct{}, 1)
stopChan <- struct{}{}
@@ -181,7 +181,7 @@ func TestERsListenAndServeRdrEvents(t *testing.T) {
},
}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
srv := NewERService(cfg, nil, fltrS, nil)
stopChan := make(chan struct{}, 1)
cfgRldChan := make(chan struct{}, 1)
srv.rdrErr = make(chan error, 1)
@@ -208,7 +208,7 @@ func TestERsListenAndServeCfgRldChan(t *testing.T) {
},
}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
srv := NewERService(cfg, nil, fltrS, nil)
stopChan := make(chan struct{}, 1)
cfgRldChan := make(chan struct{}, 1)
srv.rdrErr = make(chan error, 1)
@@ -232,7 +232,7 @@ func TestERsListenAndServeCfgRldChan2(t *testing.T) {
},
}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
srv := NewERService(cfg, nil, fltrS, nil)
exp := &CSVFileER{
cgrCfg: cfg,
cfgIdx: 0,
@@ -266,7 +266,7 @@ func TestERsListenAndServeCfgRldChan3(t *testing.T) {
},
}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
srv := NewERService(cfg, nil, fltrS, nil)
exp := &CSVFileER{
cgrCfg: cfg,
cfgIdx: 0,
@@ -299,7 +299,7 @@ func TestERsListenAndServeCfgRldChan4(t *testing.T) {
},
}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
srv := NewERService(cfg, nil, fltrS, nil)
exp := &CSVFileER{
cgrCfg: cfg,
cfgIdx: 0,
@@ -334,7 +334,7 @@ func TestERsListenAndServeCfgRldChan5(t *testing.T) {
},
}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
srv := NewERService(cfg, nil, fltrS, nil)
exp := &CSVFileER{
cgrCfg: cfg,
}
@@ -368,7 +368,7 @@ func TestERsListenAndServeCfgRldChan6(t *testing.T) {
},
}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
srv := NewERService(cfg, nil, fltrS, nil)
exp := &CSVFileER{
cgrCfg: cfg,
cfgIdx: 0,
@@ -408,7 +408,7 @@ func TestERsProcessEvent(t *testing.T) {
},
}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
srv := NewERService(cfg, nil, fltrS, nil)
rdrCfg := &config.EventReaderCfg{
Flags: map[string]utils.FlagParams{
utils.MetaLog: map[string][]string{
@@ -431,7 +431,7 @@ func TestERsProcessEvent2(t *testing.T) {
},
}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
srv := NewERService(cfg, nil, fltrS, nil)
rdrCfg := &config.EventReaderCfg{
Flags: map[string]utils.FlagParams{
utils.MetaDryRun: map[string][]string{
@@ -455,7 +455,7 @@ func TestERsProcessEvent3(t *testing.T) {
}
cfg.ERsCfg().SessionSConns = []string{}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
srv := NewERService(cfg, nil, fltrS, nil)
rdrCfg := &config.EventReaderCfg{
Flags: map[string]utils.FlagParams{
utils.MetaEvent: map[string][]string{},
@@ -482,7 +482,7 @@ func TestERsProcessEvent4(t *testing.T) {
}
cfg.ERsCfg().SessionSConns = []string{}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
srv := NewERService(cfg, nil, fltrS, nil)
rdrCfg := &config.EventReaderCfg{
Flags: map[string]utils.FlagParams{
utils.MetaAuthorize: map[string][]string{},
@@ -509,7 +509,7 @@ func TestERsProcessEvent5(t *testing.T) {
}
cfg.ERsCfg().SessionSConns = []string{}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
srv := NewERService(cfg, nil, fltrS, nil)
rdrCfg := &config.EventReaderCfg{
Flags: map[string]utils.FlagParams{
utils.MetaTerminate: map[string][]string{},
@@ -536,7 +536,7 @@ func TestERsProcessEvent6(t *testing.T) {
}
cfg.ERsCfg().SessionSConns = []string{}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
srv := NewERService(cfg, nil, fltrS, nil)
rdrCfg := &config.EventReaderCfg{
Flags: map[string]utils.FlagParams{
utils.MetaInitiate: map[string][]string{},
@@ -562,7 +562,7 @@ func TestERsProcessEvent7(t *testing.T) {
}
cfg.ERsCfg().SessionSConns = []string{}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
srv := NewERService(cfg, nil, fltrS, nil)
rdrCfg := &config.EventReaderCfg{
Flags: map[string]utils.FlagParams{
utils.MetaUpdate: map[string][]string{},
@@ -588,7 +588,7 @@ func TestERsProcessEvent8(t *testing.T) {
}
cfg.ERsCfg().SessionSConns = []string{}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
srv := NewERService(cfg, nil, fltrS, nil)
rdrCfg := &config.EventReaderCfg{
Flags: map[string]utils.FlagParams{
utils.MetaMessage: map[string][]string{},
@@ -615,7 +615,7 @@ func TestERsProcessEvent9(t *testing.T) {
}
cfg.ERsCfg().SessionSConns = []string{}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
srv := NewERService(cfg, nil, fltrS, nil)
rdrCfg := &config.EventReaderCfg{
Flags: map[string]utils.FlagParams{
utils.MetaCDRs: map[string][]string{},
@@ -642,7 +642,7 @@ func TestERsProcessEvent10(t *testing.T) {
}
cfg.ERsCfg().SessionSConns = []string{}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
srv := NewERService(cfg, nil, fltrS, nil)
rdrCfg := &config.EventReaderCfg{
Flags: map[string]utils.FlagParams{
utils.MetaMessage: map[string][]string{},
@@ -697,7 +697,7 @@ func TestERsProcessEvent11(t *testing.T) {
connMng := engine.NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS): clientChan,
})
srv := NewERService(cfg, fltrS, connMng)
srv := NewERService(cfg, nil, fltrS, connMng)
rdrCfg := &config.EventReaderCfg{
Flags: map[string]utils.FlagParams{
utils.MetaMessage: map[string][]string{},

View File

@@ -34,7 +34,7 @@ import (
func TestERsProcessPartialEvent(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
erS := NewERService(cfg, nil, nil)
erS := NewERService(cfg, nil, nil, nil)
event := &utils.CGREvent{
Tenant: "cgrates.org",
ID: "EventERsProcessPartial",

View File

@@ -34,7 +34,7 @@ type EventReader interface {
// NewEventReader instantiates the event reader based on configuration at index
func NewEventReader(cfg *config.CGRConfig, cfgIdx int, rdrEvents, partialEvents chan *erEvent, rdrErr chan error,
fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) {
fltrS *engine.FilterS, rdrExit chan struct{}, dm *engine.DataManager) (er EventReader, err error) {
switch cfg.ERsCfg().Readers[cfgIdx].Type {
default:
err = fmt.Errorf("unsupported reader type: <%s>", cfg.ERsCfg().Readers[cfgIdx].Type)
@@ -47,7 +47,7 @@ func NewEventReader(cfg *config.CGRConfig, cfgIdx int, rdrEvents, partialEvents
case utils.MetaKafkajsonMap:
return NewKafkaER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
case utils.MetaSQL:
return NewSQLEventReader(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
return NewSQLEventReader(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit, dm)
case utils.MetaFileJSON:
return NewJSONFileER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
case utils.MetaAMQPjsonMap:

View File

@@ -36,7 +36,7 @@ func TestNewInvalidReader(t *testing.T) {
if len(cfg.ERsCfg().Readers) != 2 {
t.Errorf("Expecting: <2>, received: <%+v>", len(cfg.ERsCfg().Readers))
}
if _, err := NewEventReader(cfg, 1, nil, nil, nil, &engine.FilterS{}, nil); err == nil || err.Error() != "unsupported reader type: <Invalid>" {
if _, err := NewEventReader(cfg, 1, nil, nil, nil, &engine.FilterS{}, nil, nil); err == nil || err.Error() != "unsupported reader type: <Invalid>" {
t.Errorf("Expecting: <unsupported reader type: <Invalid>>, received: <%+v>", err)
}
}
@@ -61,7 +61,7 @@ func TestNewCsvReader(t *testing.T) {
rdrExit: nil,
conReqs: nil}
var expected EventReader = exp
if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil); err != nil {
if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil, nil); err != nil {
t.Errorf("Expecting: <nil>, received: <%+v>", err)
} else {
// because we use function make to init the channel when we create the EventReader reflect.DeepEqual
@@ -88,7 +88,7 @@ func TestNewKafkaReader(t *testing.T) {
if err != nil {
t.Errorf("Expecting: <nil>, received: <%+v>", err)
}
if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil); err != nil {
if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil, nil); err != nil {
t.Errorf("Expecting: <nil>, received: <%+v>", err)
} else if !reflect.DeepEqual(expected, rcv) {
t.Errorf("Expecting: <%+v>, received: <%+v>", expected, rcv)
@@ -113,11 +113,11 @@ func TestNewSQLReader(t *testing.T) {
if len(cfg.ERsCfg().Readers) != 2 {
t.Errorf("Expecting: <2>, received: <%+v>", len(cfg.ERsCfg().Readers))
}
expected, err := NewSQLEventReader(cfg, 1, nil, nil, nil, fltr, nil)
expected, err := NewSQLEventReader(cfg, 1, nil, nil, nil, fltr, nil, nil)
if err != nil {
t.Errorf("Expecting: <nil>, received: <%+v>", err)
}
if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil); err != nil {
if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil, nil); err != nil {
t.Errorf("Expecting: <nil>, received: <%+v>", err)
} else if !reflect.DeepEqual(expected, rcv) {
t.Errorf("Expecting: <%+v>, received: <%+v>", expected, rcv)
@@ -137,7 +137,7 @@ func TestNewSQLReaderError(t *testing.T) {
reader.SourcePath = "#"
reader.ProcessedPath = ""
expected := "unknown db_type "
_, err := NewSQLEventReader(cfg, 0, nil, nil, nil, fltr, nil)
_, err := NewSQLEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil)
if err == nil || err.Error() != expected {
t.Errorf("Expecting: <%+v>, received: <%+v>", expected, err)
}
@@ -151,7 +151,7 @@ func TestNewFileXMLReader(t *testing.T) {
if err != nil {
t.Error(err)
}
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil)
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil)
if err != nil {
t.Error(err)
} else {
@@ -171,7 +171,7 @@ func TestNewFileFWVReader(t *testing.T) {
if err != nil {
t.Error(err)
}
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil)
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil)
if err != nil {
t.Error(nil)
} else {
@@ -191,7 +191,7 @@ func TestNewJSONReader(t *testing.T) {
if err != nil {
t.Error(err)
}
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil)
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil)
if err != nil {
t.Error(err)
} else {
@@ -215,7 +215,7 @@ func TestNewAMQPReader(t *testing.T) {
exp.Config().ProcessedPath = ""
exp.createClient(&config.AMQPROpts{}, nil)
// var expected EventReader = exp
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil)
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil)
if err != nil {
t.Error(err)
} else if !reflect.DeepEqual(exp, rcv) {
@@ -239,7 +239,7 @@ func TestNewAMQPv1Reader(t *testing.T) {
exp.Config().ProcessedPath = ""
exp.Config().Opts = &config.EventReaderOpts{}
var expected EventReader = exp
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil)
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil)
if err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expected, rcv) {
@@ -264,7 +264,7 @@ func TestNewS3Reader(t *testing.T) {
exp.Config().ProcessedPath = ""
exp.Config().Opts = &config.EventReaderOpts{}
var expected EventReader = exp
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil)
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil)
if err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expected, rcv) {
@@ -300,7 +300,7 @@ func TestNewSQSReader(t *testing.T) {
exp.Config().ProcessedPath = ""
exp.Config().Opts = &config.EventReaderOpts{}
var expected EventReader = exp
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil)
rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil)
exp.session = rcv.(*SQSER).session
if err != nil {
t.Error(err)

View File

@@ -46,10 +46,10 @@ const (
// NewSQLEventReader return a new sql event reader
func NewSQLEventReader(cfg *config.CGRConfig, cfgIdx int,
rdrEvents, partialEvents chan *erEvent, rdrErr chan error,
fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) {
fltrS *engine.FilterS, rdrExit chan struct{}, dm *engine.DataManager) (er EventReader, err error) {
rdr := &SQLEventReader{
cgrCfg: cfg,
dm: dm,
cfgIdx: cfgIdx,
fltrS: fltrS,
rdrEvents: rdrEvents,
@@ -70,6 +70,7 @@ func NewSQLEventReader(cfg *config.CGRConfig, cfgIdx int,
type SQLEventReader struct {
// sync.RWMutex
cgrCfg *config.CGRConfig
dm *engine.DataManager
cfgIdx int // index of config instance within ERsCfg.Readers
fltrS *engine.FilterS
@@ -99,7 +100,7 @@ func (rdr *SQLEventReader) openDB(dialect gorm.Dialector) (err error) {
return
}
sqlDB.SetMaxOpenConns(10)
if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API
if rdr.Config().RunDelay <= 0 { // a RunDelay of 0 or less disables the automatic read; reading may still be triggered per API
return
}
go rdr.readLoop(db, sqlDB) // read until the connection is closed
@@ -121,6 +122,141 @@ func (rdr *SQLEventReader) Serve() (err error) {
return
}
// valueQry creates MySQL conditions for the WHERE statement out of filter rules
func valueQry(ruleType, elem, field string, values []string, not bool) (conditions []string) {
// handle filters whose values are empty: *exists, *notexists, *empty, *notempty
if len(values) == 0 {
switch ruleType {
case utils.MetaExists, utils.MetaNotExists:
if not {
if elem == utils.EmptyString {
conditions = append(conditions, fmt.Sprintf(" %s IS NOT NULL", field))
return
}
conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') IS NOT NULL", elem, field))
return
}
if elem == utils.EmptyString {
conditions = append(conditions, fmt.Sprintf(" %s IS NULL", field))
return
}
conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') IS NULL", elem, field))
case utils.MetaEmpty, utils.MetaNotEmpty:
if not {
if elem == utils.EmptyString {
conditions = append(conditions, fmt.Sprintf(" %s != ''", field))
return
}
conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') != ''", elem, field))
return
}
if elem == utils.EmptyString {
conditions = append(conditions, fmt.Sprintf(" %s = ''", field))
return
}
conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') = ''", elem, field))
}
return
}
// handle filters that may carry more than one value: *string, *prefix, *suffix, ...
for _, value := range values {
switch value { // boolean values are stored as 1/0 in SQL, so rewrite them before querying
case "true":
value = "1"
case "false":
value = "0"
}
var singleCond string
switch ruleType {
case utils.MetaString, utils.MetaNotString, utils.MetaEqual, utils.MetaNotEqual:
if not {
if elem == utils.EmptyString {
conditions = append(conditions, fmt.Sprintf(" %s != '%s'", field, value))
continue
}
conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') != '%s'",
elem, field, value))
continue
}
if elem == utils.EmptyString {
singleCond = fmt.Sprintf(" %s = '%s'", field, value)
} else {
singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') = '%s'", elem, field, value)
}
case utils.MetaLessThan, utils.MetaLessOrEqual, utils.MetaGreaterThan, utils.MetaGreaterOrEqual:
var op string // comparison operator derived from the filter type
switch ruleType {
case utils.MetaGreaterOrEqual:
op = ">="
case utils.MetaGreaterThan:
op = ">"
case utils.MetaLessOrEqual:
op = "<="
case utils.MetaLessThan:
op = "<"
}
if elem == utils.EmptyString {
singleCond = fmt.Sprintf(" %s %s %s", field, op, value)
} else {
singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') %s %s", elem, field, op, value)
}
case utils.MetaPrefix, utils.MetaNotPrefix:
if not {
if elem == utils.EmptyString {
conditions = append(conditions, fmt.Sprintf(" %s NOT LIKE '%s%%'", field, value))
continue
}
conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') NOT LIKE '%s%%'", elem, field, value))
continue
}
if elem == utils.EmptyString {
singleCond = fmt.Sprintf(" %s LIKE '%s%%'", field, value)
} else {
singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') LIKE '%s%%'", elem, field, value)
}
case utils.MetaSuffix, utils.MetaNotSuffix:
if not {
if elem == utils.EmptyString {
conditions = append(conditions, fmt.Sprintf(" %s NOT LIKE '%%%s'", field, value))
continue
}
conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') NOT LIKE '%%%s'", elem, field, value))
continue
}
if elem == utils.EmptyString {
singleCond = fmt.Sprintf(" %s LIKE '%%%s'", field, value)
} else {
singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') LIKE '%%%s'", elem, field, value)
}
case utils.MetaRegex, utils.MetaNotRegex:
if not {
if elem == utils.EmptyString {
conditions = append(conditions, fmt.Sprintf(" %s NOT REGEXP '%s'", field, value))
continue
}
conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') NOT REGEXP '%s'", elem, field, value))
continue
}
if elem == utils.EmptyString {
singleCond = fmt.Sprintf(" %s REGEXP '%s'", field, value)
} else {
singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') REGEXP '%s'", elem, field, value)
}
}
conditions = append(conditions, singleCond)
}
return
}
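
Before readLoop (shown next) applies them, here is a minimal standalone illustration — not part of the commit — of the WHERE fragments that valueQry produces for the two inline filters used by the integration test added further down; plain columns are compared directly, nested paths go through JSON_VALUE():

package main

import "fmt"

func main() {
	// *gt:~*req.answer_time:NOW() - INTERVAL 7 DAY  ->  elem="", field="answer_time"
	fmt.Printf(" %s > %s\n", "answer_time", "NOW() - INTERVAL 7 DAY")
	// prints: " answer_time > NOW() - INTERVAL 7 DAY"

	// *eq:~*req.cost_details.Charges[0].RatingID:RatingID2  ->  elem="cost_details", field="Charges[0].RatingID"
	fmt.Printf(" JSON_VALUE(%s, '$.%s') = '%s'\n", "cost_details", "Charges[0].RatingID", "RatingID2")
	// prints: " JSON_VALUE(cost_details, '$.Charges[0].RatingID') = 'RatingID2'"
}
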
func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) {
defer sqlDB.Close()
if rdr.Config().StartDelay > 0 {
@@ -134,8 +270,47 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) {
}
}
tm := time.NewTimer(0)
var filters []*engine.Filter
var whereQueries []string
var renewedFltrs []string
for _, fltr := range rdr.Config().Filters {
if result, err := rdr.dm.GetFilter(config.CgrConfig().GeneralCfg().DefaultTenant, fltr, true, false, utils.NonTransactional); err != nil {
rdr.rdrErr <- err
return
} else {
filters = append(filters, result)
if !strings.Contains(fltr, utils.DynamicDataPrefix+utils.MetaReq) {
renewedFltrs = append(renewedFltrs, fltr)
}
}
}
rdr.Config().Filters = renewedFltrs // drop the *req filters from the reader config: they are now enforced via the SQL WHERE clause instead
for _, filter := range filters {
for _, rule := range filter.Rules {
var elem, field string
switch {
case strings.HasPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep):
field = strings.TrimPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep)
parts := strings.SplitN(field, ".", 2)
if len(parts) == 2 { // nested path: the first part is the SQL column, the rest the JSON path inside it
// First part (before the first dot)
elem = parts[0]
// Second part (everything after the first dot)
field = parts[1]
}
default:
continue
}
conditions := valueQry(rule.Type, elem, field, rule.Values, strings.HasPrefix(rule.Type, utils.MetaNot))
whereQueries = append(whereQueries, strings.Join(conditions, " OR "))
}
}
for {
rows, err := db.Table(rdr.tableName).Select(utils.Meta).Rows()
tx := db.Table(rdr.tableName).Select(utils.Meta)
for _, whereQ := range whereQueries {
tx = tx.Where(whereQ)
}
rows, err := tx.Rows()
if err != nil {
rdr.rdrErr <- err
return
@@ -194,13 +369,15 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) {
fltr[colName] = utils.IfaceAsString(columns[i])
}
}
if err = db.Table(rdr.tableName).Delete(nil, fltr).Error; err != nil { // to ensure we don't read it again
utils.Logger.Warning(
fmt.Sprintf("<%s> deleting message %s error: %s",
utils.ERs, utils.ToJSON(msg), err.Error()))
rdr.rdrErr <- err
rows.Close()
return
if rdr.Config().ProcessedPath == utils.MetaDelete {
if err = db.Table(rdr.tableName).Delete(nil, fltr).Error; err != nil { // to ensure we don't read it again
utils.Logger.Warning(
fmt.Sprintf("<%s> deleting message %s error: %s",
utils.ERs, utils.ToJSON(msg), err.Error()))
rdr.rdrErr <- err
rows.Close()
return
}
}
go func(msg map[string]any) {
@@ -215,11 +392,8 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) {
}(msg)
}
rows.Close()
if rdr.Config().RunDelay < 0 {
return
}
tm.Reset(rdr.Config().RunDelay)
select {
tm.Reset(rdr.Config().RunDelay) // reset the timer to RunDelay
select { // wait for timer or rdrExit
case <-rdr.rdrExit:
tm.Stop()
utils.Logger.Info(
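
To show how these pieces fit at runtime, here is a minimal standalone sketch — assumptions: the same MySQL credentials and cdrs table used by this commit's tests, and the two WHERE fragments derived earlier — of how the reader now composes its SELECT; gorm ANDs successive Where calls, so every fragment must match:

package main

import (
	"fmt"

	"gorm.io/driver/mysql"
	"gorm.io/gorm"
)

func main() {
	// assumed DSN, matching the credentials used by the integration tests
	dsn := "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/cgrates2?charset=utf8&parseTime=true"
	db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
	if err != nil {
		panic(err)
	}
	// WHERE fragments as produced by the filter-to-condition translation
	whereQueries := []string{
		"answer_time > NOW() - INTERVAL 7 DAY",
		"JSON_VALUE(cost_details, '$.Charges[0].RatingID') = 'RatingID2'",
	}
	tx := db.Table("cdrs").Select("*") // same shape as the reader's query
	for _, whereQ := range whereQueries {
		tx = tx.Where(whereQ) // successive Where calls are ANDed together
	}
	rows, err := tx.Rows()
	if err != nil {
		panic(err)
	}
	defer rows.Close()
	var matched int
	for rows.Next() {
		matched++
	}
	fmt.Println("rows matching all WHERE fragments:", matched)
}

Whether a matched row is then deleted from the table depends on the reader's processed_path: only the new *delete value triggers the delete step shown in the hunk above; any other value leaves the row in place, so the WHERE filters are what prevents re-reading it.
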

View File

@@ -89,26 +89,24 @@ func testSQLInitConfig(t *testing.T) {
"stor_db": {
"db_password": "CGRateS.org",
},
"ers": { // EventReaderService
"enabled": true, // starts the EventReader service: <true|false>
"ers": {
"enabled": true,
"sessions_conns":["*localhost"],
"readers": [
{
"id": "mysql", // identifier of the EventReader profile
"type": "*sql", // reader type <*file_csv>
"run_delay": "1", // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together
"concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited
"source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", // read data from this path
"id": "mysql",
"type": "*sql",
"run_delay": "1",
"concurrent_requests": 1024,
"source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306",
"opts": {
"sqlDBName":"cgrates2",
"sqlDBNameProcessed":"cgrates2",
"sqlTableNameProcessed":"cdrs2",
},
"processed_path": "", // move processed data here
"tenant": "cgrates.org", // tenant used by import
"filters": [], // limit parsing based on the filters
"flags": [], // flags to influence the event processing
"fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
"processed_path": "*delete",
"tenant": "cgrates.org",
"filters": [],
"flags": [],
"fields":[
{"tag": "CGRID", "type": "*composed", "value": "~*req.cgrid", "path": "*cgreq.CGRID"},
{"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"},
],
@@ -236,7 +234,7 @@ func testSQLReader(t *testing.T) {
rdrEvents = make(chan *erEvent, 1)
rdrErr = make(chan error, 1)
rdrExit = make(chan struct{}, 1)
sqlER, err := NewEventReader(sqlCfg, 1, rdrEvents, make(chan *erEvent, 1), rdrErr, new(engine.FilterS), rdrExit)
sqlER, err := NewEventReader(sqlCfg, 1, rdrEvents, make(chan *erEvent, 1), rdrErr, new(engine.FilterS), rdrExit, nil)
if err != nil {
t.Fatal(err)
}
@@ -373,25 +371,23 @@ func testSQLInitConfig2(t *testing.T) {
"stor_db": {
"db_password": "CGRateS.org",
},
"ers": { // EventReaderService
"enabled": true, // starts the EventReader service: <true|false>
"ers": {
"enabled": true,
"readers": [
{
"id": "mysql", // identifier of the EventReader profile
"type": "*sql", // reader type <*file_csv>
"run_delay": "1", // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together
"concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited
"source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", // read data from this path
"id": "mysql",
"type": "*sql",
"run_delay": "1",
"concurrent_requests": 1024,
"source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306",
"opts": {
"sqlDBName":"cgrates2",
"sqlDBNameProcessed":"cgrates2",
"sqlTableNameProcessed":"cdrs2",
},
"processed_path": "", // move processed data here
"tenant": "cgrates.org", // tenant used by import
"filters": [], // limit parsing based on the filters
"flags": [], // flags to influence the event processing
"fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
"processed_path": "*delete",
"tenant": "cgrates.org",
"filters": [],
"flags": [],
"fields":[
{"tag": "CGRID", "type": "*composed", "value": "~*req.cgrid", "path": "*cgreq.CGRID"},
],
},
@@ -484,7 +480,7 @@ func testSQLReader3(t *testing.T) {
rdrEvents = make(chan *erEvent, 1)
rdrErr = make(chan error, 1)
rdrExit = make(chan struct{}, 1)
sqlER, err := NewEventReader(sqlCfg, 1, rdrEvents, make(chan *erEvent, 1), rdrErr, new(engine.FilterS), rdrExit)
sqlER, err := NewEventReader(sqlCfg, 1, rdrEvents, make(chan *erEvent, 1), rdrErr, new(engine.FilterS), rdrExit, nil)
if err != nil {
t.Fatal(err)
}

View File

@@ -0,0 +1,424 @@
//go:build integration
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package general_tests
import (
"bufio"
"bytes"
"fmt"
"strings"
"testing"
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
var (
db *gorm.DB
dbConnString = "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'"
cdr1 = &engine.CDR{ // sample with values not realistically calculated
CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()),
OrderID: 123,
ToR: utils.MetaVoice,
OriginID: "dsafdsaf",
OriginHost: "192.168.1.1",
Source: "test",
RequestType: utils.MetaRated,
Tenant: "cgrates.org",
Category: "call",
Account: "1001",
Subject: "1001",
Destination: "1002",
SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC),
AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC),
RunID: utils.MetaDefault,
Usage: 10 * time.Second,
ExtraInfo: "extraInfo",
Partial: false,
PreRated: true,
CostSource: "cost source",
CostDetails: &engine.EventCost{
CGRID: "test1",
RunID: utils.MetaDefault,
StartTime: time.Date(2017, 1, 9, 16, 18, 21, 0, time.UTC),
Usage: utils.DurationPointer(3 * time.Minute),
Cost: utils.Float64Pointer(2.3),
Charges: []*engine.ChargingInterval{
{
RatingID: "c1a5ab9",
Increments: []*engine.ChargingIncrement{
{
Usage: 2 * time.Minute,
Cost: 2.0,
AccountingID: "a012888",
CompressFactor: 1,
},
{
Usage: time.Second,
Cost: 0.005,
AccountingID: "44d6c02",
CompressFactor: 60,
},
},
CompressFactor: 1,
},
},
AccountSummary: &engine.AccountSummary{
Tenant: "cgrates.org",
ID: "testV1CDRsRefundOutOfSessionCost",
BalanceSummaries: []*engine.BalanceSummary{
{
UUID: "uuid1",
Type: utils.MetaMonetary,
Value: 50,
},
},
AllowNegative: false,
Disabled: false,
},
Rating: engine.Rating{
"c1a5ab9": &engine.RatingUnit{
ConnectFee: 0.1,
RoundingMethod: "*up",
RoundingDecimals: 5,
RatesID: "ec1a177",
RatingFiltersID: "43e77dc",
},
},
Accounting: engine.Accounting{
"a012888": &engine.BalanceCharge{
AccountID: "cgrates.org:testV1CDRsRefundOutOfSessionCost",
BalanceUUID: "uuid1",
Units: 120.7,
},
"44d6c02": &engine.BalanceCharge{
AccountID: "cgrates.org:testV1CDRsRefundOutOfSessionCost",
BalanceUUID: "uuid1",
Units: 120.7,
},
},
Rates: engine.ChargedRates{
"ec1a177": engine.RateGroups{
&engine.RGRate{
GroupIntervalStart: 0,
Value: 0.01,
RateIncrement: time.Minute,
RateUnit: time.Second},
},
},
},
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
}
timeStart = time.Now()
cgrID = utils.Sha1("dsafdsaf", timeStart.String())
cdr2 = &engine.CDR{ // sample with values not realistically calculated
CGRID: cgrID,
OrderID: 123,
ToR: utils.MetaVoice,
OriginID: "dsafdsaf",
OriginHost: "192.168.1.1",
Source: "test",
RequestType: utils.MetaRated,
Tenant: "cgrates.org",
Category: "call",
Account: "1001",
Subject: "1001",
Destination: "1002",
SetupTime: timeStart,
AnswerTime: timeStart,
RunID: utils.MetaDefault,
Usage: 10 * time.Second,
ExtraInfo: "extraInfo",
Partial: false,
PreRated: true,
CostSource: "cost source",
CostDetails: &engine.EventCost{
CGRID: "test1",
RunID: utils.MetaDefault,
StartTime: time.Date(2017, 1, 9, 16, 18, 21, 0, time.UTC),
Usage: utils.DurationPointer(3 * time.Minute),
Cost: utils.Float64Pointer(2.3),
Charges: []*engine.ChargingInterval{
{
RatingID: "RatingID2",
Increments: []*engine.ChargingIncrement{
{
Usage: 2 * time.Minute,
Cost: 2.0,
AccountingID: "a012888",
CompressFactor: 1,
},
{
Usage: time.Second,
Cost: 0.005,
AccountingID: "44d6c02",
CompressFactor: 60,
},
},
CompressFactor: 1,
},
},
AccountSummary: &engine.AccountSummary{
Tenant: "cgrates.org",
ID: "testV1CDRsRefundOutOfSessionCost",
BalanceSummaries: []*engine.BalanceSummary{
{
UUID: "uuid1",
Type: utils.MetaMonetary,
Value: 50,
},
},
AllowNegative: false,
Disabled: false,
},
Rating: engine.Rating{
"c1a5ab9": &engine.RatingUnit{
ConnectFee: 0.1,
RoundingMethod: "*up",
RoundingDecimals: 5,
RatesID: "ec1a177",
RatingFiltersID: "43e77dc",
},
},
Accounting: engine.Accounting{
"a012888": &engine.BalanceCharge{
AccountID: "cgrates.org:testV1CDRsRefundOutOfSessionCost",
BalanceUUID: "uuid1",
Units: 120.7,
},
"44d6c02": &engine.BalanceCharge{
AccountID: "cgrates.org:testV1CDRsRefundOutOfSessionCost",
BalanceUUID: "uuid1",
Units: 120.7,
},
},
Rates: engine.ChargedRates{
"ec1a177": engine.RateGroups{
&engine.RGRate{
GroupIntervalStart: 0,
Value: 0.01,
RateIncrement: time.Minute,
RateUnit: time.Second},
},
},
},
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
}
)
func TestERSSQLFilters(t *testing.T) {
var dbcfg engine.DBCfg
switch *utils.DBType {
case utils.MetaInternal:
dbcfg = engine.InternalDBCfg
case utils.MetaMySQL:
case utils.MetaMongo:
dbcfg = engine.MongoDBCfg
case utils.MetaPostgres:
dbcfg = engine.PostgresDBCfg
default:
t.Fatal("unsupported dbtype value")
}
t.Run("InitSQLDB", func(t *testing.T) {
var err error
var db2 *gorm.DB
if db2, err = gorm.Open(mysql.Open(fmt.Sprintf(dbConnString, "cgrates")),
&gorm.Config{
AllowGlobalUpdate: true,
Logger: logger.Default.LogMode(logger.Silent),
}); err != nil {
t.Fatal(err)
}
if err = db2.Exec(`CREATE DATABASE IF NOT EXISTS cgrates2;`).Error; err != nil {
t.Fatal(err)
}
})
type testModelSql struct {
ID int64
Cgrid string
RunID string
OriginHost string
Source string
OriginID string
ToR string
RequestType string
Tenant string
Category string
Account string
Subject string
Destination string
SetupTime time.Time
AnswerTime time.Time
Usage int64
ExtraFields string
CostSource string
Cost float64
CostDetails string
ExtraInfo string
CreatedAt time.Time
UpdatedAt time.Time
DeletedAt *time.Time
}
t.Run("PutCDRsInDataBase", func(t *testing.T) {
var err error
if db, err = gorm.Open(mysql.Open(fmt.Sprintf(dbConnString, "cgrates2")),
&gorm.Config{
AllowGlobalUpdate: true,
Logger: logger.Default.LogMode(logger.Silent),
}); err != nil {
t.Fatal(err)
}
tx := db.Begin()
if !tx.Migrator().HasTable("cdrs") {
if err = tx.Migrator().CreateTable(new(engine.CDRsql)); err != nil {
tx.Rollback()
t.Fatal(err)
}
}
tx.Commit()
tx = db.Begin()
tx = tx.Table(utils.CDRsTBL)
cdrSql := cdr1.AsCDRsql()
cdrSql2 := cdr2.AsCDRsql()
cdrSql.CreatedAt = time.Now()
cdrSql2.CreatedAt = time.Now()
saved := tx.Save(cdrSql)
if saved.Error != nil {
tx.Rollback()
t.Fatal(err)
}
saved = tx.Save(cdrSql2)
if saved.Error != nil {
tx.Rollback()
t.Fatal(err)
}
tx.Commit()
time.Sleep(10 * time.Millisecond)
var result int64
db.Table(utils.CDRsTBL).Count(&result)
if result != 2 {
t.Errorf("expected cdrs table to contain 2 entries, received %d", result)
}
})
defer t.Run("StopSQL", func(t *testing.T) {
if err := db.Migrator().DropTable("cdrs"); err != nil {
t.Fatal(err)
}
if err := db.Exec(`DROP DATABASE cgrates2;`).Error; err != nil {
t.Fatal(err)
}
if db2, err := db.DB(); err != nil {
t.Fatal(err)
} else if err = db2.Close(); err != nil {
t.Fatal(err)
}
})
content := `{
"general": {
"log_level": 7
},
"apiers": {
"enabled": true
},
"ers": {
"enabled": true,
"sessions_conns":["*localhost"],
"apiers_conns": ["*localhost"],
"readers": [
{
"id": "mysql",
"type": "*sql",
"run_delay": "1m",
"source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306",
"opts": {
"sqlDBName":"cgrates2",
},
"processed_path": "",
"tenant": "cgrates.org",
"filters": [
"*gt:~*req.answer_time:NOW() - INTERVAL 7 DAY", // dont process cdrs with answer_time older than 7 days ago (continue if answer_time > now-7days)
"*eq:~*req.cost_details.Charges[0].RatingID:RatingID2",
],
"flags": ["*dryrun"],
"fields":[
{"tag": "CGRID", "path": "*cgreq.CGRID", "type": "*variable", "value": "~*req.cgrid", "mandatory": true},
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.tor", "mandatory": true},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.origin_id", "mandatory": true},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.request_type", "mandatory": true},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.tenant", "mandatory": true},
{"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.category", "mandatory": true},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account", "mandatory": true},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.subject", "mandatory": true},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination", "mandatory": true},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.setup_time", "mandatory": true},
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.answer_time", "mandatory": true},
{"tag": "CostDetails", "path": "*cgreq.CostDetails", "type": "*variable", "value": "~*req.cost_details", "mandatory": true},
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage", "mandatory": true},
],
},
],
},
}`
buf := &bytes.Buffer{}
ng := engine.TestEngine{
ConfigJSON: content,
DBCfg: dbcfg,
LogBuffer: buf,
}
ng.Run(t)
t.Run("VerifyProcessedFieldsFromLogs", func(t *testing.T) {
time.Sleep(100 * time.Millisecond) // give enough time to process from sql table
records := 0
scanner := bufio.NewScanner(strings.NewReader(buf.String()))
timeStartFormated := timeStart.Format("2006-01-02T15:04:05-07:00")
expectedLog := fmt.Sprintf("\"Event\":{\"Account\":\"1001\",\"AnswerTime\":\"%s\",\"CGRID\":\"%s\",\"Category\":\"call\",\"CostDetails\":\"{\\\"CGRID\\\":\\\"test1\\\",\\\"RunID\\\":\\\"*default\\\",\\\"StartTime\\\":\\\"2017-01-09T16:18:21Z\\\",\\\"Usage\\\":180000000000,\\\"Cost\\\":2.3,\\\"Charges\\\":[{\\\"RatingID\\\":\\\"RatingID2\\\",\\\"Increments\\\":[{\\\"Usage\\\":120000000000,\\\"Cost\\\":2,\\\"AccountingID\\\":\\\"a012888\\\",\\\"CompressFactor\\\":1},{\\\"Usage\\\":1000000000,\\\"Cost\\\":0.005,\\\"AccountingID\\\":\\\"44d6c02\\\",\\\"CompressFactor\\\":60}],\\\"CompressFactor\\\":1}],\\\"AccountSummary\\\":{\\\"Tenant\\\":\\\"cgrates.org\\\",\\\"ID\\\":\\\"testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceSummaries\\\":[{\\\"UUID\\\":\\\"uuid1\\\",\\\"ID\\\":\\\"\\\",\\\"Type\\\":\\\"*monetary\\\",\\\"Initial\\\":0,\\\"Value\\\":50,\\\"Disabled\\\":false}],\\\"AllowNegative\\\":false,\\\"Disabled\\\":false},\\\"Rating\\\":{\\\"c1a5ab9\\\":{\\\"ConnectFee\\\":0.1,\\\"RoundingMethod\\\":\\\"*up\\\",\\\"RoundingDecimals\\\":5,\\\"MaxCost\\\":0,\\\"MaxCostStrategy\\\":\\\"\\\",\\\"TimingID\\\":\\\"\\\",\\\"RatesID\\\":\\\"ec1a177\\\",\\\"RatingFiltersID\\\":\\\"43e77dc\\\"}},\\\"Accounting\\\":{\\\"44d6c02\\\":{\\\"AccountID\\\":\\\"cgrates.org:testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"},\\\"a012888\\\":{\\\"AccountID\\\":\\\"cgrates.org:testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"}},\\\"RatingFilters\\\":null,\\\"Rates\\\":{\\\"ec1a177\\\":[{\\\"GroupIntervalStart\\\":0,\\\"Value\\\":0.01,\\\"RateIncrement\\\":60000000000,\\\"RateUnit\\\":1000000000}]},\\\"Timings\\\":null}\",\"Destination\":\"1002\",\"OriginID\":\"dsafdsaf\",\"RequestType\":\"*rated\",\"SetupTime\":\"%s\",\"Subject\":\"1001\",\"Tenant\":\"cgrates.org\",\"ToR\":\"*voice\",\"Usage\":\"10000000000\"},\"APIOpts\":{}}>", timeStartFormated, cgrID, timeStartFormated)
for scanner.Scan() {
line := scanner.Text()
if !strings.Contains(line, "<ERs> DRYRUN, reader: <mysql>") {
continue
}
records++
if !strings.Contains(line, expectedLog) {
t.Errorf("expected \n<%s>, \nreceived\n<%s>", expectedLog, line)
}
}
if err := scanner.Err(); err != nil {
t.Errorf("error reading input: %v", err)
}
if records != 1 {
t.Errorf("expected ERs to process 1 record, but it processed %d records", records)
}
})
}

View File

@@ -35,6 +35,7 @@ import (
// NewEventReaderService returns the EventReader Service
func NewEventReaderService(
cfg *config.CGRConfig,
dm *DataDBService,
filterSChan chan *engine.FilterS,
shdChan *utils.SyncedChan,
connMgr *engine.ConnManager,
@@ -48,6 +49,7 @@ func NewEventReaderService(
cfg: cfg,
filterSChan: filterSChan,
shdChan: shdChan,
dm: dm,
connMgr: connMgr,
server: server,
intConn: intConn,
@@ -66,6 +68,7 @@ type EventReaderService struct {
ers *ers.ERService
rldChan chan struct{}
stopChan chan struct{}
dm *DataDBService
connMgr *engine.ConnManager
server *cores.Server
intConn chan birpc.ClientConnector
@@ -84,6 +87,9 @@ func (erS *EventReaderService) Start() (err error) {
filterS := <-erS.filterSChan
erS.filterSChan <- filterS
dbchan := erS.dm.GetDMChan()
datadb := <-dbchan
dbchan <- datadb
// remake the stop chan
erS.stopChan = make(chan struct{})
@@ -91,7 +97,7 @@ func (erS *EventReaderService) Start() (err error) {
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ERs))
// build the service
erS.ers = ers.NewERService(erS.cfg, filterS, erS.connMgr)
erS.ers = ers.NewERService(erS.cfg, datadb, filterS, erS.connMgr)
go erS.listenAndServe(erS.ers, erS.stopChan, erS.rldChan)
// Register ERsV1 methods.

View File

@@ -68,7 +68,7 @@ func TestEventReaderSReload(t *testing.T) {
db := NewDataDBService(cfg, nil, false, srvDep)
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, anz, srvDep)
intERsConn := make(chan birpc.ClientConnector, 1)
erS := NewEventReaderService(cfg, filterSChan, shdChan, nil, server, intERsConn, anz, srvDep)
erS := NewEventReaderService(cfg, nil, filterSChan, shdChan, nil, server, intERsConn, anz, srvDep)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(erS, sS,
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
@@ -139,8 +139,8 @@ func TestEventReaderSReload2(t *testing.T) {
shdChan := utils.NewSyncedChan()
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
server := cores.NewServer(nil)
erS := NewEventReaderService(cfg, filterSChan, shdChan, nil, server, nil, nil, srvDep)
ers := ers.NewERService(cfg, nil, nil)
erS := NewEventReaderService(cfg, nil, filterSChan, shdChan, nil, server, nil, nil, srvDep)
ers := ers.NewERService(cfg, nil, nil, nil)
runtime.Gosched()
srv := erS.(*EventReaderService)

View File

@@ -39,7 +39,7 @@ func TestEventReaderSCoverage(t *testing.T) {
shdChan := utils.NewSyncedChan()
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
server := cores.NewServer(nil)
srv := NewEventReaderService(cfg, filterSChan, shdChan, nil, server, nil, nil, srvDep)
srv := NewEventReaderService(cfg, nil, filterSChan, shdChan, nil, server, nil, nil, srvDep)
if srv.IsRunning() {
t.Errorf("Expected service to be down")

View File

@@ -318,6 +318,7 @@ const (
CreateCDRsTablesSQL = "create_cdrs_tables.sql"
CreateTariffPlanTablesSQL = "create_tariffplan_tables.sql"
TestSQL = "TEST_SQL"
MetaDelete = "*delete"
MetaConstant = "*constant"
MetaPositive = "*positive"
MetaNegative = "*negative"
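
For completeness, a hedged sketch (using a local copy of the constant, not the reader's actual code) of how the new *delete value gates the post-processing delete in the SQL reader:

package main

import "fmt"

const MetaDelete = "*delete" // local mirror of the constant added above, for illustration only

// shouldDelete reproduces the reader's new gate: processed rows are removed from the
// source table only when processed_path is set to "*delete".
func shouldDelete(processedPath string) bool {
	return processedPath == MetaDelete
}

func main() {
	fmt.Println(shouldDelete("*delete")) // true  -> delete the row after processing
	fmt.Println(shouldDelete(""))        // false -> keep the row; WHERE filters must exclude it on the next run
}
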