diff --git a/agents/dnsagent.go b/agents/dnsagent.go
index 3772af60d..63c54055c 100644
--- a/agents/dnsagent.go
+++ b/agents/dnsagent.go
@@ -22,6 +22,7 @@ import (
"crypto/tls"
"fmt"
"strings"
+ "sync"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -39,6 +40,7 @@ func NewDNSAgent(cgrCfg *config.CGRConfig, fltrS *engine.FilterS,
// DNSAgent translates DNS requests towards CGRateS infrastructure
type DNSAgent struct {
+ sync.RWMutex
cgrCfg *config.CGRConfig // loaded CGRateS configuration
fltrS *engine.FilterS // connection towards FilterS
servers []*dns.Server
@@ -81,6 +83,9 @@ func (da *DNSAgent) ListenAndServe(stopChan chan struct{}) error {
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> error <%v>, on ListenAndServe <%s:%s>",
utils.DNSAgent, err, srv.Net, srv.Addr))
+ if strings.Contains(err.Error(), "address already in use") {
+ return
+ }
errChan <- err
}
}(server)
diff --git a/config/dnsagntcfg.go b/config/dnsagntcfg.go
index 22aa42a90..7f9d2e373 100644
--- a/config/dnsagntcfg.go
+++ b/config/dnsagntcfg.go
@@ -57,12 +57,6 @@ func (da *DNSAgentCfg) loadFromJSONCfg(jsnCfg *DNSAgentJsonCfg, sep string) (err
da.Listeners = append(da.Listeners, ls)
}
}
- // if jsnCfg.Listen_net != nil {
- // da.ListenNet = *jsnCfg.Listen_net
- // }
- // if jsnCfg.Listen != nil {
- // da.Listen = *jsnCfg.Listen
- // }
if jsnCfg.Timezone != nil {
da.Timezone = *jsnCfg.Timezone
}
@@ -110,9 +104,7 @@ func (lstn *Listener) AsMapInterface(separator string) map[string]any {
// AsMapInterface returns the config as a map[string]any
func (da *DNSAgentCfg) AsMapInterface(separator string) (initialMP map[string]any) {
initialMP = map[string]any{
- utils.EnabledCfg: da.Enabled,
- // utils.ListenCfg: da.Listen,
- // utils.ListenNetCfg: da.ListenNet,
+ utils.EnabledCfg: da.Enabled,
utils.TimezoneCfg: da.Timezone,
}
@@ -146,9 +138,7 @@ func (da DNSAgentCfg) Clone() (cln *DNSAgentCfg) {
cln = &DNSAgentCfg{
Enabled: da.Enabled,
Listeners: da.Listeners,
- // Listen: da.Listen,
- // ListenNet: da.ListenNet,
- Timezone: da.Timezone,
+ Timezone: da.Timezone,
}
if da.Listeners != nil {
diff --git a/data/conf/samples/dnsagent_internal/attributes.json b/data/conf/samples/dnsagent_internal/attributes.json
index e79f73516..0eb837549 100644
--- a/data/conf/samples/dnsagent_internal/attributes.json
+++ b/data/conf/samples/dnsagent_internal/attributes.json
@@ -1,5 +1,4 @@
{
-
"dns_agent": {
"request_processors": [
{
@@ -27,5 +26,4 @@
},
],
},
-
}
\ No newline at end of file
diff --git a/data/conf/samples/dnsagent_internal/cgrates.json b/data/conf/samples/dnsagent_internal/cgrates.json
index 2fd04cc6e..4d0a82c69 100644
--- a/data/conf/samples/dnsagent_internal/cgrates.json
+++ b/data/conf/samples/dnsagent_internal/cgrates.json
@@ -66,16 +66,13 @@
"dns_agent": {
"enabled": true,
"listeners":[
- {
- "address":":2053",
- "network":"tcp"
- },
{
"address":":2053",
"network":"udp"
- }
+ },
],
- "sessions_conns": ["*localhost"]
+ "sessions_conns": ["*localhost"],
+
},
diff --git a/data/conf/samples/dnsagent_internal/dryrun.json b/data/conf/samples/dnsagent_internal/dryrun.json
index 26535baec..375ebd4d6 100644
--- a/data/conf/samples/dnsagent_internal/dryrun.json
+++ b/data/conf/samples/dnsagent_internal/dryrun.json
@@ -1,5 +1,4 @@
{
-
"dns_agent": {
"request_processors": [
{
@@ -20,5 +19,4 @@
},
],
},
-
}
\ No newline at end of file
diff --git a/engine/storage_utils.go b/engine/storage_utils.go
index c834285a6..f97b388f9 100644
--- a/engine/storage_utils.go
+++ b/engine/storage_utils.go
@@ -75,7 +75,7 @@ func NewStorDBConn(dbType, host, port, name, user, pass, marshaler string,
case utils.MetaInternal:
db = NewInternalDB(stringIndexedFields, prefixIndexedFields, false, itmsCfg)
default:
- err = fmt.Errorf("unknown db 1'%s' valid options are [%s, %s, %s, %s]",
+ err = fmt.Errorf("unknown db '%s' valid options are [%s, %s, %s, %s]",
dbType, utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres, utils.MetaInternal)
}
return
diff --git a/services/dnsagent.go b/services/dnsagent.go
index c01b14ead..733b26aee 100644
--- a/services/dnsagent.go
+++ b/services/dnsagent.go
@@ -37,7 +37,6 @@ func NewDNSAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
cfg: cfg,
filterSChan: filterSChan,
shdChan: shdChan,
- stopChan: make(chan struct{}),
connMgr: connMgr,
srvDep: srvDep,
}
@@ -62,6 +61,7 @@ func (dns *DNSAgent) Start() (err error) {
if dns.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
+
filterS := <-dns.filterSChan
dns.filterSChan <- filterS
@@ -73,26 +73,38 @@ func (dns *DNSAgent) Start() (err error) {
dns.dns = nil
return
}
+ dns.stopChan = make(chan struct{})
go dns.listenAndServe(dns.stopChan)
return
}
// Reload handles the change of config
func (dns *DNSAgent) Reload() (err error) {
- if dns.IsRunning() {
- close(dns.stopChan)
- }
+ filterS := <-dns.filterSChan
+ dns.filterSChan <- filterS
+
dns.Lock()
defer dns.Unlock()
- if err = dns.dns.Reload(); err != nil {
+
+ dns.Shutdown()
+
+ dns.dns, err = agents.NewDNSAgent(dns.cfg, filterS, dns.connMgr)
+ if err != nil {
+ utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error()))
+ dns.dns = nil
return
}
+
+ dns.dns.Lock()
+ defer dns.dns.Unlock()
dns.stopChan = make(chan struct{})
go dns.listenAndServe(dns.stopChan)
return
}
func (dns *DNSAgent) listenAndServe(stopChan chan struct{}) (err error) {
+ dns.dns.RLock()
+ defer dns.dns.RUnlock()
if err = dns.dns.ListenAndServe(stopChan); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error()))
dns.shdChan.CloseOnce() // stop the engine here
@@ -102,9 +114,12 @@ func (dns *DNSAgent) listenAndServe(stopChan chan struct{}) (err error) {
// Shutdown stops the service
func (dns *DNSAgent) Shutdown() (err error) {
- dns.Lock()
- defer dns.Unlock()
+ if dns.dns == nil {
+ return
+ }
close(dns.stopChan)
+ dns.dns.Lock()
+ defer dns.dns.Unlock()
dns.dns = nil
return
}
diff --git a/services/dnsagent_it_test.go b/services/dnsagent_it_test.go
index c7ac11eb2..6b3d93786 100644
--- a/services/dnsagent_it_test.go
+++ b/services/dnsagent_it_test.go
@@ -21,6 +21,7 @@ along with this program. If not, see
package services
import (
+ "path"
"runtime"
"sync"
"testing"
@@ -69,11 +70,14 @@ func TestDNSAgentStartReloadShut(t *testing.T) {
NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep), db)
runtime.Gosched()
time.Sleep(10 * time.Millisecond) //need to switch to gorutine
-
+ if err := srv.Shutdown(); err != nil {
+ t.Error(err)
+ }
+ time.Sleep(10 * time.Millisecond)
if err := srv.Start(); err != nil {
t.Error(err)
}
- time.Sleep(1 * time.Millisecond)
+ time.Sleep(10 * time.Millisecond)
if err := srv.Reload(); err != nil {
t.Error(err)
}
@@ -87,93 +91,87 @@ func TestDNSAgentStartReloadShut(t *testing.T) {
}
}
-// func TestDNSAgentReloadFirst(t *testing.T) {
-// cfg := config.NewDefaultCGRConfig()
-// cfg.SessionSCfg().Enabled = true
-// cfg.SessionSCfg().ListenBijson = ""
-// utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID)
-// utils.Logger.SetLogLevel(7)
-// filterSChan := make(chan *engine.FilterS, 1)
-// filterSChan <- nil
-// shdChan := utils.NewSyncedChan()
-// defer func() {
-// shdChan.CloseOnce()
-// time.Sleep(10 * time.Millisecond)
-// }()
-// shdWg := new(sync.WaitGroup)
-// chS := engine.NewCacheS(cfg, nil, nil)
+func TestDNSAgentReloadFirst(t *testing.T) {
-// cacheSChan := make(chan rpcclient.ClientConnector, 1)
-// cacheSChan <- chS
+ cfg := config.NewDefaultCGRConfig()
+ cfg.SessionSCfg().Enabled = true
+ cfg.SessionSCfg().ListenBijson = ""
+ utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID)
+ utils.Logger.SetLogLevel(7)
+ filterSChan := make(chan *engine.FilterS, 1)
+ filterSChan <- nil
+ shdChan := utils.NewSyncedChan()
+ defer func() {
+ shdChan.CloseOnce()
+ time.Sleep(10 * time.Millisecond)
+ }()
+ shdWg := new(sync.WaitGroup)
+ chS := engine.NewCacheS(cfg, nil, nil)
-// server := cores.NewServer(nil)
-// srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg, nil)
-// srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
-// db := NewDataDBService(cfg, nil, srvDep)
-// anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
-// sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1),
-// shdChan, nil, anz, srvDep)
-// srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, srvDep)
-// engine.NewConnManager(cfg, nil)
-// srvMngr.AddServices(srv, sS,
-// NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep), db)
-// if err := srvMngr.StartServices(); err != nil {
-// t.Fatal(err)
-// }
-// time.Sleep(1000 * time.Millisecond)
-// if srv.IsRunning() {
-// t.Fatalf("Expected service to be down")
-// }
-// var reply string
-// if err := cfg.V1ReloadConfig(&config.ReloadArgs{
-// Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "dnsagent_reload"),
-// Section: config.DNSAgentJson,
-// }, &reply); err != nil {
-// t.Fatal(err)
-// } else if reply != utils.OK {
-// t.Fatalf("Expecting OK ,received %s", reply)
-// }
-// runtime.Gosched()
-// time.Sleep(1000 * time.Millisecond) //need to switch to gorutine
-// if !srv.IsRunning() {
-// t.Fatalf("Expected service to be running")
-// }
-// err := srv.Start()
-// if err == nil || err != utils.ErrServiceAlreadyRunning {
-// t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrServiceAlreadyRunning, err)
-// }
-// fmt.Println("1")
-// time.Sleep(1000 * time.Millisecond)
-// err = srv.Reload()
-// if err != nil {
-// t.Fatalf("\nExpecting ,\n Received <%+v>", err)
-// }
-// time.Sleep(10 * time.Second)
-// fmt.Println("2")
+ cacheSChan := make(chan rpcclient.ClientConnector, 1)
+ cacheSChan <- chS
-// if !srv.IsRunning() {
-// fmt.Println("2.2")
-// t.Fatalf("Expected service to be up")
-// }
-
-// err = srv.Reload()
-// if err != nil {
-// t.Fatalf("\nExpecting ,\n Received <%+v>", err)
-// }
-// fmt.Println("3")
-// cfg.DNSAgentCfg().Enabled = false
-// fmt.Println("4")
-// cfg.GetReloadChan(config.DNSAgentJson) <- struct{}{}
-// fmt.Println("5")
-// time.Sleep(1000 * time.Millisecond)
-// fmt.Println("6")
-// if srv.IsRunning() {
-// fmt.Println("7")
-// t.Fatalf("Expected service to be down")
-// }
-// fmt.Println("8")
-
-// }
+ server := cores.NewServer(nil)
+ srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg, nil)
+ srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
+ db := NewDataDBService(cfg, nil, srvDep)
+ anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
+ sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1),
+ shdChan, nil, anz, srvDep)
+ srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, srvDep)
+ engine.NewConnManager(cfg, nil)
+ srvMngr.AddServices(srv, sS,
+ NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep), db)
+ if err := srvMngr.StartServices(); err != nil {
+ t.Fatal(err)
+ }
+ time.Sleep(100 * time.Millisecond)
+ if srv.IsRunning() {
+ t.Fatalf("Expected service to be down")
+ }
+ var reply string
+ if err := cfg.V1ReloadConfig(&config.ReloadArgs{
+ Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "dnsagent_reload"),
+ Section: config.DNSAgentJson,
+ }, &reply); err != nil {
+ t.Fatal(err)
+ } else if reply != utils.OK {
+ t.Fatalf("Expecting OK ,received %s", reply)
+ }
+ runtime.Gosched()
+ time.Sleep(10 * time.Millisecond) //need to switch to gorutine
+ if !srv.IsRunning() {
+ t.Fatalf("Expected service to be running")
+ }
+ err := srv.Start()
+ if err == nil || err != utils.ErrServiceAlreadyRunning {
+ t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrServiceAlreadyRunning, err)
+ }
+ if err := cfg.V1ReloadConfig(&config.ReloadArgs{
+ Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "dnsagent_reload"),
+ Section: config.DNSAgentJson,
+ }, &reply); err != nil {
+ t.Fatal(err)
+ } else if reply != utils.OK {
+ t.Fatalf("Expecting OK ,received %s", reply)
+ }
+ if err := cfg.V1ReloadConfig(&config.ReloadArgs{
+ Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "dnsagent_reload"),
+ Section: config.DNSAgentJson,
+ }, &reply); err != nil {
+ t.Fatal(err)
+ } else if reply != utils.OK {
+ t.Fatalf("Expecting OK ,received %s", reply)
+ }
+ time.Sleep(10 * time.Millisecond)
+ cfg.DNSAgentCfg().Enabled = false
+ time.Sleep(10 * time.Millisecond)
+ cfg.GetReloadChan(config.DNSAgentJson) <- struct{}{}
+ time.Sleep(100 * time.Millisecond)
+ if srv.IsRunning() {
+ t.Fatalf("Expected service to be down")
+ }
+}
func TestDNSAgentReload2(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
@@ -196,9 +194,10 @@ func TestDNSAgentReload2(t *testing.T) {
runtime.Gosched()
dnsSrv := srv.(*DNSAgent)
dnsSrv.dns = agentSrv
+
err = dnsSrv.listenAndServe(make(chan struct{}))
if err == nil || err.Error() != "dns: bad network" {
- t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", "dns: bad network", err)
+ t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "dns: bad network", err)
}
}
@@ -220,11 +219,13 @@ func TestDNSAgentReload4(t *testing.T) {
runtime.Gosched()
dnsSrv := srv.(*DNSAgent)
dnsSrv.dns = nil
+
err := dnsSrv.Start()
if err == nil || err.Error() != "open bad_certificate: no such file or directory" {
- t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", "open bad_certificate: no such file or directory", err)
+ t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "open bad_certificate: no such file or directory", err)
}
dnsSrv.dns = nil
+
}
func TestDNSAgentReload5(t *testing.T) {
@@ -246,10 +247,12 @@ func TestDNSAgentReload5(t *testing.T) {
time.Sleep(10 * time.Millisecond)
runtime.Gosched()
runtime.Gosched()
+
err = srv.Reload()
if err != nil {
- t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
+ t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
}
+
}
func TestDNSAgentReload6(t *testing.T) {
@@ -263,20 +266,25 @@ func TestDNSAgentReload6(t *testing.T) {
filterSChan <- nil
shdChan := utils.NewSyncedChan()
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
- srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, srvDep)
cfg.DNSAgentCfg().Listeners[0].Address = "127.0.0.1:0"
+ srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, srvDep)
+ time.Sleep(10 * time.Millisecond)
+
err := srv.Start()
if err != nil {
- t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
+ t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
}
- cfg.DNSAgentCfg().Listeners[0].Network = "tls"
- cfg.TLSCfg().ServerCerificate = "bad_certificate"
- cfg.TLSCfg().ServerKey = "bad_key"
+
time.Sleep(10 * time.Millisecond)
runtime.Gosched()
runtime.Gosched()
+
+ cfg.DNSAgentCfg().Listeners[0].Network = "tls"
+ cfg.TLSCfg().ServerCerificate = "bad_certificate"
+ cfg.TLSCfg().ServerKey = "bad_key"
err = srv.Reload()
if err == nil || err.Error() != "open bad_certificate: no such file or directory" {
- t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", "open bad_certificate: no such file or directory", err)
+ t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "open bad_certificate: no such file or directory", err)
}
+
}
diff --git a/services/stordb_it_test.go b/services/stordb_it_test.go
index 1d345ffa3..d70b7aebc 100644
--- a/services/stordb_it_test.go
+++ b/services/stordb_it_test.go
@@ -223,13 +223,13 @@ func TestStorDBReloadVersion1(t *testing.T) {
}
stordb.db = nil
err = stordb.Reload()
- if err == nil || err.Error() != "can't conver StorDB of type mongo to MongoStorage" {
+ if err == nil || err.Error() != "can't conver StorDB of type *mongo to MongoStorage" {
t.Fatal(err)
}
cfg.CdrsCfg().Enabled = false
err = stordb.Reload()
- if err == nil || err.Error() != "can't conver StorDB of type mongo to MongoStorage" {
+ if err == nil || err.Error() != "can't conver StorDB of type *mongo to MongoStorage" {
t.Fatal(err)
}
time.Sleep(10 * time.Millisecond)
@@ -305,13 +305,13 @@ func TestStorDBReloadVersion2(t *testing.T) {
}
stordb.db = nil
err = stordb.Reload()
- if err == nil || err.Error() != "can't conver StorDB of type mysql to SQLStorage" {
- t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "can't convert StorDB of type mysql to SQLStorage", err)
+ if err == nil || err.Error() != "can't conver StorDB of type *mysql to SQLStorage" {
+ t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "can't convert StorDB of type *mysql to SQLStorage", err)
}
cfg.CdrsCfg().Enabled = false
err = stordb.Reload()
- if err == nil || err.Error() != "can't conver StorDB of type mysql to SQLStorage" {
- t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "can't convert StorDB of type mysql to SQLStorage", err)
+ if err == nil || err.Error() != "can't conver StorDB of type *mysql to SQLStorage" {
+ t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "can't convert StorDB of type *mysql to SQLStorage", err)
time.Sleep(10 * time.Millisecond)
shdChan.CloseOnce()
@@ -376,7 +376,7 @@ func TestStorDBReloadVersion3(t *testing.T) {
stordb.oldDBCfg = cfg.StorDbCfg().Clone()
stordb.db = nil
err = stordb.Reload()
- if err == nil || err.Error() != "can't conver StorDB of type internal to InternalDB" {
+ if err == nil || err.Error() != "can't conver StorDB of type *internal to InternalDB" {
t.Fatal(err)
}
/* the internal now uses its own cache
@@ -416,8 +416,8 @@ func TestStorDBReloadNewStorDBConnError(t *testing.T) {
}
cfg.StorDbCfg().Type = "badType"
err := stordb.Reload()
- if err == nil || err.Error() != "unknown db 'badType' valid options are [mysql, mongo, postgres, internal]" {
- t.Errorf("\nExpecting <%+v>,\n Received <%+v>", "unknown db 'badType' valid options are [mysql, mongo, postgres, internal]", err)
+ if err == nil || err.Error() != "unknown db 'badType' valid options are [*mysql, *mongo, *postgres, *internal]" {
+ t.Errorf("\nExpecting <%+v>,\n Received <%+v>", "unknown db 'badType' valid options are [*mysql, *mongo, *postgres, *internal]", err)
}
shdChan.CloseOnce()
}
@@ -435,7 +435,7 @@ func TestStorDBReloadStartDBError(t *testing.T) {
stordb := NewStorDBService(cfg, srvDep)
cfg.StorDbCfg().Type = "badType"
err := stordb.Start()
- expected := "unknown db 'badType' valid options are [mysql, mongo, postgres, internal]"
+ expected := "unknown db 'badType' valid options are [*mysql, *mongo, *postgres, *internal]"
if err == nil || err.Error() != expected {
t.Errorf("\nExpecting <%+v>,\n Received <%+v>", expected, err)
}