diff --git a/agents/radagent.go b/agents/radagent.go index 4f8c398ca..1592c049a 100644 --- a/agents/radagent.go +++ b/agents/radagent.go @@ -353,7 +353,7 @@ func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.R } func (ra *RadiusAgent) ListenAndServe(stopChan <-chan struct{}) (err error) { - var errListen chan error + errListen := make(chan error, 2) go func() { utils.Logger.Info(fmt.Sprintf("<%s> Start listening for auth requests on <%s>", utils.RadiusAgent, ra.cgrCfg.RadiusAgentCfg().ListenAuth)) if err := ra.rsAuth.ListenAndServe(stopChan); err != nil { diff --git a/services/asteriskagent_it_test.go b/services/asteriskagent_it_test.go index 16457ba59..be3ba944b 100644 --- a/services/asteriskagent_it_test.go +++ b/services/asteriskagent_it_test.go @@ -19,7 +19,20 @@ along with this program. If not, see */ package services -/* +import ( + "path" + "sync" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/cores" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + func TestAsteriskAgentReload(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.SessionSCfg().Enabled = true @@ -28,6 +41,10 @@ func TestAsteriskAgentReload(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil shdChan := utils.NewSyncedChan() + defer func() { + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) + }() shdWg := new(sync.WaitGroup) chS := engine.NewCacheS(cfg, nil, nil) @@ -49,7 +66,7 @@ func TestAsteriskAgentReload(t *testing.T) { t.Fatal(err) } if srv.IsRunning() { - t.Errorf("Expected service to be down") + t.Fatalf("Expected service to be down") } var reply string if err := cfg.V1ReloadConfig(&config.ReloadArgs{ @@ -58,28 +75,27 @@ func TestAsteriskAgentReload(t *testing.T) { }, &reply); err != nil { t.Fatal(err) } else if reply != utils.OK { - t.Errorf("Expecting OK ,received %s", reply) + t.Fatalf("Expecting OK ,received %s", reply) } time.Sleep(10 * time.Millisecond) //need to switch to gorutine if !srv.IsRunning() { - t.Errorf("Expected service to be running") + t.Fatalf("Expected service to be running") } srvReload := srv.Reload() if srvReload != nil { - t.Errorf("\nExpecting ,\n Received <%+v>", srvReload) + t.Fatalf("\nExpecting ,\n Received <%+v>", srvReload) } err := srv.Start() if err != utils.ErrServiceAlreadyRunning { - t.Errorf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrServiceAlreadyRunning, err) + t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrServiceAlreadyRunning, err) } cfg.AsteriskAgentCfg().Enabled = false cfg.GetReloadChan(config.AsteriskAgentJSN) <- struct{}{} time.Sleep(10 * time.Millisecond) if srv.IsRunning() { - t.Errorf("Expected service to be down") + t.Fatalf("Expected service to be down") } - shdChan.CloseOnce() - time.Sleep(10 * time.Millisecond) + } func TestAsteriskAgentReload2(t *testing.T) { @@ -90,6 +106,10 @@ func TestAsteriskAgentReload2(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil shdChan := utils.NewSyncedChan() + defer func() { + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) + }() shdWg := new(sync.WaitGroup) chS := engine.NewCacheS(cfg, nil, nil) @@ -111,7 +131,7 @@ func TestAsteriskAgentReload2(t *testing.T) { t.Fatal(err) } if srv.IsRunning() { - t.Errorf("Expected service to be down") + t.Fatalf("Expected service to be down") } var reply string if err := cfg.V1ReloadConfig(&config.ReloadArgs{ @@ -120,19 +140,19 @@ func TestAsteriskAgentReload2(t *testing.T) { }, &reply); err != nil { t.Fatal(err) } else if reply != utils.OK { - t.Errorf("Expecting OK ,received %s", reply) + t.Fatalf("Expecting OK ,received %s", reply) } time.Sleep(10 * time.Millisecond) //need to switch to gorutine if !srv.IsRunning() { - t.Errorf("Expected service to be running") + t.Fatalf("Expected service to be running") } srvReload := srv.Reload() if srvReload != nil { - t.Errorf("\nExpecting ,\n Received <%+v>", srvReload) + t.Fatalf("\nExpecting ,\n Received <%+v>", srvReload) } err := srv.Start() if err != utils.ErrServiceAlreadyRunning { - t.Errorf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrServiceAlreadyRunning, err) + t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrServiceAlreadyRunning, err) } cfg.AsteriskAgentCfg().AsteriskConns = []*config.AsteriskConnCfg{ { @@ -145,15 +165,12 @@ func TestAsteriskAgentReload2(t *testing.T) { }} srvReload = srv.Reload() if srvReload != nil { - t.Errorf("\nExpecting ,\n Received <%+v>", srvReload) + t.Fatalf("\nExpecting ,\n Received <%+v>", srvReload) } cfg.AsteriskAgentCfg().Enabled = false cfg.GetReloadChan(config.AsteriskAgentJSN) <- struct{}{} time.Sleep(10 * time.Millisecond) if srv.IsRunning() { - t.Errorf("Expected service to be down") + t.Fatalf("Expected service to be down") } - shdChan.CloseOnce() - time.Sleep(10 * time.Millisecond) } -*/ diff --git a/services/dnsagent_it_test.go b/services/dnsagent_it_test.go index df63fdded..18f85e6a4 100644 --- a/services/dnsagent_it_test.go +++ b/services/dnsagent_it_test.go @@ -19,7 +19,22 @@ along with this program. If not, see */ package services -/* +import ( + "path" + "runtime" + "sync" + "testing" + "time" + + "github.com/cgrates/cgrates/agents" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/cores" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + func TestDNSAgentReload(t *testing.T) { cfg := config.NewDefaultCGRConfig() @@ -29,6 +44,10 @@ func TestDNSAgentReload(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil shdChan := utils.NewSyncedChan() + defer func() { + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) + }() shdWg := new(sync.WaitGroup) chS := engine.NewCacheS(cfg, nil, nil) @@ -84,8 +103,7 @@ func TestDNSAgentReload(t *testing.T) { if srv.IsRunning() { t.Fatalf("Expected service to be down") } - shdChan.CloseOnce() - time.Sleep(10 * time.Millisecond) + } func TestDNSAgentReload2(t *testing.T) { @@ -96,6 +114,10 @@ func TestDNSAgentReload2(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil shdChan := utils.NewSyncedChan() + defer func() { + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) + }() shdWg := new(sync.WaitGroup) chS := engine.NewCacheS(cfg, nil, nil) @@ -169,8 +191,6 @@ func TestDNSAgentReload2(t *testing.T) { if srv.IsRunning() { t.Fatalf("Expected service to be down") } - shdChan.CloseOnce() - time.Sleep(10 * time.Millisecond) } func TestDNSAgentReload3(t *testing.T) { @@ -186,6 +206,10 @@ func TestDNSAgentReload3(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil shdChan := utils.NewSyncedChan() + defer func() { + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) + }() shdWg := new(sync.WaitGroup) chS := engine.NewCacheS(cfg, nil, nil) @@ -227,10 +251,8 @@ func TestDNSAgentReload3(t *testing.T) { if srv.IsRunning() { t.Fatalf("Expected service to be down") } - shdChan.CloseOnce() - time.Sleep(10 * time.Millisecond) -} +} func TestDNSAgentReload4(t *testing.T) { cfg := config.NewDefaultCGRConfig() @@ -241,6 +263,10 @@ func TestDNSAgentReload4(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil shdChan := utils.NewSyncedChan() + defer func() { + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) + }() shdWg := new(sync.WaitGroup) chS := engine.NewCacheS(cfg, nil, nil) @@ -263,7 +289,7 @@ func TestDNSAgentReload4(t *testing.T) { t.Fatal(err) } if srv.IsRunning() { - t.Errorf("Expected service to be down") + t.Fatalf("Expected service to be down") } var reply string if err := cfg.V1ReloadConfig(&config.ReloadArgs{ @@ -272,19 +298,19 @@ func TestDNSAgentReload4(t *testing.T) { }, &reply); err != nil { t.Fatal(err) } else if reply != utils.OK { - t.Errorf("Expecting OK ,received %s", reply) + t.Fatalf("Expecting OK ,received %s", reply) } time.Sleep(10 * time.Millisecond) //need to switch to goroutine runtime.Gosched() if !srv.IsRunning() { - t.Errorf("Expected service to be running") + t.Fatalf("Expected service to be running") } runtime.Gosched() err = db.Reload() if err != nil { - t.Errorf("\nExpecting ,\n Received <%+v>", err) + t.Fatalf("\nExpecting ,\n Received <%+v>", err) } dnsTest := &agents.DNSAgent{} @@ -297,9 +323,7 @@ func TestDNSAgentReload4(t *testing.T) { runtime.Gosched() runtime.Gosched() if srv.IsRunning() { - t.Errorf("Expected service to be down") + t.Fatalf("Expected service to be down") } - shdChan.CloseOnce() - time.Sleep(10 * time.Millisecond) + } -*/ diff --git a/services/ers.go b/services/ers.go index eccf3643a..7f182293b 100644 --- a/services/ers.go +++ b/services/ers.go @@ -76,12 +76,15 @@ func (erS *EventReaderService) Start() (err error) { // build the service erS.ers = ers.NewERService(erS.cfg, filterS, erS.connMgr) - go func(ers *ers.ERService, stopChan, rldChan chan struct{}) { - if err := ers.ListenAndServe(stopChan, rldChan); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error())) - erS.shdChan.CloseOnce() - } - }(erS.ers, erS.stopChan, erS.rldChan) + go erS.listenAndServe(erS.ers, erS.stopChan, erS.rldChan) + return +} + +func (erS *EventReaderService) listenAndServe(ers *ers.ERService, stopChan chan struct{}, rldChan chan struct{}) (err error) { + if err = ers.ListenAndServe(stopChan, rldChan); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error())) + erS.shdChan.CloseOnce() + } return } diff --git a/services/ers_it_test.go b/services/ers_it_test.go index 793501b56..a61774d64 100644 --- a/services/ers_it_test.go +++ b/services/ers_it_test.go @@ -19,7 +19,23 @@ along with this program. If not, see */ package services -/* +import ( + "os" + "path" + "runtime" + "sync" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/cores" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/ers" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + func TestEventReaderSReload(t *testing.T) { for _, dir := range []string{"/tmp/ers/in", "/tmp/ers/out"} { if err := os.RemoveAll(dir); err != nil { @@ -37,6 +53,10 @@ func TestEventReaderSReload(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil shdChan := utils.NewSyncedChan() + defer func() { + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) + }() shdWg := new(sync.WaitGroup) server := cores.NewServer(nil) srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) @@ -44,14 +64,14 @@ func TestEventReaderSReload(t *testing.T) { anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) db := NewDataDBService(cfg, nil, srvDep) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), shdChan, nil, nil, anz, srvDep) - attrS := NewEventReaderService(cfg, filterSChan, shdChan, nil, srvDep) + erS := NewEventReaderService(cfg, filterSChan, shdChan, nil, srvDep) engine.NewConnManager(cfg, nil) - srvMngr.AddServices(attrS, sS, + srvMngr.AddServices(erS, sS, NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep), db) if err := srvMngr.StartServices(); err != nil { t.Fatal(err) } - if attrS.IsRunning() { + if erS.IsRunning() { t.Fatal("Expected service to be down") } var reply string @@ -64,30 +84,30 @@ func TestEventReaderSReload(t *testing.T) { t.Fatalf("Expecting OK ,received %s", reply) } runtime.Gosched() - if !attrS.IsRunning() { + if !erS.IsRunning() { t.Fatalf("Expected service to be running") } runtime.Gosched() - err := attrS.Start() + err := erS.Start() if err == nil || err != utils.ErrServiceAlreadyRunning { t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrServiceAlreadyRunning, err) } time.Sleep(10 * time.Millisecond) runtime.Gosched() - err = attrS.Reload() + err = erS.Reload() if err != nil { t.Fatalf("\nExpecting ,\n Received <%+v>", err) } cfg.ERsCfg().Enabled = false cfg.GetReloadChan(config.ERsJson) <- struct{}{} time.Sleep(10 * time.Millisecond) - if attrS.IsRunning() { + if erS.IsRunning() { t.Fatal("Expected service to be down") } - shdChan.CloseOnce() - time.Sleep(10 * time.Millisecond) + } + func TestEventReaderSReload2(t *testing.T) { for _, dir := range []string{"/tmp/ers/in", "/tmp/ers/out"} { if err := os.RemoveAll(dir); err != nil { @@ -101,51 +121,25 @@ func TestEventReaderSReload2(t *testing.T) { utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) utils.Logger.SetLogLevel(7) cfg.SessionSCfg().Enabled = true - filterSChan := make(chan *engine.FilterS, 1) - filterSChan <- nil - shdChan := utils.NewSyncedChan() - shdWg := new(sync.WaitGroup) - server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) - srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) - db := NewDataDBService(cfg, nil, srvDep) - sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), shdChan, nil, nil, anz, srvDep) - attrS := NewEventReaderService(cfg, filterSChan, shdChan, nil, srvDep) - engine.NewConnManager(cfg, nil) - srvMngr.AddServices(attrS, sS, - NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep), db) - if err := srvMngr.StartServices(); err != nil { - t.Fatal(err) - } - if attrS.IsRunning() { - t.Fatalf("Expected service to be down") - } - var reply string - if err := cfg.V1ReloadConfig(&config.ReloadArgs{ - Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "ers_reload", "internal"), - Section: config.ERsJson, - }, &reply); err != nil { - t.Fatal(err) - } else if reply != utils.OK { - t.Fatalf("Expecting OK ,received %s", reply) - } - time.Sleep(10 * time.Millisecond) + cfg.ERsCfg().Enabled = true cfg.ERsCfg().Readers = []*config.EventReaderCfg{ { Type: "bad_type", }, } - time.Sleep(10 * time.Millisecond) //need to switch to gorutine - err := attrS.Reload() - if err != nil { - t.Fatalf("\nExpecting ,\n Received <%+v>", err) - } - cfg.ERsCfg().Enabled = false - cfg.GetReloadChan(config.ERsJson) <- struct{}{} - time.Sleep(10 * time.Millisecond) + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + shdChan := utils.NewSyncedChan() + srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} + erS := NewEventReaderService(cfg, filterSChan, shdChan, nil, srvDep) + ers := ers.NewERService(cfg, nil, nil) - shdChan.CloseOnce() - time.Sleep(10 * time.Millisecond) + runtime.Gosched() + srv := erS.(*EventReaderService) + srv.stopChan = make(chan struct{}) + srv.rldChan = make(chan struct{}) + err := srv.listenAndServe(ers, srv.stopChan, srv.rldChan) + if err == nil || err.Error() != "unsupported reader type: " { + t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", "unsupported reader type: ", err) + } } -*/ diff --git a/services/freeswitchagent_it_test.go b/services/freeswitchagent_it_test.go index d70e1d8da..2a417662b 100644 --- a/services/freeswitchagent_it_test.go +++ b/services/freeswitchagent_it_test.go @@ -19,7 +19,22 @@ along with this program. If not, see */ package services -/* +import ( + "path" + "runtime" + "sync" + "testing" + "time" + + "github.com/cgrates/cgrates/agents" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/cores" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + func TestFreeSwitchAgentReload(t *testing.T) { cfg := config.NewDefaultCGRConfig() @@ -30,6 +45,10 @@ func TestFreeSwitchAgentReload(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil shdChan := utils.NewSyncedChan() + defer func() { + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) + }() shdWg := new(sync.WaitGroup) chS := engine.NewCacheS(cfg, nil, nil) cacheSChan := make(chan rpcclient.ClientConnector, 1) @@ -50,7 +69,7 @@ func TestFreeSwitchAgentReload(t *testing.T) { t.Fatal(err) } if srv.IsRunning() { - t.Errorf("Expected service to be down") + t.Fatalf("Expected service to be down") } var reply string if err := cfg.V1ReloadConfig(&config.ReloadArgs{ @@ -59,7 +78,7 @@ func TestFreeSwitchAgentReload(t *testing.T) { }, &reply); err != nil { t.Fatal(err) } else if reply != utils.OK { - t.Errorf("Expecting OK ,received %s", reply) + t.Fatalf("Expecting OK ,received %s", reply) } time.Sleep(10 * time.Millisecond) //need to switch to gorutine @@ -75,11 +94,8 @@ func TestFreeSwitchAgentReload(t *testing.T) { runtime.Gosched() err := srv.Reload() if err != nil { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) + t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } time.Sleep(10 * time.Millisecond) - shdChan.CloseOnce() - runtime.Gosched() - time.Sleep(10 * time.Millisecond) + } -*/ diff --git a/services/httpagent_it_test.go b/services/httpagent_it_test.go index 93bd8c212..313f4ac07 100644 --- a/services/httpagent_it_test.go +++ b/services/httpagent_it_test.go @@ -19,7 +19,21 @@ along with this program. If not, see */ package services -/* +import ( + "path" + "runtime" + "sync" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/cores" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + func TestHTTPAgentReload(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.SessionSCfg().Enabled = true @@ -28,6 +42,10 @@ func TestHTTPAgentReload(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil shdChan := utils.NewSyncedChan() + defer func() { + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) + }() shdWg := new(sync.WaitGroup) chS := engine.NewCacheS(cfg, nil, nil) cacheSChan := make(chan rpcclient.ClientConnector, 1) @@ -47,7 +65,7 @@ func TestHTTPAgentReload(t *testing.T) { t.Fatal(err) } if srv.IsRunning() { - t.Errorf("Expected service to be down") + t.Fatalf("Expected service to be down") } var reply string if err := cfg.V1ReloadConfig(&config.ReloadArgs{ @@ -56,31 +74,28 @@ func TestHTTPAgentReload(t *testing.T) { }, &reply); err != nil { t.Fatal(err) } else if reply != utils.OK { - t.Errorf("Expecting OK ,received %s", reply) + t.Fatalf("Expecting OK ,received %s", reply) } time.Sleep(10 * time.Millisecond) //need to switch to gorutine runtime.Gosched() if !srv.IsRunning() { - t.Errorf("Expected service to be running") + t.Fatalf("Expected service to be running") } runtime.Gosched() srvReload := srv.Reload() if srvReload != nil { - t.Errorf("\nExpecting ,\n Received <%+v>", srvReload) + t.Fatalf("\nExpecting ,\n Received <%+v>", srvReload) } runtime.Gosched() err := srv.Start() if err != utils.ErrServiceAlreadyRunning { - t.Errorf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrServiceAlreadyRunning, err) + t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrServiceAlreadyRunning, err) } err = srv.Shutdown() if err != nil { - t.Errorf("\nExpecting <%+v>,\n Received <%+v>", nil, err) + t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", nil, err) } if srv.IsRunning() { - t.Errorf("Expected service to be down") + t.Fatalf("Expected service to be down") } - shdChan.CloseOnce() - time.Sleep(10 * time.Millisecond) } -*/ diff --git a/services/radiusagent.go b/services/radiusagent.go index 0c90c43e2..d16690a0b 100644 --- a/services/radiusagent.go +++ b/services/radiusagent.go @@ -80,12 +80,17 @@ func (rad *RadiusAgent) Start() (err error) { return } rad.stopChan = make(chan struct{}) - go func(r *agents.RadiusAgent) { - if err = r.ListenAndServe(rad.stopChan); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.RadiusAgent, err.Error())) - rad.shdChan.CloseOnce() - } - }(rad.rad) + + go rad.listenAndServe(rad.rad) + + return +} + +func (rad *RadiusAgent) listenAndServe(r *agents.RadiusAgent) (err error) { + if err = r.ListenAndServe(rad.stopChan); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.RadiusAgent, err.Error())) + rad.shdChan.CloseOnce() + } return } @@ -97,19 +102,21 @@ func (rad *RadiusAgent) Reload() (err error) { return } - if err = rad.Shutdown(); err != nil { - return - } + rad.shutdown() return rad.Start() } // Shutdown stops the service func (rad *RadiusAgent) Shutdown() (err error) { + rad.shutdown() + return // no shutdown for the momment +} + +func (rad *RadiusAgent) shutdown() { rad.Lock() close(rad.stopChan) rad.rad = nil rad.Unlock() - return // no shutdown for the momment } // IsRunning returns if the service is running diff --git a/services/radiusagent_it_test.go b/services/radiusagent_it_test.go index 72e7053d1..a474f121f 100644 --- a/services/radiusagent_it_test.go +++ b/services/radiusagent_it_test.go @@ -19,7 +19,23 @@ along with this program. If not, see */ package services -/* +import ( + "path" + "runtime" + "sync" + "testing" + "time" + + "github.com/cgrates/cgrates/agents" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/cores" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + func TestRadiusAgentReload(t *testing.T) { cfg := config.NewDefaultCGRConfig() @@ -29,6 +45,10 @@ func TestRadiusAgentReload(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil shdChan := utils.NewSyncedChan() + defer func() { + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) + }() shdWg := new(sync.WaitGroup) chS := engine.NewCacheS(cfg, nil, nil) @@ -62,9 +82,11 @@ func TestRadiusAgentReload(t *testing.T) { t.Fatalf("Expecting OK ,received %s", reply) } time.Sleep(10 * time.Millisecond) //need to switch to gorutine + runtime.Gosched() if !srv.IsRunning() { t.Fatalf("Expected service to be running") } + runtime.Gosched() err := srv.Start() if err == nil || err != utils.ErrServiceAlreadyRunning { t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrServiceAlreadyRunning, err) @@ -79,8 +101,6 @@ func TestRadiusAgentReload(t *testing.T) { if srv.IsRunning() { t.Fatalf("Expected service to be down") } - shdChan.CloseOnce() - time.Sleep(10 * time.Millisecond) } func TestRadiusAgentReload2(t *testing.T) { @@ -93,6 +113,10 @@ func TestRadiusAgentReload2(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil shdChan := utils.NewSyncedChan() + defer func() { + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) + }() shdWg := new(sync.WaitGroup) chS := engine.NewCacheS(cfg, nil, nil) @@ -161,8 +185,7 @@ func TestRadiusAgentReload2(t *testing.T) { if srv.IsRunning() { t.Fatalf("Expected service to be down") } - shdChan.CloseOnce() - time.Sleep(10 * time.Millisecond) + } func TestRadiusAgentReload3(t *testing.T) { @@ -171,56 +194,45 @@ func TestRadiusAgentReload3(t *testing.T) { "test": "test", } cfg.SessionSCfg().Enabled = true + cfg.RadiusAgentCfg().Enabled = true utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil shdChan := utils.NewSyncedChan() - shdWg := new(sync.WaitGroup) - chS := engine.NewCacheS(cfg, nil, nil) - - cacheSChan := make(chan rpcclient.ClientConnector, 1) - cacheSChan <- chS - - server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) + defer func() { + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) + }() srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - db := NewDataDBService(cfg, nil, srvDep) - anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) - sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, srvDep) - engine.NewConnManager(cfg, nil) - srvMngr.AddServices(srv, sS, - NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep), db) - if err := srvMngr.StartServices(); err != nil { - t.Fatal(err) + err := srv.Start() + if err == nil || err.Error() != "stat test: no such file or directory" { + t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", "stat test: no such file or directory", err) + } +} + +func TestRadiusAgentReload4(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.SessionSCfg().Enabled = true + cfg.RadiusAgentCfg().Enabled = true + cfg.RadiusAgentCfg().ListenNet = "test" + utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + shdChan := utils.NewSyncedChan() + srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} + srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, srvDep) + r, err := agents.NewRadiusAgent(cfg, nil, nil) + if err != nil { + t.Fatal(err) + } + runtime.Gosched() + rad := srv.(*RadiusAgent) + rad.stopChan = make(chan struct{}) + err = rad.listenAndServe(r) + if err == nil || err.Error() != "unsupported network: " { + t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", "unsupported network: ", err) } - if srv.IsRunning() { - t.Fatal("Expected service to be down") - } - var reply string - if err := cfg.V1ReloadConfig(&config.ReloadArgs{ - Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "radagent_mysql"), - Section: config.RA_JSN, - }, &reply); err != nil { - t.Fatal(err) - } else if reply != utils.OK { - t.Fatalf("Expecting OK ,received %s", reply) - } - time.Sleep(10 * time.Millisecond) //need to switch to gorutine - srv.(*RadiusAgent).stopChan = make(chan struct{}) - err := srv.Reload() - if err != nil { - t.Fatalf("\nExpecting ,\n Received <%+v>", err) - } - cfg.RadiusAgentCfg().Enabled = false - cfg.GetReloadChan(config.RA_JSN) <- struct{}{} - time.Sleep(10 * time.Millisecond) - if srv.IsRunning() { - t.Fatalf("Expected service to be down") - } - shdChan.CloseOnce() - time.Sleep(10 * time.Millisecond) } -*/