Revise DNS Agent and Service

This commit is contained in:
arberkatellari
2023-08-01 10:59:18 -04:00
committed by Dan Christian Bogos
parent 990feb0f26
commit 6eda4303f5
9 changed files with 294 additions and 109 deletions

View File

@@ -22,6 +22,7 @@ import (
"crypto/tls"
"fmt"
"strings"
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
@@ -40,39 +41,67 @@ func NewDNSAgent(cgrCfg *config.CGRConfig, fltrS *engine.FilterS,
// DNSAgent translates DNS requests towards CGRateS infrastructure
type DNSAgent struct {
sync.RWMutex
cgrCfg *config.CGRConfig // loaded CGRateS configuration
fltrS *engine.FilterS // connection towards FilterS
server *dns.Server
servers []*dns.Server
connMgr *engine.ConnManager
}
// initDNSServer instantiates the DNS server
func (da *DNSAgent) initDNSServer() (_ error) {
da.server = &dns.Server{
Addr: da.cgrCfg.DNSAgentCfg().Listen,
Net: da.cgrCfg.DNSAgentCfg().ListenNet,
Handler: dns.HandlerFunc(func(w dns.ResponseWriter, m *dns.Msg) {
go da.handleMessage(w, m)
}),
}
if strings.HasSuffix(da.cgrCfg.DNSAgentCfg().ListenNet, utils.TLSNoCaps) {
cert, err := tls.LoadX509KeyPair(da.cgrCfg.TLSCfg().ServerCerificate, da.cgrCfg.TLSCfg().ServerKey)
if err != nil {
return err
}
da.server.Net = "tcp-tls"
da.server.TLSConfig = &tls.Config{
Certificates: []tls.Certificate{cert},
da.servers = make([]*dns.Server, 0, len(da.cgrCfg.DNSAgentCfg().Listeners))
for i := range da.cgrCfg.DNSAgentCfg().Listeners {
da.servers = append(da.servers, &dns.Server{
Addr: da.cgrCfg.DNSAgentCfg().Listeners[i].Address,
Net: da.cgrCfg.DNSAgentCfg().Listeners[i].Network,
Handler: dns.HandlerFunc(func(w dns.ResponseWriter, m *dns.Msg) {
go da.handleMessage(w, m)
}),
})
if strings.HasSuffix(da.cgrCfg.DNSAgentCfg().Listeners[i].Network, utils.TLSNoCaps) {
cert, err := tls.LoadX509KeyPair(da.cgrCfg.TLSCfg().ServerCerificate, da.cgrCfg.TLSCfg().ServerKey)
if err != nil {
return err
}
da.servers[i].Net = "tcp-tls"
da.servers[i].TLSConfig = &tls.Config{
Certificates: []tls.Certificate{cert},
}
}
}
return
}
// ListenAndServe will run the DNS handler doing also the connection to listen address
func (da *DNSAgent) ListenAndServe() (err error) {
utils.Logger.Info(fmt.Sprintf("<%s> start listening on <%s:%s>",
utils.DNSAgent, da.cgrCfg.DNSAgentCfg().ListenNet, da.cgrCfg.DNSAgentCfg().Listen))
return da.server.ListenAndServe()
func (da *DNSAgent) ListenAndServe(stopChan chan struct{}) error {
errChan := make(chan error)
for _, server := range da.servers {
utils.Logger.Info(fmt.Sprintf("<%s> start listening on <%s:%s>",
utils.DNSAgent, server.Net, server.Addr))
go func(srv *dns.Server) {
err := srv.ListenAndServe()
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> error <%v>, on ListenAndServe <%s:%s>",
utils.DNSAgent, err, srv.Net, srv.Addr))
if strings.Contains(err.Error(), "address already in use") {
return
}
errChan <- err
}
}(server)
}
select {
case <-stopChan:
return da.Shutdown()
case err := <-errChan:
if shtdErr := da.Shutdown(); shtdErr != nil {
return shtdErr
}
return err
}
}
// Reload will reinitialize the server
@@ -107,7 +136,19 @@ func (da *DNSAgent) handleMessage(w dns.ResponseWriter, req *dns.Msg) {
// Shutdown stops the DNS server
func (da *DNSAgent) Shutdown() error {
return da.server.Shutdown()
var err error
for _, server := range da.servers {
shtdErr := server.Shutdown()
if shtdErr == nil {
continue
}
utils.Logger.Warning(fmt.Sprintf("<%s> error <%v>, on Shutdown <%s:%s>",
utils.DNSAgent, shtdErr, server.Net, server.Addr))
if shtdErr.Error() != "dns: server not started" {
err = shtdErr
}
}
return err
}
// handleMessage is the entry point of all DNS requests

View File

@@ -985,8 +985,12 @@ const CGRATES_CFG_JSON = `
"dns_agent": {
"enabled": false, // enables the DNS agent: <true|false>
"listen": "127.0.0.1:2053", // address where to listen for DNS requests <x.y.z.y:1234>
"listen_net": "udp", // network to listen on <udp|tcp|tcp-tls>
"listeners":[
{
"address": "127.0.0.1:53", // address where to listen for DNS requests <x.y.z.y:1234>
"network": "udp" // network to listen on <udp|tcp|tcp-tls>
}
],
"sessions_conns": ["*internal"],
"timezone": "", // timezone of the events if not specified <UTC|Local|$IANA_TZ_DB>
"request_processors": [ // request processors to be applied to DNS messages

View File

@@ -536,9 +536,13 @@ func testCGRConfigReloadDNSAgent(t *testing.T) {
t.Errorf("Expected OK received: %s", reply)
}
expAttr := &DNSAgentCfg{
Enabled: true,
Listen: ":2053",
ListenNet: "udp",
Enabled: true,
Listeners: []Listener{
{
Address: ":2053",
Network: "udp",
},
},
SessionSConns: []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS)},
// Timezone string
// RequestProcessors []*RequestProcessor

View File

@@ -757,9 +757,14 @@ func TestHttpAgentJsonCfg(t *testing.T) {
func TestDNSAgentJsonCfg(t *testing.T) {
eCfg := &DNSAgentJsonCfg{
Enabled: utils.BoolPointer(false),
Listen_net: utils.StringPointer("udp"),
Listen: utils.StringPointer("127.0.0.1:2053"),
Enabled: utils.BoolPointer(false),
Listeners: &[]*ListenerJsnCfg{
{
Network: utils.StringPointer("udp"),
Address: utils.StringPointer("127.0.0.1:53"),
},
},
Sessions_conns: &[]string{utils.ConcatenatedKey(utils.MetaInternal)},
Timezone: utils.StringPointer(""),
Request_processors: &[]*ReqProcessorJsnCfg{},

File diff suppressed because one or more lines are too long

View File

@@ -26,9 +26,13 @@ import (
func TestDNSAgentCfgloadFromJsonCfg(t *testing.T) {
jsnCfg := &DNSAgentJsonCfg{
Enabled: utils.BoolPointer(true),
Listen: utils.StringPointer("127.0.0.1:2053"),
Listen_net: utils.StringPointer("udp"),
Enabled: utils.BoolPointer(true),
Listeners: &[]*ListenerJsnCfg{
{
Address: utils.StringPointer("127.0.0.1:2053"),
Network: utils.StringPointer("udp"),
},
},
Sessions_conns: &[]string{utils.MetaInternal, "*conn1"},
Timezone: utils.StringPointer("UTC"),
Request_processors: &[]*ReqProcessorJsnCfg{
@@ -46,9 +50,13 @@ func TestDNSAgentCfgloadFromJsonCfg(t *testing.T) {
},
}
expected := &DNSAgentCfg{
Enabled: true,
Listen: "127.0.0.1:2053",
ListenNet: "udp",
Enabled: true,
Listeners: []Listener{
{
Address: "127.0.0.1:2053",
Network: "udp",
},
},
SessionSConns: []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS), "*conn1"},
Timezone: "UTC",
RequestProcessors: []*RequestProcessor{
@@ -192,17 +200,23 @@ func TestDNSAgentCfgAsMapInterface(t *testing.T) {
cfgJSONStr := `{
"dns_agent": {
"enabled": false,
"listen": "127.0.0.1:2053",
"listen_net": "udp",
"listeners":[
{
"address": "127.0.0.1:2053",
"network": "udp"
}
],
"sessions_conns": ["*internal"],
"timezone": "",
"request_processors": [],
},
}`
eMap := map[string]any{
utils.EnabledCfg: false,
utils.ListenCfg: "127.0.0.1:2053",
utils.ListenNetCfg: "udp",
utils.EnabledCfg: false,
utils.ListenersCfg: []map[string]any{{
utils.AddressCfg: "127.0.0.1:2053",
utils.NetworkCfg: "udp",
}},
utils.SessionSConnsCfg: []string{"*internal"},
utils.TimezoneCfg: "",
utils.RequestProcessorsCfg: []map[string]any{},
@@ -218,8 +232,12 @@ func TestDNSAgentCfgAsMapInterface1(t *testing.T) {
cfgJSONStr := `{
"dns_agent": {
"enabled": false,
"listen": "127.0.0.1:2053",
"listen_net": "udp",
"listeners":[
{
"address": "127.0.0.1:2053",
"network": "udp"
}
],
"sessions_conns": ["*internal:*sessions", "*conn1"],
"timezone": "UTC",
"request_processors": [
@@ -247,9 +265,13 @@ func TestDNSAgentCfgAsMapInterface1(t *testing.T) {
},
}`
eMap := map[string]any{
utils.EnabledCfg: false,
utils.ListenCfg: "127.0.0.1:2053",
utils.ListenNetCfg: "udp",
utils.EnabledCfg: false,
utils.ListenersCfg: []map[string]any{
{
utils.AddressCfg: "127.0.0.1:2053",
utils.NetworkCfg: "udp",
},
},
utils.SessionSConnsCfg: []string{utils.MetaInternal, "*conn1"},
utils.TimezoneCfg: "UTC",
utils.RequestProcessorsCfg: []map[string]any{
@@ -303,9 +325,13 @@ func TestRequestProcessorClone(t *testing.T) {
func TestDNSAgentCfgClone(t *testing.T) {
ban := &DNSAgentCfg{
Enabled: true,
Listen: "127.0.0.1:2053",
ListenNet: "udp",
Enabled: true,
Listeners: []Listener{
{
Address: "127.0.0.1:2053",
Network: "udp",
},
},
SessionSConns: []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS), "*conn1"},
Timezone: "UTC",
RequestProcessors: []*RequestProcessor{
@@ -337,18 +363,26 @@ func TestDiffDNSAgentJsonCfg(t *testing.T) {
var d *DNSAgentJsonCfg
v1 := &DNSAgentCfg{
Enabled: false,
Listen: "localhost:8080",
ListenNet: "tcp",
Enabled: false,
Listeners: []Listener{
{
Address: "localhost:8080",
Network: "tcp",
},
},
SessionSConns: []string{"*localhost"},
Timezone: "UTC",
RequestProcessors: []*RequestProcessor{},
}
v2 := &DNSAgentCfg{
Enabled: true,
Listen: "localhost:8037",
ListenNet: "udp",
Enabled: true,
Listeners: []Listener{
{
Address: "localhost:8037",
Network: "udp",
},
},
SessionSConns: []string{"*birpc"},
Timezone: "EEST",
RequestProcessors: []*RequestProcessor{
@@ -359,9 +393,13 @@ func TestDiffDNSAgentJsonCfg(t *testing.T) {
}
expected := &DNSAgentJsonCfg{
Enabled: utils.BoolPointer(true),
Listen: utils.StringPointer("localhost:8037"),
Listen_net: utils.StringPointer("udp"),
Enabled: utils.BoolPointer(true),
Listeners: &[]*ListenerJsnCfg{
{
Address: utils.StringPointer("localhost:8037"),
Network: utils.StringPointer("udp"),
},
},
Sessions_conns: &[]string{"*birpc"},
Timezone: utils.StringPointer("EEST"),
Request_processors: &[]*ReqProcessorJsnCfg{
@@ -378,29 +416,38 @@ func TestDiffDNSAgentJsonCfg(t *testing.T) {
v2_2 := v1
expected2 := &DNSAgentJsonCfg{
Listeners: &[]*ListenerJsnCfg{},
Request_processors: &[]*ReqProcessorJsnCfg{},
}
rcv = diffDNSAgentJsonCfg(d, v1, v2_2, ";")
if !reflect.DeepEqual(rcv, expected2) {
t.Errorf("Expected %v \n but received \n %v", utils.ToJSON(expected2), utils.ToJSON(rcv))
t.Errorf("Expected %v \n but received \n %v", expected2, rcv)
}
}
func TestDnsAgentCloneSection(t *testing.T) {
dnsCfg := &DNSAgentCfg{
Enabled: false,
Listen: "localhost:8080",
ListenNet: "tcp",
Enabled: false,
Listeners: []Listener{
{
Address: "localhost:8080",
Network: "tcp",
},
},
SessionSConns: []string{"*localhost"},
Timezone: "UTC",
RequestProcessors: []*RequestProcessor{},
}
exp := &DNSAgentCfg{
Enabled: false,
Listen: "localhost:8080",
ListenNet: "tcp",
Enabled: false,
Listeners: []Listener{
{
Address: "localhost:8080",
Network: "tcp",
},
},
SessionSConns: []string{"*localhost"},
Timezone: "UTC",
RequestProcessors: []*RequestProcessor{},

View File

@@ -23,11 +23,15 @@ import (
"github.com/cgrates/cgrates/utils"
)
type Listener struct {
Address string
Network string // udp or tcp
}
// DNSAgentCfg the config section that describes the DNS Agent
type DNSAgentCfg struct {
Enabled bool
Listen string
ListenNet string // udp or tcp
Listeners []Listener
SessionSConns []string
Timezone string
RequestProcessors []*RequestProcessor
@@ -49,11 +53,18 @@ func (da *DNSAgentCfg) loadFromJSONCfg(jsnCfg *DNSAgentJsonCfg, sep string) (err
if jsnCfg.Enabled != nil {
da.Enabled = *jsnCfg.Enabled
}
if jsnCfg.Listen_net != nil {
da.ListenNet = *jsnCfg.Listen_net
}
if jsnCfg.Listen != nil {
da.Listen = *jsnCfg.Listen
if jsnCfg.Listeners != nil {
da.Listeners = make([]Listener, 0, len(*jsnCfg.Listeners))
for _, listnr := range *jsnCfg.Listeners {
var ls Listener
if listnr.Address != nil {
ls.Address = *listnr.Address
}
if listnr.Network != nil {
ls.Network = *listnr.Network
}
da.Listeners = append(da.Listeners, ls)
}
}
if jsnCfg.Timezone != nil {
da.Timezone = *jsnCfg.Timezone
@@ -65,15 +76,26 @@ func (da *DNSAgentCfg) loadFromJSONCfg(jsnCfg *DNSAgentJsonCfg, sep string) (err
return
}
func (lstn *Listener) AsMapInterface(separator string) map[string]any {
return map[string]any{
utils.AddressCfg: lstn.Address,
utils.NetworkCfg: lstn.Network,
}
}
// AsMapInterface returns the config as a map[string]any
func (da DNSAgentCfg) AsMapInterface(separator string) any {
mp := map[string]any{
utils.EnabledCfg: da.Enabled,
utils.ListenCfg: da.Listen,
utils.ListenNetCfg: da.ListenNet,
utils.TimezoneCfg: da.Timezone,
utils.EnabledCfg: da.Enabled,
utils.TimezoneCfg: da.Timezone,
}
listeners := make([]map[string]any, len(da.Listeners))
for i, item := range da.Listeners {
listeners[i] = item.AsMapInterface(separator)
}
mp[utils.ListenersCfg] = listeners
requestProcessors := make([]map[string]any, len(da.RequestProcessors))
for i, item := range da.RequestProcessors {
requestProcessors[i] = item.AsMapInterface(separator)
@@ -93,10 +115,14 @@ func (da DNSAgentCfg) CloneSection() Section { return da.Clone() }
func (da DNSAgentCfg) Clone() (cln *DNSAgentCfg) {
cln = &DNSAgentCfg{
Enabled: da.Enabled,
Listen: da.Listen,
ListenNet: da.ListenNet,
Listeners: da.Listeners,
Timezone: da.Timezone,
}
if da.Listeners != nil {
cln.Listeners = make([]Listener, len(da.Listeners))
copy(cln.Listeners, da.Listeners)
}
if da.SessionSConns != nil {
cln.SessionSConns = utils.CloneStringSlice(da.SessionSConns)
}
@@ -109,11 +135,15 @@ func (da DNSAgentCfg) Clone() (cln *DNSAgentCfg) {
return
}
type ListenerJsnCfg struct {
Address *string
Network *string
}
// DNSAgentJsonCfg
type DNSAgentJsonCfg struct {
Enabled *bool
Listen *string
Listen_net *string
Listeners *[]*ListenerJsnCfg
Sessions_conns *[]string
Timezone *string
Request_processors *[]*ReqProcessorJsnCfg
@@ -126,12 +156,44 @@ func diffDNSAgentJsonCfg(d *DNSAgentJsonCfg, v1, v2 *DNSAgentCfg, separator stri
if v1.Enabled != v2.Enabled {
d.Enabled = utils.BoolPointer(v2.Enabled)
}
if v1.Listen != v2.Listen {
d.Listen = utils.StringPointer(v2.Listen)
minLen := len(v1.Listeners)
if len(v2.Listeners) < minLen {
minLen = len(v2.Listeners)
}
if v1.ListenNet != v2.ListenNet {
d.Listen_net = utils.StringPointer(v2.ListenNet)
diffListeners := &[]*ListenerJsnCfg{}
for i := 0; i < minLen; i++ {
if v1.Listeners[i].Address != v2.Listeners[i].Address ||
v1.Listeners[i].Network != v2.Listeners[i].Network {
*diffListeners = append(*diffListeners, &ListenerJsnCfg{
Address: utils.StringPointer(v2.Listeners[i].Address),
Network: utils.StringPointer(v2.Listeners[i].Network),
})
}
}
if len(v1.Listeners) > minLen {
for i := minLen; i < len(v1.Listeners); i++ {
*diffListeners = append(*diffListeners, &ListenerJsnCfg{
Address: utils.StringPointer(v1.Listeners[i].Address),
Network: utils.StringPointer(v1.Listeners[i].Network),
})
}
}
if len(v2.Listeners) > minLen {
for i := minLen; i < len(v2.Listeners); i++ {
*diffListeners = append(*diffListeners, &ListenerJsnCfg{
Address: utils.StringPointer(v2.Listeners[i].Address),
Network: utils.StringPointer(v2.Listeners[i].Network),
})
}
}
d.Listeners = diffListeners
if !utils.SliceStringEqual(v1.SessionSConns, v2.SessionSConns) {
d.Sessions_conns = utils.SliceStringPointer(getBiRPCInternalJSONConns(v2.SessionSConns))
}

View File

@@ -48,14 +48,14 @@ type DNSAgent struct {
cfg *config.CGRConfig
filterSChan chan *engine.FilterS
stopChan chan struct{}
dns *agents.DNSAgent
connMgr *engine.ConnManager
srvDep map[string]*sync.WaitGroup
oldListen string
}
// Start should handle the sercive start
// Start should handle the service start
func (dns *DNSAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (err error) {
if dns.IsRunning() {
return utils.ErrServiceAlreadyRunning
@@ -67,37 +67,45 @@ func (dns *DNSAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (err
dns.Lock()
defer dns.Unlock()
dns.oldListen = dns.cfg.DNSAgentCfg().Listen
dns.dns, err = agents.NewDNSAgent(dns.cfg, filterS, dns.connMgr)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error()))
dns.dns = nil
return
}
go dns.listenAndServe(shtDwn)
dns.stopChan = make(chan struct{})
go dns.listenAndServe(dns.stopChan, shtDwn)
return
}
// Reload handles the change of config
func (dns *DNSAgent) Reload(ctx *context.Context, shtDwn context.CancelFunc) (err error) {
if dns.oldListen == dns.cfg.DNSAgentCfg().Listen {
return
}
filterS := <-dns.filterSChan
dns.filterSChan <- filterS
dns.Lock()
defer dns.Unlock()
if err = dns.dns.Shutdown(); err != nil {
dns.Shutdown()
dns.dns, err = agents.NewDNSAgent(dns.cfg, filterS, dns.connMgr)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error()))
dns.dns = nil
return
}
dns.oldListen = dns.cfg.DNSAgentCfg().Listen
if err = dns.dns.Reload(); err != nil {
return
}
go dns.listenAndServe(shtDwn)
dns.dns.Lock()
defer dns.dns.Unlock()
dns.stopChan = make(chan struct{})
go dns.listenAndServe(dns.stopChan, shtDwn)
return
}
func (dns *DNSAgent) listenAndServe(shtDwn context.CancelFunc) (err error) {
if err = dns.dns.ListenAndServe(); err != nil {
func (dns *DNSAgent) listenAndServe(stopChan chan struct{}, shtDwn context.CancelFunc) (err error) {
dns.dns.RLock()
defer dns.dns.RUnlock()
if err = dns.dns.ListenAndServe(stopChan); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error()))
shtDwn() // stop the engine here
}

View File

@@ -1974,6 +1974,8 @@ const (
// DiameterAgentCfg
ListenNetCfg = "listen_net"
NetworkCfg = "network"
ListenersCfg = "listeners"
ConcurrentRequestsCfg = "concurrent_requests"
ListenCfg = "listen"
DictionariesPathCfg = "dictionaries_path"