From 9e0b234d55551aef0086b3c815b28cd31778e482 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 15 Jun 2015 19:31:19 +0200 Subject: [PATCH] Active synchronization for channels between CGR and FreeSWITCH, adding new channel_sync_interval configuration in session manager, should fix and close #77 --- config/config_defaults.go | 1 + config/config_json_test.go | 1 + config/libconfig_json.go | 1 + config/smconfig.go | 6 ++ data/conf/cgrates/cgrates.json | 1 + .../cgrates/etc/cgrates/cgrates.json | 1 + engine/cdrs.go | 1 - sessionmanager/fssessionmanager.go | 89 +++++++++++++++++-- sessionmanager/kamailiosm.go | 4 + sessionmanager/osipssm.go | 5 ++ sessionmanager/sessionmanager.go | 1 + 11 files changed, 101 insertions(+), 10 deletions(-) diff --git a/config/config_defaults.go b/config/config_defaults.go index 0162026b8..adbef2f23 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -192,6 +192,7 @@ const CGRATES_CFG_JSON = ` "empty_balance_context": "", // if defined, prepaid calls will be transfered to this context on empty balance "empty_balance_ann_file": "", // file to be played before disconnecting prepaid calls on empty balance (applies only if no context defined) "subscribe_park": true, // subscribe via fsock to receive park events + "channel_sync_interval": "5m", // sync channels with freeswitch regularly "connections":[ // instantiate connections to multiple FreeSWITCH servers {"server": "127.0.0.1:8021", "password": "ClueCon", "reconnects": 5} ], diff --git a/config/config_json_test.go b/config/config_json_test.go index a16e69e4e..e00d75d6f 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -320,6 +320,7 @@ func TestSmFsJsonCfg(t *testing.T) { Empty_balance_context: utils.StringPointer(""), Empty_balance_ann_file: utils.StringPointer(""), Subscribe_park: utils.BoolPointer(true), + Channel_sync_interval: utils.StringPointer("5m"), Connections: &[]*FsConnJsonCfg{ &FsConnJsonCfg{ Server: utils.StringPointer("127.0.0.1:8021"), diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 9b00ee879..23030c04c 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -152,6 +152,7 @@ type SmFsJsonCfg struct { Empty_balance_context *string Empty_balance_ann_file *string Subscribe_park *bool + Channel_sync_interval *string Connections *[]*FsConnJsonCfg } diff --git a/config/smconfig.go b/config/smconfig.go index 62fc30ad9..0075783bd 100644 --- a/config/smconfig.go +++ b/config/smconfig.go @@ -70,6 +70,7 @@ type SmFsConfig struct { EmptyBalanceContext string EmptyBalanceAnnFile string SubscribePark bool + ChannelSyncInterval time.Duration Connections []*FsConnConfig } @@ -130,6 +131,11 @@ func (self *SmFsConfig) loadFromJsonCfg(jsnCfg *SmFsJsonCfg) error { if jsnCfg.Subscribe_park != nil { self.SubscribePark = *jsnCfg.Subscribe_park } + if jsnCfg.Channel_sync_interval != nil { + if self.ChannelSyncInterval, err = utils.ParseDurationWithSecs(*jsnCfg.Channel_sync_interval); err != nil { + return err + } + } if jsnCfg.Connections != nil { self.Connections = make([]*FsConnConfig, len(*jsnCfg.Connections)) for idx, jsnConnCfg := range *jsnCfg.Connections { diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index 8486315d2..d8c6fe481 100644 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -172,6 +172,7 @@ // "empty_balance_context": "", // if defined, prepaid calls will be transfered to this context on empty balance // "empty_balance_ann_file": "", // file to be played before disconnecting prepaid calls on empty balance (applies only if no context defined) // "subscribe_park": true, // subscribe via fsock to receive park events +// "channel_sync_interval": "5m", // sync channels with freeswitch regularly // "connections":[ // instantiate connections to multiple FreeSWITCH servers // {"server": "127.0.0.1:8021", "password": "ClueCon", "reconnects": 5} // ], diff --git a/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json b/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json index 903d2f953..481478905 100644 --- a/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json @@ -71,6 +71,7 @@ "cdrs": "internal", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234> "create_cdr": true, // create CDR out of events and sends them to CDRS component "debit_interval": "5s", // interval to perform debits on. + "channel_sync_interval": "10s", "connections":[ // instantiate connections to multiple FreeSWITCH servers {"server": "127.0.0.1:8021", "password": "ClueCon", "reconnects": 15} ], diff --git a/engine/cdrs.go b/engine/cdrs.go index cf42c232a..2881475c5 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -108,7 +108,6 @@ type CallCostLog struct { // RPC method, used to log callcosts to db func (self *CdrServer) LogCallCost(ccl *CallCostLog) error { - Logger.Debug(fmt.Sprintf("LogCallCost, callCostLog: %+v, cost: %+v", ccl, ccl.CallCost)) if ccl.CheckDuplicate { cc, err := self.cdrDb.GetCallCostLog(ccl.CgrId, ccl.Source, ccl.RunId) if err != nil && err.Error() != "record not found" { diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 36fe7a7c7..b70b3c296 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "log/syslog" + "strconv" "time" "github.com/cgrates/cgrates/config" @@ -33,19 +34,21 @@ import ( // The freeswitch session manager type holding a buffer for the network connection // and the active sessions type FSSessionManager struct { - cfg *config.SmFsConfig - conns map[string]*fsock.FSock // Keep the list here for connection management purposes - sessions []*Session - rater engine.Connector - cdrsrv engine.Connector + cfg *config.SmFsConfig + conns map[string]*fsock.FSock // Keep the list here for connection management purposes + senderPools map[string]*fsock.FSockPool // Keep sender pools here + sessions []*Session + rater engine.Connector + cdrsrv engine.Connector } func NewFSSessionManager(smFsConfig *config.SmFsConfig, rater, cdrs engine.Connector) *FSSessionManager { return &FSSessionManager{ - cfg: smFsConfig, - conns: make(map[string]*fsock.FSock), - rater: rater, - cdrsrv: cdrs, + cfg: smFsConfig, + conns: make(map[string]*fsock.FSock), + senderPools: make(map[string]*fsock.FSockPool), + rater: rater, + cdrsrv: cdrs, } } @@ -69,6 +72,23 @@ func (sm *FSSessionManager) Connect() error { errChan <- err } }() + if fsSenderPool, err := fsock.NewFSockPool(5, connCfg.Server, connCfg.Password, 1, + make(map[string][]func(string, string)), make(map[string]string), engine.Logger.(*syslog.Writer), connId); err != nil { + return fmt.Errorf("Cannot connect FreeSWITCH senders pool, error: %s", err.Error()) + } else if fsSenderPool == nil { + return errors.New("Cannot connect FreeSWITCH senders pool.") + } else { + sm.senderPools[connId] = fsSenderPool + } + if sm.cfg.ChannelSyncInterval != 0 { // Schedule running of the callsync + go func() { + for { // Schedule sync channels to run repetately + time.Sleep(sm.cfg.ChannelSyncInterval) + sm.SyncSessions() + } + + }() + } } err := <-errChan // Will keep the Connect locked until the first error in one of the connections return err @@ -335,3 +355,54 @@ func (sm *FSSessionManager) Shutdown() (err error) { func (sm *FSSessionManager) Sessions() []*Session { return sm.sessions } + +// Sync sessions with FS +/* +map[secure: hostname:CgrDev1 callstate:ACTIVE callee_num:1002 initial_dest:1002 state:CS_EXECUTE dialplan:XML read_codec:SPEEX initial_ip_addr:127.0.0.1 write_codec:SPEEX write_bit_rate:44000 +call_uuid:3427e500-10e5-4864-a589-e306b70419a2 presence_id: initial_cid_name:1001 context:default read_rate:32000 read_bit_rate:44000 callee_direction:SEND initial_context:default created:2015-06-15 18:48:13 +dest:1002 callee_name:Outbound Call direction:inbound ip_addr:127.0.0.1 sent_callee_name:Outbound Call write_rate:32000 presence_data: sent_callee_num:1002 created_epoch:1434386893 cid_name:1001 application:sched_hangup +application_data:+10800 alloted_timeout uuid:3427e500-10e5-4864-a589-e306b70419a2 name:sofia/cgrtest/1001@127.0.0.1 cid_num:1001 initial_cid_num:1001 initial_dialplan:XML] +*/ +func (sm *FSSessionManager) SyncSessions() error { + for connId, senderPool := range sm.senderPools { + fsConn, err := senderPool.PopFSock() + if err != nil { + engine.Logger.Err(fmt.Sprintf(" Error on syncing active calls, senderPool: %+v, error: %s", senderPool, err.Error())) + continue + } + activeChanStr, err := fsConn.SendApiCmd("show channels") + senderPool.PushFSock(fsConn) + if err != nil { + engine.Logger.Err(fmt.Sprintf(" Error on syncing active calls, senderPool: %+v, error: %s", senderPool, err.Error())) + continue + } + aChans := fsock.MapChanData(activeChanStr) + for _, session := range sm.sessions { + if session.connId != connId { // This session belongs to another connectionId + continue + } + var stillActive bool + for _, fsAChan := range aChans { + if fsAChan["call_uuid"] == session.eventStart.GetUUID() { // Channel still active + stillActive = true + break + } + } + if !stillActive { + engine.Logger.Warning(fmt.Sprintf(" Sync active channels, stale session detected, uuid: %s", session.eventStart.GetUUID())) + sm.RemoveSession(session.eventStart.GetUUID()) // Unreference it early so we avoid concurrency + fsev := session.eventStart.(FSEvent) + now := time.Now() + aTime, _ := fsev.GetAnswerTime("") + dur := now.Sub(aTime) + fsev[END_TIME] = now.String() + fsev[DURATION] = strconv.FormatFloat(dur.Seconds(), 'f', -1, 64) + if err := session.Close(fsev); err != nil { // Stop loop, refund advanced charges and save the costs deducted so far to database + engine.Logger.Err(fmt.Sprintf(" Error on removing stale session with uuid: %s, error: %s", session.eventStart.GetUUID(), err.Error())) + continue + } + } + } + } + return nil +} diff --git a/sessionmanager/kamailiosm.go b/sessionmanager/kamailiosm.go index 3924f10de..e511134d6 100644 --- a/sessionmanager/kamailiosm.go +++ b/sessionmanager/kamailiosm.go @@ -240,3 +240,7 @@ func (self *KamailioSessionManager) Shutdown() error { func (self *KamailioSessionManager) Sessions() []*Session { return self.sessions } + +func (self *KamailioSessionManager) SyncSessions() error { + return nil +} diff --git a/sessionmanager/osipssm.go b/sessionmanager/osipssm.go index 231daf107..b69804998 100644 --- a/sessionmanager/osipssm.go +++ b/sessionmanager/osipssm.go @@ -346,3 +346,8 @@ func (osm *OsipsSessionManager) getSession(uuid string) *Session { func (osm *OsipsSessionManager) Sessions() []*Session { return osm.sessions } + +// Sync sessions with FS +func (osm *OsipsSessionManager) SyncSessions() error { + return nil +} diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go index cf81f8f41..181589afa 100644 --- a/sessionmanager/sessionmanager.go +++ b/sessionmanager/sessionmanager.go @@ -35,4 +35,5 @@ type SessionManager interface { ProcessCdr(*engine.StoredCdr) error Shutdown() error Sessions() []*Session + SyncSessions() error }