From 321910d181d6ad864349b21416203ec74f845538 Mon Sep 17 00:00:00 2001 From: arberkatellari Date: Wed, 13 Nov 2024 18:31:11 +0200 Subject: [PATCH] Add WHERE statement availability and non-delete option to ERS SQL reader --- cmd/cgr-engine/cgr-engine.go | 2 +- ers/ers.go | 6 +- ers/ers_it_test.go | 50 +-- ers/ers_test.go | 2 +- ers/reader.go | 4 +- ers/readers_test.go | 26 +- ers/sql.go | 206 ++++++++++- ers/sql_it_test.go | 56 ++- general_tests/ers_sql_filters_it_test.go | 424 +++++++++++++++++++++++ services/ers.go | 8 +- services/ers_it_test.go | 6 +- services/ers_test.go | 2 +- utils/consts.go | 1 + 13 files changed, 698 insertions(+), 95 deletions(-) create mode 100644 general_tests/ers_sql_filters_it_test.go diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 54d0b762a..6b42819a5 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -627,7 +627,7 @@ func main() { ldrs, anz, dspS, dspH, dmService, storDBService, services.NewEventExporterService(cfg, filterSChan, connManager, server, internalEEsChan, anz, srvDep), - services.NewEventReaderService(cfg, filterSChan, + services.NewEventReaderService(cfg, dmService, filterSChan, shdChan, connManager, server, internalERsChan, anz, srvDep), services.NewSIPAgent(cfg, filterSChan, shdChan, connManager, srvDep), services.NewJanusAgent(cfg, filterSChan, server, connManager, srvDep), diff --git a/ers/ers.go b/ers/ers.go index 664369c92..1ab27897e 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -46,9 +46,10 @@ type erEvent struct { } // NewERService instantiates the ERService -func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, connMgr *engine.ConnManager) (ers *ERService) { +func NewERService(cfg *config.CGRConfig, datadb *engine.DataManager, filterS *engine.FilterS, connMgr *engine.ConnManager) (ers *ERService) { ers = &ERService{ cfg: cfg, + dataManager: datadb, rdrs: make(map[string]EventReader), rdrPaths: make(map[string]string), stopLsn: make(map[string]chan struct{}), @@ -67,6 +68,7 @@ func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, connMgr *engin type ERService struct { sync.RWMutex cfg *config.CGRConfig + dataManager *engine.DataManager rdrs map[string]EventReader // map[rdrID]EventReader rdrPaths map[string]string // used for reloads in case of path changes stopLsn map[string]chan struct{} // map[rdrID] chan struct{} @@ -220,7 +222,7 @@ func (erS *ERService) addReader(rdrID string, cfgIdx int) (err error) { var rdr EventReader if rdr, err = NewEventReader(erS.cfg, cfgIdx, erS.rdrEvents, erS.partialEvents, erS.rdrErr, - erS.filterS, erS.stopLsn[rdrID]); err != nil { + erS.filterS, erS.stopLsn[rdrID], erS.dataManager); err != nil { return } erS.rdrs[rdrID] = rdr diff --git a/ers/ers_it_test.go b/ers/ers_it_test.go index 0be185359..59bffc40c 100644 --- a/ers/ers_it_test.go +++ b/ers/ers_it_test.go @@ -53,7 +53,7 @@ func TestERsNewERService(t *testing.T) { rdrEvents: make(chan *erEvent), rdrErr: make(chan error), } - rcv := NewERService(cfg, fltrS, nil) + rcv := NewERService(cfg, nil, fltrS, nil) if !reflect.DeepEqual(expected.cfg, rcv.cfg) { t.Errorf("Expecting: <%+v>, received: <%+v>", expected.cfg, rcv.cfg) @@ -65,7 +65,7 @@ func TestERsNewERService(t *testing.T) { func TestERsAddReader(t *testing.T) { cfg := config.NewDefaultCGRConfig() fltrS := &engine.FilterS{} - erS := NewERService(cfg, fltrS, nil) + erS := NewERService(cfg, nil, fltrS, nil) reader := cfg.ERsCfg().Readers[0] reader.Type = utils.MetaFileCSV reader.ID = "file_reader" @@ -89,7 +89,7 @@ func TestERsListenAndServeErr(t *testing.T) { {}, } fltrS := &engine.FilterS{} - srv := NewERService(cfg, fltrS, nil) + srv := NewERService(cfg, nil, fltrS, nil) stopChan := make(chan struct{}, 1) cfgRldChan := make(chan struct{}, 1) err := srv.ListenAndServe(stopChan, cfgRldChan) @@ -106,7 +106,7 @@ func TestERsProcessEventErr(t *testing.T) { }, } fltrS := &engine.FilterS{} - srv := NewERService(cfg, fltrS, nil) + srv := NewERService(cfg, nil, fltrS, nil) rdrCfg := &config.EventReaderCfg{ ID: "", Type: "", @@ -130,7 +130,7 @@ func TestERsCloseAllRdrs(t *testing.T) { }, } fltrS := &engine.FilterS{} - srv := NewERService(cfg, fltrS, nil) + srv := NewERService(cfg, nil, fltrS, nil) srv.stopLsn[""] = make(chan struct{}, 1) srv.closeAllRdrs() } @@ -142,7 +142,7 @@ func TestERsListenAndServeRdrErr(t *testing.T) { }, } fltrS := &engine.FilterS{} - srv := NewERService(cfg, fltrS, nil) + srv := NewERService(cfg, nil, fltrS, nil) stopChan := make(chan struct{}, 1) cfgRldChan := make(chan struct{}, 1) srv.rdrErr = make(chan error, 1) @@ -162,7 +162,7 @@ func TestERsListenAndServeStopchan(t *testing.T) { }, } fltrS := &engine.FilterS{} - srv := NewERService(cfg, fltrS, nil) + srv := NewERService(cfg, nil, fltrS, nil) stopChan := make(chan struct{}, 1) cfgRldChan := make(chan struct{}, 1) stopChan <- struct{}{} @@ -181,7 +181,7 @@ func TestERsListenAndServeRdrEvents(t *testing.T) { }, } fltrS := &engine.FilterS{} - srv := NewERService(cfg, fltrS, nil) + srv := NewERService(cfg, nil, fltrS, nil) stopChan := make(chan struct{}, 1) cfgRldChan := make(chan struct{}, 1) srv.rdrErr = make(chan error, 1) @@ -208,7 +208,7 @@ func TestERsListenAndServeCfgRldChan(t *testing.T) { }, } fltrS := &engine.FilterS{} - srv := NewERService(cfg, fltrS, nil) + srv := NewERService(cfg, nil, fltrS, nil) stopChan := make(chan struct{}, 1) cfgRldChan := make(chan struct{}, 1) srv.rdrErr = make(chan error, 1) @@ -232,7 +232,7 @@ func TestERsListenAndServeCfgRldChan2(t *testing.T) { }, } fltrS := &engine.FilterS{} - srv := NewERService(cfg, fltrS, nil) + srv := NewERService(cfg, nil, fltrS, nil) exp := &CSVFileER{ cgrCfg: cfg, cfgIdx: 0, @@ -266,7 +266,7 @@ func TestERsListenAndServeCfgRldChan3(t *testing.T) { }, } fltrS := &engine.FilterS{} - srv := NewERService(cfg, fltrS, nil) + srv := NewERService(cfg, nil, fltrS, nil) exp := &CSVFileER{ cgrCfg: cfg, cfgIdx: 0, @@ -299,7 +299,7 @@ func TestERsListenAndServeCfgRldChan4(t *testing.T) { }, } fltrS := &engine.FilterS{} - srv := NewERService(cfg, fltrS, nil) + srv := NewERService(cfg, nil, fltrS, nil) exp := &CSVFileER{ cgrCfg: cfg, cfgIdx: 0, @@ -334,7 +334,7 @@ func TestERsListenAndServeCfgRldChan5(t *testing.T) { }, } fltrS := &engine.FilterS{} - srv := NewERService(cfg, fltrS, nil) + srv := NewERService(cfg, nil, fltrS, nil) exp := &CSVFileER{ cgrCfg: cfg, } @@ -368,7 +368,7 @@ func TestERsListenAndServeCfgRldChan6(t *testing.T) { }, } fltrS := &engine.FilterS{} - srv := NewERService(cfg, fltrS, nil) + srv := NewERService(cfg, nil, fltrS, nil) exp := &CSVFileER{ cgrCfg: cfg, cfgIdx: 0, @@ -408,7 +408,7 @@ func TestERsProcessEvent(t *testing.T) { }, } fltrS := &engine.FilterS{} - srv := NewERService(cfg, fltrS, nil) + srv := NewERService(cfg, nil, fltrS, nil) rdrCfg := &config.EventReaderCfg{ Flags: map[string]utils.FlagParams{ utils.MetaLog: map[string][]string{ @@ -431,7 +431,7 @@ func TestERsProcessEvent2(t *testing.T) { }, } fltrS := &engine.FilterS{} - srv := NewERService(cfg, fltrS, nil) + srv := NewERService(cfg, nil, fltrS, nil) rdrCfg := &config.EventReaderCfg{ Flags: map[string]utils.FlagParams{ utils.MetaDryRun: map[string][]string{ @@ -455,7 +455,7 @@ func TestERsProcessEvent3(t *testing.T) { } cfg.ERsCfg().SessionSConns = []string{} fltrS := &engine.FilterS{} - srv := NewERService(cfg, fltrS, nil) + srv := NewERService(cfg, nil, fltrS, nil) rdrCfg := &config.EventReaderCfg{ Flags: map[string]utils.FlagParams{ utils.MetaEvent: map[string][]string{}, @@ -482,7 +482,7 @@ func TestERsProcessEvent4(t *testing.T) { } cfg.ERsCfg().SessionSConns = []string{} fltrS := &engine.FilterS{} - srv := NewERService(cfg, fltrS, nil) + srv := NewERService(cfg, nil, fltrS, nil) rdrCfg := &config.EventReaderCfg{ Flags: map[string]utils.FlagParams{ utils.MetaAuthorize: map[string][]string{}, @@ -509,7 +509,7 @@ func TestERsProcessEvent5(t *testing.T) { } cfg.ERsCfg().SessionSConns = []string{} fltrS := &engine.FilterS{} - srv := NewERService(cfg, fltrS, nil) + srv := NewERService(cfg, nil, fltrS, nil) rdrCfg := &config.EventReaderCfg{ Flags: map[string]utils.FlagParams{ utils.MetaTerminate: map[string][]string{}, @@ -536,7 +536,7 @@ func TestERsProcessEvent6(t *testing.T) { } cfg.ERsCfg().SessionSConns = []string{} fltrS := &engine.FilterS{} - srv := NewERService(cfg, fltrS, nil) + srv := NewERService(cfg, nil, fltrS, nil) rdrCfg := &config.EventReaderCfg{ Flags: map[string]utils.FlagParams{ utils.MetaInitiate: map[string][]string{}, @@ -562,7 +562,7 @@ func TestERsProcessEvent7(t *testing.T) { } cfg.ERsCfg().SessionSConns = []string{} fltrS := &engine.FilterS{} - srv := NewERService(cfg, fltrS, nil) + srv := NewERService(cfg, nil, fltrS, nil) rdrCfg := &config.EventReaderCfg{ Flags: map[string]utils.FlagParams{ utils.MetaUpdate: map[string][]string{}, @@ -588,7 +588,7 @@ func TestERsProcessEvent8(t *testing.T) { } cfg.ERsCfg().SessionSConns = []string{} fltrS := &engine.FilterS{} - srv := NewERService(cfg, fltrS, nil) + srv := NewERService(cfg, nil, fltrS, nil) rdrCfg := &config.EventReaderCfg{ Flags: map[string]utils.FlagParams{ utils.MetaMessage: map[string][]string{}, @@ -615,7 +615,7 @@ func TestERsProcessEvent9(t *testing.T) { } cfg.ERsCfg().SessionSConns = []string{} fltrS := &engine.FilterS{} - srv := NewERService(cfg, fltrS, nil) + srv := NewERService(cfg, nil, fltrS, nil) rdrCfg := &config.EventReaderCfg{ Flags: map[string]utils.FlagParams{ utils.MetaCDRs: map[string][]string{}, @@ -642,7 +642,7 @@ func TestERsProcessEvent10(t *testing.T) { } cfg.ERsCfg().SessionSConns = []string{} fltrS := &engine.FilterS{} - srv := NewERService(cfg, fltrS, nil) + srv := NewERService(cfg, nil, fltrS, nil) rdrCfg := &config.EventReaderCfg{ Flags: map[string]utils.FlagParams{ utils.MetaMessage: map[string][]string{}, @@ -697,7 +697,7 @@ func TestERsProcessEvent11(t *testing.T) { connMng := engine.NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS): clientChan, }) - srv := NewERService(cfg, fltrS, connMng) + srv := NewERService(cfg, nil, fltrS, connMng) rdrCfg := &config.EventReaderCfg{ Flags: map[string]utils.FlagParams{ utils.MetaMessage: map[string][]string{}, diff --git a/ers/ers_test.go b/ers/ers_test.go index 62215adff..b717c045f 100644 --- a/ers/ers_test.go +++ b/ers/ers_test.go @@ -34,7 +34,7 @@ import ( func TestERsProcessPartialEvent(t *testing.T) { cfg := config.NewDefaultCGRConfig() - erS := NewERService(cfg, nil, nil) + erS := NewERService(cfg, nil, nil, nil) event := &utils.CGREvent{ Tenant: "cgrates.org", ID: "EventERsProcessPartial", diff --git a/ers/reader.go b/ers/reader.go index 42f78d3e1..d47e481fa 100644 --- a/ers/reader.go +++ b/ers/reader.go @@ -34,7 +34,7 @@ type EventReader interface { // NewEventReader instantiates the event reader based on configuration at index func NewEventReader(cfg *config.CGRConfig, cfgIdx int, rdrEvents, partialEvents chan *erEvent, rdrErr chan error, - fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { + fltrS *engine.FilterS, rdrExit chan struct{}, dm *engine.DataManager) (er EventReader, err error) { switch cfg.ERsCfg().Readers[cfgIdx].Type { default: err = fmt.Errorf("unsupported reader type: <%s>", cfg.ERsCfg().Readers[cfgIdx].Type) @@ -47,7 +47,7 @@ func NewEventReader(cfg *config.CGRConfig, cfgIdx int, rdrEvents, partialEvents case utils.MetaKafkajsonMap: return NewKafkaER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit) case utils.MetaSQL: - return NewSQLEventReader(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit) + return NewSQLEventReader(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit, dm) case utils.MetaFileJSON: return NewJSONFileER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit) case utils.MetaAMQPjsonMap: diff --git a/ers/readers_test.go b/ers/readers_test.go index 5ca464a73..7c82d9c3f 100644 --- a/ers/readers_test.go +++ b/ers/readers_test.go @@ -36,7 +36,7 @@ func TestNewInvalidReader(t *testing.T) { if len(cfg.ERsCfg().Readers) != 2 { t.Errorf("Expecting: <2>, received: <%+v>", len(cfg.ERsCfg().Readers)) } - if _, err := NewEventReader(cfg, 1, nil, nil, nil, &engine.FilterS{}, nil); err == nil || err.Error() != "unsupported reader type: " { + if _, err := NewEventReader(cfg, 1, nil, nil, nil, &engine.FilterS{}, nil, nil); err == nil || err.Error() != "unsupported reader type: " { t.Errorf("Expecting: >, received: <%+v>", err) } } @@ -61,7 +61,7 @@ func TestNewCsvReader(t *testing.T) { rdrExit: nil, conReqs: nil} var expected EventReader = exp - if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil); err != nil { + if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil, nil); err != nil { t.Errorf("Expecting: , received: <%+v>", err) } else { // because we use function make to init the channel when we create the EventReader reflect.DeepEqual @@ -88,7 +88,7 @@ func TestNewKafkaReader(t *testing.T) { if err != nil { t.Errorf("Expecting: , received: <%+v>", err) } - if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil); err != nil { + if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil, nil); err != nil { t.Errorf("Expecting: , received: <%+v>", err) } else if !reflect.DeepEqual(expected, rcv) { t.Errorf("Expecting: <%+v>, received: <%+v>", expected, rcv) @@ -113,11 +113,11 @@ func TestNewSQLReader(t *testing.T) { if len(cfg.ERsCfg().Readers) != 2 { t.Errorf("Expecting: <2>, received: <%+v>", len(cfg.ERsCfg().Readers)) } - expected, err := NewSQLEventReader(cfg, 1, nil, nil, nil, fltr, nil) + expected, err := NewSQLEventReader(cfg, 1, nil, nil, nil, fltr, nil, nil) if err != nil { t.Errorf("Expecting: , received: <%+v>", err) } - if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil); err != nil { + if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil, nil); err != nil { t.Errorf("Expecting: , received: <%+v>", err) } else if !reflect.DeepEqual(expected, rcv) { t.Errorf("Expecting: <%+v>, received: <%+v>", expected, rcv) @@ -137,7 +137,7 @@ func TestNewSQLReaderError(t *testing.T) { reader.SourcePath = "#" reader.ProcessedPath = "" expected := "unknown db_type " - _, err := NewSQLEventReader(cfg, 0, nil, nil, nil, fltr, nil) + _, err := NewSQLEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil) if err == nil || err.Error() != expected { t.Errorf("Expecting: <%+v>, received: <%+v>", expected, err) } @@ -151,7 +151,7 @@ func TestNewFileXMLReader(t *testing.T) { if err != nil { t.Error(err) } - rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) + rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil) if err != nil { t.Error(err) } else { @@ -171,7 +171,7 @@ func TestNewFileFWVReader(t *testing.T) { if err != nil { t.Error(err) } - rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) + rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil) if err != nil { t.Error(nil) } else { @@ -191,7 +191,7 @@ func TestNewJSONReader(t *testing.T) { if err != nil { t.Error(err) } - rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) + rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil) if err != nil { t.Error(err) } else { @@ -215,7 +215,7 @@ func TestNewAMQPReader(t *testing.T) { exp.Config().ProcessedPath = "" exp.createClient(&config.AMQPROpts{}, nil) // var expected EventReader = exp - rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) + rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil) if err != nil { t.Error(err) } else if !reflect.DeepEqual(exp, rcv) { @@ -239,7 +239,7 @@ func TestNewAMQPv1Reader(t *testing.T) { exp.Config().ProcessedPath = "" exp.Config().Opts = &config.EventReaderOpts{} var expected EventReader = exp - rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) + rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil) if err != nil { t.Error(err) } else if !reflect.DeepEqual(expected, rcv) { @@ -264,7 +264,7 @@ func TestNewS3Reader(t *testing.T) { exp.Config().ProcessedPath = "" exp.Config().Opts = &config.EventReaderOpts{} var expected EventReader = exp - rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) + rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil) if err != nil { t.Error(err) } else if !reflect.DeepEqual(expected, rcv) { @@ -300,7 +300,7 @@ func TestNewSQSReader(t *testing.T) { exp.Config().ProcessedPath = "" exp.Config().Opts = &config.EventReaderOpts{} var expected EventReader = exp - rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) + rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil, nil) exp.session = rcv.(*SQSER).session if err != nil { t.Error(err) diff --git a/ers/sql.go b/ers/sql.go index 4e67ab9d7..31f782956 100644 --- a/ers/sql.go +++ b/ers/sql.go @@ -46,10 +46,10 @@ const ( // NewSQLEventReader return a new sql event reader func NewSQLEventReader(cfg *config.CGRConfig, cfgIdx int, rdrEvents, partialEvents chan *erEvent, rdrErr chan error, - fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { - + fltrS *engine.FilterS, rdrExit chan struct{}, dm *engine.DataManager) (er EventReader, err error) { rdr := &SQLEventReader{ cgrCfg: cfg, + dm: dm, cfgIdx: cfgIdx, fltrS: fltrS, rdrEvents: rdrEvents, @@ -70,6 +70,7 @@ func NewSQLEventReader(cfg *config.CGRConfig, cfgIdx int, type SQLEventReader struct { // sync.RWMutex cgrCfg *config.CGRConfig + dm *engine.DataManager cfgIdx int // index of config instance within ERsCfg.Readers fltrS *engine.FilterS @@ -99,7 +100,7 @@ func (rdr *SQLEventReader) openDB(dialect gorm.Dialector) (err error) { return } sqlDB.SetMaxOpenConns(10) - if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API + if rdr.Config().RunDelay <= 0 { // 0 disables the automatic read, maybe done per API return } go rdr.readLoop(db, sqlDB) // read until the connection is closed @@ -121,6 +122,141 @@ func (rdr *SQLEventReader) Serve() (err error) { return } +// Creates mysql conditions used in WHERE statement out of filters +func valueQry(ruleType, elem, field string, values []string, not bool) (conditions []string) { + // here are for the filters that their values are empty: *exists, *notexists, *empty, *notempty.. + if len(values) == 0 { + switch ruleType { + case utils.MetaExists, utils.MetaNotExists: + if not { + if elem == utils.EmptyString { + conditions = append(conditions, fmt.Sprintf(" %s IS NOT NULL", field)) + return + } + conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') IS NOT NULL", elem, field)) + return + } + if elem == utils.EmptyString { + conditions = append(conditions, fmt.Sprintf(" %s IS NULL", field)) + return + } + conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') IS NULL", elem, field)) + case utils.MetaEmpty, utils.MetaNotEmpty: + if not { + if elem == utils.EmptyString { + conditions = append(conditions, fmt.Sprintf(" %s != ''", field)) + return + } + conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') != ''", elem, field)) + return + } + if elem == utils.EmptyString { + conditions = append(conditions, fmt.Sprintf(" %s == ''", field)) + return + } + conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') == ''", elem, field)) + } + return + } + // here are for the filters that can have more than one value: *string, *prefix, *suffix .. + for _, value := range values { + switch value { // in case we have boolean values, it should be queried over 1 or 0 + case "true": + value = "1" + case "false": + value = "0" + } + var singleCond string + switch ruleType { + case utils.MetaString, utils.MetaNotString, utils.MetaEqual, utils.MetaNotEqual: + if not { + if elem == utils.EmptyString { + conditions = append(conditions, fmt.Sprintf(" %s != '%s'", field, value)) + continue + } + conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') != '%s'", + elem, field, value)) + continue + } + if elem == utils.EmptyString { + singleCond = fmt.Sprintf(" %s = '%s'", field, value) + } else { + singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') = '%s'", elem, field, value) + } + case utils.MetaLessThan, utils.MetaLessOrEqual, utils.MetaGreaterThan, utils.MetaGreaterOrEqual: + if ruleType == utils.MetaGreaterOrEqual { + if elem == utils.EmptyString { + singleCond = fmt.Sprintf(" %s >= %s", field, value) + } else { + singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') >= %s", elem, field, value) + } + } else if ruleType == utils.MetaGreaterThan { + if elem == utils.EmptyString { + singleCond = fmt.Sprintf(" %s > %s", field, value) + } else { + singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') > %s", elem, field, value) + } + } else if ruleType == utils.MetaLessOrEqual { + if elem == utils.EmptyString { + singleCond = fmt.Sprintf(" %s <= %s", field, value) + } else { + singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') <= %s", elem, field, value) + } + } else if ruleType == utils.MetaLessThan { + if elem == utils.EmptyString { + singleCond = fmt.Sprintf(" %s < %s", field, value) + } else { + singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') < %s", elem, field, value) + } + } + case utils.MetaPrefix, utils.MetaNotPrefix: + if not { + if elem == utils.EmptyString { + conditions = append(conditions, fmt.Sprintf(" %s NOT LIKE '%s%%'", field, value)) + continue + } + conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') NOT LIKE '%s%%'", elem, field, value)) + continue + } + if elem == utils.EmptyString { + singleCond = fmt.Sprintf(" %s LIKE '%s%%'", field, value) + } else { + singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') LIKE '%s%%'", elem, field, value) + } + case utils.MetaSuffix, utils.MetaNotSuffix: + if not { + if elem == utils.EmptyString { + conditions = append(conditions, fmt.Sprintf(" %s NOT LIKE '%%%s'", field, value)) + continue + } + conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') NOT LIKE '%%%s'", elem, field, value)) + continue + } + if elem == utils.EmptyString { + singleCond = fmt.Sprintf(" %s LIKE '%%%s'", field, value) + } else { + singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') LIKE '%%%s'", elem, field, value) + } + case utils.MetaRegex, utils.MetaNotRegex: + if not { + if elem == utils.EmptyString { + conditions = append(conditions, fmt.Sprintf(" %s NOT REGEXP '%s'", field, value)) + continue + } + conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') NOT REGEXP '%s'", elem, field, value)) + continue + } + if elem == utils.EmptyString { + singleCond = fmt.Sprintf(" %s REGEXP '%s'", field, value) + } else { + singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') REGEXP '%s'", elem, field, value) + } + } + conditions = append(conditions, singleCond) + } + return +} + func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) { defer sqlDB.Close() if rdr.Config().StartDelay > 0 { @@ -134,8 +270,47 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) { } } tm := time.NewTimer(0) + var filters []*engine.Filter + var whereQueries []string + var renewedFltrs []string + for _, fltr := range rdr.Config().Filters { + if result, err := rdr.dm.GetFilter(config.CgrConfig().GeneralCfg().DefaultTenant, fltr, true, false, utils.NonTransactional); err != nil { + rdr.rdrErr <- err + return + } else { + filters = append(filters, result) + if !strings.Contains(fltr, utils.DynamicDataPrefix+utils.MetaReq) { + renewedFltrs = append(renewedFltrs, fltr) + } + } + } + rdr.Config().Filters = renewedFltrs // remove filters containing *req + for _, filter := range filters { + for _, rule := range filter.Rules { + var elem, field string + switch { + case strings.HasPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep): + field = strings.TrimPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep) + parts := strings.SplitN(field, ".", 2) + if len(parts) == 2 { // Split in 2 pieces if it contains any more dots in the field + // First part (before the first dot) + elem = parts[0] + // Second part (everything after the first dot) + field = parts[1] + } + default: + continue + } + conditions := valueQry(rule.Type, elem, field, rule.Values, strings.HasPrefix(rule.Type, utils.MetaNot)) + whereQueries = append(whereQueries, strings.Join(conditions, " OR ")) + } + } for { - rows, err := db.Table(rdr.tableName).Select(utils.Meta).Rows() + tx := db.Table(rdr.tableName).Select(utils.Meta) + for _, whereQ := range whereQueries { + tx = tx.Where(whereQ) + } + rows, err := tx.Rows() if err != nil { rdr.rdrErr <- err return @@ -194,13 +369,15 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) { fltr[colName] = utils.IfaceAsString(columns[i]) } } - if err = db.Table(rdr.tableName).Delete(nil, fltr).Error; err != nil { // to ensure we don't read it again - utils.Logger.Warning( - fmt.Sprintf("<%s> deleting message %s error: %s", - utils.ERs, utils.ToJSON(msg), err.Error())) - rdr.rdrErr <- err - rows.Close() - return + if rdr.Config().ProcessedPath == utils.MetaDelete { + if err = db.Table(rdr.tableName).Delete(nil, fltr).Error; err != nil { // to ensure we don't read it again + utils.Logger.Warning( + fmt.Sprintf("<%s> deleting message %s error: %s", + utils.ERs, utils.ToJSON(msg), err.Error())) + rdr.rdrErr <- err + rows.Close() + return + } } go func(msg map[string]any) { @@ -215,11 +392,8 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) { }(msg) } rows.Close() - if rdr.Config().RunDelay < 0 { - return - } - tm.Reset(rdr.Config().RunDelay) - select { + tm.Reset(rdr.Config().RunDelay) // reset the timer to RunDelay + select { // wait for timer or rdrExit case <-rdr.rdrExit: tm.Stop() utils.Logger.Info( diff --git a/ers/sql_it_test.go b/ers/sql_it_test.go index bf4e5299b..4493281b4 100644 --- a/ers/sql_it_test.go +++ b/ers/sql_it_test.go @@ -89,26 +89,24 @@ func testSQLInitConfig(t *testing.T) { "stor_db": { "db_password": "CGRateS.org", }, - "ers": { // EventReaderService - "enabled": true, // starts the EventReader service: + "ers": { + "enabled": true, "sessions_conns":["*localhost"], "readers": [ { - "id": "mysql", // identifier of the EventReader profile - "type": "*sql", // reader type <*file_csv> - "run_delay": "1", // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together - "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited - "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", // read data from this path + "id": "mysql", + "type": "*sql", + "run_delay": "1", + "concurrent_requests": 1024, + "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", "opts": { "sqlDBName":"cgrates2", - "sqlDBNameProcessed":"cgrates2", - "sqlTableNameProcessed":"cdrs2", }, - "processed_path": "", // move processed data here - "tenant": "cgrates.org", // tenant used by import - "filters": [], // limit parsing based on the filters - "flags": [], // flags to influence the event processing - "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value + "processed_path": "*delete", + "tenant": "cgrates.org", + "filters": [], + "flags": [], + "fields":[ {"tag": "CGRID", "type": "*composed", "value": "~*req.cgrid", "path": "*cgreq.CGRID"}, {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ], @@ -236,7 +234,7 @@ func testSQLReader(t *testing.T) { rdrEvents = make(chan *erEvent, 1) rdrErr = make(chan error, 1) rdrExit = make(chan struct{}, 1) - sqlER, err := NewEventReader(sqlCfg, 1, rdrEvents, make(chan *erEvent, 1), rdrErr, new(engine.FilterS), rdrExit) + sqlER, err := NewEventReader(sqlCfg, 1, rdrEvents, make(chan *erEvent, 1), rdrErr, new(engine.FilterS), rdrExit, nil) if err != nil { t.Fatal(err) } @@ -373,25 +371,23 @@ func testSQLInitConfig2(t *testing.T) { "stor_db": { "db_password": "CGRateS.org", }, - "ers": { // EventReaderService - "enabled": true, // starts the EventReader service: + "ers": { + "enabled": true, "readers": [ { - "id": "mysql", // identifier of the EventReader profile - "type": "*sql", // reader type <*file_csv> - "run_delay": "1", // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together - "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited - "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", // read data from this path + "id": "mysql", + "type": "*sql", + "run_delay": "1", + "concurrent_requests": 1024, + "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", "opts": { "sqlDBName":"cgrates2", - "sqlDBNameProcessed":"cgrates2", - "sqlTableNameProcessed":"cdrs2", }, - "processed_path": "", // move processed data here - "tenant": "cgrates.org", // tenant used by import - "filters": [], // limit parsing based on the filters - "flags": [], // flags to influence the event processing - "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value + "processed_path": "*delete", + "tenant": "cgrates.org", + "filters": [], + "flags": [], + "fields":[ {"tag": "CGRID", "type": "*composed", "value": "~*req.cgrid", "path": "*cgreq.CGRID"}, ], }, @@ -484,7 +480,7 @@ func testSQLReader3(t *testing.T) { rdrEvents = make(chan *erEvent, 1) rdrErr = make(chan error, 1) rdrExit = make(chan struct{}, 1) - sqlER, err := NewEventReader(sqlCfg, 1, rdrEvents, make(chan *erEvent, 1), rdrErr, new(engine.FilterS), rdrExit) + sqlER, err := NewEventReader(sqlCfg, 1, rdrEvents, make(chan *erEvent, 1), rdrErr, new(engine.FilterS), rdrExit, nil) if err != nil { t.Fatal(err) } diff --git a/general_tests/ers_sql_filters_it_test.go b/general_tests/ers_sql_filters_it_test.go new file mode 100644 index 000000000..d99d906af --- /dev/null +++ b/general_tests/ers_sql_filters_it_test.go @@ -0,0 +1,424 @@ +//go:build integration +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package general_tests + +import ( + "bufio" + "bytes" + "fmt" + "strings" + "testing" + "time" + + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "gorm.io/driver/mysql" + "gorm.io/gorm" + "gorm.io/gorm/logger" +) + +var ( + db *gorm.DB + dbConnString = "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'" + cdr1 = &engine.CDR{ // sample with values not realisticy calculated + CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), + OrderID: 123, + ToR: utils.MetaVoice, + OriginID: "dsafdsaf", + OriginHost: "192.168.1.1", + Source: "test", + RequestType: utils.MetaRated, + Tenant: "cgrates.org", + Category: "call", + Account: "1001", + Subject: "1001", + Destination: "1002", + SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), + AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), + RunID: utils.MetaDefault, + Usage: 10 * time.Second, + ExtraInfo: "extraInfo", + Partial: false, + PreRated: true, + CostSource: "cost source", + CostDetails: &engine.EventCost{ + CGRID: "test1", + RunID: utils.MetaDefault, + StartTime: time.Date(2017, 1, 9, 16, 18, 21, 0, time.UTC), + Usage: utils.DurationPointer(3 * time.Minute), + Cost: utils.Float64Pointer(2.3), + Charges: []*engine.ChargingInterval{ + { + RatingID: "c1a5ab9", + Increments: []*engine.ChargingIncrement{ + { + Usage: 2 * time.Minute, + Cost: 2.0, + AccountingID: "a012888", + CompressFactor: 1, + }, + { + Usage: time.Second, + Cost: 0.005, + AccountingID: "44d6c02", + CompressFactor: 60, + }, + }, + CompressFactor: 1, + }, + }, + AccountSummary: &engine.AccountSummary{ + Tenant: "cgrates.org", + ID: "testV1CDRsRefundOutOfSessionCost", + BalanceSummaries: []*engine.BalanceSummary{ + { + UUID: "uuid1", + Type: utils.MetaMonetary, + Value: 50, + }, + }, + AllowNegative: false, + Disabled: false, + }, + Rating: engine.Rating{ + "c1a5ab9": &engine.RatingUnit{ + ConnectFee: 0.1, + RoundingMethod: "*up", + RoundingDecimals: 5, + RatesID: "ec1a177", + RatingFiltersID: "43e77dc", + }, + }, + Accounting: engine.Accounting{ + "a012888": &engine.BalanceCharge{ + AccountID: "cgrates.org:testV1CDRsRefundOutOfSessionCost", + BalanceUUID: "uuid1", + Units: 120.7, + }, + "44d6c02": &engine.BalanceCharge{ + AccountID: "cgrates.org:testV1CDRsRefundOutOfSessionCost", + BalanceUUID: "uuid1", + Units: 120.7, + }, + }, + Rates: engine.ChargedRates{ + "ec1a177": engine.RateGroups{ + &engine.RGRate{ + GroupIntervalStart: 0, + Value: 0.01, + RateIncrement: time.Minute, + RateUnit: time.Second}, + }, + }, + }, + ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, + } + timeStart = time.Now() + cgrID = utils.Sha1("dsafdsaf", timeStart.String()) + cdr2 = &engine.CDR{ // sample with values not realisticy calculated + CGRID: cgrID, + OrderID: 123, + ToR: utils.MetaVoice, + OriginID: "dsafdsaf", + OriginHost: "192.168.1.1", + Source: "test", + RequestType: utils.MetaRated, + Tenant: "cgrates.org", + Category: "call", + Account: "1001", + Subject: "1001", + Destination: "1002", + SetupTime: timeStart, + AnswerTime: timeStart, + RunID: utils.MetaDefault, + Usage: 10 * time.Second, + ExtraInfo: "extraInfo", + Partial: false, + PreRated: true, + CostSource: "cost source", + CostDetails: &engine.EventCost{ + CGRID: "test1", + RunID: utils.MetaDefault, + StartTime: time.Date(2017, 1, 9, 16, 18, 21, 0, time.UTC), + Usage: utils.DurationPointer(3 * time.Minute), + Cost: utils.Float64Pointer(2.3), + Charges: []*engine.ChargingInterval{ + { + RatingID: "RatingID2", + Increments: []*engine.ChargingIncrement{ + { + Usage: 2 * time.Minute, + Cost: 2.0, + AccountingID: "a012888", + CompressFactor: 1, + }, + { + Usage: time.Second, + Cost: 0.005, + AccountingID: "44d6c02", + CompressFactor: 60, + }, + }, + CompressFactor: 1, + }, + }, + AccountSummary: &engine.AccountSummary{ + Tenant: "cgrates.org", + ID: "testV1CDRsRefundOutOfSessionCost", + BalanceSummaries: []*engine.BalanceSummary{ + { + UUID: "uuid1", + Type: utils.MetaMonetary, + Value: 50, + }, + }, + AllowNegative: false, + Disabled: false, + }, + Rating: engine.Rating{ + "c1a5ab9": &engine.RatingUnit{ + ConnectFee: 0.1, + RoundingMethod: "*up", + RoundingDecimals: 5, + RatesID: "ec1a177", + RatingFiltersID: "43e77dc", + }, + }, + Accounting: engine.Accounting{ + "a012888": &engine.BalanceCharge{ + AccountID: "cgrates.org:testV1CDRsRefundOutOfSessionCost", + BalanceUUID: "uuid1", + Units: 120.7, + }, + "44d6c02": &engine.BalanceCharge{ + AccountID: "cgrates.org:testV1CDRsRefundOutOfSessionCost", + BalanceUUID: "uuid1", + Units: 120.7, + }, + }, + Rates: engine.ChargedRates{ + "ec1a177": engine.RateGroups{ + &engine.RGRate{ + GroupIntervalStart: 0, + Value: 0.01, + RateIncrement: time.Minute, + RateUnit: time.Second}, + }, + }, + }, + ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, + } +) + +func TestERSSQLFilters(t *testing.T) { + var dbcfg engine.DBCfg + switch *utils.DBType { + case utils.MetaInternal: + dbcfg = engine.InternalDBCfg + case utils.MetaMySQL: + case utils.MetaMongo: + dbcfg = engine.MongoDBCfg + case utils.MetaPostgres: + dbcfg = engine.PostgresDBCfg + default: + t.Fatal("unsupported dbtype value") + } + + t.Run("InitSQLDB", func(t *testing.T) { + var err error + var db2 *gorm.DB + if db2, err = gorm.Open(mysql.Open(fmt.Sprintf(dbConnString, "cgrates")), + &gorm.Config{ + AllowGlobalUpdate: true, + Logger: logger.Default.LogMode(logger.Silent), + }); err != nil { + t.Fatal(err) + } + + if err = db2.Exec(`CREATE DATABASE IF NOT EXISTS cgrates2;`).Error; err != nil { + t.Fatal(err) + } + }) + + type testModelSql struct { + ID int64 + Cgrid string + RunID string + OriginHost string + Source string + OriginID string + ToR string + RequestType string + Tenant string + Category string + Account string + Subject string + Destination string + SetupTime time.Time + AnswerTime time.Time + Usage int64 + ExtraFields string + CostSource string + Cost float64 + CostDetails string + ExtraInfo string + CreatedAt time.Time + UpdatedAt time.Time + DeletedAt *time.Time + } + t.Run("PutCDRsInDataBase", func(t *testing.T) { + var err error + if db, err = gorm.Open(mysql.Open(fmt.Sprintf(dbConnString, "cgrates2")), + &gorm.Config{ + AllowGlobalUpdate: true, + Logger: logger.Default.LogMode(logger.Silent), + }); err != nil { + t.Fatal(err) + } + tx := db.Begin() + if !tx.Migrator().HasTable("cdrs") { + if err = tx.Migrator().CreateTable(new(engine.CDRsql)); err != nil { + tx.Rollback() + t.Fatal(err) + } + } + tx.Commit() + tx = db.Begin() + tx = tx.Table(utils.CDRsTBL) + cdrSql := cdr1.AsCDRsql() + cdrSql2 := cdr2.AsCDRsql() + cdrSql.CreatedAt = time.Now() + cdrSql2.CreatedAt = time.Now() + saved := tx.Save(cdrSql) + if saved.Error != nil { + tx.Rollback() + t.Fatal(err) + } + saved = tx.Save(cdrSql2) + if saved.Error != nil { + tx.Rollback() + t.Fatal(err) + } + tx.Commit() + time.Sleep(10 * time.Millisecond) + var result int64 + db.Table(utils.CDRsTBL).Count(&result) + if result != 2 { + t.Error("Expected table to have only one result ", result) + } + }) + defer t.Run("StopSQL", func(t *testing.T) { + if err := db.Migrator().DropTable("cdrs"); err != nil { + t.Fatal(err) + } + if err := db.Exec(`DROP DATABASE cgrates2;`).Error; err != nil { + t.Fatal(err) + } + if db2, err := db.DB(); err != nil { + t.Fatal(err) + } else if err = db2.Close(); err != nil { + t.Fatal(err) + } + }) + + content := `{ + + "general": { + "log_level": 7 + }, + + "apiers": { + "enabled": true + }, + + "ers": { + "enabled": true, + "sessions_conns":["*localhost"], + "apiers_conns": ["*localhost"], + "readers": [ + { + "id": "mysql", + "type": "*sql", + "run_delay": "1m", + "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", + "opts": { + "sqlDBName":"cgrates2", + }, + "processed_path": "", + "tenant": "cgrates.org", + "filters": [ + "*gt:~*req.answer_time:NOW() - INTERVAL 7 DAY", // dont process cdrs with answer_time older than 7 days ago (continue if answer_time > now-7days) + "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2", + ], + "flags": ["*dryrun"], + "fields":[ + {"tag": "CGRID", "path": "*cgreq.CGRID", "type": "*variable", "value": "~*req.cgrid", "mandatory": true}, + {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.tor", "mandatory": true}, + {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.origin_id", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.request_type", "mandatory": true}, + {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.tenant", "mandatory": true}, + {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.category", "mandatory": true}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.subject", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.setup_time", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.answer_time", "mandatory": true}, + {"tag": "CostDetails", "path": "*cgreq.CostDetails", "type": "*variable", "value": "~*req.cost_details", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage", "mandatory": true}, + ], + }, + ], + }, + + }` + + buf := &bytes.Buffer{} + ng := engine.TestEngine{ + ConfigJSON: content, + DBCfg: dbcfg, + LogBuffer: buf, + } + ng.Run(t) + + t.Run("VerifyProcessedFieldsFromLogs", func(t *testing.T) { + time.Sleep(100 * time.Millisecond) // give enough time to process from sql table + records := 0 + scanner := bufio.NewScanner(strings.NewReader(buf.String())) + timeStartFormated := timeStart.Format("2006-01-02T15:04:05-07:00") + expectedLog := fmt.Sprintf("\"Event\":{\"Account\":\"1001\",\"AnswerTime\":\"%s\",\"CGRID\":\"%s\",\"Category\":\"call\",\"CostDetails\":\"{\\\"CGRID\\\":\\\"test1\\\",\\\"RunID\\\":\\\"*default\\\",\\\"StartTime\\\":\\\"2017-01-09T16:18:21Z\\\",\\\"Usage\\\":180000000000,\\\"Cost\\\":2.3,\\\"Charges\\\":[{\\\"RatingID\\\":\\\"RatingID2\\\",\\\"Increments\\\":[{\\\"Usage\\\":120000000000,\\\"Cost\\\":2,\\\"AccountingID\\\":\\\"a012888\\\",\\\"CompressFactor\\\":1},{\\\"Usage\\\":1000000000,\\\"Cost\\\":0.005,\\\"AccountingID\\\":\\\"44d6c02\\\",\\\"CompressFactor\\\":60}],\\\"CompressFactor\\\":1}],\\\"AccountSummary\\\":{\\\"Tenant\\\":\\\"cgrates.org\\\",\\\"ID\\\":\\\"testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceSummaries\\\":[{\\\"UUID\\\":\\\"uuid1\\\",\\\"ID\\\":\\\"\\\",\\\"Type\\\":\\\"*monetary\\\",\\\"Initial\\\":0,\\\"Value\\\":50,\\\"Disabled\\\":false}],\\\"AllowNegative\\\":false,\\\"Disabled\\\":false},\\\"Rating\\\":{\\\"c1a5ab9\\\":{\\\"ConnectFee\\\":0.1,\\\"RoundingMethod\\\":\\\"*up\\\",\\\"RoundingDecimals\\\":5,\\\"MaxCost\\\":0,\\\"MaxCostStrategy\\\":\\\"\\\",\\\"TimingID\\\":\\\"\\\",\\\"RatesID\\\":\\\"ec1a177\\\",\\\"RatingFiltersID\\\":\\\"43e77dc\\\"}},\\\"Accounting\\\":{\\\"44d6c02\\\":{\\\"AccountID\\\":\\\"cgrates.org:testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"},\\\"a012888\\\":{\\\"AccountID\\\":\\\"cgrates.org:testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"}},\\\"RatingFilters\\\":null,\\\"Rates\\\":{\\\"ec1a177\\\":[{\\\"GroupIntervalStart\\\":0,\\\"Value\\\":0.01,\\\"RateIncrement\\\":60000000000,\\\"RateUnit\\\":1000000000}]},\\\"Timings\\\":null}\",\"Destination\":\"1002\",\"OriginID\":\"dsafdsaf\",\"RequestType\":\"*rated\",\"SetupTime\":\"%s\",\"Subject\":\"1001\",\"Tenant\":\"cgrates.org\",\"ToR\":\"*voice\",\"Usage\":\"10000000000\"},\"APIOpts\":{}}>", timeStartFormated, cgrID, timeStartFormated) + for scanner.Scan() { + line := scanner.Text() + if !strings.Contains(line, " DRYRUN, reader: ") { + continue + } + records++ + if !strings.Contains(line, expectedLog) { + t.Errorf("expected \n<%s>, \nreceived\n<%s>", expectedLog, line) + } + } + if err := scanner.Err(); err != nil { + t.Errorf("error reading input: %v", err) + } + if records != 1 { + t.Errorf("expected ERs to process 1 records, but it processed %d records", records) + } + }) +} diff --git a/services/ers.go b/services/ers.go index cc2da0964..0ad1b0248 100644 --- a/services/ers.go +++ b/services/ers.go @@ -35,6 +35,7 @@ import ( // NewEventReaderService returns the EventReader Service func NewEventReaderService( cfg *config.CGRConfig, + dm *DataDBService, filterSChan chan *engine.FilterS, shdChan *utils.SyncedChan, connMgr *engine.ConnManager, @@ -48,6 +49,7 @@ func NewEventReaderService( cfg: cfg, filterSChan: filterSChan, shdChan: shdChan, + dm: dm, connMgr: connMgr, server: server, intConn: intConn, @@ -66,6 +68,7 @@ type EventReaderService struct { ers *ers.ERService rldChan chan struct{} stopChan chan struct{} + dm *DataDBService connMgr *engine.ConnManager server *cores.Server intConn chan birpc.ClientConnector @@ -84,6 +87,9 @@ func (erS *EventReaderService) Start() (err error) { filterS := <-erS.filterSChan erS.filterSChan <- filterS + dbchan := erS.dm.GetDMChan() + datadb := <-dbchan + dbchan <- datadb // remake the stop chan erS.stopChan = make(chan struct{}) @@ -91,7 +97,7 @@ func (erS *EventReaderService) Start() (err error) { utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ERs)) // build the service - erS.ers = ers.NewERService(erS.cfg, filterS, erS.connMgr) + erS.ers = ers.NewERService(erS.cfg, datadb, filterS, erS.connMgr) go erS.listenAndServe(erS.ers, erS.stopChan, erS.rldChan) // Register ERsV1 methods. diff --git a/services/ers_it_test.go b/services/ers_it_test.go index 8698c92b8..d905ed3cb 100644 --- a/services/ers_it_test.go +++ b/services/ers_it_test.go @@ -68,7 +68,7 @@ func TestEventReaderSReload(t *testing.T) { db := NewDataDBService(cfg, nil, false, srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, anz, srvDep) intERsConn := make(chan birpc.ClientConnector, 1) - erS := NewEventReaderService(cfg, filterSChan, shdChan, nil, server, intERsConn, anz, srvDep) + erS := NewEventReaderService(cfg, nil, filterSChan, shdChan, nil, server, intERsConn, anz, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(erS, sS, NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db) @@ -139,8 +139,8 @@ func TestEventReaderSReload2(t *testing.T) { shdChan := utils.NewSyncedChan() srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} server := cores.NewServer(nil) - erS := NewEventReaderService(cfg, filterSChan, shdChan, nil, server, nil, nil, srvDep) - ers := ers.NewERService(cfg, nil, nil) + erS := NewEventReaderService(cfg, nil, filterSChan, shdChan, nil, server, nil, nil, srvDep) + ers := ers.NewERService(cfg, nil, nil, nil) runtime.Gosched() srv := erS.(*EventReaderService) diff --git a/services/ers_test.go b/services/ers_test.go index 2f6e85f6e..3a240e3cf 100644 --- a/services/ers_test.go +++ b/services/ers_test.go @@ -39,7 +39,7 @@ func TestEventReaderSCoverage(t *testing.T) { shdChan := utils.NewSyncedChan() srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} server := cores.NewServer(nil) - srv := NewEventReaderService(cfg, filterSChan, shdChan, nil, server, nil, nil, srvDep) + srv := NewEventReaderService(cfg, nil, filterSChan, shdChan, nil, server, nil, nil, srvDep) if srv.IsRunning() { t.Errorf("Expected service to be down") diff --git a/utils/consts.go b/utils/consts.go index e4d4e9b0c..0ec5f3e63 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -318,6 +318,7 @@ const ( CreateCDRsTablesSQL = "create_cdrs_tables.sql" CreateTariffPlanTablesSQL = "create_tariffplan_tables.sql" TestSQL = "TEST_SQL" + MetaDelete = "*delete" MetaConstant = "*constant" MetaPositive = "*positive" MetaNegative = "*negative"