From 1a946e1ce591e9bcb622814ccef7ab60e0932cf8 Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 8 Nov 2016 15:27:03 +0100 Subject: [PATCH] SMGenericV1.ActiveSessions returning NOT_FOUND for 0 active sessions, SMG one way session replication operational --- apier/v1/smgenericv1.go | 7 +--- general_tests/rpcclient_it_test.go | 6 ++-- sessionmanager/data_it_test.go | 22 ++++-------- sessionmanager/smg_it_test.go | 6 ++-- sessionmanager/smgeneric.go | 18 +++++++--- sessionmanager/smgreplc_it_test.go | 54 +++++++++++++++++++++++------- 6 files changed, 67 insertions(+), 46 deletions(-) diff --git a/apier/v1/smgenericv1.go b/apier/v1/smgenericv1.go index 546d28748..dd15b1750 100644 --- a/apier/v1/smgenericv1.go +++ b/apier/v1/smgenericv1.go @@ -109,12 +109,7 @@ func (self *SMGenericV1) ProcessCDR(ev sessionmanager.SMGenericEvent, reply *str } func (self *SMGenericV1) ActiveSessions(attrs utils.AttrSMGGetActiveSessions, reply *[]*sessionmanager.ActiveSession) error { - aSessions, _, err := self.sm.ActiveSessions(attrs.AsMapStringString(), false) - if err != nil { - return utils.NewErrServerError(err) - } - *reply = aSessions - return nil + return self.sm.BiRPCV1ActiveSessions(nil, attrs, reply) } func (self *SMGenericV1) ActiveSessionsCount(attrs utils.AttrSMGGetActiveSessions, reply *int) error { diff --git a/general_tests/rpcclient_it_test.go b/general_tests/rpcclient_it_test.go index f01b94105..ad9724420 100644 --- a/general_tests/rpcclient_it_test.go +++ b/general_tests/rpcclient_it_test.go @@ -194,10 +194,8 @@ func TestRPCITDirectedRPC(t *testing.T) { return } var sessions []*sessionmanager.ActiveSession - if err := rpcPoolFirst.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &sessions); err != nil { - t.Error(err) // {"id":2,"result":null,"error":"rpc: can't find service SMGenericV1.ActiveSessions"} - } else if len(sessions) != 0 { - t.Errorf("Received sessions: %+v", sessions) + if err := rpcPoolFirst.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &sessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err) } } diff --git a/sessionmanager/data_it_test.go b/sessionmanager/data_it_test.go index a7f70deb0..6a2bb7347 100644 --- a/sessionmanager/data_it_test.go +++ b/sessionmanager/data_it_test.go @@ -386,10 +386,8 @@ func TestSMGDataLastUsedMultipleData(t *testing.T) { } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil { - t.Error(err) - } else if len(aSessions) != 0 { - t.Errorf("wrong active sessions: %+v", aSessions) + if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err, aSessions) } } @@ -567,10 +565,8 @@ func TestSMGDataTTLExpiredMultiUpdates(t *testing.T) { } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil { - t.Error(err) - } else if len(aSessions) != 0 { - t.Errorf("wrong active sessions: %+v", aSessions) + if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err, aSessions) } } @@ -767,10 +763,8 @@ func TestSMGDataMultipleDataNoUsage(t *testing.T) { } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil { - t.Error(err) - } else if len(aSessions) != 0 { - t.Errorf("wrong active sessions: %+v", aSessions) + if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err, aSessions) } } @@ -968,9 +962,7 @@ func TestSMGDataMultipleDataConstantUsage(t *testing.T) { } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) - } else if len(aSessions) != 0 { - t.Errorf("wrong active sessions: %f", aSessions[0].Usage.Seconds()) } } diff --git a/sessionmanager/smg_it_test.go b/sessionmanager/smg_it_test.go index 31f92e7d6..f38bd842b 100644 --- a/sessionmanager/smg_it_test.go +++ b/sessionmanager/smg_it_test.go @@ -808,10 +808,8 @@ func TestSMGVoiceSessionTTLWithRelocate(t *testing.T) { t.Errorf("Expecting: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue()) } if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{RunID: utils.StringPointer(utils.META_DEFAULT), - OriginID: utils.StringPointer(smgEv.GetOriginID(utils.META_DEFAULT))}, &aSessions); err != nil { - t.Error(err) - } else if len(aSessions) != 0 { - t.Errorf("Unexpected number of sessions received: %+v", aSessions) + OriginID: utils.StringPointer(smgEv.GetOriginID(utils.META_DEFAULT))}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err, aSessions) } var cdrs []*engine.ExternalCDR req := utils.RPCCDRsFilter{RunIDs: []string{utils.META_DEFAULT}, DestinationPrefixes: []string{smgEv.GetDestination(utils.META_DEFAULT)}} diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index 8685c3180..5e381316f 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -445,6 +445,7 @@ func (smg *SMGeneric) getPassiveSessions(cgrID, runID string) (pss map[string][] } // deletePassiveSessions is used to remove a reference from the passiveSessions table +// ToDo: test it func (smg *SMGeneric) deletePassiveSessions(cgrID string) { smg.pSessionsMux.Lock() delete(smg.passiveSessions, cgrID) @@ -469,6 +470,7 @@ func (smg *SMGeneric) setPassiveSessions(cgrID string, ss []*SMGSession) (err er } // remPassiveSession is called when a session is removed via RPC from passive sessions table +// ToDo: test func (smg *SMGeneric) removePassiveSessions(cgrID string) (err error) { for _, cacheKey := range []string{"InitiateSession" + cgrID, "UpdateSession" + cgrID, "TerminateSession" + cgrID} { if _, err := smg.responseCache.Get(cacheKey); err == nil { // Stop processing passive when there has been an update over active RPC @@ -483,14 +485,18 @@ func (smg *SMGeneric) removePassiveSessions(cgrID string) (err error) { return } +// passiveToActive will transition the sessions from passive to active table +// ToDo: test func (smg *SMGeneric) passiveToActive(cgrID string) (pSS []*SMGSession) { pSessions := smg.getPassiveSessions(cgrID, "") - if len(pSS) == 0 { + if len(pSessions) == 0 { return } pSS = pSessions[cgrID] for _, s := range pSS { smg.recordASession(s) + s.rals = smg.rals + s.cdrsrv = smg.cdrsrv } smg.deletePassiveSessions(cgrID) return @@ -609,9 +615,11 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient } aSessions := smg.getASession(cgrID) if len(aSessions) == 0 { - utils.Logger.Err(fmt.Sprintf(" SessionUpdate with no active sessions for event: <%s>", cgrID)) - err = utils.ErrServerError - return + if aSessions = smg.passiveToActive(cgrID); len(aSessions) == 0 { + utils.Logger.Err(fmt.Sprintf(" SessionUpdate with no active sessions for event: <%s>", cgrID)) + err = utils.ErrServerError + return + } } for _, s := range aSessions { var maxDur time.Duration @@ -994,6 +1002,8 @@ func (smg *SMGeneric) BiRPCV1ActiveSessions(clnt rpcclient.RpcClientConnection, aSessions, _, err := smg.ActiveSessions(attrs.AsMapStringString(), false) if err != nil { return utils.NewErrServerError(err) + } else if len(aSessions) == 0 { + return utils.ErrNotFound } *reply = aSessions return nil diff --git a/sessionmanager/smgreplc_it_test.go b/sessionmanager/smgreplc_it_test.go index dca25afdc..fbc40a7c7 100644 --- a/sessionmanager/smgreplc_it_test.go +++ b/sessionmanager/smgreplc_it_test.go @@ -133,6 +133,14 @@ func TestSMGRplcInitiate(t *testing.T) { } time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated cgrID := smgEv.GetCGRID(utils.META_DEFAULT) + var aSessions []*ActiveSession + if err := smgRplcMstrRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{OriginID: utils.StringPointer("123451")}, &aSessions); err != nil { + t.Error(err) + } else if len(aSessions) != 1 { + t.Errorf("Unexpected number of sessions received: %+v", aSessions) + } else if aSessions[0].Usage != time.Duration(90)*time.Second { + t.Errorf("Received usage: %v", aSessions[0].Usage) + } if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions", ArgsGetPassiveSessions{}, &pSessions); err != nil { t.Error(err) } else if len(pSessions) != 1 { @@ -144,6 +152,7 @@ func TestSMGRplcInitiate(t *testing.T) { } } +// Update on slave func TestSMGRplcUpdate(t *testing.T) { if !*testIntegration { return @@ -154,24 +163,39 @@ func TestSMGRplcUpdate(t *testing.T) { utils.USAGE: "1m", } var maxUsage float64 - if err := smgRplcMstrRPC.Call("SMGenericV1.UpdateSession", smgEv, &maxUsage); err != nil { + if err := smgRplcSlvRPC.Call("SMGenericV1.UpdateSession", smgEv, &maxUsage); err != nil { t.Error(err) - } - if maxUsage != 60 { + } else if maxUsage != 60 { t.Error("Bad max usage: ", maxUsage) } time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated - cgrID := smgEv.GetCGRID(utils.META_DEFAULT) - var pSessions map[string][]*SMGSession - if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions", ArgsGetPassiveSessions{}, &pSessions); err != nil { + var aSessions []*ActiveSession + if err := smgRplcSlvRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{OriginID: utils.StringPointer("123451")}, &aSessions); err != nil { t.Error(err) - } else if len(pSessions) != 1 { - t.Errorf("PassiveSessions: %+v", pSessions) - } else if _, hasOriginID := pSessions[cgrID]; !hasOriginID { - t.Errorf("PassiveSessions: %+v", pSessions) - } else if pSessions[cgrID][0].TotalUsage != time.Duration(150*time.Second) { - t.Errorf("PassiveSession: %+v", pSessions[cgrID][0]) + } else if len(aSessions) != 1 { + t.Errorf("Unexpected number of sessions received: %+v", aSessions) + } else if aSessions[0].Usage != time.Duration(150)*time.Second { + t.Errorf("Received usage: %v", aSessions[0].Usage) } + var pSessions map[string][]*SMGSession + // Make sure we don't have passive session on active host + if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions", ArgsGetPassiveSessions{}, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } + /* + cgrID := smgEv.GetCGRID(utils.META_DEFAULT) + // Make sure session was replicated + if err := smgRplcMstrRPC.Call("SMGenericV1.GetPassiveSessions", ArgsGetPassiveSessions{}, &pSessions); err != nil { + t.Error(err) + } else if len(pSessions) != 1 { + t.Errorf("PassiveSessions: %+v", pSessions) + } else if _, hasOriginID := pSessions[cgrID]; !hasOriginID { + t.Errorf("PassiveSessions: %+v", pSessions) + } else if pSessions[cgrID][0].TotalUsage != time.Duration(150*time.Second) { + t.Errorf("PassiveSession: %+v", pSessions[cgrID][0]) + } + */ + } func TestSMGRplcTerminate(t *testing.T) { @@ -184,10 +208,14 @@ func TestSMGRplcTerminate(t *testing.T) { utils.USAGE: "3m", } var reply string - if err := smgRplcMstrRPC.Call("SMGenericV1.TerminateSession", smgEv, &reply); err != nil { + if err := smgRplcSlvRPC.Call("SMGenericV1.TerminateSession", smgEv, &reply); err != nil { t.Error(err) } time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated + var aSessions []*ActiveSession + if err := smgRplcSlvRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{OriginID: utils.StringPointer("123451")}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err, aSessions) + } var pSessions map[string][]*SMGSession if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions", ArgsGetPassiveSessions{}, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err)