SMGenericV1.ActiveSessions returning NOT_FOUND for 0 active sessions, SMG one way session replication operational

This commit is contained in:
DanB
2016-11-08 15:27:03 +01:00
parent 9b0b6c0a95
commit 1a946e1ce5
6 changed files with 67 additions and 46 deletions

View File

@@ -109,12 +109,7 @@ func (self *SMGenericV1) ProcessCDR(ev sessionmanager.SMGenericEvent, reply *str
}
func (self *SMGenericV1) ActiveSessions(attrs utils.AttrSMGGetActiveSessions, reply *[]*sessionmanager.ActiveSession) error {
aSessions, _, err := self.sm.ActiveSessions(attrs.AsMapStringString(), false)
if err != nil {
return utils.NewErrServerError(err)
}
*reply = aSessions
return nil
return self.sm.BiRPCV1ActiveSessions(nil, attrs, reply)
}
func (self *SMGenericV1) ActiveSessionsCount(attrs utils.AttrSMGGetActiveSessions, reply *int) error {

View File

@@ -194,10 +194,8 @@ func TestRPCITDirectedRPC(t *testing.T) {
return
}
var sessions []*sessionmanager.ActiveSession
if err := rpcPoolFirst.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &sessions); err != nil {
t.Error(err) // {"id":2,"result":null,"error":"rpc: can't find service SMGenericV1.ActiveSessions"}
} else if len(sessions) != 0 {
t.Errorf("Received sessions: %+v", sessions)
if err := rpcPoolFirst.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &sessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
}

View File

@@ -386,10 +386,8 @@ func TestSMGDataLastUsedMultipleData(t *testing.T) {
} else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal {
t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue())
}
if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil {
t.Error(err)
} else if len(aSessions) != 0 {
t.Errorf("wrong active sessions: %+v", aSessions)
if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err, aSessions)
}
}
@@ -567,10 +565,8 @@ func TestSMGDataTTLExpiredMultiUpdates(t *testing.T) {
} else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal {
t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue())
}
if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil {
t.Error(err)
} else if len(aSessions) != 0 {
t.Errorf("wrong active sessions: %+v", aSessions)
if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err, aSessions)
}
}
@@ -767,10 +763,8 @@ func TestSMGDataMultipleDataNoUsage(t *testing.T) {
} else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal {
t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue())
}
if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil {
t.Error(err)
} else if len(aSessions) != 0 {
t.Errorf("wrong active sessions: %+v", aSessions)
if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err, aSessions)
}
}
@@ -968,9 +962,7 @@ func TestSMGDataMultipleDataConstantUsage(t *testing.T) {
} else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal {
t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue())
}
if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil {
if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
} else if len(aSessions) != 0 {
t.Errorf("wrong active sessions: %f", aSessions[0].Usage.Seconds())
}
}

View File

@@ -808,10 +808,8 @@ func TestSMGVoiceSessionTTLWithRelocate(t *testing.T) {
t.Errorf("Expecting: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue())
}
if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{RunID: utils.StringPointer(utils.META_DEFAULT),
OriginID: utils.StringPointer(smgEv.GetOriginID(utils.META_DEFAULT))}, &aSessions); err != nil {
t.Error(err)
} else if len(aSessions) != 0 {
t.Errorf("Unexpected number of sessions received: %+v", aSessions)
OriginID: utils.StringPointer(smgEv.GetOriginID(utils.META_DEFAULT))}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err, aSessions)
}
var cdrs []*engine.ExternalCDR
req := utils.RPCCDRsFilter{RunIDs: []string{utils.META_DEFAULT}, DestinationPrefixes: []string{smgEv.GetDestination(utils.META_DEFAULT)}}

View File

@@ -445,6 +445,7 @@ func (smg *SMGeneric) getPassiveSessions(cgrID, runID string) (pss map[string][]
}
// deletePassiveSessions is used to remove a reference from the passiveSessions table
// ToDo: test it
func (smg *SMGeneric) deletePassiveSessions(cgrID string) {
smg.pSessionsMux.Lock()
delete(smg.passiveSessions, cgrID)
@@ -469,6 +470,7 @@ func (smg *SMGeneric) setPassiveSessions(cgrID string, ss []*SMGSession) (err er
}
// remPassiveSession is called when a session is removed via RPC from passive sessions table
// ToDo: test
func (smg *SMGeneric) removePassiveSessions(cgrID string) (err error) {
for _, cacheKey := range []string{"InitiateSession" + cgrID, "UpdateSession" + cgrID, "TerminateSession" + cgrID} {
if _, err := smg.responseCache.Get(cacheKey); err == nil { // Stop processing passive when there has been an update over active RPC
@@ -483,14 +485,18 @@ func (smg *SMGeneric) removePassiveSessions(cgrID string) (err error) {
return
}
// passiveToActive will transition the sessions from passive to active table
// ToDo: test
func (smg *SMGeneric) passiveToActive(cgrID string) (pSS []*SMGSession) {
pSessions := smg.getPassiveSessions(cgrID, "")
if len(pSS) == 0 {
if len(pSessions) == 0 {
return
}
pSS = pSessions[cgrID]
for _, s := range pSS {
smg.recordASession(s)
s.rals = smg.rals
s.cdrsrv = smg.cdrsrv
}
smg.deletePassiveSessions(cgrID)
return
@@ -609,9 +615,11 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient
}
aSessions := smg.getASession(cgrID)
if len(aSessions) == 0 {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> SessionUpdate with no active sessions for event: <%s>", cgrID))
err = utils.ErrServerError
return
if aSessions = smg.passiveToActive(cgrID); len(aSessions) == 0 {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> SessionUpdate with no active sessions for event: <%s>", cgrID))
err = utils.ErrServerError
return
}
}
for _, s := range aSessions {
var maxDur time.Duration
@@ -994,6 +1002,8 @@ func (smg *SMGeneric) BiRPCV1ActiveSessions(clnt rpcclient.RpcClientConnection,
aSessions, _, err := smg.ActiveSessions(attrs.AsMapStringString(), false)
if err != nil {
return utils.NewErrServerError(err)
} else if len(aSessions) == 0 {
return utils.ErrNotFound
}
*reply = aSessions
return nil

View File

@@ -133,6 +133,14 @@ func TestSMGRplcInitiate(t *testing.T) {
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated
cgrID := smgEv.GetCGRID(utils.META_DEFAULT)
var aSessions []*ActiveSession
if err := smgRplcMstrRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{OriginID: utils.StringPointer("123451")}, &aSessions); err != nil {
t.Error(err)
} else if len(aSessions) != 1 {
t.Errorf("Unexpected number of sessions received: %+v", aSessions)
} else if aSessions[0].Usage != time.Duration(90)*time.Second {
t.Errorf("Received usage: %v", aSessions[0].Usage)
}
if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions", ArgsGetPassiveSessions{}, &pSessions); err != nil {
t.Error(err)
} else if len(pSessions) != 1 {
@@ -144,6 +152,7 @@ func TestSMGRplcInitiate(t *testing.T) {
}
}
// Update on slave
func TestSMGRplcUpdate(t *testing.T) {
if !*testIntegration {
return
@@ -154,24 +163,39 @@ func TestSMGRplcUpdate(t *testing.T) {
utils.USAGE: "1m",
}
var maxUsage float64
if err := smgRplcMstrRPC.Call("SMGenericV1.UpdateSession", smgEv, &maxUsage); err != nil {
if err := smgRplcSlvRPC.Call("SMGenericV1.UpdateSession", smgEv, &maxUsage); err != nil {
t.Error(err)
}
if maxUsage != 60 {
} else if maxUsage != 60 {
t.Error("Bad max usage: ", maxUsage)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated
cgrID := smgEv.GetCGRID(utils.META_DEFAULT)
var pSessions map[string][]*SMGSession
if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions", ArgsGetPassiveSessions{}, &pSessions); err != nil {
var aSessions []*ActiveSession
if err := smgRplcSlvRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{OriginID: utils.StringPointer("123451")}, &aSessions); err != nil {
t.Error(err)
} else if len(pSessions) != 1 {
t.Errorf("PassiveSessions: %+v", pSessions)
} else if _, hasOriginID := pSessions[cgrID]; !hasOriginID {
t.Errorf("PassiveSessions: %+v", pSessions)
} else if pSessions[cgrID][0].TotalUsage != time.Duration(150*time.Second) {
t.Errorf("PassiveSession: %+v", pSessions[cgrID][0])
} else if len(aSessions) != 1 {
t.Errorf("Unexpected number of sessions received: %+v", aSessions)
} else if aSessions[0].Usage != time.Duration(150)*time.Second {
t.Errorf("Received usage: %v", aSessions[0].Usage)
}
var pSessions map[string][]*SMGSession
// Make sure we don't have passive session on active host
if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions", ArgsGetPassiveSessions{}, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
/*
cgrID := smgEv.GetCGRID(utils.META_DEFAULT)
// Make sure session was replicated
if err := smgRplcMstrRPC.Call("SMGenericV1.GetPassiveSessions", ArgsGetPassiveSessions{}, &pSessions); err != nil {
t.Error(err)
} else if len(pSessions) != 1 {
t.Errorf("PassiveSessions: %+v", pSessions)
} else if _, hasOriginID := pSessions[cgrID]; !hasOriginID {
t.Errorf("PassiveSessions: %+v", pSessions)
} else if pSessions[cgrID][0].TotalUsage != time.Duration(150*time.Second) {
t.Errorf("PassiveSession: %+v", pSessions[cgrID][0])
}
*/
}
func TestSMGRplcTerminate(t *testing.T) {
@@ -184,10 +208,14 @@ func TestSMGRplcTerminate(t *testing.T) {
utils.USAGE: "3m",
}
var reply string
if err := smgRplcMstrRPC.Call("SMGenericV1.TerminateSession", smgEv, &reply); err != nil {
if err := smgRplcSlvRPC.Call("SMGenericV1.TerminateSession", smgEv, &reply); err != nil {
t.Error(err)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated
var aSessions []*ActiveSession
if err := smgRplcSlvRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{OriginID: utils.StringPointer("123451")}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err, aSessions)
}
var pSessions map[string][]*SMGSession
if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions", ArgsGetPassiveSessions{}, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)