diff --git a/engine/filters.go b/engine/filters.go index 2c09da0fb..2c17d1386 100644 --- a/engine/filters.go +++ b/engine/filters.go @@ -790,3 +790,8 @@ func (fltr *FilterRule) passHttp(dDP utils.DataProvider) (bool, error) { return filterHTTP(fltr.Type, dDP, fltr.Element, strVal) } + +// Will return all items the element, e.g. "~*req", "cost_details", "Charges[0]". "RatingID" +func (fltr *FilterRule) ElementItems() []string { + return strings.Split(fltr.Element, utils.NestingSep) +} diff --git a/engine/filters_test.go b/engine/filters_test.go index 9a52c9199..76e64fcc9 100644 --- a/engine/filters_test.go +++ b/engine/filters_test.go @@ -2993,3 +2993,15 @@ func TestFilterRanking(t *testing.T) { }) } } + +func TestFilterRuleElementItems(t *testing.T) { + rf, err := NewFilterRule(utils.MetaEqual, "~*req.cost_details.Charges[0].RatingID", []string{"RatingID2"}) + if err != nil { + t.Errorf("Error: %+v", err) + } + exp := []string{"~*req", "cost_details", "Charges[0]", "RatingID"} + rcv := rf.ElementItems() + if !reflect.DeepEqual(exp, rcv) { + t.Errorf("Expected: %v , received: %v", exp, rcv) + } +} diff --git a/ers/sql.go b/ers/sql.go index f923e8075..215632cca 100644 --- a/ers/sql.go +++ b/ers/sql.go @@ -77,8 +77,8 @@ type SQLEventReader struct { connString string connType string tableName string - dbFilters []string - lazyFilters []string + dbFilters []string // filters converted to SQL WHERE conditions from reader config filters + lazyFilters []string // filters used when processing reader events rdrEvents chan *erEvent // channel to dispatch the events created to partialEvents chan *erEvent // channel to dispatch the partial events created to @@ -124,141 +124,6 @@ func (rdr *SQLEventReader) Serve() (err error) { return } -// Creates mysql conditions used in WHERE statement out of filters -func valueQry(ruleType, elem, field string, values []string, not bool) (conditions []string) { - // here are for the filters that their values are empty: *exists, *notexists, *empty, *notempty.. - if len(values) == 0 { - switch ruleType { - case utils.MetaExists, utils.MetaNotExists: - if not { - if elem == utils.EmptyString { - conditions = append(conditions, fmt.Sprintf("%s IS NOT NULL", field)) - return - } - conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') IS NOT NULL", elem, field)) - return - } - if elem == utils.EmptyString { - conditions = append(conditions, fmt.Sprintf("%s IS NULL", field)) - return - } - conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') IS NULL", elem, field)) - case utils.MetaEmpty, utils.MetaNotEmpty: - if not { - if elem == utils.EmptyString { - conditions = append(conditions, fmt.Sprintf("%s != ''", field)) - return - } - conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') != ''", elem, field)) - return - } - if elem == utils.EmptyString { - conditions = append(conditions, fmt.Sprintf("%s == ''", field)) - return - } - conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') == ''", elem, field)) - } - return - } - // here are for the filters that can have more than one value: *string, *prefix, *suffix .. - for _, value := range values { - switch value { // in case we have boolean values, it should be queried over 1 or 0 - case "true": - value = "1" - case "false": - value = "0" - } - var singleCond string - switch ruleType { - case utils.MetaString, utils.MetaNotString, utils.MetaEqual, utils.MetaNotEqual: - if not { - if elem == utils.EmptyString { - conditions = append(conditions, fmt.Sprintf("%s != '%s'", field, value)) - continue - } - conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') != '%s'", - elem, field, value)) - continue - } - if elem == utils.EmptyString { - singleCond = fmt.Sprintf("%s = '%s'", field, value) - } else { - singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') = '%s'", elem, field, value) - } - case utils.MetaLessThan, utils.MetaLessOrEqual, utils.MetaGreaterThan, utils.MetaGreaterOrEqual: - if ruleType == utils.MetaGreaterOrEqual { - if elem == utils.EmptyString { - singleCond = fmt.Sprintf("%s >= %s", field, value) - } else { - singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') >= %s", elem, field, value) - } - } else if ruleType == utils.MetaGreaterThan { - if elem == utils.EmptyString { - singleCond = fmt.Sprintf("%s > %s", field, value) - } else { - singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') > %s", elem, field, value) - } - } else if ruleType == utils.MetaLessOrEqual { - if elem == utils.EmptyString { - singleCond = fmt.Sprintf("%s <= %s", field, value) - } else { - singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') <= %s", elem, field, value) - } - } else if ruleType == utils.MetaLessThan { - if elem == utils.EmptyString { - singleCond = fmt.Sprintf("%s < %s", field, value) - } else { - singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') < %s", elem, field, value) - } - } - case utils.MetaPrefix, utils.MetaNotPrefix: - if not { - if elem == utils.EmptyString { - conditions = append(conditions, fmt.Sprintf("%s NOT LIKE '%s%%'", field, value)) - continue - } - conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') NOT LIKE '%s%%'", elem, field, value)) - continue - } - if elem == utils.EmptyString { - singleCond = fmt.Sprintf("%s LIKE '%s%%'", field, value) - } else { - singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') LIKE '%s%%'", elem, field, value) - } - case utils.MetaSuffix, utils.MetaNotSuffix: - if not { - if elem == utils.EmptyString { - conditions = append(conditions, fmt.Sprintf("%s NOT LIKE '%%%s'", field, value)) - continue - } - conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') NOT LIKE '%%%s'", elem, field, value)) - continue - } - if elem == utils.EmptyString { - singleCond = fmt.Sprintf("%s LIKE '%%%s'", field, value) - } else { - singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') LIKE '%%%s'", elem, field, value) - } - case utils.MetaRegex, utils.MetaNotRegex: - if not { - if elem == utils.EmptyString { - conditions = append(conditions, fmt.Sprintf("%s NOT REGEXP '%s'", field, value)) - continue - } - conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') NOT REGEXP '%s'", elem, field, value)) - continue - } - if elem == utils.EmptyString { - singleCond = fmt.Sprintf("%s REGEXP '%s'", field, value) - } else { - singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') REGEXP '%s'", elem, field, value) - } - } - conditions = append(conditions, singleCond) - } - return -} - func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) { defer sqlDB.Close() if rdr.Config().StartDelay > 0 { @@ -271,55 +136,58 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) { return } } - tm := time.NewTimer(0) - var filters []*engine.Filter + var filtersObjList []*engine.Filter // List of filter objects from rdr.Config().Filters, received from DB for _, fltr := range rdr.Config().Filters { - if resultFltr, err := rdr.dm.GetFilter(config.CgrConfig().GeneralCfg().DefaultTenant, fltr, true, false, utils.NonTransactional); err != nil { + if resultFilter, err := rdr.dm.GetFilter(config.CgrConfig().GeneralCfg().DefaultTenant, fltr, true, false, utils.NonTransactional); err != nil { rdr.rdrErr <- err return } else { - filters = append(filters, resultFltr) + filtersObjList = append(filtersObjList, resultFilter) } } - for _, filter := range filters { - var addFltrCount int - for _, rule := range filter.Rules { - var elem, field string + for _, filterObj := range filtersObjList { // seperate filters used for WHERE clause from other filters, and build query conditions out of them + var lazyFltrPopulated bool // Track if a lazyFilter is already populated by the previous filterObj.Rules, so we dont store the same lazy filter more than once + for _, rule := range filterObj.Rules { + var firstItem string // Excluding ~*req, hold the first item of an element, left empty if no more than 1 item in element. e.g. "cost_details" out of ~*req.cost_details.Charges[0].RatingID or "" out of ~*req.answer_time + var restOfItems string // Excluding ~*req, hold the rest of the items past the first one. If only 1 item in all element, holds that item. e.g. "Charges[0].RatingID" out of ~*req.cost_details.Charges[0].RatingID or "answer_time" out of ~*req.answer_time switch { - case strings.HasPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep): - field = strings.TrimPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep) - if before, after, hasSep := strings.Cut(field, utils.NestingSep); hasSep { - elem = before - field = after + case strings.HasPrefix(rule.Element, utils.MetaDynReq+utils.NestingSep): // convert filter to WHERE condition only on filters with ~*req. + elementItems := rule.ElementItems()[1:] // exclude first item: ~*req + if len(elementItems) > 1 { + firstItem = elementItems[0] + restOfItems = strings.Join(elementItems[1:], utils.NestingSep) + } else { + restOfItems = elementItems[0] } - default: - addFltrCount++ - if addFltrCount == 1 { - rdr.lazyFilters = append(rdr.lazyFilters, filter.ID) + default: // If not used in the WHERE condition, put the filter in rdr.lazyFilters + if !lazyFltrPopulated { + rdr.lazyFilters = append(rdr.lazyFilters, filterObj.ID) + lazyFltrPopulated = true } continue } - conditions := valueQry(rule.Type, elem, field, rule.Values, strings.HasPrefix(rule.Type, utils.MetaNot)) + conditions := utils.FilterToSQLQuery(rule.Type, firstItem, restOfItems, rule.Values, strings.HasPrefix(rule.Type, utils.MetaNot)) rdr.dbFilters = append(rdr.dbFilters, strings.Join(conditions, " OR ")) } } + tm := time.NewTimer(0) // Timer matching rdr.Config().RunDelay, will delay the for loop until timer expires. It doesnt wait for the loop to finish an iteration to start. for { - tx := db.Table(rdr.tableName).Select(utils.Meta) + tx := db.Table(rdr.tableName).Select(utils.Meta) // Select everything from the table for _, whereQ := range rdr.dbFilters { - tx = tx.Where(whereQ) + tx = tx.Where(whereQ) // apply WHERE conditions to the select if any } - rows, err := tx.Rows() + rows, err := tx.Rows() // get all rows selected if err != nil { rdr.rdrErr <- err return } - colNames, err := rows.Columns() + colNames, err := rows.Columns() // get column names from rows selected if err != nil { rdr.rdrErr <- err rows.Close() return } - for rows.Next() { + for rows.Next() { // iterate on each row select { case <-rdr.rdrExit: utils.Logger.Info( @@ -337,73 +205,57 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) { if rdr.Config().ConcurrentReqs != -1 { rdr.cap <- struct{}{} } - columns := make([]any, len(colNames)) - columnPointers := make([]any, len(colNames)) + columns := make([]any, len(colNames)) // create a list of interfaces correlating to the columns selected + columnPointers := make([]any, len(colNames)) // create a list of interfaces pointing to columns to be gotten from rows.Scan for i := range columns { columnPointers[i] = &columns[i] } - // copy row values to their respective column - if err = rows.Scan(columnPointers...); err != nil { + if err = rows.Scan(columnPointers...); err != nil { // copy row values to their respective column rdr.rdrErr <- err rows.Close() return } - msg := make(map[string]any) - fltr := make(map[string]any) - for i, colName := range colNames { - msg[colName] = columns[i] - if colName != createdAt && colName != updatedAt && colName != deletedAt { // ignore the sql colums for filter only - switch tm := columns[i].(type) { // also ignore the values that are zero for time - case time.Time: - if tm.IsZero() { - continue - } - fltr[colName] = columns[i] - continue - case *time.Time: - if tm == nil || tm.IsZero() { - continue - } - fltr[colName] = columns[i] - continue - case nil: - continue - } - fltr[colName] = utils.IfaceAsString(columns[i]) - } + + ev := make(map[string]any) // event to be processed + for i, colName := range colNames { // populate ev from columns + ev[colName] = columns[i] } - if rdr.Config().Opts.SQL.DeleteIndexedFields != nil { - tmpFltrMap := make(map[string]any) - for _, fieldName := range *rdr.Config().Opts.SQL.DeleteIndexedFields { - if _, has := fltr[fieldName]; has { - tmpFltrMap[fieldName] = fltr[fieldName] + if rdr.Config().ProcessedPath == utils.MetaDelete { + sqlWhereVars := make(map[string]any) // map used for conditioning the DELETE query + if rdr.Config().Opts.SQL.DeleteIndexedFields != nil { + for _, fieldName := range *rdr.Config().Opts.SQL.DeleteIndexedFields { + if _, has := ev[fieldName]; has && fieldName != createdAt && fieldName != updatedAt && fieldName != deletedAt { // ignore the sql colums for filter only + addValidFieldToSQLWHEREVars(sqlWhereVars, fieldName, ev[fieldName]) + } } } - if len(tmpFltrMap) != 0 { - fltr = tmpFltrMap + if len(sqlWhereVars) == 0 { + for i, colName := range colNames { + if colName != createdAt && colName != updatedAt && colName != deletedAt { // ignore the sql colums for filter only + addValidFieldToSQLWHEREVars(sqlWhereVars, colName, columns[i]) + } + } } - } - if rdr.Config().ProcessedPath == utils.MetaDelete || rdr.Config().Opts.SQL.DeleteIndexedFields != nil { - if err = tx.Delete(nil, fltr).Error; err != nil { // to ensure we don't read it again + if err = tx.Delete(nil, sqlWhereVars).Error; err != nil { // to ensure we don't read it again utils.Logger.Warning( fmt.Sprintf("<%s> deleting message %s error: %s", - utils.ERs, utils.ToJSON(msg), err.Error())) + utils.ERs, utils.ToJSON(ev), err.Error())) rdr.rdrErr <- err rows.Close() return } } - go func(msg map[string]any) { - if err := rdr.processMessage(msg); err != nil { + go func(ev map[string]any) { + if err := rdr.processMessage(ev); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> processing message %s error: %s", - utils.ERs, utils.ToJSON(msg), err.Error())) + utils.ERs, utils.ToJSON(ev), err.Error())) } if rdr.Config().ConcurrentReqs != -1 { <-rdr.cap } - }(msg) + }(ev) } rows.Close() tm.Reset(rdr.Config().RunDelay) // reset the timer to RunDelay @@ -419,6 +271,26 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) { } } +// Helper function to add valid time and non-time values to the sqlWhereVars map +func addValidFieldToSQLWHEREVars(sqlWhereVars map[string]any, fieldName string, value any) { + switch dateTimeCol := value.(type) { + case time.Time: + if dateTimeCol.IsZero() { + return + } + sqlWhereVars[fieldName] = value + case *time.Time: + if dateTimeCol == nil || dateTimeCol.IsZero() { + return + } + sqlWhereVars[fieldName] = value + case nil: + return + default: + sqlWhereVars[fieldName] = utils.IfaceAsString(value) + } +} + func (rdr *SQLEventReader) processMessage(msg map[string]any) (err error) { reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}} agReq := agents.NewAgentRequest( diff --git a/general_tests/ers_sql_filters_it_test.go b/general_tests/ers_sql_filters_it_test.go index d7edd79e7..53217da86 100644 --- a/general_tests/ers_sql_filters_it_test.go +++ b/general_tests/ers_sql_filters_it_test.go @@ -397,16 +397,16 @@ func TestERSSQLFilters(t *testing.T) { "sqlDeleteIndexedFields": ["id"], }, "start_delay": "500ms", // wait for db to be populated before starting reader - "processed_path": "", + "processed_path": "*delete", "tenant": "cgrates.org", "filters": [ "*gt:~*req.answer_time:NOW() - INTERVAL 7 DAY", // dont process cdrs with answer_time older than 7 days ago (continue if answer_time > now-7days) "FLTR_SQL_RatingID", // "*eq:~*req.cost_details.Charges[0].RatingID:RatingID2", "*string:~*vars.*readerID:mysql", "FLTR_VARS", // "*string:~*vars.*readerID:mysql", - ], - "flags": ["*dryrun"], - "fields":[ + ], + "flags": ["*dryrun"], + "fields":[ {"tag": "CGRID", "path": "*cgreq.CGRID", "type": "*variable", "value": "~*req.cgrid", "mandatory": true}, {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.tor", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.origin_id", "mandatory": true}, diff --git a/utils/dbutils.go b/utils/dbutils.go new file mode 100644 index 000000000..3ae2cdd6e --- /dev/null +++ b/utils/dbutils.go @@ -0,0 +1,156 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package utils + +import "fmt" + +// Creates mysql conditions used in WHERE statement out of filters +func FilterToSQLQuery(ruleType, beforeSep, afterSep string, values []string, not bool) (conditions []string) { + // here are for the filters that their values are empty: *exists, *notexists, *empty, *notempty.. + if len(values) == 0 { + switch ruleType { + case MetaExists, MetaNotExists: + if not { + if beforeSep == EmptyString { + conditions = append(conditions, fmt.Sprintf("%s IS NOT NULL", afterSep)) + return + } + conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') IS NOT NULL", beforeSep, afterSep)) + return + } + if beforeSep == EmptyString { + conditions = append(conditions, fmt.Sprintf("%s IS NULL", afterSep)) + return + } + conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') IS NULL", beforeSep, afterSep)) + case MetaEmpty, MetaNotEmpty: + if not { + if beforeSep == EmptyString { + conditions = append(conditions, fmt.Sprintf("%s != ''", afterSep)) + return + } + conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') != ''", beforeSep, afterSep)) + return + } + if beforeSep == EmptyString { + conditions = append(conditions, fmt.Sprintf("%s == ''", afterSep)) + return + } + conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') == ''", beforeSep, afterSep)) + } + return + } + // here are for the filters that can have more than one value: *string, *prefix, *suffix .. + for _, value := range values { + switch value { // in case we have boolean values, it should be queried over 1 or 0 + case "true": + value = "1" + case "false": + value = "0" + } + var singleCond string + switch ruleType { + case MetaString, MetaNotString, MetaEqual, MetaNotEqual: + if not { + if beforeSep == EmptyString { + conditions = append(conditions, fmt.Sprintf("%s != '%s'", afterSep, value)) + continue + } + conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') != '%s'", + beforeSep, afterSep, value)) + continue + } + if beforeSep == EmptyString { + singleCond = fmt.Sprintf("%s = '%s'", afterSep, value) + } else { + singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') = '%s'", beforeSep, afterSep, value) + } + case MetaLessThan, MetaLessOrEqual, MetaGreaterThan, MetaGreaterOrEqual: + if ruleType == MetaGreaterOrEqual { + if beforeSep == EmptyString { + singleCond = fmt.Sprintf("%s >= %s", afterSep, value) + } else { + singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') >= %s", beforeSep, afterSep, value) + } + } else if ruleType == MetaGreaterThan { + if beforeSep == EmptyString { + singleCond = fmt.Sprintf("%s > %s", afterSep, value) + } else { + singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') > %s", beforeSep, afterSep, value) + } + } else if ruleType == MetaLessOrEqual { + if beforeSep == EmptyString { + singleCond = fmt.Sprintf("%s <= %s", afterSep, value) + } else { + singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') <= %s", beforeSep, afterSep, value) + } + } else if ruleType == MetaLessThan { + if beforeSep == EmptyString { + singleCond = fmt.Sprintf("%s < %s", afterSep, value) + } else { + singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') < %s", beforeSep, afterSep, value) + } + } + case MetaPrefix, MetaNotPrefix: + if not { + if beforeSep == EmptyString { + conditions = append(conditions, fmt.Sprintf("%s NOT LIKE '%s%%'", afterSep, value)) + continue + } + conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') NOT LIKE '%s%%'", beforeSep, afterSep, value)) + continue + } + if beforeSep == EmptyString { + singleCond = fmt.Sprintf("%s LIKE '%s%%'", afterSep, value) + } else { + singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') LIKE '%s%%'", beforeSep, afterSep, value) + } + case MetaSuffix, MetaNotSuffix: + if not { + if beforeSep == EmptyString { + conditions = append(conditions, fmt.Sprintf("%s NOT LIKE '%%%s'", afterSep, value)) + continue + } + conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') NOT LIKE '%%%s'", beforeSep, afterSep, value)) + continue + } + if beforeSep == EmptyString { + singleCond = fmt.Sprintf("%s LIKE '%%%s'", afterSep, value) + } else { + singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') LIKE '%%%s'", beforeSep, afterSep, value) + } + case MetaRegex, MetaNotRegex: + if not { + if beforeSep == EmptyString { + conditions = append(conditions, fmt.Sprintf("%s NOT REGEXP '%s'", afterSep, value)) + continue + } + conditions = append(conditions, fmt.Sprintf("JSON_VALUE(%s, '$.%s') NOT REGEXP '%s'", beforeSep, afterSep, value)) + continue + } + if beforeSep == EmptyString { + singleCond = fmt.Sprintf("%s REGEXP '%s'", afterSep, value) + } else { + singleCond = fmt.Sprintf("JSON_VALUE(%s, '$.%s') REGEXP '%s'", beforeSep, afterSep, value) + } + } + conditions = append(conditions, singleCond) + } + return +}