diff --git a/config/config_defaults.go b/config/config_defaults.go index 6fefcac67..6b66be146 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -409,8 +409,9 @@ const CGRATES_CFG_JSON = ` "filters": [], // limit parsing based on the filters "flags": [], // flags to influence the event processing "reconnects": -1, // number of retries in case of connection lost - "ees_success_ids": [], // ids of exporters used for moving the successfully processed event - "ees_failed_ids": [], // ids of exporters used for moving the unprocessed event + "ees_ids": [], // ids of exporters used for moving the processed event to EEs + "ees_success_ids": [], // ids of exporters used for moving the raw event to EEs + "ees_failed_ids": [], // ids of exporters used for moving the failed raw event to EEs "max_reconnect_interval": "5m", // time to wait in between reconnect attempts "opts": { // Partial @@ -450,7 +451,7 @@ const CGRATES_CFG_JSON = ` // SQL // "sqlDBName": "cgrates", // the name of the database from were the events are read // "sqlTableName": "cdrs", // the name of the table from were the events are read - // "sqlDeleteIndexedFields": [], // list of fields to DELETE from the table + // "sqlDeleteIndexedFields": [], // list of fields to DELETE from the table // "pgSSLMode": "disable", // the ssl mode for postgres db // SQS and S3 diff --git a/config/config_it_test.go b/config/config_it_test.go index a09b71af1..cc0349742 100644 --- a/config/config_it_test.go +++ b/config/config_it_test.go @@ -613,6 +613,7 @@ func testCGRConfigReloadERs(t *testing.T) { PartialCommitFields: []*FCTemplate{}, Reconnects: -1, MaxReconnectInterval: 5 * time.Minute, + EEsIDs: []string{}, EEsSuccessIDs: []string{}, EEsFailedIDs: []string{}, Opts: &EventReaderOpts{ @@ -646,6 +647,7 @@ func testCGRConfigReloadERs(t *testing.T) { PartialCommitFields: []*FCTemplate{}, Reconnects: -1, MaxReconnectInterval: 5 * time.Minute, + EEsIDs: []string{}, EEsSuccessIDs: []string{}, EEsFailedIDs: []string{}, Opts: &EventReaderOpts{ diff --git a/config/config_json_test.go b/config/config_json_test.go index 8d46c17ba..82d289847 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -2114,6 +2114,7 @@ func TestDfEventReaderCfg(t *testing.T) { Partial_commit_fields: &[]*FcTemplateJsonCfg{}, Reconnects: utils.IntPointer(-1), Max_reconnect_interval: utils.StringPointer("5m"), + Ees_ids: &[]string{}, Ees_success_ids: &[]string{}, Ees_failed_ids: &[]string{}, Opts: &EventReaderOptsJson{ diff --git a/config/config_test.go b/config/config_test.go index 7e2a8452d..d5eee9cb3 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -2182,6 +2182,7 @@ func TestERSConfig(t *testing.T) { PartialCommitFields: make([]*FCTemplate, 0), Reconnects: -1, MaxReconnectInterval: 300000000000, + EEsIDs: []string{}, EEsSuccessIDs: []string{}, EEsFailedIDs: []string{}, Opts: &EventReaderOpts{ @@ -5222,6 +5223,7 @@ func TestCgrCdfEventReader(t *testing.T) { Flags: utils.FlagsWithParams{}, Reconnects: -1, MaxReconnectInterval: 300000000000, + EEsIDs: []string{}, EEsSuccessIDs: []string{}, EEsFailedIDs: []string{}, Fields: []*FCTemplate{ @@ -5337,6 +5339,7 @@ func TestCgrCfgEventReaderDefault(t *testing.T) { Flags: utils.FlagsWithParams{}, Reconnects: -1, MaxReconnectInterval: 300000000000, + EEsIDs: []string{}, EEsSuccessIDs: []string{}, EEsFailedIDs: []string{}, Fields: []*FCTemplate{ diff --git a/config/configsanity.go b/config/configsanity.go index cefdb3acc..e69f430df 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -724,20 +724,27 @@ func (cfg *CGRConfig) checkConfigSanity() error { } } for _, rdr := range cfg.ersCfg.Readers { - if len(rdr.EEsSuccessIDs) != 0 || len(rdr.EEsFailedIDs) != 0 { + if len(rdr.EEsSuccessIDs) != 0 || len(rdr.EEsFailedIDs) != 0 || len(rdr.EEsIDs) != 0 { if len(cfg.ersCfg.EEsConns) == 0 || !cfg.eesCfg.Enabled { return fmt.Errorf("<%s> connection to <%s> required due to exporter ID references", utils.ERs, utils.EEs) } } exporterIDs := cfg.eesCfg.exporterIDs() - for _, eesID := range rdr.EEsSuccessIDs { - if !slices.Contains(exporterIDs, eesID) { - return fmt.Errorf("<%s> exporter with id %s not defined", utils.ERs, eesID) + if slices.Contains(cfg.ersCfg.EEsConns, utils.MetaInternal) { + for _, eesID := range rdr.EEsIDs { + if !slices.Contains(exporterIDs, eesID) { + return fmt.Errorf("<%s> exporter with id %s not defined", utils.ERs, eesID) + } } - } - for _, eesID := range rdr.EEsFailedIDs { - if !slices.Contains(exporterIDs, eesID) { - return fmt.Errorf("<%s> exporter with id %s not defined", utils.ERs, eesID) + for _, eesID := range rdr.EEsSuccessIDs { + if !slices.Contains(exporterIDs, eesID) { + return fmt.Errorf("<%s> exporter with id %s not defined", utils.ERs, eesID) + } + } + for _, eesID := range rdr.EEsFailedIDs { + if !slices.Contains(exporterIDs, eesID) { + return fmt.Errorf("<%s> exporter with id %s not defined", utils.ERs, eesID) + } } } if !possibleReaderTypes.Has(rdr.Type) { diff --git a/config/erscfg.go b/config/erscfg.go index 9c1b907a0..d5b88e927 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -429,6 +429,7 @@ type EventReaderCfg struct { Flags utils.FlagsWithParams Reconnects int MaxReconnectInterval time.Duration + EEsIDs []string EEsSuccessIDs []string EEsFailedIDs []string Opts *EventReaderOpts @@ -527,6 +528,10 @@ func (er *EventReaderCfg) loadFromJSONCfg(jsnCfg *EventReaderJsonCfg, msgTemplat return err } } + if jsnCfg.Ees_ids != nil { + er.EEsIDs = make([]string, len(*jsnCfg.Ees_ids)) + copy(er.EEsIDs, *jsnCfg.Ees_ids) + } if jsnCfg.Ees_success_ids != nil { er.EEsSuccessIDs = make([]string, len(*jsnCfg.Ees_success_ids)) copy(er.EEsSuccessIDs, *jsnCfg.Ees_success_ids) @@ -811,6 +816,7 @@ func (er EventReaderCfg) Clone() (cln *EventReaderCfg) { Flags: er.Flags.Clone(), Reconnects: er.Reconnects, MaxReconnectInterval: er.MaxReconnectInterval, + EEsIDs: slices.Clone(er.EEsIDs), EEsSuccessIDs: slices.Clone(er.EEsSuccessIDs), EEsFailedIDs: slices.Clone(er.EEsFailedIDs), Opts: er.Opts.Clone(), @@ -1005,6 +1011,9 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string utils.OptsCfg: opts, } + if len(er.EEsIDs) != 0 { + initialMP[utils.EEsIDsCfg] = er.EEsIDs + } if len(er.EEsSuccessIDs) != 0 { initialMP[utils.EEsSuccessIDsCfg] = er.EEsSuccessIDs } diff --git a/config/erscfg_test.go b/config/erscfg_test.go index f6b24f002..b1c22cc51 100644 --- a/config/erscfg_test.go +++ b/config/erscfg_test.go @@ -104,6 +104,7 @@ func TestERSClone(t *testing.T) { }, CacheDumpFields: make([]*FCTemplate, 0), PartialCommitFields: make([]*FCTemplate, 0), + EEsIDs: []string{}, EEsSuccessIDs: []string{}, EEsFailedIDs: []string{}, Opts: &EventReaderOpts{ @@ -147,6 +148,7 @@ func TestERSClone(t *testing.T) { Value: NewRSRParsersMustCompile("~*req.2", utils.InfieldSep), Mandatory: true, Layout: time.RFC3339}, }, PartialCommitFields: make([]*FCTemplate, 0), + EEsIDs: []string{}, EEsSuccessIDs: []string{}, EEsFailedIDs: []string{}, Opts: &EventReaderOpts{ @@ -280,6 +282,7 @@ func TestERSLoadFromjsonCfg(t *testing.T) { }, CacheDumpFields: make([]*FCTemplate, 0), PartialCommitFields: make([]*FCTemplate, 0), + EEsIDs: []string{}, EEsSuccessIDs: []string{}, EEsFailedIDs: []string{}, Opts: &EventReaderOpts{ @@ -338,6 +341,7 @@ func TestERSLoadFromjsonCfg(t *testing.T) { }, CacheDumpFields: make([]*FCTemplate, 0), PartialCommitFields: make([]*FCTemplate, 0), + EEsIDs: []string{}, EEsSuccessIDs: []string{}, EEsFailedIDs: []string{}, Opts: &EventReaderOpts{ @@ -536,6 +540,7 @@ func TestERSloadFromJsonCase3(t *testing.T) { }, CacheDumpFields: make([]*FCTemplate, 0), PartialCommitFields: make([]*FCTemplate, 0), + EEsIDs: []string{}, EEsSuccessIDs: []string{}, EEsFailedIDs: []string{}, Opts: &EventReaderOpts{ @@ -578,6 +583,7 @@ func TestERSloadFromJsonCase3(t *testing.T) { PartialCommitFields: make([]*FCTemplate, 0), Reconnects: 5, MaxReconnectInterval: 3 * time.Minute, + EEsIDs: []string{}, EEsSuccessIDs: []string{}, EEsFailedIDs: []string{}, Opts: &EventReaderOpts{ @@ -697,6 +703,7 @@ func TestERSloadFromJsonCase4(t *testing.T) { }, CacheDumpFields: make([]*FCTemplate, 0), PartialCommitFields: make([]*FCTemplate, 0), + EEsIDs: []string{}, EEsSuccessIDs: []string{}, EEsFailedIDs: []string{}, Opts: &EventReaderOpts{ @@ -728,6 +735,7 @@ func TestERSloadFromJsonCase4(t *testing.T) { Filters: []string{}, Flags: utils.FlagsWithParams{}, Fields: []*FCTemplate{}, + EEsIDs: []string{}, EEsSuccessIDs: []string{}, EEsFailedIDs: []string{}, CacheDumpFields: []*FCTemplate{ @@ -825,6 +833,7 @@ func TestEventReaderSameID(t *testing.T) { Flags: utils.FlagsWithParams{}, Reconnects: -1, MaxReconnectInterval: 5 * time.Minute, + EEsIDs: []string{}, EEsSuccessIDs: []string{}, EEsFailedIDs: []string{}, Fields: []*FCTemplate{ @@ -881,6 +890,7 @@ func TestEventReaderSameID(t *testing.T) { Timezone: utils.EmptyString, Filters: []string{}, Flags: utils.FlagsWithParams{}, + EEsIDs: []string{}, EEsSuccessIDs: []string{}, EEsFailedIDs: []string{}, Fields: []*FCTemplate{ @@ -1303,6 +1313,7 @@ func TestERsloadFromJsonCfg(t *testing.T) { Flags: utils.FlagsWithParams{}, Reconnects: -1, MaxReconnectInterval: 5 * time.Minute, + EEsIDs: []string{}, EEsSuccessIDs: []string{}, EEsFailedIDs: []string{}, Fields: []*FCTemplate{ @@ -1361,6 +1372,7 @@ func TestERsloadFromJsonCfg(t *testing.T) { Flags: utils.FlagsWithParams{}, Reconnects: -1, MaxReconnectInterval: 5 * time.Minute, + EEsIDs: []string{}, EEsSuccessIDs: []string{}, EEsFailedIDs: []string{}, Fields: []*FCTemplate{ diff --git a/config/libconfig_json.go b/config/libconfig_json.go index f497e927e..f0420701d 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -284,6 +284,7 @@ type EventReaderJsonCfg struct { Flags *[]string Reconnects *int Max_reconnect_interval *string + Ees_ids *[]string Ees_success_ids *[]string Ees_failed_ids *[]string Opts *EventReaderOptsJson diff --git a/data/conf/samples/ers_mysql_delete_indexed_fields/cgrates.json b/data/conf/samples/ers_mysql_delete_indexed_fields/cgrates.json index cd7f90923..6eab99ca9 100644 --- a/data/conf/samples/ers_mysql_delete_indexed_fields/cgrates.json +++ b/data/conf/samples/ers_mysql_delete_indexed_fields/cgrates.json @@ -1,61 +1,60 @@ { +"general": { + "log_level": 7 +}, - "general": { - "log_level": 7 +"apiers": { + "enabled": true +}, +"filters": { + "apiers_conns": ["*localhost"] +}, +"stor_db": { + "opts": { + "sqlConnMaxLifetime": "5s", // needed while running all integration tests }, - - "apiers": { - "enabled": true - }, - "filters": { - "apiers_conns": ["*localhost"] - }, - "stor_db": { - "opts": { - "sqlConnMaxLifetime": "5s", // needed while running all integration tests - }, - }, - "ers": { - "enabled": true, - "sessions_conns":["*localhost"], - "readers": [ - { - "id": "mysql", - "type": "*sql", - "run_delay": "1m", - "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", - "opts": { - "sqlDBName":"cgrates2", - "sqlTableName":"cdrs", - "sqlDeleteIndexedFields": ["id"], - }, - "start_delay": "500ms", // wait for db to be populated before starting reader - "processed_path": "*delete", - "tenant": "cgrates.org", - "filters": [ - "*gt:~*req.answer_time:-168h", // dont process cdrs with answer_time older than 7 days ago - "FLTR_SQL_RatingID", // "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2", - "*string:~*vars.*readerID:mysql", - "FLTR_VARS", // "*string:~*vars.*readerID:mysql", - ], - "flags": ["*dryrun"], - "fields":[ - {"tag": "CGRID", "path": "*cgreq.CGRID", "type": "*variable", "value": "~*req.cgrid", "mandatory": true}, - {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.tor", "mandatory": true}, - {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.origin_id", "mandatory": true}, - {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.request_type", "mandatory": true}, - {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.tenant", "mandatory": true}, - {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.category", "mandatory": true}, - {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account", "mandatory": true}, - {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.subject", "mandatory": true}, - {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination", "mandatory": true}, - {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.setup_time", "mandatory": true}, - {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.answer_time", "mandatory": true}, - {"tag": "CostDetails", "path": "*cgreq.CostDetails", "type": "*variable", "value": "~*req.cost_details", "mandatory": true}, - {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage", "mandatory": true}, - ], +}, +"ers": { + "enabled": true, + "sessions_conns":["*localhost"], + "readers": [ + { + "id": "mysql", + "type": "*sql", + "run_delay": "1m", + "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", + "opts": { + "sqlDBName":"cgrates2", + "sqlTableName":"cdrs", + "sqlDeleteIndexedFields": ["id"], }, - ], - }, - + "start_delay": "500ms", // wait for db to be populated before starting reader + "processed_path": "*delete", + "tenant": "cgrates.org", + "filters": [ + "*gt:~*req.answer_time:-168h", // dont process cdrs with answer_time older than 7 days ago + "FLTR_SQL_RatingID", // "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2", + "*string:~*vars.*readerID:mysql", + "FLTR_VARS", // "*string:~*vars.*readerID:mysql", + ], + "flags": ["*dryrun"], + "fields":[ + {"tag": "CGRID", "path": "*cgreq.CGRID", "type": "*variable", "value": "~*req.cgrid", "mandatory": true}, + {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.tor", "mandatory": true}, + {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.origin_id", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.request_type", "mandatory": true}, + {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.tenant", "mandatory": true}, + {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.category", "mandatory": true}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.subject", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.setup_time", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.answer_time", "mandatory": true}, + {"tag": "CostDetails", "path": "*cgreq.CostDetails", "type": "*variable", "value": "~*req.cost_details", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage", "mandatory": true}, + ], + }, + ], +}, + } \ No newline at end of file diff --git a/data/conf/samples/ers_mysql_filters/cgrates.json b/data/conf/samples/ers_mysql_filters/cgrates.json index 0233f4641..bc9c33021 100644 --- a/data/conf/samples/ers_mysql_filters/cgrates.json +++ b/data/conf/samples/ers_mysql_filters/cgrates.json @@ -1,59 +1,58 @@ { +"general": { + "log_level": 7 +}, - "general": { - "log_level": 7 +"apiers": { + "enabled": true +}, +"filters": { + "apiers_conns": ["*localhost"] +}, +"stor_db": { + "opts": { + "sqlConnMaxLifetime": "5s", // needed while running all integration tests }, - - "apiers": { - "enabled": true - }, - "filters": { - "apiers_conns": ["*localhost"] - }, - "stor_db": { - "opts": { - "sqlConnMaxLifetime": "5s", // needed while running all integration tests - }, - }, - "ers": { - "enabled": true, - "sessions_conns":["*localhost"], - "readers": [ - { - "id": "mysql", - "type": "*sql", - "run_delay": "1m", - "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", - "opts": { - "sqlDBName":"cgrates2", - "sqlTableName":"cdrs", - }, - "start_delay": "500ms", // wait for db to be populated before starting reader - "tenant": "cgrates.org", - "filters": [ - "*gt:~*req.answer_time:-168h", // dont process cdrs with answer_time older than 7 days ago - "FLTR_SQL_RatingID", // "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2", - "*string:~*vars.*readerID:mysql", - "FLTR_VARS", // "*string:~*vars.*readerID:mysql", - ], - "flags": ["*dryrun"], - "fields":[ - {"tag": "CGRID", "path": "*cgreq.CGRID", "type": "*variable", "value": "~*req.cgrid", "mandatory": true}, - {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.tor", "mandatory": true}, - {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.origin_id", "mandatory": true}, - {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.request_type", "mandatory": true}, - {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.tenant", "mandatory": true}, - {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.category", "mandatory": true}, - {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account", "mandatory": true}, - {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.subject", "mandatory": true}, - {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination", "mandatory": true}, - {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.setup_time", "mandatory": true}, - {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.answer_time", "mandatory": true}, - {"tag": "CostDetails", "path": "*cgreq.CostDetails", "type": "*variable", "value": "~*req.cost_details", "mandatory": true}, - {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage", "mandatory": true}, - ], +}, +"ers": { + "enabled": true, + "sessions_conns":["*localhost"], + "readers": [ + { + "id": "mysql", + "type": "*sql", + "run_delay": "1m", + "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", + "opts": { + "sqlDBName":"cgrates2", + "sqlTableName":"cdrs", }, - ], - }, - + "start_delay": "500ms", // wait for db to be populated before starting reader + "tenant": "cgrates.org", + "filters": [ + "*gt:~*req.answer_time:-168h", // dont process cdrs with answer_time older than 7 days ago + "FLTR_SQL_RatingID", // "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2", + "*string:~*vars.*readerID:mysql", + "FLTR_VARS", // "*string:~*vars.*readerID:mysql", + ], + "flags": ["*dryrun"], + "fields":[ + {"tag": "CGRID", "path": "*cgreq.CGRID", "type": "*variable", "value": "~*req.cgrid", "mandatory": true}, + {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.tor", "mandatory": true}, + {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.origin_id", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.request_type", "mandatory": true}, + {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.tenant", "mandatory": true}, + {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.category", "mandatory": true}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.subject", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.setup_time", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.answer_time", "mandatory": true}, + {"tag": "CostDetails", "path": "*cgreq.CostDetails", "type": "*variable", "value": "~*req.cost_details", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage", "mandatory": true}, + ], + }, + ], +}, + } \ No newline at end of file diff --git a/data/conf/samples/ers_mysql_meta_delete/cgrates.json b/data/conf/samples/ers_mysql_meta_delete/cgrates.json index 3f769da49..ab47b761a 100644 --- a/data/conf/samples/ers_mysql_meta_delete/cgrates.json +++ b/data/conf/samples/ers_mysql_meta_delete/cgrates.json @@ -1,60 +1,59 @@ { +"general": { + "log_level": 7 +}, - "general": { - "log_level": 7 +"apiers": { + "enabled": true +}, +"filters": { + "apiers_conns": ["*localhost"] +}, +"stor_db": { + "opts": { + "sqlConnMaxLifetime": "5s", // needed while running all integration tests }, - - "apiers": { - "enabled": true - }, - "filters": { - "apiers_conns": ["*localhost"] - }, - "stor_db": { - "opts": { - "sqlConnMaxLifetime": "5s", // needed while running all integration tests - }, - }, - "ers": { - "enabled": true, - "sessions_conns":["*localhost"], - "readers": [ - { - "id": "mysql", - "type": "*sql", - "run_delay": "1m", - "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", - "opts": { - "sqlDBName":"cgrates2", - "sqlTableName":"cdrs", - }, - "start_delay": "500ms", // wait for db to be populated before starting reader - "processed_path": "*delete", - "tenant": "cgrates.org", - "filters": [ - "*gt:~*req.answer_time:-168h", // dont process cdrs with answer_time older than 7 days ago - "FLTR_SQL_RatingID", // "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2", - "*string:~*vars.*readerID:mysql", - "FLTR_VARS", // "*string:~*vars.*readerID:mysql", - ], - "flags": ["*dryrun"], - "fields":[ - {"tag": "CGRID", "path": "*cgreq.CGRID", "type": "*variable", "value": "~*req.cgrid", "mandatory": true}, - {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.tor", "mandatory": true}, - {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.origin_id", "mandatory": true}, - {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.request_type", "mandatory": true}, - {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.tenant", "mandatory": true}, - {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.category", "mandatory": true}, - {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account", "mandatory": true}, - {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.subject", "mandatory": true}, - {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination", "mandatory": true}, - {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.setup_time", "mandatory": true}, - {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.answer_time", "mandatory": true}, - {"tag": "CostDetails", "path": "*cgreq.CostDetails", "type": "*variable", "value": "~*req.cost_details", "mandatory": true}, - {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage", "mandatory": true}, - ], +}, +"ers": { + "enabled": true, + "sessions_conns":["*localhost"], + "readers": [ + { + "id": "mysql", + "type": "*sql", + "run_delay": "1m", + "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", + "opts": { + "sqlDBName":"cgrates2", + "sqlTableName":"cdrs", }, - ], - }, - + "start_delay": "500ms", // wait for db to be populated before starting reader + "processed_path": "*delete", + "tenant": "cgrates.org", + "filters": [ + "*gt:~*req.answer_time:-168h", // dont process cdrs with answer_time older than 7 days ago + "FLTR_SQL_RatingID", // "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2", + "*string:~*vars.*readerID:mysql", + "FLTR_VARS", // "*string:~*vars.*readerID:mysql", + ], + "flags": ["*dryrun"], + "fields":[ + {"tag": "CGRID", "path": "*cgreq.CGRID", "type": "*variable", "value": "~*req.cgrid", "mandatory": true}, + {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.tor", "mandatory": true}, + {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.origin_id", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.request_type", "mandatory": true}, + {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.tenant", "mandatory": true}, + {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.category", "mandatory": true}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.subject", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.setup_time", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.answer_time", "mandatory": true}, + {"tag": "CostDetails", "path": "*cgreq.CostDetails", "type": "*variable", "value": "~*req.cost_details", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage", "mandatory": true}, + ], + }, + ], +}, + } \ No newline at end of file diff --git a/data/conf/samples/ers_mysql_move/cgrates.json b/data/conf/samples/ers_mysql_move/cgrates.json index 8288fdc1d..d034ae7de 100644 --- a/data/conf/samples/ers_mysql_move/cgrates.json +++ b/data/conf/samples/ers_mysql_move/cgrates.json @@ -1,77 +1,76 @@ { +"general": { + "log_level": 7 +}, - "general": { - "log_level": 7 +"apiers": { + "enabled": true +}, +"filters": { + "apiers_conns": ["*localhost"] +}, +"stor_db": { + "opts": { + "sqlConnMaxLifetime": "5s", // needed while running all integration tests }, - - "apiers": { - "enabled": true - }, - "filters": { - "apiers_conns": ["*localhost"] - }, - "stor_db": { +}, +"ees": { + "enabled": true, + "exporters": [{ + "id": "SQLExporter", + "type": "*sql", + "export_path": "mysql://cgrates:CGRateS.org@127.0.0.1:3306", + "attempts": 1, "opts": { - "sqlConnMaxLifetime": "5s", // needed while running all integration tests + "sqlDBName": "cgrates2", + "sqlTableName":"cdrsProcessed", }, - }, - "ees": { - "enabled": true, - "exporters": [{ - "id": "SQLExporter", - "type": "*sql", - "export_path": "mysql://cgrates:CGRateS.org@127.0.0.1:3306", - "attempts": 1, + "flags": ["*log"], + }] +}, +"ers": { + "enabled": true, + "sessions_conns":["*localhost"], + "ees_conns": ["*internal"], + "readers": [ + { + "id": "mysql", + "type": "*sql", + "ees_success_ids": ["SQLExporter"], + "run_delay": "1m", + "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", "opts": { - "sqlDBName": "cgrates2", - "sqlTableName":"cdrsProcessed", + "sqlDBName":"cgrates2", + "sqlTableName":"cdrs", + "sqlDeleteIndexedFields": ["id"], }, - "flags": ["*log"], - }] - }, - "ers": { - "enabled": true, - "sessions_conns":["*localhost"], - "ees_conns": ["*internal"], - "readers": [ - { - "id": "mysql", - "type": "*sql", - "ees_success_ids": ["SQLExporter"], - "run_delay": "1m", - "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", - "opts": { - "sqlDBName":"cgrates2", - "sqlTableName":"cdrs", - "sqlDeleteIndexedFields": ["id"], - }, - "start_delay": "500ms", // wait for db to be populated before starting reader - "processed_path": "*delete", - "tenant": "cgrates.org", - "filters": [ - "*gt:~*req.answer_time:-168h", // dont process cdrs with answer_time older than 7 days ago - "FLTR_SQL_RatingID", // "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2", - "*string:~*vars.*readerID:mysql", - "FLTR_VARS", // "*string:~*vars.*readerID:mysql", - ], - "flags": ["*dryrun"], - "fields":[ - {"tag": "CGRID", "path": "*cgreq.CGRID", "type": "*variable", "value": "~*req.cgrid", "mandatory": true}, - {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.tor", "mandatory": true}, - {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.origin_id", "mandatory": true}, - {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.request_type", "mandatory": true}, - {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.tenant", "mandatory": true}, - {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.category", "mandatory": true}, - {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account", "mandatory": true}, - {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.subject", "mandatory": true}, - {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination", "mandatory": true}, - {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.setup_time", "mandatory": true}, - {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.answer_time", "mandatory": true}, - {"tag": "CostDetails", "path": "*cgreq.CostDetails", "type": "*variable", "value": "~*req.cost_details", "mandatory": true}, - {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage", "mandatory": true}, - ], - }, - ], - }, - + "start_delay": "500ms", // wait for db to be populated before starting reader + "processed_path": "*delete", + "tenant": "cgrates.org", + "filters": [ + "*gt:~*req.answer_time:-168h", // dont process cdrs with answer_time older than 7 days ago + "FLTR_SQL_RatingID", // "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2", + "*string:~*vars.*readerID:mysql", + "FLTR_VARS", // "*string:~*vars.*readerID:mysql", + ], + "flags": ["*dryrun"], + "fields":[ + {"tag": "CGRID", "path": "*cgreq.CGRID", "type": "*variable", "value": "~*req.cgrid", "mandatory": true}, + {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.tor", "mandatory": true}, + {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.origin_id", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.request_type", "mandatory": true}, + {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.tenant", "mandatory": true}, + {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.category", "mandatory": true}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.subject", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.setup_time", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.answer_time", "mandatory": true}, + {"tag": "CostDetails", "path": "*cgreq.CostDetails", "type": "*variable", "value": "~*req.cost_details", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage", "mandatory": true}, + ], + }, + ], +}, + } \ No newline at end of file diff --git a/data/conf/samples/ers_mysql_raw_update/cgrates.json b/data/conf/samples/ers_mysql_raw_update/cgrates.json index c61b63238..4e5ff1576 100644 --- a/data/conf/samples/ers_mysql_raw_update/cgrates.json +++ b/data/conf/samples/ers_mysql_raw_update/cgrates.json @@ -1,80 +1,81 @@ { - "general": { - "log_level": 7, + +"general": { + "log_level": 7, +}, + +"apiers": { + "enabled": true +}, +"filters": { + "apiers_conns": ["*localhost"] +}, +"stor_db": { + "opts": { + "sqlConnMaxLifetime": "5s", // needed while running all integration tests }, - - "apiers": { - "enabled": true - }, - "filters": { - "apiers_conns": ["*localhost"] - }, - "stor_db": { +}, +"ees": { + "enabled": true, + "exporters": [{ + "id": "SQLExporter", + "type": "*sql", + "export_path": "mysql://cgrates:CGRateS.org@127.0.0.1:3306", + "attempts": 1, "opts": { - "sqlConnMaxLifetime": "5s", // needed while running all integration tests + "sqlDBName": "cgrates2", + "sqlTableName":"cdrs", + "sqlUpdateIndexedFields": ["id", "cgrid"], }, - }, - "ees": { - "enabled": true, - "exporters": [{ - "id": "SQLExporter", - "type": "*sql", - "export_path": "mysql://cgrates:CGRateS.org@127.0.0.1:3306", - "attempts": 1, + "flags": ["*log"], + "fields": [ + {"tag": "SetupTime", "path": "*exp.setup_time", "type": "*constant", "value": "2018-11-27 14:21:26"}, + {"tag": "Account", "path": "*exp.account", "type": "*variable", "value": "~*req.extra_info"}, + {"tag": "ID", "path": "*exp.id", "type": "*variable", "value": "~*req.id"}, + {"tag": "CGRID", "path": "*exp.cgrid", "type": "*variable", "value": "~*req.cgrid"}, + ] + }] +}, +"ers": { + "enabled": true, + "sessions_conns":["*localhost"], + "ees_conns": ["*localhost"], + "readers": [{ + "id": "mysql", + "type": "*sql", + "ees_success_ids": ["SQLExporter"], + "run_delay": "1m", + "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", "opts": { - "sqlDBName": "cgrates2", + "sqlDBName":"cgrates2", "sqlTableName":"cdrs", - "sqlUpdateIndexedFields": ["id", "cgrid"], }, - "flags": ["*log"], - "fields": [ - {"tag": "SetupTime", "path": "*exp.setup_time", "type": "*constant", "value": "2018-11-27 14:21:26"}, - {"tag": "Account", "path": "*exp.account", "type": "*variable", "value": "~*req.extra_info"}, - {"tag": "ID", "path": "*exp.id", "type": "*variable", "value": "~*req.id"}, - {"tag": "CGRID", "path": "*exp.cgrid", "type": "*variable", "value": "~*req.cgrid"}, - ] - }] - }, - "ers": { - "enabled": true, - "sessions_conns":["*localhost"], - "ees_conns": ["*localhost"], - "readers": [{ - "id": "mysql", - "type": "*sql", - "ees_success_ids": ["SQLExporter"], - "run_delay": "1m", - "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", - "opts": { - "sqlDBName":"cgrates2", - "sqlTableName":"cdrs", - }, - "start_delay": "500ms", // wait for db to be populated before starting reader - "tenant": "cgrates.org", - "filters": [ - "*gt:~*req.answer_time:-168h", // dont process cdrs with answer_time older than 7 days ago - "FLTR_SQL_RatingID", // "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2", - "*string:~*vars.*readerID:mysql", - "FLTR_VARS", // "*string:~*vars.*readerID:mysql", - ], - "flags": ["*dryrun"], - "fields":[ - {"tag": "CGRID", "path": "*cgreq.CGRID", "type": "*variable", "value": "~*req.cgrid", "mandatory": true}, - {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.tor", "mandatory": true}, - {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.origin_id", "mandatory": true}, - {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.request_type", "mandatory": true}, - {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.tenant", "mandatory": true}, - {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.category", "mandatory": true}, - {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account", "mandatory": true}, - {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.subject", "mandatory": true}, - {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination", "mandatory": true}, - {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.setup_time", "mandatory": true}, - {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.answer_time", "mandatory": true}, - {"tag": "CostDetails", "path": "*cgreq.CostDetails", "type": "*variable", "value": "~*req.cost_details", "mandatory": true}, - {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage", "mandatory": true}, - ], - }, - ], - }, - + "start_delay": "500ms", // wait for db to be populated before starting reader + "tenant": "cgrates.org", + "filters": [ + "*gt:~*req.answer_time:-168h", // dont process cdrs with answer_time older than 7 days ago + "FLTR_SQL_RatingID", // "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2", + "*string:~*vars.*readerID:mysql", + "FLTR_VARS", // "*string:~*vars.*readerID:mysql", + ], + "flags": ["*dryrun"], + "fields":[ + {"tag": "CGRID", "path": "*cgreq.CGRID", "type": "*variable", "value": "~*req.cgrid", "mandatory": true}, + {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.tor", "mandatory": true}, + {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.origin_id", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.request_type", "mandatory": true}, + {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.tenant", "mandatory": true}, + {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.category", "mandatory": true}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.subject", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.setup_time", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.answer_time", "mandatory": true}, + {"tag": "CostDetails", "path": "*cgreq.CostDetails", "type": "*variable", "value": "~*req.cost_details", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage", "mandatory": true}, + ], + }, + ], +}, + } \ No newline at end of file diff --git a/data/conf/samples/ers_mysql_update/cgrates.json b/data/conf/samples/ers_mysql_update/cgrates.json index 9f43fd065..efc42a03e 100644 --- a/data/conf/samples/ers_mysql_update/cgrates.json +++ b/data/conf/samples/ers_mysql_update/cgrates.json @@ -1,84 +1,82 @@ { +"general": { + "log_level": 7, +}, - "general": { - "log_level": 7, +"apiers": { + "enabled": true +}, +"filters": { + "apiers_conns": ["*localhost"] +}, +"stor_db": { + "opts": { + "sqlConnMaxLifetime": "5s", // needed while running all integration tests }, - - "apiers": { - "enabled": true - }, - "filters": { - "apiers_conns": ["*localhost"] - }, - "stor_db": { +}, +"ees": { + "enabled": true, + "exporters": [{ + "id": "SQLExporter", + "type": "*sql", + "export_path": "mysql://cgrates:CGRateS.org@127.0.0.1:3306", + "attempts": 1, "opts": { - "sqlConnMaxLifetime": "5s", // needed while running all integration tests + "sqlDBName": "cgrates2", + "sqlTableName":"cdrs", + "sqlUpdateIndexedFields": ["id", "cgrid"], }, - }, - "ees": { - "enabled": true, - "exporters": [{ - "id": "SQLExporter", - "type": "*sql", - "export_path": "mysql://cgrates:CGRateS.org@127.0.0.1:3306", - "attempts": 1, + "flags": ["*log"], + "fields": [ + {"tag": "SetupTime", "path": "*exp.setup_time", "type": "*constant", "value": "2018-11-27 14:21:26"}, + {"tag": "Account", "path": "*exp.account", "type": "*variable", "value": "~*req.ExtraInfo"}, + {"tag": "ID", "path": "*exp.id", "type": "*variable", "value": "~*req.Id"}, + {"tag": "CGRID", "path": "*exp.cgrid", "type": "*variable", "value": "~*req.CGRID"}, + ] + }] +}, +"ers": { + "enabled": true, + "sessions_conns":["*localhost"], + "ees_conns": ["*localhost"], + "readers": [{ + "id": "mysql", + "type": "*sql", + "run_delay": "1m", + "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", "opts": { - "sqlDBName": "cgrates2", + "sqlDBName":"cgrates2", "sqlTableName":"cdrs", - "sqlUpdateIndexedFields": ["id", "cgrid"], }, - "filters":["*string:~*req.ReaderID:mysqlReaderID"], - "flags": ["*log"], - "fields": [ - {"tag": "SetupTime", "path": "*exp.setup_time", "type": "*constant", "value": "2018-11-27 14:21:26"}, - {"tag": "Account", "path": "*exp.account", "type": "*variable", "value": "~*req.ExtraInfo"}, - {"tag": "ID", "path": "*exp.id", "type": "*variable", "value": "~*req.Id"}, - {"tag": "CGRID", "path": "*exp.cgrid", "type": "*variable", "value": "~*req.CGRID"}, - ] - }] - }, - "ers": { - "enabled": true, - "sessions_conns":["*localhost"], - "ees_conns": ["*localhost"], - "readers": [{ - "id": "mysqlReaderID", - "type": "*sql", - "run_delay": "1m", - "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", - "opts": { - "sqlDBName":"cgrates2", - "sqlTableName":"cdrs", - }, - "start_delay": "500ms", // wait for db to be populated before starting reader - "tenant": "cgrates.org", - "filters": [ - "*gt:~*req.answer_time:-168h", // dont process cdrs with answer_time older than 7 days ago - "FLTR_SQL_RatingID", // "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2", - "*string:~*vars.*readerID:mysqlReaderID", - "FLTR_VARS", // "*string:~*vars.*readerID:mysqlReaderID", - ], - "flags": ["*dryrun", "*export"], - "fields":[ - {"tag": "CGRID", "path": "*cgreq.CGRID", "type": "*variable", "value": "~*req.cgrid", "mandatory": true}, - {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.tor", "mandatory": true}, - {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.origin_id", "mandatory": true}, - {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.request_type", "mandatory": true}, - {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.tenant", "mandatory": true}, - {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.category", "mandatory": true}, - {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account", "mandatory": true}, - {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.subject", "mandatory": true}, - {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination", "mandatory": true}, - {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.setup_time", "mandatory": true}, - {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.answer_time", "mandatory": true}, - {"tag": "CostDetails", "path": "*cgreq.CostDetails", "type": "*variable", "value": "~*req.cost_details", "mandatory": true}, - {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage", "mandatory": true}, - {"tag": "ExtraInfo", "path": "*cgreq.ExtraInfo", "type": "*variable", "value": "~*req.extra_info", "mandatory": true}, - {"tag": "ID", "path": "*cgreq.Id", "type": "*variable", "value": "~*req.id", "mandatory": true}, - {"tag": "ReaderID", "path": "*cgreq.ReaderID", "type": "*variable", "value": "~*vars.*readerID", "mandatory": true}, - ], - }, - ], - }, - + "start_delay": "500ms", // wait for db to be populated before starting reader + "tenant": "cgrates.org", + "filters": [ + "*gt:~*req.answer_time:-168h", // dont process cdrs with answer_time older than 7 days ago + "FLTR_SQL_RatingID", // "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2", + "*string:~*vars.*readerID:mysql", + "FLTR_VARS", // "*string:~*vars.*readerID:mysql", + ], + "ees_ids": ["SQLExporter"], + "flags": ["*dryrun", "*export"], + "fields":[ + {"tag": "CGRID", "path": "*cgreq.CGRID", "type": "*variable", "value": "~*req.cgrid", "mandatory": true}, + {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.tor", "mandatory": true}, + {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.origin_id", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.request_type", "mandatory": true}, + {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.tenant", "mandatory": true}, + {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.category", "mandatory": true}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.subject", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.setup_time", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.answer_time", "mandatory": true}, + {"tag": "CostDetails", "path": "*cgreq.CostDetails", "type": "*variable", "value": "~*req.cost_details", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage", "mandatory": true}, + {"tag": "ExtraInfo", "path": "*cgreq.ExtraInfo", "type": "*variable", "value": "~*req.extra_info", "mandatory": true}, + {"tag": "ID", "path": "*cgreq.Id", "type": "*variable", "value": "~*req.id", "mandatory": true}, + ], + }, + ], +}, + } \ No newline at end of file diff --git a/ers/ers.go b/ers/ers.go index d4d218136..9851cb62a 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -376,6 +376,7 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, var reply map[string]map[string]any return erS.connMgr.Call(context.TODO(), erS.cfg.ERsCfg().EEsConns, utils.EeSv1ProcessEvent, &engine.CGREventWithEeIDs{ + EeIDs: rdrCfg.EEsIDs, CGREvent: cgrEv, }, &reply) } diff --git a/general_tests/ers_sql_filters_it_test.go b/general_tests/ers_sql_filters_it_test.go index 24fb47d9c..dd36ccbb2 100644 --- a/general_tests/ers_sql_filters_it_test.go +++ b/general_tests/ers_sql_filters_it_test.go @@ -1086,7 +1086,7 @@ func TestERSSQLFiltersUpdate(t *testing.T) { tpFiles := map[string]string{ utils.FiltersCsv: `#Tenant[0],ID[1],Type[2],Path[3],Values[4],ActivationInterval[5] cgrates.org,FLTR_SQL_RatingID,*eq,~*req.cost_details.Charges[0].RatingID,RatingID2, -cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysqlReaderID,`, +cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`, } buf := &bytes.Buffer{} @@ -1105,11 +1105,11 @@ cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysqlReaderID,`, records := 0 scanner := bufio.NewScanner(strings.NewReader(buf.String())) timeStartFormated := timeStart.Format("2006-01-02T15:04:05Z07:00") - expectedLog := fmt.Sprintf("\"Event\":{\"Account\":\"1001\",\"AnswerTime\":\"%s\",\"CGRID\":\"%s\",\"Category\":\"call\",\"CostDetails\":\"{\\\"CGRID\\\":\\\"test1\\\",\\\"RunID\\\":\\\"*default\\\",\\\"StartTime\\\":\\\"2017-01-09T16:18:21Z\\\",\\\"Usage\\\":180000000000,\\\"Cost\\\":2.3,\\\"Charges\\\":[{\\\"RatingID\\\":\\\"RatingID2\\\",\\\"Increments\\\":[{\\\"Usage\\\":120000000000,\\\"Cost\\\":2,\\\"AccountingID\\\":\\\"a012888\\\",\\\"CompressFactor\\\":1},{\\\"Usage\\\":1000000000,\\\"Cost\\\":0.005,\\\"AccountingID\\\":\\\"44d6c02\\\",\\\"CompressFactor\\\":60}],\\\"CompressFactor\\\":1}],\\\"AccountSummary\\\":{\\\"Tenant\\\":\\\"cgrates.org\\\",\\\"ID\\\":\\\"1001\\\",\\\"BalanceSummaries\\\":[{\\\"UUID\\\":\\\"uuid1\\\",\\\"ID\\\":\\\"\\\",\\\"Type\\\":\\\"*monetary\\\",\\\"Initial\\\":0,\\\"Value\\\":50,\\\"Disabled\\\":false}],\\\"AllowNegative\\\":false,\\\"Disabled\\\":false},\\\"Rating\\\":{\\\"c1a5ab9\\\":{\\\"ConnectFee\\\":0.1,\\\"RoundingMethod\\\":\\\"*up\\\",\\\"RoundingDecimals\\\":5,\\\"MaxCost\\\":0,\\\"MaxCostStrategy\\\":\\\"\\\",\\\"TimingID\\\":\\\"\\\",\\\"RatesID\\\":\\\"ec1a177\\\",\\\"RatingFiltersID\\\":\\\"43e77dc\\\"}},\\\"Accounting\\\":{\\\"44d6c02\\\":{\\\"AccountID\\\":\\\"cgrates.org:1001\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"},\\\"a012888\\\":{\\\"AccountID\\\":\\\"cgrates.org:1001\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"}},\\\"RatingFilters\\\":null,\\\"Rates\\\":{\\\"ec1a177\\\":[{\\\"GroupIntervalStart\\\":0,\\\"Value\\\":0.01,\\\"RateIncrement\\\":60000000000,\\\"RateUnit\\\":1000000000}]},\\\"Timings\\\":null}\",\"Destination\":\"1002\",\"ExtraInfo\":\"extraInfo\",\"Id\":\"2\",\"OriginID\":\"oid2\",\"ReaderID\":\"mysqlReaderID\",\"RequestType\":\"*rated\",\"SetupTime\":\"%s\",\"Subject\":\"1001\",\"Tenant\":\"cgrates.org\",\"ToR\":\"*voice\",\"Usage\":\"10000000000\"},\"APIOpts\":{}}>", timeStartFormated, cgrID, timeStartFormated) + expectedLog := fmt.Sprintf("\"Event\":{\"Account\":\"1001\",\"AnswerTime\":\"%s\",\"CGRID\":\"%s\",\"Category\":\"call\",\"CostDetails\":\"{\\\"CGRID\\\":\\\"test1\\\",\\\"RunID\\\":\\\"*default\\\",\\\"StartTime\\\":\\\"2017-01-09T16:18:21Z\\\",\\\"Usage\\\":180000000000,\\\"Cost\\\":2.3,\\\"Charges\\\":[{\\\"RatingID\\\":\\\"RatingID2\\\",\\\"Increments\\\":[{\\\"Usage\\\":120000000000,\\\"Cost\\\":2,\\\"AccountingID\\\":\\\"a012888\\\",\\\"CompressFactor\\\":1},{\\\"Usage\\\":1000000000,\\\"Cost\\\":0.005,\\\"AccountingID\\\":\\\"44d6c02\\\",\\\"CompressFactor\\\":60}],\\\"CompressFactor\\\":1}],\\\"AccountSummary\\\":{\\\"Tenant\\\":\\\"cgrates.org\\\",\\\"ID\\\":\\\"1001\\\",\\\"BalanceSummaries\\\":[{\\\"UUID\\\":\\\"uuid1\\\",\\\"ID\\\":\\\"\\\",\\\"Type\\\":\\\"*monetary\\\",\\\"Initial\\\":0,\\\"Value\\\":50,\\\"Disabled\\\":false}],\\\"AllowNegative\\\":false,\\\"Disabled\\\":false},\\\"Rating\\\":{\\\"c1a5ab9\\\":{\\\"ConnectFee\\\":0.1,\\\"RoundingMethod\\\":\\\"*up\\\",\\\"RoundingDecimals\\\":5,\\\"MaxCost\\\":0,\\\"MaxCostStrategy\\\":\\\"\\\",\\\"TimingID\\\":\\\"\\\",\\\"RatesID\\\":\\\"ec1a177\\\",\\\"RatingFiltersID\\\":\\\"43e77dc\\\"}},\\\"Accounting\\\":{\\\"44d6c02\\\":{\\\"AccountID\\\":\\\"cgrates.org:1001\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"},\\\"a012888\\\":{\\\"AccountID\\\":\\\"cgrates.org:1001\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"}},\\\"RatingFilters\\\":null,\\\"Rates\\\":{\\\"ec1a177\\\":[{\\\"GroupIntervalStart\\\":0,\\\"Value\\\":0.01,\\\"RateIncrement\\\":60000000000,\\\"RateUnit\\\":1000000000}]},\\\"Timings\\\":null}\",\"Destination\":\"1002\",\"ExtraInfo\":\"extraInfo\",\"Id\":\"2\",\"OriginID\":\"oid2\",\"RequestType\":\"*rated\",\"SetupTime\":\"%s\",\"Subject\":\"1001\",\"Tenant\":\"cgrates.org\",\"ToR\":\"*voice\",\"Usage\":\"10000000000\"},\"APIOpts\":{}}>", timeStartFormated, cgrID, timeStartFormated) var ersLogsCount int for scanner.Scan() { line := scanner.Text() - if !strings.Contains(line, " DRYRUN, reader: ") { + if !strings.Contains(line, " DRYRUN, reader: ") { continue } records++ diff --git a/utils/consts.go b/utils/consts.go index ad7cbd3d0..8b3f685e7 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2605,6 +2605,7 @@ const ( TenantCfg = "tenant" FlagsCfg = "flags" FieldsCfg = "fields" + EEsIDsCfg = "ees_ids" EEsSuccessIDsCfg = "ees_success_ids" EEsFailedIDsCfg = "ees_failed_ids" CacheDumpFieldsCfg = "cache_dump_fields"