From d9359c9e8fd424a660a577092572bc116cfd6a29 Mon Sep 17 00:00:00 2001 From: TeoV Date: Tue, 7 Jan 2020 08:36:33 -0500 Subject: [PATCH] Add SessionReplication through conn manager --- config/config_json_test.go | 2 +- config/config_test.go | 2 +- config/libconfig_json.go | 2 +- config/smconfig.go | 15 ++- config/smconfig_test.go | 2 +- .../smgreplcmaster/cgrates.json | 13 ++- .../smgreplcslave/cgrates.json | 10 +- data/conf/samples/smgreplcmaster/cgrates.json | 13 ++- .../samples/smgreplcmaster_gob/cgrates.json | 13 ++- data/conf/samples/smgreplcslave/cgrates.json | 8 +- .../samples/smgreplcslave_gob/cgrates.json | 8 +- go.mod | 1 + services/sessions.go | 23 +---- sessions/sessions.go | 94 +++++-------------- sessions/sessions_rpl_it_test.go | 15 +-- sessions/sessions_test.go | 26 ++--- 16 files changed, 99 insertions(+), 148 deletions(-) diff --git a/config/config_json_test.go b/config/config_json_test.go index c83351ef7..d31ef455f 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -565,7 +565,7 @@ func TestSmgJsonCfg(t *testing.T) { Stats_conns: &[]string{}, Suppliers_conns: &[]string{}, Attributes_conns: &[]string{}, - Replication_conns: &[]*RemoteHostJson{}, + Replication_conns: &[]string{}, Debit_interval: utils.StringPointer("0s"), Store_session_costs: utils.BoolPointer(false), Min_call_duration: utils.StringPointer("0s"), diff --git a/config/config_test.go b/config/config_test.go index dea938365..973f7349d 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -636,7 +636,7 @@ func TestCgrCfgJSONDefaultsSMGenericCfg(t *testing.T) { StatSConns: []string{}, SupplSConns: []string{}, AttrSConns: []string{}, - ReplicationConns: []*RemoteHost{}, + ReplicationConns: []string{}, DebitInterval: 0 * time.Second, StoreSCosts: false, MinCallDuration: 0 * time.Second, diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 36046e936..134295ab9 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -220,7 +220,7 @@ type SessionSJsonCfg struct { Stats_conns *[]string Suppliers_conns *[]string Cdrs_conns *[]string - Replication_conns *[]*RemoteHostJson + Replication_conns *[]string Attributes_conns *[]string Debit_interval *string Store_session_costs *bool diff --git a/config/smconfig.go b/config/smconfig.go index 0be4c51e7..07fe388d0 100644 --- a/config/smconfig.go +++ b/config/smconfig.go @@ -19,6 +19,7 @@ along with this program. If not, see package config import ( + "fmt" "time" "github.com/cgrates/cgrates/utils" @@ -73,7 +74,7 @@ type SessionSCfg struct { SupplSConns []string AttrSConns []string CDRsConns []string - ReplicationConns []*RemoteHost + ReplicationConns []string DebitInterval time.Duration StoreSCosts bool MinCallDuration time.Duration @@ -187,10 +188,14 @@ func (scfg *SessionSCfg) loadFromJsonCfg(jsnCfg *SessionSJsonCfg) (err error) { } } if jsnCfg.Replication_conns != nil { - scfg.ReplicationConns = make([]*RemoteHost, len(*jsnCfg.Replication_conns)) - for idx, jsnHaCfg := range *jsnCfg.Replication_conns { - scfg.ReplicationConns[idx] = NewDfltRemoteHost() - scfg.ReplicationConns[idx].loadFromJsonCfg(jsnHaCfg) + scfg.ReplicationConns = make([]string, len(*jsnCfg.Replication_conns)) + for idx, connID := range *jsnCfg.Replication_conns { + // if we have the connection internal we change the name so we can have internal rpc for each subsystem + if connID == utils.MetaInternal { + return fmt.Errorf("Replication connection ID needs to be different than *internal") + } else { + scfg.ReplicationConns[idx] = connID + } } } if jsnCfg.Debit_interval != nil { diff --git a/config/smconfig_test.go b/config/smconfig_test.go index e7fd06cb0..4e37faab0 100644 --- a/config/smconfig_test.go +++ b/config/smconfig_test.go @@ -107,7 +107,7 @@ func TestSessionSCfgloadFromJsonCfg(t *testing.T) { SupplSConns: []string{}, AttrSConns: []string{}, CDRsConns: []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs)}, - ReplicationConns: []*RemoteHost{}, + ReplicationConns: []string{}, MaxCallDuration: time.Duration(3 * time.Hour), SessionIndexes: map[string]bool{}, ClientProtocol: 1, diff --git a/data/conf/samples/sessions_replication/smgreplcmaster/cgrates.json b/data/conf/samples/sessions_replication/smgreplcmaster/cgrates.json index 3ee5e3055..ed6b868a4 100644 --- a/data/conf/samples/sessions_replication/smgreplcmaster/cgrates.json +++ b/data/conf/samples/sessions_replication/smgreplcmaster/cgrates.json @@ -12,6 +12,15 @@ "http": "127.0.0.1:2080", }, + +"rpc_conns": { + "rplConn": { + "strategy": "*broadcast_sync", + "conns": [{"address": "127.0.0.1:22012", "transport": "*json"}], + }, +}, + + "stor_db": { // database used to store offline tariff plans and CDRs "db_password": "CGRateS.org", // password to use when connecting to stordb }, @@ -41,9 +50,7 @@ "sessions": { "enabled": true, "debit_interval": "5ms", // interval to perform debits on. - "replication_conns": [ - {"address": "127.0.0.1:22012", "transport": "*json"}, - ], + "replication_conns": ["rplConn"], "rals_conns": ["*internal"], "cdrs_conns": ["*internal"], "chargers_conns": ["*internal"], diff --git a/data/conf/samples/sessions_replication/smgreplcslave/cgrates.json b/data/conf/samples/sessions_replication/smgreplcslave/cgrates.json index b0b5a9e68..df9a7728b 100644 --- a/data/conf/samples/sessions_replication/smgreplcslave/cgrates.json +++ b/data/conf/samples/sessions_replication/smgreplcslave/cgrates.json @@ -18,10 +18,14 @@ "strategy": "*first", "conns": [{"address": "127.0.0.1:22012", "transport":"*json"}], }, + "rplConn": { + "strategy": "*broadcast_sync", + "conns": [{"address": "127.0.0.1:2012", "transport": "*json"}], + } }, - "stor_db": { // database used to store offline tariff plans and CDRs +"stor_db": { // database used to store offline tariff plans and CDRs "db_password": "CGRateS.org", // password to use when connecting to stordb }, @@ -51,9 +55,7 @@ "enabled": true, // starts SessionManager service: "debit_interval": "5ms", // interval to perform debits on. "listen_bijson": "127.0.0.1:22014", // address where to listen for bidirectional JSON-RPC requests - "replication_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"}, - ], + "replication_conns": ["rplConn"], "rals_conns": ["conn1"], "cdrs_conns": ["*internal"], "chargers_conns": ["*internal"], diff --git a/data/conf/samples/smgreplcmaster/cgrates.json b/data/conf/samples/smgreplcmaster/cgrates.json index 24c05cf94..85255d3c4 100644 --- a/data/conf/samples/smgreplcmaster/cgrates.json +++ b/data/conf/samples/smgreplcmaster/cgrates.json @@ -16,6 +16,15 @@ "db_password": "CGRateS.org", // password to use when connecting to stordb }, + +"rpc_conns": { + "rplConn": { + "strategy": "*broadcast_sync", + "conns": [{"address": "127.0.0.1:22012", "transport": "*json"}], + }, +}, + + "rals": { "enabled": true, }, @@ -40,9 +49,7 @@ "sessions": { "enabled": true, - "replication_conns": [ - {"address": "127.0.0.1:22012", "transport": "*json"}, - ], + "replication_conns": ["rplConn"], "rals_conns": ["*internal"], "cdrs_conns": ["*internal"], "chargers_conns": ["*internal"], diff --git a/data/conf/samples/smgreplcmaster_gob/cgrates.json b/data/conf/samples/smgreplcmaster_gob/cgrates.json index fded1176d..3e54425f2 100644 --- a/data/conf/samples/smgreplcmaster_gob/cgrates.json +++ b/data/conf/samples/smgreplcmaster_gob/cgrates.json @@ -12,6 +12,15 @@ "http": "127.0.0.1:2080", }, + +"rpc_conns": { + "rplConn": { + "strategy": "*broadcast_sync", + "conns": [{"address": "127.0.0.1:22013", "transport": "*gob"}], + }, +}, + + "stor_db": { // database used to store offline tariff plans and CDRs "db_password": "CGRateS.org", // password to use when connecting to stordb }, @@ -40,9 +49,7 @@ "sessions": { "enabled": true, - "replication_conns": [ - {"address": "127.0.0.1:22013", "transport": "*gob"}, - ], + "replication_conns": ["rplConn"], "rals_conns": ["*internal"], "cdrs_conns": ["*internal"], "chargers_conns": ["*internal"], diff --git a/data/conf/samples/smgreplcslave/cgrates.json b/data/conf/samples/smgreplcslave/cgrates.json index 1ff07df45..4fc8609bd 100644 --- a/data/conf/samples/smgreplcslave/cgrates.json +++ b/data/conf/samples/smgreplcslave/cgrates.json @@ -17,6 +17,10 @@ "strategy": "*first", "conns": [{"address": "127.0.0.1:22012", "transport":"*json"}], }, + "rplConn": { + "strategy": "*broadcast_sync", + "conns": [{"address": "127.0.0.1:2012", "transport": "*json"}], + } }, @@ -49,9 +53,7 @@ "sessions": { "enabled": true, // starts SessionManager service: "listen_bijson": "127.0.0.1:22014", // address where to listen for bidirectional JSON-RPC requests - "replication_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"}, - ], + "replication_conns": ["rplConn"], "rals_conns": ["conn1"], "cdrs_conns": ["*internal"], "chargers_conns": ["*internal"], diff --git a/data/conf/samples/smgreplcslave_gob/cgrates.json b/data/conf/samples/smgreplcslave_gob/cgrates.json index fc2d3b79a..88e7221c7 100644 --- a/data/conf/samples/smgreplcslave_gob/cgrates.json +++ b/data/conf/samples/smgreplcslave_gob/cgrates.json @@ -37,6 +37,10 @@ "strategy": "*first", "conns": [{"address": "127.0.0.1:22013", "transport":"*gob"}], }, + "rplConn": { + "strategy": "*broadcast_sync", + "conns": [{"address": "127.0.0.1:2013", "transport": "*gob"}], + } }, @@ -48,9 +52,7 @@ "sessions": { "enabled": true, // starts SessionManager service: "listen_bijson": "127.0.0.1:22014", // address where to listen for bidirectional JSON-RPC requests - "replication_conns": [ - {"address": "127.0.0.1:2013", "transport": "*gob"}, - ], + "replication_conns": ["rplConn"], "rals_conns": ["conn1"], "cdrs_conns": ["*internal"], "chargers_conns": ["*internal"], diff --git a/go.mod b/go.mod index c68ed8c57..31d996fb7 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/cgrates/cgrates go 1.13 // replace github.com/cgrates/radigo => /home/dan/go/src/github.com/cgrates/radigo +// replace github.com/cgrates/rpcclient => ../rpcclient require ( cloud.google.com/go v0.41.1-0.20190715155837-570ba224802b // indirect diff --git a/services/sessions.go b/services/sessions.go index d5e82ed40..0e09a3e51 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -73,17 +73,7 @@ func (smg *SessionService) Start() (err error) { smg.Lock() defer smg.Unlock() - sReplConns, err := sessions.NewSReplConns(smg.cfg.SessionSCfg().ReplicationConns, - smg.cfg.GeneralCfg().Reconnects, smg.cfg.GeneralCfg().ConnectTimeout, - smg.cfg.GeneralCfg().ReplyTimeout) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SMGReplicationConnection error: <%s>", - utils.SessionS, err.Error())) - return - } - - smg.sm = sessions.NewSessionS(smg.cfg, - sReplConns, smg.dm.GetDM(), smg.connMgr) + smg.sm = sessions.NewSessionS(smg.cfg, smg.dm.GetDM(), smg.connMgr) //start sync session in a separate gorutine go func(sm *sessions.SessionS) { if err = sm.ListenAndServe(smg.exitChan); err != nil { @@ -129,17 +119,6 @@ func (smg *SessionService) GetIntenternalChan() (conn chan rpcclient.ClientConne // Reload handles the change of config func (smg *SessionService) Reload() (err error) { - sReplConns, err := sessions.NewSReplConns(smg.cfg.SessionSCfg().ReplicationConns, - smg.cfg.GeneralCfg().Reconnects, smg.cfg.GeneralCfg().ConnectTimeout, - smg.cfg.GeneralCfg().ReplyTimeout) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SMGReplicationConnection error: <%s>", - utils.SessionS, err.Error())) - return - } - smg.Lock() - smg.sm.SetReplicationConnections(sReplConns) - smg.Unlock() return } diff --git a/sessions/sessions.go b/sessions/sessions.go index a4ddd5b12..977964cfc 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -41,31 +41,9 @@ var ( ErrForcedDisconnect = errors.New("FORCED_DISCONNECT") ) -// NewSReplConns initiates the connections configured for session replication -func NewSReplConns(conns []*config.RemoteHost, reconnects int, - connTimeout, replyTimeout time.Duration) (sReplConns []*SReplConn, err error) { - sReplConns = make([]*SReplConn, len(conns)) - for i, replConnCfg := range conns { - var replCon *rpcclient.RPCClient - if replCon, err = rpcclient.NewRPCClient(utils.TCP, replConnCfg.Address, - replConnCfg.TLS, "", "", "", 0, reconnects, connTimeout, - replyTimeout, replConnCfg.Transport, nil, true); err != nil { - return nil, err - } - sReplConns[i] = &SReplConn{Connection: replCon, Synchronous: replConnCfg.Synchronous} - } - return -} - -// SReplConn represents one connection to a passive node where we will replicate session data -type SReplConn struct { - Connection rpcclient.ClientConnector - Synchronous bool -} - // NewSessionS constructs a new SessionS instance func NewSessionS(cgrCfg *config.CGRConfig, - sReplConns []*SReplConn, dm *engine.DataManager, + dm *engine.DataManager, connMgr *engine.ConnManager) *SessionS { cgrCfg.SessionSCfg().SessionIndexes[utils.OriginID] = true // Make sure we have indexing for OriginID since it is a requirement on prefix searching @@ -73,7 +51,6 @@ func NewSessionS(cgrCfg *config.CGRConfig, cgrCfg: cgrCfg, dm: dm, connMgr: connMgr, - sReplConns: sReplConns, biJClnts: make(map[rpcclient.ClientConnector]string), biJIDs: make(map[string]*biJClient), aSessions: make(map[string]*Session), @@ -115,8 +92,6 @@ type SessionS struct { pSessionsIdx map[string]map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue][cgrID]utils.StringMap[runID]sID pSessionsRIdx map[string][]*riFieldNameVal // reverse indexes for passive sessions, used on remove - sReplConns []*SReplConn // list of connections where we will replicate our session data - } // ListenAndServe starts the service and binds it to the listen loop @@ -393,7 +368,7 @@ func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUs utils.SessionS, err.Error(), s.ResourceID)) } } - sS.replicateSessions(s.CGRID, false, sS.sReplConns) + sS.replicateSessions(s.CGRID, false, sS.cgrCfg.SessionSCfg().ReplicationConns) if clntConn := sS.biJClnt(s.ClientConnID); clntConn != nil { go func() { var rply string @@ -512,7 +487,7 @@ func (sS *SessionS) debitLoopSession(s *Session, sRunIdx int, debitStop := s.debitStop // avoid concurrency with endSession s.SRuns[sRunIdx].NextAutoDebit = utils.TimePointer(time.Now().Add(dbtIvl)) s.Unlock() - sS.replicateSessions(s.CGRID, false, sS.sReplConns) + sS.replicateSessions(s.CGRID, false, sS.cgrCfg.SessionSCfg().ReplicationConns) if maxDebit < dbtIvl { // disconnect faster select { case <-debitStop: // call was disconnected already @@ -674,8 +649,8 @@ func (sS *SessionS) disconnectSession(s *Session, rsn string) (err error) { } // replicateSessions will replicate sessions with or without cgrID specified -func (sS *SessionS) replicateSessions(cgrID string, psv bool, rplConns []*SReplConn) (err error) { - if len(rplConns) == 0 { +func (sS *SessionS) replicateSessions(cgrID string, psv bool, connIDs []string) (err error) { + if len(connIDs) == 0 { return } ss := sS.getSessions(cgrID, psv) @@ -685,29 +660,17 @@ func (sS *SessionS) replicateSessions(cgrID string, psv bool, rplConns []*SReplC &Session{CGRID: cgrID, EventStart: make(engine.MapEvent)}} } - var wg sync.WaitGroup - for _, rplConn := range rplConns { - if rplConn.Synchronous { - wg.Add(1) + for _, s := range ss { + sCln := s.Clone() + var rply string + if err := sS.connMgr.Call(connIDs, nil, + utils.SessionSv1SetPassiveSession, + sCln, &rply); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> cannot replicate session with id <%s>, err: %s", + utils.SessionS, sCln.CGRID, err.Error())) } - go func(conn rpcclient.ClientConnector, sync bool, ss []*Session) { - for _, s := range ss { - sCln := s.Clone() - var rply string - if err := conn.Call( - utils.SessionSv1SetPassiveSession, - sCln, &rply); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> cannot replicate session with id <%s>, err: %s", - utils.SessionS, sCln.CGRID, err.Error())) - } - } - if sync { - wg.Done() - } - }(rplConn.Connection, rplConn.Synchronous, ss) } - wg.Wait() // wait for synchronous replication to finish return } @@ -1205,7 +1168,7 @@ func (sS *SessionS) relocateSession(initOriginID, originID, originHost string) ( } s.Unlock() sS.registerSession(s, false) - sS.replicateSessions(initCGRID, false, sS.sReplConns) + sS.replicateSessions(initCGRID, false, sS.cgrCfg.SessionSCfg().ReplicationConns) return } @@ -1378,7 +1341,7 @@ func (sS *SessionS) initSession(tnt string, evStart engine.MapEvent, clntConnID // updateSession will reset terminator, perform debits and replicate sessions func (sS *SessionS) updateSession(s *Session, updtEv engine.MapEvent, isMsg bool) (maxUsage time.Duration, err error) { if !isMsg { - defer sS.replicateSessions(s.CGRID, false, sS.sReplConns) + defer sS.replicateSessions(s.CGRID, false, sS.cgrCfg.SessionSCfg().ReplicationConns) s.Lock() defer s.Unlock() @@ -1442,7 +1405,7 @@ func (sS *SessionS) endSession(s *Session, tUsage, lastUsage *time.Duration, aTime *time.Time, isMsg bool) (err error) { if !isMsg { //check if we have replicate connection and close the session there - defer sS.replicateSessions(s.CGRID, true, sS.sReplConns) + defer sS.replicateSessions(s.CGRID, true, sS.cgrCfg.SessionSCfg().ReplicationConns) sS.unregisterSession(s.CGRID, false) s.stopSTerminator() s.stopDebitLoops() @@ -1659,25 +1622,16 @@ func (sS *SessionS) BiRPCv1SetPassiveSession(clnt rpcclient.ClientConnector, // ArgsReplicateSessions used to specify wich Session to replicate over the given connections type ArgsReplicateSessions struct { - CGRID string - Passive bool - Connections []*config.RemoteHost + CGRID string + Passive bool + ConnIDs []string } // BiRPCv1ReplicateSessions will replicate active sessions to either args.Connections or the internal configured ones // args.Filter is used to filter the sessions which are replicated, CGRID is the only one possible for now func (sS *SessionS) BiRPCv1ReplicateSessions(clnt rpcclient.ClientConnector, args ArgsReplicateSessions, reply *string) (err error) { - sSConns := sS.sReplConns - if len(args.Connections) != 0 { - if sSConns, err = NewSReplConns(args.Connections, - sS.cgrCfg.GeneralCfg().Reconnects, - sS.cgrCfg.GeneralCfg().ConnectTimeout, - sS.cgrCfg.GeneralCfg().ReplyTimeout); err != nil { - return utils.NewErrServerError(err) - } - } - if err = sS.replicateSessions(args.CGRID, args.Passive, sSConns); err != nil { + if err = sS.replicateSessions(args.CGRID, args.Passive, args.ConnIDs); err != nil { return utils.NewErrServerError(err) } *reply = utils.OK @@ -3514,9 +3468,3 @@ func (sS *SessionS) BiRPCV1ProcessCDR(clnt rpcclient.ClientConnector, Event: ev}}, rply) } - -// SetReplicationConnections sets the new connections to the replictes sessions -// only used on reload -func (sS *SessionS) SetReplicationConnections(sReplConns []*SReplConn) { - sS.sReplConns = sReplConns -} diff --git a/sessions/sessions_rpl_it_test.go b/sessions/sessions_rpl_it_test.go index fd637a29f..26409993a 100644 --- a/sessions/sessions_rpl_it_test.go +++ b/sessions/sessions_rpl_it_test.go @@ -396,12 +396,7 @@ func TestSessionSRplManualReplicate(t *testing.T) { t.Error(err, aSessions) } argsRepl := ArgsReplicateSessions{ - Connections: []*config.RemoteHost{ - { - Address: smgRplcSlaveCfg.ListenCfg().RPCJSONListen, - Transport: utils.MetaJSON, - Synchronous: true}, - }, + ConnIDs: []string{"rplConn"}, } //replicate manually the session from master to slave var repply string @@ -444,12 +439,8 @@ func TestSessionSRplManualReplicate(t *testing.T) { // recover passive sessions from slave argsRepl = ArgsReplicateSessions{ Passive: true, - Connections: []*config.RemoteHost{ - { - Address: smgRplcMasterCfg.ListenCfg().RPCJSONListen, - Transport: utils.MetaJSON, - Synchronous: true}, - }} + ConnIDs: []string{"rplConn"}, + } if err := smgRplcSlvRPC.Call(utils.SessionSv1ReplicateSessions, argsRepl, &repply); err != nil { t.Error(err) } diff --git a/sessions/sessions_test.go b/sessions/sessions_test.go index 25e38bc61..81d895ceb 100644 --- a/sessions/sessions_test.go +++ b/sessions/sessions_test.go @@ -60,7 +60,7 @@ func TestSessionSIndexAndUnindexSessions(t *testing.T) { "Extra3": true, "Extra4": true, } - sS := NewSessionS(sSCfg, nil, nil, nil) + sS := NewSessionS(sSCfg, nil, nil) sEv := engine.NewMapEvent(map[string]interface{}{ utils.EVENT_NAME: "TEST_EVENT", utils.ToR: "*voice", @@ -382,7 +382,7 @@ func TestSessionSIndexAndUnindexSessions(t *testing.T) { func TestSessionSRegisterAndUnregisterASessions(t *testing.T) { sSCfg, _ := config.NewDefaultCGRConfig() - sS := NewSessionS(sSCfg, nil, nil, nil) + sS := NewSessionS(sSCfg, nil, nil) sSEv := engine.NewMapEvent(map[string]interface{}{ utils.EVENT_NAME: "TEST_EVENT", utils.ToR: "*voice", @@ -603,7 +603,7 @@ func TestSessionSRegisterAndUnregisterASessions(t *testing.T) { func TestSessionSRegisterAndUnregisterPSessions(t *testing.T) { sSCfg, _ := config.NewDefaultCGRConfig() - sS := NewSessionS(sSCfg, nil, nil, nil) + sS := NewSessionS(sSCfg, nil, nil) sSEv := engine.NewMapEvent(map[string]interface{}{ utils.EVENT_NAME: "TEST_EVENT", utils.ToR: "*voice", @@ -1378,7 +1378,7 @@ func TestV1ProcessEventReplyAsNavigableMap(t *testing.T) { func TestSessionStransitSState(t *testing.T) { sSCfg, _ := config.NewDefaultCGRConfig() - sS := NewSessionS(sSCfg, nil, nil, nil) + sS := NewSessionS(sSCfg, nil, nil) sSEv := engine.NewMapEvent(map[string]interface{}{ utils.EVENT_NAME: "TEST_EVENT", utils.ToR: "*voice", @@ -1425,7 +1425,7 @@ func TestSessionStransitSState(t *testing.T) { func TestSessionSrelocateSessionS(t *testing.T) { sSCfg, _ := config.NewDefaultCGRConfig() - sS := NewSessionS(sSCfg, nil, nil, nil) + sS := NewSessionS(sSCfg, nil, nil) sSEv := engine.NewMapEvent(map[string]interface{}{ utils.EVENT_NAME: "TEST_EVENT", utils.ToR: "*voice", @@ -1567,7 +1567,7 @@ func TestSessionSNewV1AuthorizeArgsWithArgDispatcher2(t *testing.T) { func TestSessionSGetIndexedFilters(t *testing.T) { sSCfg, _ := config.NewDefaultCGRConfig() mpStr := engine.NewInternalDB(nil, nil) - sS := NewSessionS(sSCfg, nil, engine.NewDataManager(mpStr, config.CgrConfig().CacheCfg(), nil), nil) + sS := NewSessionS(sSCfg, engine.NewDataManager(mpStr, config.CgrConfig().CacheCfg(), nil), nil) expIndx := map[string][]string{} expUindx := []*engine.FilterRule{ &engine.FilterRule{ @@ -1585,7 +1585,7 @@ func TestSessionSGetIndexedFilters(t *testing.T) { sSCfg.SessionSCfg().SessionIndexes = utils.StringMap{ "ToR": true, } - sS = NewSessionS(sSCfg, nil, engine.NewDataManager(mpStr, config.CgrConfig().CacheCfg(), nil), nil) + sS = NewSessionS(sSCfg, engine.NewDataManager(mpStr, config.CgrConfig().CacheCfg(), nil), nil) expIndx = map[string][]string{(utils.DynamicDataPrefix + utils.ToR): []string{utils.VOICE}} expUindx = nil if rplyindx, rplyUnindx := sS.getIndexedFilters("", fltrs); !reflect.DeepEqual(expIndx, rplyindx) { @@ -1602,7 +1602,7 @@ func TestSessionSGetIndexedFilters(t *testing.T) { ExpiryTime: time.Now().Add(-time.Hour), }, }) - sS = NewSessionS(sSCfg, nil, engine.NewDataManager(mpStr, config.CgrConfig().CacheCfg(), nil), nil) + sS = NewSessionS(sSCfg, engine.NewDataManager(mpStr, config.CgrConfig().CacheCfg(), nil), nil) expIndx = map[string][]string{} expUindx = nil fltrs = []string{"FLTR1", "FLTR2"} @@ -1619,7 +1619,7 @@ func TestSessionSgetSessionIDsMatchingIndexes(t *testing.T) { sSCfg.SessionSCfg().SessionIndexes = utils.StringMap{ "ToR": true, } - sS := NewSessionS(sSCfg, nil, nil, nil) + sS := NewSessionS(sSCfg, nil, nil) sEv := engine.NewMapEvent(map[string]interface{}{ utils.EVENT_NAME: "TEST_EVENT", utils.ToR: "*voice", @@ -1672,7 +1672,7 @@ func TestSessionSgetSessionIDsMatchingIndexes(t *testing.T) { "ToR": true, "Extra3": true, } - sS = NewSessionS(sSCfg, nil, nil, nil) + sS = NewSessionS(sSCfg, nil, nil) sS.indexSession(session, false) indx = map[string][]string{ "~ToR": []string{utils.VOICE, utils.DATA}, @@ -1706,7 +1706,7 @@ func TestSessionSgetSessionIDsMatchingIndexes(t *testing.T) { "ToR": true, "Extra2": true, } - sS = NewSessionS(sSCfg, nil, nil, nil) + sS = NewSessionS(sSCfg, nil, nil) sS.indexSession(session, true) indx = map[string][]string{ "~ToR": []string{utils.VOICE, utils.DATA}, @@ -1744,7 +1744,7 @@ func TestNewSessionS(t *testing.T) { pSessionsIdx: make(map[string]map[string]map[string]utils.StringMap), pSessionsRIdx: make(map[string][]*riFieldNameVal), } - sS := NewSessionS(cgrCGF, nil, nil, nil) + sS := NewSessionS(cgrCGF, nil, nil) if !reflect.DeepEqual(sS, eOut) { t.Errorf("Expected %s , received: %s", utils.ToJSON(sS), utils.ToJSON(eOut)) } @@ -1905,7 +1905,7 @@ func TestV1ProcessMessageArgsParseFlags(t *testing.T) { func TestSessionSgetSession(t *testing.T) { sSCfg, _ := config.NewDefaultCGRConfig() - sS := NewSessionS(sSCfg, nil, nil, nil) + sS := NewSessionS(sSCfg, nil, nil) sSEv := engine.NewMapEvent(map[string]interface{}{ utils.EVENT_NAME: "TEST_EVENT", utils.ToR: "*voice",