From 9464478cf7384df610be633d97bd03d3ca405fef Mon Sep 17 00:00:00 2001 From: Trial97 Date: Tue, 1 Oct 2019 15:48:04 +0300 Subject: [PATCH] Added HTTPAgent as service in ServiceManager --- cmd/cgr-engine/cgr-engine.go | 39 +-------------- config/config.go | 8 ++- services/httpagent.go | 95 ++++++++++++++++++++++++++++++++++++ servmanager/servmanager.go | 7 +++ 4 files changed, 109 insertions(+), 40 deletions(-) create mode 100644 services/httpagent.go diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 0f1472059..520cf7819 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -31,7 +31,6 @@ import ( "syscall" "time" - "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/analyzers" v1 "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/cdrc" @@ -138,38 +137,6 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne } } -func startHTTPAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, - server *utils.Server, filterSChan chan *engine.FilterS, dfltTenant string, exitChan chan bool) { - filterS := <-filterSChan - filterSChan <- filterS - var sS rpcclient.RpcClientConnection - intSMGChan := internalSMGChan - if cfg.DispatcherSCfg().Enabled { - intSMGChan = internalDispatcherSChan - } - utils.Logger.Info("Starting HTTP agent") - var err error - for _, agntCfg := range cfg.HttpAgentCfg() { - if len(agntCfg.SessionSConns) != 0 { - sS, err = engine.NewRPCPool(rpcclient.POOL_FIRST, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - agntCfg.SessionSConns, intSMGChan, false) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> could not connect to %s, error: %s", - utils.HTTPAgent, utils.SessionS, err.Error())) - exitChan <- true - return - } - } - server.RegisterHttpHandler(agntCfg.Url, - agents.NewHTTPAgent(sS, filterS, dfltTenant, agntCfg.RequestPayload, - agntCfg.ReplyPayload, agntCfg.RequestProcessors)) - } -} - // startFilterService fires up the FilterS func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS, internalStatSChan, internalResourceSChan, internalRalSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, @@ -709,6 +676,7 @@ func main() { services.NewAsteriskAgent(), // partial reload services.NewRadiusAgent(), // partial reload services.NewDiameterAgent(), // partial reload + services.NewHTTPAgent(), // no reload ) internalAttributeSChan := attrS.GetIntenternalChan() internalChargerSChan := chrS.GetIntenternalChan() @@ -761,11 +729,6 @@ func main() { // Start CDRC components if necessary go startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan, filterSChan, exitChan) - if len(cfg.HttpAgentCfg()) != 0 { - go startHTTPAgent(internalSMGChan, internalDispatcherSChan, server, filterSChan, - cfg.GeneralCfg().DefaultTenant, exitChan) - } - // Start FilterS go startFilterService(filterSChan, cacheS, internalStatSChan, internalRsChan, internalRaterChan, cfg, dm, exitChan) diff --git a/config/config.go b/config/config.go index c66e51421..c8b76391e 100755 --- a/config/config.go +++ b/config/config.go @@ -1210,8 +1210,11 @@ func (cfg *CGRConfig) AsteriskAgentCfg() *AsteriskAgentCfg { return cfg.asteriskAgentCfg } -func (self *CGRConfig) HttpAgentCfg() []*HttpAgentCfg { - return self.httpAgentCfg +// HttpAgentCfg returns the config for HttpAgent +func (cfg *CGRConfig) HttpAgentCfg() []*HttpAgentCfg { + cfg.lks[HttpAgentJson].Lock() + defer cfg.lks[HttpAgentJson].Unlock() + return cfg.httpAgentCfg } func (cfg *CGRConfig) FilterSCfg() *FilterSCfg { @@ -1577,6 +1580,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) { } fallthrough case HttpAgentJson: + cfg.rldChans[HttpAgentJson] <- struct{}{} if !fall { break } diff --git a/services/httpagent.go b/services/httpagent.go new file mode 100644 index 000000000..9834ba926 --- /dev/null +++ b/services/httpagent.go @@ -0,0 +1,95 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + "sync" + + "github.com/cgrates/cgrates/agents" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewHTTPAgent returns the HTTP Agent +func NewHTTPAgent() servmanager.Service { + return new(HTTPAgent) +} + +// HTTPAgent implements Agent interface +type HTTPAgent struct { + sync.RWMutex + ha *agents.HTTPAgent +} + +// Start should handle the sercive start +func (ha *HTTPAgent) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { + if ha.IsRunning() { + return fmt.Errorf("service aleady running") + } + + ha.Lock() + defer ha.Unlock() + utils.Logger.Info("Starting HTTP agent") + for _, agntCfg := range sp.GetConfig().HttpAgentCfg() { + var sS rpcclient.RpcClientConnection + if sS, err = sp.NewConnection(utils.SessionS, agntCfg.SessionSConns); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> could not connect to %s, error: %s", + utils.HTTPAgent, utils.SessionS, err.Error())) + return + } + sp.GetServer().RegisterHttpHandler(agntCfg.Url, + agents.NewHTTPAgent(sS, sp.GetFilterS(), sp.GetConfig().GeneralCfg().DefaultTenant, agntCfg.RequestPayload, + agntCfg.ReplyPayload, agntCfg.RequestProcessors)) + } + return +} + +// GetIntenternalChan returns the internal connection chanel +func (ha *HTTPAgent) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) { + return nil +} + +// Reload handles the change of config +func (ha *HTTPAgent) Reload(sp servmanager.ServiceProvider) (err error) { + return // no reload +} + +// Shutdown stops the service +func (ha *HTTPAgent) Shutdown() (err error) { + return // no shutdown for the momment +} + +// GetRPCInterface returns the interface to register for server +func (ha *HTTPAgent) GetRPCInterface() interface{} { + return ha.ha +} + +// IsRunning returns if the service is running +func (ha *HTTPAgent) IsRunning() bool { + ha.RLock() + defer ha.RUnlock() + return ha != nil && ha.ha != nil +} + +// ServiceName returns the service name +func (ha *HTTPAgent) ServiceName() string { + return utils.HTTPAgent +} diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 292ce5020..a6a2498fb 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -298,6 +298,9 @@ func (srvMngr *ServiceManager) StartServices() (err error) { if srvMngr.GetConfig().DiameterAgentCfg().Enabled { go srvMngr.startService(utils.DiameterAgent) } + if len(srvMngr.GetConfig().HttpAgentCfg()) != 0 { + go srvMngr.startService(utils.HTTPAgent) + } // startServer() return } @@ -398,6 +401,10 @@ func (srvMngr *ServiceManager) handleReload() { if err = srvMngr.reloadService(utils.DiameterAgent, srvMngr.GetConfig().DiameterAgentCfg().Enabled); err != nil { return } + case <-srvMngr.GetConfig().GetReloadChan(config.HttpAgentJson): + if err = srvMngr.reloadService(utils.HTTPAgent, len(srvMngr.GetConfig().HttpAgentCfg()) != 0); err != nil { + return + } } // handle RPC server }