Improve filters validation

This commit is contained in:
arberkatellari
2025-06-30 17:03:22 +02:00
committed by Dan Christian Bogos
parent d7254ff73c
commit 73667f343f
4 changed files with 253 additions and 22 deletions

View File

@@ -823,6 +823,12 @@ func CheckFilter(fltr *Filter) (err error) {
return fmt.Errorf("%s for filter <%v>", err, fltr) //encapsulated error
}
for _, val := range rls.Values {
if rls.Type == utils.MetaEmpty || rls.Type == utils.MetaNotEmpty ||
rls.Type == utils.MetaExists || rls.Type == utils.MetaNotExists &&
val != utils.EmptyString {
return fmt.Errorf("value of filter <%s> is not empty <%s>",
fltr.ID, val)
}
if err = valFunc(val); err != nil {
return fmt.Errorf("%s for filter <%v>", err, fltr) //encapsulated error
}
@@ -881,19 +887,20 @@ func (fltr *FilterRule) FilterToSQLQuery() (conditions []string) {
if len(fltr.Values) == 0 {
switch fltr.Type {
case utils.MetaExists, utils.MetaNotExists:
if not {
if not { // not existing means Column IS NULL
if firstItem == utils.EmptyString {
conditions = append(conditions, fmt.Sprintf("%s IS NOT NULL", restOfItems))
conditions = append(conditions, fmt.Sprintf("%s IS NULL", restOfItems))
return
}
conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') IS NOT NULL", firstItem, restOfItems))
conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') IS NULL", firstItem, restOfItems))
return
}
// existing means Column IS NOT NULL
if firstItem == utils.EmptyString {
conditions = append(conditions, fmt.Sprintf("%s IS NULL", restOfItems))
conditions = append(conditions, fmt.Sprintf("%s IS NOT NULL", restOfItems))
return
}
conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') IS NULL", firstItem, restOfItems))
conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') IS NOT NULL", firstItem, restOfItems))
case utils.MetaEmpty, utils.MetaNotEmpty:
if not {
if firstItem == utils.EmptyString {
@@ -938,25 +945,26 @@ func (fltr *FilterRule) FilterToSQLQuery() (conditions []string) {
}
case utils.MetaLessThan, utils.MetaLessOrEqual, utils.MetaGreaterThan, utils.MetaGreaterOrEqual:
parsedValAny := utils.StringToInterface(value)
if fltr.Type == utils.MetaGreaterOrEqual {
switch fltr.Type {
case utils.MetaGreaterOrEqual:
if firstItem == utils.EmptyString {
singleCond = fmt.Sprintf("%s >= '%v'", restOfItems, parsedValAny)
} else {
singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') >= '%v'", firstItem, restOfItems, parsedValAny)
}
} else if fltr.Type == utils.MetaGreaterThan {
case utils.MetaGreaterThan:
if firstItem == utils.EmptyString {
singleCond = fmt.Sprintf("%s > '%v'", restOfItems, parsedValAny)
} else {
singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') > '%v'", firstItem, restOfItems, parsedValAny)
}
} else if fltr.Type == utils.MetaLessOrEqual {
case utils.MetaLessOrEqual:
if firstItem == utils.EmptyString {
singleCond = fmt.Sprintf("%s <= '%v'", restOfItems, parsedValAny)
} else {
singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') <= '%v'", firstItem, restOfItems, parsedValAny)
}
} else if fltr.Type == utils.MetaLessThan {
case utils.MetaLessThan:
if firstItem == utils.EmptyString {
singleCond = fmt.Sprintf("%s < '%v'", restOfItems, parsedValAny)
} else {

View File

@@ -2446,6 +2446,24 @@ func TestCheckFilterErr(t *testing.T) {
}
}
func TestCheckFilterNotEmptyErr(t *testing.T) {
fltr := &Filter{
Tenant: "cgrates.org",
ID: "TestFilter",
Rules: []*FilterRule{
{
Type: utils.MetaNotEmpty,
Element: "~*req.Account",
Values: []string{"''"},
},
},
}
expErr := `value of filter <TestFilter> is not empty <''>`
if err := CheckFilter(fltr); err == nil || err.Error() != expErr {
t.Error(err)
}
}
func TestFilterPassRegexErr(t *testing.T) {
cd := &CallDescriptor{
Category: "callx",
@@ -3169,13 +3187,13 @@ func TestFilterToSQLQuery(t *testing.T) {
}{
{"MetaEqual with values", FilterRule{Type: utils.MetaEqual, Element: "~*req.cost_details.Charges[0].RatingID", Values: []string{"RatingID2"}}, []string{"JSON_VALUE(cost_details, '$.Charges[0].RatingID') = 'RatingID2'"}},
{"MetaExists with no values", FilterRule{Type: utils.MetaExists, Element: "~*req.answer_time", Values: nil}, []string{"answer_time IS NULL"}},
{"MetaExists with no values", FilterRule{Type: utils.MetaExists, Element: "~*req.answer_time", Values: nil}, []string{"answer_time IS NOT NULL"}},
{"MetaExists with JSON field", FilterRule{Type: utils.MetaExists, Element: "~*req.cost_details.Charges[0].RatingID", Values: nil}, []string{"JSON_VALUE(cost_details, '$.Charges[0].RatingID') IS NULL"}},
{"MetaExists with JSON field", FilterRule{Type: utils.MetaExists, Element: "~*req.cost_details.Charges[0].RatingID", Values: nil}, []string{"JSON_VALUE(cost_details, '$.Charges[0].RatingID') IS NOT NULL"}},
{"MetaNotExists with no values", FilterRule{Type: utils.MetaNotExists, Element: "~*req.answer_time", Values: nil}, []string{"answer_time IS NOT NULL"}},
{"MetaNotExists with no values", FilterRule{Type: utils.MetaNotExists, Element: "~*req.answer_time", Values: nil}, []string{"answer_time IS NULL"}},
{"MetaNotExists with JSON field", FilterRule{Type: utils.MetaNotExists, Element: "~*req.cost_details.Charges[0].RatingID", Values: nil}, []string{"JSON_VALUE(cost_details, '$.Charges[0].RatingID') IS NOT NULL"}},
{"MetaNotExists with JSON field", FilterRule{Type: utils.MetaNotExists, Element: "~*req.cost_details.Charges[0].RatingID", Values: nil}, []string{"JSON_VALUE(cost_details, '$.Charges[0].RatingID') IS NULL"}},
{"MetaString with values", FilterRule{Type: utils.MetaString, Element: "~*req.answer_time", Values: []string{"value1", "value2"}}, []string{"answer_time = 'value1'", "answer_time = 'value2'"}},
@@ -3396,7 +3414,7 @@ func TestFilterToSQLQueryValidations(t *testing.T) {
Element: "~*req.column1",
Values: nil,
},
expected: []string{"column1 IS NULL"},
expected: []string{"column1 IS NOT NULL"},
},
{
name: "MetaNotExists with no values",
@@ -3405,7 +3423,7 @@ func TestFilterToSQLQueryValidations(t *testing.T) {
Element: "~*req.json_field.key",
Values: nil,
},
expected: []string{"JSON_VALUE(json_field, '$.key') IS NOT NULL"},
expected: []string{"JSON_VALUE(json_field, '$.key') IS NULL"},
},
{
name: "MetaString with values",

View File

@@ -146,6 +146,10 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) {
}
}
for _, filterObj := range filtersObjList { // seperate filters used for WHERE clause from other filters, and build query conditions out of them
if err := engine.CheckFilter(filterObj); err != nil {
rdr.rdrErr <- err
return
}
var lazyFltrPopulated bool // Track if a lazyFilter is already populated by the previous filterObj.Rules, so we dont store the same lazy filter more than once
for _, rule := range filterObj.Rules {
if strings.HasPrefix(rule.Element, utils.MetaDynReq+utils.NestingSep) { // convert filter to WHERE condition only on filters with ~*req.

View File

@@ -407,7 +407,7 @@ cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`,
if result != 3 {
t.Error("Expected 3 rows in table, got: ", result)
}
var rslt []map[string]interface{}
var rslt []map[string]any
if err := db.Raw("SELECT * FROM " + utils.CDRsTBL).Scan(&rslt).Error; err != nil {
t.Fatalf("failed to query table: %v", err)
}
@@ -570,7 +570,7 @@ cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`,
if result != 2 {
t.Fatal("Expected 2 rows in table ", result)
}
var rslt []map[string]interface{}
var rslt []map[string]any
if err := db.Raw("SELECT * FROM " + utils.CDRsTBL).Scan(&rslt).Error; err != nil {
t.Fatalf("failed to query table: %v", err)
}
@@ -739,7 +739,7 @@ cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`,
if result != 2 {
t.Error("Expected 2 rows in table, got: ", result)
}
var rslt []map[string]interface{}
var rslt []map[string]any
if err := db.Raw("SELECT * FROM " + utils.CDRsTBL).Scan(&rslt).Error; err != nil {
t.Fatalf("failed to query table: %v", err)
}
@@ -948,7 +948,7 @@ cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`,
if result != 2 {
t.Fatal("Expected 2 rows in table, got: ", result)
}
var rslt []map[string]interface{}
var rslt []map[string]any
if err := db.Raw("SELECT * FROM " + utils.CDRsTBL).Scan(&rslt).Error; err != nil {
t.Errorf("failed to query table: %v", err)
}
@@ -966,7 +966,7 @@ cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`,
if result2 != 1 {
t.Fatal("Expected 1 rows in table, got: ", result2)
}
var rslt2 []map[string]interface{}
var rslt2 []map[string]any
if err := db.Raw("SELECT * FROM " + "cdrsProcessed").Scan(&rslt2).Error; err != nil {
t.Errorf("failed to query table: %v", err)
}
@@ -1137,7 +1137,7 @@ cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`,
if result != 3 {
t.Error("Expected 3 rows in table, got: ", result)
}
var rslt []map[string]interface{}
var rslt []map[string]any
if err := db.Raw("SELECT * FROM " + utils.CDRsTBL).Scan(&rslt).Error; err != nil {
t.Errorf("failed to query table: %v", err)
}
@@ -1318,7 +1318,7 @@ cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`,
if result != 3 {
t.Error("Expected 3 rows in table, got: ", result)
}
var rslt []map[string]interface{}
var rslt []map[string]any
if err := db.Raw("SELECT * FROM " + utils.CDRsTBL).Scan(&rslt).Error; err != nil {
t.Errorf("failed to query table: %v", err)
}
@@ -1341,3 +1341,204 @@ cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`,
}
})
}
func TestERSSQLFiltersErr(t *testing.T) {
var dbcfg engine.DBCfg
switch *utils.DBType {
case utils.MetaInternal:
dbcfg = engine.InternalDBCfg
case utils.MetaMySQL:
case utils.MetaMongo:
dbcfg = engine.MongoDBCfg
case utils.MetaPostgres:
dbcfg = engine.PostgresDBCfg
default:
t.Fatal("unsupported dbtype value")
}
var db, cdb2 *gorm.DB
t.Run("InitSQLDB", func(t *testing.T) {
var err error
if cdb2, err = gorm.Open(mysql.Open(fmt.Sprintf(dbConnString, "cgrates")),
&gorm.Config{
AllowGlobalUpdate: true,
}); err != nil {
t.Fatal(err)
}
if err = cdb2.Exec(`CREATE DATABASE IF NOT EXISTS cgrates2;`).Error; err != nil {
t.Fatal(err)
}
sqlDB, err := cdb2.DB()
if err != nil {
t.Fatal(err)
}
sqlDB.SetConnMaxLifetime(5 * time.Second) // connections will stay idle even if you close the database. Set MaxLifetime to 5 seconds so that we dont get too many connection attempts error when ran with other tests togather
})
t.Run("PutCDRsInDataBase", func(t *testing.T) {
var err error
if db, err = gorm.Open(mysql.Open(fmt.Sprintf(dbConnString, "cgrates2")),
&gorm.Config{
AllowGlobalUpdate: true,
}); err != nil {
t.Fatal(err)
}
tx := db.Begin()
if !tx.Migrator().HasTable("cdrs") {
if err = tx.Migrator().CreateTable(new(engine.CDRsql)); err != nil {
tx.Rollback()
t.Fatal(err)
}
}
tx.Commit()
tx = db.Begin()
tx = tx.Table(utils.CDRsTBL)
cdrSql, err := cdr1.AsCDRsql(&engine.JSONMarshaler{})
cdrSql2, err := cdr2.AsCDRsql(&engine.JSONMarshaler{})
cdrsql3, err := cdr3.AsCDRsql(&engine.JSONMarshaler{})
cdrSql.CreatedAt = time.Now()
cdrSql2.CreatedAt = time.Now()
cdrsql3.CreatedAt = time.Now()
saved := tx.Save(cdrSql)
if saved.Error != nil {
tx.Rollback()
t.Fatal(err)
}
saved = tx.Save(cdrSql2)
if saved.Error != nil {
tx.Rollback()
t.Fatal(err)
}
saved = tx.Save(cdrsql3)
if saved.Error != nil {
tx.Rollback()
t.Fatal(err)
}
tx.Commit()
time.Sleep(10 * time.Millisecond)
var result int64
db.Table(utils.CDRsTBL).Count(&result)
if result != 3 {
t.Error("Expected table to have 3 results but got ", result)
}
})
defer t.Run("StopSQL", func(t *testing.T) {
if err := db.Migrator().DropTable("cdrs"); err != nil {
t.Fatal(err)
}
if err := db.Exec(`DROP DATABASE cgrates2;`).Error; err != nil {
t.Fatal(err)
}
if db2, err := db.DB(); err != nil {
t.Fatal(err)
} else if err = db2.Close(); err != nil {
t.Fatal(err)
}
if cdb2, err := cdb2.DB(); err != nil {
t.Fatal(err)
} else if err = cdb2.Close(); err != nil {
t.Fatal(err)
}
})
tpFiles := map[string]string{
utils.FiltersCsv: `#Tenant[0],ID[1],Type[2],Path[3],Values[4],ActivationInterval[5]
cgrates.org,FLTR_SQL_RatingID,*eq,~*req.cost_details.Charges[0].RatingID,RatingID2,
cgrates.org,FLTR_VARS,*string,~*vars.*readerID,mysql,`,
}
jsonCfg := `{
"general": {
"log_level": 7
},
"apiers": {
"enabled": true
},
"filters": {
"apiers_conns": ["*localhost"]
},
"stor_db": {
"opts": {
"sqlConnMaxLifetime": "5s", // needed while running all integration tests
},
},
"ers": {
"enabled": true,
"sessions_conns":["*localhost"],
"readers": [
{
"id": "mysql",
"type": "*sql",
"run_delay": "1m",
"source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306",
"opts": {
"sqlDBName":"cgrates2",
"sqlTableName":"cdrs",
"sqlBatchSize": 2,
"sqlDeleteIndexedFields": ["id"],
},
"start_delay": "500ms", // wait for db to be populated before starting reader
"processed_path": "*delete",
"tenant": "cgrates.org",
"filters": [
"*gt:~*req.answer_time:-168h", // dont process cdrs with answer_time older than 7 days ago
"FLTR_SQL_RatingID", // "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2",
"*string:~*vars.*readerID:mysql",
"FLTR_VARS", // "*string:~*vars.*readerID:mysql",
"*notempty:~*vars.*readerID:''", // invalid filter
],
"flags": ["*dryrun"],
"fields":[
{"tag": "CGRID", "path": "*cgreq.CGRID", "type": "*variable", "value": "~*req.cgrid", "mandatory": true},
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.tor", "mandatory": true},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.origin_id", "mandatory": true},
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.request_type", "mandatory": true},
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.tenant", "mandatory": true},
{"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.category", "mandatory": true},
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.account", "mandatory": true},
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.subject", "mandatory": true},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.destination", "mandatory": true},
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.setup_time", "mandatory": true},
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.answer_time", "mandatory": true},
{"tag": "CostDetails", "path": "*cgreq.CostDetails", "type": "*variable", "value": "~*req.cost_details", "mandatory": true},
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.usage", "mandatory": true},
],
},
],
},
}`
buf := &bytes.Buffer{}
ng := engine.TestEngine{
ConfigJSON: jsonCfg,
DBCfg: dbcfg,
TpFiles: tpFiles,
LogBuffer: buf,
GracefulShutdown: true,
}
ng.Run(t)
time.Sleep(1 * time.Second)
t.Run("VerifyProcessedFieldsFromLogs", func(t *testing.T) {
time.Sleep(100 * time.Millisecond) // give enough time to process from sql table
foundErr := false
scanner := bufio.NewScanner(strings.NewReader(buf.String()))
expectedLog := `[ERROR] <ERs> error: <value of filter <*notempty:~*vars.*readerID:''> is not empty <''>>`
for scanner.Scan() {
line := scanner.Text()
if strings.Contains(line, "[ERROR] <ERs> error: <value of filter <*notempty:~*vars.*readerID:''> is not empty <''>>") {
foundErr = true
}
}
if err := scanner.Err(); err != nil {
t.Errorf("error reading input: %v", err)
}
if !foundErr {
t.Errorf("expected error log <%s> \nreceived <%s>", expectedLog, buf)
}
})
}