diff --git a/agents/dnsagent.go b/agents/dnsagent.go index 294a5a34c..824bca9ae 100644 --- a/agents/dnsagent.go +++ b/agents/dnsagent.go @@ -35,6 +35,7 @@ import ( func NewDNSAgent(cgrCfg *config.CGRConfig, fltrS *engine.FilterS, sS rpcclient.RpcClientConnection) (da *DNSAgent, err error) { da = &DNSAgent{cgrCfg: cgrCfg, fltrS: fltrS, sS: sS} + err = da.initDNSServer() return } @@ -78,30 +79,16 @@ func (da *DNSAgent) initDNSServer() (err error) { func (da *DNSAgent) ListenAndServe() (err error) { utils.Logger.Info(fmt.Sprintf("<%s> start listening on <%s:%s>", utils.DNSAgent, da.cgrCfg.DNSAgentCfg().ListenNet, da.cgrCfg.DNSAgentCfg().Listen)) - errChan := make(chan error, 1) - rldChan := da.cgrCfg.GetReloadChan(config.DNSAgentJson) + return da.server.ListenAndServe() +} - if err = da.initDNSServer(); err != nil { +// Reload will stop the dns server and reinitialize it but will not start the server again +// this is in order to monitor if we receive error on ListenAndServe +func (da *DNSAgent) Reload() (err error) { + if err = da.Shutdown(); err != nil { return } - listenAndServe := func() { - errChan <- da.server.ListenAndServe() - } - go listenAndServe() - for { - select { - case err = <-errChan: - return - case <-rldChan: - if err = da.Shutdown(); err != nil { - return - } - if err = da.initDNSServer(); err != nil { - return - } - go listenAndServe() //restart the gorutine - } - } + return da.initDNSServer() } // handleMessage is the entry point of all DNS requests diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 2721a571e..3eea5ba81 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -287,60 +287,6 @@ func startRadiusAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.Rp exitChan <- true } -func startDNSAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, - filterSChan chan *engine.FilterS, exitChan chan bool) { - var err error - var sS rpcclient.RpcClientConnection - // var sSInternal bool - filterS := <-filterSChan - filterSChan <- filterS - utils.Logger.Info(fmt.Sprintf("starting %s service", utils.DNSAgent)) - intSMGChan := internalSMGChan - if cfg.DispatcherSCfg().Enabled { - intSMGChan = internalDispatcherSChan - } - if !cfg.DispatcherSCfg().Enabled && cfg.DNSAgentCfg().SessionSConns[0].Address == utils.MetaInternal { - // sSInternal = true - sSIntConn := <-internalSMGChan - internalSMGChan <- sSIntConn - sS = utils.NewBiRPCInternalClient(sSIntConn.(*sessions.SessionS)) - } else { - sS, err = engine.NewRPCPool(rpcclient.POOL_FIRST, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DNSAgentCfg().SessionSConns, intSMGChan, false) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", - utils.DNSAgent, utils.SessionS, err.Error())) - exitChan <- true - return - } - } - da, err := agents.NewDNSAgent(cfg, filterS, sS) - if err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) - exitChan <- true - return - } - // if sSInternal { // bidirectional client backwards connection - // sS.(*utils.BiRPCInternalClient).SetClientConn(da) - // var rply string - // if err := sS.Call(utils.SessionSv1RegisterInternalBiJSONConn, - // utils.EmptyString, &rply); err != nil { - // utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", - // utils.DNSAgent, utils.SessionS, err.Error())) - // exitChan <- true - // return - // } - // } - if err = da.ListenAndServe(); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) - } - exitChan <- true -} - func startFsAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, exitChan chan bool) { var err error var sS rpcclient.RpcClientConnection @@ -998,7 +944,9 @@ func main() { apiv2, _ := srvManager.GetService(utils.ApierV2) resp, _ := srvManager.GetService(utils.ResponderS) smg := services.NewSessionService() - srvManager.AddService(chS, attrS, chrS, tS, stS, reS, supS, schS, cdrS, rals, smg, services.NewEventReaderService()) + srvManager.AddService(chS, attrS, chrS, tS, stS, reS, supS, schS, cdrS, rals, smg, + services.NewEventReaderService(), + services.NewDNSAgent()) internalAttributeSChan := attrS.GetIntenternalChan() internalChargerSChan := chrS.GetIntenternalChan() internalThresholdSChan := tS.GetIntenternalChan() @@ -1072,10 +1020,6 @@ func main() { go startRadiusAgent(internalSMGChan, internalDispatcherSChan, filterSChan, exitChan) } - if cfg.DNSAgentCfg().Enabled { - go startDNSAgent(internalSMGChan, internalDispatcherSChan, filterSChan, exitChan) - } - if len(cfg.HttpAgentCfg()) != 0 { go startHTTPAgent(internalSMGChan, internalDispatcherSChan, server, filterSChan, cfg.GeneralCfg().DefaultTenant, exitChan) diff --git a/services/dnsagent.go b/services/dnsagent.go new file mode 100644 index 000000000..ff73c1d9a --- /dev/null +++ b/services/dnsagent.go @@ -0,0 +1,147 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + "sync" + + "github.com/cgrates/cgrates/agents" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/sessions" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewDNSAgent returns the DNS Agent +func NewDNSAgent() servmanager.Service { + return new(DNSAgent) +} + +// DNSAgent implements Agent interface +type DNSAgent struct { + sync.RWMutex + dns *agents.DNSAgent + connChan chan rpcclient.RpcClientConnection +} + +// Start should handle the sercive start +func (dns *DNSAgent) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { + if dns.IsRunning() { + return fmt.Errorf("service aleady running") + } + + dns.Lock() + defer dns.Unlock() + // var sSInternal bool + var sS rpcclient.RpcClientConnection + utils.Logger.Info(fmt.Sprintf("starting %s service", utils.DNSAgent)) + if !sp.GetConfig().DispatcherSCfg().Enabled && sp.GetConfig().DNSAgentCfg().SessionSConns[0].Address == utils.MetaInternal { + // sSInternal = true + srvSessionS, has := sp.GetService(utils.SessionS) + if !has { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>", + utils.DNSAgent, utils.SessionS)) + return utils.ErrNotFound + } + sSIntConn := <-srvSessionS.GetIntenternalChan() + srvSessionS.GetIntenternalChan() <- sSIntConn + sS = utils.NewBiRPCInternalClient(sSIntConn.(*sessions.SessionS)) + } else { + if sS, err = sp.GetConnection(utils.SessionS, sp.GetConfig().DNSAgentCfg().SessionSConns); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.DNSAgent, utils.SessionS, err.Error())) + return + } + } + dns.dns, err = agents.NewDNSAgent(sp.GetConfig(), sp.GetFilterS(), sS) + if err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) + return + } + // if sSInternal { // bidirectional client backwards connection + // sS.(*utils.BiRPCInternalClient).SetClientConn(da) + // var rply string + // if err := sS.Call(utils.SessionSv1RegisterInternalBiJSONConn, + // utils.EmptyString, &rply); err != nil { + // utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + // utils.DNSAgent, utils.SessionS, err.Error())) + // exitChan <- true + // return + // } + // } + go func() { + if err = dns.dns.ListenAndServe(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) + sp.GetExitChan() <- true // stop the engine here + } + }() + return +} + +// GetIntenternalChan returns the internal connection chanel +// no chanel for DNSAgent +func (dns *DNSAgent) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) { + return nil +} + +// Reload handles the change of config +func (dns *DNSAgent) Reload(sp servmanager.ServiceProvider) (err error) { + dns.Lock() + defer dns.Unlock() + if err = dns.dns.Reload(); err != nil { + dns.dns = nil // make sure we mark this service as stoped to not close server twice + return err + } + go func() { + if err = dns.dns.ListenAndServe(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) + sp.GetExitChan() <- true // stop the engine here + } + }() + return +} + +// Shutdown stops the service +func (dns *DNSAgent) Shutdown() (err error) { + dns.Lock() + defer dns.Unlock() + if err = dns.dns.Shutdown(); err != nil { + return + } + dns.dns = nil + return +} + +// GetRPCInterface returns the interface to register for server +func (dns *DNSAgent) GetRPCInterface() interface{} { + return dns.dns +} + +// IsRunning returns if the service is running +func (dns *DNSAgent) IsRunning() bool { + dns.RLock() + defer dns.RUnlock() + return dns != nil && dns.dns != nil +} + +// ServiceName returns the service name +func (dns *DNSAgent) ServiceName() string { + return utils.DNSAgent +} diff --git a/services/ers.go b/services/ers.go index 4037245b6..9c2c00434 100644 --- a/services/ers.go +++ b/services/ers.go @@ -31,8 +31,7 @@ import ( // NewEventReaderService returns the EventReader Service func NewEventReaderService() servmanager.Service { return &EventReaderService{ - connChan: make(chan rpcclient.RpcClientConnection, 1), - rldChan: make(chan struct{}, 1), + rldChan: make(chan struct{}, 1), } } @@ -42,7 +41,6 @@ type EventReaderService struct { ers *ers.ERService rldChan chan struct{} stopChan chan struct{} - connChan chan rpcclient.RpcClientConnection } // Start should handle the sercive start @@ -77,7 +75,7 @@ func (erS *EventReaderService) Start(sp servmanager.ServiceProvider, waitCache b // GetIntenternalChan returns the internal connection chanel func (erS *EventReaderService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) { - return erS.connChan + return nil } // Reload handles the change of config diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 630b52e32..5a9a0d5a7 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -280,6 +280,9 @@ func (srvMngr *ServiceManager) StartServices() (err error) { if srvMngr.GetConfig().ERsCfg().Enabled { go srvMngr.startService(utils.ERs) } + if srvMngr.GetConfig().DNSAgentCfg().Enabled { + go srvMngr.startService(utils.DNSAgent) + } // startServer() return } @@ -301,7 +304,7 @@ func (srvMngr *ServiceManager) handleReload() { for { select { case ext := <-srvMngr.engineShutdown: - for srviceName, srv := range srvMngr.subsystems { + for srviceName, srv := range srvMngr.subsystems { // gracefully stop all running subsystems if !srv.IsRunning() { continue } @@ -356,6 +359,10 @@ func (srvMngr *ServiceManager) handleReload() { if err = srvMngr.reloadService(utils.ERs, srvMngr.GetConfig().ERsCfg().Enabled); err != nil { return } + case <-srvMngr.GetConfig().GetReloadChan(config.DNSAgentJson): + if err = srvMngr.reloadService(utils.DNSAgent, srvMngr.GetConfig().DNSAgentCfg().Enabled); err != nil { + return + } } // handle RPC server }