diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index edf9c4bc2..9fac4aa90 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -150,7 +150,18 @@ func startSmGeneric(internalSMGChan chan *sessionmanager.SMGeneric, internalRate return } } - sm := sessionmanager.NewSMGeneric(cfg, ralsConns, cdrsConn, cfg.DefaultTimezone) + smgReplConns := make([]*sessionmanager.SMGReplicationConn, len(cfg.SmGenericConfig.SMGReplicationConns)) + for i, replConnCfg := range cfg.SmGenericConfig.SMGReplicationConns { + if replCon, err := rpcclient.NewRpcClient("tcp", replConnCfg.Address, cfg.ConnectAttempts, cfg.Reconnects, + cfg.ConnectTimeout, cfg.ReplyTimeout, replConnCfg.Transport[1:], nil); err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to SMGReplicationConn: <%s>, error: <%s>", replConnCfg.Address, err.Error())) + exitChan <- true + return + } else { + smgReplConns[i] = &sessionmanager.SMGReplicationConn{Connection: replCon, Synchronous: replConnCfg.Synchronous} + } + } + sm := sessionmanager.NewSMGeneric(cfg, ralsConns, cdrsConn, smgReplConns, cfg.DefaultTimezone) if err = sm.Connect(); err != nil { utils.Logger.Err(fmt.Sprintf(" error: %s!", err)) } diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index 193b7c026..4791dbd46 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -38,15 +38,17 @@ const ( var ErrPartiallyExecuted = errors.New("Partially executed") // ReplicationConnection represents one connection to a passive node where we will replicate session data -type SMGReplicationConnection struct { - conn rpcclient.RpcClientConnection - sync bool +type SMGReplicationConn struct { + Connection rpcclient.RpcClientConnection + Synchronous bool } -func NewSMGeneric(cgrCfg *config.CGRConfig, rater rpcclient.RpcClientConnection, cdrsrv rpcclient.RpcClientConnection, timezone string) *SMGeneric { +func NewSMGeneric(cgrCfg *config.CGRConfig, rater rpcclient.RpcClientConnection, cdrsrv rpcclient.RpcClientConnection, + smgReplConns []*SMGReplicationConn, timezone string) *SMGeneric { return &SMGeneric{cgrCfg: cgrCfg, rater: rater, cdrsrv: cdrsrv, + smgReplConns: smgReplConns, timezone: timezone, activeSessions: make(map[string][]*SMGSession), aSessionsIndex: make(map[string]map[string]utils.StringMap), @@ -58,7 +60,7 @@ type SMGeneric struct { cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple rater rpcclient.RpcClientConnection cdrsrv rpcclient.RpcClientConnection - smgReplConns []*SMGReplicationConnection // list of connections where we will replicate our session data + smgReplConns []*SMGReplicationConn // list of connections where we will replicate our session data timezone string activeSessions map[string][]*SMGSession // group sessions per sessionId, multiple runs based on derived charging aSessionsMux sync.RWMutex @@ -396,7 +398,7 @@ func (smg *SMGeneric) replicateSessionsForEvent(gev SMGenericEvent) (err error) } var wg sync.WaitGroup for _, rplConn := range smg.smgReplConns { - if rplConn.sync { + if rplConn.Synchronous { wg.Add(1) } go func(conn rpcclient.RpcClientConnection, sync bool, ss []*SMGSession) { @@ -409,7 +411,7 @@ func (smg *SMGeneric) replicateSessionsForEvent(gev SMGenericEvent) (err error) if sync { wg.Done() } - }(rplConn.conn, rplConn.sync, aSessions) + }(rplConn.Connection, rplConn.Synchronous, aSessions) } wg.Wait() // wait for synchronous replication to finish return diff --git a/sessionmanager/smgeneric_test.go b/sessionmanager/smgeneric_test.go index c59967861..0d1a5fa2b 100644 --- a/sessionmanager/smgeneric_test.go +++ b/sessionmanager/smgeneric_test.go @@ -34,7 +34,7 @@ func init() { } func TestSMGSessionIndexing(t *testing.T) { - smg := NewSMGeneric(smgCfg, nil, nil, "UTC") + smg := NewSMGeneric(smgCfg, nil, nil, nil, "UTC") smGev := SMGenericEvent{ utils.EVENT_NAME: "TEST_EVENT", utils.TOR: "*voice", @@ -166,7 +166,7 @@ func TestSMGSessionIndexing(t *testing.T) { } func TestSMGActiveSessions(t *testing.T) { - smg := NewSMGeneric(smgCfg, nil, nil, "UTC") + smg := NewSMGeneric(smgCfg, nil, nil, nil, "UTC") smGev1 := SMGenericEvent{ utils.EVENT_NAME: "TEST_EVENT", utils.TOR: "*voice", @@ -240,7 +240,7 @@ func TestSMGActiveSessions(t *testing.T) { } func TestGetSetPassiveSessions(t *testing.T) { - smg := NewSMGeneric(smgCfg, nil, nil, "UTC") + smg := NewSMGeneric(smgCfg, nil, nil, nil, "UTC") smGev := SMGenericEvent{ utils.EVENT_NAME: "TEST_EVENT", utils.TOR: "*voice",