From 041b14fa03da08e65303a67ace11a622765575cd Mon Sep 17 00:00:00 2001 From: arberkatellari Date: Fri, 15 Nov 2024 13:56:02 +0200 Subject: [PATCH] Improvements to ERS SQL filters --- config/config_defaults.go | 3 +- config/erscfg.go | 22 +- config/erscfg_test.go | 8 +- config/libconfig_json.go | 85 ++-- ers/sql.go | 124 ++--- general_tests/ers_sql_filters_it_test.go | 574 ++++++++++++++++++++++- services/datadb.go | 2 +- services/ers_it_test.go | 4 +- utils/consts.go | 5 +- 9 files changed, 704 insertions(+), 123 deletions(-) diff --git a/config/config_defaults.go b/config/config_defaults.go index 40bfa2bd3..c937231b6 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -399,7 +399,7 @@ const CGRATES_CFG_JSON = ` { "id": "*default", // identifier of the EventReader profile "type": "*none", // reader type <*file_csv> - "run_delay": "0", // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together + "run_delay": "0", // sleep interval between consecutive runs; "-1" to use automation via inotify; "0" to disable running all together; <""|$dur> "start_delay": "0", // time to wait before an reader starts to run "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited "source_path": "/var/spool/cgrates/ers/in", // read data from this path @@ -450,6 +450,7 @@ const CGRATES_CFG_JSON = ` // SQL // "sqlDBName": "cgrates", // the name of the database from were the events are read // "sqlTableName": "cdrs", // the name of the table from were the events are read + // "sqlDeleteIndexedFields": [], // list of fields to DELETE from the table // "pgSSLMode": "disable", // the ssl mode for postgres db // SQS and S3 diff --git a/config/erscfg.go b/config/erscfg.go index e641ddd44..b02fa3bea 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -256,9 +256,10 @@ func (kafkaROpts *KafkaROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err } type SQLROpts struct { - DBName *string - TableName *string - PgSSLMode *string + DBName *string + TableName *string + DeleteIndexedFields *[]string + PgSSLMode *string } func (sqlOpts *SQLROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error) { @@ -268,6 +269,11 @@ func (sqlOpts *SQLROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error if jsnCfg.SQLTableName != nil { sqlOpts.TableName = jsnCfg.SQLTableName } + if jsnCfg.SQLDeleteIndexedFields != nil { + dif := make([]string, len(*jsnCfg.SQLDeleteIndexedFields)) + copy(dif, *jsnCfg.SQLDeleteIndexedFields) + sqlOpts.DeleteIndexedFields = &dif + } if jsnCfg.PgSSLMode != nil { sqlOpts.PgSSLMode = jsnCfg.PgSSLMode } @@ -660,6 +666,11 @@ func (sqlOpts *SQLROpts) Clone() *SQLROpts { cln.TableName = new(string) *cln.TableName = *sqlOpts.TableName } + if sqlOpts.DeleteIndexedFields != nil { + idx := make([]string, len(*sqlOpts.DeleteIndexedFields)) + copy(idx, *sqlOpts.DeleteIndexedFields) + cln.DeleteIndexedFields = &idx + } if sqlOpts.PgSSLMode != nil { cln.PgSSLMode = new(string) *cln.PgSSLMode = *sqlOpts.PgSSLMode @@ -911,6 +922,11 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string if sqlOpts.TableName != nil { opts[utils.SQLTableNameOpt] = *sqlOpts.TableName } + if sqlOpts.DeleteIndexedFields != nil { + deleteIndexedFields := make([]string, len(*sqlOpts.DeleteIndexedFields)) + copy(deleteIndexedFields, *sqlOpts.DeleteIndexedFields) + opts[utils.SQLDeleteIndexedFieldsOpt] = deleteIndexedFields + } if sqlOpts.PgSSLMode != nil { opts[utils.PgSSLModeCfg] = *sqlOpts.PgSSLMode } diff --git a/config/erscfg_test.go b/config/erscfg_test.go index dac7afa0d..a2c60621a 100644 --- a/config/erscfg_test.go +++ b/config/erscfg_test.go @@ -1427,6 +1427,7 @@ func TestEventReaderOptsCfg(t *testing.T) { KafkaTopic: utils.StringPointer("kafka"), KafkaMaxWait: utils.StringPointer("1m"), SQLDBName: utils.StringPointer("dbname"), + SQLDeleteIndexedFields: utils.SliceStringPointer([]string{"id"}), SQLTableName: utils.StringPointer("tablename"), PgSSLMode: utils.StringPointer("sslmode"), AWSRegion: utils.StringPointer("eu"), @@ -1506,9 +1507,10 @@ func TestEventReaderCfgClone(t *testing.T) { RoutingKey: utils.StringPointer("key1"), }, SQL: &SQLROpts{ - DBName: utils.StringPointer("dbname"), - TableName: utils.StringPointer("tablename"), - PgSSLMode: utils.StringPointer("sslmode"), + DBName: utils.StringPointer("dbname"), + TableName: utils.StringPointer("tablename"), + DeleteIndexedFields: utils.SliceStringPointer([]string{"id"}), + PgSSLMode: utils.StringPointer("sslmode"), }, AWS: &AWSROpts{ diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 110b7542c..8cfa554cf 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -224,48 +224,49 @@ type ERsJsonCfg struct { } type EventReaderOptsJson struct { - PartialPath *string `json:"partialPath"` - PartialCacheAction *string `json:"partialCacheAction"` - PartialOrderField *string `json:"partialOrderField"` - PartialCSVFieldSeparator *string `json:"partialcsvFieldSeparator"` - CSVRowLength *int `json:"csvRowLength"` - CSVFieldSeparator *string `json:"csvFieldSeparator"` - CSVHeaderDefineChar *string `json:"csvHeaderDefineChar"` - CSVLazyQuotes *bool `json:"csvLazyQuotes"` - XMLRootPath *string `json:"xmlRootPath"` - AMQPQueueID *string `json:"amqpQueueID"` - AMQPUsername *string `json:"amqpUsername"` - AMQPPassword *string `json:"amqpPassword"` - AMQPConsumerTag *string `json:"amqpConsumerTag"` - AMQPExchange *string `json:"amqpExchange"` - AMQPExchangeType *string `json:"amqpExchangeType"` - AMQPRoutingKey *string `json:"amqpRoutingKey"` - KafkaTopic *string `json:"kafkaTopic"` - KafkaGroupID *string `json:"kafkaGroupID"` - KafkaMaxWait *string `json:"kafkaMaxWait"` - KafkaTLS *bool `json:"kafkaTLS"` - KafkaCAPath *string `json:"kafkaCAPath"` - KafkaSkipTLSVerify *bool `json:"kafkaSkipTLSVerify"` - SQLDBName *string `json:"sqlDBName"` - SQLTableName *string `json:"sqlTableName"` - PgSSLMode *string `json:"pgSSLMode"` - AWSRegion *string `json:"awsRegion"` - AWSKey *string `json:"awsKey"` - AWSSecret *string `json:"awsSecret"` - AWSToken *string `json:"awsToken"` - SQSQueueID *string `json:"sqsQueueID"` - S3BucketID *string `json:"s3BucketID"` - NATSJetStream *bool `json:"natsJetStream"` - NATSConsumerName *string `json:"natsConsumerName"` - NATSStreamName *string `json:"natsStreamName"` - NATSSubject *string `json:"natsSubject"` - NATSQueueID *string `json:"natsQueueID"` - NATSJWTFile *string `json:"natsJWTFile"` - NATSSeedFile *string `json:"natsSeedFile"` - NATSCertificateAuthority *string `json:"natsCertificateAuthority"` - NATSClientCertificate *string `json:"natsClientCertificate"` - NATSClientKey *string `json:"natsClientKey"` - NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"` + PartialPath *string `json:"partialPath"` + PartialCacheAction *string `json:"partialCacheAction"` + PartialOrderField *string `json:"partialOrderField"` + PartialCSVFieldSeparator *string `json:"partialcsvFieldSeparator"` + CSVRowLength *int `json:"csvRowLength"` + CSVFieldSeparator *string `json:"csvFieldSeparator"` + CSVHeaderDefineChar *string `json:"csvHeaderDefineChar"` + CSVLazyQuotes *bool `json:"csvLazyQuotes"` + XMLRootPath *string `json:"xmlRootPath"` + AMQPQueueID *string `json:"amqpQueueID"` + AMQPUsername *string `json:"amqpUsername"` + AMQPPassword *string `json:"amqpPassword"` + AMQPConsumerTag *string `json:"amqpConsumerTag"` + AMQPExchange *string `json:"amqpExchange"` + AMQPExchangeType *string `json:"amqpExchangeType"` + AMQPRoutingKey *string `json:"amqpRoutingKey"` + KafkaTopic *string `json:"kafkaTopic"` + KafkaGroupID *string `json:"kafkaGroupID"` + KafkaMaxWait *string `json:"kafkaMaxWait"` + KafkaTLS *bool `json:"kafkaTLS"` + KafkaCAPath *string `json:"kafkaCAPath"` + KafkaSkipTLSVerify *bool `json:"kafkaSkipTLSVerify"` + SQLDBName *string `json:"sqlDBName"` + SQLTableName *string `json:"sqlTableName"` + SQLDeleteIndexedFields *[]string `json:"sqlDeleteIndexedFields"` + PgSSLMode *string `json:"pgSSLMode"` + AWSRegion *string `json:"awsRegion"` + AWSKey *string `json:"awsKey"` + AWSSecret *string `json:"awsSecret"` + AWSToken *string `json:"awsToken"` + SQSQueueID *string `json:"sqsQueueID"` + S3BucketID *string `json:"s3BucketID"` + NATSJetStream *bool `json:"natsJetStream"` + NATSConsumerName *string `json:"natsConsumerName"` + NATSStreamName *string `json:"natsStreamName"` + NATSSubject *string `json:"natsSubject"` + NATSQueueID *string `json:"natsQueueID"` + NATSJWTFile *string `json:"natsJWTFile"` + NATSSeedFile *string `json:"natsSeedFile"` + NATSCertificateAuthority *string `json:"natsCertificateAuthority"` + NATSClientCertificate *string `json:"natsClientCertificate"` + NATSClientKey *string `json:"natsClientKey"` + NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"` } // EventReaderSJsonCfg is the configuration of a single EventReader diff --git a/ers/sql.go b/ers/sql.go index 31f782956..f923e8075 100644 --- a/ers/sql.go +++ b/ers/sql.go @@ -74,9 +74,11 @@ type SQLEventReader struct { cfgIdx int // index of config instance within ERsCfg.Readers fltrS *engine.FilterS - connString string - connType string - tableName string + connString string + connType string + tableName string + dbFilters []string + lazyFilters []string rdrEvents chan *erEvent // channel to dispatch the events created to partialEvents chan *erEvent // channel to dispatch the partial events created to @@ -130,31 +132,31 @@ func valueQry(ruleType, elem, field string, values []string, not bool) (conditio case utils.MetaExists, utils.MetaNotExists: if not { if elem == utils.EmptyString { - conditions = append(conditions, fmt.Sprintf(" %s IS NOT NULL", field)) + conditions = append(conditions, fmt.Sprintf("%s IS NOT NULL", field)) return } - conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') IS NOT NULL", elem, field)) + conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') IS NOT NULL", elem, field)) return } if elem == utils.EmptyString { - conditions = append(conditions, fmt.Sprintf(" %s IS NULL", field)) + conditions = append(conditions, fmt.Sprintf("%s IS NULL", field)) return } - conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') IS NULL", elem, field)) + conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') IS NULL", elem, field)) case utils.MetaEmpty, utils.MetaNotEmpty: if not { if elem == utils.EmptyString { - conditions = append(conditions, fmt.Sprintf(" %s != ''", field)) + conditions = append(conditions, fmt.Sprintf("%s != ''", field)) return } - conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') != ''", elem, field)) + conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') != ''", elem, field)) return } if elem == utils.EmptyString { - conditions = append(conditions, fmt.Sprintf(" %s == ''", field)) + conditions = append(conditions, fmt.Sprintf("%s == ''", field)) return } - conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') == ''", elem, field)) + conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') == ''", elem, field)) } return } @@ -171,85 +173,85 @@ func valueQry(ruleType, elem, field string, values []string, not bool) (conditio case utils.MetaString, utils.MetaNotString, utils.MetaEqual, utils.MetaNotEqual: if not { if elem == utils.EmptyString { - conditions = append(conditions, fmt.Sprintf(" %s != '%s'", field, value)) + conditions = append(conditions, fmt.Sprintf("%s != '%s'", field, value)) continue } - conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') != '%s'", + conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') != '%s'", elem, field, value)) continue } if elem == utils.EmptyString { - singleCond = fmt.Sprintf(" %s = '%s'", field, value) + singleCond = fmt.Sprintf("%s = '%s'", field, value) } else { - singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') = '%s'", elem, field, value) + singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') = '%s'", elem, field, value) } case utils.MetaLessThan, utils.MetaLessOrEqual, utils.MetaGreaterThan, utils.MetaGreaterOrEqual: if ruleType == utils.MetaGreaterOrEqual { if elem == utils.EmptyString { - singleCond = fmt.Sprintf(" %s >= %s", field, value) + singleCond = fmt.Sprintf("%s >= %s", field, value) } else { - singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') >= %s", elem, field, value) + singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') >= %s", elem, field, value) } } else if ruleType == utils.MetaGreaterThan { if elem == utils.EmptyString { - singleCond = fmt.Sprintf(" %s > %s", field, value) + singleCond = fmt.Sprintf("%s > %s", field, value) } else { - singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') > %s", elem, field, value) + singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') > %s", elem, field, value) } } else if ruleType == utils.MetaLessOrEqual { if elem == utils.EmptyString { - singleCond = fmt.Sprintf(" %s <= %s", field, value) + singleCond = fmt.Sprintf("%s <= %s", field, value) } else { - singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') <= %s", elem, field, value) + singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') <= %s", elem, field, value) } } else if ruleType == utils.MetaLessThan { if elem == utils.EmptyString { - singleCond = fmt.Sprintf(" %s < %s", field, value) + singleCond = fmt.Sprintf("%s < %s", field, value) } else { - singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') < %s", elem, field, value) + singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') < %s", elem, field, value) } } case utils.MetaPrefix, utils.MetaNotPrefix: if not { if elem == utils.EmptyString { - conditions = append(conditions, fmt.Sprintf(" %s NOT LIKE '%s%%'", field, value)) + conditions = append(conditions, fmt.Sprintf("%s NOT LIKE '%s%%'", field, value)) continue } - conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') NOT LIKE '%s%%'", elem, field, value)) + conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') NOT LIKE '%s%%'", elem, field, value)) continue } if elem == utils.EmptyString { - singleCond = fmt.Sprintf(" %s LIKE '%s%%'", field, value) + singleCond = fmt.Sprintf("%s LIKE '%s%%'", field, value) } else { - singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') LIKE '%s%%'", elem, field, value) + singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') LIKE '%s%%'", elem, field, value) } case utils.MetaSuffix, utils.MetaNotSuffix: if not { if elem == utils.EmptyString { - conditions = append(conditions, fmt.Sprintf(" %s NOT LIKE '%%%s'", field, value)) + conditions = append(conditions, fmt.Sprintf("%s NOT LIKE '%%%s'", field, value)) continue } - conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') NOT LIKE '%%%s'", elem, field, value)) + conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') NOT LIKE '%%%s'", elem, field, value)) continue } if elem == utils.EmptyString { - singleCond = fmt.Sprintf(" %s LIKE '%%%s'", field, value) + singleCond = fmt.Sprintf("%s LIKE '%%%s'", field, value) } else { - singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') LIKE '%%%s'", elem, field, value) + singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') LIKE '%%%s'", elem, field, value) } case utils.MetaRegex, utils.MetaNotRegex: if not { if elem == utils.EmptyString { - conditions = append(conditions, fmt.Sprintf(" %s NOT REGEXP '%s'", field, value)) + conditions = append(conditions, fmt.Sprintf("%s NOT REGEXP '%s'", field, value)) continue } - conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') NOT REGEXP '%s'", elem, field, value)) + conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') NOT REGEXP '%s'", elem, field, value)) continue } if elem == utils.EmptyString { - singleCond = fmt.Sprintf(" %s REGEXP '%s'", field, value) + singleCond = fmt.Sprintf("%s REGEXP '%s'", field, value) } else { - singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') REGEXP '%s'", elem, field, value) + singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') REGEXP '%s'", elem, field, value) } } conditions = append(conditions, singleCond) @@ -271,43 +273,39 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) { } tm := time.NewTimer(0) var filters []*engine.Filter - var whereQueries []string - var renewedFltrs []string for _, fltr := range rdr.Config().Filters { - if result, err := rdr.dm.GetFilter(config.CgrConfig().GeneralCfg().DefaultTenant, fltr, true, false, utils.NonTransactional); err != nil { + if resultFltr, err := rdr.dm.GetFilter(config.CgrConfig().GeneralCfg().DefaultTenant, fltr, true, false, utils.NonTransactional); err != nil { rdr.rdrErr <- err return } else { - filters = append(filters, result) - if !strings.Contains(fltr, utils.DynamicDataPrefix+utils.MetaReq) { - renewedFltrs = append(renewedFltrs, fltr) - } + filters = append(filters, resultFltr) } } - rdr.Config().Filters = renewedFltrs // remove filters containing *req for _, filter := range filters { + var addFltrCount int for _, rule := range filter.Rules { var elem, field string switch { case strings.HasPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep): field = strings.TrimPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep) - parts := strings.SplitN(field, ".", 2) - if len(parts) == 2 { // Split in 2 pieces if it contains any more dots in the field - // First part (before the first dot) - elem = parts[0] - // Second part (everything after the first dot) - field = parts[1] + if before, after, hasSep := strings.Cut(field, utils.NestingSep); hasSep { + elem = before + field = after } default: + addFltrCount++ + if addFltrCount == 1 { + rdr.lazyFilters = append(rdr.lazyFilters, filter.ID) + } continue } conditions := valueQry(rule.Type, elem, field, rule.Values, strings.HasPrefix(rule.Type, utils.MetaNot)) - whereQueries = append(whereQueries, strings.Join(conditions, " OR ")) + rdr.dbFilters = append(rdr.dbFilters, strings.Join(conditions, " OR ")) } } for { tx := db.Table(rdr.tableName).Select(utils.Meta) - for _, whereQ := range whereQueries { + for _, whereQ := range rdr.dbFilters { tx = tx.Where(whereQ) } rows, err := tx.Rows() @@ -344,13 +342,14 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) { for i := range columns { columnPointers[i] = &columns[i] } + // copy row values to their respective column if err = rows.Scan(columnPointers...); err != nil { rdr.rdrErr <- err rows.Close() return } msg := make(map[string]any) - fltr := make(map[string]string) + fltr := make(map[string]any) for i, colName := range colNames { msg[colName] = columns[i] if colName != createdAt && colName != updatedAt && colName != deletedAt { // ignore the sql colums for filter only @@ -359,18 +358,33 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) { if tm.IsZero() { continue } + fltr[colName] = columns[i] + continue case *time.Time: if tm == nil || tm.IsZero() { continue } + fltr[colName] = columns[i] + continue case nil: continue } fltr[colName] = utils.IfaceAsString(columns[i]) } } - if rdr.Config().ProcessedPath == utils.MetaDelete { - if err = db.Table(rdr.tableName).Delete(nil, fltr).Error; err != nil { // to ensure we don't read it again + if rdr.Config().Opts.SQL.DeleteIndexedFields != nil { + tmpFltrMap := make(map[string]any) + for _, fieldName := range *rdr.Config().Opts.SQL.DeleteIndexedFields { + if _, has := fltr[fieldName]; has { + tmpFltrMap[fieldName] = fltr[fieldName] + } + } + if len(tmpFltrMap) != 0 { + fltr = tmpFltrMap + } + } + if rdr.Config().ProcessedPath == utils.MetaDelete || rdr.Config().Opts.SQL.DeleteIndexedFields != nil { + if err = tx.Delete(nil, fltr).Error; err != nil { // to ensure we don't read it again utils.Logger.Warning( fmt.Sprintf("<%s> deleting message %s error: %s", utils.ERs, utils.ToJSON(msg), err.Error())) @@ -415,7 +429,7 @@ func (rdr *SQLEventReader) processMessage(msg map[string]any) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTimezone), rdr.fltrS, nil) // create an AgentRequest var pass bool - if pass, err = rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, + if pass, err = rdr.fltrS.Pass(agReq.Tenant, rdr.lazyFilters, agReq); err != nil || !pass { return } diff --git a/general_tests/ers_sql_filters_it_test.go b/general_tests/ers_sql_filters_it_test.go index d99d906af..d7edd79e7 100644 --- a/general_tests/ers_sql_filters_it_test.go +++ b/general_tests/ers_sql_filters_it_test.go @@ -38,6 +38,7 @@ import ( var ( db *gorm.DB dbConnString = "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'" + timeStart = time.Now() cdr1 = &engine.CDR{ // sample with values not realisticy calculated CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderID: 123, @@ -131,13 +132,12 @@ var ( }, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, } - timeStart = time.Now() - cgrID = utils.Sha1("dsafdsaf", timeStart.String()) - cdr2 = &engine.CDR{ // sample with values not realisticy calculated + cgrID = utils.Sha1("oid2", timeStart.String()) + cdr2 = &engine.CDR{ // sample with values not realisticy calculated CGRID: cgrID, OrderID: 123, ToR: utils.MetaVoice, - OriginID: "dsafdsaf", + OriginID: "oid2", OriginHost: "192.168.1.1", Source: "test", RequestType: utils.MetaRated, @@ -224,7 +224,32 @@ var ( }, }, }, - ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, + ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, + Cost: 1.01, + } + cdr3 = &engine.CDR{ // sample with values not realisticy calculated + CGRID: utils.Sha1("oid3", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), + OrderID: 123, + ToR: utils.MetaVoice, + OriginID: "oid3", + OriginHost: "192.168.1.1", + Source: "test", + RequestType: utils.MetaRated, + Tenant: "cgrates.org", + Category: "call", + Account: "1001", + Subject: "1001", + Destination: "1002", + SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), + AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), + RunID: utils.MetaDefault, + Usage: 10 * time.Second, + ExtraInfo: "extraInfo", + Partial: false, + PreRated: true, + CostSource: "cost source", + ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, + Cost: 1.01, } ) @@ -232,7 +257,7 @@ func TestERSSQLFilters(t *testing.T) { var dbcfg engine.DBCfg switch *utils.DBType { case utils.MetaInternal: - dbcfg = engine.InternalDBCfg + t.SkipNow() case utils.MetaMySQL: case utils.MetaMongo: dbcfg = engine.MongoDBCfg @@ -305,8 +330,10 @@ func TestERSSQLFilters(t *testing.T) { tx = tx.Table(utils.CDRsTBL) cdrSql := cdr1.AsCDRsql() cdrSql2 := cdr2.AsCDRsql() + cdrsql3 := cdr3.AsCDRsql() cdrSql.CreatedAt = time.Now() cdrSql2.CreatedAt = time.Now() + cdrsql3.CreatedAt = time.Now() saved := tx.Save(cdrSql) if saved.Error != nil { tx.Rollback() @@ -317,12 +344,17 @@ func TestERSSQLFilters(t *testing.T) { tx.Rollback() t.Fatal(err) } + saved = tx.Save(cdrsql3) + if saved.Error != nil { + tx.Rollback() + t.Fatal(err) + } tx.Commit() time.Sleep(10 * time.Millisecond) var result int64 db.Table(utils.CDRsTBL).Count(&result) - if result != 2 { - t.Error("Expected table to have only one result ", result) + if result != 3 { + t.Error("Expected table to have 3 results but got ", result) } }) defer t.Run("StopSQL", func(t *testing.T) { @@ -348,11 +380,12 @@ func TestERSSQLFilters(t *testing.T) { "apiers": { "enabled": true }, - + "filters": { + "apiers_conns": ["*localhost"] + }, "ers": { "enabled": true, "sessions_conns":["*localhost"], - "apiers_conns": ["*localhost"], "readers": [ { "id": "mysql", @@ -361,12 +394,16 @@ func TestERSSQLFilters(t *testing.T) { "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", "opts": { "sqlDBName":"cgrates2", + "sqlDeleteIndexedFields": ["id"], }, + "start_delay": "500ms", // wait for db to be populated before starting reader "processed_path": "", "tenant": "cgrates.org", "filters": [ "*gt:~*req.answer_time:NOW() - INTERVAL 7 DAY", // dont process cdrs with answer_time older than 7 days ago (continue if answer_time > now-7days) - "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2", + "FLTR_SQL_RatingID", // "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2", + "*string:~*vars.*readerID:mysql", + "FLTR_VARS", // "*string:~*vars.*readerID:mysql", ], "flags": ["*dryrun"], "fields":[ @@ -390,20 +427,29 @@ func TestERSSQLFilters(t *testing.T) { }` + tpFiles := map[string]string{ + utils.FiltersCsv: `#Tenant[0],ID[1],Type[2],Path[3],Values[4],ActivationInterval[5] +cgrates.org,FLTR_SQL_RatingID,*eq,~*req.cost_details.Charges[0].RatingID,RatingID2, +cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`, + } + buf := &bytes.Buffer{} ng := engine.TestEngine{ ConfigJSON: content, DBCfg: dbcfg, + TpFiles: tpFiles, LogBuffer: buf, } ng.Run(t) + time.Sleep(1 * time.Second) t.Run("VerifyProcessedFieldsFromLogs", func(t *testing.T) { time.Sleep(100 * time.Millisecond) // give enough time to process from sql table records := 0 scanner := bufio.NewScanner(strings.NewReader(buf.String())) - timeStartFormated := timeStart.Format("2006-01-02T15:04:05-07:00") - expectedLog := fmt.Sprintf("\"Event\":{\"Account\":\"1001\",\"AnswerTime\":\"%s\",\"CGRID\":\"%s\",\"Category\":\"call\",\"CostDetails\":\"{\\\"CGRID\\\":\\\"test1\\\",\\\"RunID\\\":\\\"*default\\\",\\\"StartTime\\\":\\\"2017-01-09T16:18:21Z\\\",\\\"Usage\\\":180000000000,\\\"Cost\\\":2.3,\\\"Charges\\\":[{\\\"RatingID\\\":\\\"RatingID2\\\",\\\"Increments\\\":[{\\\"Usage\\\":120000000000,\\\"Cost\\\":2,\\\"AccountingID\\\":\\\"a012888\\\",\\\"CompressFactor\\\":1},{\\\"Usage\\\":1000000000,\\\"Cost\\\":0.005,\\\"AccountingID\\\":\\\"44d6c02\\\",\\\"CompressFactor\\\":60}],\\\"CompressFactor\\\":1}],\\\"AccountSummary\\\":{\\\"Tenant\\\":\\\"cgrates.org\\\",\\\"ID\\\":\\\"testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceSummaries\\\":[{\\\"UUID\\\":\\\"uuid1\\\",\\\"ID\\\":\\\"\\\",\\\"Type\\\":\\\"*monetary\\\",\\\"Initial\\\":0,\\\"Value\\\":50,\\\"Disabled\\\":false}],\\\"AllowNegative\\\":false,\\\"Disabled\\\":false},\\\"Rating\\\":{\\\"c1a5ab9\\\":{\\\"ConnectFee\\\":0.1,\\\"RoundingMethod\\\":\\\"*up\\\",\\\"RoundingDecimals\\\":5,\\\"MaxCost\\\":0,\\\"MaxCostStrategy\\\":\\\"\\\",\\\"TimingID\\\":\\\"\\\",\\\"RatesID\\\":\\\"ec1a177\\\",\\\"RatingFiltersID\\\":\\\"43e77dc\\\"}},\\\"Accounting\\\":{\\\"44d6c02\\\":{\\\"AccountID\\\":\\\"cgrates.org:testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"},\\\"a012888\\\":{\\\"AccountID\\\":\\\"cgrates.org:testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"}},\\\"RatingFilters\\\":null,\\\"Rates\\\":{\\\"ec1a177\\\":[{\\\"GroupIntervalStart\\\":0,\\\"Value\\\":0.01,\\\"RateIncrement\\\":60000000000,\\\"RateUnit\\\":1000000000}]},\\\"Timings\\\":null}\",\"Destination\":\"1002\",\"OriginID\":\"dsafdsaf\",\"RequestType\":\"*rated\",\"SetupTime\":\"%s\",\"Subject\":\"1001\",\"Tenant\":\"cgrates.org\",\"ToR\":\"*voice\",\"Usage\":\"10000000000\"},\"APIOpts\":{}}>", timeStartFormated, cgrID, timeStartFormated) + timeStartFormated := timeStart.Format("2006-01-02T15:04:05Z07:00") + expectedLog := fmt.Sprintf("\"Event\":{\"Account\":\"1001\",\"AnswerTime\":\"%s\",\"CGRID\":\"%s\",\"Category\":\"call\",\"CostDetails\":\"{\\\"CGRID\\\":\\\"test1\\\",\\\"RunID\\\":\\\"*default\\\",\\\"StartTime\\\":\\\"2017-01-09T16:18:21Z\\\",\\\"Usage\\\":180000000000,\\\"Cost\\\":2.3,\\\"Charges\\\":[{\\\"RatingID\\\":\\\"RatingID2\\\",\\\"Increments\\\":[{\\\"Usage\\\":120000000000,\\\"Cost\\\":2,\\\"AccountingID\\\":\\\"a012888\\\",\\\"CompressFactor\\\":1},{\\\"Usage\\\":1000000000,\\\"Cost\\\":0.005,\\\"AccountingID\\\":\\\"44d6c02\\\",\\\"CompressFactor\\\":60}],\\\"CompressFactor\\\":1}],\\\"AccountSummary\\\":{\\\"Tenant\\\":\\\"cgrates.org\\\",\\\"ID\\\":\\\"testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceSummaries\\\":[{\\\"UUID\\\":\\\"uuid1\\\",\\\"ID\\\":\\\"\\\",\\\"Type\\\":\\\"*monetary\\\",\\\"Initial\\\":0,\\\"Value\\\":50,\\\"Disabled\\\":false}],\\\"AllowNegative\\\":false,\\\"Disabled\\\":false},\\\"Rating\\\":{\\\"c1a5ab9\\\":{\\\"ConnectFee\\\":0.1,\\\"RoundingMethod\\\":\\\"*up\\\",\\\"RoundingDecimals\\\":5,\\\"MaxCost\\\":0,\\\"MaxCostStrategy\\\":\\\"\\\",\\\"TimingID\\\":\\\"\\\",\\\"RatesID\\\":\\\"ec1a177\\\",\\\"RatingFiltersID\\\":\\\"43e77dc\\\"}},\\\"Accounting\\\":{\\\"44d6c02\\\":{\\\"AccountID\\\":\\\"cgrates.org:testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"},\\\"a012888\\\":{\\\"AccountID\\\":\\\"cgrates.org:testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"}},\\\"RatingFilters\\\":null,\\\"Rates\\\":{\\\"ec1a177\\\":[{\\\"GroupIntervalStart\\\":0,\\\"Value\\\":0.01,\\\"RateIncrement\\\":60000000000,\\\"RateUnit\\\":1000000000}]},\\\"Timings\\\":null}\",\"Destination\":\"1002\",\"OriginID\":\"oid2\",\"RequestType\":\"*rated\",\"SetupTime\":\"%s\",\"Subject\":\"1001\",\"Tenant\":\"cgrates.org\",\"ToR\":\"*voice\",\"Usage\":\"10000000000\"},\"APIOpts\":{}}>", timeStartFormated, cgrID, timeStartFormated) + var ersLogsCount int for scanner.Scan() { line := scanner.Text() if !strings.Contains(line, " DRYRUN, reader: ") { @@ -413,6 +459,9 @@ func TestERSSQLFilters(t *testing.T) { if !strings.Contains(line, expectedLog) { t.Errorf("expected \n<%s>, \nreceived\n<%s>", expectedLog, line) } + if strings.Contains(line, "[INFO] DRYRUN") { + ersLogsCount++ + } } if err := scanner.Err(); err != nil { t.Errorf("error reading input: %v", err) @@ -420,5 +469,504 @@ func TestERSSQLFilters(t *testing.T) { if records != 1 { t.Errorf("expected ERs to process 1 records, but it processed %d records", records) } + if ersLogsCount != 1 { + t.Error("Expected only 1 ERS Dryrun log, received: ", ersLogsCount) + } + }) + + t.Run("VerifyRowsNotDeleted", func(t *testing.T) { + var result int64 + db.Table(utils.CDRsTBL).Count(&result) + if result != 2 { + t.Fatal("Expected 2 rows in table ", result) + } + var rslt []map[string]interface{} + if err := db.Raw("SELECT * FROM " + utils.CDRsTBL).Scan(&rslt).Error; err != nil { + t.Fatalf("failed to query table: %v", err) + } + + // Print the entire table as a string + for _, row := range rslt { + for col, value := range row { + if strings.Contains(fmt.Sprintln(value), "RatingID2") { + t.Fatalf("Expected CDR with RatingID: \"RatingID2\" to be deleted. Received column <%s>, value <%s>", col, value) + } + } + } + }) +} + +func TestERSSQLFiltersWithoutDelete(t *testing.T) { + var dbcfg engine.DBCfg + switch *utils.DBType { + case utils.MetaInternal: + t.SkipNow() + case utils.MetaMySQL: + case utils.MetaMongo: + dbcfg = engine.MongoDBCfg + case utils.MetaPostgres: + dbcfg = engine.PostgresDBCfg + default: + t.Fatal("unsupported dbtype value") + } + + t.Run("InitSQLDB", func(t *testing.T) { + var err error + var db2 *gorm.DB + if db2, err = gorm.Open(mysql.Open(fmt.Sprintf(dbConnString, "cgrates")), + &gorm.Config{ + AllowGlobalUpdate: true, + Logger: logger.Default.LogMode(logger.Silent), + }); err != nil { + t.Fatal(err) + } + + if err = db2.Exec(`CREATE DATABASE IF NOT EXISTS cgrates2;`).Error; err != nil { + t.Fatal(err) + } + }) + + type testModelSql struct { + ID int64 + Cgrid string + RunID string + OriginHost string + Source string + OriginID string + ToR string + RequestType string + Tenant string + Category string + Account string + Subject string + Destination string + SetupTime time.Time + AnswerTime time.Time + Usage int64 + ExtraFields string + CostSource string + Cost float64 + CostDetails string + ExtraInfo string + CreatedAt time.Time + UpdatedAt time.Time + DeletedAt *time.Time + } + t.Run("PutCDRsInDataBase", func(t *testing.T) { + var err error + if db, err = gorm.Open(mysql.Open(fmt.Sprintf(dbConnString, "cgrates2")), + &gorm.Config{ + AllowGlobalUpdate: true, + Logger: logger.Default.LogMode(logger.Silent), + }); err != nil { + t.Fatal(err) + } + tx := db.Begin() + if !tx.Migrator().HasTable("cdrs") { + if err = tx.Migrator().CreateTable(new(engine.CDRsql)); err != nil { + tx.Rollback() + t.Fatal(err) + } + } + tx.Commit() + tx = db.Begin() + tx = tx.Table(utils.CDRsTBL) + cdrSql := cdr1.AsCDRsql() + cdrSql2 := cdr2.AsCDRsql() + cdrsql3 := cdr3.AsCDRsql() + cdrSql.CreatedAt = time.Now() + cdrSql2.CreatedAt = time.Now() + cdrsql3.CreatedAt = time.Now() + saved := tx.Save(cdrSql) + if saved.Error != nil { + tx.Rollback() + t.Fatal(err) + } + saved = tx.Save(cdrSql2) + if saved.Error != nil { + tx.Rollback() + t.Fatal(err) + } + saved = tx.Save(cdrsql3) + if saved.Error != nil { + tx.Rollback() + t.Fatal(err) + } + tx.Commit() + time.Sleep(10 * time.Millisecond) + var result int64 + db.Table(utils.CDRsTBL).Count(&result) + if result != 3 { + t.Error("Expected table to have 3 results but got ", result) + } + }) + defer t.Run("StopSQL", func(t *testing.T) { + if err := db.Migrator().DropTable("cdrs"); err != nil { + t.Fatal(err) + } + if err := db.Exec(`DROP DATABASE cgrates2;`).Error; err != nil { + t.Fatal(err) + } + if db2, err := db.DB(); err != nil { + t.Fatal(err) + } else if err = db2.Close(); err != nil { + t.Fatal(err) + } + }) + + content := `{ + + "general": { + "log_level": 7 + }, + + "apiers": { + "enabled": true + }, + "filters": { + "apiers_conns": ["*localhost"] + }, + "ers": { + "enabled": true, + "sessions_conns":["*localhost"], + "readers": [ + { + "id": "mysql", + "type": "*sql", + "run_delay": "1m", + "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", + "opts": { + "sqlDBName":"cgrates2", + }, + "start_delay": "500ms", // wait for db to be populated before starting reader + "processed_path": "", + "tenant": "cgrates.org", + "filters": [ + "*gt:~*req.answer_time:NOW() - INTERVAL 7 DAY", // dont process cdrs with answer_time older than 7 days ago (continue if answer_time > now-7days) + "FLTR_SQL_RatingID", // "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2", + "*string:~*vars.*readerID:mysql", + "FLTR_VARS", // "*string:~*vars.*readerID:mysql", + ], + "flags": ["*dryrun"], + "fields":[ + {"tag": "CGRID", "path": "*cgreq.CGRID", "type": "*variable", "value": "~*req.cgrid", "mandatory": true}, + {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.tor", "mandatory": true}, + {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.origin_id", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.request_type", "mandatory": true}, + {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.tenant", "mandatory": true}, + {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.category", "mandatory": true}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.subject", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.setup_time", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.answer_time", "mandatory": true}, + {"tag": "CostDetails", "path": "*cgreq.CostDetails", "type": "*variable", "value": "~*req.cost_details", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage", "mandatory": true}, + ], + }, + ], + }, + + }` + + tpFiles := map[string]string{ + utils.FiltersCsv: `#Tenant[0],ID[1],Type[2],Path[3],Values[4],ActivationInterval[5] +cgrates.org,FLTR_SQL_RatingID,*eq,~*req.cost_details.Charges[0].RatingID,RatingID2, +cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`, + } + + buf := &bytes.Buffer{} + ng := engine.TestEngine{ + ConfigJSON: content, + DBCfg: dbcfg, + TpFiles: tpFiles, + LogBuffer: buf, + } + ng.Run(t) + time.Sleep(1 * time.Second) + + t.Run("VerifyProcessedFieldsFromLogs", func(t *testing.T) { + time.Sleep(100 * time.Millisecond) // give enough time to process from sql table + records := 0 + scanner := bufio.NewScanner(strings.NewReader(buf.String())) + timeStartFormated := timeStart.Format("2006-01-02T15:04:05Z07:00") + expectedLog := fmt.Sprintf("\"Event\":{\"Account\":\"1001\",\"AnswerTime\":\"%s\",\"CGRID\":\"%s\",\"Category\":\"call\",\"CostDetails\":\"{\\\"CGRID\\\":\\\"test1\\\",\\\"RunID\\\":\\\"*default\\\",\\\"StartTime\\\":\\\"2017-01-09T16:18:21Z\\\",\\\"Usage\\\":180000000000,\\\"Cost\\\":2.3,\\\"Charges\\\":[{\\\"RatingID\\\":\\\"RatingID2\\\",\\\"Increments\\\":[{\\\"Usage\\\":120000000000,\\\"Cost\\\":2,\\\"AccountingID\\\":\\\"a012888\\\",\\\"CompressFactor\\\":1},{\\\"Usage\\\":1000000000,\\\"Cost\\\":0.005,\\\"AccountingID\\\":\\\"44d6c02\\\",\\\"CompressFactor\\\":60}],\\\"CompressFactor\\\":1}],\\\"AccountSummary\\\":{\\\"Tenant\\\":\\\"cgrates.org\\\",\\\"ID\\\":\\\"testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceSummaries\\\":[{\\\"UUID\\\":\\\"uuid1\\\",\\\"ID\\\":\\\"\\\",\\\"Type\\\":\\\"*monetary\\\",\\\"Initial\\\":0,\\\"Value\\\":50,\\\"Disabled\\\":false}],\\\"AllowNegative\\\":false,\\\"Disabled\\\":false},\\\"Rating\\\":{\\\"c1a5ab9\\\":{\\\"ConnectFee\\\":0.1,\\\"RoundingMethod\\\":\\\"*up\\\",\\\"RoundingDecimals\\\":5,\\\"MaxCost\\\":0,\\\"MaxCostStrategy\\\":\\\"\\\",\\\"TimingID\\\":\\\"\\\",\\\"RatesID\\\":\\\"ec1a177\\\",\\\"RatingFiltersID\\\":\\\"43e77dc\\\"}},\\\"Accounting\\\":{\\\"44d6c02\\\":{\\\"AccountID\\\":\\\"cgrates.org:testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"},\\\"a012888\\\":{\\\"AccountID\\\":\\\"cgrates.org:testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"}},\\\"RatingFilters\\\":null,\\\"Rates\\\":{\\\"ec1a177\\\":[{\\\"GroupIntervalStart\\\":0,\\\"Value\\\":0.01,\\\"RateIncrement\\\":60000000000,\\\"RateUnit\\\":1000000000}]},\\\"Timings\\\":null}\",\"Destination\":\"1002\",\"OriginID\":\"oid2\",\"RequestType\":\"*rated\",\"SetupTime\":\"%s\",\"Subject\":\"1001\",\"Tenant\":\"cgrates.org\",\"ToR\":\"*voice\",\"Usage\":\"10000000000\"},\"APIOpts\":{}}>", timeStartFormated, cgrID, timeStartFormated) + var ersLogsCount int + for scanner.Scan() { + line := scanner.Text() + if !strings.Contains(line, " DRYRUN, reader: ") { + continue + } + records++ + if !strings.Contains(line, expectedLog) { + t.Errorf("expected \n<%s>, \nreceived\n<%s>", expectedLog, line) + } + if strings.Contains(line, "[INFO] DRYRUN") { + ersLogsCount++ + } + } + if err := scanner.Err(); err != nil { + t.Errorf("error reading input: %v", err) + } + if records != 1 { + t.Errorf("expected ERs to process 1 records, but it processed %d records", records) + } + if ersLogsCount != 1 { + t.Error("Expected only 1 ERS Dryrun log, received: ", ersLogsCount) + } + }) + + t.Run("VerifyRowsNotDeleted", func(t *testing.T) { + var result int64 + db.Table(utils.CDRsTBL).Count(&result) + if result != 3 { + t.Error("Expected 3 rows in table, got: ", result) + } + var rslt []map[string]interface{} + if err := db.Raw("SELECT * FROM " + utils.CDRsTBL).Scan(&rslt).Error; err != nil { + t.Fatalf("failed to query table: %v", err) + } + }) +} + +func TestERSSQLFiltersWithMetaDelete(t *testing.T) { + var dbcfg engine.DBCfg + switch *utils.DBType { + case utils.MetaInternal: + t.SkipNow() + case utils.MetaMySQL: + case utils.MetaMongo: + dbcfg = engine.MongoDBCfg + case utils.MetaPostgres: + dbcfg = engine.PostgresDBCfg + default: + t.Fatal("unsupported dbtype value") + } + + t.Run("InitSQLDB", func(t *testing.T) { + var err error + var db2 *gorm.DB + if db2, err = gorm.Open(mysql.Open(fmt.Sprintf(dbConnString, "cgrates")), + &gorm.Config{ + AllowGlobalUpdate: true, + Logger: logger.Default.LogMode(logger.Silent), + }); err != nil { + t.Fatal(err) + } + + if err = db2.Exec(`CREATE DATABASE IF NOT EXISTS cgrates2;`).Error; err != nil { + t.Fatal(err) + } + }) + + type testModelSql struct { + ID int64 + Cgrid string + RunID string + OriginHost string + Source string + OriginID string + ToR string + RequestType string + Tenant string + Category string + Account string + Subject string + Destination string + SetupTime time.Time + AnswerTime time.Time + Usage int64 + ExtraFields string + CostSource string + Cost float64 + CostDetails string + ExtraInfo string + CreatedAt time.Time + UpdatedAt time.Time + DeletedAt *time.Time + } + t.Run("PutCDRsInDataBase", func(t *testing.T) { + var err error + if db, err = gorm.Open(mysql.Open(fmt.Sprintf(dbConnString, "cgrates2")), + &gorm.Config{ + AllowGlobalUpdate: true, + Logger: logger.Default.LogMode(logger.Silent), + }); err != nil { + t.Fatal(err) + } + tx := db.Begin() + if !tx.Migrator().HasTable("cdrs") { + if err = tx.Migrator().CreateTable(new(engine.CDRsql)); err != nil { + tx.Rollback() + t.Fatal(err) + } + } + tx.Commit() + tx = db.Begin() + tx = tx.Table(utils.CDRsTBL) + cdrSql := cdr1.AsCDRsql() + cdrSql2 := cdr2.AsCDRsql() + cdrsql3 := cdr3.AsCDRsql() + cdrSql.CreatedAt = time.Now() + cdrSql2.CreatedAt = time.Now() + cdrsql3.CreatedAt = time.Now() + saved := tx.Save(cdrSql) + if saved.Error != nil { + tx.Rollback() + t.Fatal(err) + } + saved = tx.Save(cdrSql2) + if saved.Error != nil { + tx.Rollback() + t.Fatal(err) + } + saved = tx.Save(cdrsql3) + if saved.Error != nil { + tx.Rollback() + t.Fatal(err) + } + tx.Commit() + time.Sleep(10 * time.Millisecond) + var result int64 + db.Table(utils.CDRsTBL).Count(&result) + if result != 3 { + t.Error("Expected table to have 3 results but got ", result) + } + }) + defer t.Run("StopSQL", func(t *testing.T) { + if err := db.Migrator().DropTable("cdrs"); err != nil { + t.Fatal(err) + } + if err := db.Exec(`DROP DATABASE cgrates2;`).Error; err != nil { + t.Fatal(err) + } + if db2, err := db.DB(); err != nil { + t.Fatal(err) + } else if err = db2.Close(); err != nil { + t.Fatal(err) + } + }) + + content := `{ + + "general": { + "log_level": 7 + }, + + "apiers": { + "enabled": true + }, + "filters": { + "apiers_conns": ["*localhost"] + }, + "ers": { + "enabled": true, + "sessions_conns":["*localhost"], + "readers": [ + { + "id": "mysql", + "type": "*sql", + "run_delay": "1m", + "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", + "opts": { + "sqlDBName":"cgrates2", + }, + "start_delay": "500ms", // wait for db to be populated before starting reader + "processed_path": "*delete", + "tenant": "cgrates.org", + "filters": [ + "*gt:~*req.answer_time:NOW() - INTERVAL 7 DAY", // dont process cdrs with answer_time older than 7 days ago (continue if answer_time > now-7days) + "FLTR_SQL_RatingID", // "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2", + "*string:~*vars.*readerID:mysql", + "FLTR_VARS", // "*string:~*vars.*readerID:mysql", + ], + "flags": ["*dryrun"], + "fields":[ + {"tag": "CGRID", "path": "*cgreq.CGRID", "type": "*variable", "value": "~*req.cgrid", "mandatory": true}, + {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.tor", "mandatory": true}, + {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.origin_id", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.request_type", "mandatory": true}, + {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.tenant", "mandatory": true}, + {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.category", "mandatory": true}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.subject", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.setup_time", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.answer_time", "mandatory": true}, + {"tag": "CostDetails", "path": "*cgreq.CostDetails", "type": "*variable", "value": "~*req.cost_details", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage", "mandatory": true}, + ], + }, + ], + }, + + }` + + tpFiles := map[string]string{ + utils.FiltersCsv: `#Tenant[0],ID[1],Type[2],Path[3],Values[4],ActivationInterval[5] +cgrates.org,FLTR_SQL_RatingID,*eq,~*req.cost_details.Charges[0].RatingID,RatingID2, +cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`, + } + + buf := &bytes.Buffer{} + ng := engine.TestEngine{ + ConfigJSON: content, + DBCfg: dbcfg, + TpFiles: tpFiles, + LogBuffer: buf, + } + ng.Run(t) + time.Sleep(1 * time.Second) + + t.Run("VerifyProcessedFieldsFromLogs", func(t *testing.T) { + time.Sleep(100 * time.Millisecond) // give enough time to process from sql table + records := 0 + scanner := bufio.NewScanner(strings.NewReader(buf.String())) + timeStartFormated := timeStart.Format("2006-01-02T15:04:05Z07:00") + expectedLog := fmt.Sprintf("\"Event\":{\"Account\":\"1001\",\"AnswerTime\":\"%s\",\"CGRID\":\"%s\",\"Category\":\"call\",\"CostDetails\":\"{\\\"CGRID\\\":\\\"test1\\\",\\\"RunID\\\":\\\"*default\\\",\\\"StartTime\\\":\\\"2017-01-09T16:18:21Z\\\",\\\"Usage\\\":180000000000,\\\"Cost\\\":2.3,\\\"Charges\\\":[{\\\"RatingID\\\":\\\"RatingID2\\\",\\\"Increments\\\":[{\\\"Usage\\\":120000000000,\\\"Cost\\\":2,\\\"AccountingID\\\":\\\"a012888\\\",\\\"CompressFactor\\\":1},{\\\"Usage\\\":1000000000,\\\"Cost\\\":0.005,\\\"AccountingID\\\":\\\"44d6c02\\\",\\\"CompressFactor\\\":60}],\\\"CompressFactor\\\":1}],\\\"AccountSummary\\\":{\\\"Tenant\\\":\\\"cgrates.org\\\",\\\"ID\\\":\\\"testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceSummaries\\\":[{\\\"UUID\\\":\\\"uuid1\\\",\\\"ID\\\":\\\"\\\",\\\"Type\\\":\\\"*monetary\\\",\\\"Initial\\\":0,\\\"Value\\\":50,\\\"Disabled\\\":false}],\\\"AllowNegative\\\":false,\\\"Disabled\\\":false},\\\"Rating\\\":{\\\"c1a5ab9\\\":{\\\"ConnectFee\\\":0.1,\\\"RoundingMethod\\\":\\\"*up\\\",\\\"RoundingDecimals\\\":5,\\\"MaxCost\\\":0,\\\"MaxCostStrategy\\\":\\\"\\\",\\\"TimingID\\\":\\\"\\\",\\\"RatesID\\\":\\\"ec1a177\\\",\\\"RatingFiltersID\\\":\\\"43e77dc\\\"}},\\\"Accounting\\\":{\\\"44d6c02\\\":{\\\"AccountID\\\":\\\"cgrates.org:testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"},\\\"a012888\\\":{\\\"AccountID\\\":\\\"cgrates.org:testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"}},\\\"RatingFilters\\\":null,\\\"Rates\\\":{\\\"ec1a177\\\":[{\\\"GroupIntervalStart\\\":0,\\\"Value\\\":0.01,\\\"RateIncrement\\\":60000000000,\\\"RateUnit\\\":1000000000}]},\\\"Timings\\\":null}\",\"Destination\":\"1002\",\"OriginID\":\"oid2\",\"RequestType\":\"*rated\",\"SetupTime\":\"%s\",\"Subject\":\"1001\",\"Tenant\":\"cgrates.org\",\"ToR\":\"*voice\",\"Usage\":\"10000000000\"},\"APIOpts\":{}}>", timeStartFormated, cgrID, timeStartFormated) + var ersLogsCount int + for scanner.Scan() { + line := scanner.Text() + if !strings.Contains(line, " DRYRUN, reader: ") { + continue + } + records++ + if !strings.Contains(line, expectedLog) { + t.Errorf("expected \n<%s>, \nreceived\n<%s>", expectedLog, line) + } + if strings.Contains(line, "[INFO] DRYRUN") { + ersLogsCount++ + } + } + if err := scanner.Err(); err != nil { + t.Errorf("error reading input: %v", err) + } + if records != 1 { + t.Errorf("expected ERs to process 1 records, but it processed %d records", records) + } + if ersLogsCount != 1 { + t.Error("Expected only 1 ERS Dryrun log, received: ", ersLogsCount) + } + }) + + t.Run("VerifyRowsNotDeleted", func(t *testing.T) { + var result int64 + db.Table(utils.CDRsTBL).Count(&result) + if result != 2 { + t.Error("Expected 2 rows in table, got: ", result) + } + var rslt []map[string]interface{} + if err := db.Raw("SELECT * FROM " + utils.CDRsTBL).Scan(&rslt).Error; err != nil { + t.Fatalf("failed to query table: %v", err) + } + + // Print the entire table as a string + for _, row := range rslt { + for col, value := range row { + if strings.Contains(fmt.Sprintln(value), "RatingID2") { + t.Fatalf("Expected CDR with RatingID: \"RatingID2\" to be deleted. Received column <%s>, value <%s>", col, value) + } + } + } }) } diff --git a/services/datadb.go b/services/datadb.go index 640f687a5..2c8180d16 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -145,7 +145,7 @@ func (db *DataDBService) mandatoryDB() bool { return db.cfg.RalsCfg().Enabled || db.cfg.SchedulerCfg().Enabled || db.cfg.ChargerSCfg().Enabled || db.cfg.AttributeSCfg().Enabled || db.cfg.ResourceSCfg().Enabled || db.cfg.StatSCfg().Enabled || db.cfg.ThresholdSCfg().Enabled || db.cfg.RouteSCfg().Enabled || db.cfg.DispatcherSCfg().Enabled || - db.cfg.LoaderCfg().Enabled() || db.cfg.ApierCfg().Enabled || db.cfg.AnalyzerSCfg().Enabled + db.cfg.LoaderCfg().Enabled() || db.cfg.ApierCfg().Enabled || db.cfg.AnalyzerSCfg().Enabled || db.cfg.ERsCfg().Enabled } // GetDM returns the DataManager diff --git a/services/ers_it_test.go b/services/ers_it_test.go index d905ed3cb..c51125c66 100644 --- a/services/ers_it_test.go +++ b/services/ers_it_test.go @@ -68,7 +68,7 @@ func TestEventReaderSReload(t *testing.T) { db := NewDataDBService(cfg, nil, false, srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, anz, srvDep) intERsConn := make(chan birpc.ClientConnector, 1) - erS := NewEventReaderService(cfg, nil, filterSChan, shdChan, nil, server, intERsConn, anz, srvDep) + erS := NewEventReaderService(cfg, db, filterSChan, shdChan, nil, server, intERsConn, anz, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(erS, sS, NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db) @@ -94,7 +94,6 @@ func TestEventReaderSReload(t *testing.T) { if !erS.IsRunning() { t.Fatalf("Expected service to be running") } - runtime.Gosched() err := erS.Start() if err == nil || err != utils.ErrServiceAlreadyRunning { @@ -112,7 +111,6 @@ func TestEventReaderSReload(t *testing.T) { if erS.IsRunning() { t.Fatal("Expected service to be down") } - } func TestEventReaderSReload2(t *testing.T) { diff --git a/utils/consts.go b/utils/consts.go index 0ec5f3e63..e56cf4b2c 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2806,8 +2806,9 @@ const ( SQLDefaultDBName = "cgrates" SQLDefaultSSLMode = "disable" - SQLDBNameOpt = "sqlDBName" - SQLTableNameOpt = "sqlTableName" + SQLDBNameOpt = "sqlDBName" + SQLTableNameOpt = "sqlTableName" + SQLDeleteIndexedFieldsOpt = "sqlDeleteIndexedFields" SQLMaxOpenConns = "sqlMaxOpenConns" SQLConnMaxLifetime = "sqlConnMaxLifetime"