Make CDRs storable in Redis DB

This commit is contained in:
arberkatellari
2025-12-02 18:18:34 +02:00
committed by Dan Christian Bogos
parent 5f2b6beaa6
commit 9783cdbf3e
18 changed files with 710 additions and 80 deletions

View File

@@ -769,20 +769,6 @@ var (
Name: utils.StringPointer("10"),
User: utils.StringPointer(utils.CGRateSLwr),
},
utils.StorDB: {
Type: utils.StringPointer(utils.MetaMySQL),
Host: utils.StringPointer("127.0.0.1"),
Port: utils.IntPointer(3306),
Name: utils.StringPointer(utils.CGRateSLwr),
User: utils.StringPointer(utils.CGRateSLwr),
Password: utils.StringPointer("CGRateS.org"),
},
},
Items: map[string]Item{
utils.MetaCDRs: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
},
},
}

View File

@@ -35,7 +35,7 @@ func TestSetGetRemoveConfigSectionsDrvRedis(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
db, err := NewRedisStorage("127.0.0.1:6379", 10, utils.CGRateSLwr,
cfg.DbCfg().DBConns[utils.MetaDefault].Password, cfg.GeneralCfg().DBDataEncoding, cfg.DbCfg().DBConns[utils.MetaDefault].Opts.RedisMaxConns, cfg.DbCfg().DBConns[utils.MetaDefault].Opts.RedisConnectAttempts,
utils.EmptyString, false, 0, 0, 0, 0, 0, 150*time.Microsecond, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString)
utils.EmptyString, false, 0, 0, 0, 0, 0, 150*time.Microsecond, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString, nil, nil)
if err != nil {
t.Error(err)
}

View File

@@ -23,6 +23,7 @@ import (
"crypto/x509"
"errors"
"os"
"slices"
"strconv"
"strings"
"time"
@@ -35,8 +36,10 @@ import (
)
type RedisStorage struct {
client radix.Client
ms utils.Marshaler
stringIndexedFields []string // used for CDR indexing
prefixIndexedFields []string // used for CDR indexing
client radix.Client
ms utils.Marshaler
}
// Redis commands
@@ -76,7 +79,8 @@ func NewRedisStorage(address string, db int, user, pass, mrshlerStr string,
maxConns, attempts int, sentinelName string, isCluster bool, clusterSync,
clusterOnDownDelay, connTimeout, readTimeout, writeTimeout time.Duration,
pipelineWindow time.Duration, pipelineLimit int,
tlsConn bool, tlsClientCert, tlsClientKey, tlsCACert string) (_ *RedisStorage, err error) {
tlsConn bool, tlsClientCert, tlsClientKey, tlsCACert string,
stringIndexedFields, prefixIndexedFields []string) (_ *RedisStorage, err error) {
var ms utils.Marshaler
if ms, err = utils.NewMarshaler(mrshlerStr); err != nil {
return
@@ -134,16 +138,17 @@ func NewRedisStorage(address string, db int, user, pass, mrshlerStr string,
return
}
return &RedisStorage{
ms: ms,
client: client,
stringIndexedFields: stringIndexedFields,
prefixIndexedFields: prefixIndexedFields,
ms: ms,
client: client,
}, nil
}
func redisDial(network, addr string, attempts int, opts ...radix.DialOpt) (conn radix.Conn, err error) {
fib := utils.FibDuration(time.Millisecond, 0)
for i := 0; i < attempts; i++ {
if conn, err = radix.Dial(network, addr, opts...); err == nil ||
err != nil && !strings.Contains(err.Error(), redisLoadError) {
if conn, err = radix.Dial(network, addr, opts...); err == nil || !strings.Contains(err.Error(), redisLoadError) {
break
}
time.Sleep(fib())
@@ -1113,19 +1118,389 @@ func (rs *RedisStorage) RemoveConfigSectionsDrv(ctx *context.Context, nodeID str
return
}
// StorDB method not implemented yet
func (rs *RedisStorage) SetCDR(_ *context.Context, cdr *utils.CGREvent, allowUpdate bool) error {
return utils.ErrNotImplemented
cdrID := utils.IfaceAsString(cdr.APIOpts[utils.MetaCDRID])
if !allowUpdate {
// Check if CDR exists
var exists bool
if err := rs.Cmd(&exists, redisEXISTS, utils.CDRsPrefix+cdrID); err != nil {
return err
}
if exists {
return utils.ErrExists
}
}
// Index the CDR
idx := make(utils.StringSet)
dp := cdr.AsDataProvider()
for _, v := range rs.stringIndexedFields {
val, err := dp.FieldAsString(strings.Split(v, utils.NestingSep))
if err != nil {
if err == utils.ErrNotFound {
continue
}
return err
}
idx.Add(utils.ConcatenatedKey(v, val))
}
for _, v := range rs.prefixIndexedFields {
val, err := dp.FieldAsString(strings.Split(v, utils.NestingSep))
if err != nil {
if err == utils.ErrNotFound {
continue
}
return err
}
idx.Add(utils.ConcatenatedKey(v, val))
for i := len(val) - 1; i > 0; i-- {
idx.Add(utils.ConcatenatedKey(v, val[:i]))
}
}
cdrMs, err := rs.ms.Marshal(cdr)
if err != nil {
return err
}
var lockIDs []string // used to lock used indexes while setting
for key := range idx {
lockIDs = append(lockIDs, utils.CDRsIndexes+utils.ConcatenatedKey(cdr.Tenant, key))
}
if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) {
// Store the CDR in Redis
if err := rs.Cmd(nil, redisSET, utils.CDRsPrefix+cdrID, string(cdrMs)); err != nil {
return err
}
return
}, 0, lockIDs...); err != nil {
return err
}
// Store indexes
for key := range idx {
if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) {
rs.Cmd(nil, redisSADD, utils.CDRsIndexes+utils.ConcatenatedKey(cdr.Tenant,
key), utils.CDRsPrefix+cdrID)
return
}, 0, utils.CDRsIndexes+utils.ConcatenatedKey(cdr.Tenant, key)); err != nil {
return err
}
}
return nil
}
// StorDB method not implemented yet
func (rs *RedisStorage) GetCDRs(ctx *context.Context, qryFltr []*Filter, opts map[string]any) ([]*utils.CDR, error) {
return nil, utils.ErrNotImplemented
func (rs *RedisStorage) GetCDRs(ctx *context.Context, qryFltr []*Filter, opts map[string]any) (cdrs []*utils.CDR, err error) {
pairFltrs := make(map[string][]string)
notPairFltrs := make(map[string][]string)
notIndexed := []*FilterRule{}
for _, fltr := range qryFltr {
for _, rule := range fltr.Rules {
var elem string
if !slices.Contains(rs.stringIndexedFields, strings.TrimPrefix(rule.Element, "~")) ||
rule.Type != utils.MetaString && rule.Type != utils.MetaNotString {
notIndexed = append(notIndexed, rule)
continue
}
elem = strings.Trim(rule.Element, "~")
switch rule.Type {
case utils.MetaString:
pairFltrs[elem] = rule.Values
case utils.MetaNotString:
notPairFltrs[elem] = rule.Values
}
}
}
var lockIDs []string // used to lock used indexes while setting
// Find indexed fields
var cdrMpIDs utils.StringSet
for keySlice, fltrSlice := range pairFltrs {
if len(fltrSlice) == 0 {
continue
}
grpMpIDs := make(utils.StringSet)
for _, id := range fltrSlice {
lockIDs = append(lockIDs, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id))
var ids []string
if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) {
rs.Cmd(&ids, redisSMEMBERS, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id))
return
}, 0, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id)); err != nil {
return nil, err
}
grpMpIDs.AddSlice(ids)
}
if grpMpIDs.Size() == 0 {
return nil, utils.ErrNotFound
}
if cdrMpIDs == nil {
cdrMpIDs = grpMpIDs
continue
}
cdrMpIDs.Intersect(grpMpIDs)
if cdrMpIDs.Size() == 0 {
return nil, utils.ErrNotFound
}
}
if cdrMpIDs == nil {
if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) {
// Get all CDR IDs if no filters
var allIDs []string
if err := rs.Cmd(&allIDs, redisKEYS, utils.CDRsPrefix+utils.Meta); err != nil {
return err
}
cdrMpIDs = utils.NewStringSet(allIDs)
return
}, 0, lockIDs...); err != nil {
return nil, err
}
}
// Check for Not filters
for keySlice, fltrSlice := range notPairFltrs {
if len(fltrSlice) == 0 {
continue
}
for _, id := range fltrSlice {
lockIDs = append(lockIDs, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id))
var ids []string
if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) {
rs.Cmd(&ids, redisSMEMBERS, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id))
return
}, 0, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id)); err != nil {
return nil, err
}
for _, cid := range ids {
cdrMpIDs.Remove(cid)
if cdrMpIDs.Size() == 0 {
return nil, utils.ErrNotFound
}
}
}
}
// Retrieve CDRs
for key := range cdrMpIDs {
var cdrBytes []byte // holds the CDR gotten from redis as []byte
if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) {
if err := rs.Cmd(&cdrBytes, redisGET, key); err != nil {
return err
}
return
}, 0, lockIDs...); err != nil {
return nil, err
}
cgrEv := new(utils.CGREvent)
if err := rs.ms.Unmarshal(cdrBytes, &cgrEv); err != nil {
return nil, err
}
cgrEvDP := cgrEv.AsDataProvider()
// Apply non-indexed filters
var pass bool = true
for _, fltr := range notIndexed {
if pass, err = fltr.Pass(ctx, cgrEvDP); err != nil {
return nil, err
} else if !pass {
break
}
}
if !pass {
continue
}
cdrs = append(cdrs, &utils.CDR{
Tenant: cgrEv.Tenant,
Opts: cgrEv.APIOpts,
Event: cgrEv.Event,
CreatedAt: time.Now(),
})
}
if len(cdrs) == 0 {
return nil, utils.ErrNotFound
}
// Handle pagination
var limit, offset, maxItems int
if limit, offset, maxItems, err = utils.GetPaginateOpts(opts); err != nil {
return nil, err
}
cdrs, err = utils.Paginate(cdrs, limit, offset, maxItems)
return cdrs, err
}
// StorDB method not implemented yet
func (rs *RedisStorage) RemoveCDRs(ctx *context.Context, qryFltr []*Filter) (err error) {
return utils.ErrNotImplemented
pairFltrs := make(map[string][]string)
notPairFltrs := make(map[string][]string)
notIndexed := []*FilterRule{}
for _, fltr := range qryFltr {
for _, rule := range fltr.Rules {
var elem string
if !slices.Contains(rs.stringIndexedFields, strings.TrimPrefix(rule.Element, "~")) ||
rule.Type != utils.MetaString && rule.Type != utils.MetaNotString {
notIndexed = append(notIndexed, rule)
continue
}
elem = strings.Trim(rule.Element, "~")
switch rule.Type {
case utils.MetaString:
pairFltrs[elem] = rule.Values
case utils.MetaNotString:
notPairFltrs[elem] = rule.Values
}
}
}
var lockIDs []string // used to lock used indexes while setting
// Find indexed fields
var cdrMpIDs utils.StringSet
for keySlice, fltrSlice := range pairFltrs {
if len(fltrSlice) == 0 {
continue
}
grpMpIDs := make(utils.StringSet)
for _, id := range fltrSlice {
lockIDs = append(lockIDs, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id))
var ids []string
if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) {
rs.Cmd(&ids, redisSMEMBERS, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id))
return
}, 0, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id)); err != nil {
return err
}
grpMpIDs.AddSlice(ids)
}
if grpMpIDs.Size() == 0 {
return utils.ErrNotFound
}
if cdrMpIDs == nil {
cdrMpIDs = grpMpIDs
continue
}
cdrMpIDs.Intersect(grpMpIDs)
if cdrMpIDs.Size() == 0 {
return utils.ErrNotFound
}
}
if cdrMpIDs == nil {
// Get all CDR IDs if no filters
var allIDs []string
if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) {
if err := rs.Cmd(&allIDs, redisKEYS, utils.CDRsPrefix+utils.Meta); err != nil {
return err
}
return
}, 0, lockIDs...); err != nil {
return err
}
cdrMpIDs = utils.NewStringSet(allIDs)
}
// Check for Not filters
for keySlice, fltrSlice := range notPairFltrs {
if len(fltrSlice) == 0 {
continue
}
for _, id := range fltrSlice {
lockIDs = append(lockIDs, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id))
var ids []string
if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) {
rs.Cmd(&ids, redisSMEMBERS, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id))
return
}, 0, utils.CDRsIndexes+utils.ConcatenatedKey(qryFltr[0].Tenant, keySlice, id)); err != nil {
return err
}
for _, cid := range ids {
cdrMpIDs.Remove(cid)
if cdrMpIDs.Size() == 0 {
return utils.ErrNotFound
}
}
}
}
// Remove CDRs and their indexes
for key := range cdrMpIDs {
var cdrBytes []byte
if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) {
// key includes "cdr_" prefix
if err := rs.Cmd(&cdrBytes, redisGET, key); err != nil {
return err
}
return
}, 0, lockIDs...); err != nil {
return err
}
cgrEv := new(utils.CGREvent)
if err := rs.ms.Unmarshal(cdrBytes, &cgrEv); err != nil {
return err
}
// Apply non-indexed filters
dp := cgrEv.AsDataProvider()
var pass bool = true
for _, fltr := range notIndexed {
if pass, err = fltr.Pass(ctx, dp); err != nil {
return err
} else if !pass {
cdrMpIDs.Remove(key)
break
}
}
if !pass {
continue
}
// Get the CDR to find all indexes
idx := make(utils.StringSet)
for _, v := range rs.stringIndexedFields {
val, err := dp.FieldAsString(strings.Split(v, utils.NestingSep))
if err != nil {
if err == utils.ErrNotFound {
continue
}
return err
}
idx.Add(utils.ConcatenatedKey(v, val))
}
for _, v := range rs.prefixIndexedFields {
val, err := dp.FieldAsString(strings.Split(v, utils.NestingSep))
if err != nil {
if err == utils.ErrNotFound {
continue
}
return err
}
idx.Add(utils.ConcatenatedKey(v, val))
for i := len(val) - 1; i > 0; i-- {
idx.Add(utils.ConcatenatedKey(v, val[:i]))
}
}
// Remove CDR from all indexes
for indexKey := range idx {
if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) {
rs.Cmd(nil, redisSREM, utils.CDRsIndexes+
utils.ConcatenatedKey(cgrEv.Tenant, indexKey), key)
return
}, 0, utils.ConcatenatedKey(cgrEv.Tenant, indexKey)); err != nil {
return err
}
}
if err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) (err error) {
// Remove CDR
if err := rs.Cmd(nil, redisDEL, key); err != nil {
return err
}
return
}, 0, lockIDs...); err != nil {
return err
}
}
return nil
}
// DumpDataDB will dump all of datadb from memory to a file, only for InternalDB

View File

@@ -33,7 +33,7 @@ import (
// go test -bench RedisGetKeysForPrefix -run=^# -count 3 -benchtime=10s
func BenchmarkRedisGetKeysForPrefix(b *testing.B) {
rs, _ := NewRedisStorage("127.0.0.1:6379", 10, utils.CGRateSLwr, "", "json", 10, 20,
"", false, 5*time.Second, 0, 0, 0, 0, 150*time.Microsecond, 0, false, "", "", "")
"", false, 5*time.Second, 0, 0, 0, 0, 150*time.Microsecond, 0, false, "", "", "", nil, nil)
chargerProfile := &utils.ChargerProfile{
ID: "TestA_CHARGER1",
Tenant: "cgrates.org",

View File

@@ -50,7 +50,7 @@ func NewDataDBConn(dbType, host, port, name, user,
opts.RedisConnectTimeout, opts.RedisReadTimeout, opts.RedisWriteTimeout,
opts.RedisPoolPipelineWindow, opts.RedisPoolPipelineLimit,
opts.RedisTLS, opts.RedisClientCertificate, opts.RedisClientKey,
opts.RedisCACertificate)
opts.RedisCACertificate, stringIndexedFields, prefixIndexedFields)
case utils.MetaMongo:
d, err = NewMongoStorage(opts.MongoConnScheme, host, port, name, user, pass,
marshaler, stringIndexedFields, opts.MongoQueryTimeout)

View File

@@ -56,7 +56,7 @@ func TestDMitinitDB(t *testing.T) {
cfg.DbCfg().DBConns[utils.MetaDefault].Opts.RedisConnectAttempts, "", false,
0, 0, 0, 0, 0, 150*time.Microsecond, 0, false,
utils.EmptyString, utils.EmptyString,
utils.EmptyString)
utils.EmptyString, nil, nil)
if err != nil {
t.Fatal("Could not connect to Redis", err.Error())
}

View File

@@ -110,7 +110,7 @@ func TestFilterIndexerIT(t *testing.T) {
cfg.DbCfg().DBConns[utils.MetaDefault].Opts.RedisConnectAttempts, "", false,
0, 0, 0, 0, 0, 150*time.Microsecond, 0, false,
utils.EmptyString, utils.EmptyString,
utils.EmptyString)
utils.EmptyString, nil, nil)
if err != nil {
t.Fatal("Could not connect to Redis", err.Error())
}

View File

@@ -83,7 +83,7 @@ func TestOnStorIT(t *testing.T) {
rdsITdb, err = NewRedisStorage("127.0.0.1:6379",
4, utils.CGRateSLwr, cfg.DbCfg().DBConns[utils.MetaDefault].Password, cfg.GeneralCfg().DBDataEncoding,
cfg.DbCfg().DBConns[utils.MetaDefault].Opts.RedisMaxConns, cfg.DbCfg().DBConns[utils.MetaDefault].Opts.RedisConnectAttempts, "", false,
0, 0, 0, 0, 0, 150*time.Microsecond, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString)
0, 0, 0, 0, 0, 150*time.Microsecond, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString, nil, nil)
if err != nil {
t.Fatal("Could not connect to Redis", err.Error())
}