From 12bf0667efe7d2afbef1bef193326df291649591 Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 10 Nov 2016 13:32:30 +0100 Subject: [PATCH] SMGeneric simplified active/passive table management, unified SMGenericV1.ActiveSessions and SMGenericV1.PassiveSessions logic --- apier/v1/smgenericbirpcv1.go | 15 +- apier/v1/smgenericv1.go | 20 ++- general_tests/rpcclient_it_test.go | 2 +- sessionmanager/data_it_test.go | 40 ++--- sessionmanager/smg_it_test.go | 16 +- sessionmanager/smgeneric.go | 280 +++++++++++++++-------------- sessionmanager/smgeneric_test.go | 24 ++- sessionmanager/smgreplc_it_test.go | 41 ++--- utils/apitpdata.go | 80 --------- utils/apitpdata_test.go | 30 ---- 10 files changed, 221 insertions(+), 327 deletions(-) diff --git a/apier/v1/smgenericbirpcv1.go b/apier/v1/smgenericbirpcv1.go index c5c8cb88d..1e9c6aa79 100644 --- a/apier/v1/smgenericbirpcv1.go +++ b/apier/v1/smgenericbirpcv1.go @@ -20,7 +20,6 @@ package v1 import ( "github.com/cenk/rpc2" "github.com/cgrates/cgrates/sessionmanager" - "github.com/cgrates/cgrates/utils" ) func NewSMGenericBiRpcV1(sm *sessionmanager.SMGeneric) *SMGenericBiRpcV1 { @@ -78,18 +77,18 @@ func (self *SMGenericBiRpcV1) ProcessCDR(clnt *rpc2.Client, ev sessionmanager.SM return self.sm.BiRPCV1ProcessCDR(clnt, ev, reply) } -func (self *SMGenericBiRpcV1) ActiveSessions(clnt *rpc2.Client, attrs utils.AttrSMGGetActiveSessions, reply *[]*sessionmanager.ActiveSession) error { +func (self *SMGenericBiRpcV1) ActiveSessions(clnt *rpc2.Client, attrs map[string]string, reply *[]*sessionmanager.ActiveSession) error { return self.sm.BiRPCV1ActiveSessions(clnt, attrs, reply) } -func (self *SMGenericBiRpcV1) ActiveSessionsCount(attrs utils.AttrSMGGetActiveSessions, reply *int) error { - return self.sm.BiRPCV1ActiveSessionsCount(attrs, reply) +func (self *SMGenericBiRpcV1) ActiveSessionsCount(clnt *rpc2.Client, attrs map[string]string, reply *int) error { + return self.sm.BiRPCV1ActiveSessionsCount(clnt, attrs, reply) } -func (self *SMGenericBiRpcV1) SetPassiveSessions(args sessionmanager.ArgsSetPassiveSessions, reply *string) error { - return self.sm.BiRPCV1SetPassiveSessions(args, reply) +func (self *SMGenericBiRpcV1) á¹”assiveSessions(clnt *rpc2.Client, attrs map[string]string, reply *[]*sessionmanager.ActiveSession) error { + return self.sm.BiRPCV1ActiveSessions(clnt, attrs, reply) } -func (self *SMGenericBiRpcV1) GetPassiveSessions(args sessionmanager.ArgsGetPassiveSessions, pSessions *map[string][]*sessionmanager.SMGSession) error { - return self.sm.BiRPCV1GetPassiveSessions(args, pSessions) +func (self *SMGenericBiRpcV1) PassiveSessionsCount(clnt *rpc2.Client, attrs map[string]string, reply *int) error { + return self.sm.BiRPCV1ActiveSessionsCount(clnt, attrs, reply) } diff --git a/apier/v1/smgenericv1.go b/apier/v1/smgenericv1.go index ae1298ce9..2f7c33605 100644 --- a/apier/v1/smgenericv1.go +++ b/apier/v1/smgenericv1.go @@ -70,20 +70,24 @@ func (self *SMGenericV1) ProcessCDR(ev sessionmanager.SMGenericEvent, reply *str return self.sm.BiRPCV1ProcessCDR(nil, ev, reply) } -func (self *SMGenericV1) ActiveSessions(attrs utils.AttrSMGGetActiveSessions, reply *[]*sessionmanager.ActiveSession) error { +func (self *SMGenericV1) ActiveSessions(attrs map[string]string, reply *[]*sessionmanager.ActiveSession) error { return self.sm.BiRPCV1ActiveSessions(nil, attrs, reply) } -func (self *SMGenericV1) ActiveSessionsCount(attrs utils.AttrSMGGetActiveSessions, reply *int) error { - return self.sm.BiRPCV1ActiveSessionsCount(attrs, reply) +func (self *SMGenericV1) ActiveSessionsCount(attrs map[string]string, reply *int) error { + return self.sm.BiRPCV1ActiveSessionsCount(nil, attrs, reply) +} + +func (self *SMGenericV1) PassiveSessions(attrs map[string]string, reply *[]*sessionmanager.ActiveSession) error { + return self.sm.BiRPCV1PassiveSessions(nil, attrs, reply) +} + +func (self *SMGenericV1) PassiveSessionsCount(attrs map[string]string, reply *int) error { + return self.sm.BiRPCV1PassiveSessionsCount(nil, attrs, reply) } func (self *SMGenericV1) SetPassiveSessions(args sessionmanager.ArgsSetPassiveSessions, reply *string) error { - return self.sm.BiRPCV1SetPassiveSessions(args, reply) -} - -func (self *SMGenericV1) GetPassiveSessions(args sessionmanager.ArgsGetPassiveSessions, pSessions *map[string][]*sessionmanager.SMGSession) error { - return self.sm.BiRPCV1GetPassiveSessions(args, pSessions) + return self.sm.BiRPCV1SetPassiveSessions(nil, args, reply) } // rpcclient.RpcClientConnection interface diff --git a/general_tests/rpcclient_it_test.go b/general_tests/rpcclient_it_test.go index 54ef16fb5..55584a183 100644 --- a/general_tests/rpcclient_it_test.go +++ b/general_tests/rpcclient_it_test.go @@ -196,7 +196,7 @@ func TestRPCITDirectedRPC(t *testing.T) { return } var sessions []*sessionmanager.ActiveSession - if err := rpcPoolFirst.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &sessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + if err := rpcPoolFirst.Call("SMGenericV1.ActiveSessions", map[string]string{}, &sessions); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } } diff --git a/sessionmanager/data_it_test.go b/sessionmanager/data_it_test.go index 6a2bb7347..c698b9bd1 100644 --- a/sessionmanager/data_it_test.go +++ b/sessionmanager/data_it_test.go @@ -234,7 +234,7 @@ func TestSMGDataLastUsedMultipleData(t *testing.T) { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } aSessions := make([]*ActiveSession, 0) - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", nil, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 || aSessions[0].Usage.Seconds() != 1048576 { t.Errorf("wrong active sessions: %f", aSessions[0].Usage.Seconds()) @@ -265,7 +265,7 @@ func TestSMGDataLastUsedMultipleData(t *testing.T) { } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", nil, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 || aSessions[0].Usage.Seconds() != 1068576 { t.Errorf("wrong active sessions: %f", aSessions[0].Usage.Seconds()) @@ -296,7 +296,7 @@ func TestSMGDataLastUsedMultipleData(t *testing.T) { } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", nil, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 || aSessions[0].Usage.Seconds() != 1088576 { t.Errorf("wrong active sessions: %f", aSessions[0].Usage.Seconds()) @@ -327,7 +327,7 @@ func TestSMGDataLastUsedMultipleData(t *testing.T) { } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", nil, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 || aSessions[0].Usage.Seconds() != 1108576 { t.Errorf("wrong active sessions: %f", aSessions[0].Usage.Seconds()) @@ -358,7 +358,7 @@ func TestSMGDataLastUsedMultipleData(t *testing.T) { } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", nil, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 || aSessions[0].Usage.Seconds() != 1128576 { t.Errorf("wrong active sessions: %f", aSessions[0].Usage.Seconds()) @@ -386,7 +386,7 @@ func TestSMGDataLastUsedMultipleData(t *testing.T) { } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", nil, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err, aSessions) } } @@ -525,7 +525,7 @@ func TestSMGDataTTLExpiredMultiUpdates(t *testing.T) { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } aSessions := make([]*ActiveSession, 0) - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", nil, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 || aSessions[0].Usage.Seconds() != 1048576 { t.Errorf("wrong active sessions: %f", aSessions[0].Usage.Seconds()) @@ -565,7 +565,7 @@ func TestSMGDataTTLExpiredMultiUpdates(t *testing.T) { } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", nil, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err, aSessions) } } @@ -611,7 +611,7 @@ func TestSMGDataMultipleDataNoUsage(t *testing.T) { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } aSessions := make([]*ActiveSession, 0) - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", nil, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 || aSessions[0].Usage.Seconds() != 1048576 { t.Errorf("wrong active sessions: %f", aSessions[0].Usage.Seconds()) @@ -642,7 +642,7 @@ func TestSMGDataMultipleDataNoUsage(t *testing.T) { } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", nil, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 || aSessions[0].Usage.Seconds() != 1048576 { t.Errorf("wrong active sessions: %f", aSessions[0].Usage.Seconds()) @@ -673,7 +673,7 @@ func TestSMGDataMultipleDataNoUsage(t *testing.T) { } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", nil, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 || aSessions[0].Usage.Seconds() != 1048576 { t.Errorf("wrong active sessions: %f", aSessions[0].Usage.Seconds()) @@ -704,7 +704,7 @@ func TestSMGDataMultipleDataNoUsage(t *testing.T) { } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", nil, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 || aSessions[0].Usage.Seconds() != 1048576 { t.Errorf("wrong active sessions: %f", aSessions[0].Usage.Seconds()) @@ -735,7 +735,7 @@ func TestSMGDataMultipleDataNoUsage(t *testing.T) { } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", nil, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 || aSessions[0].Usage.Seconds() != 1048576 { t.Errorf("wrong active sessions: %f", aSessions[0].Usage.Seconds()) @@ -763,7 +763,7 @@ func TestSMGDataMultipleDataNoUsage(t *testing.T) { } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", nil, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err, aSessions) } } @@ -809,7 +809,7 @@ func TestSMGDataMultipleDataConstantUsage(t *testing.T) { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } aSessions := make([]*ActiveSession, 0) - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", nil, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 || aSessions[0].Usage.Seconds() != 1048576 { t.Errorf("wrong active sessions: %f", aSessions[0].Usage.Seconds()) @@ -841,7 +841,7 @@ func TestSMGDataMultipleDataConstantUsage(t *testing.T) { } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", nil, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 || aSessions[0].Usage.Seconds() != 1049176 { t.Errorf("wrong active sessions: %f", aSessions[0].Usage.Seconds()) @@ -872,7 +872,7 @@ func TestSMGDataMultipleDataConstantUsage(t *testing.T) { } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", nil, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 || aSessions[0].Usage.Seconds() != 1049776 { t.Errorf("wrong active sessions: %f", aSessions[0].Usage.Seconds()) @@ -903,7 +903,7 @@ func TestSMGDataMultipleDataConstantUsage(t *testing.T) { } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", nil, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 || aSessions[0].Usage.Seconds() != 1050376 { t.Errorf("wrong active sessions: %f", aSessions[0].Usage.Seconds()) @@ -934,7 +934,7 @@ func TestSMGDataMultipleDataConstantUsage(t *testing.T) { } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", nil, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 || aSessions[0].Usage.Seconds() != 1050976 { t.Errorf("wrong active sessions: %f", aSessions[0].Usage.Seconds()) @@ -962,7 +962,7 @@ func TestSMGDataMultipleDataConstantUsage(t *testing.T) { } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", nil, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } } diff --git a/sessionmanager/smg_it_test.go b/sessionmanager/smg_it_test.go index f38bd842b..51543d6b9 100644 --- a/sessionmanager/smg_it_test.go +++ b/sessionmanager/smg_it_test.go @@ -639,7 +639,7 @@ func TestSMGVoiceSessionTTL(t *testing.T) { t.Error("Bad max usage: ", maxUsage) } var aSessions []*ActiveSession - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{RunID: utils.StringPointer(utils.META_DEFAULT), OriginID: utils.StringPointer("12360")}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", map[string]string{utils.MEDI_RUNID: utils.META_DEFAULT, utils.ACCID: "12360"}, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 { t.Errorf("Unexpected number of sessions received: %+v", aSessions) @@ -672,7 +672,7 @@ func TestSMGVoiceSessionTTL(t *testing.T) { if maxUsage != 120 { t.Error("Bad max usage: ", maxUsage) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{RunID: utils.StringPointer(utils.META_DEFAULT), OriginID: utils.StringPointer("12360")}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", map[string]string{utils.MEDI_RUNID: utils.META_DEFAULT, utils.ACCID: "12360"}, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 { t.Errorf("Unexpected number of sessions received: %+v", aSessions) @@ -757,8 +757,8 @@ func TestSMGVoiceSessionTTLWithRelocate(t *testing.T) { t.Errorf("Expecting: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue()) } var aSessions []*ActiveSession - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{RunID: utils.StringPointer(utils.META_DEFAULT), - OriginID: utils.StringPointer(smgEv.GetOriginID(utils.META_DEFAULT))}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", map[string]string{utils.MEDI_RUNID: utils.META_DEFAULT, + utils.ACCID: smgEv.GetOriginID(utils.META_DEFAULT)}, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 { t.Errorf("Unexpected number of sessions received: %+v", aSessions) @@ -792,8 +792,8 @@ func TestSMGVoiceSessionTTLWithRelocate(t *testing.T) { } else if acnt.BalanceMap[utils.VOICE].GetTotalValue() != eAcntVal { t.Errorf("Expecting: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{RunID: utils.StringPointer(utils.META_DEFAULT), - OriginID: utils.StringPointer(smgEv.GetOriginID(utils.META_DEFAULT))}, &aSessions); err != nil { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", map[string]string{utils.MEDI_RUNID: utils.META_DEFAULT, + utils.ACCID: smgEv.GetOriginID(utils.META_DEFAULT)}, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 { t.Errorf("Unexpected number of sessions received: %+v", aSessions) @@ -807,8 +807,8 @@ func TestSMGVoiceSessionTTLWithRelocate(t *testing.T) { } else if acnt.BalanceMap[utils.VOICE].GetTotalValue() != eAcntVal { t.Errorf("Expecting: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue()) } - if err := smgRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{RunID: utils.StringPointer(utils.META_DEFAULT), - OriginID: utils.StringPointer(smgEv.GetOriginID(utils.META_DEFAULT))}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + if err := smgRPC.Call("SMGenericV1.ActiveSessions", map[string]string{utils.MEDI_RUNID: utils.META_DEFAULT, + utils.ACCID: smgEv.GetOriginID(utils.META_DEFAULT)}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err, aSessions) } var cdrs []*engine.ExternalCDR diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index 112886804..0347cb78f 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -87,7 +87,7 @@ type SMGeneric struct { pSessionsRIndex map[string][]*riFieldNameVal // reverse indexes for active sessions, used on remove pSIMux sync.RWMutex // protects pSessionsIndex sessionTerminators map[string]*smgSessionTerminator // terminate and cleanup the session if timer expires - sTsMux sync.Mutex // protects sessionTerminators + sTsMux sync.RWMutex // protects sessionTerminators responseCache *cache.ResponseCache // cache replies here } @@ -143,7 +143,7 @@ func (smg *SMGeneric) setSessionTerminator(s *SMGSession) { // resetTerminatorTimer updates the timer for the session to a new ttl and terminate info func (smg *SMGeneric) resetTerminatorTimer(cgrID string, ttl time.Duration, ttlLastUsed, ttlUsage *time.Duration) { - smg.aSessionsMux.RLock() + smg.sTsMux.RLock() if st, found := smg.sessionTerminators[cgrID]; found { if ttl != 0 { st.ttl = ttl @@ -156,7 +156,7 @@ func (smg *SMGeneric) resetTerminatorTimer(cgrID string, ttl time.Duration, ttlL } st.timer.Reset(st.ttl) } - smg.aSessionsMux.RUnlock() + smg.sTsMux.RUnlock() } // ttlTerminate is called when a session times-out @@ -165,11 +165,11 @@ func (smg *SMGeneric) ttlTerminate(s *SMGSession, tmtr *smgSessionTerminator) { if tmtr.ttlUsage != nil { debitUsage = *tmtr.ttlUsage } - aSessions := smg.getASession(s.CGRID) + aSessions := smg.getSessions(s.CGRID, false) if len(aSessions) == 0 { // will not continue if the session is not longer active return } - for _, s := range aSessions { + for _, s := range aSessions[s.CGRID] { s.debit(debitUsage, tmtr.ttlLastUsed) } smg.sessionEnd(s.CGRID, s.TotalUsage) @@ -344,13 +344,6 @@ func (smg *SMGeneric) getSessionIDsForPrefix(prefix string, passiveSessions bool return } -// Returns sessions/derived for a specific uuid -func (smg *SMGeneric) getASession(cgrID string) []*SMGSession { - smg.aSessionsMux.RLock() - defer smg.aSessionsMux.RUnlock() - return smg.activeSessions[cgrID] -} - // sessionStart will handle a new session, pass the connectionId so we can communicate on disconnect request func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, clntConn rpcclient.RpcClientConnection) error { cgrID := evStart.GetCGRID(utils.META_DEFAULT) @@ -387,7 +380,7 @@ func (smg *SMGeneric) sessionStart(evStart SMGenericEvent, clntConn rpcclient.Rp // sessionEnd will end a session from outside func (smg *SMGeneric) sessionEnd(cgrID string, usage time.Duration) error { _, err := engine.Guardian.Guard(func() (interface{}, error) { // Lock it on UUID level - ss := smg.getASession(cgrID) + ss := smg.getSessions(cgrID, false) if len(ss) == 0 { if ss = smg.passiveToActive(cgrID); len(ss) == 0 { return nil, nil // ToDo: handle here also debits @@ -396,7 +389,7 @@ func (smg *SMGeneric) sessionEnd(cgrID string, usage time.Duration) error { if !smg.unrecordASession(cgrID) { // Unreference it early so we avoid concurrency return nil, nil // Did not find the session so no need to close it anymore } - for idx, s := range ss { + for idx, s := range ss[cgrID] { s.TotalUsage = usage // save final usage as totalUsage if idx == 0 && s.stopDebit != nil { close(s.stopDebit) // Stop automatic debits @@ -415,7 +408,7 @@ func (smg *SMGeneric) sessionEnd(cgrID string, usage time.Duration) error { } } return nil, nil - }, time.Duration(2)*time.Second, cgrID) + }, smg.cgrCfg.LockingTimeout, cgrID) return err } @@ -425,20 +418,20 @@ func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) erro if utils.IsSliceMember([]string{initialID, cgrID, newOriginID}, "") { // Not allowed empty params here return nil, utils.ErrMandatoryIeMissing } - ssNew := smg.getASession(cgrID) + ssNew := smg.getSessions(cgrID, false) if len(ssNew) != 0 { // Already relocated return nil, nil } - if pSSNew := smg.getPassiveSessions(cgrID, ""); len(pSSNew) != 0 { // passive sessions recorded, will be recovered so no need of relocation + if pSSNew := smg.getSessions(cgrID, true); len(pSSNew) != 0 { // passive sessions recorded, will be recovered so no need of relocation return nil, nil } - ss := smg.getASession(initialID) + ss := smg.getSessions(initialID, false) if len(ss) == 0 { // No need of relocation if ss = smg.passiveToActive(initialID); len(ss) == 0 { return nil, utils.ErrNotFound } } - for i, s := range ss { + for i, s := range ss[initialID] { s.CGRID = cgrID // Overwrite initial CGRID with new one s.EventStart[utils.ACCID] = newOriginID // Overwrite OriginID for session indexing smg.recordASession(s) @@ -447,7 +440,7 @@ func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) erro } } return nil, nil - }, time.Duration(2)*time.Second, initialID) + }, smg.cgrCfg.LockingTimeout, initialID) return err } @@ -477,31 +470,23 @@ func (smg *SMGeneric) replicateSessions(cgrID string) (err error) { return } -func (smg *SMGeneric) getPassiveSessions(cgrID, runID string) (pss map[string][]*SMGSession) { - smg.pSessionsMux.RLock() - if cgrID == "" { - if len(smg.passiveSessions) != 0 { - pss = smg.passiveSessions - } - } else { - pSSlc := smg.passiveSessions[cgrID] - if runID != "" { - var found bool - for _, s := range pSSlc { - if s.RunID == runID { - found = true - pSSlc = []*SMGSession{s} - } - } - if !found { - pSSlc = []*SMGSession{} - } - } - if len(pSSlc) != 0 { - pss = map[string][]*SMGSession{cgrID: pSSlc} - } +// getSessions is used to return in a thread-safe manner active or passive sessions +func (smg *SMGeneric) getSessions(cgrID string, passiveSessions bool) (aSS map[string][]*SMGSession) { + ssMux := smg.aSessionsMux + ssMp := smg.activeSessions + if passiveSessions { + ssMux = smg.pSessionsMux + ssMp = smg.passiveSessions + } + ssMux.RLock() + defer ssMux.RUnlock() + if len(cgrID) == 0 { + return ssMp + } + aSS = make(map[string][]*SMGSession) + if ss, hasCGRID := ssMp[cgrID]; hasCGRID { + aSS[cgrID] = ss } - smg.pSessionsMux.RUnlock() return } @@ -555,13 +540,12 @@ func (smg *SMGeneric) deletePassiveSessions(cgrID string) { // passiveToActive will transition the sessions from passive to active table // ToDo: test -func (smg *SMGeneric) passiveToActive(cgrID string) (pSS []*SMGSession) { - pSessions := smg.getPassiveSessions(cgrID, "") +func (smg *SMGeneric) passiveToActive(cgrID string) (pSessions map[string][]*SMGSession) { + pSessions = smg.getSessions(cgrID, true) if len(pSessions) == 0 { return } - pSS = pSessions[cgrID] - for _, s := range pSS { + for _, s := range pSessions[cgrID] { smg.recordASession(s) s.rals = smg.rals s.cdrsrv = smg.cdrsrv @@ -570,6 +554,66 @@ func (smg *SMGeneric) passiveToActive(cgrID string) (pSS []*SMGSession) { return } +// asActiveSessions returns sessions from either active or passive table as []*ActiveSession +func (smg *SMGeneric) asActiveSessions(fltrs map[string]string, count, passiveSessions bool) (aSessions []*ActiveSession, counter int, err error) { + aSessions = make([]*ActiveSession, 0) // Make sure we return at least empty list and not nil + // Check first based on indexes so we can downsize the list of matching sessions + matchingSessionIDs, checkedFilters := smg.getSessionIDsMatchingIndexes(fltrs, passiveSessions) + if len(matchingSessionIDs) == 0 && len(checkedFilters) != 0 { + return + } + for fltrFldName := range fltrs { + if _, alreadyChecked := checkedFilters[fltrFldName]; alreadyChecked && fltrFldName != utils.MEDI_RUNID { // Optimize further checks, RunID should stay since it can create bugs + delete(fltrs, fltrFldName) + } + } + var remainingSessions []*SMGSession // Survived index matching + var ss map[string][]*SMGSession + if passiveSessions { + ss = smg.getSessions(fltrs[utils.CGRID], true) + } else { + ss = smg.getSessions(fltrs[utils.CGRID], false) + } + for cgrID, sGrp := range ss { + if _, hasCGRID := matchingSessionIDs[cgrID]; !hasCGRID && len(checkedFilters) != 0 { + continue + } + for _, s := range sGrp { + remainingSessions = append(remainingSessions, s) + } + } + if len(fltrs) != 0 { // Still have some filters to match + for i := 0; i < len(remainingSessions); { + sMp, err := remainingSessions[i].EventStart.AsMapStringString() + if err != nil { + return nil, 0, err + } + if _, hasRunID := sMp[utils.MEDI_RUNID]; !hasRunID { + sMp[utils.MEDI_RUNID] = utils.META_DEFAULT + } + matchingAll := true + for fltrFldName, fltrFldVal := range fltrs { + if fldVal, hasIt := sMp[fltrFldName]; !hasIt || fltrFldVal != fldVal { // No Match + matchingAll = false + break + } + } + if !matchingAll { + remainingSessions = append(remainingSessions[:i], remainingSessions[i+1:]...) + continue // if we have stripped, don't increase index so we can check next element by next run + } + i++ + } + } + if count { + return nil, len(remainingSessions), nil + } + for _, s := range remainingSessions { + aSessions = append(aSessions, s.AsActiveSession(smg.Timezone)) // Expensive for large number of sessions + } + return +} + // Methods to apply on sessions, mostly exported through RPC/Bi-RPC // MaxUsage calculates maximum usage allowed for given gevent @@ -681,7 +725,7 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient } return } - aSessions := smg.getASession(cgrID) + aSessions := smg.getSessions(cgrID, false) if len(aSessions) == 0 { if aSessions = smg.passiveToActive(cgrID); len(aSessions) == 0 { utils.Logger.Err(fmt.Sprintf(" SessionUpdate with no active sessions for event: <%s>", cgrID)) @@ -689,7 +733,7 @@ func (smg *SMGeneric) UpdateSession(gev SMGenericEvent, clnt rpcclient.RpcClient return } } - for _, s := range aSessions { + for _, s := range aSessions[cgrID] { var maxDur time.Duration if maxDur, err = s.debit(maxUsage, lastUsed); err != nil { return @@ -748,7 +792,7 @@ func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, clnt rpcclient.RpcCli var hasActiveSession bool for _, sessionID := range sessionIDs { defer smg.replicateSessions(sessionID) - aSessions := smg.getASession(sessionID) + aSessions := smg.getSessions(sessionID, false) if len(aSessions) == 0 { if aSessions = smg.passiveToActive(cgrID); len(aSessions) == 0 { utils.Logger.Err(fmt.Sprintf(" SessionTerminate with no active sessions for cgrID: <%s>", cgrID)) @@ -756,7 +800,7 @@ func (smg *SMGeneric) TerminateSession(gev SMGenericEvent, clnt rpcclient.RpcCli } } hasActiveSession = true - s := aSessions[0] + s := aSessions[sessionID][0] if errUsage != nil { usage = s.TotalUsage - s.LastUsage + lastUsed } @@ -895,69 +939,9 @@ func (smg *SMGeneric) Connect() error { return nil } -// Used by APIer to retrieve sessions -func (smg *SMGeneric) getASessions() map[string][]*SMGSession { - smg.aSessionsMux.RLock() - defer smg.aSessionsMux.RUnlock() - return smg.activeSessions -} - -func (smg *SMGeneric) ActiveSessions(fltrs map[string]string, count bool) (aSessions []*ActiveSession, counter int, err error) { - aSessions = make([]*ActiveSession, 0) // Make sure we return at least empty list and not nil - // Check first based on indexes so we can downsize the list of matching sessions - matchingSessionIDs, checkedFilters := smg.getSessionIDsMatchingIndexes(fltrs, false) - if len(matchingSessionIDs) == 0 && len(checkedFilters) != 0 { - return - } - for fltrFldName := range fltrs { - if _, alreadyChecked := checkedFilters[fltrFldName]; alreadyChecked && fltrFldName != utils.MEDI_RUNID { // Optimize further checks, RunID should stay since it can create bugs - delete(fltrs, fltrFldName) - } - } - var remainingSessions []*SMGSession // Survived index matching - for sUUID, sGrp := range smg.getASessions() { - if _, hasUUID := matchingSessionIDs[sUUID]; !hasUUID && len(checkedFilters) != 0 { - continue - } - for _, s := range sGrp { - remainingSessions = append(remainingSessions, s) - } - } - if len(fltrs) != 0 { // Still have some filters to match - for i := 0; i < len(remainingSessions); { - sMp, err := remainingSessions[i].EventStart.AsMapStringString() - if err != nil { - return nil, 0, err - } - if _, hasRunID := sMp[utils.MEDI_RUNID]; !hasRunID { - sMp[utils.MEDI_RUNID] = utils.META_DEFAULT - } - matchingAll := true - for fltrFldName, fltrFldVal := range fltrs { - if fldVal, hasIt := sMp[fltrFldName]; !hasIt || fltrFldVal != fldVal { // No Match - matchingAll = false - break - } - } - if !matchingAll { - remainingSessions = append(remainingSessions[:i], remainingSessions[i+1:]...) - continue // if we have stripped, don't increase index so we can check next element by next run - } - i++ - } - } - if count { - return nil, len(remainingSessions), nil - } - for _, s := range remainingSessions { - aSessions = append(aSessions, s.AsActiveSession(smg.Timezone)) // Expensive for large number of sessions - } - return -} - // System shutdown func (smg *SMGeneric) Shutdown() error { - for ssId := range smg.getASessions() { // Force sessions shutdown + for ssId := range smg.getSessions("", false) { // Force sessions shutdown smg.sessionEnd(ssId, time.Duration(smg.cgrCfg.MaxCallDuration)) } return nil @@ -1036,7 +1020,8 @@ func (smg *SMGeneric) BiRPCV1InitiateSession(clnt rpcclient.RpcClientConnection, // Interim updates, returns remaining duration from the RALs func (smg *SMGeneric) BiRPCV1UpdateSession(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, maxUsage *float64) (err error) { - if minMaxUsage, err := smg.UpdateSession(ev, clnt); err != nil { + var minMaxUsage time.Duration + if minMaxUsage, err = smg.UpdateSession(ev, clnt); err != nil { if err != rpcclient.ErrSessionNotFound { err = utils.NewErrServerError(err) } @@ -1077,8 +1062,13 @@ func (smg *SMGeneric) BiRPCV1ProcessCDR(clnt rpcclient.RpcClientConnection, ev S return nil } -func (smg *SMGeneric) BiRPCV1ActiveSessions(clnt rpcclient.RpcClientConnection, attrs utils.AttrSMGGetActiveSessions, reply *[]*ActiveSession) error { - aSessions, _, err := smg.ActiveSessions(attrs.AsMapStringString(), false) +func (smg *SMGeneric) BiRPCV1ActiveSessions(clnt rpcclient.RpcClientConnection, fltr map[string]string, reply *[]*ActiveSession) error { + for fldName, fldVal := range fltr { + if fldVal == "" { + fltr[fldName] = utils.META_NONE + } + } + aSessions, _, err := smg.asActiveSessions(fltr, false, false) if err != nil { return utils.NewErrServerError(err) } else if len(aSessions) == 0 { @@ -1088,8 +1078,43 @@ func (smg *SMGeneric) BiRPCV1ActiveSessions(clnt rpcclient.RpcClientConnection, return nil } -func (smg *SMGeneric) BiRPCV1ActiveSessionsCount(attrs utils.AttrSMGGetActiveSessions, reply *int) error { - if _, count, err := smg.ActiveSessions(attrs.AsMapStringString(), true); err != nil { +func (smg *SMGeneric) BiRPCV1ActiveSessionsCount(clnt rpcclient.RpcClientConnection, fltr map[string]string, reply *int) error { + for fldName, fldVal := range fltr { + if fldVal == "" { + fltr[fldName] = utils.META_NONE + } + } + if _, count, err := smg.asActiveSessions(fltr, true, false); err != nil { + return err + } else { + *reply = count + } + return nil +} + +func (smg *SMGeneric) BiRPCV1PassiveSessions(clnt rpcclient.RpcClientConnection, fltr map[string]string, reply *[]*ActiveSession) error { + for fldName, fldVal := range fltr { + if fldVal == "" { + fltr[fldName] = utils.META_NONE + } + } + aSessions, _, err := smg.asActiveSessions(fltr, false, true) + if err != nil { + return utils.NewErrServerError(err) + } else if len(aSessions) == 0 { + return utils.ErrNotFound + } + *reply = aSessions + return nil +} + +func (smg *SMGeneric) BiRPCV1PassiveSessionsCount(clnt rpcclient.RpcClientConnection, fltr map[string]string, reply *int) error { + for fldName, fldVal := range fltr { + if fldVal == "" { + fltr[fldName] = utils.META_NONE + } + } + if _, count, err := smg.asActiveSessions(fltr, true, true); err != nil { return err } else { *reply = count @@ -1103,7 +1128,7 @@ type ArgsSetPassiveSessions struct { } // BiRPCV1SetPassiveSession used for replicating SMGSessions -func (smg *SMGeneric) BiRPCV1SetPassiveSessions(args ArgsSetPassiveSessions, reply *string) (err error) { +func (smg *SMGeneric) BiRPCV1SetPassiveSessions(clnt rpcclient.RpcClientConnection, args ArgsSetPassiveSessions, reply *string) (err error) { if len(args.Sessions) == 0 { err = smg.removePassiveSessions(args.CGRID) } else { @@ -1114,20 +1139,3 @@ func (smg *SMGeneric) BiRPCV1SetPassiveSessions(args ArgsSetPassiveSessions, rep } return } - -type ArgsGetPassiveSessions struct { - CGRID string - RunID string -} - -func (smg *SMGeneric) BiRPCV1GetPassiveSessions(attrs ArgsGetPassiveSessions, pSessions *map[string][]*SMGSession) error { - if attrs.RunID != "" && attrs.CGRID == "" { - return utils.ErrMandatoryIeMissing - } - pSS := smg.getPassiveSessions(attrs.CGRID, attrs.RunID) - if len(pSS) == 0 { - return utils.ErrNotFound - } - *pSessions = pSS - return nil -} diff --git a/sessionmanager/smgeneric_test.go b/sessionmanager/smgeneric_test.go index 84f24c53b..cccf2a9d7 100644 --- a/sessionmanager/smgeneric_test.go +++ b/sessionmanager/smgeneric_test.go @@ -31,7 +31,6 @@ func init() { smgCfg, _ = config.NewDefaultCGRConfig() smgCfg.SmGenericConfig.SessionIndexes = utils.StringMap{"Tenant": true, "Account": true, "Extra3": true, "Extra4": true} - } func TestSMGSessionIndexing(t *testing.T) { @@ -436,27 +435,27 @@ func TestSMGActiveSessions(t *testing.T) { "Extra3": "extra3", } smg.recordASession(&SMGSession{CGRID: smGev2.GetCGRID(utils.META_DEFAULT), RunID: utils.META_DEFAULT, EventStart: smGev2}) - if aSessions, _, err := smg.ActiveSessions(nil, false); err != nil { + if aSessions, _, err := smg.asActiveSessions(nil, false, false); err != nil { t.Error(err) } else if len(aSessions) != 2 { t.Errorf("Received sessions: %+v", aSessions) } - if aSessions, _, err := smg.ActiveSessions(map[string]string{"Tenant": "itsyscom.com"}, false); err != nil { + if aSessions, _, err := smg.asActiveSessions(map[string]string{"Tenant": "itsyscom.com"}, false, false); err != nil { t.Error(err) } else if len(aSessions) != 1 { t.Errorf("Received sessions: %+v", aSessions) } - if aSessions, _, err := smg.ActiveSessions(map[string]string{utils.TOR: "*voice"}, false); err != nil { + if aSessions, _, err := smg.asActiveSessions(map[string]string{utils.TOR: "*voice"}, false, false); err != nil { t.Error(err) } else if len(aSessions) != 2 { t.Errorf("Received sessions: %+v", aSessions) } - if aSessions, _, err := smg.ActiveSessions(map[string]string{"Extra3": utils.MetaEmpty}, false); err != nil { + if aSessions, _, err := smg.asActiveSessions(map[string]string{"Extra3": utils.MetaEmpty}, false, false); err != nil { t.Error(err) } else if len(aSessions) != 1 { t.Errorf("Received sessions: %+v", aSessions) } - if aSessions, _, err := smg.ActiveSessions(map[string]string{utils.SUPPLIER: "supplier2"}, false); err != nil { + if aSessions, _, err := smg.asActiveSessions(map[string]string{utils.SUPPLIER: "supplier2"}, false, false); err != nil { t.Error(err) } else if len(aSessions) != 1 { t.Errorf("Received sessions: %+v", aSessions) @@ -465,7 +464,7 @@ func TestSMGActiveSessions(t *testing.T) { func TestGetPassiveSessions(t *testing.T) { smg := NewSMGeneric(smgCfg, nil, nil, nil, "UTC") - if pSS := smg.getPassiveSessions("", ""); len(pSS) != 0 { + if pSS := smg.getSessions("", true); len(pSS) != 0 { t.Errorf("PassiveSessions: %+v", pSS) } smGev1 := SMGenericEvent{ @@ -518,21 +517,18 @@ func TestGetPassiveSessions(t *testing.T) { "Extra2": 5, "Extra3": "", } - if pSS := smg.getPassiveSessions("", ""); len(pSS) != 1 { + if pSS := smg.getSessions("", true); len(pSS) != 1 { t.Errorf("PassiveSessions: %+v", pSS) } smgSession21 := &SMGSession{CGRID: smGev2.GetCGRID(utils.META_DEFAULT), EventStart: smGev2, RunID: utils.META_DEFAULT} smg.passiveSessions[smgSession21.CGRID] = []*SMGSession{smgSession21} - if pSS := smg.getPassiveSessions("", ""); len(pSS) != 2 { + if pSS := smg.getSessions("", true); len(pSS) != 2 { t.Errorf("PassiveSessions: %+v", pSS) } - if pSS := smg.getPassiveSessions(smgSession11.CGRID, ""); len(pSS) != 1 || len(pSS[smgSession11.CGRID]) != 2 { + if pSS := smg.getSessions(smgSession11.CGRID, true); len(pSS) != 1 || len(pSS[smgSession11.CGRID]) != 2 { t.Errorf("PassiveSessions: %+v", pSS) } - if pSS := smg.getPassiveSessions(smgSession11.CGRID, smgSession12.RunID); len(pSS) != 1 || len(pSS[smgSession11.CGRID]) != 1 { - t.Errorf("PassiveSessions: %+v", pSS) - } - if pSS := smg.getPassiveSessions("aabbcc", ""); len(pSS) != 0 { + if pSS := smg.getSessions("aabbcc", true); len(pSS) != 0 { t.Errorf("PassiveSessions: %+v", pSS) } } diff --git a/sessionmanager/smgreplc_it_test.go b/sessionmanager/smgreplc_it_test.go index a469c8391..5d0d86670 100644 --- a/sessionmanager/smgreplc_it_test.go +++ b/sessionmanager/smgreplc_it_test.go @@ -105,8 +105,8 @@ func TestSMGRplcInitiate(t *testing.T) { if !*testIntegration { return } - var pSessions map[string][]*SMGSession - if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions", ArgsGetPassiveSessions{}, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + var pSessions []*ActiveSession + if err := smgRplcSlvRPC.Call("SMGenericV1.PassiveSessions", nil, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } smgEv := SMGenericEvent{ @@ -132,23 +132,20 @@ func TestSMGRplcInitiate(t *testing.T) { t.Error("Bad max usage: ", maxUsage) } time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated - cgrID := smgEv.GetCGRID(utils.META_DEFAULT) var aSessions []*ActiveSession - if err := smgRplcMstrRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{OriginID: utils.StringPointer("123451")}, &aSessions); err != nil { + if err := smgRplcMstrRPC.Call("SMGenericV1.ActiveSessions", map[string]string{utils.ACCID: "123451"}, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 { t.Errorf("Unexpected number of sessions received: %+v", aSessions) } else if aSessions[0].Usage != time.Duration(90)*time.Second { t.Errorf("Received usage: %v", aSessions[0].Usage) } - if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions", ArgsGetPassiveSessions{}, &pSessions); err != nil { + if err := smgRplcSlvRPC.Call("SMGenericV1.PassiveSessions", map[string]string{utils.ACCID: "123451"}, &pSessions); err != nil { t.Error(err) } else if len(pSessions) != 1 { t.Errorf("PassiveSessions: %+v", pSessions) - } else if _, hasOriginID := pSessions[cgrID]; !hasOriginID { - t.Errorf("PassiveSessions: %+v", pSessions) - } else if pSessions[cgrID][0].TotalUsage != time.Duration(90*time.Second) { - t.Errorf("PassiveSession: %+v", pSessions[cgrID][0]) + } else if pSessions[0].Usage != time.Duration(90*time.Second) { + t.Errorf("PassiveSession: %+v", aSessions[0]) } } @@ -170,32 +167,32 @@ func TestSMGRplcUpdate(t *testing.T) { } time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated var aSessions []*ActiveSession - if err := smgRplcSlvRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{OriginID: utils.StringPointer("123451")}, &aSessions); err != nil { + if err := smgRplcSlvRPC.Call("SMGenericV1.ActiveSessions", map[string]string{utils.ACCID: "123451"}, &aSessions); err != nil { t.Error(err) } else if len(aSessions) != 1 { t.Errorf("Unexpected number of sessions received: %+v", aSessions) } else if aSessions[0].Usage != time.Duration(150)*time.Second { t.Errorf("Received usage: %v", aSessions[0].Usage) } - var pSessions map[string][]*SMGSession + var pSessions []*ActiveSession // Make sure we don't have passive session on active host - if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions", ArgsGetPassiveSessions{}, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + if err := smgRplcSlvRPC.Call("SMGenericV1.PassiveSessions", nil, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } // Master should not longer have activeSession - if err := smgRplcMstrRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{OriginID: utils.StringPointer("123451")}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + if err := smgRplcMstrRPC.Call("SMGenericV1.ActiveSessions", map[string]string{utils.ACCID: "123451"}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } cgrID := smgEv.GetCGRID(utils.META_DEFAULT) // Make sure session was replicated - if err := smgRplcMstrRPC.Call("SMGenericV1.GetPassiveSessions", ArgsGetPassiveSessions{}, &pSessions); err != nil { + if err := smgRplcMstrRPC.Call("SMGenericV1.PassiveSessions", nil, &pSessions); err != nil { t.Error(err) } else if len(pSessions) != 1 { t.Errorf("PassiveSessions: %+v", pSessions) - } else if _, hasOriginID := pSessions[cgrID]; !hasOriginID { - t.Errorf("PassiveSessions: %+v", pSessions) - } else if pSessions[cgrID][0].TotalUsage != time.Duration(150*time.Second) { - t.Errorf("PassiveSession: %+v", pSessions[cgrID][0]) + } else if pSessions[0].CGRID != cgrID { + t.Errorf("PassiveSession: %+v", pSessions[0]) + } else if pSessions[0].Usage != time.Duration(150*time.Second) { + t.Errorf("PassiveSession: %+v", pSessions[0]) } } @@ -215,17 +212,17 @@ func TestSMGRplcTerminate(t *testing.T) { } time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated var aSessions []*ActiveSession - if err := smgRplcMstrRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{OriginID: utils.StringPointer("123451")}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + if err := smgRplcMstrRPC.Call("SMGenericV1.ActiveSessions", map[string]string{utils.ACCID: "123451"}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err, aSessions) } - if err := smgRplcSlvRPC.Call("SMGenericV1.ActiveSessions", utils.AttrSMGGetActiveSessions{OriginID: utils.StringPointer("123451")}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + if err := smgRplcSlvRPC.Call("SMGenericV1.ActiveSessions", map[string]string{utils.ACCID: "123451"}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err, aSessions) } var pSessions map[string][]*SMGSession - if err := smgRplcMstrRPC.Call("SMGenericV1.GetPassiveSessions", ArgsGetPassiveSessions{}, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + if err := smgRplcMstrRPC.Call("SMGenericV1.PassiveSessions", nil, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } - if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions", ArgsGetPassiveSessions{}, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + if err := smgRplcSlvRPC.Call("SMGenericV1.PassiveSessions", nil, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 88ae912e2..b833b3ee3 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1140,86 +1140,6 @@ type AliasValue struct { Weight float64 } -// AttrSMGGetActiveSessions will filter returned sessions by SMGenericV1 -type AttrSMGGetActiveSessions struct { - ToR *string - OriginID *string - RunID *string - RequestType *string - Tenant *string - Category *string - Account *string - Subject *string - Destination *string - Supplier *string -} - -// Used for easier filtering, keep struct format to mark filter fields clearly -func (attrs *AttrSMGGetActiveSessions) AsMapStringString() map[string]string { - out := make(map[string]string) - if attrs.ToR != nil { - if *attrs.ToR == "" { - *attrs.ToR = MetaEmpty - } - out[TOR] = *attrs.ToR - } - if attrs.OriginID != nil { - if *attrs.OriginID == "" { - *attrs.OriginID = MetaEmpty - } - out[ACCID] = *attrs.OriginID - } - if attrs.RunID != nil { - if *attrs.RunID == "" { - *attrs.RunID = MetaEmpty - } - out[MEDI_RUNID] = *attrs.RunID - } - if attrs.RequestType != nil { - if *attrs.RequestType == "" { - *attrs.RequestType = MetaEmpty - } - out[REQTYPE] = *attrs.RequestType - } - if attrs.Tenant != nil { - if *attrs.Tenant == "" { - *attrs.Tenant = MetaEmpty - } - out[TENANT] = *attrs.Tenant - } - if attrs.Category != nil { - if *attrs.Category == "" { - *attrs.Category = MetaEmpty - } - out[CATEGORY] = *attrs.Category - } - if attrs.Account != nil { - if *attrs.Account == "" { - *attrs.Account = MetaEmpty - } - out[ACCOUNT] = *attrs.Account - } - if attrs.Subject != nil { - if *attrs.Subject == "" { - *attrs.Subject = MetaEmpty - } - out[SUBJECT] = MetaEmpty - } - if attrs.Destination != nil { - if *attrs.Destination == "" { - *attrs.Destination = MetaEmpty - } - out[DESTINATION] = *attrs.Destination - } - if attrs.Supplier != nil { - if *attrs.Supplier == "" { - *attrs.Supplier = MetaEmpty - } - out[SUPPLIER] = *attrs.Supplier - } - return out -} - type AttrRateCDRs struct { RPCCDRsFilter StoreCDRs *bool diff --git a/utils/apitpdata_test.go b/utils/apitpdata_test.go index 608a896d1..da1b5645a 100644 --- a/utils/apitpdata_test.go +++ b/utils/apitpdata_test.go @@ -30,33 +30,3 @@ func TestNewDTCSFromRPKey(t *testing.T) { t.Error("Received: ", dtcs) } } - -func TestAPIAttrSMGGetActiveSessionsAsMapStr(t *testing.T) { - attrs := &AttrSMGGetActiveSessions{ - ToR: StringPointer(""), - OriginID: StringPointer(""), - RunID: StringPointer(""), - RequestType: StringPointer(""), - Tenant: StringPointer(""), - Category: StringPointer(""), - Account: StringPointer(""), - Subject: StringPointer(""), - Destination: StringPointer(""), - Supplier: StringPointer(""), - } - expectMP := map[string]string{ - TOR: MetaEmpty, - ACCID: MetaEmpty, - MEDI_RUNID: MetaEmpty, - REQTYPE: MetaEmpty, - TENANT: MetaEmpty, - CATEGORY: MetaEmpty, - ACCOUNT: MetaEmpty, - SUBJECT: MetaEmpty, - DESTINATION: MetaEmpty, - SUPPLIER: MetaEmpty, - } - if mp := attrs.AsMapStringString(); !reflect.DeepEqual(expectMP, mp) { - t.Errorf("Expecting: %+v, received: %+v", expectMP, mp) - } -}