Add redisPoolPipelineLimit configuration field for datadb

Sets the maximum number of commands that can be pipelined before flushing.
0 means no limit and pipelines will only be limited by the time window.
This commit is contained in:
ionutboangiu
2024-07-04 19:38:25 +03:00
committed by Dan Christian Bogos
parent 8ed26c769e
commit eb3bd5cc1d
20 changed files with 116 additions and 76 deletions

View File

@@ -74,7 +74,8 @@ const (
func NewRedisStorage(address string, db int, user, pass, mrshlerStr string,
maxConns, attempts int, sentinelName string, isCluster bool, clusterSync,
clusterOnDownDelay, pipelineWindow, connTimeout, readTimeout, writeTimeout time.Duration,
clusterOnDownDelay, connTimeout, readTimeout, writeTimeout,
pipelineWindow time.Duration, pipelineLimit int,
tlsConn bool, tlsClientCert, tlsClientKey, tlsCACert string) (_ *RedisStorage, err error) {
var ms Marshaler
if ms, err = NewMarshaler(mrshlerStr); err != nil {
@@ -127,7 +128,8 @@ func NewRedisStorage(address string, db int, user, pass, mrshlerStr string,
var client radix.Client
if client, err = newRedisClient(address, sentinelName,
isCluster, clusterSync, clusterOnDownDelay, pipelineWindow,
isCluster, clusterSync, clusterOnDownDelay,
pipelineWindow, pipelineLimit,
maxConns, attempts, dialOpts); err != nil {
return
}
@@ -150,7 +152,10 @@ func redisDial(network, addr string, attempts int, opts ...radix.DialOpt) (conn
}
func newRedisClient(address, sentinelName string, isCluster bool,
clusterSync, clusterOnDownDelay, pipelineWindow time.Duration, maxConns, attempts int, dialOpts []radix.DialOpt) (radix.Client, error) {
clusterSync, clusterOnDownDelay, pipelineWindow time.Duration,
pipelineLimit, maxConns, attempts int, dialOpts []radix.DialOpt,
) (radix.Client, error) {
dialFunc := func(network, addr string) (radix.Conn, error) {
return redisDial(network, addr, attempts, dialOpts...)
}
@@ -160,7 +165,7 @@ func newRedisClient(address, sentinelName string, isCluster bool,
// Configure common pool options.
poolOpts := make([]radix.PoolOpt, 0, 2)
poolOpts = append(poolOpts, radix.PoolPipelineWindow(pipelineWindow, 0))
poolOpts = append(poolOpts, radix.PoolPipelineWindow(pipelineWindow, pipelineLimit))
switch {
case isCluster:

View File

@@ -49,8 +49,8 @@ func NewDataDBConn(dbType, host, port, name, user,
opts.RedisMaxConns, opts.RedisConnectAttempts,
opts.RedisSentinel,
opts.RedisCluster, opts.RedisClusterSync, opts.RedisClusterOndownDelay,
opts.RedisPoolPipelineWindow,
opts.RedisConnectTimeout, opts.RedisReadTimeout, opts.RedisWriteTimeout,
opts.RedisPoolPipelineWindow, opts.RedisPoolPipelineLimit,
opts.RedisTLS, opts.RedisClientCertificate, opts.RedisClientKey, opts.RedisCACertificate)
case utils.MetaMongo:
d, err = NewMongoStorage(opts.MongoConnScheme, host, port, name, user, pass, marshaler, utils.DataDB, nil, opts.MongoQueryTimeout)

View File

@@ -53,7 +53,7 @@ func TestDMitinitDB(t *testing.T) {
dataDB, err = NewRedisStorage(
fmt.Sprintf("%s:%s", cfg.DataDbCfg().Host, cfg.DataDbCfg().Port),
4, cfg.DataDbCfg().User, cfg.DataDbCfg().Password, cfg.GeneralCfg().DBDataEncoding,
10, 20, "", false, 0, 0, 150*time.Microsecond, 0, 0, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString)
10, 20, "", false, 0, 0, 0, 0, 0, 150*time.Microsecond, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString)
if err != nil {
t.Fatal("Could not connect to Redis", err.Error())
}

View File

@@ -99,7 +99,7 @@ func TestFilterIndexerIT(t *testing.T) {
redisDB, err := NewRedisStorage(
fmt.Sprintf("%s:%s", cfg.DataDbCfg().Host, cfg.DataDbCfg().Port),
4, cfg.DataDbCfg().User, cfg.DataDbCfg().Password, cfg.GeneralCfg().DBDataEncoding,
10, 20, "", false, 0, 0, 150*time.Microsecond, 0, 0, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString)
10, 20, "", false, 0, 0, 0, 0, 0, 150*time.Microsecond, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString)
if err != nil {
t.Fatal("Could not connect to Redis", err.Error())
}

View File

@@ -94,7 +94,7 @@ func TestOnStorIT(t *testing.T) {
rdsITdb, err = NewRedisStorage(
fmt.Sprintf("%s:%s", cfg.DataDbCfg().Host, cfg.DataDbCfg().Port),
4, cfg.DataDbCfg().User, cfg.DataDbCfg().Password, cfg.GeneralCfg().DBDataEncoding,
10, 20, "", false, 0, 0, 150*time.Microsecond, 0, 0, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString)
10, 20, "", false, 0, 0, 0, 0, 0, 150*time.Microsecond, 0, false, utils.EmptyString, utils.EmptyString, utils.EmptyString)
if err != nil {
t.Fatal("Could not connect to Redis", err.Error())
}