Add redis pipeline window and limit cfg opts

Set redisPoolPipelineWindow to control duration before pipeline
flush (0 disables implicit pipelining) and redisPoolPipelineLimit
for max commands per pipeline (0 means no limit, only time window
applies).
This commit is contained in:
ionutboangiu
2024-10-25 20:32:00 +03:00
committed by Dan Christian Bogos
parent 3666dea54e
commit e0c05ecfa7
21 changed files with 178 additions and 85 deletions

View File

@@ -35,7 +35,7 @@ func TestSetGetRemoveConfigSectionsDrvRedis(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
db, err := NewRedisStorage(cfg.DataDbCfg().Host+":"+cfg.DataDbCfg().Port, 10, cfg.DataDbCfg().User,
cfg.DataDbCfg().Password, cfg.GeneralCfg().DBDataEncoding, cfg.DataDbCfg().Opts.RedisMaxConns, cfg.DataDbCfg().Opts.RedisConnectAttempts,
utils.EmptyString, false, 0, 0, 0, 0, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString)
utils.EmptyString, false, 0, 0, 0, 0, 0, 150*time.Microsecond, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString)
if err != nil {
t.Error(err)
}

View File

@@ -73,9 +73,10 @@ const (
)
func NewRedisStorage(address string, db int, user, pass, mrshlerStr string,
maxConns, attempts int, sentinelName string, isCluster bool, clusterSync, clusterOnDownDelay,
connTimeout, readTimeout, writeTimeout time.Duration, tlsConn bool,
tlsClientCert, tlsClientKey, tlsCACert string) (_ *RedisStorage, err error) {
maxConns, attempts int, sentinelName string, isCluster bool, clusterSync,
clusterOnDownDelay, connTimeout, readTimeout, writeTimeout time.Duration,
pipelineWindow time.Duration, pipelineLimit int,
tlsConn bool, tlsClientCert, tlsClientKey, tlsCACert string) (_ *RedisStorage, err error) {
var ms utils.Marshaler
if ms, err = utils.NewMarshaler(mrshlerStr); err != nil {
return
@@ -128,6 +129,7 @@ func NewRedisStorage(address string, db int, user, pass, mrshlerStr string,
var client radix.Client
if client, err = newRedisClient(address, sentinelName,
isCluster, clusterSync, clusterOnDownDelay,
pipelineWindow, pipelineLimit,
maxConns, attempts, dialOpts); err != nil {
return
}
@@ -149,14 +151,20 @@ func redisDial(network, addr string, attempts int, opts ...radix.DialOpt) (conn
return
}
func newRedisClient(address, sentinelName string, isCluster bool,
clusterSync, clusterOnDownDelay time.Duration, maxConns, attempts int, dialOpts []radix.DialOpt) (radix.Client, error) {
func newRedisClient(address, sentinelName string, isCluster bool, clusterSync, clusterOnDownDelay,
pipelineWindow time.Duration, pipelineLimit, maxConns, attempts int, dialOpts []radix.DialOpt,
) (radix.Client, error) {
dialFunc := func(network, addr string) (radix.Conn, error) {
return redisDial(network, addr, attempts, dialOpts...)
}
dialFuncAuthOnly := func(network, addr string) (radix.Conn, error) {
return redisDial(network, addr, attempts, dialOpts[1:]...)
}
// Configure common pool options.
poolOpts := make([]radix.PoolOpt, 0, 2)
poolOpts = append(poolOpts, radix.PoolPipelineWindow(pipelineWindow, pipelineLimit))
switch {
case isCluster:
return radix.NewCluster(utils.InfieldSplit(address),
@@ -164,16 +172,16 @@ func newRedisClient(address, sentinelName string, isCluster bool,
radix.ClusterOnDownDelayActionsBy(clusterOnDownDelay),
radix.ClusterPoolFunc(func(network, addr string) (radix.Client, error) {
// in cluster enviorment do not select the DB as we expect to have only one DB
return radix.NewPool(network, addr, maxConns, radix.PoolConnFunc(dialFuncAuthOnly))
return radix.NewPool(network, addr, maxConns, append(poolOpts, radix.PoolConnFunc(dialFuncAuthOnly))...)
}))
case sentinelName != utils.EmptyString:
return radix.NewSentinel(sentinelName, utils.InfieldSplit(address),
radix.SentinelConnFunc(dialFuncAuthOnly),
radix.SentinelPoolFunc(func(network, addr string) (radix.Client, error) {
return radix.NewPool(network, addr, maxConns, radix.PoolConnFunc(dialFunc))
return radix.NewPool(network, addr, maxConns, append(poolOpts, radix.PoolConnFunc(dialFunc))...)
}))
default:
return radix.NewPool(utils.TCP, address, maxConns, radix.PoolConnFunc(dialFunc))
return radix.NewPool(utils.TCP, address, maxConns, append(poolOpts, radix.PoolConnFunc(dialFunc))...)
}
}

View File

@@ -33,7 +33,7 @@ import (
// go test -bench RedisGetKeysForPrefix -run=^# -count 3 -benchtime=10s
func BenchmarkRedisGetKeysForPrefix(b *testing.B) {
rs, _ := NewRedisStorage("127.0.0.1:6379", 10, "cgrates", "", "json", 10, 20,
"", false, 5*time.Second, 0, 0, 0, 0, false, "", "", "")
"", false, 5*time.Second, 0, 0, 0, 0, 150*time.Microsecond, 0, false, "", "", "")
chargerProfile := &ChargerProfile{
ID: "TestA_CHARGER1",
Tenant: "cgrates.org",

View File

@@ -44,10 +44,13 @@ func NewDataDBConn(dbType, host, port, name, user,
if port != "" && !strings.Contains(host, ":") {
host += ":" + port
}
d, err = NewRedisStorage(host, dbNo, user, pass, marshaler, opts.RedisMaxConns, opts.RedisConnectAttempts,
opts.RedisSentinel, opts.RedisCluster, opts.RedisClusterSync, opts.RedisClusterOndownDelay,
opts.RedisConnectTimeout, opts.RedisReadTimeout, opts.RedisWriteTimeout, opts.RedisTLS,
opts.RedisClientCertificate, opts.RedisClientKey, opts.RedisCACertificate)
d, err = NewRedisStorage(host, dbNo, user, pass, marshaler,
opts.RedisMaxConns, opts.RedisConnectAttempts,
opts.RedisSentinel,
opts.RedisCluster, opts.RedisClusterSync, opts.RedisClusterOndownDelay,
opts.RedisConnectTimeout, opts.RedisReadTimeout, opts.RedisWriteTimeout,
opts.RedisPoolPipelineWindow, opts.RedisPoolPipelineLimit,
opts.RedisTLS, opts.RedisClientCertificate, opts.RedisClientKey, opts.RedisCACertificate)
case utils.MetaMongo:
d, err = NewMongoStorage(opts.MongoConnScheme, host, port, name, user, pass, marshaler, utils.DataDB, nil, opts.MongoQueryTimeout)
case utils.MetaInternal:

View File

@@ -50,10 +50,15 @@ func TestDMitinitDB(t *testing.T) {
case utils.MetaInternal:
t.SkipNow()
case utils.MetaMySQL:
dataDB, err = NewRedisStorage(
fmt.Sprintf("%s:%s", cfg.DataDbCfg().Host, cfg.DataDbCfg().Port),
4, cfg.DataDbCfg().User, cfg.DataDbCfg().Password, cfg.GeneralCfg().DBDataEncoding,
cfg.DataDbCfg().Opts.RedisMaxConns, cfg.DataDbCfg().Opts.RedisConnectAttempts, "", false, 0, 0, 0, 0, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString)
dataDB, err = NewRedisStorage(fmt.Sprintf("%s:%s",
cfg.DataDbCfg().Host, cfg.DataDbCfg().Port), 4,
cfg.DataDbCfg().User, cfg.DataDbCfg().Password,
cfg.GeneralCfg().DBDataEncoding,
cfg.DataDbCfg().Opts.RedisMaxConns,
cfg.DataDbCfg().Opts.RedisConnectAttempts, "", false,
0, 0, 0, 0, 0, 150*time.Microsecond, 0, false,
utils.EmptyString, utils.EmptyString,
utils.EmptyString)
if err != nil {
t.Fatal("Could not connect to Redis", err.Error())
}

View File

@@ -100,10 +100,15 @@ func TestFilterIndexerIT(t *testing.T) {
config.CgrConfig().CacheCfg(), nil)
case utils.MetaMySQL:
cfg := config.NewDefaultCGRConfig()
redisDB, err := NewRedisStorage(
fmt.Sprintf("%s:%s", cfg.DataDbCfg().Host, cfg.DataDbCfg().Port),
4, cfg.DataDbCfg().User, cfg.DataDbCfg().Password, cfg.GeneralCfg().DBDataEncoding,
cfg.DataDbCfg().Opts.RedisMaxConns, cfg.DataDbCfg().Opts.RedisConnectAttempts, "", false, 0, 0, 0, 0, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString)
redisDB, err := NewRedisStorage(fmt.Sprintf("%s:%s",
cfg.DataDbCfg().Host, cfg.DataDbCfg().Port), 4,
cfg.DataDbCfg().User, cfg.DataDbCfg().Password,
cfg.GeneralCfg().DBDataEncoding,
cfg.DataDbCfg().Opts.RedisMaxConns,
cfg.DataDbCfg().Opts.RedisConnectAttempts, "", false,
0, 0, 0, 0, 0, 150*time.Microsecond, 0, false,
utils.EmptyString, utils.EmptyString,
utils.EmptyString)
if err != nil {
t.Fatal("Could not connect to Redis", err.Error())
}

View File

@@ -81,7 +81,7 @@ func TestOnStorIT(t *testing.T) {
fmt.Sprintf("%s:%s", cfg.DataDbCfg().Host, cfg.DataDbCfg().Port),
4, cfg.DataDbCfg().User, cfg.DataDbCfg().Password, cfg.GeneralCfg().DBDataEncoding,
cfg.DataDbCfg().Opts.RedisMaxConns, cfg.DataDbCfg().Opts.RedisConnectAttempts, "", false,
0, 0, 0, 0, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString)
0, 0, 0, 0, 0, 150*time.Microsecond, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString)
if err != nil {
t.Fatal("Could not connect to Redis", err.Error())
}