Updated SessionS.asActiveSessions to accept filters

This commit is contained in:
Tripon Alexandru-Ionut
2019-05-08 17:29:20 +03:00
committed by Dan Christian Bogos
parent 024b1f765e
commit 002b9ac933
22 changed files with 608 additions and 665 deletions

View File

@@ -457,27 +457,27 @@ func (dS *DispatcherSessionSv1) UpdateSession(args *sessions.V1UpdateSessionArgs
return dS.dS.SessionSv1UpdateSession(args, reply)
}
func (dS *DispatcherSessionSv1) GetActiveSessions(args *dispatchers.FilterSessionWithApiKey,
func (dS *DispatcherSessionSv1) GetActiveSessions(args *utils.SessionFilter,
reply *[]*sessions.ActiveSession) (err error) {
return dS.dS.SessionSv1GetActiveSessions(args, reply)
}
func (dS *DispatcherSessionSv1) GetActiveSessionsCount(args *dispatchers.FilterSessionWithApiKey,
func (dS *DispatcherSessionSv1) GetActiveSessionsCount(args *utils.SessionFilter,
reply *int) (err error) {
return dS.dS.SessionSv1GetActiveSessionsCount(args, reply)
}
func (dS *DispatcherSessionSv1) ForceDisconnect(args *dispatchers.FilterSessionWithApiKey,
func (dS *DispatcherSessionSv1) ForceDisconnect(args *utils.SessionFilter,
reply *string) (err error) {
return dS.dS.SessionSv1ForceDisconnect(args, reply)
}
func (dS *DispatcherSessionSv1) GetPassiveSessions(args *dispatchers.FilterSessionWithApiKey,
func (dS *DispatcherSessionSv1) GetPassiveSessions(args *utils.SessionFilter,
reply *[]*sessions.ActiveSession) (err error) {
return dS.dS.SessionSv1GetPassiveSessions(args, reply)
}
func (dS *DispatcherSessionSv1) GetPassiveSessionsCount(args *dispatchers.FilterSessionWithApiKey,
func (dS *DispatcherSessionSv1) GetPassiveSessionsCount(args *utils.SessionFilter,
reply *int) (err error) {
return dS.dS.SessionSv1GetPassiveSessionsCount(args, reply)
}

View File

@@ -81,11 +81,11 @@ type SessionSv1Interface interface {
TerminateSession(args *sessions.V1TerminateSessionArgs, rply *string) error
ProcessCDR(cgrEv *utils.CGREventWithArgDispatcher, rply *string) error
ProcessEvent(args *sessions.V1ProcessEventArgs, rply *sessions.V1ProcessEventReply) error
GetActiveSessions(args *dispatchers.FilterSessionWithApiKey, rply *[]*sessions.ActiveSession) error
GetActiveSessionsCount(args *dispatchers.FilterSessionWithApiKey, rply *int) error
ForceDisconnect(args *dispatchers.FilterSessionWithApiKey, rply *string) error
GetPassiveSessions(args *dispatchers.FilterSessionWithApiKey, rply *[]*sessions.ActiveSession) error
GetPassiveSessionsCount(args *dispatchers.FilterSessionWithApiKey, rply *int) error
GetActiveSessions(args *utils.SessionFilter, rply *[]*sessions.ActiveSession) error
GetActiveSessionsCount(args *utils.SessionFilter, rply *int) error
ForceDisconnect(args *utils.SessionFilter, rply *string) error
GetPassiveSessions(args *utils.SessionFilter, rply *[]*sessions.ActiveSession) error
GetPassiveSessionsCount(args *utils.SessionFilter, rply *int) error
Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error
ReplicateSessions(args dispatchers.ArgsReplicateSessionsWithApiKey, rply *string) error
SetPassiveSession(args *sessions.Session, reply *string) error

View File

@@ -295,6 +295,10 @@ func testPrecacheGetCacheStatsAfterRestart(t *testing.T) {
Items: 0,
Groups: 0,
},
utils.CacheSessionFilterIndexes: {
Items: 0,
Groups: 0,
},
}
if err := precacheRPC.Call(utils.CacheSv1GetCacheStats, args, &reply); err != nil {
t.Error(err.Error())

View File

@@ -77,29 +77,29 @@ func (ssv1 *SessionSv1) ProcessEvent(args *sessions.V1ProcessEventArgs,
return ssv1.Ss.BiRPCv1ProcessEvent(nil, args, rply)
}
func (ssv1 *SessionSv1) GetActiveSessions(args *dispatchers.FilterSessionWithApiKey,
func (ssv1 *SessionSv1) GetActiveSessions(args *utils.SessionFilter,
rply *[]*sessions.ActiveSession) error {
return ssv1.Ss.BiRPCv1GetActiveSessions(nil, &args.FilterWithPaginator, rply)
return ssv1.Ss.BiRPCv1GetActiveSessions(nil, args, rply)
}
func (ssv1 *SessionSv1) GetActiveSessionsCount(args *dispatchers.FilterSessionWithApiKey,
func (ssv1 *SessionSv1) GetActiveSessionsCount(args *utils.SessionFilter,
rply *int) error {
return ssv1.Ss.BiRPCv1GetActiveSessionsCount(nil, args.Filters, rply)
return ssv1.Ss.BiRPCv1GetActiveSessionsCount(nil, args, rply)
}
func (ssv1 *SessionSv1) ForceDisconnect(args *dispatchers.FilterSessionWithApiKey,
func (ssv1 *SessionSv1) ForceDisconnect(args *utils.SessionFilter,
rply *string) error {
return ssv1.Ss.BiRPCv1ForceDisconnect(nil, args.Filters, rply)
return ssv1.Ss.BiRPCv1ForceDisconnect(nil, args, rply)
}
func (ssv1 *SessionSv1) GetPassiveSessions(args *dispatchers.FilterSessionWithApiKey,
func (ssv1 *SessionSv1) GetPassiveSessions(args *utils.SessionFilter,
rply *[]*sessions.ActiveSession) error {
return ssv1.Ss.BiRPCv1GetPassiveSessions(nil, &args.FilterWithPaginator, rply)
return ssv1.Ss.BiRPCv1GetPassiveSessions(nil, args, rply)
}
func (ssv1 *SessionSv1) GetPassiveSessionsCount(args *dispatchers.FilterSessionWithApiKey,
func (ssv1 *SessionSv1) GetPassiveSessionsCount(args *utils.SessionFilter,
rply *int) error {
return ssv1.Ss.BiRPCv1GetPassiveSessionsCount(nil, args.Filters, rply)
return ssv1.Ss.BiRPCv1GetPassiveSessionsCount(nil, args, rply)
}
func (ssv1 *SessionSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error {

View File

@@ -95,27 +95,27 @@ func (ssv1 *SessionSv1) BiRPCv1ProcessEvent(clnt *rpc2.Client, args *sessions.V1
return ssv1.Ss.BiRPCv1ProcessEvent(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1GetActiveSessions(clnt *rpc2.Client, args *sessions.FilterWithPaginator,
func (ssv1 *SessionSv1) BiRPCv1GetActiveSessions(clnt *rpc2.Client, args *utils.SessionFilter,
rply *[]*sessions.ActiveSession) error {
return ssv1.Ss.BiRPCv1GetActiveSessions(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1GetActiveSessionsCount(clnt *rpc2.Client, args map[string]string,
func (ssv1 *SessionSv1) BiRPCv1GetActiveSessionsCount(clnt *rpc2.Client, args *utils.SessionFilter,
rply *int) error {
return ssv1.Ss.BiRPCv1GetActiveSessionsCount(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessions(clnt *rpc2.Client, args *sessions.FilterWithPaginator,
func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessions(clnt *rpc2.Client, args *utils.SessionFilter,
rply *[]*sessions.ActiveSession) error {
return ssv1.Ss.BiRPCv1GetPassiveSessions(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessionsCount(clnt *rpc2.Client, args map[string]string,
func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessionsCount(clnt *rpc2.Client, args *utils.SessionFilter,
rply *int) error {
return ssv1.Ss.BiRPCv1GetPassiveSessionsCount(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCv1ForceDisconnect(clnt *rpc2.Client, args map[string]string,
func (ssv1 *SessionSv1) BiRPCv1ForceDisconnect(clnt *rpc2.Client, args *utils.SessionFilter,
rply *string) error {
return ssv1.Ss.BiRPCv1ForceDisconnect(clnt, args, rply)
}

View File

@@ -314,7 +314,7 @@ func TestSSv1ItInitiateSession(t *testing.T) {
utils.ToJSON(eAttrs), utils.ToJSON(rply.Attributes))
}
aSessions := make([]*sessions.ActiveSession, 0)
if err := sSv1BiRpc.Call(utils.SessionSv1GetActiveSessions, nil, &aSessions); err != nil {
if err := sSv1BiRpc.Call(utils.SessionSv1GetActiveSessions, &utils.SessionFilter{}, &aSessions); err != nil {
t.Error(err)
} else if len(aSessions) != 2 {
t.Errorf("wrong active sessions: %s \n , and len(aSessions) %+v", utils.ToJSON(aSessions), len(aSessions))
@@ -347,7 +347,7 @@ func TestSSv1ItInitiateSessionWithDigest(t *testing.T) {
var rply sessions.V1InitReplyWithDigest
if err := sSv1BiRpc.Call(utils.SessionSv1InitiateSessionWithDigest,
args, &rply); err != nil {
t.Error(err)
t.Fatal(err)
}
if *rply.MaxUsage != initUsage.Seconds() {
t.Errorf("Unexpected MaxUsage: %v", rply.MaxUsage)

View File

@@ -160,7 +160,8 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne
func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, internalThresholdSChan,
internalStatSChan, internalSupplierSChan, internalAttrSChan, internalCDRSChan, internalChargerSChan,
internalDispatcherSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) {
internalDispatcherSChan chan rpcclient.RpcClientConnection, server *utils.Server,
dm *engine.DataManager, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS Session service.")
var err error
var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, cdrsConn, chargerSConn, dispatcherSConn rpcclient.RpcClientConnection
@@ -308,7 +309,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
}
sm := sessions.NewSessionS(cfg, ralsConns, resSConns, threshSConns,
statSConns, suplSConns, attrSConns, cdrsConn, chargerSConn,
sReplConns, cfg.GeneralCfg().DefaultTimezone)
sReplConns, dm, cfg.GeneralCfg().DefaultTimezone)
//start sync session in a separate gorutine
go func() {
if err = sm.ListenAndServe(exitChan); err != nil {
@@ -1580,7 +1581,8 @@ func main() {
var dm *engine.DataManager
if cfg.RalsCfg().RALsEnabled || cfg.SchedulerCfg().Enabled || cfg.ChargerSCfg().Enabled ||
cfg.AttributeSCfg().Enabled || cfg.ResourceSCfg().Enabled || cfg.StatSCfg().Enabled ||
cfg.ThresholdSCfg().Enabled || cfg.SupplierSCfg().Enabled || cfg.DispatcherSCfg().Enabled { // Some services can run without db, ie: SessionS or CDRC
cfg.ThresholdSCfg().Enabled || cfg.SupplierSCfg().Enabled || cfg.DispatcherSCfg().Enabled ||
cfg.SessionSCfg().Enabled { // Some services can run without db, ie: CDRC
dm, err = engine.ConfigureDataStorage(cfg.DataDbCfg().DataDbType,
cfg.DataDbCfg().DataDbHost, cfg.DataDbCfg().DataDbPort,
cfg.DataDbCfg().DataDbName, cfg.DataDbCfg().DataDbUser,
@@ -1723,7 +1725,7 @@ func main() {
go startSessionS(internalSMGChan, internalRaterChan, internalRsChan,
internalThresholdSChan, internalStatSChan, internalSupplierSChan,
internalAttributeSChan, internalCdrSChan, internalChargerSChan,
internalDispatcherSChan, server, exitChan)
internalDispatcherSChan, server, dm, exitChan)
}
// Start FreeSWITCHAgent
if cfg.FsAgentCfg().Enabled {

View File

@@ -148,6 +148,7 @@ const CGRATES_CFG_JSON = `
"attribute_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control attribute filter indexes caching
"charger_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control charger filter indexes caching
"dispatcher_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control dispatcher filter indexes caching
"session_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false,"precache": false}, // control session filter indexes caching
"dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false}, // control dispatcher routes caching
"diameter_messages": {"limit": -1, "ttl": "3h", "static_ttl": false}, // diameter messages caching
"rpc_responses": {"limit": 0, "ttl": "2s", "static_ttl": false}, // RPC responses caching

View File

@@ -163,6 +163,9 @@ func TestCacheJsonCfg(t *testing.T) {
utils.CacheLoadIDs: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
utils.CacheSessionFilterIndexes: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
}
if gCfg, err := dfCgrJsonCfg.CacheJsonCfg(); err != nil {

View File

@@ -707,6 +707,8 @@ func TestCgrCfgJSONDefaultsCacheCFG(t *testing.T) {
TTL: time.Duration(10 * time.Second), StaticTTL: false},
utils.CacheLoadIDs: &CacheParamCfg{Limit: -1,
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheSessionFilterIndexes: &CacheParamCfg{Limit: -1,
TTL: time.Duration(0), StaticTTL: false, Precache: false},
}
if !reflect.DeepEqual(eCacheCfg, cgrCfg.CacheCfg()) {

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package console
import (
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
)
@@ -51,14 +50,14 @@ func (self *CmdActiveSessions) RpcMethod() string {
func (self *CmdActiveSessions) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
self.rpcParams = &dispatchers.FilterSessionWithApiKey{ArgDispatcher: new(utils.ArgDispatcher)}
self.rpcParams = &utils.SessionFilter{ArgDispatcher: new(utils.ArgDispatcher)}
}
return self.rpcParams
}
func (self *CmdActiveSessions) PostprocessRpcParams() error {
param := self.rpcParams.(*dispatchers.FilterSessionWithApiKey)
param := self.rpcParams.(*utils.SessionFilter)
self.rpcParams = param
return nil
}

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package console
import (
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
)
@@ -51,13 +50,13 @@ func (self *CmdPassiveSessions) RpcMethod() string {
func (self *CmdPassiveSessions) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
self.rpcParams = &dispatchers.FilterSessionWithApiKey{ArgDispatcher: new(utils.ArgDispatcher)}
self.rpcParams = &utils.SessionFilter{ArgDispatcher: new(utils.ArgDispatcher)}
}
return self.rpcParams
}
func (self *CmdPassiveSessions) PostprocessRpcParams() error {
param := self.rpcParams.(*dispatchers.FilterSessionWithApiKey)
param := self.rpcParams.(*utils.SessionFilter)
self.rpcParams = param
return nil
}

View File

@@ -184,84 +184,79 @@ func (dS *DispatcherService) SessionSv1ProcessEvent(args *sessions.V1ProcessEven
utils.SessionSv1ProcessEvent, args, reply)
}
func (dS *DispatcherService) SessionSv1GetActiveSessions(args *FilterSessionWithApiKey,
func (dS *DispatcherService) SessionSv1GetActiveSessions(args *utils.SessionFilter,
reply *[]*sessions.ActiveSession) (err error) {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing("ArgDispatcher")
}
if dS.attrS != nil {
if err = dS.authorize(utils.SessionSv1GetActiveSessions,
args.TenantArg.Tenant,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
args.Tenant, args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaSessionS, args.RouteID,
utils.SessionSv1GetActiveSessions, args.FilterWithPaginator, reply)
return dS.Dispatch(&utils.CGREvent{Tenant: args.Tenant}, utils.MetaSessionS, args.RouteID,
utils.SessionSv1GetActiveSessions, args, reply)
}
func (dS *DispatcherService) SessionSv1GetActiveSessionsCount(args *FilterSessionWithApiKey,
func (dS *DispatcherService) SessionSv1GetActiveSessionsCount(args *utils.SessionFilter,
reply *int) (err error) {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing("ArgDispatcher")
}
if dS.attrS != nil {
if err = dS.authorize(utils.SessionSv1GetActiveSessionsCount,
args.TenantArg.Tenant,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
args.Tenant, args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaSessionS, args.RouteID,
utils.SessionSv1GetActiveSessionsCount, args.Filters, reply)
return dS.Dispatch(&utils.CGREvent{Tenant: args.Tenant}, utils.MetaSessionS, args.RouteID,
utils.SessionSv1GetActiveSessionsCount, args, reply)
}
func (dS *DispatcherService) SessionSv1ForceDisconnect(args *FilterSessionWithApiKey,
func (dS *DispatcherService) SessionSv1ForceDisconnect(args *utils.SessionFilter,
reply *string) (err error) {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing("ArgDispatcher")
}
if dS.attrS != nil {
if err = dS.authorize(utils.SessionSv1ForceDisconnect,
args.TenantArg.Tenant,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
args.Tenant, args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaSessionS, args.RouteID,
utils.SessionSv1ForceDisconnect, args.Filters, reply)
return dS.Dispatch(&utils.CGREvent{Tenant: args.Tenant}, utils.MetaSessionS, args.RouteID,
utils.SessionSv1ForceDisconnect, args, reply)
}
func (dS *DispatcherService) SessionSv1GetPassiveSessions(args *FilterSessionWithApiKey,
func (dS *DispatcherService) SessionSv1GetPassiveSessions(args *utils.SessionFilter,
reply *[]*sessions.ActiveSession) (err error) {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing("ArgDispatcher")
}
if dS.attrS != nil {
if err = dS.authorize(utils.SessionSv1GetPassiveSessions,
args.TenantArg.Tenant,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
args.Tenant, args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaSessionS, args.RouteID,
utils.SessionSv1GetPassiveSessions, args.FilterWithPaginator, reply)
return dS.Dispatch(&utils.CGREvent{Tenant: args.Tenant}, utils.MetaSessionS, args.RouteID,
utils.SessionSv1GetPassiveSessions, args, reply)
}
func (dS *DispatcherService) SessionSv1GetPassiveSessionsCount(args *FilterSessionWithApiKey,
func (dS *DispatcherService) SessionSv1GetPassiveSessionsCount(args *utils.SessionFilter,
reply *int) (err error) {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing("ArgDispatcher")
}
if dS.attrS != nil {
if err = dS.authorize(utils.SessionSv1GetPassiveSessionsCount,
args.TenantArg.Tenant,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
args.Tenant, args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaSessionS, args.RouteID,
utils.SessionSv1GetPassiveSessionsCount, args.Filters, reply)
return dS.Dispatch(&utils.CGREvent{Tenant: args.Tenant}, utils.MetaSessionS, args.RouteID,
utils.SessionSv1GetPassiveSessionsCount, args, reply)
}
func (dS *DispatcherService) SessionSv1ReplicateSessions(args ArgsReplicateSessionsWithApiKey,

View File

@@ -286,16 +286,12 @@ func testDspSessionInit(t *testing.T) {
}
func testDspGetSessions(t *testing.T) {
filtr := FilterSessionWithApiKey{
filtr := utils.SessionFilter{
ArgDispatcher: &utils.ArgDispatcher{
APIKey: utils.StringPointer("ses12345"),
},
TenantArg: utils.TenantArg{
Tenant: "cgrates.org",
},
FilterWithPaginator: sessions.FilterWithPaginator{
Filters: map[string]string{},
},
Tenant: "cgrates.org",
Filters: []string{},
}
var reply int
if err := dispEngine.RCP.Call(utils.SessionSv1GetActiveSessionsCount,
@@ -700,16 +696,12 @@ func testDspSessionPassive(t *testing.T) {
allEngine.stopEngine(t)
testDspSessionUpdate2(t)
var repl int
filtr := FilterSessionWithApiKey{
filtr := utils.SessionFilter{
ArgDispatcher: &utils.ArgDispatcher{
APIKey: utils.StringPointer("ses12345"),
},
TenantArg: utils.TenantArg{
Tenant: "cgrates.org",
},
FilterWithPaginator: sessions.FilterWithPaginator{
Filters: map[string]string{},
},
Tenant: "cgrates.org",
Filters: []string{},
}
time.Sleep(10 * time.Millisecond)
if err := dispEngine.RCP.Call(utils.SessionSv1GetPassiveSessionsCount,
@@ -806,16 +798,12 @@ func testDspSessionForceDisconect(t *testing.T) {
testDspSessionAuthorize(t)
testDspSessionInit(t)
var repl int
filtr := FilterSessionWithApiKey{
filtr := utils.SessionFilter{
ArgDispatcher: &utils.ArgDispatcher{
APIKey: utils.StringPointer("ses12345"),
},
TenantArg: utils.TenantArg{
Tenant: "cgrates.org",
},
FilterWithPaginator: sessions.FilterWithPaginator{
Filters: map[string]string{},
},
Tenant: "cgrates.org",
Filters: []string{},
}
time.Sleep(10 * time.Millisecond)
if err := dispEngine.RCP.Call(utils.SessionSv1GetPassiveSessionsCount,

View File

@@ -40,12 +40,6 @@ type DispatcherEvent struct {
Subsystem string
}
type FilterSessionWithApiKey struct {
*utils.ArgDispatcher
utils.TenantArg
sessions.FilterWithPaginator
}
type ArgsReplicateSessionsWithApiKey struct {
*utils.ArgDispatcher
utils.TenantArg

View File

@@ -378,5 +378,9 @@ func GetDefaultEmptyCacheStats() map[string]*ltcache.CacheStats {
Items: 0,
Groups: 0,
},
utils.CacheSessionFilterIndexes: {
Items: 0,
Groups: 0,
},
}
}

View File

@@ -270,8 +270,3 @@ func (sr *SRun) debitReserve(dur time.Duration, lastUsage *time.Duration) (rDur
}
return
}
type FilterWithPaginator struct {
Filters map[string]string
*utils.Paginator
}

View File

@@ -71,7 +71,7 @@ type SReplConn struct {
// NewSessionS constructs a new SessionS instance
func NewSessionS(cgrCfg *config.CGRConfig, ralS, resS, thdS,
statS, splS, attrS, cdrS, chargerS rpcclient.RpcClientConnection,
sReplConns []*SReplConn, tmz string) *SessionS {
sReplConns []*SReplConn, dm *engine.DataManager, tmz string) *SessionS {
cgrCfg.SessionSCfg().SessionIndexes[utils.OriginID] = true // Make sure we have indexing for OriginID since it is a requirement on prefix searching
if chargerS != nil && reflect.ValueOf(chargerS).IsNil() {
chargerS = nil
@@ -111,11 +111,11 @@ func NewSessionS(cgrCfg *config.CGRConfig, ralS, resS, thdS,
biJClnts: make(map[rpcclient.RpcClientConnection]string),
biJIDs: make(map[string]*biJClient),
aSessions: make(map[string]*Session),
aSessionsIdx: make(map[string]map[string]utils.StringMap),
aSessionsRIdx: make(map[string][]*riFieldNameVal),
pSessions: make(map[string]*Session),
pSessionsIdx: make(map[string]map[string]utils.StringMap),
pSessionsRIdx: make(map[string][]*riFieldNameVal)}
pSessionsRIdx: make(map[string][]*riFieldNameVal),
dm: dm,
}
}
// biJClient contains info we need to reach back a bidirectional json client
@@ -146,17 +146,16 @@ type SessionS struct {
aSsMux sync.RWMutex // protects aSessions
aSessions map[string]*Session // group sessions per sessionId, multiple runs based on derived charging
aSIMux sync.RWMutex // protects aSessionsIdx
aSessionsIdx map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue]utils.StringMap[cgrID]
aSessionsRIdx map[string][]*riFieldNameVal // reverse indexes for active sessions, used on remove
aSIMux sync.RWMutex // protects aSessionsIdx
aSessionsRIdx map[string][]*riFieldNameVal // reverse indexes for active sessions, used on remove
pSsMux sync.RWMutex // protects pSessions
pSessions map[string]*Session // group passive sessions based on cgrID
pSIMux sync.RWMutex // protects pSessionsIdx
pSessionsIdx map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue]utils.StringMap[cgrID]
pSessionsRIdx map[string][]*riFieldNameVal // reverse indexes for passive sessions, used on remove
pSIMux sync.RWMutex // protects pSessionsIdx
pSessionsRIdx map[string][]*riFieldNameVal // reverse indexes for passive sessions, used on remove
dm *engine.DataManager
}
// ListenAndServe starts the service and binds it to the listen loop
@@ -774,39 +773,40 @@ func (sS *SessionS) unregisterSession(cgrID string, passive bool) bool {
// indexSession will index an active or passive Session based on configuration
func (sS *SessionS) indexSession(s *Session, pSessions bool) {
idxMux := &sS.aSIMux // pointer to original mux since will have no effect if we copy it
ssIndx := sS.aSessionsIdx
itemIDPrefix := "act"
ssRIdx := sS.aSessionsRIdx
if pSessions {
idxMux = &sS.pSIMux
ssIndx = sS.pSessionsIdx
itemIDPrefix = "psv"
ssRIdx = sS.pSessionsRIdx
}
idxMux.Lock()
defer idxMux.Unlock()
for fieldName := range sS.cgrCfg.SessionSCfg().SessionIndexes {
fieldVal, err := s.EventStart.GetString(fieldName)
if err != nil {
if err == utils.ErrNotFound {
fieldVal = utils.NOT_AVAILABLE
} else {
utils.Logger.Err(fmt.Sprintf("<%s> retrieving field: %s from event: %+v, err: <%s>", utils.SessionS, fieldName, s.EventStart, err))
continue
for _, sr := range s.SRuns {
fieldVal, err := sr.Event.GetString(fieldName)
if err != nil {
if err == utils.ErrNotFound {
fieldVal = utils.NOT_AVAILABLE
} else {
utils.Logger.Err(fmt.Sprintf("<%s> retrieving field: %s from event: %+v, err: <%s>", utils.SessionS, fieldName, sr.Event, err))
continue
}
}
if fieldVal == "" {
fieldVal = utils.MetaEmpty
}
// insert to cache
fieldValKey := utils.ConcatenatedKey(itemIDPrefix, utils.MetaString, utils.DynamicDataPrefix+fieldName, fieldVal)
itemIDs := utils.NewStringMap()
if x, ok := engine.Cache.Get(utils.CacheSessionFilterIndexes, fieldValKey); ok && x != nil { // Attempt to find in cache first
itemIDs = x.(utils.StringMap)
}
itemIDs[s.CGRID] = true
engine.Cache.Set(utils.CacheSessionFilterIndexes, fieldValKey, itemIDs, nil,
true, utils.NonTransactional)
ssRIdx[s.CGRID] = append(ssRIdx[s.CGRID], &riFieldNameVal{fieldName: fieldName, fieldValue: fieldVal}) // reverse index
}
if fieldVal == "" {
fieldVal = utils.MetaEmpty
}
if _, hasFieldName := ssIndx[fieldName]; !hasFieldName { // Init it here
ssIndx[fieldName] = make(map[string]utils.StringMap)
}
if _, hasFieldVal := ssIndx[fieldName][fieldVal]; !hasFieldVal {
ssIndx[fieldName][fieldVal] = make(utils.StringMap)
}
ssIndx[fieldName][fieldVal][s.CGRID] = true
if _, hasIt := ssRIdx[s.CGRID]; !hasIt {
ssRIdx[s.CGRID] = make([]*riFieldNameVal, 0)
}
ssRIdx[s.CGRID] = append(ssRIdx[s.CGRID], &riFieldNameVal{fieldName: fieldName, fieldValue: fieldVal})
}
return
}
@@ -815,11 +815,11 @@ func (sS *SessionS) indexSession(s *Session, pSessions bool) {
// called on terminate or relocate
func (sS *SessionS) unindexSession(cgrID string, pSessions bool) bool {
idxMux := &sS.aSIMux
ssIndx := sS.aSessionsIdx
itemIDPrefix := "act"
ssRIdx := sS.aSessionsRIdx
if pSessions {
idxMux = &sS.pSIMux
ssIndx = sS.pSessionsIdx
itemIDPrefix = "psv"
ssRIdx = sS.pSessionsRIdx
}
idxMux.Lock()
@@ -828,142 +828,169 @@ func (sS *SessionS) unindexSession(cgrID string, pSessions bool) bool {
return false
}
for _, riFNV := range ssRIdx[cgrID] {
delete(ssIndx[riFNV.fieldName][riFNV.fieldValue], cgrID)
if len(ssIndx[riFNV.fieldName][riFNV.fieldValue]) == 0 {
delete(ssIndx[riFNV.fieldName], riFNV.fieldValue)
}
if len(ssIndx[riFNV.fieldName]) == 0 {
delete(ssIndx, riFNV.fieldName)
}
fieldValKey := utils.ConcatenatedKey(itemIDPrefix, utils.MetaString, utils.DynamicDataPrefix+riFNV.fieldName, riFNV.fieldValue)
engine.Cache.Remove(utils.CacheSessionFilterIndexes, fieldValKey, true, utils.NonTransactional)
}
delete(ssRIdx, cgrID)
return true
}
// getSessionIDsForPrefix works with session relocation returning list of sessions with ID matching prefix for OriginID field
func (sS *SessionS) getSessionIDsForPrefix(prefix string, pSessions bool) (cgrIDs []string) {
idxMux := &sS.aSIMux
ssIndx := sS.aSessionsIdx
if pSessions {
idxMux = &sS.pSIMux
ssIndx = sS.pSessionsIdx
}
idxMux.RLock()
for originID := range ssIndx[utils.OriginID] {
if strings.HasPrefix(originID, prefix) {
cgrIDs = append(cgrIDs,
ssIndx[utils.OriginID][originID].Slice()...)
func (sS *SessionS) getIndexedFilters(tenant string, fltrs []string) (indexedFltr map[string][]string, unindexedFltr []*engine.FilterRule) {
indexedFltr = make(map[string][]string)
for _, fltrID := range fltrs {
f, err := sS.dm.GetFilter(tenant, fltrID,
true, true, utils.NonTransactional)
if err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefixNotFound(fltrID)
}
continue
}
if f.ActivationInterval != nil &&
!f.ActivationInterval.IsActiveAtTime(time.Now()) { // not active
continue
}
for _, fltr := range f.Rules {
if fltr.Type != utils.MetaString ||
!sS.cgrCfg.SessionSCfg().SessionIndexes.HasKey(utils.DynamicDataPrefix+fltr.FieldName) {
unindexedFltr = append(unindexedFltr, fltr)
continue
}
indexedFltr[fltr.FieldName] = fltr.Values
}
}
idxMux.RUnlock()
return
}
// getSessionIDsMatchingIndexes will check inside indexes if it can find sessionIDs matching all filters
// matchedIndexes returns map[matchedFieldName]possibleMatchedFieldVal so we optimize further to avoid checking them
func (sS *SessionS) getSessionIDsMatchingIndexes(fltrs map[string]string,
pSessions bool) (utils.StringMap, map[string]string) {
idxMux := &sS.aSIMux
ssIndx := sS.aSessionsIdx
func (sS *SessionS) getSessionIDsMatchingIndexes(fltrs map[string][]string,
pSessions bool) []string {
itemIDPrefix := "act"
if pSessions {
idxMux = &sS.pSIMux
ssIndx = sS.pSessionsIdx
itemIDPrefix = "psv"
}
idxMux.RLock()
defer idxMux.RUnlock()
matchedIndexes := make(map[string]string)
matchingSessions := make(utils.StringMap)
checkNr := 0
findFunc := func(cgrID, fltrName, fltrVal string) bool {
for cgrmID := range ssIndx[fltrName][fltrVal] {
if cgrID == cgrmID {
return true
getMatchingIndexes := func(itemIDPrefix, fieldName string, values []string) (matchingSessionsbyValue utils.StringMap) {
matchingSessionsbyValue = make(utils.StringMap)
for _, fieldVal := range values {
fieldValKey := utils.ConcatenatedKey(itemIDPrefix, utils.MetaString, fieldName, fieldVal)
itemIDs := utils.NewStringMap()
if x, ok := engine.Cache.Get(utils.CacheSessionFilterIndexes, fieldValKey); ok && x != nil { // Attempt to find in cache first
itemIDs = x.(utils.StringMap)
}
for cgrID := range itemIDs {
matchingSessionsbyValue[cgrID] = true
}
}
return false
return matchingSessionsbyValue
}
for fltrName, fltrVal := range fltrs {
if _, hasFldName := ssIndx[fltrName]; !hasFldName {
continue
}
matchingSessions := make(utils.StringMap)
checkNr := 0
for fieldName, values := range fltrs {
matchingSessionsbyValue := getMatchingIndexes(itemIDPrefix, fieldName, values)
checkNr += 1
if _, hasFldVal := ssIndx[fltrName][fltrVal]; !hasFldVal {
matchedIndexes[fltrName] = utils.META_NONE
return make(utils.StringMap), matchedIndexes
}
matchedIndexes[fltrName] = fltrVal
if checkNr == 1 { // First run will init the MatchingSessions
matchingSessions = ssIndx[fltrName][fltrVal]
if checkNr == 1 {
matchingSessions = matchingSessionsbyValue
continue
}
// Higher run, takes out non matching indexes
for cgrID := range matchingSessions {
if !findFunc(cgrID, fltrName, fltrVal) {
if !matchingSessionsbyValue.HasKey(cgrID) {
delete(matchingSessions, cgrID)
}
}
if len(matchingSessions) == 0 {
return make([]string, 0)
}
}
return matchingSessions.Clone(), matchedIndexes
return matchingSessions.Slice()
}
// ToDo: break the method asActiveSessions to return []*Session
// func (sS *SessionS) filterSessions(fltrs map[string]string, psv bool) (ss []*Session) {
// asActiveSessions returns sessions from either active or passive table as []*ActiveSession
func (sS *SessionS) asActiveSessions(fltrs map[string]string,
count, psv bool) (aSs []*ActiveSession, counter int, err error) {
aSs = make([]*ActiveSession, 0) // Make sure we return at least empty list and not nil
if len(fltrs) == 0 { // no filters applied
func (sS *SessionS) filterSessions(sf *utils.SessionFilter, psv bool) (aSs []*ActiveSession) {
if len(sf.Filters) == 0 {
ss := sS.getSessions(utils.EmptyString, psv)
for _, s := range ss {
aSs = append(aSs,
s.AsActiveSessions(sS.cgrCfg.GeneralCfg().DefaultTimezone,
sS.cgrCfg.GeneralCfg().NodeID)...) // Expensive for large number of sessions
}
if count {
return nil, len(aSs), nil
if sf.Limit != nil && *sf.Limit > 0 && *sf.Limit < len(aSs) {
return aSs[:*sf.Limit]
}
}
return
}
// Check first based on indexes so we can downsize the list of matching sessions
matchingSessionIDs, checkedFilters := sS.getSessionIDsMatchingIndexes(fltrs, psv)
if len(matchingSessionIDs) == 0 && len(checkedFilters) != 0 {
tenant := utils.FirstNonEmpty(sf.Tenant, sS.cgrCfg.GeneralCfg().DefaultTenant)
indx, unindx := sS.getIndexedFilters(tenant, sf.Filters)
ss := sS.getSessionsFromCGRIDs(psv, sS.getSessionIDsMatchingIndexes(indx, psv)...)
pass := func(filterRules []*engine.FilterRule,
me engine.MapEvent) (pass bool) {
pass = true
if len(filterRules) == 0 {
return
}
var err error
ev := config.NewNavigableMap(me)
for _, fltr := range filterRules {
if pass, err = fltr.Pass(ev, sS.statS, tenant); err != nil || !pass {
pass = false
return
}
}
return
}
for fltrFldName := range fltrs {
if _, alreadyChecked := checkedFilters[fltrFldName]; alreadyChecked &&
fltrFldName != utils.RunID { // Optimize further checks, RunID should stay since it can create bugs
delete(fltrs, fltrFldName)
}
}
remainingSessions := sS.getSessions(fltrs[utils.CGRID], psv)
if len(fltrs) != 0 { // Still have some filters to match
for _, s := range remainingSessions {
for _, sr := range s.SRuns {
matchingAll := true
for fltrFldName, fltrFldVal := range fltrs {
if sr.Event.GetStringIgnoreErrors(fltrFldName) != fltrFldVal { // No Match
matchingAll = false
break
}
}
if matchingAll {
aSs = append(aSs, s.asActiveSessions(sr, sS.cgrCfg.GeneralCfg().DefaultTimezone,
sS.cgrCfg.GeneralCfg().NodeID))
for _, s := range ss {
s.RLock()
for _, sr := range s.SRuns {
if pass(unindx, sr.Event) {
aSs = append(aSs,
s.asActiveSessions(sr, sS.cgrCfg.GeneralCfg().DefaultTimezone,
sS.cgrCfg.GeneralCfg().NodeID)) // Expensive for large number of sessions
if sf.Limit != nil && *sf.Limit > 0 && *sf.Limit < len(aSs) {
s.RUnlock()
return aSs[:*sf.Limit]
}
}
}
} else {
for _, s := range remainingSessions {
aSs = append(aSs,
s.AsActiveSessions(sS.cgrCfg.GeneralCfg().DefaultTimezone,
sS.cgrCfg.GeneralCfg().NodeID)...) // Expensive for large number of sessions
}
s.RUnlock()
}
if count {
return nil, len(aSs), nil
return
}
func (sS *SessionS) filterSessionsCount(sf *utils.SessionFilter, psv bool) (count int) {
count = 0
if len(sf.Filters) == 0 {
ss := sS.getSessions(utils.EmptyString, psv)
for _, s := range ss {
s.RLock()
count += len(s.SRuns)
s.RUnlock()
}
return
}
tenant := utils.FirstNonEmpty(sf.Tenant, sS.cgrCfg.GeneralCfg().DefaultTenant)
indx, unindx := sS.getIndexedFilters(tenant, sf.Filters)
ss := sS.getSessionsFromCGRIDs(psv, sS.getSessionIDsMatchingIndexes(indx, psv)...)
pass := func(filterRules []*engine.FilterRule,
me engine.MapEvent) (pass bool) {
pass = true
if len(filterRules) == 0 {
return
}
var err error
ev := config.NewNavigableMap(me)
for _, fltr := range filterRules {
if pass, err = fltr.Pass(ev, sS.statS, tenant); err != nil || !pass {
return
}
}
return
}
for _, s := range ss {
s.RLock()
for _, sr := range s.SRuns {
if pass(unindx, sr.Event) {
count += 1
}
}
s.RUnlock()
}
return
}
@@ -1056,6 +1083,34 @@ func (sS *SessionS) getSessions(cgrID string, pSessions bool) (ss []*Session) {
return
}
// getSessions is used to return in a thread-safe manner active or passive sessions
func (sS *SessionS) getSessionsFromCGRIDs(pSessions bool, cgrIDs ...string) (ss []*Session) {
ssMux := &sS.aSsMux // get the pointer so we don't copy, otherwise locks will not work
ssMp := sS.aSessions // reference it so we don't overwrite the new map without protection
if pSessions {
ssMux = &sS.pSsMux
ssMp = sS.pSessions
}
ssMux.RLock()
defer ssMux.RUnlock()
if len(cgrIDs) == 0 {
ss = make([]*Session, len(ssMp))
var i int
for _, s := range ssMp {
ss[i] = s
i++
}
return
}
ss = make([]*Session, len(cgrIDs))
for i, cgrID := range cgrIDs {
if s, hasCGRID := ssMp[cgrID]; hasCGRID {
ss[i] = s
}
}
return
}
// transitSState will transit the sessions from one state (active/passive) to another (passive/active)
func (sS *SessionS) transitSState(cgrID string, psv bool) (ss []*Session) {
ss = sS.getSessions(cgrID, !psv)
@@ -1442,129 +1497,49 @@ func (sS *SessionS) CallBiRPC(clnt rpcclient.RpcClientConnection,
// BiRPCv1GetActiveSessions returns the list of active sessions based on filter
func (sS *SessionS) BiRPCv1GetActiveSessions(clnt rpcclient.RpcClientConnection,
args *FilterWithPaginator, reply *[]*ActiveSession) (err error) {
args *utils.SessionFilter, reply *[]*ActiveSession) (err error) {
if args == nil { //protection in case on nil
args = &FilterWithPaginator{}
args = &utils.SessionFilter{}
}
for fldName, fldVal := range args.Filters {
if fldVal == "" {
args.Filters[fldName] = utils.META_NONE
}
}
aSs, _, err := sS.asActiveSessions(args.Filters, false, false)
if err != nil {
return utils.NewErrServerError(err)
} else if len(aSs) == 0 {
aSs := sS.filterSessions(args, false)
if len(aSs) == 0 {
return utils.ErrNotFound
}
if args.Paginator == nil { //small optimization
*reply = aSs
} else {
var limit, offset int
if args.Limit != nil && *args.Limit > 0 {
limit = *args.Limit
}
if args.Offset != nil && *args.Offset > 0 {
offset = *args.Offset
}
if limit == 0 && offset == 0 {
*reply = aSs
return
}
if offset > len(aSs) {
return fmt.Errorf("Offset : %+v is greater than lenght of active sessions : %+v", offset, len(aSs))
}
if offset != 0 {
limit = limit + offset
}
if limit == 0 {
limit = len(aSs[offset:])
} else if limit > len(aSs) {
limit = len(aSs)
}
*reply = aSs[offset:limit]
}
*reply = aSs
return nil
}
// BiRPCv1GetActiveSessionsCount counts the active sessions
func (sS *SessionS) BiRPCv1GetActiveSessionsCount(clnt rpcclient.RpcClientConnection,
fltr map[string]string, reply *int) error {
for fldName, fldVal := range fltr {
if fldVal == "" {
fltr[fldName] = utils.META_NONE
}
}
if _, count, err := sS.asActiveSessions(fltr, true, false); err != nil {
return err
} else {
*reply = count
args *utils.SessionFilter, reply *int) error {
if args == nil { //protection in case on nil
args = &utils.SessionFilter{}
}
*reply = sS.filterSessionsCount(args, false)
return nil
}
// BiRPCv1GetPassiveSessions returns the passive sessions handled by SessionS
func (sS *SessionS) BiRPCv1GetPassiveSessions(clnt rpcclient.RpcClientConnection,
args *FilterWithPaginator, reply *[]*ActiveSession) error {
args *utils.SessionFilter, reply *[]*ActiveSession) error {
if args == nil { //protection in case on nil
args = &FilterWithPaginator{}
args = &utils.SessionFilter{}
}
for fldName, fldVal := range args.Filters {
if fldVal == "" {
args.Filters[fldName] = utils.META_NONE
}
}
pSs, _, err := sS.asActiveSessions(args.Filters, false, true)
if err != nil {
return utils.NewErrServerError(err)
} else if len(pSs) == 0 {
pSs := sS.filterSessions(args, true)
if len(pSs) == 0 {
return utils.ErrNotFound
}
if args.Paginator == nil { //small optimization
*reply = pSs
} else {
var limit, offset int
if args.Limit != nil && *args.Limit > 0 {
limit = *args.Limit
}
if args.Offset != nil && *args.Offset > 0 {
offset = *args.Offset
}
if limit == 0 && offset == 0 {
*reply = pSs
return nil
}
if offset > len(pSs) {
return fmt.Errorf("Offset : %+v is greater than lenght of passive sessions : %+v", offset, len(pSs))
}
if offset != 0 {
limit = limit + offset
}
if limit == 0 {
limit = len(pSs[offset:])
} else if limit > len(pSs) {
limit = len(pSs)
}
*reply = pSs[offset:limit]
}
*reply = pSs
return nil
}
// BiRPCv1GetPassiveSessionsCount counts the passive sessions handled by the system
func (sS *SessionS) BiRPCv1GetPassiveSessionsCount(clnt rpcclient.RpcClientConnection,
fltr map[string]string, reply *int) error {
for fldName, fldVal := range fltr {
if fldVal == "" {
fltr[fldName] = utils.META_NONE
}
}
if _, count, err := sS.asActiveSessions(fltr, true, true); err != nil {
return err
} else {
*reply = count
args *utils.SessionFilter, reply *int) error {
if args == nil { //protection in case on nil
args = &utils.SessionFilter{}
}
*reply = sS.filterSessionsCount(args, true)
return nil
}
@@ -2737,16 +2712,12 @@ func (sS *SessionS) BiRPCv1SyncSessions(clnt rpcclient.RpcClientConnection,
// BiRPCv1ForceDisconnect will force disconnecting sessions matching sessions
func (sS *SessionS) BiRPCv1ForceDisconnect(clnt rpcclient.RpcClientConnection,
fltr map[string]string, reply *string) error {
for fldName, fldVal := range fltr {
if fldVal == "" {
fltr[fldName] = utils.META_NONE
}
args *utils.SessionFilter, reply *string) (err error) {
if args == nil { //protection in case on nil
args = &utils.SessionFilter{}
}
aSs, _, err := sS.asActiveSessions(fltr, false, false)
if err != nil {
return utils.NewErrServerError(err)
} else if len(aSs) == 0 {
aSs := sS.filterSessions(args, false)
if len(aSs) == 0 {
return utils.ErrNotFound
}
for _, as := range aSs {

View File

@@ -59,7 +59,7 @@ func TestSessionSIndexAndUnindexSessions(t *testing.T) {
"Extra3": true,
"Extra4": true,
}
sS := NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC")
sS := NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC")
sEv := engine.NewSafEvent(map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.ToR: "*voice",
@@ -87,40 +87,45 @@ func TestSessionSIndexAndUnindexSessions(t *testing.T) {
session := &Session{
CGRID: GetSetCGRID(sEv),
EventStart: sEv,
SRuns: []*SRun{
&SRun{
Event: sEv.AsMapInterface(),
},
},
}
cgrID := GetSetCGRID(sEv)
sS.indexSession(session, false)
eIndexes := map[string]map[string]utils.StringMap{
"OriginID": map[string]utils.StringMap{
"12345": utils.StringMap{
cgrID: true,
},
},
"Tenant": map[string]utils.StringMap{
"cgrates.org": utils.StringMap{
cgrID: true,
},
},
"Account": map[string]utils.StringMap{
"account1": utils.StringMap{
cgrID: true,
},
},
"Extra3": map[string]utils.StringMap{
utils.MetaEmpty: utils.StringMap{
cgrID: true,
},
},
"Extra4": map[string]utils.StringMap{
utils.NOT_AVAILABLE: utils.StringMap{
cgrID: true,
},
},
}
if !reflect.DeepEqual(eIndexes, sS.aSessionsIdx) {
t.Errorf("Expecting: %s, received: %s",
utils.ToJSON(eIndexes), utils.ToJSON(sS.aSessionsIdx))
}
// eIndexes := map[string]map[string]utils.StringMap{
// "OriginID": map[string]utils.StringMap{
// "12345": utils.StringMap{
// cgrID: true,
// },
// },
// "Tenant": map[string]utils.StringMap{
// "cgrates.org": utils.StringMap{
// cgrID: true,
// },
// },
// "Account": map[string]utils.StringMap{
// "account1": utils.StringMap{
// cgrID: true,
// },
// },
// "Extra3": map[string]utils.StringMap{
// utils.MetaEmpty: utils.StringMap{
// cgrID: true,
// },
// },
// "Extra4": map[string]utils.StringMap{
// utils.NOT_AVAILABLE: utils.StringMap{
// cgrID: true,
// },
// },
// }
// if !reflect.DeepEqual(eIndexes, sS.aSessionsIdx) {
// t.Errorf("Expecting: %s, received: %s",
// utils.ToJSON(eIndexes), utils.ToJSON(sS.aSessionsIdx))
// }
eRIdxes := map[string][]*riFieldNameVal{
cgrID: []*riFieldNameVal{
&riFieldNameVal{fieldName: "Tenant", fieldValue: "cgrates.org"},
@@ -149,6 +154,11 @@ func TestSessionSIndexAndUnindexSessions(t *testing.T) {
session2 := &Session{
CGRID: cgrID2,
EventStart: sSEv2,
SRuns: []*SRun{
&SRun{
Event: sSEv2.AsMapInterface(),
},
},
}
sS.indexSession(session2, false)
sSEv3 := engine.NewSafEvent(map[string]interface{}{
@@ -162,60 +172,65 @@ func TestSessionSIndexAndUnindexSessions(t *testing.T) {
session3 := &Session{
CGRID: cgrID3,
EventStart: sSEv3,
SRuns: []*SRun{
&SRun{
Event: sSEv3.AsMapInterface(),
},
},
}
sS.indexSession(session3, false)
eIndexes = map[string]map[string]utils.StringMap{
"OriginID": map[string]utils.StringMap{
"12345": utils.StringMap{
cgrID: true,
},
"12346": utils.StringMap{
cgrID2: true,
},
"12347": utils.StringMap{
cgrID3: true,
},
},
"Tenant": map[string]utils.StringMap{
"cgrates.org": utils.StringMap{
cgrID: true,
cgrID3: true,
},
"itsyscom.com": utils.StringMap{
cgrID2: true,
},
},
"Account": map[string]utils.StringMap{
"account1": utils.StringMap{
cgrID: true,
},
"account2": utils.StringMap{
cgrID2: true,
cgrID3: true,
},
},
"Extra3": map[string]utils.StringMap{
utils.MetaEmpty: utils.StringMap{
cgrID: true,
cgrID2: true,
},
utils.NOT_AVAILABLE: utils.StringMap{
cgrID3: true,
},
},
"Extra4": map[string]utils.StringMap{
utils.NOT_AVAILABLE: utils.StringMap{
cgrID: true,
cgrID3: true,
},
"info2": utils.StringMap{
cgrID2: true,
},
},
}
if !reflect.DeepEqual(eIndexes, sS.aSessionsIdx) {
t.Errorf("Expecting: %+v, received: %+v", eIndexes, sS.aSessionsIdx)
}
// eIndexes = map[string]map[string]utils.StringMap{
// "OriginID": map[string]utils.StringMap{
// "12345": utils.StringMap{
// cgrID: true,
// },
// "12346": utils.StringMap{
// cgrID2: true,
// },
// "12347": utils.StringMap{
// cgrID3: true,
// },
// },
// "Tenant": map[string]utils.StringMap{
// "cgrates.org": utils.StringMap{
// cgrID: true,
// cgrID3: true,
// },
// "itsyscom.com": utils.StringMap{
// cgrID2: true,
// },
// },
// "Account": map[string]utils.StringMap{
// "account1": utils.StringMap{
// cgrID: true,
// },
// "account2": utils.StringMap{
// cgrID2: true,
// cgrID3: true,
// },
// },
// "Extra3": map[string]utils.StringMap{
// utils.MetaEmpty: utils.StringMap{
// cgrID: true,
// cgrID2: true,
// },
// utils.NOT_AVAILABLE: utils.StringMap{
// cgrID3: true,
// },
// },
// "Extra4": map[string]utils.StringMap{
// utils.NOT_AVAILABLE: utils.StringMap{
// cgrID: true,
// cgrID3: true,
// },
// "info2": utils.StringMap{
// cgrID2: true,
// },
// },
// }
// if !reflect.DeepEqual(eIndexes, sS.aSessionsIdx) {
// t.Errorf("Expecting: %+v, received: %+v", eIndexes, sS.aSessionsIdx)
// }
eRIdxes = map[string][]*riFieldNameVal{
cgrID: []*riFieldNameVal{
&riFieldNameVal{fieldName: "Tenant", fieldValue: "cgrates.org"},
@@ -246,49 +261,49 @@ func TestSessionSIndexAndUnindexSessions(t *testing.T) {
}
// Unidex first session
sS.unindexSession(cgrID, false)
eIndexes = map[string]map[string]utils.StringMap{
"OriginID": map[string]utils.StringMap{
"12346": utils.StringMap{
cgrID2: true,
},
"12347": utils.StringMap{
cgrID3: true,
},
},
"Tenant": map[string]utils.StringMap{
"cgrates.org": utils.StringMap{
cgrID3: true,
},
"itsyscom.com": utils.StringMap{
cgrID2: true,
},
},
"Account": map[string]utils.StringMap{
"account2": utils.StringMap{
cgrID2: true,
cgrID3: true,
},
},
"Extra3": map[string]utils.StringMap{
utils.MetaEmpty: utils.StringMap{
cgrID2: true,
},
utils.NOT_AVAILABLE: utils.StringMap{
cgrID3: true,
},
},
"Extra4": map[string]utils.StringMap{
"info2": utils.StringMap{
cgrID2: true,
},
utils.NOT_AVAILABLE: utils.StringMap{
cgrID3: true,
},
},
}
if !reflect.DeepEqual(eIndexes, sS.aSessionsIdx) {
t.Errorf("Expecting: %+v, received: %+v", eIndexes, sS.aSessionsIdx)
}
// eIndexes = map[string]map[string]utils.StringMap{
// "OriginID": map[string]utils.StringMap{
// "12346": utils.StringMap{
// cgrID2: true,
// },
// "12347": utils.StringMap{
// cgrID3: true,
// },
// },
// "Tenant": map[string]utils.StringMap{
// "cgrates.org": utils.StringMap{
// cgrID3: true,
// },
// "itsyscom.com": utils.StringMap{
// cgrID2: true,
// },
// },
// "Account": map[string]utils.StringMap{
// "account2": utils.StringMap{
// cgrID2: true,
// cgrID3: true,
// },
// },
// "Extra3": map[string]utils.StringMap{
// utils.MetaEmpty: utils.StringMap{
// cgrID2: true,
// },
// utils.NOT_AVAILABLE: utils.StringMap{
// cgrID3: true,
// },
// },
// "Extra4": map[string]utils.StringMap{
// "info2": utils.StringMap{
// cgrID2: true,
// },
// utils.NOT_AVAILABLE: utils.StringMap{
// cgrID3: true,
// },
// },
// }
// if !reflect.DeepEqual(eIndexes, sS.aSessionsIdx) {
// t.Errorf("Expecting: %+v, received: %+v", eIndexes, sS.aSessionsIdx)
// }
eRIdxes = map[string][]*riFieldNameVal{
cgrID2: []*riFieldNameVal{
&riFieldNameVal{fieldName: "Tenant", fieldValue: "itsyscom.com"},
@@ -310,36 +325,36 @@ func TestSessionSIndexAndUnindexSessions(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", eRIdxes, sS.aSessionsRIdx)
}
sS.unindexSession(cgrID3, false)
eIndexes = map[string]map[string]utils.StringMap{
"OriginID": map[string]utils.StringMap{
"12346": utils.StringMap{
cgrID2: true,
},
},
"Tenant": map[string]utils.StringMap{
"itsyscom.com": utils.StringMap{
cgrID2: true,
},
},
"Account": map[string]utils.StringMap{
"account2": utils.StringMap{
cgrID2: true,
},
},
"Extra3": map[string]utils.StringMap{
utils.MetaEmpty: utils.StringMap{
cgrID2: true,
},
},
"Extra4": map[string]utils.StringMap{
"info2": utils.StringMap{
cgrID2: true,
},
},
}
if !reflect.DeepEqual(eIndexes, sS.aSessionsIdx) {
t.Errorf("Expecting: %+v, received: %+v", eIndexes, sS.aSessionsIdx)
}
// eIndexes = map[string]map[string]utils.StringMap{
// "OriginID": map[string]utils.StringMap{
// "12346": utils.StringMap{
// cgrID2: true,
// },
// },
// "Tenant": map[string]utils.StringMap{
// "itsyscom.com": utils.StringMap{
// cgrID2: true,
// },
// },
// "Account": map[string]utils.StringMap{
// "account2": utils.StringMap{
// cgrID2: true,
// },
// },
// "Extra3": map[string]utils.StringMap{
// utils.MetaEmpty: utils.StringMap{
// cgrID2: true,
// },
// },
// "Extra4": map[string]utils.StringMap{
// "info2": utils.StringMap{
// cgrID2: true,
// },
// },
// }
// if !reflect.DeepEqual(eIndexes, sS.aSessionsIdx) {
// t.Errorf("Expecting: %+v, received: %+v", eIndexes, sS.aSessionsIdx)
// }
eRIdxes = map[string][]*riFieldNameVal{
cgrID2: []*riFieldNameVal{
&riFieldNameVal{fieldName: "Tenant", fieldValue: "itsyscom.com"},
@@ -357,7 +372,7 @@ func TestSessionSIndexAndUnindexSessions(t *testing.T) {
func TestSessionSRegisterAndUnregisterASessions(t *testing.T) {
sSCfg, _ := config.NewDefaultCGRConfig()
sS := NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC")
sS := NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC")
sSEv := engine.NewSafEvent(map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.ToR: "*voice",
@@ -380,6 +395,11 @@ func TestSessionSRegisterAndUnregisterASessions(t *testing.T) {
s := &Session{
CGRID: "session1",
EventStart: sSEv,
SRuns: []*SRun{
&SRun{
Event: sSEv.AsMapInterface(),
},
},
}
//register the session
sS.registerSession(s, false)
@@ -390,17 +410,17 @@ func TestSessionSRegisterAndUnregisterASessions(t *testing.T) {
}
//verify if the index was created according to session
eIndexes := map[string]map[string]utils.StringMap{
"OriginID": map[string]utils.StringMap{
"111": utils.StringMap{
"session1": true,
},
},
}
if !reflect.DeepEqual(eIndexes, sS.aSessionsIdx) {
t.Errorf("Expecting: %s, received: %s",
utils.ToJSON(eIndexes), utils.ToJSON(sS.aSessionsIdx))
}
// eIndexes := map[string]map[string]utils.StringMap{
// "OriginID": map[string]utils.StringMap{
// "111": utils.StringMap{
// "session1": true,
// },
// },
// }
// if !reflect.DeepEqual(eIndexes, sS.aSessionsIdx) {
// t.Errorf("Expecting: %s, received: %s",
// utils.ToJSON(eIndexes), utils.ToJSON(sS.aSessionsIdx))
// }
//verify if the revIdx was created according to session
eRIdxes := map[string][]*riFieldNameVal{
"session1": []*riFieldNameVal{
@@ -431,6 +451,11 @@ func TestSessionSRegisterAndUnregisterASessions(t *testing.T) {
s2 := &Session{
CGRID: "session2",
EventStart: sSEv2,
SRuns: []*SRun{
&SRun{
Event: sSEv2.AsMapInterface(),
},
},
}
//register the second session
sS.registerSession(s2, false)
@@ -440,20 +465,20 @@ func TestSessionSRegisterAndUnregisterASessions(t *testing.T) {
}
//verify if the index was created according to session
eIndexes = map[string]map[string]utils.StringMap{
"OriginID": map[string]utils.StringMap{
"111": utils.StringMap{
"session1": true,
},
"222": utils.StringMap{
"session2": true,
},
},
}
if !reflect.DeepEqual(eIndexes, sS.aSessionsIdx) {
t.Errorf("Expecting: %s, received: %s",
utils.ToJSON(eIndexes), utils.ToJSON(sS.aSessionsIdx))
}
// eIndexes = map[string]map[string]utils.StringMap{
// "OriginID": map[string]utils.StringMap{
// "111": utils.StringMap{
// "session1": true,
// },
// "222": utils.StringMap{
// "session2": true,
// },
// },
// }
// if !reflect.DeepEqual(eIndexes, sS.aSessionsIdx) {
// t.Errorf("Expecting: %s, received: %s",
// utils.ToJSON(eIndexes), utils.ToJSON(sS.aSessionsIdx))
// }
//verify if the revIdx was created according to session
eRIdxes = map[string][]*riFieldNameVal{
"session1": []*riFieldNameVal{
@@ -463,7 +488,10 @@ func TestSessionSRegisterAndUnregisterASessions(t *testing.T) {
&riFieldNameVal{fieldName: "OriginID", fieldValue: "222"},
},
}
if len(eRIdxes) != len(sS.aSessionsRIdx) && eRIdxes["session2"][0] != sS.aSessionsRIdx["session2"][0] {
if len(eRIdxes) != len(sS.aSessionsRIdx) &&
len(eRIdxes["session2"]) > 0 &&
len(sS.aSessionsRIdx["session2"]) > 0 &&
eRIdxes["session2"][0] != sS.aSessionsRIdx["session2"][0] {
t.Errorf("Expecting: %+v, received: %+v", eRIdxes, sS.aSessionsRIdx)
}
@@ -488,6 +516,11 @@ func TestSessionSRegisterAndUnregisterASessions(t *testing.T) {
s3 := &Session{
CGRID: "session1",
EventStart: sSEv3,
SRuns: []*SRun{
&SRun{
Event: sSEv3.AsMapInterface(),
},
},
}
//register the third session with cgrID as first one (should be replaced)
sS.registerSession(s3, false)
@@ -501,17 +534,17 @@ func TestSessionSRegisterAndUnregisterASessions(t *testing.T) {
//unregister the session and check if the index was removed
sS.unregisterSession("session1", false)
eIndexes = map[string]map[string]utils.StringMap{
"OriginID": map[string]utils.StringMap{
"222": utils.StringMap{
"session2": true,
},
},
}
if !reflect.DeepEqual(eIndexes, sS.aSessionsIdx) {
t.Errorf("Expecting: %s, received: %s",
utils.ToJSON(eIndexes), utils.ToJSON(sS.aSessionsIdx))
}
// eIndexes = map[string]map[string]utils.StringMap{
// "OriginID": map[string]utils.StringMap{
// "222": utils.StringMap{
// "session2": true,
// },
// },
// }
// if !reflect.DeepEqual(eIndexes, sS.aSessionsIdx) {
// t.Errorf("Expecting: %s, received: %s",
// utils.ToJSON(eIndexes), utils.ToJSON(sS.aSessionsIdx))
// }
eRIdxes = map[string][]*riFieldNameVal{
"session2": []*riFieldNameVal{
&riFieldNameVal{fieldName: "OriginID", fieldValue: "222"},
@@ -528,11 +561,11 @@ func TestSessionSRegisterAndUnregisterASessions(t *testing.T) {
sS.unregisterSession("session2", false)
eIndexes = map[string]map[string]utils.StringMap{}
if !reflect.DeepEqual(eIndexes, sS.aSessionsIdx) {
t.Errorf("Expecting: %s, received: %s",
utils.ToJSON(eIndexes), utils.ToJSON(sS.aSessionsIdx))
}
// eIndexes = map[string]map[string]utils.StringMap{}
// if !reflect.DeepEqual(eIndexes, sS.aSessionsIdx) {
// t.Errorf("Expecting: %s, received: %s",
// utils.ToJSON(eIndexes), utils.ToJSON(sS.aSessionsIdx))
// }
eRIdxes = map[string][]*riFieldNameVal{}
if len(eRIdxes) != len(sS.aSessionsRIdx) {
t.Errorf("Expecting: %+v, received: %+v", eRIdxes, sS.aSessionsRIdx)
@@ -546,7 +579,7 @@ func TestSessionSRegisterAndUnregisterASessions(t *testing.T) {
func TestSessionSRegisterAndUnregisterPSessions(t *testing.T) {
sSCfg, _ := config.NewDefaultCGRConfig()
sS := NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC")
sS := NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC")
sSEv := engine.NewSafEvent(map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.ToR: "*voice",
@@ -569,6 +602,11 @@ func TestSessionSRegisterAndUnregisterPSessions(t *testing.T) {
s := &Session{
CGRID: "session1",
EventStart: sSEv,
SRuns: []*SRun{
&SRun{
Event: sSEv.AsMapInterface(),
},
},
}
//register the session
sS.registerSession(s, true)
@@ -579,24 +617,27 @@ func TestSessionSRegisterAndUnregisterPSessions(t *testing.T) {
}
//verify if the index was created according to session
eIndexes := map[string]map[string]utils.StringMap{
"OriginID": map[string]utils.StringMap{
"111": utils.StringMap{
"session1": true,
},
},
}
if !reflect.DeepEqual(eIndexes, sS.pSessionsIdx) {
t.Errorf("Expecting: %s, received: %s",
utils.ToJSON(eIndexes), utils.ToJSON(sS.pSessionsIdx))
}
// eIndexes := map[string]map[string]utils.StringMap{
// "OriginID": map[string]utils.StringMap{
// "111": utils.StringMap{
// "session1": true,
// },
// },
// }
// if !reflect.DeepEqual(eIndexes, sS.pSessionsIdx) {
// t.Errorf("Expecting: %s, received: %s",
// utils.ToJSON(eIndexes), utils.ToJSON(sS.pSessionsIdx))
// }
//verify if the revIdx was created according to session
eRIdxes := map[string][]*riFieldNameVal{
"session1": []*riFieldNameVal{
&riFieldNameVal{fieldName: "OriginID", fieldValue: "111"},
},
}
if len(eRIdxes) != len(sS.pSessionsRIdx) && eRIdxes["session1"][0] != sS.pSessionsRIdx["session1"][0] {
if len(eRIdxes) != len(sS.pSessionsRIdx) &&
len(eRIdxes["session2"]) > 0 &&
len(sS.aSessionsRIdx["session2"]) > 0 &&
eRIdxes["session1"][0] != sS.pSessionsRIdx["session1"][0] {
t.Errorf("Expecting: %+v, received: %+v", eRIdxes, sS.pSessionsRIdx)
}
@@ -620,6 +661,11 @@ func TestSessionSRegisterAndUnregisterPSessions(t *testing.T) {
s2 := &Session{
CGRID: "session2",
EventStart: sSEv2,
SRuns: []*SRun{
&SRun{
Event: sSEv2.AsMapInterface(),
},
},
}
//register the second session
sS.registerSession(s2, true)
@@ -629,20 +675,20 @@ func TestSessionSRegisterAndUnregisterPSessions(t *testing.T) {
}
//verify if the index was created according to session
eIndexes = map[string]map[string]utils.StringMap{
"OriginID": map[string]utils.StringMap{
"111": utils.StringMap{
"session1": true,
},
"222": utils.StringMap{
"session2": true,
},
},
}
if !reflect.DeepEqual(eIndexes, sS.pSessionsIdx) {
t.Errorf("Expecting: %s, received: %s",
utils.ToJSON(eIndexes), utils.ToJSON(sS.pSessionsIdx))
}
// eIndexes = map[string]map[string]utils.StringMap{
// "OriginID": map[string]utils.StringMap{
// "111": utils.StringMap{
// "session1": true,
// },
// "222": utils.StringMap{
// "session2": true,
// },
// },
// }
// if !reflect.DeepEqual(eIndexes, sS.pSessionsIdx) {
// t.Errorf("Expecting: %s, received: %s",
// utils.ToJSON(eIndexes), utils.ToJSON(sS.pSessionsIdx))
// }
//verify if the revIdx was created according to session
eRIdxes = map[string][]*riFieldNameVal{
"session1": []*riFieldNameVal{
@@ -652,7 +698,10 @@ func TestSessionSRegisterAndUnregisterPSessions(t *testing.T) {
&riFieldNameVal{fieldName: "OriginID", fieldValue: "222"},
},
}
if len(eRIdxes) != len(sS.pSessionsRIdx) && eRIdxes["session2"][0] != sS.pSessionsRIdx["session2"][0] {
if len(eRIdxes) != len(sS.pSessionsRIdx) &&
len(eRIdxes["session2"]) > 0 &&
len(sS.aSessionsRIdx["session2"]) > 0 &&
eRIdxes["session2"][0] != sS.pSessionsRIdx["session2"][0] {
t.Errorf("Expecting: %+v, received: %+v", eRIdxes, sS.pSessionsRIdx)
}
@@ -677,6 +726,11 @@ func TestSessionSRegisterAndUnregisterPSessions(t *testing.T) {
s3 := &Session{
CGRID: "session1",
EventStart: sSEv3,
SRuns: []*SRun{
&SRun{
Event: sSEv3.AsMapInterface(),
},
},
}
//register the third session with cgrID as first one (should be replaced)
sS.registerSession(s3, false)
@@ -690,17 +744,17 @@ func TestSessionSRegisterAndUnregisterPSessions(t *testing.T) {
//unregister the session and check if the index was removed
sS.unregisterSession("session1", true)
eIndexes = map[string]map[string]utils.StringMap{
"OriginID": map[string]utils.StringMap{
"222": utils.StringMap{
"session2": true,
},
},
}
if !reflect.DeepEqual(eIndexes, sS.pSessionsIdx) {
t.Errorf("Expecting: %s, received: %s",
utils.ToJSON(eIndexes), utils.ToJSON(sS.pSessionsIdx))
}
// eIndexes = map[string]map[string]utils.StringMap{
// "OriginID": map[string]utils.StringMap{
// "222": utils.StringMap{
// "session2": true,
// },
// },
// }
// if !reflect.DeepEqual(eIndexes, sS.pSessionsIdx) {
// t.Errorf("Expecting: %s, received: %s",
// utils.ToJSON(eIndexes), utils.ToJSON(sS.pSessionsIdx))
// }
eRIdxes = map[string][]*riFieldNameVal{
"session2": []*riFieldNameVal{
&riFieldNameVal{fieldName: "OriginID", fieldValue: "222"},
@@ -717,11 +771,11 @@ func TestSessionSRegisterAndUnregisterPSessions(t *testing.T) {
sS.unregisterSession("session2", true)
eIndexes = map[string]map[string]utils.StringMap{}
if !reflect.DeepEqual(eIndexes, sS.pSessionsIdx) {
t.Errorf("Expecting: %s, received: %s",
utils.ToJSON(eIndexes), utils.ToJSON(sS.pSessionsIdx))
}
// eIndexes = map[string]map[string]utils.StringMap{}
// if !reflect.DeepEqual(eIndexes, sS.pSessionsIdx) {
// t.Errorf("Expecting: %s, received: %s",
// utils.ToJSON(eIndexes), utils.ToJSON(sS.pSessionsIdx))
// }
eRIdxes = map[string][]*riFieldNameVal{}
if len(eRIdxes) != len(sS.pSessionsRIdx) {
t.Errorf("Expecting: %+v, received: %+v", eRIdxes, sS.pSessionsRIdx)
@@ -1043,7 +1097,7 @@ func TestSessionSV1ProcessEventReplyAsNavigableMap(t *testing.T) {
func TestSessionStransitSState(t *testing.T) {
sSCfg, _ := config.NewDefaultCGRConfig()
sS := NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC")
sS := NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC")
sSEv := engine.NewSafEvent(map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.ToR: "*voice",
@@ -1088,95 +1142,9 @@ func TestSessionStransitSState(t *testing.T) {
}
}
func TestSessionSgetSessionIDsForPrefix(t *testing.T) {
sSCfg, _ := config.NewDefaultCGRConfig()
sS := NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC")
sSEv := engine.NewSafEvent(map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.ToR: "*voice",
utils.OriginID: "111",
utils.Direction: "*out",
utils.Account: "account1",
utils.Subject: "subject1",
utils.Destination: "+4986517174963",
utils.Category: "call",
utils.Tenant: "cgrates.org",
utils.RequestType: "*prepaid",
utils.SetupTime: "2015-11-09 14:21:24",
utils.AnswerTime: "2015-11-09 14:22:02",
utils.Usage: "1m23s",
utils.LastUsed: "21s",
utils.PDD: "300ms",
utils.SUPPLIER: "supplier1",
utils.OriginHost: "127.0.0.1",
})
s := &Session{
CGRID: "session1",
EventStart: sSEv,
}
//register the session as active
sS.registerSession(s, false)
//verify if was registered
rcvS := sS.getSessions("session1", false)
if !reflect.DeepEqual(rcvS[0], s) {
t.Errorf("Expecting %+v, received: %+v", s, rcvS[0])
}
rcv := sS.getSessionIDsForPrefix("1", false)
exp := []string{"session1"}
if !reflect.DeepEqual(rcv, exp) {
t.Errorf("Expecting %+v, received: %+v", exp, rcv)
}
sSEv2 := engine.NewSafEvent(map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.ToR: "*voice",
utils.OriginID: "121",
utils.Direction: "*out",
utils.Account: "account1",
utils.Subject: "subject1",
utils.Destination: "+4986517174963",
utils.Category: "call",
utils.Tenant: "cgrates.org",
utils.RequestType: "*prepaid",
utils.SetupTime: "2015-11-09 14:21:24",
utils.AnswerTime: "2015-11-09 14:22:02",
utils.Usage: "1m23s",
utils.LastUsed: "21s",
utils.PDD: "300ms",
utils.SUPPLIER: "supplier1",
utils.OriginHost: "127.0.0.1",
})
s2 := &Session{
CGRID: "session2",
EventStart: sSEv2,
}
//register the session as active
sS.registerSession(s2, false)
//check for reverse
rcv = sS.getSessionIDsForPrefix("1", false)
exp = []string{"session1", "session2"}
exp2 := []string{"session2", "session1"}
if !reflect.DeepEqual(rcv, exp) && !reflect.DeepEqual(rcv, exp2) {
t.Errorf("Expecting %+v, received: %+v", exp, rcv)
}
rcv = sS.getSessionIDsForPrefix("12", false)
exp = []string{"session2"}
if !reflect.DeepEqual(rcv, exp) {
t.Errorf("Expecting %+v, received: %+v", exp, rcv)
}
sS.unregisterSession("session2", false)
rcv = sS.getSessionIDsForPrefix("12", false)
if rcv != nil {
t.Errorf("Expecting nil, received: %+v", rcv)
}
}
func TestSessionSregisterSessionWithTerminator(t *testing.T) {
sSCfg, _ := config.NewDefaultCGRConfig()
sS := NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC")
sS := NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC")
sSEv := engine.NewSafEvent(map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.ToR: "*voice",
@@ -1215,7 +1183,7 @@ func TestSessionSregisterSessionWithTerminator(t *testing.T) {
func TestSessionSrelocateSessionS(t *testing.T) {
sSCfg, _ := config.NewDefaultCGRConfig()
sS := NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC")
sS := NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC")
sSEv := engine.NewSafEvent(map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.ToR: "*voice",

View File

@@ -1416,3 +1416,10 @@ type ArgsGetGroup struct {
CacheID string
GroupID string
}
type SessionFilter struct {
Limit *int
Filters []string
Tenant string
*ArgDispatcher
}

View File

@@ -1060,6 +1060,7 @@ const (
CacheAttributeFilterIndexes = "attribute_filter_indexes"
CacheChargerFilterIndexes = "charger_filter_indexes"
CacheDispatcherFilterIndexes = "dispatcher_filter_indexes"
CacheSessionFilterIndexes = "session_filter_indexes"
CacheDiameterMessages = "diameter_messages"
CacheRPCResponses = "rpc_responses"
CacheClosedSessions = "closed_sessions"

View File

@@ -181,6 +181,16 @@ func (sm StringMap) HasKey(key string) (has bool) {
return
}
func (sm StringMap) GetSlice() (result []string) {
result = make([]string, len(sm))
i := 0
for k := range sm {
result[i] = k
i += 1
}
return
}
/*
func NoDots(m map[string]struct{}) map[string]struct{} {
return MapKeysReplace(m, ".", "")