Improvements to ERS SQL filters

This commit is contained in:
arberkatellari
2024-11-15 13:56:02 +02:00
committed by Dan Christian Bogos
parent 321910d181
commit 041b14fa03
9 changed files with 704 additions and 123 deletions

View File

@@ -399,7 +399,7 @@ const CGRATES_CFG_JSON = `
{
"id": "*default", // identifier of the EventReader profile
"type": "*none", // reader type <*file_csv>
"run_delay": "0", // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together
"run_delay": "0", // sleep interval between consecutive runs; "-1" to use automation via inotify; "0" to disable running all together; <""|$dur>
"start_delay": "0", // time to wait before an reader starts to run
"concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited
"source_path": "/var/spool/cgrates/ers/in", // read data from this path
@@ -450,6 +450,7 @@ const CGRATES_CFG_JSON = `
// SQL
// "sqlDBName": "cgrates", // the name of the database from were the events are read
// "sqlTableName": "cdrs", // the name of the table from were the events are read
// "sqlDeleteIndexedFields": [], // list of fields to DELETE from the table
// "pgSSLMode": "disable", // the ssl mode for postgres db
// SQS and S3

View File

@@ -256,9 +256,10 @@ func (kafkaROpts *KafkaROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err
}
type SQLROpts struct {
DBName *string
TableName *string
PgSSLMode *string
DBName *string
TableName *string
DeleteIndexedFields *[]string
PgSSLMode *string
}
func (sqlOpts *SQLROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error) {
@@ -268,6 +269,11 @@ func (sqlOpts *SQLROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error
if jsnCfg.SQLTableName != nil {
sqlOpts.TableName = jsnCfg.SQLTableName
}
if jsnCfg.SQLDeleteIndexedFields != nil {
dif := make([]string, len(*jsnCfg.SQLDeleteIndexedFields))
copy(dif, *jsnCfg.SQLDeleteIndexedFields)
sqlOpts.DeleteIndexedFields = &dif
}
if jsnCfg.PgSSLMode != nil {
sqlOpts.PgSSLMode = jsnCfg.PgSSLMode
}
@@ -660,6 +666,11 @@ func (sqlOpts *SQLROpts) Clone() *SQLROpts {
cln.TableName = new(string)
*cln.TableName = *sqlOpts.TableName
}
if sqlOpts.DeleteIndexedFields != nil {
idx := make([]string, len(*sqlOpts.DeleteIndexedFields))
copy(idx, *sqlOpts.DeleteIndexedFields)
cln.DeleteIndexedFields = &idx
}
if sqlOpts.PgSSLMode != nil {
cln.PgSSLMode = new(string)
*cln.PgSSLMode = *sqlOpts.PgSSLMode
@@ -911,6 +922,11 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string
if sqlOpts.TableName != nil {
opts[utils.SQLTableNameOpt] = *sqlOpts.TableName
}
if sqlOpts.DeleteIndexedFields != nil {
deleteIndexedFields := make([]string, len(*sqlOpts.DeleteIndexedFields))
copy(deleteIndexedFields, *sqlOpts.DeleteIndexedFields)
opts[utils.SQLDeleteIndexedFieldsOpt] = deleteIndexedFields
}
if sqlOpts.PgSSLMode != nil {
opts[utils.PgSSLModeCfg] = *sqlOpts.PgSSLMode
}

View File

@@ -1427,6 +1427,7 @@ func TestEventReaderOptsCfg(t *testing.T) {
KafkaTopic: utils.StringPointer("kafka"),
KafkaMaxWait: utils.StringPointer("1m"),
SQLDBName: utils.StringPointer("dbname"),
SQLDeleteIndexedFields: utils.SliceStringPointer([]string{"id"}),
SQLTableName: utils.StringPointer("tablename"),
PgSSLMode: utils.StringPointer("sslmode"),
AWSRegion: utils.StringPointer("eu"),
@@ -1506,9 +1507,10 @@ func TestEventReaderCfgClone(t *testing.T) {
RoutingKey: utils.StringPointer("key1"),
},
SQL: &SQLROpts{
DBName: utils.StringPointer("dbname"),
TableName: utils.StringPointer("tablename"),
PgSSLMode: utils.StringPointer("sslmode"),
DBName: utils.StringPointer("dbname"),
TableName: utils.StringPointer("tablename"),
DeleteIndexedFields: utils.SliceStringPointer([]string{"id"}),
PgSSLMode: utils.StringPointer("sslmode"),
},
AWS: &AWSROpts{

View File

@@ -224,48 +224,49 @@ type ERsJsonCfg struct {
}
type EventReaderOptsJson struct {
PartialPath *string `json:"partialPath"`
PartialCacheAction *string `json:"partialCacheAction"`
PartialOrderField *string `json:"partialOrderField"`
PartialCSVFieldSeparator *string `json:"partialcsvFieldSeparator"`
CSVRowLength *int `json:"csvRowLength"`
CSVFieldSeparator *string `json:"csvFieldSeparator"`
CSVHeaderDefineChar *string `json:"csvHeaderDefineChar"`
CSVLazyQuotes *bool `json:"csvLazyQuotes"`
XMLRootPath *string `json:"xmlRootPath"`
AMQPQueueID *string `json:"amqpQueueID"`
AMQPUsername *string `json:"amqpUsername"`
AMQPPassword *string `json:"amqpPassword"`
AMQPConsumerTag *string `json:"amqpConsumerTag"`
AMQPExchange *string `json:"amqpExchange"`
AMQPExchangeType *string `json:"amqpExchangeType"`
AMQPRoutingKey *string `json:"amqpRoutingKey"`
KafkaTopic *string `json:"kafkaTopic"`
KafkaGroupID *string `json:"kafkaGroupID"`
KafkaMaxWait *string `json:"kafkaMaxWait"`
KafkaTLS *bool `json:"kafkaTLS"`
KafkaCAPath *string `json:"kafkaCAPath"`
KafkaSkipTLSVerify *bool `json:"kafkaSkipTLSVerify"`
SQLDBName *string `json:"sqlDBName"`
SQLTableName *string `json:"sqlTableName"`
PgSSLMode *string `json:"pgSSLMode"`
AWSRegion *string `json:"awsRegion"`
AWSKey *string `json:"awsKey"`
AWSSecret *string `json:"awsSecret"`
AWSToken *string `json:"awsToken"`
SQSQueueID *string `json:"sqsQueueID"`
S3BucketID *string `json:"s3BucketID"`
NATSJetStream *bool `json:"natsJetStream"`
NATSConsumerName *string `json:"natsConsumerName"`
NATSStreamName *string `json:"natsStreamName"`
NATSSubject *string `json:"natsSubject"`
NATSQueueID *string `json:"natsQueueID"`
NATSJWTFile *string `json:"natsJWTFile"`
NATSSeedFile *string `json:"natsSeedFile"`
NATSCertificateAuthority *string `json:"natsCertificateAuthority"`
NATSClientCertificate *string `json:"natsClientCertificate"`
NATSClientKey *string `json:"natsClientKey"`
NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"`
PartialPath *string `json:"partialPath"`
PartialCacheAction *string `json:"partialCacheAction"`
PartialOrderField *string `json:"partialOrderField"`
PartialCSVFieldSeparator *string `json:"partialcsvFieldSeparator"`
CSVRowLength *int `json:"csvRowLength"`
CSVFieldSeparator *string `json:"csvFieldSeparator"`
CSVHeaderDefineChar *string `json:"csvHeaderDefineChar"`
CSVLazyQuotes *bool `json:"csvLazyQuotes"`
XMLRootPath *string `json:"xmlRootPath"`
AMQPQueueID *string `json:"amqpQueueID"`
AMQPUsername *string `json:"amqpUsername"`
AMQPPassword *string `json:"amqpPassword"`
AMQPConsumerTag *string `json:"amqpConsumerTag"`
AMQPExchange *string `json:"amqpExchange"`
AMQPExchangeType *string `json:"amqpExchangeType"`
AMQPRoutingKey *string `json:"amqpRoutingKey"`
KafkaTopic *string `json:"kafkaTopic"`
KafkaGroupID *string `json:"kafkaGroupID"`
KafkaMaxWait *string `json:"kafkaMaxWait"`
KafkaTLS *bool `json:"kafkaTLS"`
KafkaCAPath *string `json:"kafkaCAPath"`
KafkaSkipTLSVerify *bool `json:"kafkaSkipTLSVerify"`
SQLDBName *string `json:"sqlDBName"`
SQLTableName *string `json:"sqlTableName"`
SQLDeleteIndexedFields *[]string `json:"sqlDeleteIndexedFields"`
PgSSLMode *string `json:"pgSSLMode"`
AWSRegion *string `json:"awsRegion"`
AWSKey *string `json:"awsKey"`
AWSSecret *string `json:"awsSecret"`
AWSToken *string `json:"awsToken"`
SQSQueueID *string `json:"sqsQueueID"`
S3BucketID *string `json:"s3BucketID"`
NATSJetStream *bool `json:"natsJetStream"`
NATSConsumerName *string `json:"natsConsumerName"`
NATSStreamName *string `json:"natsStreamName"`
NATSSubject *string `json:"natsSubject"`
NATSQueueID *string `json:"natsQueueID"`
NATSJWTFile *string `json:"natsJWTFile"`
NATSSeedFile *string `json:"natsSeedFile"`
NATSCertificateAuthority *string `json:"natsCertificateAuthority"`
NATSClientCertificate *string `json:"natsClientCertificate"`
NATSClientKey *string `json:"natsClientKey"`
NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"`
}
// EventReaderSJsonCfg is the configuration of a single EventReader

View File

@@ -74,9 +74,11 @@ type SQLEventReader struct {
cfgIdx int // index of config instance within ERsCfg.Readers
fltrS *engine.FilterS
connString string
connType string
tableName string
connString string
connType string
tableName string
dbFilters []string
lazyFilters []string
rdrEvents chan *erEvent // channel to dispatch the events created to
partialEvents chan *erEvent // channel to dispatch the partial events created to
@@ -130,31 +132,31 @@ func valueQry(ruleType, elem, field string, values []string, not bool) (conditio
case utils.MetaExists, utils.MetaNotExists:
if not {
if elem == utils.EmptyString {
conditions = append(conditions, fmt.Sprintf(" %s IS NOT NULL", field))
conditions = append(conditions, fmt.Sprintf("%s IS NOT NULL", field))
return
}
conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') IS NOT NULL", elem, field))
conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') IS NOT NULL", elem, field))
return
}
if elem == utils.EmptyString {
conditions = append(conditions, fmt.Sprintf(" %s IS NULL", field))
conditions = append(conditions, fmt.Sprintf("%s IS NULL", field))
return
}
conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') IS NULL", elem, field))
conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') IS NULL", elem, field))
case utils.MetaEmpty, utils.MetaNotEmpty:
if not {
if elem == utils.EmptyString {
conditions = append(conditions, fmt.Sprintf(" %s != ''", field))
conditions = append(conditions, fmt.Sprintf("%s != ''", field))
return
}
conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') != ''", elem, field))
conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') != ''", elem, field))
return
}
if elem == utils.EmptyString {
conditions = append(conditions, fmt.Sprintf(" %s == ''", field))
conditions = append(conditions, fmt.Sprintf("%s == ''", field))
return
}
conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') == ''", elem, field))
conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') == ''", elem, field))
}
return
}
@@ -171,85 +173,85 @@ func valueQry(ruleType, elem, field string, values []string, not bool) (conditio
case utils.MetaString, utils.MetaNotString, utils.MetaEqual, utils.MetaNotEqual:
if not {
if elem == utils.EmptyString {
conditions = append(conditions, fmt.Sprintf(" %s != '%s'", field, value))
conditions = append(conditions, fmt.Sprintf("%s != '%s'", field, value))
continue
}
conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') != '%s'",
conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') != '%s'",
elem, field, value))
continue
}
if elem == utils.EmptyString {
singleCond = fmt.Sprintf(" %s = '%s'", field, value)
singleCond = fmt.Sprintf("%s = '%s'", field, value)
} else {
singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') = '%s'", elem, field, value)
singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') = '%s'", elem, field, value)
}
case utils.MetaLessThan, utils.MetaLessOrEqual, utils.MetaGreaterThan, utils.MetaGreaterOrEqual:
if ruleType == utils.MetaGreaterOrEqual {
if elem == utils.EmptyString {
singleCond = fmt.Sprintf(" %s >= %s", field, value)
singleCond = fmt.Sprintf("%s >= %s", field, value)
} else {
singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') >= %s", elem, field, value)
singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') >= %s", elem, field, value)
}
} else if ruleType == utils.MetaGreaterThan {
if elem == utils.EmptyString {
singleCond = fmt.Sprintf(" %s > %s", field, value)
singleCond = fmt.Sprintf("%s > %s", field, value)
} else {
singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') > %s", elem, field, value)
singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') > %s", elem, field, value)
}
} else if ruleType == utils.MetaLessOrEqual {
if elem == utils.EmptyString {
singleCond = fmt.Sprintf(" %s <= %s", field, value)
singleCond = fmt.Sprintf("%s <= %s", field, value)
} else {
singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') <= %s", elem, field, value)
singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') <= %s", elem, field, value)
}
} else if ruleType == utils.MetaLessThan {
if elem == utils.EmptyString {
singleCond = fmt.Sprintf(" %s < %s", field, value)
singleCond = fmt.Sprintf("%s < %s", field, value)
} else {
singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') < %s", elem, field, value)
singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') < %s", elem, field, value)
}
}
case utils.MetaPrefix, utils.MetaNotPrefix:
if not {
if elem == utils.EmptyString {
conditions = append(conditions, fmt.Sprintf(" %s NOT LIKE '%s%%'", field, value))
conditions = append(conditions, fmt.Sprintf("%s NOT LIKE '%s%%'", field, value))
continue
}
conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') NOT LIKE '%s%%'", elem, field, value))
conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') NOT LIKE '%s%%'", elem, field, value))
continue
}
if elem == utils.EmptyString {
singleCond = fmt.Sprintf(" %s LIKE '%s%%'", field, value)
singleCond = fmt.Sprintf("%s LIKE '%s%%'", field, value)
} else {
singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') LIKE '%s%%'", elem, field, value)
singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') LIKE '%s%%'", elem, field, value)
}
case utils.MetaSuffix, utils.MetaNotSuffix:
if not {
if elem == utils.EmptyString {
conditions = append(conditions, fmt.Sprintf(" %s NOT LIKE '%%%s'", field, value))
conditions = append(conditions, fmt.Sprintf("%s NOT LIKE '%%%s'", field, value))
continue
}
conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') NOT LIKE '%%%s'", elem, field, value))
conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') NOT LIKE '%%%s'", elem, field, value))
continue
}
if elem == utils.EmptyString {
singleCond = fmt.Sprintf(" %s LIKE '%%%s'", field, value)
singleCond = fmt.Sprintf("%s LIKE '%%%s'", field, value)
} else {
singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') LIKE '%%%s'", elem, field, value)
singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') LIKE '%%%s'", elem, field, value)
}
case utils.MetaRegex, utils.MetaNotRegex:
if not {
if elem == utils.EmptyString {
conditions = append(conditions, fmt.Sprintf(" %s NOT REGEXP '%s'", field, value))
conditions = append(conditions, fmt.Sprintf("%s NOT REGEXP '%s'", field, value))
continue
}
conditions = append(conditions, fmt.Sprintf(" JSON_VALUE(%s, '$.%s') NOT REGEXP '%s'", elem, field, value))
conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') NOT REGEXP '%s'", elem, field, value))
continue
}
if elem == utils.EmptyString {
singleCond = fmt.Sprintf(" %s REGEXP '%s'", field, value)
singleCond = fmt.Sprintf("%s REGEXP '%s'", field, value)
} else {
singleCond = fmt.Sprintf(" JSON_VALUE(%s, '$.%s') REGEXP '%s'", elem, field, value)
singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') REGEXP '%s'", elem, field, value)
}
}
conditions = append(conditions, singleCond)
@@ -271,43 +273,39 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) {
}
tm := time.NewTimer(0)
var filters []*engine.Filter
var whereQueries []string
var renewedFltrs []string
for _, fltr := range rdr.Config().Filters {
if result, err := rdr.dm.GetFilter(config.CgrConfig().GeneralCfg().DefaultTenant, fltr, true, false, utils.NonTransactional); err != nil {
if resultFltr, err := rdr.dm.GetFilter(config.CgrConfig().GeneralCfg().DefaultTenant, fltr, true, false, utils.NonTransactional); err != nil {
rdr.rdrErr <- err
return
} else {
filters = append(filters, result)
if !strings.Contains(fltr, utils.DynamicDataPrefix+utils.MetaReq) {
renewedFltrs = append(renewedFltrs, fltr)
}
filters = append(filters, resultFltr)
}
}
rdr.Config().Filters = renewedFltrs // remove filters containing *req
for _, filter := range filters {
var addFltrCount int
for _, rule := range filter.Rules {
var elem, field string
switch {
case strings.HasPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep):
field = strings.TrimPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep)
parts := strings.SplitN(field, ".", 2)
if len(parts) == 2 { // Split in 2 pieces if it contains any more dots in the field
// First part (before the first dot)
elem = parts[0]
// Second part (everything after the first dot)
field = parts[1]
if before, after, hasSep := strings.Cut(field, utils.NestingSep); hasSep {
elem = before
field = after
}
default:
addFltrCount++
if addFltrCount == 1 {
rdr.lazyFilters = append(rdr.lazyFilters, filter.ID)
}
continue
}
conditions := valueQry(rule.Type, elem, field, rule.Values, strings.HasPrefix(rule.Type, utils.MetaNot))
whereQueries = append(whereQueries, strings.Join(conditions, " OR "))
rdr.dbFilters = append(rdr.dbFilters, strings.Join(conditions, " OR "))
}
}
for {
tx := db.Table(rdr.tableName).Select(utils.Meta)
for _, whereQ := range whereQueries {
for _, whereQ := range rdr.dbFilters {
tx = tx.Where(whereQ)
}
rows, err := tx.Rows()
@@ -344,13 +342,14 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) {
for i := range columns {
columnPointers[i] = &columns[i]
}
// copy row values to their respective column
if err = rows.Scan(columnPointers...); err != nil {
rdr.rdrErr <- err
rows.Close()
return
}
msg := make(map[string]any)
fltr := make(map[string]string)
fltr := make(map[string]any)
for i, colName := range colNames {
msg[colName] = columns[i]
if colName != createdAt && colName != updatedAt && colName != deletedAt { // ignore the sql colums for filter only
@@ -359,18 +358,33 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) {
if tm.IsZero() {
continue
}
fltr[colName] = columns[i]
continue
case *time.Time:
if tm == nil || tm.IsZero() {
continue
}
fltr[colName] = columns[i]
continue
case nil:
continue
}
fltr[colName] = utils.IfaceAsString(columns[i])
}
}
if rdr.Config().ProcessedPath == utils.MetaDelete {
if err = db.Table(rdr.tableName).Delete(nil, fltr).Error; err != nil { // to ensure we don't read it again
if rdr.Config().Opts.SQL.DeleteIndexedFields != nil {
tmpFltrMap := make(map[string]any)
for _, fieldName := range *rdr.Config().Opts.SQL.DeleteIndexedFields {
if _, has := fltr[fieldName]; has {
tmpFltrMap[fieldName] = fltr[fieldName]
}
}
if len(tmpFltrMap) != 0 {
fltr = tmpFltrMap
}
}
if rdr.Config().ProcessedPath == utils.MetaDelete || rdr.Config().Opts.SQL.DeleteIndexedFields != nil {
if err = tx.Delete(nil, fltr).Error; err != nil { // to ensure we don't read it again
utils.Logger.Warning(
fmt.Sprintf("<%s> deleting message %s error: %s",
utils.ERs, utils.ToJSON(msg), err.Error()))
@@ -415,7 +429,7 @@ func (rdr *SQLEventReader) processMessage(msg map[string]any) (err error) {
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
rdr.fltrS, nil) // create an AgentRequest
var pass bool
if pass, err = rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
if pass, err = rdr.fltrS.Pass(agReq.Tenant, rdr.lazyFilters,
agReq); err != nil || !pass {
return
}

View File

@@ -38,6 +38,7 @@ import (
var (
db *gorm.DB
dbConnString = "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'"
timeStart = time.Now()
cdr1 = &engine.CDR{ // sample with values not realisticy calculated
CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()),
OrderID: 123,
@@ -131,13 +132,12 @@ var (
},
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
}
timeStart = time.Now()
cgrID = utils.Sha1("dsafdsaf", timeStart.String())
cdr2 = &engine.CDR{ // sample with values not realisticy calculated
cgrID = utils.Sha1("oid2", timeStart.String())
cdr2 = &engine.CDR{ // sample with values not realisticy calculated
CGRID: cgrID,
OrderID: 123,
ToR: utils.MetaVoice,
OriginID: "dsafdsaf",
OriginID: "oid2",
OriginHost: "192.168.1.1",
Source: "test",
RequestType: utils.MetaRated,
@@ -224,7 +224,32 @@ var (
},
},
},
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"},
Cost: 1.01,
}
cdr3 = &engine.CDR{ // sample with values not realisticy calculated
CGRID: utils.Sha1("oid3", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()),
OrderID: 123,
ToR: utils.MetaVoice,
OriginID: "oid3",
OriginHost: "192.168.1.1",
Source: "test",
RequestType: utils.MetaRated,
Tenant: "cgrates.org",
Category: "call",
Account: "1001",
Subject: "1001",
Destination: "1002",
SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC),
AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC),
RunID: utils.MetaDefault,
Usage: 10 * time.Second,
ExtraInfo: "extraInfo",
Partial: false,
PreRated: true,
CostSource: "cost source",
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"},
Cost: 1.01,
}
)
@@ -232,7 +257,7 @@ func TestERSSQLFilters(t *testing.T) {
var dbcfg engine.DBCfg
switch *utils.DBType {
case utils.MetaInternal:
dbcfg = engine.InternalDBCfg
t.SkipNow()
case utils.MetaMySQL:
case utils.MetaMongo:
dbcfg = engine.MongoDBCfg
@@ -305,8 +330,10 @@ func TestERSSQLFilters(t *testing.T) {
tx = tx.Table(utils.CDRsTBL)
cdrSql := cdr1.AsCDRsql()
cdrSql2 := cdr2.AsCDRsql()
cdrsql3 := cdr3.AsCDRsql()
cdrSql.CreatedAt = time.Now()
cdrSql2.CreatedAt = time.Now()
cdrsql3.CreatedAt = time.Now()
saved := tx.Save(cdrSql)
if saved.Error != nil {
tx.Rollback()
@@ -317,12 +344,17 @@ func TestERSSQLFilters(t *testing.T) {
tx.Rollback()
t.Fatal(err)
}
saved = tx.Save(cdrsql3)
if saved.Error != nil {
tx.Rollback()
t.Fatal(err)
}
tx.Commit()
time.Sleep(10 * time.Millisecond)
var result int64
db.Table(utils.CDRsTBL).Count(&result)
if result != 2 {
t.Error("Expected table to have only one result ", result)
if result != 3 {
t.Error("Expected table to have 3 results but got ", result)
}
})
defer t.Run("StopSQL", func(t *testing.T) {
@@ -348,11 +380,12 @@ func TestERSSQLFilters(t *testing.T) {
"apiers": {
"enabled": true
},
"filters": {
"apiers_conns": ["*localhost"]
},
"ers": {
"enabled": true,
"sessions_conns":["*localhost"],
"apiers_conns": ["*localhost"],
"readers": [
{
"id": "mysql",
@@ -361,12 +394,16 @@ func TestERSSQLFilters(t *testing.T) {
"source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306",
"opts": {
"sqlDBName":"cgrates2",
"sqlDeleteIndexedFields": ["id"],
},
"start_delay": "500ms", // wait for db to be populated before starting reader
"processed_path": "",
"tenant": "cgrates.org",
"filters": [
"*gt:~*req.answer_time:NOW() - INTERVAL 7 DAY", // dont process cdrs with answer_time older than 7 days ago (continue if answer_time > now-7days)
"*eq:~*req.cost_details.Charges[0].RatingID:RatingID2",
"FLTR_SQL_RatingID", // "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2",
"*string:~*vars.*readerID:mysql",
"FLTR_VARS", // "*string:~*vars.*readerID:mysql",
],
"flags": ["*dryrun"],
"fields":[
@@ -390,20 +427,29 @@ func TestERSSQLFilters(t *testing.T) {
}`
tpFiles := map[string]string{
utils.FiltersCsv: `#Tenant[0],ID[1],Type[2],Path[3],Values[4],ActivationInterval[5]
cgrates.org,FLTR_SQL_RatingID,*eq,~*req.cost_details.Charges[0].RatingID,RatingID2,
cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`,
}
buf := &bytes.Buffer{}
ng := engine.TestEngine{
ConfigJSON: content,
DBCfg: dbcfg,
TpFiles: tpFiles,
LogBuffer: buf,
}
ng.Run(t)
time.Sleep(1 * time.Second)
t.Run("VerifyProcessedFieldsFromLogs", func(t *testing.T) {
time.Sleep(100 * time.Millisecond) // give enough time to process from sql table
records := 0
scanner := bufio.NewScanner(strings.NewReader(buf.String()))
timeStartFormated := timeStart.Format("2006-01-02T15:04:05-07:00")
expectedLog := fmt.Sprintf("\"Event\":{\"Account\":\"1001\",\"AnswerTime\":\"%s\",\"CGRID\":\"%s\",\"Category\":\"call\",\"CostDetails\":\"{\\\"CGRID\\\":\\\"test1\\\",\\\"RunID\\\":\\\"*default\\\",\\\"StartTime\\\":\\\"2017-01-09T16:18:21Z\\\",\\\"Usage\\\":180000000000,\\\"Cost\\\":2.3,\\\"Charges\\\":[{\\\"RatingID\\\":\\\"RatingID2\\\",\\\"Increments\\\":[{\\\"Usage\\\":120000000000,\\\"Cost\\\":2,\\\"AccountingID\\\":\\\"a012888\\\",\\\"CompressFactor\\\":1},{\\\"Usage\\\":1000000000,\\\"Cost\\\":0.005,\\\"AccountingID\\\":\\\"44d6c02\\\",\\\"CompressFactor\\\":60}],\\\"CompressFactor\\\":1}],\\\"AccountSummary\\\":{\\\"Tenant\\\":\\\"cgrates.org\\\",\\\"ID\\\":\\\"testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceSummaries\\\":[{\\\"UUID\\\":\\\"uuid1\\\",\\\"ID\\\":\\\"\\\",\\\"Type\\\":\\\"*monetary\\\",\\\"Initial\\\":0,\\\"Value\\\":50,\\\"Disabled\\\":false}],\\\"AllowNegative\\\":false,\\\"Disabled\\\":false},\\\"Rating\\\":{\\\"c1a5ab9\\\":{\\\"ConnectFee\\\":0.1,\\\"RoundingMethod\\\":\\\"*up\\\",\\\"RoundingDecimals\\\":5,\\\"MaxCost\\\":0,\\\"MaxCostStrategy\\\":\\\"\\\",\\\"TimingID\\\":\\\"\\\",\\\"RatesID\\\":\\\"ec1a177\\\",\\\"RatingFiltersID\\\":\\\"43e77dc\\\"}},\\\"Accounting\\\":{\\\"44d6c02\\\":{\\\"AccountID\\\":\\\"cgrates.org:testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"},\\\"a012888\\\":{\\\"AccountID\\\":\\\"cgrates.org:testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"}},\\\"RatingFilters\\\":null,\\\"Rates\\\":{\\\"ec1a177\\\":[{\\\"GroupIntervalStart\\\":0,\\\"Value\\\":0.01,\\\"RateIncrement\\\":60000000000,\\\"RateUnit\\\":1000000000}]},\\\"Timings\\\":null}\",\"Destination\":\"1002\",\"OriginID\":\"dsafdsaf\",\"RequestType\":\"*rated\",\"SetupTime\":\"%s\",\"Subject\":\"1001\",\"Tenant\":\"cgrates.org\",\"ToR\":\"*voice\",\"Usage\":\"10000000000\"},\"APIOpts\":{}}>", timeStartFormated, cgrID, timeStartFormated)
timeStartFormated := timeStart.Format("2006-01-02T15:04:05Z07:00")
expectedLog := fmt.Sprintf("\"Event\":{\"Account\":\"1001\",\"AnswerTime\":\"%s\",\"CGRID\":\"%s\",\"Category\":\"call\",\"CostDetails\":\"{\\\"CGRID\\\":\\\"test1\\\",\\\"RunID\\\":\\\"*default\\\",\\\"StartTime\\\":\\\"2017-01-09T16:18:21Z\\\",\\\"Usage\\\":180000000000,\\\"Cost\\\":2.3,\\\"Charges\\\":[{\\\"RatingID\\\":\\\"RatingID2\\\",\\\"Increments\\\":[{\\\"Usage\\\":120000000000,\\\"Cost\\\":2,\\\"AccountingID\\\":\\\"a012888\\\",\\\"CompressFactor\\\":1},{\\\"Usage\\\":1000000000,\\\"Cost\\\":0.005,\\\"AccountingID\\\":\\\"44d6c02\\\",\\\"CompressFactor\\\":60}],\\\"CompressFactor\\\":1}],\\\"AccountSummary\\\":{\\\"Tenant\\\":\\\"cgrates.org\\\",\\\"ID\\\":\\\"testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceSummaries\\\":[{\\\"UUID\\\":\\\"uuid1\\\",\\\"ID\\\":\\\"\\\",\\\"Type\\\":\\\"*monetary\\\",\\\"Initial\\\":0,\\\"Value\\\":50,\\\"Disabled\\\":false}],\\\"AllowNegative\\\":false,\\\"Disabled\\\":false},\\\"Rating\\\":{\\\"c1a5ab9\\\":{\\\"ConnectFee\\\":0.1,\\\"RoundingMethod\\\":\\\"*up\\\",\\\"RoundingDecimals\\\":5,\\\"MaxCost\\\":0,\\\"MaxCostStrategy\\\":\\\"\\\",\\\"TimingID\\\":\\\"\\\",\\\"RatesID\\\":\\\"ec1a177\\\",\\\"RatingFiltersID\\\":\\\"43e77dc\\\"}},\\\"Accounting\\\":{\\\"44d6c02\\\":{\\\"AccountID\\\":\\\"cgrates.org:testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"},\\\"a012888\\\":{\\\"AccountID\\\":\\\"cgrates.org:testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"}},\\\"RatingFilters\\\":null,\\\"Rates\\\":{\\\"ec1a177\\\":[{\\\"GroupIntervalStart\\\":0,\\\"Value\\\":0.01,\\\"RateIncrement\\\":60000000000,\\\"RateUnit\\\":1000000000}]},\\\"Timings\\\":null}\",\"Destination\":\"1002\",\"OriginID\":\"oid2\",\"RequestType\":\"*rated\",\"SetupTime\":\"%s\",\"Subject\":\"1001\",\"Tenant\":\"cgrates.org\",\"ToR\":\"*voice\",\"Usage\":\"10000000000\"},\"APIOpts\":{}}>", timeStartFormated, cgrID, timeStartFormated)
var ersLogsCount int
for scanner.Scan() {
line := scanner.Text()
if !strings.Contains(line, "<ERs> DRYRUN, reader: <mysql>") {
@@ -413,6 +459,9 @@ func TestERSSQLFilters(t *testing.T) {
if !strings.Contains(line, expectedLog) {
t.Errorf("expected \n<%s>, \nreceived\n<%s>", expectedLog, line)
}
if strings.Contains(line, "[INFO] <ERs> DRYRUN") {
ersLogsCount++
}
}
if err := scanner.Err(); err != nil {
t.Errorf("error reading input: %v", err)
@@ -420,5 +469,504 @@ func TestERSSQLFilters(t *testing.T) {
if records != 1 {
t.Errorf("expected ERs to process 1 records, but it processed %d records", records)
}
if ersLogsCount != 1 {
t.Error("Expected only 1 ERS Dryrun log, received: ", ersLogsCount)
}
})
t.Run("VerifyRowsNotDeleted", func(t *testing.T) {
var result int64
db.Table(utils.CDRsTBL).Count(&result)
if result != 2 {
t.Fatal("Expected 2 rows in table ", result)
}
var rslt []map[string]interface{}
if err := db.Raw("SELECT * FROM " + utils.CDRsTBL).Scan(&rslt).Error; err != nil {
t.Fatalf("failed to query table: %v", err)
}
// Print the entire table as a string
for _, row := range rslt {
for col, value := range row {
if strings.Contains(fmt.Sprintln(value), "RatingID2") {
t.Fatalf("Expected CDR with RatingID: \"RatingID2\" to be deleted. Received column <%s>, value <%s>", col, value)
}
}
}
})
}
func TestERSSQLFiltersWithoutDelete(t *testing.T) {
var dbcfg engine.DBCfg
switch *utils.DBType {
case utils.MetaInternal:
t.SkipNow()
case utils.MetaMySQL:
case utils.MetaMongo:
dbcfg = engine.MongoDBCfg
case utils.MetaPostgres:
dbcfg = engine.PostgresDBCfg
default:
t.Fatal("unsupported dbtype value")
}
t.Run("InitSQLDB", func(t *testing.T) {
var err error
var db2 *gorm.DB
if db2, err = gorm.Open(mysql.Open(fmt.Sprintf(dbConnString, "cgrates")),
&gorm.Config{
AllowGlobalUpdate: true,
Logger: logger.Default.LogMode(logger.Silent),
}); err != nil {
t.Fatal(err)
}
if err = db2.Exec(`CREATE DATABASE IF NOT EXISTS cgrates2;`).Error; err != nil {
t.Fatal(err)
}
})
type testModelSql struct {
ID int64
Cgrid string
RunID string
OriginHost string
Source string
OriginID string
ToR string
RequestType string
Tenant string
Category string
Account string
Subject string
Destination string
SetupTime time.Time
AnswerTime time.Time
Usage int64
ExtraFields string
CostSource string
Cost float64
CostDetails string
ExtraInfo string
CreatedAt time.Time
UpdatedAt time.Time
DeletedAt *time.Time
}
t.Run("PutCDRsInDataBase", func(t *testing.T) {
var err error
if db, err = gorm.Open(mysql.Open(fmt.Sprintf(dbConnString, "cgrates2")),
&gorm.Config{
AllowGlobalUpdate: true,
Logger: logger.Default.LogMode(logger.Silent),
}); err != nil {
t.Fatal(err)
}
tx := db.Begin()
if !tx.Migrator().HasTable("cdrs") {
if err = tx.Migrator().CreateTable(new(engine.CDRsql)); err != nil {
tx.Rollback()
t.Fatal(err)
}
}
tx.Commit()
tx = db.Begin()
tx = tx.Table(utils.CDRsTBL)
cdrSql := cdr1.AsCDRsql()
cdrSql2 := cdr2.AsCDRsql()
cdrsql3 := cdr3.AsCDRsql()
cdrSql.CreatedAt = time.Now()
cdrSql2.CreatedAt = time.Now()
cdrsql3.CreatedAt = time.Now()
saved := tx.Save(cdrSql)
if saved.Error != nil {
tx.Rollback()
t.Fatal(err)
}
saved = tx.Save(cdrSql2)
if saved.Error != nil {
tx.Rollback()
t.Fatal(err)
}
saved = tx.Save(cdrsql3)
if saved.Error != nil {
tx.Rollback()
t.Fatal(err)
}
tx.Commit()
time.Sleep(10 * time.Millisecond)
var result int64
db.Table(utils.CDRsTBL).Count(&result)
if result != 3 {
t.Error("Expected table to have 3 results but got ", result)
}
})
defer t.Run("StopSQL", func(t *testing.T) {
if err := db.Migrator().DropTable("cdrs"); err != nil {
t.Fatal(err)
}
if err := db.Exec(`DROP DATABASE cgrates2;`).Error; err != nil {
t.Fatal(err)
}
if db2, err := db.DB(); err != nil {
t.Fatal(err)
} else if err = db2.Close(); err != nil {
t.Fatal(err)
}
})
content := `{
"general": {
"log_level": 7
},
"apiers": {
"enabled": true
},
"filters": {
"apiers_conns": ["*localhost"]
},
"ers": {
"enabled": true,
"sessions_conns":["*localhost"],
"readers": [
{
"id": "mysql",
"type": "*sql",
"run_delay": "1m",
"source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306",
"opts": {
"sqlDBName":"cgrates2",
},
"start_delay": "500ms", // wait for db to be populated before starting reader
"processed_path": "",
"tenant": "cgrates.org",
"filters": [
"*gt:~*req.answer_time:NOW() - INTERVAL 7 DAY", // dont process cdrs with answer_time older than 7 days ago (continue if answer_time > now-7days)
"FLTR_SQL_RatingID", // "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2",
"*string:~*vars.*readerID:mysql",
"FLTR_VARS", // "*string:~*vars.*readerID:mysql",
],
"flags": ["*dryrun"],
"fields":[
{"tag": "CGRID", "path": "*cgreq.CGRID", "type": "*variable", "value": "~*req.cgrid", "mandatory": true},
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.tor", "mandatory": true},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.origin_id", "mandatory": true},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.request_type", "mandatory": true},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.tenant", "mandatory": true},
{"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.category", "mandatory": true},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account", "mandatory": true},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.subject", "mandatory": true},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination", "mandatory": true},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.setup_time", "mandatory": true},
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.answer_time", "mandatory": true},
{"tag": "CostDetails", "path": "*cgreq.CostDetails", "type": "*variable", "value": "~*req.cost_details", "mandatory": true},
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage", "mandatory": true},
],
},
],
},
}`
tpFiles := map[string]string{
utils.FiltersCsv: `#Tenant[0],ID[1],Type[2],Path[3],Values[4],ActivationInterval[5]
cgrates.org,FLTR_SQL_RatingID,*eq,~*req.cost_details.Charges[0].RatingID,RatingID2,
cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`,
}
buf := &bytes.Buffer{}
ng := engine.TestEngine{
ConfigJSON: content,
DBCfg: dbcfg,
TpFiles: tpFiles,
LogBuffer: buf,
}
ng.Run(t)
time.Sleep(1 * time.Second)
t.Run("VerifyProcessedFieldsFromLogs", func(t *testing.T) {
time.Sleep(100 * time.Millisecond) // give enough time to process from sql table
records := 0
scanner := bufio.NewScanner(strings.NewReader(buf.String()))
timeStartFormated := timeStart.Format("2006-01-02T15:04:05Z07:00")
expectedLog := fmt.Sprintf("\"Event\":{\"Account\":\"1001\",\"AnswerTime\":\"%s\",\"CGRID\":\"%s\",\"Category\":\"call\",\"CostDetails\":\"{\\\"CGRID\\\":\\\"test1\\\",\\\"RunID\\\":\\\"*default\\\",\\\"StartTime\\\":\\\"2017-01-09T16:18:21Z\\\",\\\"Usage\\\":180000000000,\\\"Cost\\\":2.3,\\\"Charges\\\":[{\\\"RatingID\\\":\\\"RatingID2\\\",\\\"Increments\\\":[{\\\"Usage\\\":120000000000,\\\"Cost\\\":2,\\\"AccountingID\\\":\\\"a012888\\\",\\\"CompressFactor\\\":1},{\\\"Usage\\\":1000000000,\\\"Cost\\\":0.005,\\\"AccountingID\\\":\\\"44d6c02\\\",\\\"CompressFactor\\\":60}],\\\"CompressFactor\\\":1}],\\\"AccountSummary\\\":{\\\"Tenant\\\":\\\"cgrates.org\\\",\\\"ID\\\":\\\"testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceSummaries\\\":[{\\\"UUID\\\":\\\"uuid1\\\",\\\"ID\\\":\\\"\\\",\\\"Type\\\":\\\"*monetary\\\",\\\"Initial\\\":0,\\\"Value\\\":50,\\\"Disabled\\\":false}],\\\"AllowNegative\\\":false,\\\"Disabled\\\":false},\\\"Rating\\\":{\\\"c1a5ab9\\\":{\\\"ConnectFee\\\":0.1,\\\"RoundingMethod\\\":\\\"*up\\\",\\\"RoundingDecimals\\\":5,\\\"MaxCost\\\":0,\\\"MaxCostStrategy\\\":\\\"\\\",\\\"TimingID\\\":\\\"\\\",\\\"RatesID\\\":\\\"ec1a177\\\",\\\"RatingFiltersID\\\":\\\"43e77dc\\\"}},\\\"Accounting\\\":{\\\"44d6c02\\\":{\\\"AccountID\\\":\\\"cgrates.org:testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"},\\\"a012888\\\":{\\\"AccountID\\\":\\\"cgrates.org:testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"}},\\\"RatingFilters\\\":null,\\\"Rates\\\":{\\\"ec1a177\\\":[{\\\"GroupIntervalStart\\\":0,\\\"Value\\\":0.01,\\\"RateIncrement\\\":60000000000,\\\"RateUnit\\\":1000000000}]},\\\"Timings\\\":null}\",\"Destination\":\"1002\",\"OriginID\":\"oid2\",\"RequestType\":\"*rated\",\"SetupTime\":\"%s\",\"Subject\":\"1001\",\"Tenant\":\"cgrates.org\",\"ToR\":\"*voice\",\"Usage\":\"10000000000\"},\"APIOpts\":{}}>", timeStartFormated, cgrID, timeStartFormated)
var ersLogsCount int
for scanner.Scan() {
line := scanner.Text()
if !strings.Contains(line, "<ERs> DRYRUN, reader: <mysql>") {
continue
}
records++
if !strings.Contains(line, expectedLog) {
t.Errorf("expected \n<%s>, \nreceived\n<%s>", expectedLog, line)
}
if strings.Contains(line, "[INFO] <ERs> DRYRUN") {
ersLogsCount++
}
}
if err := scanner.Err(); err != nil {
t.Errorf("error reading input: %v", err)
}
if records != 1 {
t.Errorf("expected ERs to process 1 records, but it processed %d records", records)
}
if ersLogsCount != 1 {
t.Error("Expected only 1 ERS Dryrun log, received: ", ersLogsCount)
}
})
t.Run("VerifyRowsNotDeleted", func(t *testing.T) {
var result int64
db.Table(utils.CDRsTBL).Count(&result)
if result != 3 {
t.Error("Expected 3 rows in table, got: ", result)
}
var rslt []map[string]interface{}
if err := db.Raw("SELECT * FROM " + utils.CDRsTBL).Scan(&rslt).Error; err != nil {
t.Fatalf("failed to query table: %v", err)
}
})
}
func TestERSSQLFiltersWithMetaDelete(t *testing.T) {
var dbcfg engine.DBCfg
switch *utils.DBType {
case utils.MetaInternal:
t.SkipNow()
case utils.MetaMySQL:
case utils.MetaMongo:
dbcfg = engine.MongoDBCfg
case utils.MetaPostgres:
dbcfg = engine.PostgresDBCfg
default:
t.Fatal("unsupported dbtype value")
}
t.Run("InitSQLDB", func(t *testing.T) {
var err error
var db2 *gorm.DB
if db2, err = gorm.Open(mysql.Open(fmt.Sprintf(dbConnString, "cgrates")),
&gorm.Config{
AllowGlobalUpdate: true,
Logger: logger.Default.LogMode(logger.Silent),
}); err != nil {
t.Fatal(err)
}
if err = db2.Exec(`CREATE DATABASE IF NOT EXISTS cgrates2;`).Error; err != nil {
t.Fatal(err)
}
})
type testModelSql struct {
ID int64
Cgrid string
RunID string
OriginHost string
Source string
OriginID string
ToR string
RequestType string
Tenant string
Category string
Account string
Subject string
Destination string
SetupTime time.Time
AnswerTime time.Time
Usage int64
ExtraFields string
CostSource string
Cost float64
CostDetails string
ExtraInfo string
CreatedAt time.Time
UpdatedAt time.Time
DeletedAt *time.Time
}
t.Run("PutCDRsInDataBase", func(t *testing.T) {
var err error
if db, err = gorm.Open(mysql.Open(fmt.Sprintf(dbConnString, "cgrates2")),
&gorm.Config{
AllowGlobalUpdate: true,
Logger: logger.Default.LogMode(logger.Silent),
}); err != nil {
t.Fatal(err)
}
tx := db.Begin()
if !tx.Migrator().HasTable("cdrs") {
if err = tx.Migrator().CreateTable(new(engine.CDRsql)); err != nil {
tx.Rollback()
t.Fatal(err)
}
}
tx.Commit()
tx = db.Begin()
tx = tx.Table(utils.CDRsTBL)
cdrSql := cdr1.AsCDRsql()
cdrSql2 := cdr2.AsCDRsql()
cdrsql3 := cdr3.AsCDRsql()
cdrSql.CreatedAt = time.Now()
cdrSql2.CreatedAt = time.Now()
cdrsql3.CreatedAt = time.Now()
saved := tx.Save(cdrSql)
if saved.Error != nil {
tx.Rollback()
t.Fatal(err)
}
saved = tx.Save(cdrSql2)
if saved.Error != nil {
tx.Rollback()
t.Fatal(err)
}
saved = tx.Save(cdrsql3)
if saved.Error != nil {
tx.Rollback()
t.Fatal(err)
}
tx.Commit()
time.Sleep(10 * time.Millisecond)
var result int64
db.Table(utils.CDRsTBL).Count(&result)
if result != 3 {
t.Error("Expected table to have 3 results but got ", result)
}
})
defer t.Run("StopSQL", func(t *testing.T) {
if err := db.Migrator().DropTable("cdrs"); err != nil {
t.Fatal(err)
}
if err := db.Exec(`DROP DATABASE cgrates2;`).Error; err != nil {
t.Fatal(err)
}
if db2, err := db.DB(); err != nil {
t.Fatal(err)
} else if err = db2.Close(); err != nil {
t.Fatal(err)
}
})
content := `{
"general": {
"log_level": 7
},
"apiers": {
"enabled": true
},
"filters": {
"apiers_conns": ["*localhost"]
},
"ers": {
"enabled": true,
"sessions_conns":["*localhost"],
"readers": [
{
"id": "mysql",
"type": "*sql",
"run_delay": "1m",
"source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306",
"opts": {
"sqlDBName":"cgrates2",
},
"start_delay": "500ms", // wait for db to be populated before starting reader
"processed_path": "*delete",
"tenant": "cgrates.org",
"filters": [
"*gt:~*req.answer_time:NOW() - INTERVAL 7 DAY", // dont process cdrs with answer_time older than 7 days ago (continue if answer_time > now-7days)
"FLTR_SQL_RatingID", // "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2",
"*string:~*vars.*readerID:mysql",
"FLTR_VARS", // "*string:~*vars.*readerID:mysql",
],
"flags": ["*dryrun"],
"fields":[
{"tag": "CGRID", "path": "*cgreq.CGRID", "type": "*variable", "value": "~*req.cgrid", "mandatory": true},
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.tor", "mandatory": true},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.origin_id", "mandatory": true},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.request_type", "mandatory": true},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.tenant", "mandatory": true},
{"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.category", "mandatory": true},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account", "mandatory": true},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.subject", "mandatory": true},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination", "mandatory": true},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.setup_time", "mandatory": true},
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.answer_time", "mandatory": true},
{"tag": "CostDetails", "path": "*cgreq.CostDetails", "type": "*variable", "value": "~*req.cost_details", "mandatory": true},
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage", "mandatory": true},
],
},
],
},
}`
tpFiles := map[string]string{
utils.FiltersCsv: `#Tenant[0],ID[1],Type[2],Path[3],Values[4],ActivationInterval[5]
cgrates.org,FLTR_SQL_RatingID,*eq,~*req.cost_details.Charges[0].RatingID,RatingID2,
cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`,
}
buf := &bytes.Buffer{}
ng := engine.TestEngine{
ConfigJSON: content,
DBCfg: dbcfg,
TpFiles: tpFiles,
LogBuffer: buf,
}
ng.Run(t)
time.Sleep(1 * time.Second)
t.Run("VerifyProcessedFieldsFromLogs", func(t *testing.T) {
time.Sleep(100 * time.Millisecond) // give enough time to process from sql table
records := 0
scanner := bufio.NewScanner(strings.NewReader(buf.String()))
timeStartFormated := timeStart.Format("2006-01-02T15:04:05Z07:00")
expectedLog := fmt.Sprintf("\"Event\":{\"Account\":\"1001\",\"AnswerTime\":\"%s\",\"CGRID\":\"%s\",\"Category\":\"call\",\"CostDetails\":\"{\\\"CGRID\\\":\\\"test1\\\",\\\"RunID\\\":\\\"*default\\\",\\\"StartTime\\\":\\\"2017-01-09T16:18:21Z\\\",\\\"Usage\\\":180000000000,\\\"Cost\\\":2.3,\\\"Charges\\\":[{\\\"RatingID\\\":\\\"RatingID2\\\",\\\"Increments\\\":[{\\\"Usage\\\":120000000000,\\\"Cost\\\":2,\\\"AccountingID\\\":\\\"a012888\\\",\\\"CompressFactor\\\":1},{\\\"Usage\\\":1000000000,\\\"Cost\\\":0.005,\\\"AccountingID\\\":\\\"44d6c02\\\",\\\"CompressFactor\\\":60}],\\\"CompressFactor\\\":1}],\\\"AccountSummary\\\":{\\\"Tenant\\\":\\\"cgrates.org\\\",\\\"ID\\\":\\\"testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceSummaries\\\":[{\\\"UUID\\\":\\\"uuid1\\\",\\\"ID\\\":\\\"\\\",\\\"Type\\\":\\\"*monetary\\\",\\\"Initial\\\":0,\\\"Value\\\":50,\\\"Disabled\\\":false}],\\\"AllowNegative\\\":false,\\\"Disabled\\\":false},\\\"Rating\\\":{\\\"c1a5ab9\\\":{\\\"ConnectFee\\\":0.1,\\\"RoundingMethod\\\":\\\"*up\\\",\\\"RoundingDecimals\\\":5,\\\"MaxCost\\\":0,\\\"MaxCostStrategy\\\":\\\"\\\",\\\"TimingID\\\":\\\"\\\",\\\"RatesID\\\":\\\"ec1a177\\\",\\\"RatingFiltersID\\\":\\\"43e77dc\\\"}},\\\"Accounting\\\":{\\\"44d6c02\\\":{\\\"AccountID\\\":\\\"cgrates.org:testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"},\\\"a012888\\\":{\\\"AccountID\\\":\\\"cgrates.org:testV1CDRsRefundOutOfSessionCost\\\",\\\"BalanceUUID\\\":\\\"uuid1\\\",\\\"RatingID\\\":\\\"\\\",\\\"Units\\\":120.7,\\\"ExtraChargeID\\\":\\\"\\\"}},\\\"RatingFilters\\\":null,\\\"Rates\\\":{\\\"ec1a177\\\":[{\\\"GroupIntervalStart\\\":0,\\\"Value\\\":0.01,\\\"RateIncrement\\\":60000000000,\\\"RateUnit\\\":1000000000}]},\\\"Timings\\\":null}\",\"Destination\":\"1002\",\"OriginID\":\"oid2\",\"RequestType\":\"*rated\",\"SetupTime\":\"%s\",\"Subject\":\"1001\",\"Tenant\":\"cgrates.org\",\"ToR\":\"*voice\",\"Usage\":\"10000000000\"},\"APIOpts\":{}}>", timeStartFormated, cgrID, timeStartFormated)
var ersLogsCount int
for scanner.Scan() {
line := scanner.Text()
if !strings.Contains(line, "<ERs> DRYRUN, reader: <mysql>") {
continue
}
records++
if !strings.Contains(line, expectedLog) {
t.Errorf("expected \n<%s>, \nreceived\n<%s>", expectedLog, line)
}
if strings.Contains(line, "[INFO] <ERs> DRYRUN") {
ersLogsCount++
}
}
if err := scanner.Err(); err != nil {
t.Errorf("error reading input: %v", err)
}
if records != 1 {
t.Errorf("expected ERs to process 1 records, but it processed %d records", records)
}
if ersLogsCount != 1 {
t.Error("Expected only 1 ERS Dryrun log, received: ", ersLogsCount)
}
})
t.Run("VerifyRowsNotDeleted", func(t *testing.T) {
var result int64
db.Table(utils.CDRsTBL).Count(&result)
if result != 2 {
t.Error("Expected 2 rows in table, got: ", result)
}
var rslt []map[string]interface{}
if err := db.Raw("SELECT * FROM " + utils.CDRsTBL).Scan(&rslt).Error; err != nil {
t.Fatalf("failed to query table: %v", err)
}
// Print the entire table as a string
for _, row := range rslt {
for col, value := range row {
if strings.Contains(fmt.Sprintln(value), "RatingID2") {
t.Fatalf("Expected CDR with RatingID: \"RatingID2\" to be deleted. Received column <%s>, value <%s>", col, value)
}
}
}
})
}

View File

@@ -145,7 +145,7 @@ func (db *DataDBService) mandatoryDB() bool {
return db.cfg.RalsCfg().Enabled || db.cfg.SchedulerCfg().Enabled || db.cfg.ChargerSCfg().Enabled ||
db.cfg.AttributeSCfg().Enabled || db.cfg.ResourceSCfg().Enabled || db.cfg.StatSCfg().Enabled ||
db.cfg.ThresholdSCfg().Enabled || db.cfg.RouteSCfg().Enabled || db.cfg.DispatcherSCfg().Enabled ||
db.cfg.LoaderCfg().Enabled() || db.cfg.ApierCfg().Enabled || db.cfg.AnalyzerSCfg().Enabled
db.cfg.LoaderCfg().Enabled() || db.cfg.ApierCfg().Enabled || db.cfg.AnalyzerSCfg().Enabled || db.cfg.ERsCfg().Enabled
}
// GetDM returns the DataManager

View File

@@ -68,7 +68,7 @@ func TestEventReaderSReload(t *testing.T) {
db := NewDataDBService(cfg, nil, false, srvDep)
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, anz, srvDep)
intERsConn := make(chan birpc.ClientConnector, 1)
erS := NewEventReaderService(cfg, nil, filterSChan, shdChan, nil, server, intERsConn, anz, srvDep)
erS := NewEventReaderService(cfg, db, filterSChan, shdChan, nil, server, intERsConn, anz, srvDep)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(erS, sS,
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
@@ -94,7 +94,6 @@ func TestEventReaderSReload(t *testing.T) {
if !erS.IsRunning() {
t.Fatalf("Expected service to be running")
}
runtime.Gosched()
err := erS.Start()
if err == nil || err != utils.ErrServiceAlreadyRunning {
@@ -112,7 +111,6 @@ func TestEventReaderSReload(t *testing.T) {
if erS.IsRunning() {
t.Fatal("Expected service to be down")
}
}
func TestEventReaderSReload2(t *testing.T) {

View File

@@ -2806,8 +2806,9 @@ const (
SQLDefaultDBName = "cgrates"
SQLDefaultSSLMode = "disable"
SQLDBNameOpt = "sqlDBName"
SQLTableNameOpt = "sqlTableName"
SQLDBNameOpt = "sqlDBName"
SQLTableNameOpt = "sqlTableName"
SQLDeleteIndexedFieldsOpt = "sqlDeleteIndexedFields"
SQLMaxOpenConns = "sqlMaxOpenConns"
SQLConnMaxLifetime = "sqlConnMaxLifetime"