From afb3af06328066475337848c04cb7d5542b31868 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Fri, 4 Oct 2019 12:49:57 +0300 Subject: [PATCH] Updated Asterisk Agent --- cmd/cgr-engine/cgr-engine.go | 2 +- services/asteriskagent.go | 67 ++++++++++++++++++------------------ services/kamailioagent.go | 5 --- servmanager/servmanager.go | 14 ++++---- 4 files changed, 42 insertions(+), 46 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 6456d38d2..6f544adc4 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -712,9 +712,9 @@ func main() { services.NewDNSAgent(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan), services.NewFreeswitchAgent(cfg, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan), services.NewKamailioAgent(cfg, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan), + services.NewAsteriskAgent(cfg, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan), // partial reload ) /* - services.NewAsteriskAgent(), // partial reload services.NewRadiusAgent(), // partial reload services.NewDiameterAgent(), // partial reload services.NewHTTPAgent(), // no reload diff --git a/services/asteriskagent.go b/services/asteriskagent.go index 4093482fd..54c0f23d8 100644 --- a/services/asteriskagent.go +++ b/services/asteriskagent.go @@ -23,6 +23,7 @@ import ( "sync" "github.com/cgrates/cgrates/agents" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" @@ -30,18 +31,30 @@ import ( ) // NewAsteriskAgent returns the Asterisk Agent -func NewAsteriskAgent() servmanager.Service { - return new(AsteriskAgent) +func NewAsteriskAgent(cfg *config.CGRConfig, sSChan, + dispatcherChan chan rpcclient.RpcClientConnection, + exitChan chan bool) servmanager.Service { + return &AsteriskAgent{ + cfg: cfg, + sSChan: sSChan, + dispatcherChan: dispatcherChan, + exitChan: exitChan, + } } // AsteriskAgent implements Agent interface type AsteriskAgent struct { sync.RWMutex + cfg *config.CGRConfig + sSChan chan rpcclient.RpcClientConnection + dispatcherChan chan rpcclient.RpcClientConnection + exitChan chan bool + smas []*agents.AsteriskAgent } // Start should handle the sercive start -func (ast *AsteriskAgent) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { +func (ast *AsteriskAgent) Start() (err error) { if ast.IsRunning() { return fmt.Errorf("service aleady running") } @@ -51,19 +64,13 @@ func (ast *AsteriskAgent) Start(sp servmanager.ServiceProvider, waitCache bool) var sS rpcclient.RpcClientConnection var sSInternal bool utils.Logger.Info("Starting Asterisk agent") - if !sp.GetConfig().DispatcherSCfg().Enabled && sp.GetConfig().AsteriskAgentCfg().SessionSConns[0].Address == utils.MetaInternal { + if !ast.cfg.DispatcherSCfg().Enabled && ast.cfg.AsteriskAgentCfg().SessionSConns[0].Address == utils.MetaInternal { sSInternal = true - srvSessionS, has := sp.GetService(utils.SessionS) - if !has { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>", - utils.AsteriskAgent, utils.SessionS)) - return utils.ErrNotFound - } - sSIntConn := <-srvSessionS.GetIntenternalChan() - srvSessionS.GetIntenternalChan() <- sSIntConn + sSIntConn := <-ast.sSChan + ast.sSChan <- sSIntConn sS = utils.NewBiRPCInternalClient(sSIntConn.(*sessions.SessionS)) } else { - if sS, err = sp.NewConnection(utils.SessionS, sp.GetConfig().AsteriskAgentCfg().SessionSConns); err != nil { + if sS, err = NewConnection(ast.cfg, ast.sSChan, ast.dispatcherChan, ast.cfg.AsteriskAgentCfg().SessionSConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.AsteriskAgent, utils.SessionS, err.Error())) return @@ -76,9 +83,9 @@ func (ast *AsteriskAgent) Start(sp servmanager.ServiceProvider, waitCache bool) } exitChan <- true } - ast.smas = make([]*agents.AsteriskAgent, len(sp.GetConfig().AsteriskAgentCfg().AsteriskConns)) - for connIdx := range sp.GetConfig().AsteriskAgentCfg().AsteriskConns { // Instantiate connections towards asterisk servers - if ast.smas[connIdx], err = agents.NewAsteriskAgent(sp.GetConfig(), connIdx, sS); err != nil { + ast.smas = make([]*agents.AsteriskAgent, len(ast.cfg.AsteriskAgentCfg().AsteriskConns)) + for connIdx := range ast.cfg.AsteriskAgentCfg().AsteriskConns { // Instantiate connections towards asterisk servers + if ast.smas[connIdx], err = agents.NewAsteriskAgent(ast.cfg, connIdx, sS); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.AsteriskAgent, err)) return } @@ -92,7 +99,7 @@ func (ast *AsteriskAgent) Start(sp servmanager.ServiceProvider, waitCache bool) return } } - go listenAndServe(ast.smas[connIdx], sp.GetExitChan()) + go listenAndServe(ast.smas[connIdx], ast.exitChan) } return } @@ -103,22 +110,16 @@ func (ast *AsteriskAgent) GetIntenternalChan() (conn chan rpcclient.RpcClientCon } // Reload handles the change of config -func (ast *AsteriskAgent) Reload(sp servmanager.ServiceProvider) (err error) { +func (ast *AsteriskAgent) Reload() (err error) { var sS rpcclient.RpcClientConnection var sSInternal bool - if !sp.GetConfig().DispatcherSCfg().Enabled && sp.GetConfig().AsteriskAgentCfg().SessionSConns[0].Address == utils.MetaInternal { + if !ast.cfg.DispatcherSCfg().Enabled && ast.cfg.AsteriskAgentCfg().SessionSConns[0].Address == utils.MetaInternal { sSInternal = true - srvSessionS, has := sp.GetService(utils.SessionS) - if !has { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>", - utils.AsteriskAgent, utils.SessionS)) - return utils.ErrNotFound - } - sSIntConn := <-srvSessionS.GetIntenternalChan() - srvSessionS.GetIntenternalChan() <- sSIntConn + sSIntConn := <-ast.sSChan + ast.sSChan <- sSIntConn sS = utils.NewBiRPCInternalClient(sSIntConn.(*sessions.SessionS)) } else { - if sS, err = sp.NewConnection(utils.SessionS, sp.GetConfig().AsteriskAgentCfg().SessionSConns); err != nil { + if sS, err = NewConnection(ast.cfg, ast.sSChan, ast.dispatcherChan, ast.cfg.AsteriskAgentCfg().SessionSConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.AsteriskAgent, utils.SessionS, err.Error())) return @@ -147,11 +148,6 @@ func (ast *AsteriskAgent) Shutdown() (err error) { return // no shutdown for the momment } -// GetRPCInterface returns the interface to register for server -func (ast *AsteriskAgent) GetRPCInterface() interface{} { - return ast.smas -} - // IsRunning returns if the service is running func (ast *AsteriskAgent) IsRunning() bool { ast.RLock() @@ -163,3 +159,8 @@ func (ast *AsteriskAgent) IsRunning() bool { func (ast *AsteriskAgent) ServiceName() string { return utils.AsteriskAgent } + +// ShouldRun returns if the service should be running +func (ast *AsteriskAgent) ShouldRun() bool { + return ast.cfg.AsteriskAgentCfg().Enabled +} diff --git a/services/kamailioagent.go b/services/kamailioagent.go index 70a9c176d..df0770d01 100644 --- a/services/kamailioagent.go +++ b/services/kamailioagent.go @@ -162,11 +162,6 @@ func (kam *KamailioAgent) Shutdown() (err error) { return } -// GetRPCInterface returns the interface to register for server -func (kam *KamailioAgent) GetRPCInterface() interface{} { - return kam.kam -} - // IsRunning returns if the service is running func (kam *KamailioAgent) IsRunning() bool { kam.RLock() diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 85c468434..f99803ef1 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -282,10 +282,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) { } if srvMngr.GetConfig().KamAgentCfg().Enabled { go srvMngr.startService(utils.KamailioAgent) + } + if srvMngr.GetConfig().AsteriskAgentCfg().Enabled { + go srvMngr.startService(utils.AsteriskAgent) } /* - if srvMngr.GetConfig().AsteriskAgentCfg().Enabled { - go srvMngr.startService(utils.AsteriskAgent) - } if srvMngr.GetConfig().RadiusAgentCfg().Enabled { go srvMngr.startService(utils.RadiusAgent) } @@ -387,11 +387,11 @@ func (srvMngr *ServiceManager) handleReload() { case <-srvMngr.GetConfig().GetReloadChan(config.KamailioAgentJSN): if err = srvMngr.reloadService(utils.KamailioAgent); err != nil { return + } + case <-srvMngr.GetConfig().GetReloadChan(config.AsteriskAgentJSN): + if err = srvMngr.reloadService(utils.AsteriskAgent); err != nil { + return } /* - case <-srvMngr.GetConfig().GetReloadChan(config.AsteriskAgentJSN): - if err = srvMngr.reloadService(utils.AsteriskAgent); err != nil { - return - } case <-srvMngr.GetConfig().GetReloadChan(config.RA_JSN): if err = srvMngr.reloadService(utils.RadiusAgent); err != nil { return