From 9783cdbf3ec8f0c4348b994f3fc465d1191fbf6e Mon Sep 17 00:00:00 2001 From: arberkatellari Date: Tue, 2 Dec 2025 18:18:34 +0200 Subject: [PATCH] Make CDRs storable in Redis DB --- cdrs/cdrs_it_test.go | 99 ++++- config/config_defaults.go | 3 - config/configsanity.go | 9 - data/conf/samples/cdrsv2internal/cgrates.json | 2 +- data/conf/samples/cdrsv2redis/cgrates.json | 72 ++++ .../conf/samples/cdrsv2redis_gob/cgrates.json | 67 +++ engine/libtest.go | 14 - engine/storage_it_test.go | 2 +- engine/storage_redis.go | 403 +++++++++++++++++- engine/storage_redis_it_test.go | 2 +- engine/storage_utils.go | 2 +- engine/z_datamanager_it_test.go | 2 +- engine/z_filterindexer_it_test.go | 2 +- engine/z_onstor_it_test.go | 2 +- general_tests/cdrs_it_test.go | 2 +- general_tests/indexes_redis_it_test.go | 2 +- sessions/basics_it_test.go | 102 ++++- utils/consts.go | 3 + 18 files changed, 710 insertions(+), 80 deletions(-) create mode 100644 data/conf/samples/cdrsv2redis/cgrates.json create mode 100644 data/conf/samples/cdrsv2redis_gob/cgrates.json diff --git a/cdrs/cdrs_it_test.go b/cdrs/cdrs_it_test.go index 633f65a2e..7878b9381 100644 --- a/cdrs/cdrs_it_test.go +++ b/cdrs/cdrs_it_test.go @@ -72,12 +72,7 @@ cgrates.org,DEFAULT_RATE,,;0,0,0,*free,RT_ALWAYS,,"* * * * *",;0,false,0s,,0.1,1 "db": { "db_conns": { "*default": { - "db_type": "*internal", - "string_indexed_fields": ["RunID"], - "opts":{ - "internalDBRewriteInterval": "0s", - "internalDBDumpInterval": "0s" - } +%s }, }, }, @@ -119,10 +114,23 @@ cgrates.org,DEFAULT_RATE,,;0,0,0,*free,RT_ALWAYS,,"* * * * *",;0,false,0s,,0.1,1 } ` + var dbcfg string switch *utils.DBType { case utils.MetaInternal: - case utils.MetaMySQL, utils.MetaRedis, utils.MetaMongo, utils.MetaPostgres: + dbcfg = ` "db_type": "*internal", + "string_indexed_fields": ["*opts.*originID"], + "opts":{ + "internalDBRewriteInterval": "0s", + "internalDBDumpInterval": "0s" + }` + case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres: t.SkipNow() + case utils.MetaRedis: + dbcfg = ` "db_type": "*redis", + "db_host": "127.0.0.1", + "db_port": 6379, + "db_name": "10", + "string_indexed_fields": ["*opts.*originID"],` default: t.Fatal("unknown dbtype") } @@ -130,7 +138,7 @@ cgrates.org,DEFAULT_RATE,,;0,0,0,*free,RT_ALWAYS,,"* * * * *",;0,false,0s,,0.1,1 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - cfg, cfgPath, clean, err := initCfg(ctx, fmt.Sprintf(cfgContent, tpPath)) + cfg, cfgPath, clean, err := initCfg(ctx, fmt.Sprintf(cfgContent, dbcfg, tpPath)) if err != nil { t.Fatalf("parsing configuration file failed: %v", err) } @@ -282,20 +290,67 @@ cgrates.org,DEFAULT_RATE,,;0,0,0,*free,RT_ALWAYS,,"* * * * *",;0,false,0s,,0.1,1 &cdrs); err != nil { t.Error(err) } - sort.Slice(cdrs, func(i, j int) bool { - return cdrs[i].Opts[utils.MetaCost].(float64) < cdrs[j].Opts[utils.MetaCost].(float64) - }) - if cdrs[0].Opts[utils.MetaCost] != 2. || - cdrs[0].Opts[utils.MetaOriginID] != "processCDR1" { - t.Errorf("expected first cdr to have originID %s and cost %.2f, received %s and %.1f", - "processCDR1", 2., - cdrs[0].Opts[utils.MetaOriginID], cdrs[0].Opts[utils.MetaCost]) - } - if cdrs[1].Opts[utils.MetaCost] != 7. || - cdrs[1].Opts[utils.MetaOriginID] != "processCDR2" { - t.Errorf("expected first cdr to have originID %s and cost %.2f, received %s and %.1f", - "processCDR1", 7., - cdrs[1].Opts[utils.MetaOriginID], cdrs[1].Opts[utils.MetaCost]) + if *utils.DBType == utils.MetaRedis { + sort.Slice(cdrs, func(i, j int) bool { + return cdrs[i].Opts[utils.MetaCost].(string) < cdrs[j].Opts[utils.MetaCost].(string) + }) + if cdrs[0].Opts[utils.MetaCost] != "2.0" || + cdrs[0].Opts[utils.MetaOriginID] != "processCDR1" { + t.Errorf("expected first cdr to have originID %s and cost 2, received %s and %#v", + "processCDR1", + cdrs[0].Opts[utils.MetaOriginID], cdrs[0].Opts[utils.MetaCost]) + } + if cdrs[1].Opts[utils.MetaCost] != "7.0" || + cdrs[1].Opts[utils.MetaOriginID] != "processCDR2" { + t.Errorf("expected first cdr to have originID %s and cost 7, received %s and %#v", + "processCDR1", + cdrs[1].Opts[utils.MetaOriginID], cdrs[1].Opts[utils.MetaCost]) + } + } else { + sort.Slice(cdrs, func(i, j int) bool { + return cdrs[i].Opts[utils.MetaCost].(float64) < cdrs[j].Opts[utils.MetaCost].(float64) + }) + if cdrs[0].Opts[utils.MetaCost] != 2. || + cdrs[0].Opts[utils.MetaOriginID] != "processCDR1" { + t.Errorf("expected first cdr to have originID %s and cost %.2f, received %s and %.1f", + "processCDR1", 2., + cdrs[0].Opts[utils.MetaOriginID], cdrs[0].Opts[utils.MetaCost]) + } + if cdrs[1].Opts[utils.MetaCost] != 7. || + cdrs[1].Opts[utils.MetaOriginID] != "processCDR2" { + t.Errorf("expected first cdr to have originID %s and cost %.2f, received %s and %.1f", + "processCDR1", 7., + cdrs[1].Opts[utils.MetaOriginID], cdrs[1].Opts[utils.MetaCost]) + } } }) + + t.Run("RemoveCDRs", func(t *testing.T) { + args := &utils.CDRFilters{ + Tenant: "cgrates.org", + ID: "RemoveCDRs1", + } + + var reply string + if err := client.Call(context.Background(), utils.AdminSv1RemoveCDRs, args, + &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("expected reply <%v>, received <%v>", utils.OK, reply) + } + + args = &utils.CDRFilters{ + Tenant: "cgrates.org", + ID: "GetCDRs1", + FilterIDs: []string{"*string:*opts.*originID:processCDR1", "*string:*opts.*originID:processCDR2"}, + } + + experr := "retrieving CDRs failed: NOT_FOUND" + var cdrs []*utils.CDR + if err := client.Call(context.Background(), utils.AdminSv1GetCDRs, args, + &cdrs); err == nil || err.Error() != experr { + t.Errorf("expected err <%v>, received <%v>", experr, err) + } + + }) } diff --git a/config/config_defaults.go b/config/config_defaults.go index 3dd0be0c4..05bcafb45 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -164,7 +164,6 @@ const CGRATES_CFG_JSON = ` }, }, "items":{ - // compatible with all db types "*accounts": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*ip_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*ip_allocations": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, @@ -198,8 +197,6 @@ const CGRATES_CFG_JSON = ` "*action_profile_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"}, "*account_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"}, "*reverse_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"}, - - // compatible db types: <*internal|*mysql|*mongo|*postgres> "*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"} }, }, diff --git a/config/configsanity.go b/config/configsanity.go index 7230d9d64..063353269 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -1086,15 +1086,6 @@ func (cfg *CGRConfig) checkConfigSanity() error { if _, has := cfg.dbCfg.DBConns[val.DBConn]; !has { return fmt.Errorf("item's <%s> dbConn <%v>, does not match any db_conns ID", item, val.DBConn) } - storDBTypes := []string{utils.MetaInternal, utils.MetaMySQL, utils.MetaMongo, - utils.MetaPostgres, utils.Internal, utils.MySQL, utils.Mongo, utils.Postgres} - - if item == utils.MetaCDRs { - if !slices.Contains(storDBTypes, cfg.dbCfg.DBConns[val.DBConn].Type) { - return fmt.Errorf("<%s> db item can only be of types <%v>, got <%s>", item, - storDBTypes[4:], cfg.dbCfg.DBConns[val.DBConn].Type) - } - } found1RmtConns := false found1RplConns := false for _, dbcfg := range cfg.dbCfg.DBConns { diff --git a/data/conf/samples/cdrsv2internal/cgrates.json b/data/conf/samples/cdrsv2internal/cgrates.json index 5306d9399..7e383cf3e 100644 --- a/data/conf/samples/cdrsv2internal/cgrates.json +++ b/data/conf/samples/cdrsv2internal/cgrates.json @@ -8,7 +8,7 @@ "db_conns": { "*default": { "db_type": "*internal", - "string_indexed_fields": ["RunID"], + "string_indexed_fields": ["*opts.*accounts"], "opts":{ "internalDBRewriteInterval": "0s", "internalDBDumpInterval": "0s" diff --git a/data/conf/samples/cdrsv2redis/cgrates.json b/data/conf/samples/cdrsv2redis/cgrates.json new file mode 100644 index 000000000..ee45aef50 --- /dev/null +++ b/data/conf/samples/cdrsv2redis/cgrates.json @@ -0,0 +1,72 @@ +{ + +"logger": { + "level": 7 +}, + +"db": { + "db_conns": { + "*default": { + "db_type": "redis", // db type: + "db_host": "127.0.0.1", + "db_port": 6379, // db port to reach the database + "db_name": "10", // db database name to connect to + "string_indexed_fields": ["*opts.*accounts"] + }, + } +}, + +"rates": { + "enabled": true +}, + +"cdrs": { + "enabled": true, + "attributes_conns":["*internal"], + "chargers_conns":["*localhost"], + "rates_conns": ["*localhost"], + "stats_conns": ["*localhost"], + "thresholds_conns": ["*localhost"], +}, + +"attributes": { + "enabled": true +}, + +"stats": { + "enabled": true, + "store_interval": "-1", + "thresholds_conns": ["*internal"] +}, + +"thresholds": { + "enabled": true, + "actions_conns": ["*localhost"], + "store_interval": "-1" +}, + +"chargers": { + "enabled": true, + "attributes_conns": ["*localhost"] +}, + +"admins": { + "enabled": true, +}, + +"actions": { + "enabled": true +}, + +"loaders": [ + { + "id": "*default", + "enabled": true, + "tenant": "cgrates.org", + "lockfile_path": ".cgr.lck", + "tp_in_dir": "/usr/share/cgrates/tariffplans/testit", + "tp_out_dir": "" + } +] + +} diff --git a/data/conf/samples/cdrsv2redis_gob/cgrates.json b/data/conf/samples/cdrsv2redis_gob/cgrates.json new file mode 100644 index 000000000..ebf0fba32 --- /dev/null +++ b/data/conf/samples/cdrsv2redis_gob/cgrates.json @@ -0,0 +1,67 @@ +{ + "logger": { + "level": 7 + }, + + + "rpc_conns": { + "conn1": { + "strategy": "*first", + "conns": [{"address": "127.0.0.1:2013", "transport":"*gob"}] + } + }, + + "rates": { + "enabled": true + }, + + "cdrs": { + "enabled": true, + "attributes_conns":["*internal"], + "chargers_conns":["conn1"], + "rates_conns": ["conn1"], + "stats_conns": ["conn1"], + "thresholds_conns": ["conn1"], + }, + + "attributes": { + "enabled": true + }, + + "stats": { + "enabled": true, + "store_interval": "1s", + "thresholds_conns": ["*internal"] + }, + + "thresholds": { + "enabled": true, + "actions_conns": ["*localhost"], + "store_interval": "1s" + }, + + "chargers": { + "enabled": true, + "attributes_conns": ["*internal"] + }, + + "admins": { + "enabled": true, + }, + + "actions": { + "enabled": true + }, + + "loaders": [ + { + "id": "*default", + "enabled": true, + "tenant": "cgrates.org", + "lockfile_path": ".cgr.lck", + "tp_in_dir": "/usr/share/cgrates/tariffplans/testit", + "tp_out_dir": "" + } + ] + +} diff --git a/engine/libtest.go b/engine/libtest.go index a36f47931..0654a0faf 100644 --- a/engine/libtest.go +++ b/engine/libtest.go @@ -769,20 +769,6 @@ var ( Name: utils.StringPointer("10"), User: utils.StringPointer(utils.CGRateSLwr), }, - utils.StorDB: { - Type: utils.StringPointer(utils.MetaMySQL), - Host: utils.StringPointer("127.0.0.1"), - Port: utils.IntPointer(3306), - Name: utils.StringPointer(utils.CGRateSLwr), - User: utils.StringPointer(utils.CGRateSLwr), - Password: utils.StringPointer("CGRateS.org"), - }, - }, - Items: map[string]Item{ - utils.MetaCDRs: { - Limit: utils.IntPointer(-1), - DbConn: utils.StringPointer(utils.StorDB), - }, }, }, } diff --git a/engine/storage_it_test.go b/engine/storage_it_test.go index 2bcaa2a7c..bd213c6bf 100644 --- a/engine/storage_it_test.go +++ b/engine/storage_it_test.go @@ -35,7 +35,7 @@ func TestSetGetRemoveConfigSectionsDrvRedis(t *testing.T) { cfg := config.NewDefaultCGRConfig() db, err := NewRedisStorage("127.0.0.1:6379", 10, utils.CGRateSLwr, cfg.DbCfg().DBConns[utils.MetaDefault].Password, cfg.GeneralCfg().DBDataEncoding, cfg.DbCfg().DBConns[utils.MetaDefault].Opts.RedisMaxConns, cfg.DbCfg().DBConns[utils.MetaDefault].Opts.RedisConnectAttempts, - utils.EmptyString, false, 0, 0, 0, 0, 0, 150*time.Microsecond, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString) + utils.EmptyString, false, 0, 0, 0, 0, 0, 150*time.Microsecond, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString, nil, nil) if err != nil { t.Error(err) } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 8c982c40f..e0a56a388 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -23,6 +23,7 @@ import ( "crypto/x509" "errors" "os" + "slices" "strconv" "strings" "time" @@ -35,8 +36,10 @@ import ( ) type RedisStorage struct { - client radix.Client - ms utils.Marshaler + stringIndexedFields []string // used for CDR indexing + prefixIndexedFields []string // used for CDR indexing + client radix.Client + ms utils.Marshaler } // Redis commands @@ -76,7 +79,8 @@ func NewRedisStorage(address string, db int, user, pass, mrshlerStr string, maxConns, attempts int, sentinelName string, isCluster bool, clusterSync, clusterOnDownDelay, connTimeout, readTimeout, writeTimeout time.Duration, pipelineWindow time.Duration, pipelineLimit int, - tlsConn bool, tlsClientCert, tlsClientKey, tlsCACert string) (_ *RedisStorage, err error) { + tlsConn bool, tlsClientCert, tlsClientKey, tlsCACert string, + stringIndexedFields, prefixIndexedFields []string) (_ *RedisStorage, err error) { var ms utils.Marshaler if ms, err = utils.NewMarshaler(mrshlerStr); err != nil { return @@ -134,16 +138,17 @@ func NewRedisStorage(address string, db int, user, pass, mrshlerStr string, return } return &RedisStorage{ - ms: ms, - client: client, + stringIndexedFields: stringIndexedFields, + prefixIndexedFields: prefixIndexedFields, + ms: ms, + client: client, }, nil } func redisDial(network, addr string, attempts int, opts ...radix.DialOpt) (conn radix.Conn, err error) { fib := utils.FibDuration(time.Millisecond, 0) for i := 0; i < attempts; i++ { - if conn, err = radix.Dial(network, addr, opts...); err == nil || - err != nil && !strings.Contains(err.Error(), redisLoadError) { + if conn, err = radix.Dial(network, addr, opts...); err == nil || !strings.Contains(err.Error(), redisLoadError) { break } time.Sleep(fib()) @@ -1113,19 +1118,389 @@ func (rs *RedisStorage) RemoveConfigSectionsDrv(ctx *context.Context, nodeID str return } -// StorDB method not implemented yet func (rs *RedisStorage) SetCDR(_ *context.Context, cdr *utils.CGREvent, allowUpdate bool) error { - return utils.ErrNotImplemented + cdrID := utils.IfaceAsString(cdr.APIOpts[utils.MetaCDRID]) + if !allowUpdate { + // Check if CDR exists + var exists bool + if err := rs.Cmd(&exists, redisEXISTS, utils.CDRsPrefix+cdrID); err != nil { + return err + } + if exists { + return utils.ErrExists + } + } + + // Index the CDR + idx := make(utils.StringSet) + dp := cdr.AsDataProvider() + for _, v := range rs.stringIndexedFields { + val, err := dp.FieldAsString(strings.Split(v, utils.NestingSep)) + if err != nil { + if err == utils.ErrNotFound { + continue + } + return err + } + idx.Add(utils.ConcatenatedKey(v, val)) + } + for _, v := range rs.prefixIndexedFields { + val, err := dp.FieldAsString(strings.Split(v, utils.NestingSep)) + if err != nil { + if err == utils.ErrNotFound { + continue + } + return err + } + idx.Add(utils.ConcatenatedKey(v, val)) + for i := len(val) - 1; i > 0; i-- { + idx.Add(utils.ConcatenatedKey(v, val[:i])) + } + } + + cdrMs, err := rs.ms.Marshal(cdr) + if err != nil { + return err + } + + var lockIDs []string // used to lock used indexes while setting + for key := range idx { + lockIDs = append(lockIDs, utils.CDRsIndexes+utils.ConcatenatedKey(cdr.Tenant, key)) + } + if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) { + // Store the CDR in Redis + if err := rs.Cmd(nil, redisSET, utils.CDRsPrefix+cdrID, string(cdrMs)); err != nil { + return err + } + return + }, 0, lockIDs...); err != nil { + return err + } + + // Store indexes + for key := range idx { + if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) { + rs.Cmd(nil, redisSADD, utils.CDRsIndexes+utils.ConcatenatedKey(cdr.Tenant, + key), utils.CDRsPrefix+cdrID) + return + }, 0, utils.CDRsIndexes+utils.ConcatenatedKey(cdr.Tenant, key)); err != nil { + return err + } + } + + return nil } -// StorDB method not implemented yet -func (rs *RedisStorage) GetCDRs(ctx *context.Context, qryFltr []*Filter, opts map[string]any) ([]*utils.CDR, error) { - return nil, utils.ErrNotImplemented +func (rs *RedisStorage) GetCDRs(ctx *context.Context, qryFltr []*Filter, opts map[string]any) (cdrs []*utils.CDR, err error) { + pairFltrs := make(map[string][]string) + notPairFltrs := make(map[string][]string) + notIndexed := []*FilterRule{} + for _, fltr := range qryFltr { + for _, rule := range fltr.Rules { + var elem string + if !slices.Contains(rs.stringIndexedFields, strings.TrimPrefix(rule.Element, "~")) || + rule.Type != utils.MetaString && rule.Type != utils.MetaNotString { + notIndexed = append(notIndexed, rule) + continue + } + elem = strings.Trim(rule.Element, "~") + switch rule.Type { + case utils.MetaString: + pairFltrs[elem] = rule.Values + case utils.MetaNotString: + notPairFltrs[elem] = rule.Values + } + } + } + + var lockIDs []string // used to lock used indexes while setting + // Find indexed fields + var cdrMpIDs utils.StringSet + for keySlice, fltrSlice := range pairFltrs { + if len(fltrSlice) == 0 { + continue + } + grpMpIDs := make(utils.StringSet) + for _, id := range fltrSlice { + lockIDs = append(lockIDs, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id)) + var ids []string + if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) { + rs.Cmd(&ids, redisSMEMBERS, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id)) + return + }, 0, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id)); err != nil { + return nil, err + } + grpMpIDs.AddSlice(ids) + } + if grpMpIDs.Size() == 0 { + return nil, utils.ErrNotFound + } + if cdrMpIDs == nil { + cdrMpIDs = grpMpIDs + continue + } + cdrMpIDs.Intersect(grpMpIDs) + if cdrMpIDs.Size() == 0 { + return nil, utils.ErrNotFound + } + } + + if cdrMpIDs == nil { + if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) { + // Get all CDR IDs if no filters + var allIDs []string + if err := rs.Cmd(&allIDs, redisKEYS, utils.CDRsPrefix+utils.Meta); err != nil { + return err + } + cdrMpIDs = utils.NewStringSet(allIDs) + return + }, 0, lockIDs...); err != nil { + return nil, err + } + } + + // Check for Not filters + for keySlice, fltrSlice := range notPairFltrs { + if len(fltrSlice) == 0 { + continue + } + for _, id := range fltrSlice { + lockIDs = append(lockIDs, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id)) + var ids []string + if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) { + rs.Cmd(&ids, redisSMEMBERS, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id)) + return + }, 0, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id)); err != nil { + return nil, err + } + for _, cid := range ids { + cdrMpIDs.Remove(cid) + if cdrMpIDs.Size() == 0 { + return nil, utils.ErrNotFound + } + } + } + } + + // Retrieve CDRs + for key := range cdrMpIDs { + var cdrBytes []byte // holds the CDR gotten from redis as []byte + if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) { + if err := rs.Cmd(&cdrBytes, redisGET, key); err != nil { + return err + } + return + }, 0, lockIDs...); err != nil { + return nil, err + } + cgrEv := new(utils.CGREvent) + if err := rs.ms.Unmarshal(cdrBytes, &cgrEv); err != nil { + return nil, err + } + cgrEvDP := cgrEv.AsDataProvider() + // Apply non-indexed filters + var pass bool = true + for _, fltr := range notIndexed { + if pass, err = fltr.Pass(ctx, cgrEvDP); err != nil { + return nil, err + } else if !pass { + break + } + } + if !pass { + continue + } + cdrs = append(cdrs, &utils.CDR{ + Tenant: cgrEv.Tenant, + Opts: cgrEv.APIOpts, + Event: cgrEv.Event, + CreatedAt: time.Now(), + }) + } + + if len(cdrs) == 0 { + return nil, utils.ErrNotFound + } + + // Handle pagination + var limit, offset, maxItems int + if limit, offset, maxItems, err = utils.GetPaginateOpts(opts); err != nil { + return nil, err + } + cdrs, err = utils.Paginate(cdrs, limit, offset, maxItems) + return cdrs, err } -// StorDB method not implemented yet func (rs *RedisStorage) RemoveCDRs(ctx *context.Context, qryFltr []*Filter) (err error) { - return utils.ErrNotImplemented + pairFltrs := make(map[string][]string) + notPairFltrs := make(map[string][]string) + notIndexed := []*FilterRule{} + + for _, fltr := range qryFltr { + for _, rule := range fltr.Rules { + var elem string + if !slices.Contains(rs.stringIndexedFields, strings.TrimPrefix(rule.Element, "~")) || + rule.Type != utils.MetaString && rule.Type != utils.MetaNotString { + notIndexed = append(notIndexed, rule) + continue + } + elem = strings.Trim(rule.Element, "~") + switch rule.Type { + case utils.MetaString: + pairFltrs[elem] = rule.Values + case utils.MetaNotString: + notPairFltrs[elem] = rule.Values + } + } + } + var lockIDs []string // used to lock used indexes while setting + // Find indexed fields + var cdrMpIDs utils.StringSet + for keySlice, fltrSlice := range pairFltrs { + if len(fltrSlice) == 0 { + continue + } + grpMpIDs := make(utils.StringSet) + for _, id := range fltrSlice { + lockIDs = append(lockIDs, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id)) + var ids []string + if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) { + rs.Cmd(&ids, redisSMEMBERS, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id)) + return + }, 0, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id)); err != nil { + return err + } + grpMpIDs.AddSlice(ids) + } + if grpMpIDs.Size() == 0 { + return utils.ErrNotFound + } + if cdrMpIDs == nil { + cdrMpIDs = grpMpIDs + continue + } + cdrMpIDs.Intersect(grpMpIDs) + if cdrMpIDs.Size() == 0 { + return utils.ErrNotFound + } + } + + if cdrMpIDs == nil { + // Get all CDR IDs if no filters + var allIDs []string + if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) { + if err := rs.Cmd(&allIDs, redisKEYS, utils.CDRsPrefix+utils.Meta); err != nil { + return err + } + return + }, 0, lockIDs...); err != nil { + return err + } + cdrMpIDs = utils.NewStringSet(allIDs) + } + + // Check for Not filters + for keySlice, fltrSlice := range notPairFltrs { + if len(fltrSlice) == 0 { + continue + } + for _, id := range fltrSlice { + lockIDs = append(lockIDs, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id)) + var ids []string + if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) { + rs.Cmd(&ids, redisSMEMBERS, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id)) + return + }, 0, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id)); err != nil { + return err + } + for _, cid := range ids { + cdrMpIDs.Remove(cid) + if cdrMpIDs.Size() == 0 { + return utils.ErrNotFound + } + } + } + } + + // Remove CDRs and their indexes + for key := range cdrMpIDs { + var cdrBytes []byte + if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) { + // key includes "cdr_" prefix + if err := rs.Cmd(&cdrBytes, redisGET, key); err != nil { + return err + } + return + }, 0, lockIDs...); err != nil { + return err + } + cgrEv := new(utils.CGREvent) + if err := rs.ms.Unmarshal(cdrBytes, &cgrEv); err != nil { + return err + } + // Apply non-indexed filters + dp := cgrEv.AsDataProvider() + var pass bool = true + for _, fltr := range notIndexed { + if pass, err = fltr.Pass(ctx, dp); err != nil { + return err + } else if !pass { + cdrMpIDs.Remove(key) + break + } + } + if !pass { + continue + } + // Get the CDR to find all indexes + idx := make(utils.StringSet) + for _, v := range rs.stringIndexedFields { + val, err := dp.FieldAsString(strings.Split(v, utils.NestingSep)) + if err != nil { + if err == utils.ErrNotFound { + continue + } + return err + } + idx.Add(utils.ConcatenatedKey(v, val)) + } + for _, v := range rs.prefixIndexedFields { + val, err := dp.FieldAsString(strings.Split(v, utils.NestingSep)) + if err != nil { + if err == utils.ErrNotFound { + continue + } + return err + } + idx.Add(utils.ConcatenatedKey(v, val)) + for i := len(val) - 1; i > 0; i-- { + idx.Add(utils.ConcatenatedKey(v, val[:i])) + } + } + + // Remove CDR from all indexes + for indexKey := range idx { + if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) { + rs.Cmd(nil, redisSREM, utils.CDRsIndexes+ + utils.ConcatenatedKey(cgrEv.Tenant, indexKey), key) + return + }, 0, utils.ConcatenatedKey(cgrEv.Tenant, indexKey)); err != nil { + return err + } + } + + if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) { + // Remove CDR + if err := rs.Cmd(nil, redisDEL, key); err != nil { + return err + } + return + }, 0, lockIDs...); err != nil { + return err + } + } + + return nil } // DumpDataDB will dump all of datadb from memory to a file, only for InternalDB diff --git a/engine/storage_redis_it_test.go b/engine/storage_redis_it_test.go index 4f168febb..578d8cf81 100644 --- a/engine/storage_redis_it_test.go +++ b/engine/storage_redis_it_test.go @@ -33,7 +33,7 @@ import ( // go test -bench RedisGetKeysForPrefix -run=^# -count 3 -benchtime=10s func BenchmarkRedisGetKeysForPrefix(b *testing.B) { rs, _ := NewRedisStorage("127.0.0.1:6379", 10, utils.CGRateSLwr, "", "json", 10, 20, - "", false, 5*time.Second, 0, 0, 0, 0, 150*time.Microsecond, 0, false, "", "", "") + "", false, 5*time.Second, 0, 0, 0, 0, 150*time.Microsecond, 0, false, "", "", "", nil, nil) chargerProfile := &utils.ChargerProfile{ ID: "TestA_CHARGER1", Tenant: "cgrates.org", diff --git a/engine/storage_utils.go b/engine/storage_utils.go index 6e343f6e1..f9b130b2e 100644 --- a/engine/storage_utils.go +++ b/engine/storage_utils.go @@ -50,7 +50,7 @@ func NewDataDBConn(dbType, host, port, name, user, opts.RedisConnectTimeout, opts.RedisReadTimeout, opts.RedisWriteTimeout, opts.RedisPoolPipelineWindow, opts.RedisPoolPipelineLimit, opts.RedisTLS, opts.RedisClientCertificate, opts.RedisClientKey, - opts.RedisCACertificate) + opts.RedisCACertificate, stringIndexedFields, prefixIndexedFields) case utils.MetaMongo: d, err = NewMongoStorage(opts.MongoConnScheme, host, port, name, user, pass, marshaler, stringIndexedFields, opts.MongoQueryTimeout) diff --git a/engine/z_datamanager_it_test.go b/engine/z_datamanager_it_test.go index d98906ff2..f6e420aae 100644 --- a/engine/z_datamanager_it_test.go +++ b/engine/z_datamanager_it_test.go @@ -56,7 +56,7 @@ func TestDMitinitDB(t *testing.T) { cfg.DbCfg().DBConns[utils.MetaDefault].Opts.RedisConnectAttempts, "", false, 0, 0, 0, 0, 0, 150*time.Microsecond, 0, false, utils.EmptyString, utils.EmptyString, - utils.EmptyString) + utils.EmptyString, nil, nil) if err != nil { t.Fatal("Could not connect to Redis", err.Error()) } diff --git a/engine/z_filterindexer_it_test.go b/engine/z_filterindexer_it_test.go index b2d6aea13..89d8e522a 100644 --- a/engine/z_filterindexer_it_test.go +++ b/engine/z_filterindexer_it_test.go @@ -110,7 +110,7 @@ func TestFilterIndexerIT(t *testing.T) { cfg.DbCfg().DBConns[utils.MetaDefault].Opts.RedisConnectAttempts, "", false, 0, 0, 0, 0, 0, 150*time.Microsecond, 0, false, utils.EmptyString, utils.EmptyString, - utils.EmptyString) + utils.EmptyString, nil, nil) if err != nil { t.Fatal("Could not connect to Redis", err.Error()) } diff --git a/engine/z_onstor_it_test.go b/engine/z_onstor_it_test.go index 6895caf5c..00d836442 100644 --- a/engine/z_onstor_it_test.go +++ b/engine/z_onstor_it_test.go @@ -83,7 +83,7 @@ func TestOnStorIT(t *testing.T) { rdsITdb, err = NewRedisStorage("127.0.0.1:6379", 4, utils.CGRateSLwr, cfg.DbCfg().DBConns[utils.MetaDefault].Password, cfg.GeneralCfg().DBDataEncoding, cfg.DbCfg().DBConns[utils.MetaDefault].Opts.RedisMaxConns, cfg.DbCfg().DBConns[utils.MetaDefault].Opts.RedisConnectAttempts, "", false, - 0, 0, 0, 0, 0, 150*time.Microsecond, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString) + 0, 0, 0, 0, 0, 150*time.Microsecond, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString, nil, nil) if err != nil { t.Fatal("Could not connect to Redis", err.Error()) } diff --git a/general_tests/cdrs_it_test.go b/general_tests/cdrs_it_test.go index b141f22c5..9da0a5e09 100644 --- a/general_tests/cdrs_it_test.go +++ b/general_tests/cdrs_it_test.go @@ -75,7 +75,7 @@ func TestCDRsIT(t *testing.T) { case utils.MetaInternal: cdrsConfDIR = "cdrsv2internal" case utils.MetaRedis: - t.SkipNow() + cdrsConfDIR = "cdrsv2redis" case utils.MetaMySQL: cdrsConfDIR = "cdrsv2mysql" case utils.MetaMongo: diff --git a/general_tests/indexes_redis_it_test.go b/general_tests/indexes_redis_it_test.go index 54df6c065..b34d953ae 100644 --- a/general_tests/indexes_redis_it_test.go +++ b/general_tests/indexes_redis_it_test.go @@ -37,7 +37,7 @@ func TestIndexesRedis(t *testing.T) { db, err := engine.NewRedisStorage("127.0.0.1:6379", 10, utils.CGRateSLwr, cfg.DbCfg().DBConns[utils.MetaDefault].Password, cfg.GeneralCfg().DBDataEncoding, cfg.DbCfg().DBConns[utils.MetaDefault].Opts.RedisMaxConns, cfg.DbCfg().DBConns[utils.MetaDefault].Opts.RedisConnectAttempts, utils.EmptyString, false, 0, 0, 0, 0, 0, - 150*time.Microsecond, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString) + 150*time.Microsecond, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString, nil, nil) if err != nil { t.Fatal(err) } diff --git a/sessions/basics_it_test.go b/sessions/basics_it_test.go index 89bb1e420..07621492f 100644 --- a/sessions/basics_it_test.go +++ b/sessions/basics_it_test.go @@ -35,7 +35,19 @@ func TestSessionBasics(t *testing.T) { case utils.MetaInternal: dbcfg = engine.InternalDBCfg case utils.MetaRedis: - dbcfg = engine.RedisDBCfg + dbcfg = engine.DBCfg{ + DB: &engine.DBParams{ + DBConns: map[string]engine.DBConn{ + utils.MetaDefault: { + Type: utils.StringPointer(utils.MetaRedis), + Host: utils.StringPointer("127.0.0.1"), + Port: utils.IntPointer(6379), + Name: utils.StringPointer("10"), + User: utils.StringPointer(utils.CGRateSLwr), + }, + }, + }, + } case utils.MetaMySQL: dbcfg = engine.MySQLDBCfg case utils.MetaMongo: @@ -270,6 +282,46 @@ cgrates.org,RP_FALLBACK,,;0,,,,RT_FALLBACK,*string:~*req.Destination:1002,"* * * } } + checkCDRRedis := func(t *testing.T, cdr *utils.CDR, wantCosts map[string]string) { + t.Helper() + var got string + for costKey, want := range wantCosts { + switch costKey { + case utils.Abstracts, utils.Concretes: + cd := getCostDetails(t, cdr, utils.MetaAccountSCost) + if cd == nil { + t.Fatalf("Nil costDetails") + } + var canCast bool + got, canCast = cd[costKey].(string) + if !canCast { + t.Fatalf("Could not cast cdr.Opts[utils.MetaCost] to string") + } + case utils.Cost: + cd := getCostDetails(t, cdr, utils.MetaRateSCost) + if cd == nil { + t.Fatalf("Nil costDetails") + } + var canCast bool + got, canCast = cd[costKey].(string) + if !canCast { + t.Fatalf("Could not cast cdr.Opts[utils.MetaCost] to string") + } + case utils.MetaCost: + var canCast bool + got, canCast = cdr.Opts[utils.MetaCost].(string) + if !canCast { + t.Fatalf("Could not cast cdr.Opts[utils.MetaCost] to string") + } + default: + t.Fatalf("invalid cdr cost key: %q", costKey) + } + if got != want { + t.Errorf("cdr %s = %v, want %v", costKey, got, want) + } + } + } + // session helpers authEvent := func(t *testing.T, wantUsage, wantErr string) { t.Helper() @@ -375,14 +427,22 @@ cgrates.org,RP_FALLBACK,,;0,,,,RT_FALLBACK,*string:~*req.Destination:1002,"* * * // accounting via CostIncrements cdr := processCDR(t, "1001", "1002", "1m30s", utils.MetaAccounts) - if *utils.DBType == utils.MetaMongo { // field names are lowercase on mongo + switch *utils.DBType { + case utils.MetaMongo: // field names are lowercase on mongo checkCDRMongo(t, cdr, map[string]float64{ "abstracts": 90000000000.0, "concretes": 0.9, utils.MetaCost: 0.9, }) - } else { + case utils.MetaRedis: + checkCDRRedis(t, cdr, + map[string]string{ + utils.Abstracts: "90000000000", + utils.Concretes: "0.9", + utils.MetaCost: "0.9", + }) + default: checkCDR(t, cdr, map[string]float64{ utils.Abstracts: 90000000000.0, @@ -406,14 +466,22 @@ cgrates.org,RP_FALLBACK,,;0,,,,RT_FALLBACK,*string:~*req.Destination:1002,"* * * }, }) cdr := processCDR(t, "1001", "1002", "2m30s", utils.MetaAccounts) - if *utils.DBType == utils.MetaMongo { // field names are lowercase on mongo + switch *utils.DBType { + case utils.MetaMongo: // field names are lowercase on mongo checkCDRMongo(t, cdr, map[string]float64{ "abstracts": float64(150 * time.Second), "concretes": 2.9, utils.MetaCost: 2.9, }) - } else { + case utils.MetaRedis: + checkCDRRedis(t, cdr, + map[string]string{ + utils.Abstracts: "150000000000", + utils.Concretes: "2.90", + utils.MetaCost: "2.90", + }) + default: checkCDR(t, cdr, map[string]float64{ utils.Abstracts: float64(150 * time.Second), @@ -436,13 +504,20 @@ cgrates.org,RP_FALLBACK,,;0,,,,RT_FALLBACK,*string:~*req.Destination:1002,"* * * }, }) cdr := processCDR(t, "1001", "1002", "2m30s", utils.MetaRates) - if *utils.DBType == utils.MetaMongo { // field names are lowercase on mongo + switch *utils.DBType { + case utils.MetaMongo: // field names are lowercase on mongo checkCDRMongo(t, cdr, map[string]float64{ "cost": 2.9, utils.MetaCost: 2.9, }) - } else { + case utils.MetaRedis: + checkCDRRedis(t, cdr, + map[string]string{ + utils.Cost: "2.90", + utils.MetaCost: "2.90", + }) + default: checkCDR(t, cdr, map[string]float64{ utils.Cost: 2.9, @@ -497,7 +572,8 @@ cgrates.org,RP_FALLBACK,,;0,,,,RT_FALLBACK,*string:~*req.Destination:1002,"* * * }, }) cdr := processCDR(t, "1001", "1002", "2m30s", utils.MetaAccounts, utils.MetaRates) - if *utils.DBType == utils.MetaMongo { // field names are lowercase on mongo + switch *utils.DBType { + case utils.MetaMongo: // field names are lowercase on mongo checkCDRMongo(t, cdr, map[string]float64{ "abstracts": float64(150 * time.Second), @@ -505,7 +581,15 @@ cgrates.org,RP_FALLBACK,,;0,,,,RT_FALLBACK,*string:~*req.Destination:1002,"* * * utils.MetaCost: 1.5, "cost": 2.9, }) - } else { + case utils.MetaRedis: + checkCDRRedis(t, cdr, + map[string]string{ + utils.Abstracts: "150000000000", + utils.Concretes: "1.5", + utils.MetaCost: "1.5", + utils.Cost: "2.90", + }) + default: checkCDR(t, cdr, map[string]float64{ utils.Abstracts: float64(150 * time.Second), diff --git a/utils/consts.go b/utils/consts.go index 971c021b0..b74d98510 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -274,6 +274,8 @@ const ( ThresholdPrefix = "thd_" FilterPrefix = "ftr_" CDRsStatsPrefix = "cst_" + CDRsPrefix = "cdr_" + CDRsIndexes = "cdi_" VersionPrefix = "ver_" StatQueueProfilePrefix = "sqp_" RouteProfilePrefix = "rpp_" @@ -714,6 +716,7 @@ const ( MetaERsThresholds = "*ersThresholds" MetaDryRun = "*dryRun" Event = "Event" + APIOpts = "APIOpts" EventLowCase = "event" EmptyString = "" DynamicDataPrefix = "~"