Update tests and config for new sessions

This commit is contained in:
TeoV
2019-02-05 07:42:47 -05:00
committed by Dan Christian Bogos
parent 884bc5b50e
commit e1d3aa5a35
9 changed files with 138 additions and 187 deletions

View File

@@ -80,7 +80,7 @@ func TestSSv1ItResetStorDb(t *testing.T) {
}
func TestSSv1ItStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(sSv1CfgPath, 100); err != nil {
if _, err := engine.StopStartEngine(sSv1CfgPath, 1000); err != nil {
t.Fatal(err)
}
}
@@ -368,7 +368,7 @@ func TestSSv1ItInitiateSessionWithDigest(t *testing.T) {
aSessions := make([]*sessions.ActiveSession, 0)
if err := sSv1BiRpc.Call(utils.SessionSv1GetActiveSessions, nil, &aSessions); err != nil {
t.Error(err)
} else if len(aSessions) != 4 { // the digest has increased the number of sessions
} else if len(aSessions) != 2 {
t.Errorf("wrong active sessions: %s", utils.ToJSON(aSessions))
}
}
@@ -435,7 +435,7 @@ func TestSSv1ItUpdateSession(t *testing.T) {
aSessions := make([]*sessions.ActiveSession, 0)
if err := sSv1BiRpc.Call(utils.SessionSv1GetActiveSessions, nil, &aSessions); err != nil {
t.Error(err)
} else if len(aSessions) != 4 { // the digest has increased the number of sessions
} else if len(aSessions) != 2 {
t.Errorf("wrong active sessions: %s", utils.ToJSON(aSessions))
}
}
@@ -659,7 +659,7 @@ func TestSSv1ItForceUpdateSession(t *testing.T) {
}
var acnt *engine.Account
attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"}
eAcntVal := 9.25
eAcntVal := 9.3995
if err := sSApierRpc.Call("ApierV2.GetAccount", attrs, &acnt); err != nil {
t.Error(err)
} else if acnt.BalanceMap[utils.MONETARY].GetTotalValue() != eAcntVal {
@@ -731,7 +731,7 @@ func TestSSv1ItForceUpdateSession(t *testing.T) {
t.Errorf("wrong active ssesions: %s", utils.ToJSON(aSessions))
}
eAcntVal = 9.10
eAcntVal = 9.2495
if err := sSApierRpc.Call("ApierV2.GetAccount", attrs, &acnt); err != nil {
t.Error(err)
} else if acnt.BalanceMap[utils.MONETARY].GetTotalValue() != eAcntVal {
@@ -833,7 +833,7 @@ func TestSSv1ItDynamicDebit(t *testing.T) {
args1, &rply1); err != nil {
t.Error(err)
return
} else if *rply1.MaxUsage != -1 {
} else if *rply1.MaxUsage != time.Duration(0*time.Second) {
t.Errorf("Unexpected MaxUsage: %v", rply1.MaxUsage)
}
@@ -873,7 +873,7 @@ func TestSSv1ItDynamicDebit(t *testing.T) {
} else if len(aSessions) != 2 {
t.Errorf("wrong active sessions: %s", utils.ToJSON(aSessions))
}
rplyt := ""
var rplyt string
if err := sSv1BiRpc.Call(utils.SessionSv1ForceDisconnect,
nil, &rplyt); err != nil {
t.Error(err)
@@ -891,7 +891,7 @@ func TestSSv1ItStopCgrEngine(t *testing.T) {
if err := sSv1BiRpc.Close(); err != nil { // Close the connection so we don't get EOF warnings from client
t.Error(err)
}
if err := engine.KillEngine(100); err != nil {
if err := engine.KillEngine(1000); err != nil {
t.Error(err)
}
}

View File

@@ -1,142 +0,0 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package v1
import (
"encoding/json"
"net/rpc"
"net/rpc/jsonrpc"
"path"
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var smgV1CfgPath string
var smgV1Cfg *config.CGRConfig
var smgV1Rpc *rpc.Client
var smgV1LoadInst utils.LoadInstance // Share load information between tests
func TestSMGV1InitCfg(t *testing.T) {
smgV1CfgPath = path.Join(*dataDir, "conf", "samples", "smgeneric")
// Init config first
var err error
smgV1Cfg, err = config.NewCGRConfigFromFolder(smgV1CfgPath)
if err != nil {
t.Error(err)
}
smgV1Cfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush()
config.SetCgrConfig(smgV1Cfg)
}
// Remove data in both rating and accounting db
func TestSMGV1ResetDataDb(t *testing.T) {
if err := engine.InitDataDb(smgV1Cfg); err != nil {
t.Fatal(err)
}
}
// Wipe out the cdr database
func TestSMGV1ResetStorDb(t *testing.T) {
if err := engine.InitStorDb(smgV1Cfg); err != nil {
t.Fatal(err)
}
}
// Start CGR Engine
func TestSMGV1StartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(smgV1CfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
// Connect rpc client to rater
func TestSMGV1RpcConn(t *testing.T) {
var err error
smgV1Rpc, err = jsonrpc.Dial("tcp", smgV1Cfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal(err)
}
}
// Load the tariff plan, creating accounts and their balances
func TestSMGV1LoadTariffPlanFromFolder(t *testing.T) {
var reply string
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "oldtutorial")}
if err := smgV1Rpc.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil {
t.Error(err)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups
}
// Check loaded stats
func TestSMGV1CacheStats(t *testing.T) {
var reply string
if err := smgV1Rpc.Call("ApierV1.LoadCache", utils.AttrReloadCache{}, &reply); err != nil {
t.Error(err)
} else if reply != "OK" {
t.Error(reply)
}
var rcvStats *utils.CacheStats
expectedStats := &utils.CacheStats{Destinations: 5, ReverseDestinations: 7, RatingPlans: 4, RatingProfiles: 10,
Actions: 9, ActionPlans: 4, AccountActionPlans: 5, SharedGroups: 1, DerivedChargers: 1,
Users: 3, Aliases: 1, ReverseAliases: 2, ResourceProfiles: 3, Resources: 3, StatQueues: 1,
StatQueueProfiles: 1, Thresholds: 7, ThresholdProfiles: 7, Filters: 16, SupplierProfiles: 3, AttributeProfiles: 1}
var args utils.AttrCacheStats
if err := smgV1Rpc.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil {
t.Error("Got error on ApierV1.GetCacheStats: ", err.Error())
} else if !reflect.DeepEqual(expectedStats, rcvStats) {
t.Errorf("Calling ApierV1.GetCacheStats expected: %+v, received: %+v", expectedStats, rcvStats)
}
}
// Make sure account was debited properly
func TestSMGV1AccountsBefore(t *testing.T) {
var reply *engine.Account
attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"}
if err := smgV1Rpc.Call("ApierV2.GetAccount", attrs, &reply); err != nil {
t.Error("Got error on ApierV2.GetAccount: ", err.Error())
} else if reply.BalanceMap[utils.MONETARY].GetTotalValue() != 10.0 { // Make sure we debitted
jsn, _ := json.Marshal(reply)
t.Errorf("Received: %s", jsn)
}
}
// Make sure account was debited properly
func TestSMGV1GetMaxUsage(t *testing.T) {
setupReq := map[string]interface{}{utils.RequestType: utils.META_PREPAID, utils.Tenant: "cgrates.org",
utils.Account: "1003", utils.Destination: "1002", utils.SetupTime: "2015-11-10T15:20:00Z"}
var maxTime float64
if err := smgV1Rpc.Call("SMGenericV1.GetMaxUsage", setupReq, &maxTime); err != nil {
t.Error(err)
} else if maxTime != 2700 {
t.Errorf("Calling ApierV1.GetMaxUsage got maxTime: %f", maxTime)
}
}
func TestSMGV1StopCgrEngine(t *testing.T) {
if err := engine.KillEngine(100); err != nil {
t.Error(err)
}
}

View File

@@ -1,32 +1,14 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package v2
import (
"fmt"
"net/rpc"
"net/rpc/jsonrpc"
"path"
"reflect"
"testing"
"github.com/cgrates/cgrates/apier/v1"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -82,6 +64,7 @@ func TestITMongoTutorial(t *testing.T) {
func testTPitLoadConfig(t *testing.T) {
tpCfgPath = path.Join(*dataDir, "conf", "samples", configDIR)
if tpCfg, err = config.NewCGRConfigFromFolder(tpCfgPath); err != nil {
fmt.Println("err : ", err)
t.Error(err)
}
switch configDIR {

View File

@@ -285,16 +285,25 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
sm := sessions.NewSessionS(cfg, ralsConns, resSConns, threshSConns,
statSConns, suplSConns, attrSConns, cdrsConn, chargerSConn,
sReplConns, cfg.GeneralCfg().DefaultTimezone)
if err = sm.ListenAndServe(exitChan); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.SessionS, err))
}
//start sync session in a separate gorutine
go func() {
if err = sm.ListenAndServe(exitChan); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.SessionS, err))
}
}()
// Pass internal connection via BiRPCClient
internalSMGChan <- sm
// Register RPC handler
smgRpc := v1.NewSMGenericV1(sm)
server.RpcRegister(smgRpc)
ssv1 := v1.NewSessionSv1(sm) // methods with multiple options
server.RpcRegister(ssv1)
// Register BiRpc handlers
if cfg.SessionSCfg().ListenBijson != "" {
for method, handler := range smgRpc.Handlers() {
server.BiRPCRegisterName(method, handler)
}
for method, handler := range ssv1.Handlers() {
server.BiRPCRegisterName(method, handler)
}

View File

@@ -63,8 +63,24 @@
},
"attributes": {
"enabled": true,
},
"chargers": {
"enabled": true,
"attributes_conns": [
{"address": "*internal"}
],
},
"sessions": {
"enabled": true, // starts SessionManager service: <true|false>
"chargers_conns": [
{"address": "*internal"}
],
},

View File

@@ -256,6 +256,7 @@
"suppliers": {
"enabled": true,
"prefix_indexed_fields":["Destination",],
"stats_conns": [
{"address": "*internal"},
],

View File

@@ -12,6 +12,13 @@
},
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_type": "redis", // data_db type: <redis|mongo>
"db_port": 6379, // data_db port to reach the database
"db_name": "10", // data_db database name to connect to
},
"stor_db": {
"db_type": "postgres", // stor database type to use: <mysql|postgres>
"db_port": 5432, // the port to reach the stordb
@@ -86,13 +93,30 @@
},
"attributes": {
"enabled": true,
},
"suppliers": {
"enabled": true,
},
"chargers": {
"enabled": true,
"attributes_conns": [
{"address": "*internal"}
],
},
"sessions": {
"enabled": true,
"chargers_conns": [
{"address": "*internal"}
],
},

View File

@@ -362,6 +362,7 @@ func (sS *SessionS) setSTerminator(s *Session) {
// forceSTerminate is called when a session times-out or it is forced from CGRateS side
func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUsed *time.Duration) (err error) {
if extraDebit != 0 {
for i := range s.SRuns {
if _, err = sS.debitSession(s, i, extraDebit, nil); err != nil {
utils.Logger.Warning(
@@ -414,18 +415,20 @@ func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUs
}
}
if clntConn := sS.biJClnt(s.ClientConnID); clntConn != nil {
var rply string
if err := clntConn.conn.Call(utils.SessionSv1DisconnectSession,
utils.AttrDisconnectSession{
EventStart: s.EventStart.AsMapInterface(),
Reason: ErrForcedDisconnect.Error()},
&rply); err != nil {
if err != utils.ErrNotImplemented {
utils.Logger.Warning(
fmt.Sprintf("<%s> err: %s remotely disconnect session with id: %s",
utils.SessionS, err.Error(), s.CGRID))
go func() {
var rply string
if err := clntConn.conn.Call(utils.SessionSv1DisconnectSession,
utils.AttrDisconnectSession{
EventStart: s.EventStart.AsMapInterface(),
Reason: ErrForcedDisconnect.Error()},
&rply); err != nil {
if err != utils.ErrNotImplemented {
utils.Logger.Warning(
fmt.Sprintf("<%s> err: %s remotely disconnect session with id: %s",
utils.SessionS, err.Error(), s.CGRID))
}
}
}
}()
}
sS.replicateSessions(s.CGRID, false, sS.sReplConns)
return
@@ -1178,6 +1181,10 @@ func (sS *SessionS) updateSession(s *Session, updtEv engine.MapEvent) (maxUsage
s.EventStart.Set(k, v) // update previoius field with new one
}
sS.setSTerminator(s) // reset the terminator
//init has no updtEv
if updtEv == nil {
updtEv = engine.NewMapEvent(s.EventStart.AsMapInterface())
}
var reqMaxUsage time.Duration
if reqMaxUsage, err = updtEv.GetDuration(utils.Usage); err != nil {
if err != utils.ErrNotFound {
@@ -1186,7 +1193,6 @@ func (sS *SessionS) updateSession(s *Session, updtEv engine.MapEvent) (maxUsage
reqMaxUsage = sS.cgrCfg.SessionSCfg().MaxCallDuration
err = nil
}
s.RLock()
var maxUsageSet bool // so we know if we have set the 0 on purpose
prepaidReqs := []string{utils.META_PREPAID, utils.META_PSEUDOPREPAID}
for i, sr := range s.SRuns {
@@ -1205,7 +1211,7 @@ func (sS *SessionS) updateSession(s *Session, updtEv engine.MapEvent) (maxUsage
maxUsageSet = true
}
}
s.RUnlock()
return
}
@@ -2366,8 +2372,10 @@ func (sS *SessionS) BiRPCv1ForceDisconnect(clnt rpcclient.RpcClientConnection,
err = utils.ErrPartiallyExecuted
}
}
if err != nil {
if err == nil {
*reply = utils.OK
} else {
*reply = err.Error()
}
return nil
}

View File

@@ -1213,5 +1213,57 @@ func TestSessionSregisterSessionWithTerminator(t *testing.T) {
t.Errorf("Expecting %+v, received: %+v",
time.Duration(2*time.Second), rcvS[0].sTerminator.ttl)
}
}
func TestSessionSrelocateSessionS(t *testing.T) {
sSCfg, _ := config.NewDefaultCGRConfig()
sS := NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC")
sSEv := engine.NewSafEvent(map[string]interface{}{
utils.EVENT_NAME: "TEST_EVENT",
utils.ToR: "*voice",
utils.OriginID: "111",
utils.Direction: "*out",
utils.Account: "account1",
utils.Subject: "subject1",
utils.Destination: "+4986517174963",
utils.Category: "call",
utils.Tenant: "cgrates.org",
utils.RequestType: "*prepaid",
utils.SetupTime: "2015-11-09 14:21:24",
utils.AnswerTime: "2015-11-09 14:22:02",
utils.Usage: "1m23s",
utils.LastUsed: "21s",
utils.PDD: "300ms",
utils.SUPPLIER: "supplier1",
utils.OriginHost: "127.0.0.1",
})
initialCGRID := GetSetCGRID(sSEv)
s := &Session{
CGRID: initialCGRID,
EventStart: sSEv,
}
//register the session as active
sS.registerSession(s, false)
//verify the session
rcvS := sS.getSessions(s.CGRID, false)
if !reflect.DeepEqual(rcvS[0], s) {
t.Errorf("Expecting %+v, received: %+v", s, rcvS[0])
}
//relocate the session
sS.relocateSessions("111", "222", "127.0.0.1")
//check if the session exist with old CGRID
rcvS = sS.getSessions(initialCGRID, false)
if len(rcvS) != 0 {
t.Errorf("Expecting 0, received: %+v", len(rcvS))
}
ev := engine.NewSafEvent(map[string]interface{}{
utils.OriginID: "222",
utils.OriginHost: "127.0.0.1"})
cgrID := GetSetCGRID(ev)
//check the session with new CGRID
rcvS = sS.getSessions(cgrID, false)
if !reflect.DeepEqual(rcvS[0], s) {
t.Errorf("Expecting %+v, received: %+v", s, rcvS[0])
}
}