mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-15 13:19:53 +05:00
Add SessionReplication through conn manager
This commit is contained in:
@@ -565,7 +565,7 @@ func TestSmgJsonCfg(t *testing.T) {
|
||||
Stats_conns: &[]string{},
|
||||
Suppliers_conns: &[]string{},
|
||||
Attributes_conns: &[]string{},
|
||||
Replication_conns: &[]*RemoteHostJson{},
|
||||
Replication_conns: &[]string{},
|
||||
Debit_interval: utils.StringPointer("0s"),
|
||||
Store_session_costs: utils.BoolPointer(false),
|
||||
Min_call_duration: utils.StringPointer("0s"),
|
||||
|
||||
@@ -636,7 +636,7 @@ func TestCgrCfgJSONDefaultsSMGenericCfg(t *testing.T) {
|
||||
StatSConns: []string{},
|
||||
SupplSConns: []string{},
|
||||
AttrSConns: []string{},
|
||||
ReplicationConns: []*RemoteHost{},
|
||||
ReplicationConns: []string{},
|
||||
DebitInterval: 0 * time.Second,
|
||||
StoreSCosts: false,
|
||||
MinCallDuration: 0 * time.Second,
|
||||
|
||||
@@ -220,7 +220,7 @@ type SessionSJsonCfg struct {
|
||||
Stats_conns *[]string
|
||||
Suppliers_conns *[]string
|
||||
Cdrs_conns *[]string
|
||||
Replication_conns *[]*RemoteHostJson
|
||||
Replication_conns *[]string
|
||||
Attributes_conns *[]string
|
||||
Debit_interval *string
|
||||
Store_session_costs *bool
|
||||
|
||||
@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package config
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -73,7 +74,7 @@ type SessionSCfg struct {
|
||||
SupplSConns []string
|
||||
AttrSConns []string
|
||||
CDRsConns []string
|
||||
ReplicationConns []*RemoteHost
|
||||
ReplicationConns []string
|
||||
DebitInterval time.Duration
|
||||
StoreSCosts bool
|
||||
MinCallDuration time.Duration
|
||||
@@ -187,10 +188,14 @@ func (scfg *SessionSCfg) loadFromJsonCfg(jsnCfg *SessionSJsonCfg) (err error) {
|
||||
}
|
||||
}
|
||||
if jsnCfg.Replication_conns != nil {
|
||||
scfg.ReplicationConns = make([]*RemoteHost, len(*jsnCfg.Replication_conns))
|
||||
for idx, jsnHaCfg := range *jsnCfg.Replication_conns {
|
||||
scfg.ReplicationConns[idx] = NewDfltRemoteHost()
|
||||
scfg.ReplicationConns[idx].loadFromJsonCfg(jsnHaCfg)
|
||||
scfg.ReplicationConns = make([]string, len(*jsnCfg.Replication_conns))
|
||||
for idx, connID := range *jsnCfg.Replication_conns {
|
||||
// if we have the connection internal we change the name so we can have internal rpc for each subsystem
|
||||
if connID == utils.MetaInternal {
|
||||
return fmt.Errorf("Replication connection ID needs to be different than *internal")
|
||||
} else {
|
||||
scfg.ReplicationConns[idx] = connID
|
||||
}
|
||||
}
|
||||
}
|
||||
if jsnCfg.Debit_interval != nil {
|
||||
|
||||
@@ -107,7 +107,7 @@ func TestSessionSCfgloadFromJsonCfg(t *testing.T) {
|
||||
SupplSConns: []string{},
|
||||
AttrSConns: []string{},
|
||||
CDRsConns: []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs)},
|
||||
ReplicationConns: []*RemoteHost{},
|
||||
ReplicationConns: []string{},
|
||||
MaxCallDuration: time.Duration(3 * time.Hour),
|
||||
SessionIndexes: map[string]bool{},
|
||||
ClientProtocol: 1,
|
||||
|
||||
@@ -12,6 +12,15 @@
|
||||
"http": "127.0.0.1:2080",
|
||||
},
|
||||
|
||||
|
||||
"rpc_conns": {
|
||||
"rplConn": {
|
||||
"strategy": "*broadcast_sync",
|
||||
"conns": [{"address": "127.0.0.1:22012", "transport": "*json"}],
|
||||
},
|
||||
},
|
||||
|
||||
|
||||
"stor_db": { // database used to store offline tariff plans and CDRs
|
||||
"db_password": "CGRateS.org", // password to use when connecting to stordb
|
||||
},
|
||||
@@ -41,9 +50,7 @@
|
||||
"sessions": {
|
||||
"enabled": true,
|
||||
"debit_interval": "5ms", // interval to perform debits on.
|
||||
"replication_conns": [
|
||||
{"address": "127.0.0.1:22012", "transport": "*json"},
|
||||
],
|
||||
"replication_conns": ["rplConn"],
|
||||
"rals_conns": ["*internal"],
|
||||
"cdrs_conns": ["*internal"],
|
||||
"chargers_conns": ["*internal"],
|
||||
|
||||
@@ -18,10 +18,14 @@
|
||||
"strategy": "*first",
|
||||
"conns": [{"address": "127.0.0.1:22012", "transport":"*json"}],
|
||||
},
|
||||
"rplConn": {
|
||||
"strategy": "*broadcast_sync",
|
||||
"conns": [{"address": "127.0.0.1:2012", "transport": "*json"}],
|
||||
}
|
||||
},
|
||||
|
||||
|
||||
"stor_db": { // database used to store offline tariff plans and CDRs
|
||||
"stor_db": { // database used to store offline tariff plans and CDRs
|
||||
"db_password": "CGRateS.org", // password to use when connecting to stordb
|
||||
},
|
||||
|
||||
@@ -51,9 +55,7 @@
|
||||
"enabled": true, // starts SessionManager service: <true|false>
|
||||
"debit_interval": "5ms", // interval to perform debits on.
|
||||
"listen_bijson": "127.0.0.1:22014", // address where to listen for bidirectional JSON-RPC requests
|
||||
"replication_conns": [
|
||||
{"address": "127.0.0.1:2012", "transport": "*json"},
|
||||
],
|
||||
"replication_conns": ["rplConn"],
|
||||
"rals_conns": ["conn1"],
|
||||
"cdrs_conns": ["*internal"],
|
||||
"chargers_conns": ["*internal"],
|
||||
|
||||
@@ -16,6 +16,15 @@
|
||||
"db_password": "CGRateS.org", // password to use when connecting to stordb
|
||||
},
|
||||
|
||||
|
||||
"rpc_conns": {
|
||||
"rplConn": {
|
||||
"strategy": "*broadcast_sync",
|
||||
"conns": [{"address": "127.0.0.1:22012", "transport": "*json"}],
|
||||
},
|
||||
},
|
||||
|
||||
|
||||
"rals": {
|
||||
"enabled": true,
|
||||
},
|
||||
@@ -40,9 +49,7 @@
|
||||
|
||||
"sessions": {
|
||||
"enabled": true,
|
||||
"replication_conns": [
|
||||
{"address": "127.0.0.1:22012", "transport": "*json"},
|
||||
],
|
||||
"replication_conns": ["rplConn"],
|
||||
"rals_conns": ["*internal"],
|
||||
"cdrs_conns": ["*internal"],
|
||||
"chargers_conns": ["*internal"],
|
||||
|
||||
@@ -12,6 +12,15 @@
|
||||
"http": "127.0.0.1:2080",
|
||||
},
|
||||
|
||||
|
||||
"rpc_conns": {
|
||||
"rplConn": {
|
||||
"strategy": "*broadcast_sync",
|
||||
"conns": [{"address": "127.0.0.1:22013", "transport": "*gob"}],
|
||||
},
|
||||
},
|
||||
|
||||
|
||||
"stor_db": { // database used to store offline tariff plans and CDRs
|
||||
"db_password": "CGRateS.org", // password to use when connecting to stordb
|
||||
},
|
||||
@@ -40,9 +49,7 @@
|
||||
|
||||
"sessions": {
|
||||
"enabled": true,
|
||||
"replication_conns": [
|
||||
{"address": "127.0.0.1:22013", "transport": "*gob"},
|
||||
],
|
||||
"replication_conns": ["rplConn"],
|
||||
"rals_conns": ["*internal"],
|
||||
"cdrs_conns": ["*internal"],
|
||||
"chargers_conns": ["*internal"],
|
||||
|
||||
@@ -17,6 +17,10 @@
|
||||
"strategy": "*first",
|
||||
"conns": [{"address": "127.0.0.1:22012", "transport":"*json"}],
|
||||
},
|
||||
"rplConn": {
|
||||
"strategy": "*broadcast_sync",
|
||||
"conns": [{"address": "127.0.0.1:2012", "transport": "*json"}],
|
||||
}
|
||||
},
|
||||
|
||||
|
||||
@@ -49,9 +53,7 @@
|
||||
"sessions": {
|
||||
"enabled": true, // starts SessionManager service: <true|false>
|
||||
"listen_bijson": "127.0.0.1:22014", // address where to listen for bidirectional JSON-RPC requests
|
||||
"replication_conns": [
|
||||
{"address": "127.0.0.1:2012", "transport": "*json"},
|
||||
],
|
||||
"replication_conns": ["rplConn"],
|
||||
"rals_conns": ["conn1"],
|
||||
"cdrs_conns": ["*internal"],
|
||||
"chargers_conns": ["*internal"],
|
||||
|
||||
@@ -37,6 +37,10 @@
|
||||
"strategy": "*first",
|
||||
"conns": [{"address": "127.0.0.1:22013", "transport":"*gob"}],
|
||||
},
|
||||
"rplConn": {
|
||||
"strategy": "*broadcast_sync",
|
||||
"conns": [{"address": "127.0.0.1:2013", "transport": "*gob"}],
|
||||
}
|
||||
},
|
||||
|
||||
|
||||
@@ -48,9 +52,7 @@
|
||||
"sessions": {
|
||||
"enabled": true, // starts SessionManager service: <true|false>
|
||||
"listen_bijson": "127.0.0.1:22014", // address where to listen for bidirectional JSON-RPC requests
|
||||
"replication_conns": [
|
||||
{"address": "127.0.0.1:2013", "transport": "*gob"},
|
||||
],
|
||||
"replication_conns": ["rplConn"],
|
||||
"rals_conns": ["conn1"],
|
||||
"cdrs_conns": ["*internal"],
|
||||
"chargers_conns": ["*internal"],
|
||||
|
||||
1
go.mod
1
go.mod
@@ -3,6 +3,7 @@ module github.com/cgrates/cgrates
|
||||
go 1.13
|
||||
|
||||
// replace github.com/cgrates/radigo => /home/dan/go/src/github.com/cgrates/radigo
|
||||
// replace github.com/cgrates/rpcclient => ../rpcclient
|
||||
|
||||
require (
|
||||
cloud.google.com/go v0.41.1-0.20190715155837-570ba224802b // indirect
|
||||
|
||||
@@ -73,17 +73,7 @@ func (smg *SessionService) Start() (err error) {
|
||||
smg.Lock()
|
||||
defer smg.Unlock()
|
||||
|
||||
sReplConns, err := sessions.NewSReplConns(smg.cfg.SessionSCfg().ReplicationConns,
|
||||
smg.cfg.GeneralCfg().Reconnects, smg.cfg.GeneralCfg().ConnectTimeout,
|
||||
smg.cfg.GeneralCfg().ReplyTimeout)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SMGReplicationConnection error: <%s>",
|
||||
utils.SessionS, err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
smg.sm = sessions.NewSessionS(smg.cfg,
|
||||
sReplConns, smg.dm.GetDM(), smg.connMgr)
|
||||
smg.sm = sessions.NewSessionS(smg.cfg, smg.dm.GetDM(), smg.connMgr)
|
||||
//start sync session in a separate gorutine
|
||||
go func(sm *sessions.SessionS) {
|
||||
if err = sm.ListenAndServe(smg.exitChan); err != nil {
|
||||
@@ -129,17 +119,6 @@ func (smg *SessionService) GetIntenternalChan() (conn chan rpcclient.ClientConne
|
||||
|
||||
// Reload handles the change of config
|
||||
func (smg *SessionService) Reload() (err error) {
|
||||
sReplConns, err := sessions.NewSReplConns(smg.cfg.SessionSCfg().ReplicationConns,
|
||||
smg.cfg.GeneralCfg().Reconnects, smg.cfg.GeneralCfg().ConnectTimeout,
|
||||
smg.cfg.GeneralCfg().ReplyTimeout)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SMGReplicationConnection error: <%s>",
|
||||
utils.SessionS, err.Error()))
|
||||
return
|
||||
}
|
||||
smg.Lock()
|
||||
smg.sm.SetReplicationConnections(sReplConns)
|
||||
smg.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -41,31 +41,9 @@ var (
|
||||
ErrForcedDisconnect = errors.New("FORCED_DISCONNECT")
|
||||
)
|
||||
|
||||
// NewSReplConns initiates the connections configured for session replication
|
||||
func NewSReplConns(conns []*config.RemoteHost, reconnects int,
|
||||
connTimeout, replyTimeout time.Duration) (sReplConns []*SReplConn, err error) {
|
||||
sReplConns = make([]*SReplConn, len(conns))
|
||||
for i, replConnCfg := range conns {
|
||||
var replCon *rpcclient.RPCClient
|
||||
if replCon, err = rpcclient.NewRPCClient(utils.TCP, replConnCfg.Address,
|
||||
replConnCfg.TLS, "", "", "", 0, reconnects, connTimeout,
|
||||
replyTimeout, replConnCfg.Transport, nil, true); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sReplConns[i] = &SReplConn{Connection: replCon, Synchronous: replConnCfg.Synchronous}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// SReplConn represents one connection to a passive node where we will replicate session data
|
||||
type SReplConn struct {
|
||||
Connection rpcclient.ClientConnector
|
||||
Synchronous bool
|
||||
}
|
||||
|
||||
// NewSessionS constructs a new SessionS instance
|
||||
func NewSessionS(cgrCfg *config.CGRConfig,
|
||||
sReplConns []*SReplConn, dm *engine.DataManager,
|
||||
dm *engine.DataManager,
|
||||
connMgr *engine.ConnManager) *SessionS {
|
||||
cgrCfg.SessionSCfg().SessionIndexes[utils.OriginID] = true // Make sure we have indexing for OriginID since it is a requirement on prefix searching
|
||||
|
||||
@@ -73,7 +51,6 @@ func NewSessionS(cgrCfg *config.CGRConfig,
|
||||
cgrCfg: cgrCfg,
|
||||
dm: dm,
|
||||
connMgr: connMgr,
|
||||
sReplConns: sReplConns,
|
||||
biJClnts: make(map[rpcclient.ClientConnector]string),
|
||||
biJIDs: make(map[string]*biJClient),
|
||||
aSessions: make(map[string]*Session),
|
||||
@@ -115,8 +92,6 @@ type SessionS struct {
|
||||
pSessionsIdx map[string]map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue][cgrID]utils.StringMap[runID]sID
|
||||
pSessionsRIdx map[string][]*riFieldNameVal // reverse indexes for passive sessions, used on remove
|
||||
|
||||
sReplConns []*SReplConn // list of connections where we will replicate our session data
|
||||
|
||||
}
|
||||
|
||||
// ListenAndServe starts the service and binds it to the listen loop
|
||||
@@ -393,7 +368,7 @@ func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUs
|
||||
utils.SessionS, err.Error(), s.ResourceID))
|
||||
}
|
||||
}
|
||||
sS.replicateSessions(s.CGRID, false, sS.sReplConns)
|
||||
sS.replicateSessions(s.CGRID, false, sS.cgrCfg.SessionSCfg().ReplicationConns)
|
||||
if clntConn := sS.biJClnt(s.ClientConnID); clntConn != nil {
|
||||
go func() {
|
||||
var rply string
|
||||
@@ -512,7 +487,7 @@ func (sS *SessionS) debitLoopSession(s *Session, sRunIdx int,
|
||||
debitStop := s.debitStop // avoid concurrency with endSession
|
||||
s.SRuns[sRunIdx].NextAutoDebit = utils.TimePointer(time.Now().Add(dbtIvl))
|
||||
s.Unlock()
|
||||
sS.replicateSessions(s.CGRID, false, sS.sReplConns)
|
||||
sS.replicateSessions(s.CGRID, false, sS.cgrCfg.SessionSCfg().ReplicationConns)
|
||||
if maxDebit < dbtIvl { // disconnect faster
|
||||
select {
|
||||
case <-debitStop: // call was disconnected already
|
||||
@@ -674,8 +649,8 @@ func (sS *SessionS) disconnectSession(s *Session, rsn string) (err error) {
|
||||
}
|
||||
|
||||
// replicateSessions will replicate sessions with or without cgrID specified
|
||||
func (sS *SessionS) replicateSessions(cgrID string, psv bool, rplConns []*SReplConn) (err error) {
|
||||
if len(rplConns) == 0 {
|
||||
func (sS *SessionS) replicateSessions(cgrID string, psv bool, connIDs []string) (err error) {
|
||||
if len(connIDs) == 0 {
|
||||
return
|
||||
}
|
||||
ss := sS.getSessions(cgrID, psv)
|
||||
@@ -685,29 +660,17 @@ func (sS *SessionS) replicateSessions(cgrID string, psv bool, rplConns []*SReplC
|
||||
&Session{CGRID: cgrID,
|
||||
EventStart: make(engine.MapEvent)}}
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
for _, rplConn := range rplConns {
|
||||
if rplConn.Synchronous {
|
||||
wg.Add(1)
|
||||
for _, s := range ss {
|
||||
sCln := s.Clone()
|
||||
var rply string
|
||||
if err := sS.connMgr.Call(connIDs, nil,
|
||||
utils.SessionSv1SetPassiveSession,
|
||||
sCln, &rply); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> cannot replicate session with id <%s>, err: %s",
|
||||
utils.SessionS, sCln.CGRID, err.Error()))
|
||||
}
|
||||
go func(conn rpcclient.ClientConnector, sync bool, ss []*Session) {
|
||||
for _, s := range ss {
|
||||
sCln := s.Clone()
|
||||
var rply string
|
||||
if err := conn.Call(
|
||||
utils.SessionSv1SetPassiveSession,
|
||||
sCln, &rply); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> cannot replicate session with id <%s>, err: %s",
|
||||
utils.SessionS, sCln.CGRID, err.Error()))
|
||||
}
|
||||
}
|
||||
if sync {
|
||||
wg.Done()
|
||||
}
|
||||
}(rplConn.Connection, rplConn.Synchronous, ss)
|
||||
}
|
||||
wg.Wait() // wait for synchronous replication to finish
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1205,7 +1168,7 @@ func (sS *SessionS) relocateSession(initOriginID, originID, originHost string) (
|
||||
}
|
||||
s.Unlock()
|
||||
sS.registerSession(s, false)
|
||||
sS.replicateSessions(initCGRID, false, sS.sReplConns)
|
||||
sS.replicateSessions(initCGRID, false, sS.cgrCfg.SessionSCfg().ReplicationConns)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1378,7 +1341,7 @@ func (sS *SessionS) initSession(tnt string, evStart engine.MapEvent, clntConnID
|
||||
// updateSession will reset terminator, perform debits and replicate sessions
|
||||
func (sS *SessionS) updateSession(s *Session, updtEv engine.MapEvent, isMsg bool) (maxUsage time.Duration, err error) {
|
||||
if !isMsg {
|
||||
defer sS.replicateSessions(s.CGRID, false, sS.sReplConns)
|
||||
defer sS.replicateSessions(s.CGRID, false, sS.cgrCfg.SessionSCfg().ReplicationConns)
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
@@ -1442,7 +1405,7 @@ func (sS *SessionS) endSession(s *Session, tUsage, lastUsage *time.Duration,
|
||||
aTime *time.Time, isMsg bool) (err error) {
|
||||
if !isMsg {
|
||||
//check if we have replicate connection and close the session there
|
||||
defer sS.replicateSessions(s.CGRID, true, sS.sReplConns)
|
||||
defer sS.replicateSessions(s.CGRID, true, sS.cgrCfg.SessionSCfg().ReplicationConns)
|
||||
sS.unregisterSession(s.CGRID, false)
|
||||
s.stopSTerminator()
|
||||
s.stopDebitLoops()
|
||||
@@ -1659,25 +1622,16 @@ func (sS *SessionS) BiRPCv1SetPassiveSession(clnt rpcclient.ClientConnector,
|
||||
|
||||
// ArgsReplicateSessions used to specify wich Session to replicate over the given connections
|
||||
type ArgsReplicateSessions struct {
|
||||
CGRID string
|
||||
Passive bool
|
||||
Connections []*config.RemoteHost
|
||||
CGRID string
|
||||
Passive bool
|
||||
ConnIDs []string
|
||||
}
|
||||
|
||||
// BiRPCv1ReplicateSessions will replicate active sessions to either args.Connections or the internal configured ones
|
||||
// args.Filter is used to filter the sessions which are replicated, CGRID is the only one possible for now
|
||||
func (sS *SessionS) BiRPCv1ReplicateSessions(clnt rpcclient.ClientConnector,
|
||||
args ArgsReplicateSessions, reply *string) (err error) {
|
||||
sSConns := sS.sReplConns
|
||||
if len(args.Connections) != 0 {
|
||||
if sSConns, err = NewSReplConns(args.Connections,
|
||||
sS.cgrCfg.GeneralCfg().Reconnects,
|
||||
sS.cgrCfg.GeneralCfg().ConnectTimeout,
|
||||
sS.cgrCfg.GeneralCfg().ReplyTimeout); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
}
|
||||
if err = sS.replicateSessions(args.CGRID, args.Passive, sSConns); err != nil {
|
||||
if err = sS.replicateSessions(args.CGRID, args.Passive, args.ConnIDs); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
*reply = utils.OK
|
||||
@@ -3514,9 +3468,3 @@ func (sS *SessionS) BiRPCV1ProcessCDR(clnt rpcclient.ClientConnector,
|
||||
Event: ev}},
|
||||
rply)
|
||||
}
|
||||
|
||||
// SetReplicationConnections sets the new connections to the replictes sessions
|
||||
// only used on reload
|
||||
func (sS *SessionS) SetReplicationConnections(sReplConns []*SReplConn) {
|
||||
sS.sReplConns = sReplConns
|
||||
}
|
||||
|
||||
@@ -396,12 +396,7 @@ func TestSessionSRplManualReplicate(t *testing.T) {
|
||||
t.Error(err, aSessions)
|
||||
}
|
||||
argsRepl := ArgsReplicateSessions{
|
||||
Connections: []*config.RemoteHost{
|
||||
{
|
||||
Address: smgRplcSlaveCfg.ListenCfg().RPCJSONListen,
|
||||
Transport: utils.MetaJSON,
|
||||
Synchronous: true},
|
||||
},
|
||||
ConnIDs: []string{"rplConn"},
|
||||
}
|
||||
//replicate manually the session from master to slave
|
||||
var repply string
|
||||
@@ -444,12 +439,8 @@ func TestSessionSRplManualReplicate(t *testing.T) {
|
||||
// recover passive sessions from slave
|
||||
argsRepl = ArgsReplicateSessions{
|
||||
Passive: true,
|
||||
Connections: []*config.RemoteHost{
|
||||
{
|
||||
Address: smgRplcMasterCfg.ListenCfg().RPCJSONListen,
|
||||
Transport: utils.MetaJSON,
|
||||
Synchronous: true},
|
||||
}}
|
||||
ConnIDs: []string{"rplConn"},
|
||||
}
|
||||
if err := smgRplcSlvRPC.Call(utils.SessionSv1ReplicateSessions, argsRepl, &repply); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -60,7 +60,7 @@ func TestSessionSIndexAndUnindexSessions(t *testing.T) {
|
||||
"Extra3": true,
|
||||
"Extra4": true,
|
||||
}
|
||||
sS := NewSessionS(sSCfg, nil, nil, nil)
|
||||
sS := NewSessionS(sSCfg, nil, nil)
|
||||
sEv := engine.NewMapEvent(map[string]interface{}{
|
||||
utils.EVENT_NAME: "TEST_EVENT",
|
||||
utils.ToR: "*voice",
|
||||
@@ -382,7 +382,7 @@ func TestSessionSIndexAndUnindexSessions(t *testing.T) {
|
||||
|
||||
func TestSessionSRegisterAndUnregisterASessions(t *testing.T) {
|
||||
sSCfg, _ := config.NewDefaultCGRConfig()
|
||||
sS := NewSessionS(sSCfg, nil, nil, nil)
|
||||
sS := NewSessionS(sSCfg, nil, nil)
|
||||
sSEv := engine.NewMapEvent(map[string]interface{}{
|
||||
utils.EVENT_NAME: "TEST_EVENT",
|
||||
utils.ToR: "*voice",
|
||||
@@ -603,7 +603,7 @@ func TestSessionSRegisterAndUnregisterASessions(t *testing.T) {
|
||||
|
||||
func TestSessionSRegisterAndUnregisterPSessions(t *testing.T) {
|
||||
sSCfg, _ := config.NewDefaultCGRConfig()
|
||||
sS := NewSessionS(sSCfg, nil, nil, nil)
|
||||
sS := NewSessionS(sSCfg, nil, nil)
|
||||
sSEv := engine.NewMapEvent(map[string]interface{}{
|
||||
utils.EVENT_NAME: "TEST_EVENT",
|
||||
utils.ToR: "*voice",
|
||||
@@ -1378,7 +1378,7 @@ func TestV1ProcessEventReplyAsNavigableMap(t *testing.T) {
|
||||
|
||||
func TestSessionStransitSState(t *testing.T) {
|
||||
sSCfg, _ := config.NewDefaultCGRConfig()
|
||||
sS := NewSessionS(sSCfg, nil, nil, nil)
|
||||
sS := NewSessionS(sSCfg, nil, nil)
|
||||
sSEv := engine.NewMapEvent(map[string]interface{}{
|
||||
utils.EVENT_NAME: "TEST_EVENT",
|
||||
utils.ToR: "*voice",
|
||||
@@ -1425,7 +1425,7 @@ func TestSessionStransitSState(t *testing.T) {
|
||||
|
||||
func TestSessionSrelocateSessionS(t *testing.T) {
|
||||
sSCfg, _ := config.NewDefaultCGRConfig()
|
||||
sS := NewSessionS(sSCfg, nil, nil, nil)
|
||||
sS := NewSessionS(sSCfg, nil, nil)
|
||||
sSEv := engine.NewMapEvent(map[string]interface{}{
|
||||
utils.EVENT_NAME: "TEST_EVENT",
|
||||
utils.ToR: "*voice",
|
||||
@@ -1567,7 +1567,7 @@ func TestSessionSNewV1AuthorizeArgsWithArgDispatcher2(t *testing.T) {
|
||||
func TestSessionSGetIndexedFilters(t *testing.T) {
|
||||
sSCfg, _ := config.NewDefaultCGRConfig()
|
||||
mpStr := engine.NewInternalDB(nil, nil)
|
||||
sS := NewSessionS(sSCfg, nil, engine.NewDataManager(mpStr, config.CgrConfig().CacheCfg(), nil), nil)
|
||||
sS := NewSessionS(sSCfg, engine.NewDataManager(mpStr, config.CgrConfig().CacheCfg(), nil), nil)
|
||||
expIndx := map[string][]string{}
|
||||
expUindx := []*engine.FilterRule{
|
||||
&engine.FilterRule{
|
||||
@@ -1585,7 +1585,7 @@ func TestSessionSGetIndexedFilters(t *testing.T) {
|
||||
sSCfg.SessionSCfg().SessionIndexes = utils.StringMap{
|
||||
"ToR": true,
|
||||
}
|
||||
sS = NewSessionS(sSCfg, nil, engine.NewDataManager(mpStr, config.CgrConfig().CacheCfg(), nil), nil)
|
||||
sS = NewSessionS(sSCfg, engine.NewDataManager(mpStr, config.CgrConfig().CacheCfg(), nil), nil)
|
||||
expIndx = map[string][]string{(utils.DynamicDataPrefix + utils.ToR): []string{utils.VOICE}}
|
||||
expUindx = nil
|
||||
if rplyindx, rplyUnindx := sS.getIndexedFilters("", fltrs); !reflect.DeepEqual(expIndx, rplyindx) {
|
||||
@@ -1602,7 +1602,7 @@ func TestSessionSGetIndexedFilters(t *testing.T) {
|
||||
ExpiryTime: time.Now().Add(-time.Hour),
|
||||
},
|
||||
})
|
||||
sS = NewSessionS(sSCfg, nil, engine.NewDataManager(mpStr, config.CgrConfig().CacheCfg(), nil), nil)
|
||||
sS = NewSessionS(sSCfg, engine.NewDataManager(mpStr, config.CgrConfig().CacheCfg(), nil), nil)
|
||||
expIndx = map[string][]string{}
|
||||
expUindx = nil
|
||||
fltrs = []string{"FLTR1", "FLTR2"}
|
||||
@@ -1619,7 +1619,7 @@ func TestSessionSgetSessionIDsMatchingIndexes(t *testing.T) {
|
||||
sSCfg.SessionSCfg().SessionIndexes = utils.StringMap{
|
||||
"ToR": true,
|
||||
}
|
||||
sS := NewSessionS(sSCfg, nil, nil, nil)
|
||||
sS := NewSessionS(sSCfg, nil, nil)
|
||||
sEv := engine.NewMapEvent(map[string]interface{}{
|
||||
utils.EVENT_NAME: "TEST_EVENT",
|
||||
utils.ToR: "*voice",
|
||||
@@ -1672,7 +1672,7 @@ func TestSessionSgetSessionIDsMatchingIndexes(t *testing.T) {
|
||||
"ToR": true,
|
||||
"Extra3": true,
|
||||
}
|
||||
sS = NewSessionS(sSCfg, nil, nil, nil)
|
||||
sS = NewSessionS(sSCfg, nil, nil)
|
||||
sS.indexSession(session, false)
|
||||
indx = map[string][]string{
|
||||
"~ToR": []string{utils.VOICE, utils.DATA},
|
||||
@@ -1706,7 +1706,7 @@ func TestSessionSgetSessionIDsMatchingIndexes(t *testing.T) {
|
||||
"ToR": true,
|
||||
"Extra2": true,
|
||||
}
|
||||
sS = NewSessionS(sSCfg, nil, nil, nil)
|
||||
sS = NewSessionS(sSCfg, nil, nil)
|
||||
sS.indexSession(session, true)
|
||||
indx = map[string][]string{
|
||||
"~ToR": []string{utils.VOICE, utils.DATA},
|
||||
@@ -1744,7 +1744,7 @@ func TestNewSessionS(t *testing.T) {
|
||||
pSessionsIdx: make(map[string]map[string]map[string]utils.StringMap),
|
||||
pSessionsRIdx: make(map[string][]*riFieldNameVal),
|
||||
}
|
||||
sS := NewSessionS(cgrCGF, nil, nil, nil)
|
||||
sS := NewSessionS(cgrCGF, nil, nil)
|
||||
if !reflect.DeepEqual(sS, eOut) {
|
||||
t.Errorf("Expected %s , received: %s", utils.ToJSON(sS), utils.ToJSON(eOut))
|
||||
}
|
||||
@@ -1905,7 +1905,7 @@ func TestV1ProcessMessageArgsParseFlags(t *testing.T) {
|
||||
|
||||
func TestSessionSgetSession(t *testing.T) {
|
||||
sSCfg, _ := config.NewDefaultCGRConfig()
|
||||
sS := NewSessionS(sSCfg, nil, nil, nil)
|
||||
sS := NewSessionS(sSCfg, nil, nil)
|
||||
sSEv := engine.NewMapEvent(map[string]interface{}{
|
||||
utils.EVENT_NAME: "TEST_EVENT",
|
||||
utils.ToR: "*voice",
|
||||
|
||||
Reference in New Issue
Block a user