diff --git a/agents/fsagent.go b/agents/fsagent.go index dd26d7c2e..0c9ecffbd 100644 --- a/agents/fsagent.go +++ b/agents/fsagent.go @@ -362,6 +362,7 @@ func (sm *FSsessions) disconnectSession(connIdx int, uuid, redirectNr, notify st return nil } +// Shutdown stops all connected fsock connections func (sm *FSsessions) Shutdown() (err error) { for connIdx, fSock := range sm.conns { if !fSock.Connected() { @@ -372,6 +373,10 @@ func (sm *FSsessions) Shutdown() (err error) { if _, err = fSock.SendApiCmd("hupall MANAGER_REQUEST cgr_reqtype *prepaid"); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Error on calls shutdown: %s, connection index: %v", utils.FreeSWITCHAgent, err.Error(), connIdx)) } + if err = fSock.Disconnect(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Error on disconnect: %s, connection index: %v", utils.FreeSWITCHAgent, err.Error(), connIdx)) + } + } return } @@ -381,7 +386,7 @@ func (sm *FSsessions) Call(serviceMethod string, args interface{}, reply interfa return utils.RPCCall(sm, serviceMethod, args, reply) } -// Internal method to disconnect session in FreeSWITCH +// V1DisconnectSession internal method to disconnect session in FreeSWITCH func (fsa *FSsessions) V1DisconnectSession(args utils.AttrDisconnectSession, reply *string) (err error) { ev := engine.NewMapEvent(args.EventStart) channelID := ev.GetStringIgnoreErrors(utils.OriginID) @@ -434,3 +439,16 @@ func (fsa *FSsessions) V1GetActiveSessionIDs(ignParam string, *sessionIDs = sIDs return } + +// SetSessionSConnection sets the new connection to the threshold service +// only used on reload +func (sm *FSsessions) SetSessionSConnection(sS rpcclient.RpcClientConnection) { + sm.sS = sS +} + +// Reload recreates the connection buffers +// only used on reload +func (sm *FSsessions) Reload() { + sm.conns = make([]*fsock.FSock, len(sm.cfg.EventSocketConns)) + sm.senderPools = make([]*fsock.FSockPool, len(sm.cfg.EventSocketConns)) +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 3eea5ba81..738baeba8 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -287,53 +287,6 @@ func startRadiusAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.Rp exitChan <- true } -func startFsAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, exitChan chan bool) { - var err error - var sS rpcclient.RpcClientConnection - var sSInternal bool - utils.Logger.Info("Starting FreeSWITCH agent") - intSMGChan := internalSMGChan - if cfg.DispatcherSCfg().Enabled { - intSMGChan = internalDispatcherSChan - } - if !cfg.DispatcherSCfg().Enabled && cfg.FsAgentCfg().SessionSConns[0].Address == utils.MetaInternal { - sSInternal = true - sSIntConn := <-internalSMGChan - internalSMGChan <- sSIntConn - sS = utils.NewBiRPCInternalClient(sSIntConn.(*sessions.SessionS)) - } else { - sS, err = engine.NewRPCPool(rpcclient.POOL_FIRST, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.FsAgentCfg().SessionSConns, intSMGChan, false) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", - utils.FreeSWITCHAgent, utils.SessionS, err.Error())) - exitChan <- true - return - } - } - sm := agents.NewFSsessions(cfg.FsAgentCfg(), sS, cfg.GeneralCfg().DefaultTimezone) - if sSInternal { // bidirectional client backwards connection - sS.(*utils.BiRPCInternalClient).SetClientConn(sm) - var rply string - if err := sS.Call(utils.SessionSv1RegisterInternalBiJSONConn, - utils.EmptyString, &rply); err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", - utils.FreeSWITCHAgent, utils.SessionS, err.Error())) - exitChan <- true - return - } - } - if err = sm.Connect(); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.FreeSWITCHAgent, err)) - } - - exitChan <- true -} - func startKamAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, exitChan chan bool) { var err error var sS rpcclient.RpcClientConnection @@ -946,7 +899,8 @@ func main() { smg := services.NewSessionService() srvManager.AddService(chS, attrS, chrS, tS, stS, reS, supS, schS, cdrS, rals, smg, services.NewEventReaderService(), - services.NewDNSAgent()) + services.NewDNSAgent(), + services.NewFreeswitchAgent()) internalAttributeSChan := attrS.GetIntenternalChan() internalChargerSChan := chrS.GetIntenternalChan() internalThresholdSChan := tS.GetIntenternalChan() @@ -998,11 +952,6 @@ func main() { // Start CDRC components if necessary go startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan, filterSChan, exitChan) - // Start FreeSWITCHAgent - if cfg.FsAgentCfg().Enabled { - go startFsAgent(internalSMGChan, internalDispatcherSChan, exitChan) - } - // Start SM-Kamailio if cfg.KamAgentCfg().Enabled { go startKamAgent(internalSMGChan, internalDispatcherSChan, exitChan) diff --git a/config/config.go b/config/config.go index 4b418cc78..f4ae2c607 100755 --- a/config/config.go +++ b/config/config.go @@ -1189,8 +1189,11 @@ func (cfg *CGRConfig) SessionSCfg() *SessionSCfg { return cfg.sessionSCfg } -func (self *CGRConfig) FsAgentCfg() *FsAgentCfg { - return self.fsAgentCfg +// FsAgentCfg returns the config for FsAgent +func (cfg *CGRConfig) FsAgentCfg() *FsAgentCfg { + cfg.lks[FreeSWITCHAgentJSN].Lock() + defer cfg.lks[FreeSWITCHAgentJSN].Unlock() + return cfg.fsAgentCfg } func (self *CGRConfig) KamAgentCfg() *KamAgentCfg { @@ -1342,7 +1345,7 @@ func (cfg *CGRConfig) V1GetConfigSection(args *StringWithArgDispatcher, reply *m jsonString = utils.ToJSON(cfg.CdrsCfg()) case SessionSJson: jsonString = utils.ToJSON(cfg.SessionSCfg()) - case FS_JSN: + case FreeSWITCHAgentJSN: jsonString = utils.ToJSON(cfg.FsAgentCfg()) case KamailioAgentJSN: jsonString = utils.ToJSON(cfg.KamAgentCfg()) @@ -1542,6 +1545,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) { } fallthrough case FreeSWITCHAgentJSN: + cfg.rldChans[FreeSWITCHAgentJSN] <- struct{}{} if !fall { break } diff --git a/config/config_json.go b/config/config_json.go index 58e7541f2..c15642595 100644 --- a/config/config_json.go +++ b/config/config_json.go @@ -39,7 +39,6 @@ const ( FreeSWITCHAgentJSN = "freeswitch_agent" KamailioAgentJSN = "kamailio_agent" AsteriskAgentJSN = "asterisk_agent" - FS_JSN = "freeswitch" OSIPS_JSN = "opensips" DA_JSN = "diameter_agent" RA_JSN = "radius_agent" diff --git a/services/dnsagent.go b/services/dnsagent.go index ee818f869..004410135 100644 --- a/services/dnsagent.go +++ b/services/dnsagent.go @@ -37,8 +37,7 @@ func NewDNSAgent() servmanager.Service { // DNSAgent implements Agent interface type DNSAgent struct { sync.RWMutex - dns *agents.DNSAgent - connChan chan rpcclient.RpcClientConnection + dns *agents.DNSAgent } // Start should handle the sercive start @@ -104,7 +103,6 @@ func (dns *DNSAgent) GetIntenternalChan() (conn chan rpcclient.RpcClientConnecti // Reload handles the change of config func (dns *DNSAgent) Reload(sp servmanager.ServiceProvider) (err error) { var sS rpcclient.RpcClientConnection - utils.Logger.Info(fmt.Sprintf("starting %s service", utils.DNSAgent)) if !sp.GetConfig().DispatcherSCfg().Enabled && sp.GetConfig().DNSAgentCfg().SessionSConns[0].Address == utils.MetaInternal { // sSInternal = true srvSessionS, has := sp.GetService(utils.SessionS) diff --git a/services/freeswitchagent.go b/services/freeswitchagent.go new file mode 100644 index 000000000..fcbd9bb39 --- /dev/null +++ b/services/freeswitchagent.go @@ -0,0 +1,173 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + "sync" + + "github.com/cgrates/cgrates/agents" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/sessions" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewFreeswitchAgent returns the Freeswitch Agent +func NewFreeswitchAgent() servmanager.Service { + return new(FreeswitchAgent) +} + +// FreeswitchAgent implements Agent interface +type FreeswitchAgent struct { + sync.RWMutex + fS *agents.FSsessions +} + +// Start should handle the sercive start +func (fS *FreeswitchAgent) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { + if fS.IsRunning() { + return fmt.Errorf("service aleady running") + } + + fS.Lock() + defer fS.Unlock() + var sS rpcclient.RpcClientConnection + var sSInternal bool + utils.Logger.Info("Starting FreeSWITCH agent") + if !sp.GetConfig().DispatcherSCfg().Enabled && sp.GetConfig().FsAgentCfg().SessionSConns[0].Address == utils.MetaInternal { + sSInternal = true + srvSessionS, has := sp.GetService(utils.SessionS) + if !has { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>", + utils.DNSAgent, utils.SessionS)) + return utils.ErrNotFound + } + sSIntConn := <-srvSessionS.GetIntenternalChan() + srvSessionS.GetIntenternalChan() <- sSIntConn + sS = utils.NewBiRPCInternalClient(sSIntConn.(*sessions.SessionS)) + } else { + if sS, err = sp.GetConnection(utils.SessionS, sp.GetConfig().FsAgentCfg().SessionSConns); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.FreeSWITCHAgent, utils.SessionS, err.Error())) + return + } + } + fS.fS = agents.NewFSsessions(sp.GetConfig().FsAgentCfg(), sS, sp.GetConfig().GeneralCfg().DefaultTimezone) + if sSInternal { // bidirectional client backwards connection + sS.(*utils.BiRPCInternalClient).SetClientConn(fS.fS) + var rply string + if err = sS.Call(utils.SessionSv1RegisterInternalBiJSONConn, + utils.EmptyString, &rply); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.FreeSWITCHAgent, utils.SessionS, err.Error())) + return + } + } + + go func() { + if err := fS.fS.Connect(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.FreeSWITCHAgent, err)) + sp.GetExitChan() <- true // stop the engine here + } + }() + return +} + +// GetIntenternalChan returns the internal connection chanel +// no chanel for FreeswitchAgent +func (fS *FreeswitchAgent) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) { + return nil +} + +// Reload handles the change of config +func (fS *FreeswitchAgent) Reload(sp servmanager.ServiceProvider) (err error) { + var sS rpcclient.RpcClientConnection + var sSInternal bool + if !sp.GetConfig().DispatcherSCfg().Enabled && sp.GetConfig().FsAgentCfg().SessionSConns[0].Address == utils.MetaInternal { + sSInternal = true + srvSessionS, has := sp.GetService(utils.SessionS) + if !has { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>", + utils.DNSAgent, utils.SessionS)) + return utils.ErrNotFound + } + sSIntConn := <-srvSessionS.GetIntenternalChan() + srvSessionS.GetIntenternalChan() <- sSIntConn + sS = utils.NewBiRPCInternalClient(sSIntConn.(*sessions.SessionS)) + } else { + if sS, err = sp.GetConnection(utils.SessionS, sp.GetConfig().FsAgentCfg().SessionSConns); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.FreeSWITCHAgent, utils.SessionS, err.Error())) + return + } + } + if err = fS.Shutdown(); err != nil { + return + } + fS.Lock() + defer fS.Unlock() + fS.fS.SetSessionSConnection(sS) + fS.fS.Reload() + if sSInternal { // bidirectional client backwards connection + sS.(*utils.BiRPCInternalClient).SetClientConn(fS.fS) + var rply string + if err = sS.Call(utils.SessionSv1RegisterInternalBiJSONConn, + utils.EmptyString, &rply); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.FreeSWITCHAgent, utils.SessionS, err.Error())) + return + } + } + go func() { + if err := fS.fS.Connect(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.FreeSWITCHAgent, err)) + sp.GetExitChan() <- true // stop the engine here + } + }() + return +} + +// Shutdown stops the service +func (fS *FreeswitchAgent) Shutdown() (err error) { + fS.Lock() + defer fS.Unlock() + if err = fS.fS.Shutdown(); err != nil { + return + } + fS.fS = nil + return +} + +// GetRPCInterface returns the interface to register for server +func (fS *FreeswitchAgent) GetRPCInterface() interface{} { + return fS.fS +} + +// IsRunning returns if the service is running +func (fS *FreeswitchAgent) IsRunning() bool { + fS.RLock() + defer fS.RUnlock() + return fS != nil && fS.fS != nil +} + +// ServiceName returns the service name +func (fS *FreeswitchAgent) ServiceName() string { + return utils.FreeSWITCHAgent +} diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 5a9a0d5a7..30a01a1d0 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -283,6 +283,9 @@ func (srvMngr *ServiceManager) StartServices() (err error) { if srvMngr.GetConfig().DNSAgentCfg().Enabled { go srvMngr.startService(utils.DNSAgent) } + if srvMngr.GetConfig().FsAgentCfg().Enabled { + go srvMngr.startService(utils.FreeSWITCHAgent) + } // startServer() return } @@ -363,6 +366,10 @@ func (srvMngr *ServiceManager) handleReload() { if err = srvMngr.reloadService(utils.DNSAgent, srvMngr.GetConfig().DNSAgentCfg().Enabled); err != nil { return } + case <-srvMngr.GetConfig().GetReloadChan(config.FreeSWITCHAgentJSN): + if err = srvMngr.reloadService(utils.FreeSWITCHAgent, srvMngr.GetConfig().FsAgentCfg().Enabled); err != nil { + return + } } // handle RPC server }