From 239cbb7365d4c1e8d5cac8becb1d4d8440e3322e Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 6 Oct 2021 19:02:26 +0300 Subject: [PATCH] Updated Redis storage --- apier/v1/config_it_test.go | 4 +- apier/v1/core_it_test.go | 8 ++-- apier/v1/preload_it_test.go | 4 +- cmd/cgr-tester/cgr-tester.go | 7 ++- ees/ees.go | 6 +-- engine/cdrs.go | 4 +- engine/libtest.go | 8 ++-- engine/storage_redis.go | 74 ++++++++++++++++++------------- engine/storage_utils.go | 2 +- engine/z_datamanager_it_test.go | 2 +- engine/z_filterindexer_it_test.go | 2 +- engine/z_onstor_it_test.go | 2 +- sessions/sessions.go | 12 ++--- utils/consts.go | 1 + utils/coreutils.go | 6 +++ 15 files changed, 81 insertions(+), 61 deletions(-) diff --git a/apier/v1/config_it_test.go b/apier/v1/config_it_test.go index 0e47064bb..f0091e44b 100644 --- a/apier/v1/config_it_test.go +++ b/apier/v1/config_it_test.go @@ -443,11 +443,11 @@ func testConfigStartEngineFromHTTP(t *testing.T) { if err := engine.Start(); err != nil { t.Error(err) } - fib := utils.Fib() + fib := utils.FibDuration(time.Millisecond) var jsonClnt *rpc.Client var connected bool for i := 0; i < 200; i++ { - time.Sleep(time.Duration(fib()) * time.Millisecond) + time.Sleep(fib()) if jsonClnt, err = jsonrpc.Dial(utils.TCP, "localhost:2012"); err != nil { utils.Logger.Warning(fmt.Sprintf("Error <%s> when opening test connection to: <%s>", err.Error(), "localhost:2012")) diff --git a/apier/v1/core_it_test.go b/apier/v1/core_it_test.go index be228922b..add4e1622 100644 --- a/apier/v1/core_it_test.go +++ b/apier/v1/core_it_test.go @@ -128,10 +128,10 @@ func testCoreSv1StartEngineByExecWithCPUProfiling(t *testing.T) { if err := engine.Start(); err != nil { t.Error(err) } - fib := utils.Fib() + fib := utils.FibDuration(time.Millisecond) var connected bool for i := 0; i < 200; i++ { - time.Sleep(time.Duration(fib()) * time.Millisecond) + time.Sleep(fib()) if _, err := jsonrpc.Dial(utils.TCP, coreV1Cfg.ListenCfg().RPCJSONListen); err != nil { t.Log(err) } else { @@ -184,10 +184,10 @@ func testCoreSv1StartEngineByExecWIthMemProfiling(t *testing.T) { if err := engine.Start(); err != nil { t.Error(err) } - fib := utils.Fib() + fib := utils.FibDuration(time.Millisecond) var connected bool for i := 0; i < 200; i++ { - time.Sleep(time.Duration(fib()) * time.Millisecond) + time.Sleep(fib()) if _, err := jsonrpc.Dial(utils.TCP, coreV1Cfg.ListenCfg().RPCJSONListen); err != nil { t.Log(err) } else { diff --git a/apier/v1/preload_it_test.go b/apier/v1/preload_it_test.go index 1c030a90a..cc04008bf 100644 --- a/apier/v1/preload_it_test.go +++ b/apier/v1/preload_it_test.go @@ -95,10 +95,10 @@ func testPreloadITStartEngine(t *testing.T) { if err := engine.Start(); err != nil { t.Error(err) } - fib := utils.Fib() + fib := utils.FibDuration(time.Millisecond) var connected bool for i := 0; i < 25; i++ { - time.Sleep(time.Duration(fib()) * time.Millisecond) + time.Sleep(fib()) if _, err := jsonrpc.Dial(utils.TCP, preloadCfg.ListenCfg().RPCJSONListen); err != nil { t.Logf("Error <%s> when opening test connection to: <%s>", err.Error(), preloadCfg.ListenCfg().RPCJSONListen) diff --git a/cmd/cgr-tester/cgr-tester.go b/cmd/cgr-tester/cgr-tester.go index 9cff7f06b..7ed0706e0 100644 --- a/cmd/cgr-tester/cgr-tester.go +++ b/cmd/cgr-tester/cgr-tester.go @@ -71,7 +71,6 @@ var ( destination = cgrTesterFlags.String("destination", "1002", "The destination to use in queries.") json = cgrTesterFlags.Bool("json", false, "Use JSON RPC") version = cgrTesterFlags.Bool("version", false, "Prints the application version.") - nilDuration = time.Duration(0) usage = cgrTesterFlags.String("usage", "1m", "The duration to use in call simulation.") fPath = cgrTesterFlags.String("file_path", "", "read requests from file with path") reqSep = cgrTesterFlags.String("req_separator", "\n\n", "separator for requests in file") @@ -86,13 +85,13 @@ func durInternalRater(cd *engine.CallDescriptorWithAPIOpts) (time.Duration, erro tstCfg.DataDbCfg().Password, tstCfg.GeneralCfg().DBDataEncoding, tstCfg.DataDbCfg().Opts) if err != nil { - return nilDuration, fmt.Errorf("Could not connect to data database: %s", err.Error()) + return 0, fmt.Errorf("Could not connect to data database: %s", err.Error()) } dm := engine.NewDataManager(dbConn, cgrConfig.CacheCfg(), nil) // for the momentn we use here "" for sentinelName defer dm.DataDB().Close() engine.SetDataStorage(dm) if err := engine.LoadAllDataDBToCache(dm); err != nil { - return nilDuration, fmt.Errorf("Cache rating error: %s", err.Error()) + return 0, fmt.Errorf("Cache rating error: %s", err.Error()) } log.Printf("Runnning %d cycles...", *runs) var result *engine.CallCost @@ -131,7 +130,7 @@ func durRemoteRater(cd *engine.CallDescriptorWithAPIOpts) (time.Duration, error) } if err != nil { - return nilDuration, fmt.Errorf("Could not connect to engine: %s", err.Error()) + return 0, fmt.Errorf("Could not connect to engine: %s", err.Error()) } defer client.Close() start := time.Now() diff --git a/ees/ees.go b/ees/ees.go index d441de471..16067ea2f 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -304,14 +304,14 @@ func ExportWithAttempts(exp EventExporter, eEv interface{}, key string) (err err } }() } - fib := utils.Fib() + fib := utils.FibDuration(time.Second) for i := 0; i < exp.Cfg().Attempts; i++ { if err = exp.Connect(); err == nil { break } if i+1 < exp.Cfg().Attempts { - time.Sleep(time.Duration(fib()) * time.Second) + time.Sleep(fib()) } } if err != nil { @@ -326,7 +326,7 @@ func ExportWithAttempts(exp EventExporter, eEv interface{}, key string) (err err break } if i+1 < exp.Cfg().Attempts { - time.Sleep(time.Duration(fib()) * time.Second) + time.Sleep(fib()) } } if err != nil { diff --git a/engine/cdrs.go b/engine/cdrs.go index 35f1dafdd..f896b886b 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -170,7 +170,7 @@ func (cdrS *CDRServer) rateCDR(cdr *CDRWithAPIOpts) ([]*CDR, error) { (cdr.Usage != 0 || hasLastUsed) && cdr.CostDetails == nil { // ToDo: Get rid of Prepaid as soon as we don't want to support it backwards // Should be previously calculated and stored in DB - fib := utils.Fib() + fib := utils.FibDuration(time.Second) var smCosts []*SMCost cgrID := cdr.CGRID if _, hasIT := cdr.ExtraFields[utils.OriginIDPrefix]; hasIT { @@ -183,7 +183,7 @@ func (cdrS *CDRServer) rateCDR(cdr *CDRWithAPIOpts) ([]*CDR, error) { break } if i <= cdrS.cgrCfg.CdrsCfg().SMCostRetries-1 { - time.Sleep(time.Duration(fib()) * time.Second) + time.Sleep(fib()) } } if len(smCosts) != 0 { // Cost retrieved from SMCost table diff --git a/engine/libtest.go b/engine/libtest.go index ea2d1ab85..65e9af5ba 100644 --- a/engine/libtest.go +++ b/engine/libtest.go @@ -341,10 +341,10 @@ func StartEngine(cfgPath string, waitEngine int) (*exec.Cmd, error) { if err != nil { return nil, err } - fib := utils.Fib() + fib := utils.FibDuration(time.Millisecond) var connected bool for i := 0; i < 200; i++ { - time.Sleep(time.Duration(fib()) * time.Millisecond) + time.Sleep(fib()) if _, err := jsonrpc.Dial(utils.TCP, cfg.ListenCfg().RPCJSONListen); err != nil { utils.Logger.Warning(fmt.Sprintf("Error <%s> when opening test connection to: <%s>", err.Error(), cfg.ListenCfg().RPCJSONListen)) @@ -370,9 +370,9 @@ func StartEngineWithContext(ctx context.Context, cfgPath string, waitEngine int) if cfg, err = config.NewCGRConfigFromPath(cfgPath); err != nil { return } - fib := utils.Fib() + fib := utils.FibDuration(time.Millisecond) for i := 0; i < 200; i++ { - time.Sleep(time.Duration(fib()) * time.Millisecond) + time.Sleep(fib()) if _, err = jsonrpc.Dial(utils.TCP, cfg.ListenCfg().RPCJSONListen); err != nil { continue } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 095d67153..f3ca6e5c0 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -27,6 +27,7 @@ import ( "io" "os" "strconv" + "strings" "time" "github.com/cgrates/cgrates/config" @@ -65,21 +66,21 @@ const ( redis_HGET = "HGET" redis_RENAME = "RENAME" redis_HMSET = "HMSET" + + redisLoadError = "Redis is loading the dataset in memory" ) func NewRedisStorage(address string, db int, user, pass, mrshlerStr string, - maxConns int, sentinelName string, isCluster bool, clusterSync, + maxConns,attempts int, sentinelName string, isCluster bool, clusterSync, clusterOnDownDelay time.Duration, tlsConn bool, - tlsClientCert, tlsClientKey, tlsCACert string) (rs *RedisStorage, err error) { - rs = new(RedisStorage) - if rs.ms, err = NewMarshaler(mrshlerStr); err != nil { - rs = nil + tlsClientCert, tlsClientKey, tlsCACert string) (_ *RedisStorage, err error) { + var ms Marshaler + if ms, err = NewMarshaler(mrshlerStr); err != nil { return } - dialOpts := []radix.DialOpt{ - radix.DialSelectDB(db), - } + dialOpts := make([]radix.DialOpt, 1, 3) + dialOpts[0] = radix.DialSelectDB(db) if pass != utils.EmptyString { if user == utils.EmptyString { dialOpts = append(dialOpts, radix.DialAuthPass(pass)) @@ -91,8 +92,7 @@ func NewRedisStorage(address string, db int, user, pass, mrshlerStr string, if tlsConn { var cert tls.Certificate if tlsClientCert != "" && tlsClientKey != "" { - cert, err = tls.LoadX509KeyPair(tlsClientCert, tlsClientKey) - if err != nil { + if cert, err = tls.LoadX509KeyPair(tlsClientCert, tlsClientKey); err != nil { return } } @@ -108,8 +108,7 @@ func NewRedisStorage(address string, db int, user, pass, mrshlerStr string, if ca, err = os.ReadFile(tlsCACert); err != nil { return } - - if ok := rootCAs.AppendCertsFromPEM(ca); !ok { + if !rootCAs.AppendCertsFromPEM(ca) { return } } @@ -119,41 +118,56 @@ func NewRedisStorage(address string, db int, user, pass, mrshlerStr string, })) } + var client radix.Client + if client, err = newRedisClient(address, sentinelName, + isCluster, clusterSync, clusterOnDownDelay, + maxConns, attempts, dialOpts); err != nil { + return + } + return &RedisStorage{ + ms: ms, + client: client, + }, nil +} + +func redisDial(network, addr string, attempts int, opts ...radix.DialOpt) (conn radix.Conn, err error) { + fib := utils.FibDuration(time.Millisecond) + for i := 0; i < attempts; i++ { + if conn, err = radix.Dial(network, addr, opts...); err == nil || + (err != nil && !strings.Contains(err.Error(), redisLoadError)) { + break + } + time.Sleep(fib()) + } + return +} + +func newRedisClient(address, sentinelName string, isCluster bool, + clusterSync, clusterOnDownDelay time.Duration, maxConns, attempts int, dialOpts []radix.DialOpt) (radix.Client, error) { dialFunc := func(network, addr string) (radix.Conn, error) { - return radix.Dial(network, addr, dialOpts...) + return redisDial(network, addr, attempts, dialOpts...) } dialFuncAuthOnly := func(network, addr string) (radix.Conn, error) { - return radix.Dial(network, addr, dialOpts[1:]...) + return redisDial(network, addr, attempts, dialOpts[1:]...) } switch { case isCluster: - if rs.client, err = radix.NewCluster(utils.InfieldSplit(address), + return radix.NewCluster(utils.InfieldSplit(address), radix.ClusterSyncEvery(clusterSync), radix.ClusterOnDownDelayActionsBy(clusterOnDownDelay), radix.ClusterPoolFunc(func(network, addr string) (radix.Client, error) { // in cluster enviorment do not select the DB as we expect to have only one DB return radix.NewPool(network, addr, maxConns, radix.PoolConnFunc(dialFuncAuthOnly)) - })); err != nil { - rs = nil - return - } + })) case sentinelName != utils.EmptyString: - if rs.client, err = radix.NewSentinel(sentinelName, utils.InfieldSplit(address), + return radix.NewSentinel(sentinelName, utils.InfieldSplit(address), radix.SentinelConnFunc(dialFuncAuthOnly), radix.SentinelPoolFunc(func(network, addr string) (radix.Client, error) { return radix.NewPool(network, addr, maxConns, radix.PoolConnFunc(dialFunc)) - })); err != nil { - rs = nil - return - } + })) default: - if rs.client, err = radix.NewPool(utils.TCP, address, maxConns, radix.PoolConnFunc(dialFunc)); err != nil { - rs = nil - return - } + return radix.NewPool(utils.TCP, address, maxConns, radix.PoolConnFunc(dialFunc)) } - - return } // Cmd function get a connection from the pool. diff --git a/engine/storage_utils.go b/engine/storage_utils.go index 7275cb283..ee6923a36 100644 --- a/engine/storage_utils.go +++ b/engine/storage_utils.go @@ -59,7 +59,7 @@ func NewDataDBConn(dbType, host, port, name, user, return } d, err = NewRedisStorage(host, dbNo, user, pass, marshaler, - utils.RedisMaxConns, utils.IfaceAsString(opts[utils.RedisSentinelNameCfg]), + utils.RedisMaxConns, utils.RedisMaxAttempts, utils.IfaceAsString(opts[utils.RedisSentinelNameCfg]), isCluster, clusterSync, clusterOnDownDelay, hasTlsConn, utils.IfaceAsString(opts[utils.RedisClientCertificate]), utils.IfaceAsString(opts[utils.RedisClientKey]), diff --git a/engine/z_datamanager_it_test.go b/engine/z_datamanager_it_test.go index b34adf128..67944393b 100644 --- a/engine/z_datamanager_it_test.go +++ b/engine/z_datamanager_it_test.go @@ -53,7 +53,7 @@ func TestDMitinitDB(t *testing.T) { dataDB, err = NewRedisStorage( fmt.Sprintf("%s:%s", cfg.DataDbCfg().Host, cfg.DataDbCfg().Port), 4, cfg.DataDbCfg().User, cfg.DataDbCfg().Password, cfg.GeneralCfg().DBDataEncoding, - utils.RedisMaxConns, "", false, 0, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString) + utils.RedisMaxConns, utils.RedisMaxAttempts, "", false, 0, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString) if err != nil { t.Fatal("Could not connect to Redis", err.Error()) } diff --git a/engine/z_filterindexer_it_test.go b/engine/z_filterindexer_it_test.go index d31089f18..7d271a0f0 100644 --- a/engine/z_filterindexer_it_test.go +++ b/engine/z_filterindexer_it_test.go @@ -99,7 +99,7 @@ func TestFilterIndexerIT(t *testing.T) { redisDB, err := NewRedisStorage( fmt.Sprintf("%s:%s", cfg.DataDbCfg().Host, cfg.DataDbCfg().Port), 4, cfg.DataDbCfg().User, cfg.DataDbCfg().Password, cfg.GeneralCfg().DBDataEncoding, - utils.RedisMaxConns, "", false, 0, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString) + utils.RedisMaxConns, utils.RedisMaxAttempts, "", false, 0, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString) if err != nil { t.Fatal("Could not connect to Redis", err.Error()) } diff --git a/engine/z_onstor_it_test.go b/engine/z_onstor_it_test.go index 77a1d03b8..bfe668284 100644 --- a/engine/z_onstor_it_test.go +++ b/engine/z_onstor_it_test.go @@ -93,7 +93,7 @@ func TestOnStorIT(t *testing.T) { rdsITdb, err = NewRedisStorage( fmt.Sprintf("%s:%s", cfg.DataDbCfg().Host, cfg.DataDbCfg().Port), 4, cfg.DataDbCfg().User, cfg.DataDbCfg().Password, cfg.GeneralCfg().DBDataEncoding, - utils.RedisMaxConns, "", false, 0, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString) + utils.RedisMaxConns, utils.RedisMaxAttempts, "", false, 0, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString) if err != nil { t.Fatal("Could not connect to Redis", err.Error()) } diff --git a/sessions/sessions.go b/sessions/sessions.go index d7994d4a8..736795e2d 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -559,10 +559,10 @@ func (sS *SessionS) debitLoopSession(s *Session, sRunIdx int, dscReason = err.Error() } // try to disconect the session n times before we force terminate it on our side - fib := utils.Fib() + fib := utils.FibDuration(time.Millisecond) for i := 0; i < sS.cgrCfg.SessionSCfg().TerminateAttempts; i++ { if i != 0 { // not the first iteration - time.Sleep(time.Duration(fib()) * time.Millisecond) + time.Sleep(fib()) } if err = sS.disconnectSession(s, dscReason); err == nil { s.Unlock() @@ -598,10 +598,10 @@ func (sS *SessionS) debitLoopSession(s *Session, sRunIdx int, s.Lock() defer s.Unlock() // try to disconect the session n times before we force terminate it on our side - fib := utils.Fib() + fib := utils.FibDuration(time.Millisecond) for i := 0; i < sS.cgrCfg.SessionSCfg().TerminateAttempts; i++ { if i != 0 { // not the first iteration - time.Sleep(time.Duration(fib()) * time.Millisecond) + time.Sleep(fib()) } if err = sS.disconnectSession(s, utils.ErrInsufficientCredit.Error()); err == nil { return @@ -2654,7 +2654,7 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.ClientConnector, } } var s *Session - fib := utils.Fib() + fib := utils.FibDuration(time.Millisecond) var isMsg bool // one time charging, do not perform indexing and sTerminator for i := 0; i < sS.cgrCfg.SessionSCfg().TerminateAttempts; i++ { if s = sS.getRelocateSession(cgrID, @@ -2664,7 +2664,7 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.ClientConnector, break } if i+1 < sS.cgrCfg.SessionSCfg().TerminateAttempts { // not last iteration - time.Sleep(time.Duration(fib()) * time.Millisecond) + time.Sleep(fib()) continue } isMsg = true diff --git a/utils/consts.go b/utils/consts.go index dd324c580..2c0cd7d9e 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -156,6 +156,7 @@ const ( Version = "v0.11.0~dev" DiameterFirmwareRevision = 918 RedisMaxConns = 10 + RedisMaxAttempts = 20 CGRateSLwr = "cgrates" Postgres = "postgres" MySQL = "mysql" diff --git a/utils/coreutils.go b/utils/coreutils.go index 0632b6f09..62036c2bc 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -487,6 +487,12 @@ func Fib() func() int { return a } } +func FibDuration(mult time.Duration) func() time.Duration { + fib := Fib() + return func() time.Duration { + return time.Duration(fib()) * mult + } +} // Utilities to provide pointers where we need to define ad-hoc func StringPointer(str string) *string {