From 73667f343fb3fbd77f6543258f001a2f319034b3 Mon Sep 17 00:00:00 2001 From: arberkatellari Date: Mon, 30 Jun 2025 17:03:22 +0200 Subject: [PATCH] Improve filters validation --- engine/filters.go | 26 ++- engine/filters_test.go | 30 +++- ers/sql.go | 4 + general_tests/ers_sql_filters_it_test.go | 215 ++++++++++++++++++++++- 4 files changed, 253 insertions(+), 22 deletions(-) diff --git a/engine/filters.go b/engine/filters.go index 624911ca0..0bd7b545c 100644 --- a/engine/filters.go +++ b/engine/filters.go @@ -823,6 +823,12 @@ func CheckFilter(fltr *Filter) (err error) { return fmt.Errorf("%s for filter <%v>", err, fltr) //encapsulated error } for _, val := range rls.Values { + if rls.Type == utils.MetaEmpty || rls.Type == utils.MetaNotEmpty || + rls.Type == utils.MetaExists || rls.Type == utils.MetaNotExists && + val != utils.EmptyString { + return fmt.Errorf("value of filter <%s> is not empty <%s>", + fltr.ID, val) + } if err = valFunc(val); err != nil { return fmt.Errorf("%s for filter <%v>", err, fltr) //encapsulated error } @@ -881,19 +887,20 @@ func (fltr *FilterRule) FilterToSQLQuery() (conditions []string) { if len(fltr.Values) == 0 { switch fltr.Type { case utils.MetaExists, utils.MetaNotExists: - if not { + if not { // not existing means Column IS NULL if firstItem == utils.EmptyString { - conditions = append(conditions, fmt.Sprintf("%s IS NOT NULL", restOfItems)) + conditions = append(conditions, fmt.Sprintf("%s IS NULL", restOfItems)) return } - conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') IS NOT NULL", firstItem, restOfItems)) + conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') IS NULL", firstItem, restOfItems)) return } + // existing means Column IS NOT NULL if firstItem == utils.EmptyString { - conditions = append(conditions, fmt.Sprintf("%s IS NULL", restOfItems)) + conditions = append(conditions, fmt.Sprintf("%s IS NOT NULL", restOfItems)) return } - conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') IS NULL", firstItem, restOfItems)) + conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') IS NOT NULL", firstItem, restOfItems)) case utils.MetaEmpty, utils.MetaNotEmpty: if not { if firstItem == utils.EmptyString { @@ -938,25 +945,26 @@ func (fltr *FilterRule) FilterToSQLQuery() (conditions []string) { } case utils.MetaLessThan, utils.MetaLessOrEqual, utils.MetaGreaterThan, utils.MetaGreaterOrEqual: parsedValAny := utils.StringToInterface(value) - if fltr.Type == utils.MetaGreaterOrEqual { + switch fltr.Type { + case utils.MetaGreaterOrEqual: if firstItem == utils.EmptyString { singleCond = fmt.Sprintf("%s >= '%v'", restOfItems, parsedValAny) } else { singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') >= '%v'", firstItem, restOfItems, parsedValAny) } - } else if fltr.Type == utils.MetaGreaterThan { + case utils.MetaGreaterThan: if firstItem == utils.EmptyString { singleCond = fmt.Sprintf("%s > '%v'", restOfItems, parsedValAny) } else { singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') > '%v'", firstItem, restOfItems, parsedValAny) } - } else if fltr.Type == utils.MetaLessOrEqual { + case utils.MetaLessOrEqual: if firstItem == utils.EmptyString { singleCond = fmt.Sprintf("%s <= '%v'", restOfItems, parsedValAny) } else { singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') <= '%v'", firstItem, restOfItems, parsedValAny) } - } else if fltr.Type == utils.MetaLessThan { + case utils.MetaLessThan: if firstItem == utils.EmptyString { singleCond = fmt.Sprintf("%s < '%v'", restOfItems, parsedValAny) } else { diff --git a/engine/filters_test.go b/engine/filters_test.go index e8adb3484..2bf410b87 100644 --- a/engine/filters_test.go +++ b/engine/filters_test.go @@ -2446,6 +2446,24 @@ func TestCheckFilterErr(t *testing.T) { } } +func TestCheckFilterNotEmptyErr(t *testing.T) { + fltr := &Filter{ + Tenant: "cgrates.org", + ID: "TestFilter", + Rules: []*FilterRule{ + { + Type: utils.MetaNotEmpty, + Element: "~*req.Account", + Values: []string{"''"}, + }, + }, + } + expErr := `value of filter is not empty <''>` + if err := CheckFilter(fltr); err == nil || err.Error() != expErr { + t.Error(err) + } +} + func TestFilterPassRegexErr(t *testing.T) { cd := &CallDescriptor{ Category: "callx", @@ -3169,13 +3187,13 @@ func TestFilterToSQLQuery(t *testing.T) { }{ {"MetaEqual with values", FilterRule{Type: utils.MetaEqual, Element: "~*req.cost_details.Charges[0].RatingID", Values: []string{"RatingID2"}}, []string{"JSON_VALUE(cost_details, '$.Charges[0].RatingID') = 'RatingID2'"}}, - {"MetaExists with no values", FilterRule{Type: utils.MetaExists, Element: "~*req.answer_time", Values: nil}, []string{"answer_time IS NULL"}}, + {"MetaExists with no values", FilterRule{Type: utils.MetaExists, Element: "~*req.answer_time", Values: nil}, []string{"answer_time IS NOT NULL"}}, - {"MetaExists with JSON field", FilterRule{Type: utils.MetaExists, Element: "~*req.cost_details.Charges[0].RatingID", Values: nil}, []string{"JSON_VALUE(cost_details, '$.Charges[0].RatingID') IS NULL"}}, + {"MetaExists with JSON field", FilterRule{Type: utils.MetaExists, Element: "~*req.cost_details.Charges[0].RatingID", Values: nil}, []string{"JSON_VALUE(cost_details, '$.Charges[0].RatingID') IS NOT NULL"}}, - {"MetaNotExists with no values", FilterRule{Type: utils.MetaNotExists, Element: "~*req.answer_time", Values: nil}, []string{"answer_time IS NOT NULL"}}, + {"MetaNotExists with no values", FilterRule{Type: utils.MetaNotExists, Element: "~*req.answer_time", Values: nil}, []string{"answer_time IS NULL"}}, - {"MetaNotExists with JSON field", FilterRule{Type: utils.MetaNotExists, Element: "~*req.cost_details.Charges[0].RatingID", Values: nil}, []string{"JSON_VALUE(cost_details, '$.Charges[0].RatingID') IS NOT NULL"}}, + {"MetaNotExists with JSON field", FilterRule{Type: utils.MetaNotExists, Element: "~*req.cost_details.Charges[0].RatingID", Values: nil}, []string{"JSON_VALUE(cost_details, '$.Charges[0].RatingID') IS NULL"}}, {"MetaString with values", FilterRule{Type: utils.MetaString, Element: "~*req.answer_time", Values: []string{"value1", "value2"}}, []string{"answer_time = 'value1'", "answer_time = 'value2'"}}, @@ -3396,7 +3414,7 @@ func TestFilterToSQLQueryValidations(t *testing.T) { Element: "~*req.column1", Values: nil, }, - expected: []string{"column1 IS NULL"}, + expected: []string{"column1 IS NOT NULL"}, }, { name: "MetaNotExists with no values", @@ -3405,7 +3423,7 @@ func TestFilterToSQLQueryValidations(t *testing.T) { Element: "~*req.json_field.key", Values: nil, }, - expected: []string{"JSON_VALUE(json_field, '$.key') IS NOT NULL"}, + expected: []string{"JSON_VALUE(json_field, '$.key') IS NULL"}, }, { name: "MetaString with values", diff --git a/ers/sql.go b/ers/sql.go index b4814154b..1edc79e7b 100644 --- a/ers/sql.go +++ b/ers/sql.go @@ -146,6 +146,10 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) { } } for _, filterObj := range filtersObjList { // seperate filters used for WHERE clause from other filters, and build query conditions out of them + if err := engine.CheckFilter(filterObj); err != nil { + rdr.rdrErr <- err + return + } var lazyFltrPopulated bool // Track if a lazyFilter is already populated by the previous filterObj.Rules, so we dont store the same lazy filter more than once for _, rule := range filterObj.Rules { if strings.HasPrefix(rule.Element, utils.MetaDynReq+utils.NestingSep) { // convert filter to WHERE condition only on filters with ~*req. diff --git a/general_tests/ers_sql_filters_it_test.go b/general_tests/ers_sql_filters_it_test.go index 0b32dacd3..155f286a6 100644 --- a/general_tests/ers_sql_filters_it_test.go +++ b/general_tests/ers_sql_filters_it_test.go @@ -407,7 +407,7 @@ cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`, if result != 3 { t.Error("Expected 3 rows in table, got: ", result) } - var rslt []map[string]interface{} + var rslt []map[string]any if err := db.Raw("SELECT * FROM " + utils.CDRsTBL).Scan(&rslt).Error; err != nil { t.Fatalf("failed to query table: %v", err) } @@ -570,7 +570,7 @@ cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`, if result != 2 { t.Fatal("Expected 2 rows in table ", result) } - var rslt []map[string]interface{} + var rslt []map[string]any if err := db.Raw("SELECT * FROM " + utils.CDRsTBL).Scan(&rslt).Error; err != nil { t.Fatalf("failed to query table: %v", err) } @@ -739,7 +739,7 @@ cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`, if result != 2 { t.Error("Expected 2 rows in table, got: ", result) } - var rslt []map[string]interface{} + var rslt []map[string]any if err := db.Raw("SELECT * FROM " + utils.CDRsTBL).Scan(&rslt).Error; err != nil { t.Fatalf("failed to query table: %v", err) } @@ -948,7 +948,7 @@ cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`, if result != 2 { t.Fatal("Expected 2 rows in table, got: ", result) } - var rslt []map[string]interface{} + var rslt []map[string]any if err := db.Raw("SELECT * FROM " + utils.CDRsTBL).Scan(&rslt).Error; err != nil { t.Errorf("failed to query table: %v", err) } @@ -966,7 +966,7 @@ cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`, if result2 != 1 { t.Fatal("Expected 1 rows in table, got: ", result2) } - var rslt2 []map[string]interface{} + var rslt2 []map[string]any if err := db.Raw("SELECT * FROM " + "cdrsProcessed").Scan(&rslt2).Error; err != nil { t.Errorf("failed to query table: %v", err) } @@ -1137,7 +1137,7 @@ cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`, if result != 3 { t.Error("Expected 3 rows in table, got: ", result) } - var rslt []map[string]interface{} + var rslt []map[string]any if err := db.Raw("SELECT * FROM " + utils.CDRsTBL).Scan(&rslt).Error; err != nil { t.Errorf("failed to query table: %v", err) } @@ -1318,7 +1318,7 @@ cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`, if result != 3 { t.Error("Expected 3 rows in table, got: ", result) } - var rslt []map[string]interface{} + var rslt []map[string]any if err := db.Raw("SELECT * FROM " + utils.CDRsTBL).Scan(&rslt).Error; err != nil { t.Errorf("failed to query table: %v", err) } @@ -1341,3 +1341,204 @@ cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`, } }) } + +func TestERSSQLFiltersErr(t *testing.T) { + var dbcfg engine.DBCfg + switch *utils.DBType { + case utils.MetaInternal: + dbcfg = engine.InternalDBCfg + case utils.MetaMySQL: + case utils.MetaMongo: + dbcfg = engine.MongoDBCfg + case utils.MetaPostgres: + dbcfg = engine.PostgresDBCfg + default: + t.Fatal("unsupported dbtype value") + } + + var db, cdb2 *gorm.DB + t.Run("InitSQLDB", func(t *testing.T) { + var err error + if cdb2, err = gorm.Open(mysql.Open(fmt.Sprintf(dbConnString, "cgrates")), + &gorm.Config{ + AllowGlobalUpdate: true, + }); err != nil { + t.Fatal(err) + } + + if err = cdb2.Exec(`CREATE DATABASE IF NOT EXISTS cgrates2;`).Error; err != nil { + t.Fatal(err) + } + sqlDB, err := cdb2.DB() + if err != nil { + t.Fatal(err) + } + sqlDB.SetConnMaxLifetime(5 * time.Second) // connections will stay idle even if you close the database. Set MaxLifetime to 5 seconds so that we dont get too many connection attempts error when ran with other tests togather + }) + + t.Run("PutCDRsInDataBase", func(t *testing.T) { + var err error + if db, err = gorm.Open(mysql.Open(fmt.Sprintf(dbConnString, "cgrates2")), + &gorm.Config{ + AllowGlobalUpdate: true, + }); err != nil { + t.Fatal(err) + } + tx := db.Begin() + if !tx.Migrator().HasTable("cdrs") { + if err = tx.Migrator().CreateTable(new(engine.CDRsql)); err != nil { + tx.Rollback() + t.Fatal(err) + } + } + tx.Commit() + tx = db.Begin() + tx = tx.Table(utils.CDRsTBL) + cdrSql, err := cdr1.AsCDRsql(&engine.JSONMarshaler{}) + cdrSql2, err := cdr2.AsCDRsql(&engine.JSONMarshaler{}) + cdrsql3, err := cdr3.AsCDRsql(&engine.JSONMarshaler{}) + cdrSql.CreatedAt = time.Now() + cdrSql2.CreatedAt = time.Now() + cdrsql3.CreatedAt = time.Now() + saved := tx.Save(cdrSql) + if saved.Error != nil { + tx.Rollback() + t.Fatal(err) + } + saved = tx.Save(cdrSql2) + if saved.Error != nil { + tx.Rollback() + t.Fatal(err) + } + saved = tx.Save(cdrsql3) + if saved.Error != nil { + tx.Rollback() + t.Fatal(err) + } + tx.Commit() + time.Sleep(10 * time.Millisecond) + var result int64 + db.Table(utils.CDRsTBL).Count(&result) + if result != 3 { + t.Error("Expected table to have 3 results but got ", result) + } + }) + defer t.Run("StopSQL", func(t *testing.T) { + if err := db.Migrator().DropTable("cdrs"); err != nil { + t.Fatal(err) + } + if err := db.Exec(`DROP DATABASE cgrates2;`).Error; err != nil { + t.Fatal(err) + } + if db2, err := db.DB(); err != nil { + t.Fatal(err) + } else if err = db2.Close(); err != nil { + t.Fatal(err) + } + if cdb2, err := cdb2.DB(); err != nil { + t.Fatal(err) + } else if err = cdb2.Close(); err != nil { + t.Fatal(err) + } + }) + + tpFiles := map[string]string{ + utils.FiltersCsv: `#Tenant[0],ID[1],Type[2],Path[3],Values[4],ActivationInterval[5] +cgrates.org,FLTR_SQL_RatingID,*eq,~*req.cost_details.Charges[0].RatingID,RatingID2, +cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`, + } + + jsonCfg := `{ +"general": { + "log_level": 7 +}, + +"apiers": { + "enabled": true +}, +"filters": { + "apiers_conns": ["*localhost"] +}, +"stor_db": { + "opts": { + "sqlConnMaxLifetime": "5s", // needed while running all integration tests + }, +}, +"ers": { + "enabled": true, + "sessions_conns":["*localhost"], + "readers": [ + { + "id": "mysql", + "type": "*sql", + "run_delay": "1m", + "source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306", + "opts": { + "sqlDBName":"cgrates2", + "sqlTableName":"cdrs", + "sqlBatchSize": 2, + "sqlDeleteIndexedFields": ["id"], + }, + "start_delay": "500ms", // wait for db to be populated before starting reader + "processed_path": "*delete", + "tenant": "cgrates.org", + "filters": [ + "*gt:~*req.answer_time:-168h", // dont process cdrs with answer_time older than 7 days ago + "FLTR_SQL_RatingID", // "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2", + "*string:~*vars.*readerID:mysql", + "FLTR_VARS", // "*string:~*vars.*readerID:mysql", + "*notempty:~*vars.*readerID:''", // invalid filter + ], + "flags": ["*dryrun"], + "fields":[ + {"tag": "CGRID", "path": "*cgreq.CGRID", "type": "*variable", "value": "~*req.cgrid", "mandatory": true}, + {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.tor", "mandatory": true}, + {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.origin_id", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.request_type", "mandatory": true}, + {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.tenant", "mandatory": true}, + {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.category", "mandatory": true}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.subject", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.setup_time", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.answer_time", "mandatory": true}, + {"tag": "CostDetails", "path": "*cgreq.CostDetails", "type": "*variable", "value": "~*req.cost_details", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage", "mandatory": true}, + ], + }, + ], +}, + +}` + + buf := &bytes.Buffer{} + ng := engine.TestEngine{ + ConfigJSON: jsonCfg, + DBCfg: dbcfg, + TpFiles: tpFiles, + LogBuffer: buf, + GracefulShutdown: true, + } + ng.Run(t) + time.Sleep(1 * time.Second) + + t.Run("VerifyProcessedFieldsFromLogs", func(t *testing.T) { + time.Sleep(100 * time.Millisecond) // give enough time to process from sql table + foundErr := false + scanner := bufio.NewScanner(strings.NewReader(buf.String())) + expectedLog := `[ERROR] error: is not empty <''>>` + for scanner.Scan() { + line := scanner.Text() + if strings.Contains(line, "[ERROR] error: is not empty <''>>") { + foundErr = true + } + } + if err := scanner.Err(); err != nil { + t.Errorf("error reading input: %v", err) + } + if !foundErr { + t.Errorf("expected error log <%s> \nreceived <%s>", expectedLog, buf) + } + }) + +}