diff --git a/dispatchers/sessions_it_test.go b/dispatchers/sessions_it_test.go index 9164d5398..7447eae3e 100755 --- a/dispatchers/sessions_it_test.go +++ b/dispatchers/sessions_it_test.go @@ -21,12 +21,14 @@ along with this program. If not, see package dispatchers import ( + "path" "reflect" "sort" "strings" "testing" "time" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" @@ -41,23 +43,28 @@ var sTestsDspSession = []func(t *testing.T){ testDspSessionTestAuthKey, testDspSessionAuthorize, testDspSessionInit, + testDspGetSessions, testDspSessionUpdate, testDspSessionTerminate, testDspSessionProcessCDR, testDspSessionProcessEvent, + + testDspSessionReplicate, + testDspSessionPassive, + testDspSessionForceDisconect, } //Test start here func TestDspSessionSTMySQL(t *testing.T) { - testDsp(t, sTestsDspSession, "TestDspSessionS", "all", "all2", "attributes", "dispatchers", "testit", "oldtutorial", "dispatchers") + testDsp(t, sTestsDspSession, "TestDspSessionS", "all", "all2", "attributes", "dispatchers", "testit", "tutorial", "dispatchers") } func TestDspSessionSMongo(t *testing.T) { - testDsp(t, sTestsDspSession, "TestDspSessionS", "all", "all2", "attributes_mongo", "dispatchers_mongo", "testit", "oldtutorial", "dispatchers") + testDsp(t, sTestsDspSession, "TestDspSessionS", "all", "all2", "attributes_mongo", "dispatchers_mongo", "testit", "tutorial", "dispatchers") } func testDspSessionAddBalacne(t *testing.T) { - initUsage := 30 * time.Minute + initUsage := 40 * time.Minute attrSetBalance := utils.AttrSetBalance{ Tenant: "cgrates.org", Account: "1001", @@ -84,6 +91,17 @@ func testDspSessionAddBalacne(t *testing.T) { t.Errorf("Expecting: %v, received: %v", time.Duration(eAcntVal), time.Duration(acnt.BalanceMap[utils.VOICE].GetTotalValue())) } + if err := allEngine2.RCP.Call("ApierV2.SetBalance", attrSetBalance, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Received: %s", reply) + } + if err := allEngine2.RCP.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.VOICE].GetTotalValue() != eAcntVal { + t.Errorf("Expecting: %v, received: %v", + time.Duration(eAcntVal), time.Duration(acnt.BalanceMap[utils.VOICE].GetTotalValue())) + } } func testDspSessionPing(t *testing.T) { @@ -191,6 +209,7 @@ func testDspSessionAuthorize(t *testing.T) { Tenant: "cgrates.org", ID: "TestSSv1ItAuth", Event: map[string]interface{}{ + utils.CGRID: "c87609aa1cb6e9529ab1836cfeeebaab7aa7ebaf", utils.Tenant: "cgrates.org", utils.Category: "call", utils.ToR: utils.VOICE, @@ -243,6 +262,7 @@ func testDspSessionInit(t *testing.T) { Tenant: "cgrates.org", ID: "TestSSv1ItInitiateSession", Event: map[string]interface{}{ + utils.CGRID: "c87609aa1cb6e9529ab1836cfeeebaab7aa7ebaf", utils.Tenant: "cgrates.org", utils.Category: "call", utils.ToR: utils.VOICE, @@ -270,6 +290,44 @@ func testDspSessionInit(t *testing.T) { } } +func testDspGetSessions(t *testing.T) { + filtr := FilterSessionWithApiKey{ + DispatcherResource: DispatcherResource{ + APIKey: "ses12345", + }, + TenantArg: utils.TenantArg{ + Tenant: "cgrates.org", + }, + Filters: map[string]string{}, + } + var reply int + if err := dispEngine.RCP.Call(utils.SessionSv1GetActiveSessionsCount, + &filtr, &reply); err != nil { + t.Fatal(err) + } else if reply != 2 { + t.Errorf("Expected 2 active sessions recived %v", reply) + } + var rply []*sessions.ActiveSession + if err := dispEngine.RCP.Call(utils.SessionSv1GetActiveSessions, + &filtr, &rply); err != nil { + t.Fatal(err) + } else if len(rply) != 2 { + t.Errorf("Unexpected number of sessions returned %v :%s", len(rply), utils.ToJSON(rply)) + } + + if err := dispEngine.RCP.Call(utils.SessionSv1GetPassiveSessionsCount, + &filtr, &reply); err != nil { + t.Fatal(err) + } else if reply != 0 { + t.Errorf("Expected no pasive sessions recived %v", reply) + } + rply = nil + if err := dispEngine.RCP.Call(utils.SessionSv1GetPassiveSessions, + &filtr, &rply); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Fatalf("Expected %v recived %v with reply %s", utils.ErrNotFound, err, utils.ToJSON(rply)) + } +} + func testDspSessionUpdate(t *testing.T) { reqUsage := 5 * time.Minute argsUpdate := &UpdateSessionWithApiKey{ @@ -328,7 +386,77 @@ func testDspSessionUpdate(t *testing.T) { t.Errorf("expecting: %+v, received: %+v", utils.ToJSON(eAttrs), utils.ToJSON(rply.Attributes)) } - if *rply.MaxUsage != reqUsage { + if rply.MaxUsage == nil || *rply.MaxUsage != reqUsage { + t.Errorf("Unexpected MaxUsage: %v", utils.ToJSON(rply)) + } +} + +func testDspSessionUpdate2(t *testing.T) { + reqUsage := 5 * time.Minute + argsUpdate := &UpdateSessionWithApiKey{ + DispatcherResource: DispatcherResource{ + APIKey: "ses12345", + }, + V1UpdateSessionArgs: sessions.V1UpdateSessionArgs{ + GetAttributes: true, + UpdateSession: true, + CGREvent: utils.CGREvent{ + Tenant: "cgrates.org", + ID: "TestSSv1ItUpdateSession", + Event: map[string]interface{}{ + utils.CGRID: "c87609aa1cb6e9529ab1836cfeeebaab7aa7ebaf", + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.ToR: utils.VOICE, + utils.OriginID: "TestSSv1It1", + utils.RequestType: utils.META_PREPAID, + utils.Account: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), + utils.Usage: reqUsage, + }, + }, + }, + } + var rply sessions.V1UpdateSessionReply + if err := dispEngine.RCP.Call(utils.SessionSv1UpdateSession, + argsUpdate, &rply); err != nil { + t.Fatal(err) + } + eAttrs := &engine.AttrSProcessEventReply{ + MatchedProfiles: []string{"ATTR_1001_SESSIONAUTH"}, + AlteredFields: []string{"LCRProfile", "Password", "RequestType", "PaypalAccount"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "TestSSv1ItUpdateSession", + Event: map[string]interface{}{ + utils.CGRID: "c87609aa1cb6e9529ab1836cfeeebaab7aa7ebaf", + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.ToR: utils.VOICE, + utils.Account: "1001", + utils.Destination: "1002", + "LCRProfile": "premium_cli", + "Password": "CGRateS.org", + "PaypalAccount": "cgrates@paypal.com", + utils.OriginID: "TestSSv1It1", + utils.RequestType: utils.META_PREPAID, + utils.SetupTime: "2018-01-07T17:00:00Z", + utils.AnswerTime: "2018-01-07T17:00:10Z", + utils.Usage: float64(reqUsage), + }, + }, + } + sort.Strings(eAttrs.AlteredFields) + if rply.Attributes != nil && rply.Attributes.AlteredFields != nil { + sort.Strings(rply.Attributes.AlteredFields) + } + if !reflect.DeepEqual(eAttrs, rply.Attributes) { + t.Errorf("expecting: %+v, received: %+v", + utils.ToJSON(eAttrs), utils.ToJSON(rply.Attributes)) + } + if rply.MaxUsage == nil || *rply.MaxUsage != reqUsage { t.Errorf("Unexpected MaxUsage: %v", utils.ToJSON(rply)) } } @@ -416,7 +544,7 @@ func testDspSessionProcessEvent(t *testing.T) { Tenant: "cgrates.org", ID: "TestSSv1ItProcessEvent", Event: map[string]interface{}{ - "CGRID": "c87609aa1cb6e9529ab1836cfeeeb0ab7aa7ebaf", + utils.CGRID: "c87609aa1cb6e9529ab1836cfeeebaab7aa7ebaf", utils.Tenant: "cgrates.org", utils.Category: "call", utils.ToR: utils.VOICE, @@ -449,7 +577,7 @@ func testDspSessionProcessEvent(t *testing.T) { Tenant: "cgrates.org", ID: "TestSSv1ItProcessEvent", Event: map[string]interface{}{ - "CGRID": "c87609aa1cb6e9529ab1836cfeeeb0ab7aa7ebaf", + utils.CGRID: "c87609aa1cb6e9529ab1836cfeeebaab7aa7ebaf", utils.Tenant: "cgrates.org", utils.Category: "call", utils.ToR: utils.VOICE, @@ -469,3 +597,207 @@ func testDspSessionProcessEvent(t *testing.T) { utils.ToJSON(eAttrs), utils.ToJSON(rply.Attributes)) } } + +func testDspSessionReplicate(t *testing.T) { + allEngine.initDataDb(t) + allEngine.resetStorDb(t) + allEngine.loadData(t, path.Join(dspDataDir, "tariffplans", "testit")) + testDspSessionAddBalacne(t) + testDspSessionAuthorize(t) + testDspSessionInit(t) + + var reply string + if err := dispEngine.RCP.Call(utils.SessionSv1ReplicateSessions, ArgsReplicateSessionsWithApiKey{ + DispatcherResource: DispatcherResource{ + APIKey: "ses12345", + }, + TenantArg: utils.TenantArg{ + Tenant: "cgrates.org", + }, + ArgsReplicateSessions: sessions.ArgsReplicateSessions{ + CGRID: "c87609aa1cb6e9529ab1836cfeeebaab7aa7ebaf", + Passive: false, + Connections: []*config.HaPoolConfig{ + &config.HaPoolConfig{ + Address: "127.0.0.1:7012", + Transport: utils.MetaJSONrpc, + }, + }, + }, + }, &reply); err != nil { + t.Fatal(err) + } else if reply != utils.OK { + t.Errorf("Unexpected reply %s", reply) + } + + var repl int + time.Sleep(10 * time.Millisecond) + if err := allEngine2.RCP.Call(utils.SessionSv1GetPassiveSessionsCount, + map[string]string{}, &repl); err != nil { + t.Fatal(err) + } else if repl != 2 { + t.Errorf("Expected 1 sessions recived %v", repl) + } +} + +func testDspSessionPassive(t *testing.T) { + allEngine.stopEngine(t) + testDspSessionUpdate2(t) + var repl int + filtr := FilterSessionWithApiKey{ + DispatcherResource: DispatcherResource{ + APIKey: "ses12345", + }, + TenantArg: utils.TenantArg{ + Tenant: "cgrates.org", + }, + Filters: map[string]string{}, + } + time.Sleep(10 * time.Millisecond) + if err := dispEngine.RCP.Call(utils.SessionSv1GetPassiveSessionsCount, + filtr, &repl); err != nil { + t.Fatal(err) + } else if repl != 0 { + t.Errorf("Expected no passive sessions recived %v", repl) + } + if err := dispEngine.RCP.Call(utils.SessionSv1GetActiveSessionsCount, + filtr, &repl); err != nil { + t.Fatal(err) + } else if repl != 2 { + t.Errorf("Expected 1 active sessions recived %v", repl) + } + + var rply []*sessions.ActiveSession + if err := dispEngine.RCP.Call(utils.SessionSv1GetActiveSessions, + &filtr, &rply); err != nil { + t.Fatal(err) + } else if len(rply) != 2 { + t.Errorf("Unexpected number of sessions returned %v :%s", len(rply), utils.ToJSON(rply)) + } + + var reply string + if err := dispEngine.RCP.Call(utils.SessionSv1SetPassiveSession, SessionWithApiKey{ + DispatcherResource: DispatcherResource{ + APIKey: "ses12345", + }, + Session: sessions.Session{ + CGRID: rply[0].CGRID, + Tenant: rply[0].Tenant, + ResourceID: "TestSSv1It1", + EventStart: engine.NewSafEvent(map[string]interface{}{ + utils.CGRID: "c87609aa1cb6e9529ab1836cfeeebaab7aa7ebaf", + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.ToR: utils.VOICE, + utils.OriginID: "TestSSv1It1", + utils.RequestType: utils.META_PREPAID, + utils.Account: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), + utils.Usage: 5 * time.Minute, + }), + SRuns: []*sessions.SRun{ + &sessions.SRun{ + Event: engine.NewMapEvent(map[string]interface{}{ + "RunID": "CustomerCharges", + utils.CGRID: "c87609aa1cb6e9529ab1836cfeeebaab7aa7ebaf", + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.ToR: utils.VOICE, + utils.OriginID: "TestSSv1It1", + utils.RequestType: utils.META_PREPAID, + utils.Account: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Date(2018, time.January, 7, 16, 60, 0, 0, time.UTC), + utils.AnswerTime: time.Date(2018, time.January, 7, 16, 60, 10, 0, time.UTC), + utils.Usage: 5 * time.Minute, + }), + CD: &engine.CallDescriptor{}, + EventCost: &engine.EventCost{}, + + LastUsage: 5 * time.Minute, + TotalUsage: 10 * time.Minute, + }, + }, + }, + }, &reply); err != nil { + t.Fatal(err) + } else if reply != utils.OK { + t.Errorf("Unexpected reply %s", reply) + } + time.Sleep(10 * time.Millisecond) + if err := dispEngine.RCP.Call(utils.SessionSv1GetPassiveSessionsCount, + filtr, &repl); err != nil { + t.Fatal(err) + } else if repl != 1 { + t.Errorf("Expected 1 passive sessions recived %v", repl) + } + if err := dispEngine.RCP.Call(utils.SessionSv1GetActiveSessionsCount, + filtr, &repl); err != nil { + t.Fatal(err) + } else if repl != 0 { + t.Errorf("Expected no active sessions recived %v", repl) + } +} + +func testDspSessionForceDisconect(t *testing.T) { + allEngine.startEngine(t) + allEngine.initDataDb(t) + allEngine.resetStorDb(t) + allEngine.loadData(t, path.Join(dspDataDir, "tariffplans", "testit")) + testDspSessionAddBalacne(t) + testDspSessionAuthorize(t) + testDspSessionInit(t) + var repl int + filtr := FilterSessionWithApiKey{ + DispatcherResource: DispatcherResource{ + APIKey: "ses12345", + }, + TenantArg: utils.TenantArg{ + Tenant: "cgrates.org", + }, + Filters: map[string]string{}, + } + time.Sleep(10 * time.Millisecond) + if err := dispEngine.RCP.Call(utils.SessionSv1GetPassiveSessionsCount, + filtr, &repl); err != nil { + t.Fatal(err) + } else if repl != 0 { + t.Errorf("Expected no passive sessions recived %v", repl) + } + if err := dispEngine.RCP.Call(utils.SessionSv1GetActiveSessionsCount, + filtr, &repl); err != nil { + t.Fatal(err) + } else if repl != 2 { + t.Errorf("Expected 1 active sessions recived %v", repl) + } + + var rply []*sessions.ActiveSession + if err := dispEngine.RCP.Call(utils.SessionSv1GetActiveSessions, + &filtr, &rply); err != nil { + t.Fatal(err) + } else if len(rply) != 2 { + t.Errorf("Unexpected number of sessions returned %v :%s", len(rply), utils.ToJSON(rply)) + } + + var reply string + if err := dispEngine.RCP.Call(utils.SessionSv1ForceDisconnect, &filtr, &reply); err != nil { + t.Fatal(err) + } else if reply != utils.OK { + t.Errorf("Unexpected reply %s", reply) + } + time.Sleep(10 * time.Millisecond) + if err := dispEngine.RCP.Call(utils.SessionSv1GetPassiveSessionsCount, + filtr, &repl); err != nil { + t.Fatal(err) + } else if repl != 0 { + t.Errorf("Expected 1 passive sessions recived %v", repl) + } + if err := dispEngine.RCP.Call(utils.SessionSv1GetActiveSessionsCount, + filtr, &repl); err != nil { + t.Fatal(err) + } else if repl != 0 { + t.Errorf("Expected no active sessions recived %v", repl) + } +} diff --git a/sessions/sessions.go b/sessions/sessions.go index 7d4f40529..7d70cc86b 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -669,7 +669,7 @@ func (sS *SessionS) replicateSessions(cgrID string, psv bool, rplConns []*SReplC ss = []*Session{&Session{CGRID: cgrID, EventStart: engine.NewSafEvent(nil)}} } var wg sync.WaitGroup - for _, rplConn := range sS.sReplConns { + for _, rplConn := range rplConns { if rplConn.Synchronous { wg.Add(1) } @@ -916,14 +916,14 @@ func (sS *SessionS) asActiveSessions(fltrs map[string]string, i++ } } - if count { - return nil, len(remainingSessions), nil - } for _, s := range remainingSessions { aSs = append(aSs, s.AsActiveSessions(sS.cgrCfg.GeneralCfg().DefaultTimezone, sS.cgrCfg.GeneralCfg().NodeID)...) // Expensive for large number of sessions } + if count { + return nil, len(aSs), nil + } return } @@ -1438,7 +1438,6 @@ func (sS *SessionS) BiRPCv1SetPassiveSession(clnt rpcclient.RpcClientConnection, //we unregister it first then regiser the new one if len(sS.getSessions(s.CGRID, false)) != 0 { sS.unregisterSession(s.CGRID, false) - } sS.registerSession(s, true) }