diff --git a/ees/ees.go b/ees/ees.go index 94192f42a..7d638d840 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -37,7 +37,7 @@ func onCacheEvicted(itmID string, value interface{}) { // NewEventExporterS instantiates the EventExporterS func NewEventExporterS(cfg *config.CGRConfig, filterS *engine.FilterS, - connMgr *engine.ConnManager) (eeS *EventExporterS, err error) { + connMgr *engine.ConnManager) (eeS *EventExporterS) { eeS = &EventExporterS{ cfg: cfg, filterS: filterS, @@ -76,10 +76,9 @@ func (eeS *EventExporterS) ListenAndServe(stopChan, cfgRld chan struct{}) { } // Shutdown is called to shutdown the service -func (eeS *EventExporterS) Shutdown() (err error) { +func (eeS *EventExporterS) Shutdown() { utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.EventExporterS)) eeS.setupCache(nil) // cleanup exporters - return } // Call implements rpcclient.ClientConnector interface for internal RPC diff --git a/services/dnsagent_it_test.go b/services/dnsagent_it_test.go index b3f9b040f..920868317 100644 --- a/services/dnsagent_it_test.go +++ b/services/dnsagent_it_test.go @@ -19,7 +19,22 @@ along with this program. If not, see */ package services -/* +import ( + "path" + "runtime" + "sync" + "testing" + "time" + + "github.com/cgrates/cgrates/agents" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/cores" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + func TestDNSAgentReload(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.SessionSCfg().Enabled = true @@ -151,7 +166,8 @@ func TestDNSAgentReload4(t *testing.T) { cfg.SessionSCfg().Enabled = true cfg.DNSAgentCfg().Enabled = true cfg.DNSAgentCfg().ListenNet = "tls" - + cfg.TLSCfg().ServerCerificate = "bad_certificate" + cfg.TLSCfg().ServerKey = "bad_key" utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) @@ -164,8 +180,8 @@ func TestDNSAgentReload4(t *testing.T) { dnsSrv := srv.(*DNSAgent) dnsSrv.dns = nil err := dnsSrv.Start() - if err == nil || err.Error() != "open : no such file or directory" { - t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", "open : no such file or directory", err) + if err == nil || err.Error() != "open bad_certificate: no such file or directory" { + t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", "open bad_certificate: no such file or directory", err) } dnsSrv.dns = nil } @@ -187,6 +203,7 @@ func TestDNSAgentReload5(t *testing.T) { t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } srv.(*DNSAgent).oldListen = "127.0.0.1:2093" + time.Sleep(10 * time.Millisecond) runtime.Gosched() runtime.Gosched() err = srv.Reload() @@ -195,226 +212,31 @@ func TestDNSAgentReload5(t *testing.T) { } } -/* -func TestDNSAgentReload2(t *testing.T) { +func TestDNSAgentReload6(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.SessionSCfg().Enabled = true + cfg.DNSAgentCfg().Enabled = true + utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) utils.Logger.SetLogLevel(7) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil shdChan := utils.NewSyncedChan() - defer func() { - shdChan.CloseOnce() - time.Sleep(10 * time.Millisecond) - }() - shdWg := new(sync.WaitGroup) - chS := engine.NewCacheS(cfg, nil, nil) - - cacheSChan := make(chan rpcclient.ClientConnector, 1) - cacheSChan <- chS - - server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - db := NewDataDBService(cfg, nil, srvDep) - anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) - sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, srvDep) - engine.NewConnManager(cfg, nil) - srvMngr.AddServices(srv, sS, - NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep), db) - if err := srvMngr.StartServices(); err != nil { - t.Fatal(err) - } - if srv.IsRunning() { - t.Fatalf("Expected service to be down") - } - var reply string - if err := cfg.V1ReloadConfig(&config.ReloadArgs{ - Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "dnsagent_mongo"), - Section: config.DNSAgentJson, - }, &reply); err != nil { - t.Fatal(err) - } else if reply != utils.OK { - t.Fatalf("Expecting OK ,received %s", reply) - } - time.Sleep(10 * time.Millisecond) //need to switch to gorutine - runtime.Gosched() - if !srv.IsRunning() { - t.Fatalf("Expected service to be running") - } - runtime.Gosched() err := srv.Start() - if err == nil || err != utils.ErrServiceAlreadyRunning { - t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrServiceAlreadyRunning, err) + if err != nil { + t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } - - castSrv, canCastSrv := srv.(*DNSAgent) - if !canCastSrv { - t.Fatalf("cannot cast") - } - - castSrv.oldListen = "test_string" + srv.(*DNSAgent).oldListen = "127.0.0.1:2093" + cfg.DNSAgentCfg().ListenNet = "tls" + cfg.TLSCfg().ServerCerificate = "bad_certificate" + cfg.TLSCfg().ServerKey = "bad_key" + time.Sleep(10 * time.Millisecond) runtime.Gosched() runtime.Gosched() err = srv.Reload() - if err != nil { - t.Fatalf("\nExpecting ,\n Received <%+v>", err) - } - err = db.Reload() - if err != nil { - t.Fatalf("\nExpecting ,\n Received <%+v>", err) - } - runtime.Gosched() - runtime.Gosched() - err = srv.Reload() - if err != nil { - t.Fatalf("\nExpecting ,\n Received <%+v>", err) - } - time.Sleep(10 * time.Millisecond) - cfg.DNSAgentCfg().Enabled = false - cfg.GetReloadChan(config.DNSAgentJson) <- struct{}{} - time.Sleep(10 * time.Millisecond) - runtime.Gosched() - if srv.IsRunning() { - t.Fatalf("Expected service to be down") + if err == nil || err.Error() != "open bad_certificate: no such file or directory" { + t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", "open bad_certificate: no such file or directory", err) } } - -func TestDNSAgentReload3(t *testing.T) { - cfg := config.NewDefaultCGRConfig() - cfg.SessionSCfg().Enabled = true - - cfg.DNSAgentCfg().ListenNet = "testtls" - cfg.TLSCfg().ServerCerificate = "" - cfg.TLSCfg().ServerKey = "" - - utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) - utils.Logger.SetLogLevel(7) - filterSChan := make(chan *engine.FilterS, 1) - filterSChan <- nil - shdChan := utils.NewSyncedChan() - defer func() { - shdChan.CloseOnce() - time.Sleep(10 * time.Millisecond) - }() - shdWg := new(sync.WaitGroup) - chS := engine.NewCacheS(cfg, nil, nil) - - cacheSChan := make(chan rpcclient.ClientConnector, 1) - cacheSChan <- chS - - server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) - srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - db := NewDataDBService(cfg, nil, srvDep) - anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) - sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) - srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, srvDep) - engine.NewConnManager(cfg, nil) - srvMngr.AddServices(srv, sS, - NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep), db) - if err := srvMngr.StartServices(); err != nil { - t.Fatal(err) - } - if srv.IsRunning() { - t.Fatalf("Expected service to be down") - } - var reply string - if err := cfg.V1ReloadConfig(&config.ReloadArgs{ - Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "dnsagent_mongo"), - Section: config.DNSAgentJson, - }, &reply); err != nil { - t.Fatal(err) - } else if reply != utils.OK { - t.Fatalf("Expecting OK ,received %s", reply) - } - - time.Sleep(10 * time.Millisecond) - - cfg.DNSAgentCfg().Enabled = false - cfg.GetReloadChan(config.DNSAgentJson) <- struct{}{} - time.Sleep(10 * time.Millisecond) - if srv.IsRunning() { - t.Fatalf("Expected service to be down") - } - -} - -func TestDNSAgentReload4(t *testing.T) { - cfg := config.NewDefaultCGRConfig() - cfg.SessionSCfg().Enabled = true - - utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) - utils.Logger.SetLogLevel(7) - filterSChan := make(chan *engine.FilterS, 1) - filterSChan <- nil - shdChan := utils.NewSyncedChan() - defer func() { - shdChan.CloseOnce() - time.Sleep(10 * time.Millisecond) - }() - shdWg := new(sync.WaitGroup) - chS := engine.NewCacheS(cfg, nil, nil) - - cacheSChan := make(chan rpcclient.ClientConnector, 1) - cacheSChan <- chS - - server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) - srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - db := NewDataDBService(cfg, nil, srvDep) - anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) - sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - shdChan, nil, nil, anz, srvDep) - srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, srvDep) - var err error - - srvMngr.AddServices(srv, sS, - NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep), db) - if err := srvMngr.StartServices(); err != nil { - t.Fatal(err) - } - if srv.IsRunning() { - t.Fatalf("Expected service to be down") - } - var reply string - if err := cfg.V1ReloadConfig(&config.ReloadArgs{ - Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "dnsagent_mongo"), - Section: config.DNSAgentJson, - }, &reply); err != nil { - t.Fatal(err) - } else if reply != utils.OK { - t.Fatalf("Expecting OK ,received %s", reply) - } - time.Sleep(10 * time.Millisecond) //need to switch to goroutine - - runtime.Gosched() - if !srv.IsRunning() { - t.Fatalf("Expected service to be running") - } - - runtime.Gosched() - err = db.Reload() - if err != nil { - t.Fatalf("\nExpecting ,\n Received <%+v>", err) - } - - dnsTest := &agents.DNSAgent{} - srv.(*DNSAgent).dns = dnsTest - - time.Sleep(10 * time.Millisecond) - cfg.DNSAgentCfg().Enabled = false - cfg.GetReloadChan(config.DNSAgentJson) <- struct{}{} - time.Sleep(10 * time.Millisecond) - runtime.Gosched() - runtime.Gosched() - if srv.IsRunning() { - t.Fatalf("Expected service to be down") - } - -} -*/ diff --git a/services/ees.go b/services/ees.go index 377401e77..1d2ae605c 100644 --- a/services/ees.go +++ b/services/ees.go @@ -95,9 +95,7 @@ func (es *EventExporterService) Shutdown() (err error) { es.Lock() defer es.Unlock() close(es.stopChan) - if err = es.eeS.Shutdown(); err != nil { - return - } + es.eeS.Shutdown() es.eeS = nil <-es.intConnChan return @@ -117,12 +115,7 @@ func (es *EventExporterService) Start() (err error) { es.Lock() defer es.Unlock() - es.eeS, err = ees.NewEventExporterS(es.cfg, fltrS, es.connMgr) - if err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", - utils.EventExporterS, err)) - return - } + es.eeS = ees.NewEventExporterS(es.cfg, fltrS, es.connMgr) es.stopChan = make(chan struct{}) go es.eeS.ListenAndServe(es.stopChan, es.rldChan) diff --git a/services/ees_it_test.go b/services/ees_it_test.go index 62579bc08..ab86c0c39 100644 --- a/services/ees_it_test.go +++ b/services/ees_it_test.go @@ -114,3 +114,32 @@ func TestEventExporterSReload(t *testing.T) { shdChan.CloseOnce() time.Sleep(10 * time.Millisecond) } + +func TestEventExporterSReload2(t *testing.T) { + for _, dir := range []string{"/tmp/testCSV", "/tmp/testComposedCSV", "/tmp/testFWV", "/tmp/testCSVMasked", + "/tmp/testCSVfromVirt", "/tmp/testCSVExpTemp"} { + if err := os.RemoveAll(dir); err != nil { + t.Fatal("Error removing folder: ", dir, err) + } + if err := os.MkdirAll(dir, 0755); err != nil { + t.Fatal("Error creating folder: ", dir, err) + } + } + cfg := config.NewDefaultCGRConfig() + + utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + cfg.AttributeSCfg().Enabled = true + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + shdChan := utils.NewSyncedChan() + server := cores.NewServer(nil) + srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) + ees := NewEventExporterService(cfg, filterSChan, engine.NewConnManager(cfg, nil), + server, make(chan rpcclient.ClientConnector, 2), anz, srvDep) + if ees.IsRunning() { + t.Fatalf("Expected service to be down") + } + +} diff --git a/services/freeswitchagent.go b/services/freeswitchagent.go index 3113b92f6..fb731f7f3 100644 --- a/services/freeswitchagent.go +++ b/services/freeswitchagent.go @@ -81,12 +81,15 @@ func (fS *FreeswitchAgent) Reload() (err error) { return } fS.fS.Reload() - go func(f *agents.FSsessions) { - if err := fS.fS.Connect(); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.FreeSWITCHAgent, err)) - fS.shdChan.CloseOnce() // stop the engine here - } - }(fS.fS) + go fS.reload(fS.fS) + return +} + +func (fS *FreeswitchAgent) reload(f *agents.FSsessions) (err error) { + if err := fS.fS.Connect(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.FreeSWITCHAgent, err)) + fS.shdChan.CloseOnce() // stop the engine here + } return } @@ -94,9 +97,7 @@ func (fS *FreeswitchAgent) Reload() (err error) { func (fS *FreeswitchAgent) Shutdown() (err error) { fS.Lock() defer fS.Unlock() - if err = fS.fS.Shutdown(); err != nil { - return - } + err = fS.fS.Shutdown() fS.fS = nil return } diff --git a/services/freeswitchagent_it_test.go b/services/freeswitchagent_it_test.go index 3c678e9ee..cef5b9078 100644 --- a/services/freeswitchagent_it_test.go +++ b/services/freeswitchagent_it_test.go @@ -19,7 +19,21 @@ along with this program. If not, see */ package services -/* +import ( + "path" + "sync" + "testing" + "time" + + "github.com/cgrates/cgrates/agents" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/cores" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + func TestFreeSwitchAgentReload(t *testing.T) { cfg := config.NewDefaultCGRConfig() @@ -68,22 +82,9 @@ func TestFreeSwitchAgentReload(t *testing.T) { time.Sleep(10 * time.Millisecond) //need to switch to gorutine // the engine should be stopped as we could not connect to freeswitch - agentCfg := &config.FsAgentCfg{ - Enabled: true, - CreateCdr: true, - SubscribePark: true, - EventSocketConns: []*config.FsConnCfg{}, - } - - srv.(*FreeswitchAgent).fS = agents.NewFSsessions(agentCfg, "", nil) - runtime.Gosched() - err := srv.Reload() - if err != nil { - t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", nil, err) - } - time.Sleep(10 * time.Millisecond) } + func TestFreeSwitchAgentReload2(t *testing.T) { cfg := config.NewDefaultCGRConfig() @@ -148,4 +149,111 @@ func TestFreeSwitchAgentReload3(t *testing.T) { t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } } -*/ + +func TestFreeSwitchAgentReload4(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.SessionSCfg().Enabled = true + utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + shdChan := utils.NewSyncedChan() + chS := engine.NewCacheS(cfg, nil, nil) + cacheSChan := make(chan rpcclient.ClientConnector, 1) + cacheSChan <- chS + srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} + srv := NewFreeswitchAgent(cfg, shdChan, nil, srvDep) + if srv.IsRunning() { + t.Fatalf("Expected service to be down") + } + agentCfg := &config.FsAgentCfg{ + Enabled: true, + SessionSConns: nil, + SubscribePark: true, + CreateCdr: true, + ExtraFields: nil, + LowBalanceAnnFile: "", + EmptyBalanceContext: "", + EmptyBalanceAnnFile: "", + MaxWaitConnection: 0, + EventSocketConns: []*config.FsConnCfg{ + { + Address: "", + Password: "", + Reconnects: 0, + Alias: "", + }, + }, + } + srv.(*FreeswitchAgent).fS = agents.NewFSsessions(agentCfg, "", nil) + err := srv.(*FreeswitchAgent).reload(srv.(*FreeswitchAgent).fS) + if err != nil { + t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", nil, err) + } +} + +func TestFreeSwitchAgentReload5(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.SessionSCfg().Enabled = true + utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + shdChan := utils.NewSyncedChan() + chS := engine.NewCacheS(cfg, nil, nil) + cacheSChan := make(chan rpcclient.ClientConnector, 1) + cacheSChan <- chS + srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} + srv := NewFreeswitchAgent(cfg, shdChan, nil, srvDep) + if srv.IsRunning() { + t.Fatalf("Expected service to be down") + } + + srv.(*FreeswitchAgent).fS = nil + err := srv.Start() + if err != nil { + t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", nil, err) + } +} + +func TestFreeSwitchAgentReload6(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.SessionSCfg().Enabled = true + utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + shdChan := utils.NewSyncedChan() + chS := engine.NewCacheS(cfg, nil, nil) + cacheSChan := make(chan rpcclient.ClientConnector, 1) + cacheSChan <- chS + srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} + srv := NewFreeswitchAgent(cfg, shdChan, nil, srvDep) + if srv.IsRunning() { + t.Fatalf("Expected service to be down") + } + agentCfg := &config.FsAgentCfg{ + Enabled: true, + SessionSConns: nil, + SubscribePark: true, + CreateCdr: true, + ExtraFields: nil, + LowBalanceAnnFile: "", + EmptyBalanceContext: "", + EmptyBalanceAnnFile: "", + MaxWaitConnection: 0, + EventSocketConns: []*config.FsConnCfg{ + { + Address: "", + Password: "", + Reconnects: 0, + Alias: "", + }, + }, + } + srv.(*FreeswitchAgent).fS = agents.NewFSsessions(agentCfg, "", nil) + err := srv.Reload() + if err != nil { + t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", nil, err) + } +} diff --git a/services/kamailioagent_it_test.go b/services/kamailioagent_it_test.go index f9db1d6ce..09679e487 100644 --- a/services/kamailioagent_it_test.go +++ b/services/kamailioagent_it_test.go @@ -68,7 +68,7 @@ func TestKamailioAgentReload(t *testing.T) { } if srv.IsRunning() { - t.Errorf("Expected service to be down") + t.Fatalf("Expected service to be down") } var reply string if err := cfg.V1ReloadConfig(&config.ReloadArgs{ @@ -77,7 +77,7 @@ func TestKamailioAgentReload(t *testing.T) { }, &reply); err != nil { t.Fatal(err) } else if reply != utils.OK { - t.Errorf("Expecting OK ,received %s", reply) + t.Fatalf("Expecting OK ,received %s", reply) } runtime.Gosched() @@ -94,7 +94,7 @@ func TestKamailioAgentReload(t *testing.T) { err := srv.Reload() if err != nil { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) + t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } time.Sleep(10 * time.Millisecond) //need to switch to gorutine // the engine should be stoped as we could not connect to kamailio @@ -102,3 +102,53 @@ func TestKamailioAgentReload(t *testing.T) { shdChan.CloseOnce() time.Sleep(10 * time.Millisecond) } + +func TestKamailioAgentReload2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.SessionSCfg().Enabled = true + cfg.SessionSCfg().ListenBijson = "" + utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + shdChan := utils.NewSyncedChan() + srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} + srv := NewKamailioAgent(cfg, shdChan, nil, srvDep) + srvKam := &agents.KamailioAgent{} + if srv.IsRunning() { + t.Fatalf("Expected service to be down") + } + srv.(*KamailioAgent).kam = srvKam + if !srv.IsRunning() { + t.Fatalf("Expected service to be running") + } + err := srv.Start() + if err == nil || err.Error() != "service already running" { + t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", "service already running", err) + } +} + +func TestKamailioAgentReload3(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.SessionSCfg().Enabled = true + cfg.SessionSCfg().ListenBijson = "" + utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + shdChan := utils.NewSyncedChan() + srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} + srv := NewKamailioAgent(cfg, shdChan, nil, srvDep) + srvKam := &agents.KamailioAgent{} + if srv.IsRunning() { + t.Fatalf("Expected service to be down") + } + srv.(*KamailioAgent).kam = srvKam + if !srv.IsRunning() { + t.Fatalf("Expected service to be running") + } + err := srv.Start() + if err == nil || err.Error() != "service already running" { + t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", "service already running", err) + } +} diff --git a/services/radiusagent_it_test.go b/services/radiusagent_it_test.go index d24b12c44..08282722e 100644 --- a/services/radiusagent_it_test.go +++ b/services/radiusagent_it_test.go @@ -19,7 +19,22 @@ along with this program. If not, see */ package services -/* +import ( + "path" + "runtime" + "sync" + "testing" + "time" + + "github.com/cgrates/cgrates/agents" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/cores" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + func TestRadiusAgentReload(t *testing.T) { cfg := config.NewDefaultCGRConfig() @@ -221,4 +236,3 @@ func TestRadiusAgentReload4(t *testing.T) { t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", "unsupported network: ", err) } } -*/ diff --git a/services/sessions.go b/services/sessions.go index dfd7a6d74..848c51899 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -113,16 +113,19 @@ func (smg *SessionService) Start() (err error) { smg.server.BiRPCRegisterName(method, handler) } // run this in it's own goroutine - go func() { - if err := smg.server.ServeBiRPC(smg.cfg.SessionSCfg().ListenBijson, - smg.cfg.SessionSCfg().ListenBigob, smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> serve BiRPC error: %s!", utils.SessionS, err)) - smg.Lock() - smg.bircpEnabled = false - smg.Unlock() - smg.shdChan.CloseOnce() - } - }() + go smg.start() + } + return +} + +func (smg *SessionService) start() (err error) { + if err := smg.server.ServeBiRPC(smg.cfg.SessionSCfg().ListenBijson, + smg.cfg.SessionSCfg().ListenBigob, smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> serve BiRPC error: %s!", utils.SessionS, err)) + smg.Lock() + smg.bircpEnabled = false + smg.Unlock() + smg.shdChan.CloseOnce() } return } diff --git a/services/sessions_it_test.go b/services/sessions_it_test.go index f57df2089..3bc00435f 100644 --- a/services/sessions_it_test.go +++ b/services/sessions_it_test.go @@ -133,7 +133,7 @@ func TestSessionSReload1(t *testing.T) { srv.(*SessionService).sm.BiRPCv1InitiateSession(nil, args, rply) err = srv.Shutdown() if err == nil || err != utils.ErrPartiallyExecuted { - t.Errorf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrPartiallyExecuted, err) + t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrPartiallyExecuted, err) } } @@ -178,23 +178,73 @@ func TestSessionSReload2(t *testing.T) { srv.(*SessionService).sm = &sessions.SessionS{} if !srv.IsRunning() { - t.Errorf("\nExpecting service to be running") + t.Fatalf("\nExpecting service to be running") } err2 := srv.Start() if err2 != utils.ErrServiceAlreadyRunning { - t.Errorf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrServiceAlreadyRunning, err2) + t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrServiceAlreadyRunning, err2) } cfg.SessionSCfg().Enabled = false err := srv.Reload() if err != nil { - t.Errorf("\nExpecting ,\n Received <%+v>", err) + t.Fatalf("\nExpecting ,\n Received <%+v>", err) } time.Sleep(10 * time.Millisecond) srv.(*SessionService).sm = nil if srv.IsRunning() { - t.Errorf("Expected service to be down") + t.Fatalf("Expected service to be down") } shdChan.CloseOnce() time.Sleep(10 * time.Millisecond) } + +func TestSessionSReload3(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + + cfg.ChargerSCfg().Enabled = true + cfg.RalsCfg().Enabled = true + cfg.CdrsCfg().Enabled = true + utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + shdChan := utils.NewSyncedChan() + chS := engine.NewCacheS(cfg, nil, nil) + close(chS.GetPrecacheChannel(utils.CacheChargerProfiles)) + close(chS.GetPrecacheChannel(utils.CacheChargerFilterIndexes)) + close(chS.GetPrecacheChannel(utils.CacheDestinations)) + close(chS.GetPrecacheChannel(utils.CacheReverseDestinations)) + close(chS.GetPrecacheChannel(utils.CacheRatingPlans)) + close(chS.GetPrecacheChannel(utils.CacheRatingProfiles)) + close(chS.GetPrecacheChannel(utils.CacheActions)) + close(chS.GetPrecacheChannel(utils.CacheActionPlans)) + close(chS.GetPrecacheChannel(utils.CacheAccountActionPlans)) + close(chS.GetPrecacheChannel(utils.CacheActionTriggers)) + close(chS.GetPrecacheChannel(utils.CacheSharedGroups)) + close(chS.GetPrecacheChannel(utils.CacheTimings)) + + internalChan := make(chan rpcclient.ClientConnector, 1) + internalChan <- nil + cacheSChan := make(chan rpcclient.ClientConnector, 1) + cacheSChan <- chS + + server := cores.NewServer(nil) + + srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} + db := NewDataDBService(cfg, nil, srvDep) + cfg.StorDbCfg().Type = utils.INTERNAL + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) + srv := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), shdChan, nil, nil, anz, srvDep) + engine.NewConnManager(cfg, nil) + + srv.(*SessionService).sm = &sessions.SessionS{} + if !srv.IsRunning() { + t.Fatalf("\nExpecting service to be running") + } + err2 := srv.(*SessionService).start() + if err2 != nil { + t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", nil, err2) + } + +} diff --git a/services/sipagent_it_test.go b/services/sipagent_it_test.go index 6ce9bcd7b..95827f517 100644 --- a/services/sipagent_it_test.go +++ b/services/sipagent_it_test.go @@ -88,7 +88,7 @@ func TestSIPAgentReload(t *testing.T) { } err := srv.Reload() if err != nil { - t.Fatalf("\nExpecting ,\n Received <%+v>", err) + t.Fatalf("\nExpecting ,\n Received <%+v>", err) } time.Sleep(10 * time.Millisecond) cfg.SIPAgentCfg().Enabled = false @@ -101,16 +101,63 @@ func TestSIPAgentReload(t *testing.T) { time.Sleep(10 * time.Millisecond) } -/* -WILLFIX - castSrv, canCastSrv := srv.(*SIPAgent) - if !canCastSrv { - t.Fatalf("cannot cast") +func TestSIPAgentReload2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.SessionSCfg().Enabled = true + cfg.SessionSCfg().ListenBijson = "" + utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + shdChan := utils.NewSyncedChan() + chS := engine.NewCacheS(cfg, nil, nil) + cacheSChan := make(chan rpcclient.ClientConnector, 1) + cacheSChan <- chS + srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} + srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, srvDep) + if srv.IsRunning() { + t.Fatalf("Expected service to be down") } - castSrv.sip = srvSIP - castSrv.oldListen = "test_string" + cfg.SIPAgentCfg().RequestProcessors = []*config.RequestProcessor{ + { + RequestFields: []*config.FCTemplate{ + { + Type: utils.MetaTemplate, + }, + }, + }, + } + err := srv.Start() + if err == nil || err.Error() != "no template with id: <>" { + t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", "no template with id: <>", err) + } + +} + +func TestSIPAgentReload3(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.SessionSCfg().Enabled = true + cfg.SessionSCfg().ListenBijson = "" + utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + shdChan := utils.NewSyncedChan() + chS := engine.NewCacheS(cfg, nil, nil) + cacheSChan := make(chan rpcclient.ClientConnector, 1) + cacheSChan <- chS + srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} + srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, srvDep) + if srv.IsRunning() { + t.Fatalf("Expected service to be down") + } + err := srv.Start() + if err != nil { + t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", nil, err) + } + srv.(*SIPAgent).oldListen = "test" err = srv.Reload() if err != nil { - t.Fatalf("\nExpecting ,\n Received <%+v>", err) + t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", nil, err) } -*/ +}