diff --git a/engine/resources.go b/engine/resources.go index 130a5b8ad..8f5b7e07e 100644 --- a/engine/resources.go +++ b/engine/resources.go @@ -345,12 +345,11 @@ type ResourceService struct { } // Shutdown is called to shutdown the service -func (rS *ResourceService) Shutdown() error { +func (rS *ResourceService) Shutdown() { utils.Logger.Info(" service shutdown initialized") close(rS.stopBackup) rS.storeResources() utils.Logger.Info(" service shutdown complete") - return nil } // StoreResource stores the resource in DB and corrects dirty flag diff --git a/engine/routes.go b/engine/routes.go index caefc89eb..19b453a60 100644 --- a/engine/routes.go +++ b/engine/routes.go @@ -136,10 +136,9 @@ type RouteService struct { } // Shutdown is called to shutdown the service -func (rpS *RouteService) Shutdown() error { +func (rpS *RouteService) Shutdown() { utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.RouteS)) utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.RouteS)) - return nil } // matchingRouteProfilesForEvent returns ordered list of matching resources which are active by the time of the call diff --git a/rates/rates.go b/rates/rates.go index dbc89ee91..b88b8c993 100644 --- a/rates/rates.go +++ b/rates/rates.go @@ -58,9 +58,8 @@ func (rS *RateS) ListenAndServe(stopChan, cfgRld chan struct{}) { } // Shutdown is called to shutdown the service -func (rS *RateS) Shutdown() (err error) { +func (rS *RateS) Shutdown() { utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.RateS)) - return } // Call implements rpcclient.ClientConnector interface for internal RPC diff --git a/services/kamailioagent.go b/services/kamailioagent.go index dab69d9db..1330ac032 100644 --- a/services/kamailioagent.go +++ b/services/kamailioagent.go @@ -84,15 +84,18 @@ func (kam *KamailioAgent) Reload() (err error) { return } kam.kam.Reload() - go func(k *agents.KamailioAgent) { - if err = k.Connect(); err != nil { - if strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log - return - } - utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err)) - kam.shdChan.CloseOnce() + go kam.reload(kam.kam) + return +} + +func (kam *KamailioAgent) reload(k *agents.KamailioAgent) (err error) { + if err = k.Connect(); err != nil { + if strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log + return } - }(kam.kam) + utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err)) + kam.shdChan.CloseOnce() + } return } @@ -100,9 +103,7 @@ func (kam *KamailioAgent) Reload() (err error) { func (kam *KamailioAgent) Shutdown() (err error) { kam.Lock() defer kam.Unlock() - if err = kam.kam.Shutdown(); err != nil { - return - } + err = kam.kam.Shutdown() kam.kam = nil return } diff --git a/services/loaders_it_test.go b/services/loaders_it_test.go index 65b4ba924..6cb3551cd 100644 --- a/services/loaders_it_test.go +++ b/services/loaders_it_test.go @@ -141,3 +141,54 @@ func testCleanupFiles(t *testing.T) { } } } + +func TestLoaderSReload2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + for _, ld := range cfg.LoaderCfg() { + ld.Enabled = false + } + shdChan := utils.NewSyncedChan() + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + server := cores.NewServer(nil) + srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} + db := NewDataDBService(cfg, nil, srvDep) + db.dbchan <- new(engine.DataManager) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) + srv := NewLoaderService(cfg, db, filterSChan, + server, make(chan rpcclient.ClientConnector, 1), + nil, anz, srvDep) + err := srv.Start() + if err != nil { + t.Fatal(err) + } +} + +func TestLoaderSReload3(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + for _, ld := range cfg.LoaderCfg() { + ld.Enabled = false + } + cfg.LoaderCfg()[0].Enabled = true + cfg.LoaderCfg()[0].TpInDir = "/tmp/TestLoaderSReload3" + cfg.LoaderCfg()[0].RunDelay = -1 + shdChan := utils.NewSyncedChan() + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + server := cores.NewServer(nil) + srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} + db := NewDataDBService(cfg, nil, srvDep) + db.dbchan <- new(engine.DataManager) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) + srv := NewLoaderService(cfg, db, filterSChan, + server, make(chan rpcclient.ClientConnector, 1), + nil, anz, srvDep) + err := srv.Start() + if err == nil || err.Error() != "no such file or directory" { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "no such file or directory", err) + } + err = srv.Reload() + if err == nil || err.Error() != "no such file or directory" { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "no such file or directory", err) + } +} diff --git a/services/rals.go b/services/rals.go index 5edb8ecd7..01da8527c 100644 --- a/services/rals.go +++ b/services/rals.go @@ -82,13 +82,7 @@ func (rals *RalService) Start() (err error) { <-rals.cacheS.GetPrecacheChannel(utils.CacheActionTriggers) <-rals.cacheS.GetPrecacheChannel(utils.CacheSharedGroups) <-rals.cacheS.GetPrecacheChannel(utils.CacheTimings) - - if err = rals.responder.Start(); err != nil { - if err != utils.ErrServiceAlreadyRunning { - return - } - err = nil - } + rals.responder.Start() //we don't verify the error because responder.Start() always returns service already running rals.rals = v1.NewRALsV1() @@ -103,9 +97,7 @@ func (rals *RalService) Start() (err error) { // Reload handles the change of config func (rals *RalService) Reload() (err error) { engine.SetRpSubjectPrefixMatching(rals.cfg.RalsCfg().RpSubjectPrefixMatching) - if err = rals.responder.Reload(); err != nil { - return - } + rals.responder.Reload() //we don't verify the error because responder.Reload never returns an error return } @@ -113,9 +105,7 @@ func (rals *RalService) Reload() (err error) { func (rals *RalService) Shutdown() (err error) { rals.Lock() defer rals.Unlock() - if err = rals.responder.Shutdown(); err != nil { - return - } + err = rals.responder.Shutdown() //we don't verify the error because responder.Reload never returns an error rals.rals = nil <-rals.connChan return diff --git a/services/rals_it_test.go b/services/rals_it_test.go index b443e5107..35016decc 100644 --- a/services/rals_it_test.go +++ b/services/rals_it_test.go @@ -130,3 +130,46 @@ func TestRalsReload(t *testing.T) { shdChan.CloseOnce() time.Sleep(10 * time.Millisecond) } +func TestRalsReload2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + + utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + shdChan := utils.NewSyncedChan() + chS := engine.NewCacheS(cfg, nil, nil) + close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles)) + close(chS.GetPrecacheChannel(utils.CacheThresholds)) + close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes)) + close(chS.GetPrecacheChannel(utils.CacheDestinations)) + close(chS.GetPrecacheChannel(utils.CacheReverseDestinations)) + close(chS.GetPrecacheChannel(utils.CacheRatingPlans)) + close(chS.GetPrecacheChannel(utils.CacheRatingProfiles)) + close(chS.GetPrecacheChannel(utils.CacheActions)) + close(chS.GetPrecacheChannel(utils.CacheActionPlans)) + close(chS.GetPrecacheChannel(utils.CacheAccountActionPlans)) + close(chS.GetPrecacheChannel(utils.CacheActionTriggers)) + close(chS.GetPrecacheChannel(utils.CacheSharedGroups)) + close(chS.GetPrecacheChannel(utils.CacheTimings)) + + cfg.ThresholdSCfg().Enabled = true + server := cores.NewServer(nil) + srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} + cfg.StorDbCfg().Type = utils.INTERNAL + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) + ralS := NewRalService(cfg, chS, server, + make(chan rpcclient.ClientConnector, 1), + make(chan rpcclient.ClientConnector, 1), + shdChan, nil, anz, srvDep) + ralS.responder.resp = &engine.Responder{ + ShdChan: shdChan, + Timeout: 0, + Timezone: "", + MaxComputedUsage: nil, + } + err := ralS.Start() + if err != nil { + t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", nil, err) + } +} diff --git a/services/rates.go b/services/rates.go index ea2af9b9b..a6c6e708e 100644 --- a/services/rates.go +++ b/services/rates.go @@ -101,9 +101,7 @@ func (rs *RateService) Shutdown() (err error) { rs.Lock() defer rs.Unlock() close(rs.stopChan) - if err = rs.rateS.Shutdown(); err != nil { - return - } + rs.rateS.Shutdown() //we don't verify the error because shutdown never returns an err rs.rateS = nil <-rs.intConnChan return diff --git a/services/resources.go b/services/resources.go index 6116954e3..f043e4476 100644 --- a/services/resources.go +++ b/services/resources.go @@ -109,9 +109,7 @@ func (reS *ResourceService) Shutdown() (err error) { defer reS.srvDep[utils.DataDB].Done() reS.Lock() defer reS.Unlock() - if err = reS.reS.Shutdown(); err != nil { - return - } + reS.reS.Shutdown() //we don't verify the error because shutdown never returns an error reS.reS = nil reS.rpc = nil <-reS.connChan diff --git a/services/routes.go b/services/routes.go index acde84caf..4f4aefb9e 100644 --- a/services/routes.go +++ b/services/routes.go @@ -104,9 +104,7 @@ func (routeS *RouteService) Reload() (err error) { func (routeS *RouteService) Shutdown() (err error) { routeS.Lock() defer routeS.Unlock() - if err = routeS.routeS.Shutdown(); err != nil { - return - } + routeS.routeS.Shutdown() //we don't verify the error because shutdown never returns an error routeS.routeS = nil routeS.rpc = nil <-routeS.connChan