Active synchronization for channels between CGR and FreeSWITCH, adding new channel_sync_interval configuration in session manager, should fix and close #77

This commit is contained in:
DanB
2015-06-15 19:31:19 +02:00
parent d13040a9f1
commit 9e0b234d55
11 changed files with 101 additions and 10 deletions

View File

@@ -192,6 +192,7 @@ const CGRATES_CFG_JSON = `
"empty_balance_context": "", // if defined, prepaid calls will be transfered to this context on empty balance
"empty_balance_ann_file": "", // file to be played before disconnecting prepaid calls on empty balance (applies only if no context defined)
"subscribe_park": true, // subscribe via fsock to receive park events
"channel_sync_interval": "5m", // sync channels with freeswitch regularly
"connections":[ // instantiate connections to multiple FreeSWITCH servers
{"server": "127.0.0.1:8021", "password": "ClueCon", "reconnects": 5}
],

View File

@@ -320,6 +320,7 @@ func TestSmFsJsonCfg(t *testing.T) {
Empty_balance_context: utils.StringPointer(""),
Empty_balance_ann_file: utils.StringPointer(""),
Subscribe_park: utils.BoolPointer(true),
Channel_sync_interval: utils.StringPointer("5m"),
Connections: &[]*FsConnJsonCfg{
&FsConnJsonCfg{
Server: utils.StringPointer("127.0.0.1:8021"),

View File

@@ -152,6 +152,7 @@ type SmFsJsonCfg struct {
Empty_balance_context *string
Empty_balance_ann_file *string
Subscribe_park *bool
Channel_sync_interval *string
Connections *[]*FsConnJsonCfg
}

View File

@@ -70,6 +70,7 @@ type SmFsConfig struct {
EmptyBalanceContext string
EmptyBalanceAnnFile string
SubscribePark bool
ChannelSyncInterval time.Duration
Connections []*FsConnConfig
}
@@ -130,6 +131,11 @@ func (self *SmFsConfig) loadFromJsonCfg(jsnCfg *SmFsJsonCfg) error {
if jsnCfg.Subscribe_park != nil {
self.SubscribePark = *jsnCfg.Subscribe_park
}
if jsnCfg.Channel_sync_interval != nil {
if self.ChannelSyncInterval, err = utils.ParseDurationWithSecs(*jsnCfg.Channel_sync_interval); err != nil {
return err
}
}
if jsnCfg.Connections != nil {
self.Connections = make([]*FsConnConfig, len(*jsnCfg.Connections))
for idx, jsnConnCfg := range *jsnCfg.Connections {

View File

@@ -172,6 +172,7 @@
// "empty_balance_context": "", // if defined, prepaid calls will be transfered to this context on empty balance
// "empty_balance_ann_file": "", // file to be played before disconnecting prepaid calls on empty balance (applies only if no context defined)
// "subscribe_park": true, // subscribe via fsock to receive park events
// "channel_sync_interval": "5m", // sync channels with freeswitch regularly
// "connections":[ // instantiate connections to multiple FreeSWITCH servers
// {"server": "127.0.0.1:8021", "password": "ClueCon", "reconnects": 5}
// ],

View File

@@ -71,6 +71,7 @@
"cdrs": "internal", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234>
"create_cdr": true, // create CDR out of events and sends them to CDRS component
"debit_interval": "5s", // interval to perform debits on.
"channel_sync_interval": "10s",
"connections":[ // instantiate connections to multiple FreeSWITCH servers
{"server": "127.0.0.1:8021", "password": "ClueCon", "reconnects": 15}
],

View File

@@ -108,7 +108,6 @@ type CallCostLog struct {
// RPC method, used to log callcosts to db
func (self *CdrServer) LogCallCost(ccl *CallCostLog) error {
Logger.Debug(fmt.Sprintf("LogCallCost, callCostLog: %+v, cost: %+v", ccl, ccl.CallCost))
if ccl.CheckDuplicate {
cc, err := self.cdrDb.GetCallCostLog(ccl.CgrId, ccl.Source, ccl.RunId)
if err != nil && err.Error() != "record not found" {

View File

@@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"log/syslog"
"strconv"
"time"
"github.com/cgrates/cgrates/config"
@@ -33,19 +34,21 @@ import (
// The freeswitch session manager type holding a buffer for the network connection
// and the active sessions
type FSSessionManager struct {
cfg *config.SmFsConfig
conns map[string]*fsock.FSock // Keep the list here for connection management purposes
sessions []*Session
rater engine.Connector
cdrsrv engine.Connector
cfg *config.SmFsConfig
conns map[string]*fsock.FSock // Keep the list here for connection management purposes
senderPools map[string]*fsock.FSockPool // Keep sender pools here
sessions []*Session
rater engine.Connector
cdrsrv engine.Connector
}
func NewFSSessionManager(smFsConfig *config.SmFsConfig, rater, cdrs engine.Connector) *FSSessionManager {
return &FSSessionManager{
cfg: smFsConfig,
conns: make(map[string]*fsock.FSock),
rater: rater,
cdrsrv: cdrs,
cfg: smFsConfig,
conns: make(map[string]*fsock.FSock),
senderPools: make(map[string]*fsock.FSockPool),
rater: rater,
cdrsrv: cdrs,
}
}
@@ -69,6 +72,23 @@ func (sm *FSSessionManager) Connect() error {
errChan <- err
}
}()
if fsSenderPool, err := fsock.NewFSockPool(5, connCfg.Server, connCfg.Password, 1,
make(map[string][]func(string, string)), make(map[string]string), engine.Logger.(*syslog.Writer), connId); err != nil {
return fmt.Errorf("Cannot connect FreeSWITCH senders pool, error: %s", err.Error())
} else if fsSenderPool == nil {
return errors.New("Cannot connect FreeSWITCH senders pool.")
} else {
sm.senderPools[connId] = fsSenderPool
}
if sm.cfg.ChannelSyncInterval != 0 { // Schedule running of the callsync
go func() {
for { // Schedule sync channels to run repetately
time.Sleep(sm.cfg.ChannelSyncInterval)
sm.SyncSessions()
}
}()
}
}
err := <-errChan // Will keep the Connect locked until the first error in one of the connections
return err
@@ -335,3 +355,54 @@ func (sm *FSSessionManager) Shutdown() (err error) {
func (sm *FSSessionManager) Sessions() []*Session {
return sm.sessions
}
// Sync sessions with FS
/*
map[secure: hostname:CgrDev1 callstate:ACTIVE callee_num:1002 initial_dest:1002 state:CS_EXECUTE dialplan:XML read_codec:SPEEX initial_ip_addr:127.0.0.1 write_codec:SPEEX write_bit_rate:44000
call_uuid:3427e500-10e5-4864-a589-e306b70419a2 presence_id: initial_cid_name:1001 context:default read_rate:32000 read_bit_rate:44000 callee_direction:SEND initial_context:default created:2015-06-15 18:48:13
dest:1002 callee_name:Outbound Call direction:inbound ip_addr:127.0.0.1 sent_callee_name:Outbound Call write_rate:32000 presence_data: sent_callee_num:1002 created_epoch:1434386893 cid_name:1001 application:sched_hangup
application_data:+10800 alloted_timeout uuid:3427e500-10e5-4864-a589-e306b70419a2 name:sofia/cgrtest/1001@127.0.0.1 cid_num:1001 initial_cid_num:1001 initial_dialplan:XML]
*/
func (sm *FSSessionManager) SyncSessions() error {
for connId, senderPool := range sm.senderPools {
fsConn, err := senderPool.PopFSock()
if err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Error on syncing active calls, senderPool: %+v, error: %s", senderPool, err.Error()))
continue
}
activeChanStr, err := fsConn.SendApiCmd("show channels")
senderPool.PushFSock(fsConn)
if err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Error on syncing active calls, senderPool: %+v, error: %s", senderPool, err.Error()))
continue
}
aChans := fsock.MapChanData(activeChanStr)
for _, session := range sm.sessions {
if session.connId != connId { // This session belongs to another connectionId
continue
}
var stillActive bool
for _, fsAChan := range aChans {
if fsAChan["call_uuid"] == session.eventStart.GetUUID() { // Channel still active
stillActive = true
break
}
}
if !stillActive {
engine.Logger.Warning(fmt.Sprintf("<SM-FreeSWITCH> Sync active channels, stale session detected, uuid: %s", session.eventStart.GetUUID()))
sm.RemoveSession(session.eventStart.GetUUID()) // Unreference it early so we avoid concurrency
fsev := session.eventStart.(FSEvent)
now := time.Now()
aTime, _ := fsev.GetAnswerTime("")
dur := now.Sub(aTime)
fsev[END_TIME] = now.String()
fsev[DURATION] = strconv.FormatFloat(dur.Seconds(), 'f', -1, 64)
if err := session.Close(fsev); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database
engine.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Error on removing stale session with uuid: %s, error: %s", session.eventStart.GetUUID(), err.Error()))
continue
}
}
}
}
return nil
}

View File

@@ -240,3 +240,7 @@ func (self *KamailioSessionManager) Shutdown() error {
func (self *KamailioSessionManager) Sessions() []*Session {
return self.sessions
}
func (self *KamailioSessionManager) SyncSessions() error {
return nil
}

View File

@@ -346,3 +346,8 @@ func (osm *OsipsSessionManager) getSession(uuid string) *Session {
func (osm *OsipsSessionManager) Sessions() []*Session {
return osm.sessions
}
// Sync sessions with FS
func (osm *OsipsSessionManager) SyncSessions() error {
return nil
}

View File

@@ -35,4 +35,5 @@ type SessionManager interface {
ProcessCdr(*engine.StoredCdr) error
Shutdown() error
Sessions() []*Session
SyncSessions() error
}