Updated DNS Agent

This commit is contained in:
Trial97
2019-10-04 11:26:32 +03:00
committed by Dan Christian Bogos
parent cc00a80f51
commit bf369cbc03
4 changed files with 51 additions and 42 deletions

View File

@@ -709,9 +709,9 @@ func main() {
srvManager.AddServices(attrS, chrS, tS, stS, reS, supS, schS, rals,
rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS, smg,
services.NewEventReaderService(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan),
services.NewDNSAgent(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan),
)
/*
services.NewDNSAgent(),
services.NewFreeswitchAgent(),
services.NewKamailioAgent(),
services.NewAsteriskAgent(), // partial reload

View File

@@ -23,6 +23,8 @@ import (
"sync"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
@@ -30,46 +32,57 @@ import (
)
// NewDNSAgent returns the DNS Agent
func NewDNSAgent() servmanager.Service {
return new(DNSAgent)
func NewDNSAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
sSChan, dispatcherChan chan rpcclient.RpcClientConnection,
exitChan chan bool) servmanager.Service {
return &DNSAgent{
cfg: cfg,
filterSChan: filterSChan,
sSChan: sSChan,
dispatcherChan: dispatcherChan,
exitChan: exitChan,
}
}
// DNSAgent implements Agent interface
type DNSAgent struct {
sync.RWMutex
cfg *config.CGRConfig
filterSChan chan *engine.FilterS
sSChan chan rpcclient.RpcClientConnection
dispatcherChan chan rpcclient.RpcClientConnection
exitChan chan bool
dns *agents.DNSAgent
}
// Start should handle the sercive start
func (dns *DNSAgent) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) {
func (dns *DNSAgent) Start() (err error) {
if dns.IsRunning() {
return fmt.Errorf("service aleady running")
}
filterS := <-dns.filterSChan
dns.filterSChan <- filterS
dns.Lock()
defer dns.Unlock()
// var sSInternal bool
var sS rpcclient.RpcClientConnection
utils.Logger.Info(fmt.Sprintf("starting %s service", utils.DNSAgent))
if !sp.GetConfig().DispatcherSCfg().Enabled && sp.GetConfig().DNSAgentCfg().SessionSConns[0].Address == utils.MetaInternal {
if !dns.cfg.DispatcherSCfg().Enabled && dns.cfg.DNSAgentCfg().SessionSConns[0].Address == utils.MetaInternal {
// sSInternal = true
srvSessionS, has := sp.GetService(utils.SessionS)
if !has {
utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>",
utils.DNSAgent, utils.SessionS))
return utils.ErrNotFound
}
sSIntConn := <-srvSessionS.GetIntenternalChan()
srvSessionS.GetIntenternalChan() <- sSIntConn
sSIntConn := <-dns.sSChan
dns.sSChan <- sSIntConn
sS = utils.NewBiRPCInternalClient(sSIntConn.(*sessions.SessionS))
} else {
if sS, err = sp.NewConnection(utils.SessionS, sp.GetConfig().DNSAgentCfg().SessionSConns); err != nil {
if sS, err = NewConnection(dns.cfg, dns.sSChan, dns.dispatcherChan, dns.cfg.DNSAgentCfg().SessionSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.DNSAgent, utils.SessionS, err.Error()))
return
}
}
dns.dns, err = agents.NewDNSAgent(sp.GetConfig(), sp.GetFilterS(), sS)
dns.dns, err = agents.NewDNSAgent(dns.cfg, filterS, sS)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error()))
return
@@ -88,7 +101,7 @@ func (dns *DNSAgent) Start(sp servmanager.ServiceProvider, waitCache bool) (err
go func() {
if err = dns.dns.ListenAndServe(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error()))
sp.GetExitChan() <- true // stop the engine here
dns.exitChan <- true // stop the engine here
}
}()
return
@@ -101,21 +114,15 @@ func (dns *DNSAgent) GetIntenternalChan() (conn chan rpcclient.RpcClientConnecti
}
// Reload handles the change of config
func (dns *DNSAgent) Reload(sp servmanager.ServiceProvider) (err error) {
func (dns *DNSAgent) Reload() (err error) {
var sS rpcclient.RpcClientConnection
if !sp.GetConfig().DispatcherSCfg().Enabled && sp.GetConfig().DNSAgentCfg().SessionSConns[0].Address == utils.MetaInternal {
if !dns.cfg.DispatcherSCfg().Enabled && dns.cfg.DNSAgentCfg().SessionSConns[0].Address == utils.MetaInternal {
// sSInternal = true
srvSessionS, has := sp.GetService(utils.SessionS)
if !has {
utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>",
utils.DNSAgent, utils.SessionS))
return utils.ErrNotFound
}
sSIntConn := <-srvSessionS.GetIntenternalChan()
srvSessionS.GetIntenternalChan() <- sSIntConn
sSIntConn := <-dns.sSChan
dns.sSChan <- sSIntConn
sS = utils.NewBiRPCInternalClient(sSIntConn.(*sessions.SessionS))
} else {
if sS, err = sp.NewConnection(utils.SessionS, sp.GetConfig().DNSAgentCfg().SessionSConns); err != nil {
if sS, err = NewConnection(dns.cfg, dns.sSChan, dns.dispatcherChan, dns.cfg.DNSAgentCfg().SessionSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.DNSAgent, utils.SessionS, err.Error()))
return
@@ -133,7 +140,7 @@ func (dns *DNSAgent) Reload(sp servmanager.ServiceProvider) (err error) {
go func() {
if err := dns.dns.ListenAndServe(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error()))
sp.GetExitChan() <- true // stop the engine here
dns.exitChan <- true // stop the engine here
}
}()
return
@@ -150,11 +157,6 @@ func (dns *DNSAgent) Shutdown() (err error) {
return
}
// GetRPCInterface returns the interface to register for server
func (dns *DNSAgent) GetRPCInterface() interface{} {
return dns.dns
}
// IsRunning returns if the service is running
func (dns *DNSAgent) IsRunning() bool {
dns.RLock()
@@ -166,3 +168,8 @@ func (dns *DNSAgent) IsRunning() bool {
func (dns *DNSAgent) ServiceName() string {
return utils.DNSAgent
}
// ShouldRun returns if the service should be running
func (dns *DNSAgent) ShouldRun() bool {
return dns.cfg.DNSAgentCfg().Enabled
}

View File

@@ -53,8 +53,10 @@ func TestDNSAgentReload(t *testing.T) {
/*loadStorage*/ nil, filterSChan,
server, nil, engineShutdown)
srvMngr.SetCacheS(chS)
srv := NewDNSAgent()
srvMngr.AddService(srv, NewSessionService(), &CacheService{connChan: cacheSChan})
sS := NewSessionService(cfg, nil, server, nil,
nil, nil, nil, nil, nil, nil, nil, nil, engineShutdown)
srv := NewDNSAgent(cfg, filterSChan, sS.GetIntenternalChan(), nil, engineShutdown)
srvMngr.AddServices(srv, sS)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -273,10 +273,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
}
if srvMngr.GetConfig().ERsCfg().Enabled {
go srvMngr.startService(utils.ERs)
}
if srvMngr.GetConfig().DNSAgentCfg().Enabled {
go srvMngr.startService(utils.DNSAgent)
} /*
if srvMngr.GetConfig().DNSAgentCfg().Enabled {
go srvMngr.startService(utils.DNSAgent)
}
if srvMngr.GetConfig().FsAgentCfg().Enabled {
go srvMngr.startService(utils.FreeSWITCHAgent)
}
@@ -375,11 +375,11 @@ func (srvMngr *ServiceManager) handleReload() {
case <-srvMngr.GetConfig().GetReloadChan(config.ERsJson):
if err = srvMngr.reloadService(utils.ERs); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.DNSAgentJson):
if err = srvMngr.reloadService(utils.DNSAgent); err != nil {
return
} /*
case <-srvMngr.GetConfig().GetReloadChan(config.DNSAgentJson):
if err = srvMngr.reloadService(utils.DNSAgent); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.FreeSWITCHAgentJSN):
if err = srvMngr.reloadService(utils.FreeSWITCHAgent); err != nil {
return