Updated Redis storage

This commit is contained in:
Trial97
2021-10-06 19:02:26 +03:00
committed by Dan Christian Bogos
parent 02abed3c60
commit 239cbb7365
15 changed files with 81 additions and 61 deletions

View File

@@ -170,7 +170,7 @@ func (cdrS *CDRServer) rateCDR(cdr *CDRWithAPIOpts) ([]*CDR, error) {
(cdr.Usage != 0 || hasLastUsed) && cdr.CostDetails == nil {
// ToDo: Get rid of Prepaid as soon as we don't want to support it backwards
// Should be previously calculated and stored in DB
fib := utils.Fib()
fib := utils.FibDuration(time.Second)
var smCosts []*SMCost
cgrID := cdr.CGRID
if _, hasIT := cdr.ExtraFields[utils.OriginIDPrefix]; hasIT {
@@ -183,7 +183,7 @@ func (cdrS *CDRServer) rateCDR(cdr *CDRWithAPIOpts) ([]*CDR, error) {
break
}
if i <= cdrS.cgrCfg.CdrsCfg().SMCostRetries-1 {
time.Sleep(time.Duration(fib()) * time.Second)
time.Sleep(fib())
}
}
if len(smCosts) != 0 { // Cost retrieved from SMCost table

View File

@@ -341,10 +341,10 @@ func StartEngine(cfgPath string, waitEngine int) (*exec.Cmd, error) {
if err != nil {
return nil, err
}
fib := utils.Fib()
fib := utils.FibDuration(time.Millisecond)
var connected bool
for i := 0; i < 200; i++ {
time.Sleep(time.Duration(fib()) * time.Millisecond)
time.Sleep(fib())
if _, err := jsonrpc.Dial(utils.TCP, cfg.ListenCfg().RPCJSONListen); err != nil {
utils.Logger.Warning(fmt.Sprintf("Error <%s> when opening test connection to: <%s>",
err.Error(), cfg.ListenCfg().RPCJSONListen))
@@ -370,9 +370,9 @@ func StartEngineWithContext(ctx context.Context, cfgPath string, waitEngine int)
if cfg, err = config.NewCGRConfigFromPath(cfgPath); err != nil {
return
}
fib := utils.Fib()
fib := utils.FibDuration(time.Millisecond)
for i := 0; i < 200; i++ {
time.Sleep(time.Duration(fib()) * time.Millisecond)
time.Sleep(fib())
if _, err = jsonrpc.Dial(utils.TCP, cfg.ListenCfg().RPCJSONListen); err != nil {
continue
}

View File

@@ -27,6 +27,7 @@ import (
"io"
"os"
"strconv"
"strings"
"time"
"github.com/cgrates/cgrates/config"
@@ -65,21 +66,21 @@ const (
redis_HGET = "HGET"
redis_RENAME = "RENAME"
redis_HMSET = "HMSET"
redisLoadError = "Redis is loading the dataset in memory"
)
func NewRedisStorage(address string, db int, user, pass, mrshlerStr string,
maxConns int, sentinelName string, isCluster bool, clusterSync,
maxConns,attempts int, sentinelName string, isCluster bool, clusterSync,
clusterOnDownDelay time.Duration, tlsConn bool,
tlsClientCert, tlsClientKey, tlsCACert string) (rs *RedisStorage, err error) {
rs = new(RedisStorage)
if rs.ms, err = NewMarshaler(mrshlerStr); err != nil {
rs = nil
tlsClientCert, tlsClientKey, tlsCACert string) (_ *RedisStorage, err error) {
var ms Marshaler
if ms, err = NewMarshaler(mrshlerStr); err != nil {
return
}
dialOpts := []radix.DialOpt{
radix.DialSelectDB(db),
}
dialOpts := make([]radix.DialOpt, 1, 3)
dialOpts[0] = radix.DialSelectDB(db)
if pass != utils.EmptyString {
if user == utils.EmptyString {
dialOpts = append(dialOpts, radix.DialAuthPass(pass))
@@ -91,8 +92,7 @@ func NewRedisStorage(address string, db int, user, pass, mrshlerStr string,
if tlsConn {
var cert tls.Certificate
if tlsClientCert != "" && tlsClientKey != "" {
cert, err = tls.LoadX509KeyPair(tlsClientCert, tlsClientKey)
if err != nil {
if cert, err = tls.LoadX509KeyPair(tlsClientCert, tlsClientKey); err != nil {
return
}
}
@@ -108,8 +108,7 @@ func NewRedisStorage(address string, db int, user, pass, mrshlerStr string,
if ca, err = os.ReadFile(tlsCACert); err != nil {
return
}
if ok := rootCAs.AppendCertsFromPEM(ca); !ok {
if !rootCAs.AppendCertsFromPEM(ca) {
return
}
}
@@ -119,41 +118,56 @@ func NewRedisStorage(address string, db int, user, pass, mrshlerStr string,
}))
}
var client radix.Client
if client, err = newRedisClient(address, sentinelName,
isCluster, clusterSync, clusterOnDownDelay,
maxConns, attempts, dialOpts); err != nil {
return
}
return &RedisStorage{
ms: ms,
client: client,
}, nil
}
func redisDial(network, addr string, attempts int, opts ...radix.DialOpt) (conn radix.Conn, err error) {
fib := utils.FibDuration(time.Millisecond)
for i := 0; i < attempts; i++ {
if conn, err = radix.Dial(network, addr, opts...); err == nil ||
(err != nil && !strings.Contains(err.Error(), redisLoadError)) {
break
}
time.Sleep(fib())
}
return
}
func newRedisClient(address, sentinelName string, isCluster bool,
clusterSync, clusterOnDownDelay time.Duration, maxConns, attempts int, dialOpts []radix.DialOpt) (radix.Client, error) {
dialFunc := func(network, addr string) (radix.Conn, error) {
return radix.Dial(network, addr, dialOpts...)
return redisDial(network, addr, attempts, dialOpts...)
}
dialFuncAuthOnly := func(network, addr string) (radix.Conn, error) {
return radix.Dial(network, addr, dialOpts[1:]...)
return redisDial(network, addr, attempts, dialOpts[1:]...)
}
switch {
case isCluster:
if rs.client, err = radix.NewCluster(utils.InfieldSplit(address),
return radix.NewCluster(utils.InfieldSplit(address),
radix.ClusterSyncEvery(clusterSync),
radix.ClusterOnDownDelayActionsBy(clusterOnDownDelay),
radix.ClusterPoolFunc(func(network, addr string) (radix.Client, error) {
// in cluster enviorment do not select the DB as we expect to have only one DB
return radix.NewPool(network, addr, maxConns, radix.PoolConnFunc(dialFuncAuthOnly))
})); err != nil {
rs = nil
return
}
}))
case sentinelName != utils.EmptyString:
if rs.client, err = radix.NewSentinel(sentinelName, utils.InfieldSplit(address),
return radix.NewSentinel(sentinelName, utils.InfieldSplit(address),
radix.SentinelConnFunc(dialFuncAuthOnly),
radix.SentinelPoolFunc(func(network, addr string) (radix.Client, error) {
return radix.NewPool(network, addr, maxConns, radix.PoolConnFunc(dialFunc))
})); err != nil {
rs = nil
return
}
}))
default:
if rs.client, err = radix.NewPool(utils.TCP, address, maxConns, radix.PoolConnFunc(dialFunc)); err != nil {
rs = nil
return
}
return radix.NewPool(utils.TCP, address, maxConns, radix.PoolConnFunc(dialFunc))
}
return
}
// Cmd function get a connection from the pool.

View File

@@ -59,7 +59,7 @@ func NewDataDBConn(dbType, host, port, name, user,
return
}
d, err = NewRedisStorage(host, dbNo, user, pass, marshaler,
utils.RedisMaxConns, utils.IfaceAsString(opts[utils.RedisSentinelNameCfg]),
utils.RedisMaxConns, utils.RedisMaxAttempts, utils.IfaceAsString(opts[utils.RedisSentinelNameCfg]),
isCluster, clusterSync, clusterOnDownDelay, hasTlsConn,
utils.IfaceAsString(opts[utils.RedisClientCertificate]),
utils.IfaceAsString(opts[utils.RedisClientKey]),

View File

@@ -53,7 +53,7 @@ func TestDMitinitDB(t *testing.T) {
dataDB, err = NewRedisStorage(
fmt.Sprintf("%s:%s", cfg.DataDbCfg().Host, cfg.DataDbCfg().Port),
4, cfg.DataDbCfg().User, cfg.DataDbCfg().Password, cfg.GeneralCfg().DBDataEncoding,
utils.RedisMaxConns, "", false, 0, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString)
utils.RedisMaxConns, utils.RedisMaxAttempts, "", false, 0, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString)
if err != nil {
t.Fatal("Could not connect to Redis", err.Error())
}

View File

@@ -99,7 +99,7 @@ func TestFilterIndexerIT(t *testing.T) {
redisDB, err := NewRedisStorage(
fmt.Sprintf("%s:%s", cfg.DataDbCfg().Host, cfg.DataDbCfg().Port),
4, cfg.DataDbCfg().User, cfg.DataDbCfg().Password, cfg.GeneralCfg().DBDataEncoding,
utils.RedisMaxConns, "", false, 0, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString)
utils.RedisMaxConns, utils.RedisMaxAttempts, "", false, 0, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString)
if err != nil {
t.Fatal("Could not connect to Redis", err.Error())
}

View File

@@ -93,7 +93,7 @@ func TestOnStorIT(t *testing.T) {
rdsITdb, err = NewRedisStorage(
fmt.Sprintf("%s:%s", cfg.DataDbCfg().Host, cfg.DataDbCfg().Port),
4, cfg.DataDbCfg().User, cfg.DataDbCfg().Password, cfg.GeneralCfg().DBDataEncoding,
utils.RedisMaxConns, "", false, 0, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString)
utils.RedisMaxConns, utils.RedisMaxAttempts, "", false, 0, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString)
if err != nil {
t.Fatal("Could not connect to Redis", err.Error())
}