diff --git a/agents/astagent.go b/agents/astagent.go index 8ac0019cf..c4db7a063 100644 --- a/agents/astagent.go +++ b/agents/astagent.go @@ -89,7 +89,8 @@ func (sma *AsteriskAgent) connectAsterisk(stopChan <-chan struct{}) (err error) sma.astConn, err = aringo.NewARInGO(fmt.Sprintf("ws://%s/ari/events?api_key=%s:%s&app=%s", connCfg.Address, connCfg.User, connCfg.Password, CGRAuthAPP), "http://cgrates.org", connCfg.User, connCfg.Password, fmt.Sprintf("%s@%s", utils.CGRateS, utils.Version), - sma.astEvChan, sma.astErrChan, stopChan, connCfg.ConnectAttempts, connCfg.Reconnects) + sma.astEvChan, sma.astErrChan, stopChan, connCfg.ConnectAttempts, connCfg.Reconnects, + connCfg.MaxReconnectInterval, utils.FibDuration) return } diff --git a/agents/fsagent.go b/agents/fsagent.go index ffd66b87b..4f5745f46 100644 --- a/agents/fsagent.go +++ b/agents/fsagent.go @@ -298,8 +298,8 @@ func (fsa *FSsessions) Connect() error { eventFilters := map[string][]string{"Call-Direction": {"inbound"}} errChan := make(chan error) for connIdx, connCfg := range fsa.cfg.EventSocketConns { - fSock, err := fsock.NewFSock(connCfg.Address, connCfg.Password, connCfg.Reconnects, - fsa.createHandlers(), eventFilters, utils.Logger.GetSyslog(), connIdx) + fSock, err := fsock.NewFSock(connCfg.Address, connCfg.Password, connCfg.Reconnects, connCfg.MaxReconnectInterval, + utils.FibDuration, fsa.createHandlers(), eventFilters, utils.Logger.GetSyslog(), connIdx, true) if err != nil { return err } @@ -313,8 +313,8 @@ func (fsa *FSsessions) Connect() error { errChan <- err } }(fSock) - fsSenderPool, err := fsock.NewFSockPool(5, connCfg.Address, connCfg.Password, 1, fsa.cfg.MaxWaitConnection, - make(map[string][]func(string, int)), make(map[string][]string), utils.Logger.GetSyslog(), connIdx) + fsSenderPool := fsock.NewFSockPool(5, connCfg.Address, connCfg.Password, 1, fsa.cfg.MaxWaitConnection, + 0, utils.FibDuration, make(map[string][]func(string, int)), make(map[string][]string), utils.Logger, connIdx, true) if err != nil { return fmt.Errorf("Cannot connect FreeSWITCH senders pool, error: %s", err.Error()) } diff --git a/agents/kamagent.go b/agents/kamagent.go index 2a726e518..d3ddb3079 100644 --- a/agents/kamagent.go +++ b/agents/kamagent.go @@ -80,7 +80,8 @@ func (self *KamailioAgent) Connect() (err error) { errChan := make(chan error) for connIdx, connCfg := range self.cfg.EvapiConns { logger := log.New(utils.Logger, "kamevapi:", 2) - if self.conns[connIdx], err = kamevapi.NewKamEvapi(connCfg.Address, connIdx, connCfg.Reconnects, eventHandlers, logger); err != nil { + if self.conns[connIdx], err = kamevapi.NewKamEvapi(connCfg.Address, connIdx, connCfg.Reconnects, connCfg.MaxReconnectInterval, + utils.FibDuration, eventHandlers, logger); err != nil { return } utils.Logger.Info(fmt.Sprintf("<%s> successfully connected to Kamailio at: <%s>", utils.KamailioAgent, connCfg.Address)) diff --git a/cmd/cgr-console/cgr-console.go b/cmd/cgr-console/cgr-console.go index 9159cd42c..86837990b 100644 --- a/cmd/cgr-console/cgr-console.go +++ b/cmd/cgr-console/cgr-console.go @@ -132,7 +132,7 @@ func main() { var err error client, err = rpcclient.NewRPCClient(context.Background(), utils.TCP, *server, *tls, *keyPath, *certificatePath, *caPath, 3, 3, - time.Second, time.Duration(*replyTimeOut)*time.Second, *rpcEncoding, nil, false, nil) + 0, utils.FibDuration, time.Second, time.Duration(*replyTimeOut)*time.Second, *rpcEncoding, nil, false, nil) if err != nil { cgrConsoleFlags.PrintDefaults() log.Fatal("Could not connect to server " + *server) diff --git a/ees/poster_it_test.go b/ees/poster_it_test.go index 9d31e2958..88a2b1416 100644 --- a/ees/poster_it_test.go +++ b/ees/poster_it_test.go @@ -336,7 +336,9 @@ func TestAMQPv1Poster(t *testing.T) { } // Accept message - msg.Accept(ctx) + if err = receiver.AcceptMessage(ctx, msg); err != nil { + t.Fatalf("Failure accepting message: %v", err) + } if rply := string(msg.GetData()); rply != body { t.Errorf("Expected: %q, received: %q", body, rply) } diff --git a/engine/connmanager.go b/engine/connmanager.go index c5dbf3c0c..b68e4ccfa 100644 --- a/engine/connmanager.go +++ b/engine/connmanager.go @@ -113,8 +113,9 @@ func (cM *ConnManager) getConnWithConfig(ctx *context.Context, connID string, co if conn, err = rpcclient.NewRPCParallelClientPool(ctx, utils.TCP, rpcConnCfg.Address, rpcConnCfg.TLS, utils.FirstNonEmpty(rpcConnCfg.ClientKey, cM.cfg.TLSCfg().ClientKey), utils.FirstNonEmpty(rpcConnCfg.ClientCertificate, cM.cfg.TLSCfg().ClientCerificate), utils.FirstNonEmpty(rpcConnCfg.CaCertificate, cM.cfg.TLSCfg().CaCertificate), utils.FirstIntNonEmpty(rpcConnCfg.ConnectAttempts, cM.cfg.GeneralCfg().ConnectAttempts), - utils.FirstIntNonEmpty(rpcConnCfg.Reconnects, cM.cfg.GeneralCfg().Reconnects), utils.FirstDurationNonEmpty(rpcConnCfg.ConnectTimeout, cM.cfg.GeneralCfg().ConnectTimeout), - utils.FirstDurationNonEmpty(rpcConnCfg.ReplyTimeout, cM.cfg.GeneralCfg().ReplyTimeout), codec, intChan, int64(cM.cfg.GeneralCfg().MaxParallelConns), false, ctx.Client); err != nil { + utils.FirstIntNonEmpty(rpcConnCfg.Reconnects, cM.cfg.GeneralCfg().Reconnects), utils.FirstDurationNonEmpty(rpcConnCfg.MaxReconnectInterval, cM.cfg.GeneralCfg().MaxReconnectInterval), + utils.FibDuration, utils.FirstDurationNonEmpty(rpcConnCfg.ConnectTimeout, cM.cfg.GeneralCfg().ConnectTimeout), utils.FirstDurationNonEmpty(rpcConnCfg.ReplyTimeout, cM.cfg.GeneralCfg().ReplyTimeout), + codec, intChan, int64(cM.cfg.GeneralCfg().MaxParallelConns), false, ctx.Client); err != nil { return } } else { @@ -122,8 +123,9 @@ func (cM *ConnManager) getConnWithConfig(ctx *context.Context, connID string, co cM.cfg.TLSCfg().ClientKey, cM.cfg.TLSCfg().ClientCerificate, cM.cfg.TLSCfg().CaCertificate, cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects, - cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout, - connCfg.Conns, intChan, false, ctx.Client, connID, cM.connCache); err != nil { + cM.cfg.GeneralCfg().MaxReconnectInterval, cM.cfg.GeneralCfg().ConnectTimeout, + cM.cfg.GeneralCfg().ReplyTimeout, connCfg.Conns, intChan, false, ctx.Client, + connID, cM.connCache); err != nil { return } } diff --git a/engine/connmanager_test.go b/engine/connmanager_test.go index 4e4b677eb..ae828b21a 100644 --- a/engine/connmanager_test.go +++ b/engine/connmanager_test.go @@ -85,7 +85,7 @@ func TestCMgetConnUnsupportedBiRPC(t *testing.T) { experr := rpcclient.ErrUnsupportedBiRPC exp, err := NewRPCPool(context.Background(), "*first", "", "", "", cfg.GeneralCfg().ConnectAttempts, - cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, + cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().MaxReconnectInterval, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, nil, cc, true, nil, "", cM.connCache) if err != nil { t.Fatal(err) @@ -131,9 +131,9 @@ func TestCMgetConnNotInternalRPC(t *testing.T) { exp, err := NewRPCPool(context.Background(), "*first", cfg.TLSCfg().ClientKey, cfg.TLSCfg().ClientCerificate, cfg.TLSCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, - cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, - cfg.GeneralCfg().ReplyTimeout, cfg.RPCConns()[connID].Conns, cc, - true, nil, connID, cM.connCache) + cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().MaxReconnectInterval, + cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, + cfg.RPCConns()[connID].Conns, cc, true, nil, connID, cM.connCache) if err != nil { t.Fatal(err) } diff --git a/engine/dispatcherprfl.go b/engine/dispatcherprfl.go index 8227e2342..0d127f701 100644 --- a/engine/dispatcherprfl.go +++ b/engine/dispatcherprfl.go @@ -153,7 +153,8 @@ func (dH *DispatcherHost) GetConn(ctx *context.Context, cfg *config.CGRConfig, i cfg.TLSCfg().ClientKey, cfg.TLSCfg().ClientCerificate, cfg.TLSCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, + cfg.GeneralCfg().MaxReconnectInterval, cfg.GeneralCfg().ConnectTimeout, + cfg.GeneralCfg().ReplyTimeout, iPRCCh, false, nil, utils.EmptyString, utils.EmptyString, nil); err != nil { return diff --git a/engine/libengine.go b/engine/libengine.go index 6ea5c392c..c675a6907 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -36,7 +36,7 @@ import ( // NewRPCPool returns a new pool of connection with the given configuration func NewRPCPool(ctx *context.Context, dispatchStrategy string, keyPath, certPath, caPath string, connAttempts, reconnects int, - connectTimeout, replyTimeout time.Duration, rpcConnCfgs []*config.RemoteHost, + maxReconnectInterval, connectTimeout, replyTimeout time.Duration, rpcConnCfgs []*config.RemoteHost, internalConnChan chan birpc.ClientConnector, lazyConnect bool, biRPCClient interface{}, poolID string, connCache *ltcache.Cache) (rpcPool *rpcclient.RPCPool, err error) { var rpcClient birpc.ClientConnector @@ -50,7 +50,7 @@ func NewRPCPool(ctx *context.Context, dispatchStrategy string, keyPath, certPath continue } if rpcClient, err = NewRPCConnection(ctx, rpcConnCfg, keyPath, certPath, caPath, connAttempts, reconnects, - connectTimeout, replyTimeout, internalConnChan, lazyConnect, biRPCClient, + maxReconnectInterval, connectTimeout, replyTimeout, internalConnChan, lazyConnect, biRPCClient, poolID, rpcConnCfg.ID, connCache); err == rpcclient.ErrUnsupportedCodec { return nil, fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport) } @@ -68,7 +68,7 @@ func NewRPCPool(ctx *context.Context, dispatchStrategy string, keyPath, certPath // NewRPCConnection creates a new connection based on the RemoteHost structure // connCache is used to cache the connection with ID func NewRPCConnection(ctx *context.Context, cfg *config.RemoteHost, keyPath, certPath, caPath string, connAttempts, reconnects int, - connectTimeout, replyTimeout time.Duration, internalConnChan chan birpc.ClientConnector, lazyConnect bool, + maxReconnectInterval, connectTimeout, replyTimeout time.Duration, internalConnChan chan birpc.ClientConnector, lazyConnect bool, biRPCClient interface{}, poolID, connID string, connCache *ltcache.Cache) (client birpc.ClientConnector, err error) { var id string if connID != utils.EmptyString { @@ -79,12 +79,28 @@ func NewRPCConnection(ctx *context.Context, cfg *config.RemoteHost, keyPath, cer } if cfg.Address == rpcclient.InternalRPC || cfg.Address == rpcclient.BiRPCInternal { - client, err = rpcclient.NewRPCClient(ctx, "", "", cfg.TLS, utils.FirstNonEmpty(cfg.ClientKey, keyPath), utils.FirstNonEmpty(cfg.ClientCertificate, certPath), utils.FirstNonEmpty(cfg.CaCertificate, caPath), utils.FirstIntNonEmpty(cfg.ConnectAttempts, connAttempts), - utils.FirstIntNonEmpty(cfg.Reconnects, reconnects), utils.FirstDurationNonEmpty(cfg.ConnectTimeout, connectTimeout), utils.FirstDurationNonEmpty(cfg.ReplyTimeout, replyTimeout), cfg.Address, internalConnChan, lazyConnect, biRPCClient) - } else { - client, err = rpcclient.NewRPCClient(ctx, utils.TCP, cfg.Address, cfg.TLS, utils.FirstNonEmpty(cfg.ClientKey, keyPath), utils.FirstNonEmpty(cfg.ClientCertificate, certPath), utils.FirstNonEmpty(cfg.CaCertificate, caPath), + client, err = rpcclient.NewRPCClient(ctx, "", "", cfg.TLS, + utils.FirstNonEmpty(cfg.ClientKey, keyPath), + utils.FirstNonEmpty(cfg.ClientCertificate, certPath), + utils.FirstNonEmpty(cfg.CaCertificate, caPath), utils.FirstIntNonEmpty(cfg.ConnectAttempts, connAttempts), - utils.FirstIntNonEmpty(cfg.Reconnects, reconnects), utils.FirstDurationNonEmpty(cfg.ConnectTimeout, connectTimeout), utils.FirstDurationNonEmpty(cfg.ReplyTimeout, replyTimeout), + utils.FirstIntNonEmpty(cfg.Reconnects, reconnects), + utils.FirstDurationNonEmpty(cfg.MaxReconnectInterval, maxReconnectInterval), + utils.FibDuration, + utils.FirstDurationNonEmpty(cfg.ConnectTimeout, connectTimeout), + utils.FirstDurationNonEmpty(cfg.ReplyTimeout, replyTimeout), + cfg.Address, internalConnChan, lazyConnect, biRPCClient) + } else { + client, err = rpcclient.NewRPCClient(ctx, utils.TCP, cfg.Address, cfg.TLS, + utils.FirstNonEmpty(cfg.ClientKey, keyPath), + utils.FirstNonEmpty(cfg.ClientCertificate, certPath), + utils.FirstNonEmpty(cfg.CaCertificate, caPath), + utils.FirstIntNonEmpty(cfg.ConnectAttempts, connAttempts), + utils.FirstIntNonEmpty(cfg.Reconnects, reconnects), + utils.FirstDurationNonEmpty(cfg.MaxReconnectInterval, maxReconnectInterval), + utils.FibDuration, + utils.FirstDurationNonEmpty(cfg.ConnectTimeout, connectTimeout), + utils.FirstDurationNonEmpty(cfg.ReplyTimeout, replyTimeout), utils.FirstNonEmpty(cfg.Transport, rpcclient.GOBrpc), nil, lazyConnect, biRPCClient) } if connID != utils.EmptyString && diff --git a/engine/libengine_test.go b/engine/libengine_test.go index 3c06a6f31..d5932d447 100644 --- a/engine/libengine_test.go +++ b/engine/libengine_test.go @@ -52,16 +52,17 @@ func TestLibengineNewRPCConnection(t *testing.T) { } expectedErr := "dial tcp [::1]:6012: connect: connection refused" cM := NewConnManager(config.NewDefaultCGRConfig()) - exp, err := rpcclient.NewRPCClient(context.Background(), utils.TCP, cfg.Address, cfg.TLS, cfg.ClientKey, cM.cfg.TLSCfg().ClientCerificate, - cM.cfg.TLSCfg().CaCertificate, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, - cfg.Transport, nil, false, nil) + exp, err := rpcclient.NewRPCClient(context.Background(), utils.TCP, cfg.Address, cfg.TLS, cfg.ClientKey, + cM.cfg.TLSCfg().ClientCerificate, cM.cfg.TLSCfg().CaCertificate, cfg.ConnectAttempts, cfg.Reconnects, + cfg.MaxReconnectInterval, utils.FibDuration, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.Transport, nil, false, nil) if err.Error() != expectedErr { t.Errorf("Expected %v \n but received \n %v", expectedErr, err) } - conn, err := NewRPCConnection(context.Background(), cfg, cM.cfg.TLSCfg().ClientKey, cM.cfg.TLSCfg().ClientCerificate, cM.cfg.TLSCfg().CaCertificate, - cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout, + conn, err := NewRPCConnection(context.Background(), cfg, cM.cfg.TLSCfg().ClientKey, cM.cfg.TLSCfg().ClientCerificate, + cM.cfg.TLSCfg().CaCertificate, cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects, + cM.cfg.GeneralCfg().MaxReconnectInterval, cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout, nil, false, nil, "*localhost", "a4f3f", new(ltcache.Cache)) if err.Error() != expectedErr { t.Errorf("Expected %v \n but received \n %v", expectedErr, err) @@ -90,8 +91,8 @@ func TestLibengineNewRPCConnectionInternal(t *testing.T) { } cM := NewConnManager(config.NewDefaultCGRConfig()) exp, err := rpcclient.NewRPCClient(context.Background(), "", "", cfg.TLS, cfg.ClientKey, cM.cfg.TLSCfg().ClientCerificate, - cM.cfg.TLSCfg().ClientCerificate, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, - rpcclient.InternalRPC, cM.rpcInternal["a4f3f"], false, nil) + cM.cfg.TLSCfg().ClientCerificate, cfg.ConnectAttempts, cfg.Reconnects, cfg.MaxReconnectInterval, utils.FibDuration, + cfg.ConnectTimeout, cfg.ReplyTimeout, rpcclient.InternalRPC, cM.rpcInternal["a4f3f"], false, nil) // We only want to check if the client loaded with the correct config, // therefore connection is not mandatory @@ -100,7 +101,7 @@ func TestLibengineNewRPCConnectionInternal(t *testing.T) { } conn, err := NewRPCConnection(context.Background(), cfg, cM.cfg.TLSCfg().ClientKey, cM.cfg.TLSCfg().ClientCerificate, cM.cfg.TLSCfg().CaCertificate, - cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout, + cM.cfg.GeneralCfg().ConnectAttempts, cM.cfg.GeneralCfg().Reconnects, cM.cfg.GeneralCfg().MaxReconnectInterval, cM.cfg.GeneralCfg().ConnectTimeout, cM.cfg.GeneralCfg().ReplyTimeout, cM.rpcInternal["a4f3f"], false, nil, "*internal", "a4f3f", new(ltcache.Cache)) if err != rpcclient.ErrInternallyDisconnected { diff --git a/ers/amqpv1.go b/ers/amqpv1.go index 664422655..e50809bca 100644 --- a/ers/amqpv1.go +++ b/ers/amqpv1.go @@ -130,7 +130,7 @@ func (rdr *AMQPv1ER) readLoop(recv *amqpv1.Receiver) (err error) { rdr.rdrErr <- err return } - if err = msg.Accept(ctx); err != nil { + if err = recv.AcceptMessage(ctx, msg); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> unable to accept message error: %s", utils.ERs, err.Error()))