Completing the SM-OpenSIPS component based on E_ACC_EVENT and E_ACC_MISSED_EVENT with automatic CDR generation

This commit is contained in:
DanB
2015-05-10 19:31:08 +02:00
parent 57db2b44e1
commit 5043de5d17
13 changed files with 548 additions and 241 deletions

View File

@@ -254,7 +254,7 @@ func startSmOpenSIPS(responder *engine.Responder, cdrDb engine.CdrStorage, cache
}
cdrsConn = &engine.RPCClientConnector{Client: client}
}
sm, _ := sessionmanager.NewOSipsSessionManager(cfg.SmOsipsConfig, raterConn, cdrsConn)
sm, _ := sessionmanager.NewOSipsSessionManager(cfg.SmOsipsConfig, raterConn, cdrsConn, cdrDb)
if err := sm.Connect(); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> error: %s!", err))
}

View File

@@ -222,8 +222,8 @@ route{
send_reply("500","Internal Server Error");
exit;
}
$avp(cgr_reqtype)="*pseudoprepaid";
#$avp(cgr_account)=$fU;
$avp(cgr_reqtype)="*prepaid";
$avp(cgr_account)=$fU;
$avp(cgr_destination)=$rU;
$avp(cgr_supplier)="supplier1";
setflag(CDR);

View File

@@ -23,7 +23,6 @@ import (
"net/rpc/jsonrpc"
"os"
"path"
"reflect"
"strings"
"testing"
"time"
@@ -133,66 +132,6 @@ func TestTutFsCallsLoadTariffPlanFromFolder(t *testing.T) {
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups
}
// Check loaded stats
func TestTutFsCallsCacheStats(t *testing.T) {
if !*testCalls {
return
}
var rcvStats *utils.CacheStats
expectedStats := &utils.CacheStats{Destinations: 4, RatingPlans: 3, RatingProfiles: 8, Actions: 6, SharedGroups: 1, RatingAliases: 1, AccountAliases: 1, DerivedChargers: 1}
var args utils.AttrCacheStats
if err := tutFsCallsRpc.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil {
t.Error("Got error on ApierV1.GetCacheStats: ", err.Error())
} else if !reflect.DeepEqual(expectedStats, rcvStats) {
t.Errorf("Calling ApierV1.GetCacheStats expected: %v, received: %v", expectedStats, rcvStats)
}
}
// Check items age
func TestTutFsCallsGetCachedItemAge(t *testing.T) {
if !*testCalls {
return
}
var rcvAge *utils.CachedItemAge
if err := tutFsCallsRpc.Call("ApierV1.GetCachedItemAge", "1002", &rcvAge); err != nil {
t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error())
} else if rcvAge.Destination > time.Duration(2)*time.Second {
t.Errorf("Cache too old: %d", rcvAge)
}
if err := tutFsCallsRpc.Call("ApierV1.GetCachedItemAge", "RP_RETAIL1", &rcvAge); err != nil {
t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error())
} else if rcvAge.RatingPlan > time.Duration(2)*time.Second {
t.Errorf("Cache too old: %d", rcvAge)
}
if err := tutFsCallsRpc.Call("ApierV1.GetCachedItemAge", "*out:cgrates.org:call:*any", &rcvAge); err != nil {
t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error())
} else if rcvAge.RatingProfile > time.Duration(2)*time.Second {
t.Errorf("Cache too old: %d", rcvAge)
}
if err := tutFsCallsRpc.Call("ApierV1.GetCachedItemAge", "LOG_WARNING", &rcvAge); err != nil {
t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error())
} else if rcvAge.Action > time.Duration(2)*time.Second {
t.Errorf("Cache too old: %d", rcvAge)
}
if err := tutFsCallsRpc.Call("ApierV1.GetCachedItemAge", "SHARED_A", &rcvAge); err != nil {
t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error())
} else if rcvAge.SharedGroup > time.Duration(2)*time.Second {
t.Errorf("Cache too old: %d", rcvAge)
}
/*
if err := tutFsCallsRpc.Call("ApierV1.GetCachedItemAge", "1006", &rcvAge); err != nil {
t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error())
} else if rcvAge.RatingAlias > time.Duration(2)*time.Second {
t.Errorf("Cache too old: %d", rcvAge)
}
if err := tutFsCallsRpc.Call("ApierV1.GetCachedItemAge", "1006", &rcvAge); err != nil {
t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error())
} else if rcvAge.RatingAlias > time.Duration(2)*time.Second || rcvAge.AccountAlias > time.Duration(2)*time.Second {
t.Errorf("Cache too old: %d", rcvAge)
}
*/
}
// Make sure account was debited properly
func TestTutFsCallsAccountsBefore(t *testing.T) {
if !*testCalls {
@@ -235,122 +174,6 @@ func TestTutFsCallsAccountsBefore(t *testing.T) {
}
}
// Check call costs
func TestTutFsCallsGetCosts(t *testing.T) {
if !*testCalls {
return
}
tStart, _ := utils.ParseDate("2014-08-04T13:00:00Z")
tEnd, _ := utils.ParseDate("2014-08-04T13:00:20Z")
cd := engine.CallDescriptor{
Direction: "*out",
Category: "call",
Tenant: "cgrates.org",
Subject: "1001",
Account: "1001",
Destination: "1002",
DurationIndex: 0,
TimeStart: tStart,
TimeEnd: tEnd,
}
var cc engine.CallCost
if err := tutFsCallsRpc.Call("Responder.GetCost", cd, &cc); err != nil {
t.Error("Got error on Responder.GetCost: ", err.Error())
} else if cc.Cost != 0.6 {
t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost)
}
tStart, _ = utils.ParseDate("2014-08-04T13:00:00Z")
tEnd, _ = utils.ParseDate("2014-08-04T13:01:25Z")
cd = engine.CallDescriptor{
Direction: "*out",
Category: "call",
Tenant: "cgrates.org",
Subject: "1001",
Account: "1001",
Destination: "1002",
DurationIndex: 0,
TimeStart: tStart,
TimeEnd: tEnd,
}
if err := tutFsCallsRpc.Call("Responder.GetCost", cd, &cc); err != nil {
t.Error("Got error on Responder.GetCost: ", err.Error())
} else if cc.Cost != 0.6417 { // 0.01 first minute, 0.04 25 seconds with RT_20CNT
t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost)
}
tStart, _ = utils.ParseDate("2014-08-04T13:00:00Z")
tEnd, _ = utils.ParseDate("2014-08-04T13:00:20Z")
cd = engine.CallDescriptor{
Direction: "*out",
Category: "call",
Tenant: "cgrates.org",
Subject: "1001",
Account: "1001",
Destination: "1003",
DurationIndex: 0,
TimeStart: tStart,
TimeEnd: tEnd,
}
if err := tutFsCallsRpc.Call("Responder.GetCost", cd, &cc); err != nil {
t.Error("Got error on Responder.GetCost: ", err.Error())
} else if cc.Cost != 1 {
t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost)
}
tStart, _ = utils.ParseDate("2014-08-04T13:00:00Z")
tEnd, _ = utils.ParseDate("2014-08-04T13:01:25Z")
cd = engine.CallDescriptor{
Direction: "*out",
Category: "call",
Tenant: "cgrates.org",
Subject: "1001",
Account: "1001",
Destination: "1003",
DurationIndex: 0,
TimeStart: tStart,
TimeEnd: tEnd,
}
if err := tutFsCallsRpc.Call("Responder.GetCost", cd, &cc); err != nil {
t.Error("Got error on Responder.GetCost: ", err.Error())
} else if cc.Cost != 1.3 {
t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost)
}
tStart, _ = utils.ParseDate("2014-08-04T13:00:00Z")
tEnd, _ = utils.ParseDate("2014-08-04T13:00:20Z")
cd = engine.CallDescriptor{
Direction: "*out",
Category: "call",
Tenant: "cgrates.org",
Subject: "1001",
Account: "1001",
Destination: "1004",
DurationIndex: 0,
TimeStart: tStart,
TimeEnd: tEnd,
}
if err := tutFsCallsRpc.Call("Responder.GetCost", cd, &cc); err != nil {
t.Error("Got error on Responder.GetCost: ", err.Error())
} else if cc.Cost != 1 {
t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost)
}
tStart, _ = utils.ParseDate("2014-08-04T13:00:00Z")
tEnd, _ = utils.ParseDate("2014-08-04T13:01:25Z")
cd = engine.CallDescriptor{
Direction: "*out",
Category: "call",
Tenant: "cgrates.org",
Subject: "1001",
Account: "1001",
Destination: "1004",
DurationIndex: 0,
TimeStart: tStart,
TimeEnd: tEnd,
}
if err := tutFsCallsRpc.Call("Responder.GetCost", cd, &cc); err != nil {
t.Error("Got error on Responder.GetCost: ", err.Error())
} else if cc.Cost != 1.3 {
t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost)
}
}
func TestTutFsCallsCdrStats(t *testing.T) {
if !*testCalls {
return

View File

@@ -0,0 +1,322 @@
/*
Real-time Charging System for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can Storagetribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITH*out ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package general_tests
import (
"net/rpc"
"net/rpc/jsonrpc"
//"os"
"path"
"reflect"
//"strings"
"testing"
"time"
//"github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var tutLocalCfgPath string
var tutFsLocalCfg *config.CGRConfig
var tutLocalRpc *rpc.Client
func TestTutLocalInitCfg(t *testing.T) {
if !*testLocal {
return
}
tutLocalCfgPath = path.Join(*dataDir, "conf", "samples", "cgradmin")
// Init config first
var err error
tutFsLocalCfg, err = config.NewCGRConfigFromFolder(tutLocalCfgPath)
if err != nil {
t.Error(err)
}
tutFsLocalCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush()
config.SetCgrConfig(tutFsLocalCfg)
}
// Remove data in both rating and accounting db
func TestTutLocalResetDataDb(t *testing.T) {
if !*testLocal {
return
}
if err := engine.InitDataDb(tutFsLocalCfg); err != nil {
t.Fatal(err)
}
}
// Wipe out the cdr database
func TestTutLocalResetStorDb(t *testing.T) {
if !*testLocal {
return
}
if err := engine.InitStorDb(tutFsLocalCfg); err != nil {
t.Fatal(err)
}
}
// Start CGR Engine
func TestTutLocalStartEngine(t *testing.T) {
if !*testLocal {
return
}
if _, err := engine.StopStartEngine(tutLocalCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
// Connect rpc client to rater
func TestTutLocalRpcConn(t *testing.T) {
if !*testLocal {
return
}
var err error
tutLocalRpc, err = jsonrpc.Dial("tcp", tutFsLocalCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal(err)
}
}
// Load the tariff plan, creating accounts and their balances
func TestTutLocalLoadTariffPlanFromFolder(t *testing.T) {
if !*testLocal {
return
}
reply := ""
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")}
if err := tutLocalRpc.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil {
t.Error(err)
} else if reply != "OK" {
t.Error(reply)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups
}
// Check loaded stats
func TestTutLocalCacheStats(t *testing.T) {
if !*testLocal {
return
}
var rcvStats *utils.CacheStats
expectedStats := &utils.CacheStats{Destinations: 4, RatingPlans: 3, RatingProfiles: 8, Actions: 6, SharedGroups: 1, RatingAliases: 1, AccountAliases: 1, DerivedChargers: 1}
var args utils.AttrCacheStats
if err := tutLocalRpc.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil {
t.Error("Got error on ApierV1.GetCacheStats: ", err.Error())
} else if !reflect.DeepEqual(expectedStats, rcvStats) {
t.Errorf("Calling ApierV1.GetCacheStats expected: %v, received: %v", expectedStats, rcvStats)
}
}
// Check items age
func TestTutLocalGetCachedItemAge(t *testing.T) {
if !*testLocal {
return
}
var rcvAge *utils.CachedItemAge
if err := tutLocalRpc.Call("ApierV1.GetCachedItemAge", "1002", &rcvAge); err != nil {
t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error())
} else if rcvAge.Destination > time.Duration(2)*time.Second {
t.Errorf("Cache too old: %d", rcvAge)
}
if err := tutLocalRpc.Call("ApierV1.GetCachedItemAge", "RP_RETAIL1", &rcvAge); err != nil {
t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error())
} else if rcvAge.RatingPlan > time.Duration(2)*time.Second {
t.Errorf("Cache too old: %d", rcvAge)
}
if err := tutLocalRpc.Call("ApierV1.GetCachedItemAge", "*out:cgrates.org:call:*any", &rcvAge); err != nil {
t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error())
} else if rcvAge.RatingProfile > time.Duration(2)*time.Second {
t.Errorf("Cache too old: %d", rcvAge)
}
if err := tutLocalRpc.Call("ApierV1.GetCachedItemAge", "LOG_WARNING", &rcvAge); err != nil {
t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error())
} else if rcvAge.Action > time.Duration(2)*time.Second {
t.Errorf("Cache too old: %d", rcvAge)
}
if err := tutLocalRpc.Call("ApierV1.GetCachedItemAge", "SHARED_A", &rcvAge); err != nil {
t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error())
} else if rcvAge.SharedGroup > time.Duration(2)*time.Second {
t.Errorf("Cache too old: %d", rcvAge)
}
/*
if err := tutLocalRpc.Call("ApierV1.GetCachedItemAge", "1006", &rcvAge); err != nil {
t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error())
} else if rcvAge.RatingAlias > time.Duration(2)*time.Second {
t.Errorf("Cache too old: %d", rcvAge)
}
if err := tutLocalRpc.Call("ApierV1.GetCachedItemAge", "1006", &rcvAge); err != nil {
t.Error("Got error on ApierV1.GetCachedItemAge: ", err.Error())
} else if rcvAge.RatingAlias > time.Duration(2)*time.Second || rcvAge.AccountAlias > time.Duration(2)*time.Second {
t.Errorf("Cache too old: %d", rcvAge)
}
*/
}
// Check call costs
func TestTutLocalGetCosts(t *testing.T) {
if !*testLocal {
return
}
tStart, _ := utils.ParseDate("2014-08-04T13:00:00Z")
tEnd, _ := utils.ParseDate("2014-08-04T13:00:20Z")
cd := engine.CallDescriptor{
Direction: "*out",
Category: "call",
Tenant: "cgrates.org",
Subject: "1001",
Account: "1001",
Destination: "1002",
DurationIndex: 0,
TimeStart: tStart,
TimeEnd: tEnd,
}
var cc engine.CallCost
if err := tutLocalRpc.Call("Responder.GetCost", cd, &cc); err != nil {
t.Error("Got error on Responder.GetCost: ", err.Error())
} else if cc.Cost != 0.6 {
t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost)
}
tStart, _ = utils.ParseDate("2014-08-04T13:00:00Z")
tEnd, _ = utils.ParseDate("2014-08-04T13:01:25Z")
cd = engine.CallDescriptor{
Direction: "*out",
Category: "call",
Tenant: "cgrates.org",
Subject: "1001",
Account: "1001",
Destination: "1002",
DurationIndex: 0,
TimeStart: tStart,
TimeEnd: tEnd,
}
if err := tutLocalRpc.Call("Responder.GetCost", cd, &cc); err != nil {
t.Error("Got error on Responder.GetCost: ", err.Error())
} else if cc.Cost != 0.6417 { // 0.01 first minute, 0.04 25 seconds with RT_20CNT
t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost)
}
tStart, _ = utils.ParseDate("2014-08-04T13:00:00Z")
tEnd, _ = utils.ParseDate("2014-08-04T13:00:20Z")
cd = engine.CallDescriptor{
Direction: "*out",
Category: "call",
Tenant: "cgrates.org",
Subject: "1001",
Account: "1001",
Destination: "1003",
DurationIndex: 0,
TimeStart: tStart,
TimeEnd: tEnd,
}
if err := tutLocalRpc.Call("Responder.GetCost", cd, &cc); err != nil {
t.Error("Got error on Responder.GetCost: ", err.Error())
} else if cc.Cost != 1 {
t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost)
}
tStart, _ = utils.ParseDate("2014-08-04T13:00:00Z")
tEnd, _ = utils.ParseDate("2014-08-04T13:01:25Z")
cd = engine.CallDescriptor{
Direction: "*out",
Category: "call",
Tenant: "cgrates.org",
Subject: "1001",
Account: "1001",
Destination: "1003",
DurationIndex: 0,
TimeStart: tStart,
TimeEnd: tEnd,
}
if err := tutLocalRpc.Call("Responder.GetCost", cd, &cc); err != nil {
t.Error("Got error on Responder.GetCost: ", err.Error())
} else if cc.Cost != 1.3 {
t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost)
}
tStart, _ = utils.ParseDate("2014-08-04T13:00:00Z")
tEnd, _ = utils.ParseDate("2014-08-04T13:00:20Z")
cd = engine.CallDescriptor{
Direction: "*out",
Category: "call",
Tenant: "cgrates.org",
Subject: "1001",
Account: "1001",
Destination: "1004",
DurationIndex: 0,
TimeStart: tStart,
TimeEnd: tEnd,
}
if err := tutLocalRpc.Call("Responder.GetCost", cd, &cc); err != nil {
t.Error("Got error on Responder.GetCost: ", err.Error())
} else if cc.Cost != 1 {
t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost)
}
tStart, _ = utils.ParseDate("2014-08-04T13:00:00Z")
tEnd, _ = utils.ParseDate("2014-08-04T13:01:25Z")
cd = engine.CallDescriptor{
Direction: "*out",
Category: "call",
Tenant: "cgrates.org",
Subject: "1001",
Account: "1001",
Destination: "1004",
DurationIndex: 0,
TimeStart: tStart,
TimeEnd: tEnd,
}
if err := tutLocalRpc.Call("Responder.GetCost", cd, &cc); err != nil {
t.Error("Got error on Responder.GetCost: ", err.Error())
} else if cc.Cost != 1.3 {
t.Errorf("Calling Responder.GetCost got callcost: %v", cc.Cost)
}
}
// Check call costs
func TestTutLocalMaxDebit(t *testing.T) {
if !*testLocal {
return
}
tStart, _ := utils.ParseDate("2014-08-04T13:00:00Z")
tEnd, _ := utils.ParseDate("2014-08-04T13:00:20Z")
cd := engine.CallDescriptor{
Direction: "*out",
Category: "call",
Tenant: "cgrates.org",
Subject: "1001",
Account: "1001",
Destination: "1002",
DurationIndex: 0,
TimeStart: tStart,
TimeEnd: tEnd,
}
var cc engine.CallCost
if err := tutLocalRpc.Call("Responder.MaxDebit", cd, &cc); err != nil {
t.Error("Got error on Responder.GetCost: ", err.Error())
} else if cc.GetDuration() == 20 {
t.Errorf("Calling Responder.MaxDebit got callcost: %v", cc.GetDuration())
}
}
func TestTutLocalStopCgrEngine(t *testing.T) {
if !*testLocal {
return
}
if err := engine.KillEngine(100); err != nil {
t.Error(err)
}
}

View File

@@ -101,27 +101,31 @@ func (sm *FSSessionManager) GetSession(uuid string) *Session {
}
// Disconnects a session by sending hangup command to freeswitch
func (sm *FSSessionManager) DisconnectSession(ev engine.Event, connId, notify string) {
func (sm *FSSessionManager) DisconnectSession(ev engine.Event, connId, notify string) error {
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", ev.GetUUID(), notify)); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send disconect api notification to freeswitch, error: <%s>, connId: %s", err.Error(), connId))
return err
}
if notify == INSUFFICIENT_FUNDS {
if len(sm.cfg.EmptyBalanceContext) != 0 {
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_transfer %s %s %s\n\n", ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), sm.cfg.EmptyBalanceContext)); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not transfer the call to empty balance context, error: <%s>, connId: %s", err.Error(), connId))
return err
}
return
return nil
} else if len(sm.cfg.EmptyBalanceAnnFile) != 0 {
if _, err := sm.conns[connId].SendApiCmd(fmt.Sprintf("uuid_broadcast %s playback!manager_request::%s aleg\n\n", ev.GetUUID(), sm.cfg.EmptyBalanceAnnFile)); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send uuid_broadcast to freeswitch, error: <%s>, connId: %s", err.Error(), connId))
return err
}
return
return nil
}
}
if err := sm.conns[connId].SendMsgCmd(ev.GetUUID(), map[string]string{"call-command": "hangup", "hangup-cause": "MANAGER_REQUEST"}); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Could not send disconect msg to freeswitch, error: <%s>, connId: %s", err.Error(), connId))
return err
}
return
return nil
}
// Remove session from session list, removes all related in case of multiple runs

View File

@@ -1,6 +1,6 @@
/*
Rating system designed to be used in VoIP Carriers World
Copyright (C) 2012-2015 ITsysCOM
Real-time Charging System for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -18,17 +18,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package sessionmanager
// "github.com/cgrates/cgrates/timespans"
// "testing"
import (
"testing"
)
/*func TestConnect(t *testing.T) {
sm := &FSSessionManager{}
sm.Connect(&SessionDelegate{&timespans.Responder{}}, "localhost:8021", "ClueCon")
//for {
ev := sm.readNextEvent()
if ev == nil {
t.Error("Got nil event!")
}
//log.Print(ev)
//}
}*/
func TestFSSMInterface(t *testing.T) {
var _ SessionManager = SessionManager(new(FSSessionManager))
}

View File

@@ -134,12 +134,14 @@ func (self *KamailioSessionManager) Connect() error {
return err
}
func (self *KamailioSessionManager) DisconnectSession(ev engine.Event, connId, notify string) {
func (self *KamailioSessionManager) DisconnectSession(ev engine.Event, connId, notify string) error {
sessionIds := ev.GetSessionIds()
disconnectEv := &KamSessionDisconnect{Event: CGR_SESSION_DISCONNECT, HashEntry: sessionIds[0], HashId: sessionIds[1], Reason: notify}
if err := self.conns[connId].Send(disconnectEv.String()); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed sending disconnect request, error %s, connection id: %s", err.Error(), connId))
return err
}
return nil
}
func (self *KamailioSessionManager) RemoveSession(uuid string) {
for i, ss := range self.sessions {

View File

@@ -0,0 +1,27 @@
/*
Real-time Charging System for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package sessionmanager
import (
"testing"
)
func TestKamSMInterface(t *testing.T) {
var _ SessionManager = SessionManager(new(KamailioSessionManager))
}

View File

@@ -20,6 +20,7 @@ package sessionmanager
import (
"encoding/json"
"fmt"
"strings"
"time"
@@ -80,7 +81,7 @@ func (osipsev *OsipsEvent) GetCgrId() string {
}
func (osipsev *OsipsEvent) GetUUID() string {
return osipsev.osipsEvent.AttrValues[CALLID] + ";" + osipsev.osipsEvent.AttrValues[FROM_TAG] + ";" + osipsev.osipsEvent.AttrValues[TO_TAG]
return osipsev.osipsEvent.AttrValues[CALLID]
}
// Returns the dialog identifier which opensips needs to disconnect a dialog
@@ -187,11 +188,38 @@ func (osipsEv *OsipsEvent) GetOriginatorIP(fieldName string) string {
return osipsEv.osipsEvent.OriginatorAddress.IP.String()
}
func (osipsev *OsipsEvent) MissingParameter() bool {
engine.Logger.Debug(fmt.Sprintf("Missing parameters on: %+v", osipsev.osipsEvent))
var nilTime time.Time
if osipsev.GetName() == "E_ACC_EVENT" && osipsev.osipsEvent.AttrValues["method"] == "INVITE" {
return len(osipsev.GetUUID()) == 0 ||
len(osipsev.GetAccount(utils.META_DEFAULT)) == 0 ||
len(osipsev.GetDestination(utils.META_DEFAULT)) == 0 ||
len(osipsev.osipsEvent.AttrValues[OSIPS_DIALOG_ID]) == 0
} else if osipsev.GetName() == "E_ACC_EVENT" && osipsev.osipsEvent.AttrValues["method"] == "BYE" {
return len(osipsev.osipsEvent.AttrValues[OSIPS_DIALOG_ID]) == 0 ||
len(osipsev.osipsEvent.AttrValues[TIME]) == 0
} else if osipsev.GetName() == "E_ACC_EVENT" && osipsev.osipsEvent.AttrValues["method"] == "UPDATE" { // Updated event out of start/stop
// Data needed when stopping a prepaid loop or building a CDR with start/stop event
setupTime, err := osipsev.GetSetupTime(TIME)
if err != nil || setupTime.Equal(nilTime) {
return true
}
aTime, err := osipsev.GetAnswerTime(utils.META_DEFAULT)
if err != nil || aTime.Equal(nilTime) {
return true
}
endTime, err := osipsev.GetEndTime()
if err != nil || endTime.Equal(nilTime) {
return true
}
_, err = osipsev.GetDuration(utils.META_DEFAULT)
if err != nil {
return true
}
if osipsev.osipsEvent.AttrValues[OSIPS_DIALOG_ID] == "" {
return true
}
return false
}
return true
}
@@ -213,6 +241,10 @@ func (osipsev *OsipsEvent) GetExtraFields() map[string]string {
return extraFields
}
func (osipsev *OsipsEvent) DialogId() string {
return osipsev.osipsEvent.AttrValues[OSIPS_DIALOG_ID]
}
func (osipsEv *OsipsEvent) AsStoredCdr() *engine.StoredCdr {
storCdr := new(engine.StoredCdr)
storCdr.CgrId = osipsEv.GetCgrId()
@@ -244,5 +276,6 @@ func (osipsEv *OsipsEvent) updateDurationFromEvent(updatedOsipsEv *OsipsEvent) e
}
answerTime, err := osipsEv.GetAnswerTime(utils.META_DEFAULT)
osipsEv.osipsEvent.AttrValues[OSIPS_DURATION] = endTime.Sub(answerTime).String()
osipsEv.osipsEvent.AttrValues["method"] = "UPDATE" // So we can know it is an end event
return nil
}

View File

@@ -79,8 +79,8 @@ duration::
*/
func NewOSipsSessionManager(smOsipsCfg *config.SmOsipsConfig, rater, cdrsrv engine.Connector) (*OsipsSessionManager, error) {
osm := &OsipsSessionManager{cfg: smOsipsCfg, rater: rater, cdrsrv: cdrsrv}
func NewOSipsSessionManager(smOsipsCfg *config.SmOsipsConfig, rater, cdrsrv engine.Connector, cdrDb engine.CdrStorage) (*OsipsSessionManager, error) {
osm := &OsipsSessionManager{cfg: smOsipsCfg, rater: rater, cdrsrv: cdrsrv, cdrDb: cdrDb, cdrStartEvents: make(map[string]*OsipsEvent)}
osm.eventHandlers = map[string][]func(*osipsdagram.OsipsEvent){
"E_OPENSIPS_START": []func(*osipsdagram.OsipsEvent){osm.onOpensipsStart}, // Raised when OpenSIPS starts so we can register our event handlers
"E_ACC_CDR": []func(*osipsdagram.OsipsEvent){osm.onCdr}, // Raised if cdr_flag is configured
@@ -94,10 +94,13 @@ type OsipsSessionManager struct {
cfg *config.SmOsipsConfig
rater engine.Connector
cdrsrv engine.Connector
cdrDb engine.CdrStorage
eventHandlers map[string][]func(*osipsdagram.OsipsEvent)
evSubscribeStop chan struct{} // Reference towards the channel controlling subscriptions, keep it as reference so we do not need to copy it
stopServing chan struct{} // Stop serving datagrams
miConn *osipsdagram.OsipsMiDatagramConnector // Pool of connections used to various OpenSIPS servers, keep reference towards events received so we can issue commands always to the same remote
sessions []*Session
cdrStartEvents map[string]*OsipsEvent // Used when building CDRs
}
func (osm *OsipsSessionManager) Connect() (err error) {
@@ -118,14 +121,21 @@ func (osm *OsipsSessionManager) Connect() (err error) {
return errors.New("<SM-OpenSIPS> Stopped reading events")
}
func (osm *OsipsSessionManager) RemoveSession(uuid string) {
return
for i, ss := range osm.sessions {
if ss.eventStart.GetUUID() == uuid {
osm.sessions = append(osm.sessions[:i], osm.sessions[i+1:]...)
return
}
}
}
func (osm *OsipsSessionManager) MaxDebit(cd *engine.CallDescriptor, cc *engine.CallCost) error {
return nil
}
func (osm *OsipsSessionManager) DebitInterval() time.Duration {
var nilDuration time.Duration
return nilDuration
return osm.cfg.DebitInterval
}
func (osm *OsipsSessionManager) CdrDb() engine.CdrStorage {
return osm.cdrDb
}
func (osm *OsipsSessionManager) DbLogger() engine.LogStorage {
return nil
@@ -139,6 +149,39 @@ func (osm *OsipsSessionManager) WarnSessionMinDuration(sessionUuid, connId strin
func (osm *OsipsSessionManager) Shutdown() error {
return nil
}
func (osm *OsipsSessionManager) ProcessCdr(storedCdr *engine.StoredCdr) error {
var reply string
return osm.cdrsrv.ProcessCdr(storedCdr, &reply)
}
func (osm *OsipsSessionManager) DisconnectSession(ev engine.Event, connId, notify string) error {
sessionIds := ev.GetSessionIds()
if len(sessionIds) != 2 {
errMsg := fmt.Sprintf("Failed disconnecting session for event: %+v, notify: %s, dialogId: %v", ev, notify, sessionIds)
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> " + errMsg))
return errors.New(errMsg)
}
cmd := fmt.Sprintf(":dlg_end_dlg:\n%s\n%s\n\n", sessionIds[0], sessionIds[1])
if reply, err := osm.miConn.SendCommand([]byte(cmd)); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed disconnecting session for event: %+v, notify: %s, dialogId: %v, error: <%s>", ev, notify, sessionIds, err))
return err
} else if !bytes.HasPrefix(reply, []byte("200 OK")) {
errStr := fmt.Sprintf("Failed disconnecting session for event: %+v, notify: %s, dialogId: %v", ev, notify, sessionIds)
engine.Logger.Err("<SM-OpenSIPS> " + errStr)
return errors.New(errStr)
}
return nil
}
// Searches and return the session with the specifed uuid
func (osm *OsipsSessionManager) getSession(uuid string) *Session {
for _, s := range osm.sessions {
if s.eventStart.GetUUID() == uuid {
return s
}
}
return nil
}
// Automatic subscribe to OpenSIPS for events, trigered on Connect or OpenSIPS restart
func (osm *OsipsSessionManager) SubscribeEvents(evStop chan struct{}) error {
@@ -151,8 +194,7 @@ func (osm *OsipsSessionManager) SubscribeEvents(evStop chan struct{}) error {
return nil
case <-time.After(osm.cfg.EventsSubscribeInterval): // Subscribe on interval
if err := osm.subscribeEvents(); err != nil {
close(osm.stopServing) // Do not serve anymore since we got errors on subscribing
return err
close(osm.stopServing) // Order stop serving, do not return here since we will block the channel consuming
}
}
}
@@ -191,6 +233,13 @@ func (osm *OsipsSessionManager) onOpensipsStart(cdrDagram *osipsdagram.OsipsEven
go osm.SubscribeEvents(osm.evSubscribeStop)
}
func (osm *OsipsSessionManager) onCdr(cdrDagram *osipsdagram.OsipsEvent) {
osipsEv, _ := NewOsipsEvent(cdrDagram)
if err := osm.ProcessCdr(osipsEv.AsStoredCdr()); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", osipsEv.GetCgrId(), osipsEv.GetUUID(), err.Error()))
}
}
func (osm *OsipsSessionManager) onAccEvent(osipsDgram *osipsdagram.OsipsEvent) {
osipsEv, _ := NewOsipsEvent(osipsDgram)
if osipsEv.GetReqType(utils.META_DEFAULT) == utils.META_NONE { // Do not process this request
@@ -200,44 +249,19 @@ func (osm *OsipsSessionManager) onAccEvent(osipsDgram *osipsdagram.OsipsEvent) {
if err := osm.callStart(osipsEv); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed processing CALL_START out of %+v, error: <%s>", osipsDgram, err.Error()))
}
if err := osm.processCdrStart(osipsEv); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed processing cdr start out of %+v, error: <%s>", osipsDgram, err.Error()))
}
} else if osipsDgram.AttrValues["method"] == "BYE" {
if err := osm.callEnd(osipsEv); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed processing CALL_END out of %+v, error: <%s>", osipsDgram, err.Error()))
}
if err := osm.processCdrStop(osipsEv); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed processing cdr stop out of %+v, error: <%s>", osipsDgram, err.Error()))
}
}
}
func (osm *OsipsSessionManager) onCdr(cdrDagram *osipsdagram.OsipsEvent) {
osipsEv, _ := NewOsipsEvent(cdrDagram)
if err := osm.ProcessCdr(osipsEv.AsStoredCdr()); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", osipsEv.GetCgrId(), osipsEv.GetUUID(), err.Error()))
}
}
func (osm *OsipsSessionManager) ProcessCdr(storedCdr *engine.StoredCdr) error {
var reply string
return osm.cdrsrv.ProcessCdr(storedCdr, &reply)
}
func (osm *OsipsSessionManager) DisconnectSession(ev engine.Event, connId, notify string) error {
sessionIds := ev.GetSessionIds()
if len(sessionIds) != 2 {
errMsg := fmt.Sprintf("Failed disconnecting session for event: %+v, notify: %s, dialogId: %v", ev, notify, sessionIds)
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> " + errMsg))
return errors.New(errMsg)
}
cmd := fmt.Sprintf(":dlg_end_dlg:\n%s\n%s\n\n", sessionIds[0], sessionIds[1])
if reply, err := osm.miConn.SendCommand([]byte(cmd)); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed disconnecting session for event: %+v, notify: %s, dialogId: %v, error: <%s>", ev, notify, sessionIds, err))
return err
} else if !bytes.HasPrefix(reply, []byte("200 OK")) {
errStr := fmt.Sprintf("Failed disconnecting session for event: %+v, notify: %s, dialogId: %v", ev, notify, sessionIds)
engine.Logger.Err("<SM-OpenSIPS> " + errStr)
return errors.New(errStr)
}
return nil
}
func (osm *OsipsSessionManager) callStart(osipsEv *OsipsEvent) error {
if osipsEv.MissingParameter() {
if err := osm.DisconnectSession(osipsEv, "", utils.ERR_MANDATORY_IE_MISSING); err != nil {
@@ -245,9 +269,61 @@ func (osm *OsipsSessionManager) callStart(osipsEv *OsipsEvent) error {
}
return errors.New(utils.ERR_MANDATORY_IE_MISSING)
}
s := NewSession(osipsEv, "", osm)
if s != nil {
osm.sessions = append(osm.sessions, s)
}
return nil
}
func (osm *OsipsSessionManager) callEnd(osipsEv *OsipsEvent) error {
s := osm.getSession(osipsEv.GetUUID())
if s == nil { // Not handled by us
return nil
}
osm.RemoveSession(s.eventStart.GetUUID()) // Unreference it early so we avoid concurrency
origEvent := s.eventStart.(*OsipsEvent) // Need a complete event for methods in close
if err := origEvent.updateDurationFromEvent(osipsEv); err != nil {
return err
}
if origEvent.MissingParameter() {
return errors.New(utils.ERR_MANDATORY_IE_MISSING)
}
if err := s.Close(origEvent); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database
return err
}
return nil
}
// Records the event start in case of received so we can create CDR out of it
func (osm *OsipsSessionManager) processCdrStart(osipsEv *OsipsEvent) error {
if osm.cdrsrv == nil {
return nil
}
if dialogId := osipsEv.DialogId(); dialogId == "" {
return errors.New("Missing dialog_id")
} else {
osm.cdrStartEvents[dialogId] = osipsEv
}
return nil
}
// processCdrStop builds the complete CDR out of eventStart+eventStop and sends it to the CDRS component
func (osm *OsipsSessionManager) processCdrStop(osipsEv *OsipsEvent) error {
if osm.cdrsrv == nil {
return nil
}
var osipsEvStart *OsipsEvent
var hasIt bool
if dialogId := osipsEv.DialogId(); dialogId == "" {
return errors.New("Missing dialog_id")
} else if osipsEvStart, hasIt = osm.cdrStartEvents[dialogId]; !hasIt {
return errors.New("Missing event start info")
} else {
delete(osm.cdrStartEvents, dialogId) // Cleanup the event once we got it
}
if err := osipsEvStart.updateDurationFromEvent(osipsEv); err != nil {
return err
}
return osm.ProcessCdr(osipsEvStart.AsStoredCdr())
}

View File

@@ -0,0 +1,27 @@
/*
Real-time Charging System for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package sessionmanager
import (
"testing"
)
func TestOsipsSMInterface(t *testing.T) {
var _ SessionManager = SessionManager(new(OsipsSessionManager))
}

View File

@@ -111,7 +111,7 @@ func (s *Session) debitLoop(runIdx int) {
func (s *Session) Close(ev engine.Event) error {
close(s.stopDebit) // Close the channel so all the sessionRuns listening will be notified
if _, err := ev.GetEndTime(); err != nil {
engine.Logger.Err("Error parsing answer event stop time.")
engine.Logger.Err("Error parsing event stop time.")
for idx := range s.sessionRuns {
s.sessionRuns[idx].CallDescriptor.TimeEnd = s.sessionRuns[idx].CallDescriptor.TimeStart.Add(s.sessionRuns[idx].CallDescriptor.DurationIndex)
}

View File

@@ -29,7 +29,7 @@ type SessionManager interface {
Rater() engine.Connector
DebitInterval() time.Duration
Connect() error
DisconnectSession(engine.Event, string, string)
DisconnectSession(engine.Event, string, string) error
WarnSessionMinDuration(string, string)
RemoveSession(string)
ProcessCdr(*engine.StoredCdr) error