From 5f04bcfe6684002b5bb8dae8895276c9c9cc98e4 Mon Sep 17 00:00:00 2001 From: arberkatellari Date: Wed, 21 Jun 2023 11:57:12 -0400 Subject: [PATCH] Add locks for dns service and agent --- agents/dnsagent.go | 5 + config/dnsagntcfg.go | 14 +- .../samples/dnsagent_internal/attributes.json | 2 - .../samples/dnsagent_internal/cgrates.json | 9 +- .../samples/dnsagent_internal/dryrun.json | 2 - engine/storage_utils.go | 2 +- services/dnsagent.go | 29 ++- services/dnsagent_it_test.go | 198 +++++++++--------- services/stordb_it_test.go | 20 +- 9 files changed, 146 insertions(+), 135 deletions(-) diff --git a/agents/dnsagent.go b/agents/dnsagent.go index 3772af60d..63c54055c 100644 --- a/agents/dnsagent.go +++ b/agents/dnsagent.go @@ -22,6 +22,7 @@ import ( "crypto/tls" "fmt" "strings" + "sync" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -39,6 +40,7 @@ func NewDNSAgent(cgrCfg *config.CGRConfig, fltrS *engine.FilterS, // DNSAgent translates DNS requests towards CGRateS infrastructure type DNSAgent struct { + sync.RWMutex cgrCfg *config.CGRConfig // loaded CGRateS configuration fltrS *engine.FilterS // connection towards FilterS servers []*dns.Server @@ -81,6 +83,9 @@ func (da *DNSAgent) ListenAndServe(stopChan chan struct{}) error { if err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> error <%v>, on ListenAndServe <%s:%s>", utils.DNSAgent, err, srv.Net, srv.Addr)) + if strings.Contains(err.Error(), "address already in use") { + return + } errChan <- err } }(server) diff --git a/config/dnsagntcfg.go b/config/dnsagntcfg.go index 22aa42a90..7f9d2e373 100644 --- a/config/dnsagntcfg.go +++ b/config/dnsagntcfg.go @@ -57,12 +57,6 @@ func (da *DNSAgentCfg) loadFromJSONCfg(jsnCfg *DNSAgentJsonCfg, sep string) (err da.Listeners = append(da.Listeners, ls) } } - // if jsnCfg.Listen_net != nil { - // da.ListenNet = *jsnCfg.Listen_net - // } - // if jsnCfg.Listen != nil { - // da.Listen = *jsnCfg.Listen - // } if jsnCfg.Timezone != nil { da.Timezone = *jsnCfg.Timezone } @@ -110,9 +104,7 @@ func (lstn *Listener) AsMapInterface(separator string) map[string]any { // AsMapInterface returns the config as a map[string]any func (da *DNSAgentCfg) AsMapInterface(separator string) (initialMP map[string]any) { initialMP = map[string]any{ - utils.EnabledCfg: da.Enabled, - // utils.ListenCfg: da.Listen, - // utils.ListenNetCfg: da.ListenNet, + utils.EnabledCfg: da.Enabled, utils.TimezoneCfg: da.Timezone, } @@ -146,9 +138,7 @@ func (da DNSAgentCfg) Clone() (cln *DNSAgentCfg) { cln = &DNSAgentCfg{ Enabled: da.Enabled, Listeners: da.Listeners, - // Listen: da.Listen, - // ListenNet: da.ListenNet, - Timezone: da.Timezone, + Timezone: da.Timezone, } if da.Listeners != nil { diff --git a/data/conf/samples/dnsagent_internal/attributes.json b/data/conf/samples/dnsagent_internal/attributes.json index e79f73516..0eb837549 100644 --- a/data/conf/samples/dnsagent_internal/attributes.json +++ b/data/conf/samples/dnsagent_internal/attributes.json @@ -1,5 +1,4 @@ { - "dns_agent": { "request_processors": [ { @@ -27,5 +26,4 @@ }, ], }, - } \ No newline at end of file diff --git a/data/conf/samples/dnsagent_internal/cgrates.json b/data/conf/samples/dnsagent_internal/cgrates.json index 2fd04cc6e..4d0a82c69 100644 --- a/data/conf/samples/dnsagent_internal/cgrates.json +++ b/data/conf/samples/dnsagent_internal/cgrates.json @@ -66,16 +66,13 @@ "dns_agent": { "enabled": true, "listeners":[ - { - "address":":2053", - "network":"tcp" - }, { "address":":2053", "network":"udp" - } + }, ], - "sessions_conns": ["*localhost"] + "sessions_conns": ["*localhost"], + }, diff --git a/data/conf/samples/dnsagent_internal/dryrun.json b/data/conf/samples/dnsagent_internal/dryrun.json index 26535baec..375ebd4d6 100644 --- a/data/conf/samples/dnsagent_internal/dryrun.json +++ b/data/conf/samples/dnsagent_internal/dryrun.json @@ -1,5 +1,4 @@ { - "dns_agent": { "request_processors": [ { @@ -20,5 +19,4 @@ }, ], }, - } \ No newline at end of file diff --git a/engine/storage_utils.go b/engine/storage_utils.go index c834285a6..f97b388f9 100644 --- a/engine/storage_utils.go +++ b/engine/storage_utils.go @@ -75,7 +75,7 @@ func NewStorDBConn(dbType, host, port, name, user, pass, marshaler string, case utils.MetaInternal: db = NewInternalDB(stringIndexedFields, prefixIndexedFields, false, itmsCfg) default: - err = fmt.Errorf("unknown db 1'%s' valid options are [%s, %s, %s, %s]", + err = fmt.Errorf("unknown db '%s' valid options are [%s, %s, %s, %s]", dbType, utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres, utils.MetaInternal) } return diff --git a/services/dnsagent.go b/services/dnsagent.go index c01b14ead..733b26aee 100644 --- a/services/dnsagent.go +++ b/services/dnsagent.go @@ -37,7 +37,6 @@ func NewDNSAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, cfg: cfg, filterSChan: filterSChan, shdChan: shdChan, - stopChan: make(chan struct{}), connMgr: connMgr, srvDep: srvDep, } @@ -62,6 +61,7 @@ func (dns *DNSAgent) Start() (err error) { if dns.IsRunning() { return utils.ErrServiceAlreadyRunning } + filterS := <-dns.filterSChan dns.filterSChan <- filterS @@ -73,26 +73,38 @@ func (dns *DNSAgent) Start() (err error) { dns.dns = nil return } + dns.stopChan = make(chan struct{}) go dns.listenAndServe(dns.stopChan) return } // Reload handles the change of config func (dns *DNSAgent) Reload() (err error) { - if dns.IsRunning() { - close(dns.stopChan) - } + filterS := <-dns.filterSChan + dns.filterSChan <- filterS + dns.Lock() defer dns.Unlock() - if err = dns.dns.Reload(); err != nil { + + dns.Shutdown() + + dns.dns, err = agents.NewDNSAgent(dns.cfg, filterS, dns.connMgr) + if err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) + dns.dns = nil return } + + dns.dns.Lock() + defer dns.dns.Unlock() dns.stopChan = make(chan struct{}) go dns.listenAndServe(dns.stopChan) return } func (dns *DNSAgent) listenAndServe(stopChan chan struct{}) (err error) { + dns.dns.RLock() + defer dns.dns.RUnlock() if err = dns.dns.ListenAndServe(stopChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) dns.shdChan.CloseOnce() // stop the engine here @@ -102,9 +114,12 @@ func (dns *DNSAgent) listenAndServe(stopChan chan struct{}) (err error) { // Shutdown stops the service func (dns *DNSAgent) Shutdown() (err error) { - dns.Lock() - defer dns.Unlock() + if dns.dns == nil { + return + } close(dns.stopChan) + dns.dns.Lock() + defer dns.dns.Unlock() dns.dns = nil return } diff --git a/services/dnsagent_it_test.go b/services/dnsagent_it_test.go index c7ac11eb2..6b3d93786 100644 --- a/services/dnsagent_it_test.go +++ b/services/dnsagent_it_test.go @@ -21,6 +21,7 @@ along with this program. If not, see package services import ( + "path" "runtime" "sync" "testing" @@ -69,11 +70,14 @@ func TestDNSAgentStartReloadShut(t *testing.T) { NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep), db) runtime.Gosched() time.Sleep(10 * time.Millisecond) //need to switch to gorutine - + if err := srv.Shutdown(); err != nil { + t.Error(err) + } + time.Sleep(10 * time.Millisecond) if err := srv.Start(); err != nil { t.Error(err) } - time.Sleep(1 * time.Millisecond) + time.Sleep(10 * time.Millisecond) if err := srv.Reload(); err != nil { t.Error(err) } @@ -87,93 +91,87 @@ func TestDNSAgentStartReloadShut(t *testing.T) { } } -// func TestDNSAgentReloadFirst(t *testing.T) { -// cfg := config.NewDefaultCGRConfig() -// cfg.SessionSCfg().Enabled = true -// cfg.SessionSCfg().ListenBijson = "" -// utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) -// utils.Logger.SetLogLevel(7) -// filterSChan := make(chan *engine.FilterS, 1) -// filterSChan <- nil -// shdChan := utils.NewSyncedChan() -// defer func() { -// shdChan.CloseOnce() -// time.Sleep(10 * time.Millisecond) -// }() -// shdWg := new(sync.WaitGroup) -// chS := engine.NewCacheS(cfg, nil, nil) +func TestDNSAgentReloadFirst(t *testing.T) { -// cacheSChan := make(chan rpcclient.ClientConnector, 1) -// cacheSChan <- chS + cfg := config.NewDefaultCGRConfig() + cfg.SessionSCfg().Enabled = true + cfg.SessionSCfg().ListenBijson = "" + utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + shdChan := utils.NewSyncedChan() + defer func() { + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) + }() + shdWg := new(sync.WaitGroup) + chS := engine.NewCacheS(cfg, nil, nil) -// server := cores.NewServer(nil) -// srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg, nil) -// srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} -// db := NewDataDBService(cfg, nil, srvDep) -// anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) -// sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), -// shdChan, nil, anz, srvDep) -// srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, srvDep) -// engine.NewConnManager(cfg, nil) -// srvMngr.AddServices(srv, sS, -// NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep), db) -// if err := srvMngr.StartServices(); err != nil { -// t.Fatal(err) -// } -// time.Sleep(1000 * time.Millisecond) -// if srv.IsRunning() { -// t.Fatalf("Expected service to be down") -// } -// var reply string -// if err := cfg.V1ReloadConfig(&config.ReloadArgs{ -// Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "dnsagent_reload"), -// Section: config.DNSAgentJson, -// }, &reply); err != nil { -// t.Fatal(err) -// } else if reply != utils.OK { -// t.Fatalf("Expecting OK ,received %s", reply) -// } -// runtime.Gosched() -// time.Sleep(1000 * time.Millisecond) //need to switch to gorutine -// if !srv.IsRunning() { -// t.Fatalf("Expected service to be running") -// } -// err := srv.Start() -// if err == nil || err != utils.ErrServiceAlreadyRunning { -// t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrServiceAlreadyRunning, err) -// } -// fmt.Println("1") -// time.Sleep(1000 * time.Millisecond) -// err = srv.Reload() -// if err != nil { -// t.Fatalf("\nExpecting ,\n Received <%+v>", err) -// } -// time.Sleep(10 * time.Second) -// fmt.Println("2") + cacheSChan := make(chan rpcclient.ClientConnector, 1) + cacheSChan <- chS -// if !srv.IsRunning() { -// fmt.Println("2.2") -// t.Fatalf("Expected service to be up") -// } - -// err = srv.Reload() -// if err != nil { -// t.Fatalf("\nExpecting ,\n Received <%+v>", err) -// } -// fmt.Println("3") -// cfg.DNSAgentCfg().Enabled = false -// fmt.Println("4") -// cfg.GetReloadChan(config.DNSAgentJson) <- struct{}{} -// fmt.Println("5") -// time.Sleep(1000 * time.Millisecond) -// fmt.Println("6") -// if srv.IsRunning() { -// fmt.Println("7") -// t.Fatalf("Expected service to be down") -// } -// fmt.Println("8") - -// } + server := cores.NewServer(nil) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg, nil) + srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} + db := NewDataDBService(cfg, nil, srvDep) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) + sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), + shdChan, nil, anz, srvDep) + srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, srvDep) + engine.NewConnManager(cfg, nil) + srvMngr.AddServices(srv, sS, + NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep), db) + if err := srvMngr.StartServices(); err != nil { + t.Fatal(err) + } + time.Sleep(100 * time.Millisecond) + if srv.IsRunning() { + t.Fatalf("Expected service to be down") + } + var reply string + if err := cfg.V1ReloadConfig(&config.ReloadArgs{ + Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "dnsagent_reload"), + Section: config.DNSAgentJson, + }, &reply); err != nil { + t.Fatal(err) + } else if reply != utils.OK { + t.Fatalf("Expecting OK ,received %s", reply) + } + runtime.Gosched() + time.Sleep(10 * time.Millisecond) //need to switch to gorutine + if !srv.IsRunning() { + t.Fatalf("Expected service to be running") + } + err := srv.Start() + if err == nil || err != utils.ErrServiceAlreadyRunning { + t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrServiceAlreadyRunning, err) + } + if err := cfg.V1ReloadConfig(&config.ReloadArgs{ + Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "dnsagent_reload"), + Section: config.DNSAgentJson, + }, &reply); err != nil { + t.Fatal(err) + } else if reply != utils.OK { + t.Fatalf("Expecting OK ,received %s", reply) + } + if err := cfg.V1ReloadConfig(&config.ReloadArgs{ + Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "dnsagent_reload"), + Section: config.DNSAgentJson, + }, &reply); err != nil { + t.Fatal(err) + } else if reply != utils.OK { + t.Fatalf("Expecting OK ,received %s", reply) + } + time.Sleep(10 * time.Millisecond) + cfg.DNSAgentCfg().Enabled = false + time.Sleep(10 * time.Millisecond) + cfg.GetReloadChan(config.DNSAgentJson) <- struct{}{} + time.Sleep(100 * time.Millisecond) + if srv.IsRunning() { + t.Fatalf("Expected service to be down") + } +} func TestDNSAgentReload2(t *testing.T) { cfg := config.NewDefaultCGRConfig() @@ -196,9 +194,10 @@ func TestDNSAgentReload2(t *testing.T) { runtime.Gosched() dnsSrv := srv.(*DNSAgent) dnsSrv.dns = agentSrv + err = dnsSrv.listenAndServe(make(chan struct{})) if err == nil || err.Error() != "dns: bad network" { - t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", "dns: bad network", err) + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "dns: bad network", err) } } @@ -220,11 +219,13 @@ func TestDNSAgentReload4(t *testing.T) { runtime.Gosched() dnsSrv := srv.(*DNSAgent) dnsSrv.dns = nil + err := dnsSrv.Start() if err == nil || err.Error() != "open bad_certificate: no such file or directory" { - t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", "open bad_certificate: no such file or directory", err) + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "open bad_certificate: no such file or directory", err) } dnsSrv.dns = nil + } func TestDNSAgentReload5(t *testing.T) { @@ -246,10 +247,12 @@ func TestDNSAgentReload5(t *testing.T) { time.Sleep(10 * time.Millisecond) runtime.Gosched() runtime.Gosched() + err = srv.Reload() if err != nil { - t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", nil, err) + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } + } func TestDNSAgentReload6(t *testing.T) { @@ -263,20 +266,25 @@ func TestDNSAgentReload6(t *testing.T) { filterSChan <- nil shdChan := utils.NewSyncedChan() srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, srvDep) cfg.DNSAgentCfg().Listeners[0].Address = "127.0.0.1:0" + srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, srvDep) + time.Sleep(10 * time.Millisecond) + err := srv.Start() if err != nil { - t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", nil, err) + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } - cfg.DNSAgentCfg().Listeners[0].Network = "tls" - cfg.TLSCfg().ServerCerificate = "bad_certificate" - cfg.TLSCfg().ServerKey = "bad_key" + time.Sleep(10 * time.Millisecond) runtime.Gosched() runtime.Gosched() + + cfg.DNSAgentCfg().Listeners[0].Network = "tls" + cfg.TLSCfg().ServerCerificate = "bad_certificate" + cfg.TLSCfg().ServerKey = "bad_key" err = srv.Reload() if err == nil || err.Error() != "open bad_certificate: no such file or directory" { - t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", "open bad_certificate: no such file or directory", err) + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "open bad_certificate: no such file or directory", err) } + } diff --git a/services/stordb_it_test.go b/services/stordb_it_test.go index 1d345ffa3..d70b7aebc 100644 --- a/services/stordb_it_test.go +++ b/services/stordb_it_test.go @@ -223,13 +223,13 @@ func TestStorDBReloadVersion1(t *testing.T) { } stordb.db = nil err = stordb.Reload() - if err == nil || err.Error() != "can't conver StorDB of type mongo to MongoStorage" { + if err == nil || err.Error() != "can't conver StorDB of type *mongo to MongoStorage" { t.Fatal(err) } cfg.CdrsCfg().Enabled = false err = stordb.Reload() - if err == nil || err.Error() != "can't conver StorDB of type mongo to MongoStorage" { + if err == nil || err.Error() != "can't conver StorDB of type *mongo to MongoStorage" { t.Fatal(err) } time.Sleep(10 * time.Millisecond) @@ -305,13 +305,13 @@ func TestStorDBReloadVersion2(t *testing.T) { } stordb.db = nil err = stordb.Reload() - if err == nil || err.Error() != "can't conver StorDB of type mysql to SQLStorage" { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "can't convert StorDB of type mysql to SQLStorage", err) + if err == nil || err.Error() != "can't conver StorDB of type *mysql to SQLStorage" { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "can't convert StorDB of type *mysql to SQLStorage", err) } cfg.CdrsCfg().Enabled = false err = stordb.Reload() - if err == nil || err.Error() != "can't conver StorDB of type mysql to SQLStorage" { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "can't convert StorDB of type mysql to SQLStorage", err) + if err == nil || err.Error() != "can't conver StorDB of type *mysql to SQLStorage" { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "can't convert StorDB of type *mysql to SQLStorage", err) time.Sleep(10 * time.Millisecond) shdChan.CloseOnce() @@ -376,7 +376,7 @@ func TestStorDBReloadVersion3(t *testing.T) { stordb.oldDBCfg = cfg.StorDbCfg().Clone() stordb.db = nil err = stordb.Reload() - if err == nil || err.Error() != "can't conver StorDB of type internal to InternalDB" { + if err == nil || err.Error() != "can't conver StorDB of type *internal to InternalDB" { t.Fatal(err) } /* the internal now uses its own cache @@ -416,8 +416,8 @@ func TestStorDBReloadNewStorDBConnError(t *testing.T) { } cfg.StorDbCfg().Type = "badType" err := stordb.Reload() - if err == nil || err.Error() != "unknown db 'badType' valid options are [mysql, mongo, postgres, internal]" { - t.Errorf("\nExpecting <%+v>,\n Received <%+v>", "unknown db 'badType' valid options are [mysql, mongo, postgres, internal]", err) + if err == nil || err.Error() != "unknown db 'badType' valid options are [*mysql, *mongo, *postgres, *internal]" { + t.Errorf("\nExpecting <%+v>,\n Received <%+v>", "unknown db 'badType' valid options are [*mysql, *mongo, *postgres, *internal]", err) } shdChan.CloseOnce() } @@ -435,7 +435,7 @@ func TestStorDBReloadStartDBError(t *testing.T) { stordb := NewStorDBService(cfg, srvDep) cfg.StorDbCfg().Type = "badType" err := stordb.Start() - expected := "unknown db 'badType' valid options are [mysql, mongo, postgres, internal]" + expected := "unknown db 'badType' valid options are [*mysql, *mongo, *postgres, *internal]" if err == nil || err.Error() != expected { t.Errorf("\nExpecting <%+v>,\n Received <%+v>", expected, err) }