diff --git a/apier/v1/sessionsv1_it_test.go b/apier/v1/sessionsv1_it_test.go index 25e9ea7b2..a77566ad6 100644 --- a/apier/v1/sessionsv1_it_test.go +++ b/apier/v1/sessionsv1_it_test.go @@ -80,7 +80,7 @@ func TestSSv1ItResetStorDb(t *testing.T) { } func TestSSv1ItStartEngine(t *testing.T) { - if _, err := engine.StopStartEngine(sSv1CfgPath, 100); err != nil { + if _, err := engine.StopStartEngine(sSv1CfgPath, 1000); err != nil { t.Fatal(err) } } @@ -368,7 +368,7 @@ func TestSSv1ItInitiateSessionWithDigest(t *testing.T) { aSessions := make([]*sessions.ActiveSession, 0) if err := sSv1BiRpc.Call(utils.SessionSv1GetActiveSessions, nil, &aSessions); err != nil { t.Error(err) - } else if len(aSessions) != 4 { // the digest has increased the number of sessions + } else if len(aSessions) != 2 { t.Errorf("wrong active sessions: %s", utils.ToJSON(aSessions)) } } @@ -435,7 +435,7 @@ func TestSSv1ItUpdateSession(t *testing.T) { aSessions := make([]*sessions.ActiveSession, 0) if err := sSv1BiRpc.Call(utils.SessionSv1GetActiveSessions, nil, &aSessions); err != nil { t.Error(err) - } else if len(aSessions) != 4 { // the digest has increased the number of sessions + } else if len(aSessions) != 2 { t.Errorf("wrong active sessions: %s", utils.ToJSON(aSessions)) } } @@ -659,7 +659,7 @@ func TestSSv1ItForceUpdateSession(t *testing.T) { } var acnt *engine.Account attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"} - eAcntVal := 9.25 + eAcntVal := 9.3995 if err := sSApierRpc.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { t.Error(err) } else if acnt.BalanceMap[utils.MONETARY].GetTotalValue() != eAcntVal { @@ -731,7 +731,7 @@ func TestSSv1ItForceUpdateSession(t *testing.T) { t.Errorf("wrong active ssesions: %s", utils.ToJSON(aSessions)) } - eAcntVal = 9.10 + eAcntVal = 9.2495 if err := sSApierRpc.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { t.Error(err) } else if acnt.BalanceMap[utils.MONETARY].GetTotalValue() != eAcntVal { @@ -833,7 +833,7 @@ func TestSSv1ItDynamicDebit(t *testing.T) { args1, &rply1); err != nil { t.Error(err) return - } else if *rply1.MaxUsage != -1 { + } else if *rply1.MaxUsage != time.Duration(0*time.Second) { t.Errorf("Unexpected MaxUsage: %v", rply1.MaxUsage) } @@ -873,7 +873,7 @@ func TestSSv1ItDynamicDebit(t *testing.T) { } else if len(aSessions) != 2 { t.Errorf("wrong active sessions: %s", utils.ToJSON(aSessions)) } - rplyt := "" + var rplyt string if err := sSv1BiRpc.Call(utils.SessionSv1ForceDisconnect, nil, &rplyt); err != nil { t.Error(err) @@ -891,7 +891,7 @@ func TestSSv1ItStopCgrEngine(t *testing.T) { if err := sSv1BiRpc.Close(); err != nil { // Close the connection so we don't get EOF warnings from client t.Error(err) } - if err := engine.KillEngine(100); err != nil { + if err := engine.KillEngine(1000); err != nil { t.Error(err) } } diff --git a/apier/v1/smgenericv1_it_test.go b/apier/v1/smgenericv1_it_test.go deleted file mode 100644 index efe76f637..000000000 --- a/apier/v1/smgenericv1_it_test.go +++ /dev/null @@ -1,142 +0,0 @@ -// +build integration - -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ -package v1 - -import ( - "encoding/json" - "net/rpc" - "net/rpc/jsonrpc" - "path" - "reflect" - "testing" - "time" - - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" -) - -var smgV1CfgPath string -var smgV1Cfg *config.CGRConfig -var smgV1Rpc *rpc.Client -var smgV1LoadInst utils.LoadInstance // Share load information between tests - -func TestSMGV1InitCfg(t *testing.T) { - smgV1CfgPath = path.Join(*dataDir, "conf", "samples", "smgeneric") - // Init config first - var err error - smgV1Cfg, err = config.NewCGRConfigFromFolder(smgV1CfgPath) - if err != nil { - t.Error(err) - } - smgV1Cfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush() - config.SetCgrConfig(smgV1Cfg) -} - -// Remove data in both rating and accounting db -func TestSMGV1ResetDataDb(t *testing.T) { - if err := engine.InitDataDb(smgV1Cfg); err != nil { - t.Fatal(err) - } -} - -// Wipe out the cdr database -func TestSMGV1ResetStorDb(t *testing.T) { - if err := engine.InitStorDb(smgV1Cfg); err != nil { - t.Fatal(err) - } -} - -// Start CGR Engine -func TestSMGV1StartEngine(t *testing.T) { - if _, err := engine.StopStartEngine(smgV1CfgPath, *waitRater); err != nil { - t.Fatal(err) - } -} - -// Connect rpc client to rater -func TestSMGV1RpcConn(t *testing.T) { - var err error - smgV1Rpc, err = jsonrpc.Dial("tcp", smgV1Cfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed - if err != nil { - t.Fatal(err) - } -} - -// Load the tariff plan, creating accounts and their balances -func TestSMGV1LoadTariffPlanFromFolder(t *testing.T) { - var reply string - attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "oldtutorial")} - if err := smgV1Rpc.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil { - t.Error(err) - } - time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups -} - -// Check loaded stats -func TestSMGV1CacheStats(t *testing.T) { - var reply string - if err := smgV1Rpc.Call("ApierV1.LoadCache", utils.AttrReloadCache{}, &reply); err != nil { - t.Error(err) - } else if reply != "OK" { - t.Error(reply) - } - var rcvStats *utils.CacheStats - expectedStats := &utils.CacheStats{Destinations: 5, ReverseDestinations: 7, RatingPlans: 4, RatingProfiles: 10, - Actions: 9, ActionPlans: 4, AccountActionPlans: 5, SharedGroups: 1, DerivedChargers: 1, - Users: 3, Aliases: 1, ReverseAliases: 2, ResourceProfiles: 3, Resources: 3, StatQueues: 1, - StatQueueProfiles: 1, Thresholds: 7, ThresholdProfiles: 7, Filters: 16, SupplierProfiles: 3, AttributeProfiles: 1} - var args utils.AttrCacheStats - if err := smgV1Rpc.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil { - t.Error("Got error on ApierV1.GetCacheStats: ", err.Error()) - } else if !reflect.DeepEqual(expectedStats, rcvStats) { - t.Errorf("Calling ApierV1.GetCacheStats expected: %+v, received: %+v", expectedStats, rcvStats) - } -} - -// Make sure account was debited properly -func TestSMGV1AccountsBefore(t *testing.T) { - var reply *engine.Account - attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"} - if err := smgV1Rpc.Call("ApierV2.GetAccount", attrs, &reply); err != nil { - t.Error("Got error on ApierV2.GetAccount: ", err.Error()) - } else if reply.BalanceMap[utils.MONETARY].GetTotalValue() != 10.0 { // Make sure we debitted - jsn, _ := json.Marshal(reply) - t.Errorf("Received: %s", jsn) - } -} - -// Make sure account was debited properly -func TestSMGV1GetMaxUsage(t *testing.T) { - setupReq := map[string]interface{}{utils.RequestType: utils.META_PREPAID, utils.Tenant: "cgrates.org", - utils.Account: "1003", utils.Destination: "1002", utils.SetupTime: "2015-11-10T15:20:00Z"} - var maxTime float64 - if err := smgV1Rpc.Call("SMGenericV1.GetMaxUsage", setupReq, &maxTime); err != nil { - t.Error(err) - } else if maxTime != 2700 { - t.Errorf("Calling ApierV1.GetMaxUsage got maxTime: %f", maxTime) - } -} - -func TestSMGV1StopCgrEngine(t *testing.T) { - if err := engine.KillEngine(100); err != nil { - t.Error(err) - } -} diff --git a/apier/v2/tp_it_test.go b/apier/v2/tp_it_test.go index 054c64ade..f28ec8feb 100644 --- a/apier/v2/tp_it_test.go +++ b/apier/v2/tp_it_test.go @@ -1,32 +1,14 @@ -// +build integration - -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ package v2 import ( + "fmt" "net/rpc" "net/rpc/jsonrpc" "path" "reflect" "testing" - "github.com/cgrates/cgrates/apier/v1" + v1 "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -82,6 +64,7 @@ func TestITMongoTutorial(t *testing.T) { func testTPitLoadConfig(t *testing.T) { tpCfgPath = path.Join(*dataDir, "conf", "samples", configDIR) if tpCfg, err = config.NewCGRConfigFromFolder(tpCfgPath); err != nil { + fmt.Println("err : ", err) t.Error(err) } switch configDIR { diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index de44dd76e..2503e25fa 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -285,16 +285,25 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in sm := sessions.NewSessionS(cfg, ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, cdrsConn, chargerSConn, sReplConns, cfg.GeneralCfg().DefaultTimezone) - if err = sm.ListenAndServe(exitChan); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.SessionS, err)) - } + //start sync session in a separate gorutine + go func() { + if err = sm.ListenAndServe(exitChan); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.SessionS, err)) + } + }() // Pass internal connection via BiRPCClient internalSMGChan <- sm // Register RPC handler + smgRpc := v1.NewSMGenericV1(sm) + server.RpcRegister(smgRpc) + ssv1 := v1.NewSessionSv1(sm) // methods with multiple options server.RpcRegister(ssv1) // Register BiRpc handlers if cfg.SessionSCfg().ListenBijson != "" { + for method, handler := range smgRpc.Handlers() { + server.BiRPCRegisterName(method, handler) + } for method, handler := range ssv1.Handlers() { server.BiRPCRegisterName(method, handler) } diff --git a/data/conf/samples/smgeneric/cgrates.json b/data/conf/samples/smgeneric/cgrates.json index dea2259b4..9e8996fca 100644 --- a/data/conf/samples/smgeneric/cgrates.json +++ b/data/conf/samples/smgeneric/cgrates.json @@ -63,8 +63,24 @@ }, +"attributes": { + "enabled": true, +}, + + +"chargers": { + "enabled": true, + "attributes_conns": [ + {"address": "*internal"} + ], +}, + + "sessions": { "enabled": true, // starts SessionManager service: + "chargers_conns": [ + {"address": "*internal"} + ], }, diff --git a/data/conf/samples/tutmysql/cgrates.json b/data/conf/samples/tutmysql/cgrates.json index 59a3d2097..f29cace91 100644 --- a/data/conf/samples/tutmysql/cgrates.json +++ b/data/conf/samples/tutmysql/cgrates.json @@ -256,6 +256,7 @@ "suppliers": { "enabled": true, + "prefix_indexed_fields":["Destination",], "stats_conns": [ {"address": "*internal"}, ], diff --git a/data/conf/samples/tutpostgres/cgrates.json b/data/conf/samples/tutpostgres/cgrates.json index 333cf01a6..1b0016c8f 100644 --- a/data/conf/samples/tutpostgres/cgrates.json +++ b/data/conf/samples/tutpostgres/cgrates.json @@ -12,6 +12,13 @@ }, +"data_db": { // database used to store runtime data (eg: accounts, cdr stats) + "db_type": "redis", // data_db type: + "db_port": 6379, // data_db port to reach the database + "db_name": "10", // data_db database name to connect to +}, + + "stor_db": { "db_type": "postgres", // stor database type to use: "db_port": 5432, // the port to reach the stordb @@ -86,13 +93,30 @@ }, +"attributes": { + "enabled": true, +}, + + "suppliers": { "enabled": true, }, +"chargers": { + "enabled": true, + "attributes_conns": [ + {"address": "*internal"} + ], +}, + + + "sessions": { "enabled": true, + "chargers_conns": [ + {"address": "*internal"} + ], }, diff --git a/sessions/sessions.go b/sessions/sessions.go index 4f4e54a1d..15320a7fb 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -362,6 +362,7 @@ func (sS *SessionS) setSTerminator(s *Session) { // forceSTerminate is called when a session times-out or it is forced from CGRateS side func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUsed *time.Duration) (err error) { if extraDebit != 0 { + for i := range s.SRuns { if _, err = sS.debitSession(s, i, extraDebit, nil); err != nil { utils.Logger.Warning( @@ -414,18 +415,20 @@ func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUs } } if clntConn := sS.biJClnt(s.ClientConnID); clntConn != nil { - var rply string - if err := clntConn.conn.Call(utils.SessionSv1DisconnectSession, - utils.AttrDisconnectSession{ - EventStart: s.EventStart.AsMapInterface(), - Reason: ErrForcedDisconnect.Error()}, - &rply); err != nil { - if err != utils.ErrNotImplemented { - utils.Logger.Warning( - fmt.Sprintf("<%s> err: %s remotely disconnect session with id: %s", - utils.SessionS, err.Error(), s.CGRID)) + go func() { + var rply string + if err := clntConn.conn.Call(utils.SessionSv1DisconnectSession, + utils.AttrDisconnectSession{ + EventStart: s.EventStart.AsMapInterface(), + Reason: ErrForcedDisconnect.Error()}, + &rply); err != nil { + if err != utils.ErrNotImplemented { + utils.Logger.Warning( + fmt.Sprintf("<%s> err: %s remotely disconnect session with id: %s", + utils.SessionS, err.Error(), s.CGRID)) + } } - } + }() } sS.replicateSessions(s.CGRID, false, sS.sReplConns) return @@ -1178,6 +1181,10 @@ func (sS *SessionS) updateSession(s *Session, updtEv engine.MapEvent) (maxUsage s.EventStart.Set(k, v) // update previoius field with new one } sS.setSTerminator(s) // reset the terminator + //init has no updtEv + if updtEv == nil { + updtEv = engine.NewMapEvent(s.EventStart.AsMapInterface()) + } var reqMaxUsage time.Duration if reqMaxUsage, err = updtEv.GetDuration(utils.Usage); err != nil { if err != utils.ErrNotFound { @@ -1186,7 +1193,6 @@ func (sS *SessionS) updateSession(s *Session, updtEv engine.MapEvent) (maxUsage reqMaxUsage = sS.cgrCfg.SessionSCfg().MaxCallDuration err = nil } - s.RLock() var maxUsageSet bool // so we know if we have set the 0 on purpose prepaidReqs := []string{utils.META_PREPAID, utils.META_PSEUDOPREPAID} for i, sr := range s.SRuns { @@ -1205,7 +1211,7 @@ func (sS *SessionS) updateSession(s *Session, updtEv engine.MapEvent) (maxUsage maxUsageSet = true } } - s.RUnlock() + return } @@ -2366,8 +2372,10 @@ func (sS *SessionS) BiRPCv1ForceDisconnect(clnt rpcclient.RpcClientConnection, err = utils.ErrPartiallyExecuted } } - if err != nil { + if err == nil { *reply = utils.OK + } else { + *reply = err.Error() } return nil } diff --git a/sessions/sessions_test.go b/sessions/sessions_test.go index d7eb6f29c..07e99de88 100644 --- a/sessions/sessions_test.go +++ b/sessions/sessions_test.go @@ -1213,5 +1213,57 @@ func TestSessionSregisterSessionWithTerminator(t *testing.T) { t.Errorf("Expecting %+v, received: %+v", time.Duration(2*time.Second), rcvS[0].sTerminator.ttl) } +} + +func TestSessionSrelocateSessionS(t *testing.T) { + sSCfg, _ := config.NewDefaultCGRConfig() + sS := NewSessionS(sSCfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, "UTC") + sSEv := engine.NewSafEvent(map[string]interface{}{ + utils.EVENT_NAME: "TEST_EVENT", + utils.ToR: "*voice", + utils.OriginID: "111", + utils.Direction: "*out", + utils.Account: "account1", + utils.Subject: "subject1", + utils.Destination: "+4986517174963", + utils.Category: "call", + utils.Tenant: "cgrates.org", + utils.RequestType: "*prepaid", + utils.SetupTime: "2015-11-09 14:21:24", + utils.AnswerTime: "2015-11-09 14:22:02", + utils.Usage: "1m23s", + utils.LastUsed: "21s", + utils.PDD: "300ms", + utils.SUPPLIER: "supplier1", + utils.OriginHost: "127.0.0.1", + }) + initialCGRID := GetSetCGRID(sSEv) + s := &Session{ + CGRID: initialCGRID, + EventStart: sSEv, + } + //register the session as active + sS.registerSession(s, false) + //verify the session + rcvS := sS.getSessions(s.CGRID, false) + if !reflect.DeepEqual(rcvS[0], s) { + t.Errorf("Expecting %+v, received: %+v", s, rcvS[0]) + } + //relocate the session + sS.relocateSessions("111", "222", "127.0.0.1") + //check if the session exist with old CGRID + rcvS = sS.getSessions(initialCGRID, false) + if len(rcvS) != 0 { + t.Errorf("Expecting 0, received: %+v", len(rcvS)) + } + ev := engine.NewSafEvent(map[string]interface{}{ + utils.OriginID: "222", + utils.OriginHost: "127.0.0.1"}) + cgrID := GetSetCGRID(ev) + //check the session with new CGRID + rcvS = sS.getSessions(cgrID, false) + if !reflect.DeepEqual(rcvS[0], s) { + t.Errorf("Expecting %+v, received: %+v", s, rcvS[0]) + } }