mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-20 14:48:43 +05:00
SMGeneric SessionReplication with connection establishment
This commit is contained in:
@@ -150,7 +150,18 @@ func startSmGeneric(internalSMGChan chan *sessionmanager.SMGeneric, internalRate
|
||||
return
|
||||
}
|
||||
}
|
||||
sm := sessionmanager.NewSMGeneric(cfg, ralsConns, cdrsConn, cfg.DefaultTimezone)
|
||||
smgReplConns := make([]*sessionmanager.SMGReplicationConn, len(cfg.SmGenericConfig.SMGReplicationConns))
|
||||
for i, replConnCfg := range cfg.SmGenericConfig.SMGReplicationConns {
|
||||
if replCon, err := rpcclient.NewRpcClient("tcp", replConnCfg.Address, cfg.ConnectAttempts, cfg.Reconnects,
|
||||
cfg.ConnectTimeout, cfg.ReplyTimeout, replConnCfg.Transport[1:], nil); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<SMGeneric> Could not connect to SMGReplicationConn: <%s>, error: <%s>", replConnCfg.Address, err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
} else {
|
||||
smgReplConns[i] = &sessionmanager.SMGReplicationConn{Connection: replCon, Synchronous: replConnCfg.Synchronous}
|
||||
}
|
||||
}
|
||||
sm := sessionmanager.NewSMGeneric(cfg, ralsConns, cdrsConn, smgReplConns, cfg.DefaultTimezone)
|
||||
if err = sm.Connect(); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SMGeneric> error: %s!", err))
|
||||
}
|
||||
|
||||
@@ -38,15 +38,17 @@ const (
|
||||
var ErrPartiallyExecuted = errors.New("Partially executed")
|
||||
|
||||
// ReplicationConnection represents one connection to a passive node where we will replicate session data
|
||||
type SMGReplicationConnection struct {
|
||||
conn rpcclient.RpcClientConnection
|
||||
sync bool
|
||||
type SMGReplicationConn struct {
|
||||
Connection rpcclient.RpcClientConnection
|
||||
Synchronous bool
|
||||
}
|
||||
|
||||
func NewSMGeneric(cgrCfg *config.CGRConfig, rater rpcclient.RpcClientConnection, cdrsrv rpcclient.RpcClientConnection, timezone string) *SMGeneric {
|
||||
func NewSMGeneric(cgrCfg *config.CGRConfig, rater rpcclient.RpcClientConnection, cdrsrv rpcclient.RpcClientConnection,
|
||||
smgReplConns []*SMGReplicationConn, timezone string) *SMGeneric {
|
||||
return &SMGeneric{cgrCfg: cgrCfg,
|
||||
rater: rater,
|
||||
cdrsrv: cdrsrv,
|
||||
smgReplConns: smgReplConns,
|
||||
timezone: timezone,
|
||||
activeSessions: make(map[string][]*SMGSession),
|
||||
aSessionsIndex: make(map[string]map[string]utils.StringMap),
|
||||
@@ -58,7 +60,7 @@ type SMGeneric struct {
|
||||
cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple
|
||||
rater rpcclient.RpcClientConnection
|
||||
cdrsrv rpcclient.RpcClientConnection
|
||||
smgReplConns []*SMGReplicationConnection // list of connections where we will replicate our session data
|
||||
smgReplConns []*SMGReplicationConn // list of connections where we will replicate our session data
|
||||
timezone string
|
||||
activeSessions map[string][]*SMGSession // group sessions per sessionId, multiple runs based on derived charging
|
||||
aSessionsMux sync.RWMutex
|
||||
@@ -396,7 +398,7 @@ func (smg *SMGeneric) replicateSessionsForEvent(gev SMGenericEvent) (err error)
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
for _, rplConn := range smg.smgReplConns {
|
||||
if rplConn.sync {
|
||||
if rplConn.Synchronous {
|
||||
wg.Add(1)
|
||||
}
|
||||
go func(conn rpcclient.RpcClientConnection, sync bool, ss []*SMGSession) {
|
||||
@@ -409,7 +411,7 @@ func (smg *SMGeneric) replicateSessionsForEvent(gev SMGenericEvent) (err error)
|
||||
if sync {
|
||||
wg.Done()
|
||||
}
|
||||
}(rplConn.conn, rplConn.sync, aSessions)
|
||||
}(rplConn.Connection, rplConn.Synchronous, aSessions)
|
||||
}
|
||||
wg.Wait() // wait for synchronous replication to finish
|
||||
return
|
||||
|
||||
@@ -34,7 +34,7 @@ func init() {
|
||||
}
|
||||
|
||||
func TestSMGSessionIndexing(t *testing.T) {
|
||||
smg := NewSMGeneric(smgCfg, nil, nil, "UTC")
|
||||
smg := NewSMGeneric(smgCfg, nil, nil, nil, "UTC")
|
||||
smGev := SMGenericEvent{
|
||||
utils.EVENT_NAME: "TEST_EVENT",
|
||||
utils.TOR: "*voice",
|
||||
@@ -166,7 +166,7 @@ func TestSMGSessionIndexing(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSMGActiveSessions(t *testing.T) {
|
||||
smg := NewSMGeneric(smgCfg, nil, nil, "UTC")
|
||||
smg := NewSMGeneric(smgCfg, nil, nil, nil, "UTC")
|
||||
smGev1 := SMGenericEvent{
|
||||
utils.EVENT_NAME: "TEST_EVENT",
|
||||
utils.TOR: "*voice",
|
||||
@@ -240,7 +240,7 @@ func TestSMGActiveSessions(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGetSetPassiveSessions(t *testing.T) {
|
||||
smg := NewSMGeneric(smgCfg, nil, nil, "UTC")
|
||||
smg := NewSMGeneric(smgCfg, nil, nil, nil, "UTC")
|
||||
smGev := SMGenericEvent{
|
||||
utils.EVENT_NAME: "TEST_EVENT",
|
||||
utils.TOR: "*voice",
|
||||
|
||||
Reference in New Issue
Block a user