mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Use SyncedChan to handle shutdown
This commit is contained in:
committed by
Dan Christian Bogos
parent
c49e67b2ed
commit
089dfc00ae
@@ -60,7 +60,7 @@ func TestCoreSSleep(t *testing.T) {
|
||||
func TestCoreSShutdown(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
caps := engine.NewCaps(2, utils.MetaTopUp)
|
||||
shutdown := make(chan struct{})
|
||||
shutdown := utils.NewSyncedChan()
|
||||
coreService := cores.NewCoreService(cfg, caps, nil, make(chan struct{}), nil, shutdown)
|
||||
cS := NewCoreSv1(coreService)
|
||||
arg := &utils.CGREvent{}
|
||||
@@ -71,7 +71,7 @@ func TestCoreSShutdown(t *testing.T) {
|
||||
t.Errorf("Expected OK, received %+v", reply)
|
||||
}
|
||||
select {
|
||||
case <-shutdown:
|
||||
case <-shutdown.Done():
|
||||
default:
|
||||
t.Error("engine did not shut down")
|
||||
}
|
||||
|
||||
@@ -86,7 +86,7 @@ func runCGREngine(fs []string) (err error) {
|
||||
|
||||
shdWg := new(sync.WaitGroup)
|
||||
shdWg.Add(1)
|
||||
shutdown := make(chan struct{})
|
||||
shutdown := utils.NewSyncedChan()
|
||||
go handleSignals(shutdown, cfg, shdWg)
|
||||
|
||||
if *flags.ScheduledShutdown != utils.EmptyString {
|
||||
@@ -100,8 +100,8 @@ func runCGREngine(fs []string) (err error) {
|
||||
tm := time.NewTimer(shtDwDur)
|
||||
select {
|
||||
case <-tm.C:
|
||||
close(shutdown)
|
||||
case <-shutdown:
|
||||
shutdown.CloseOnce()
|
||||
case <-shutdown.Done():
|
||||
tm.Stop()
|
||||
}
|
||||
}()
|
||||
@@ -266,7 +266,7 @@ func runCGREngine(fs []string) (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
<-shutdown
|
||||
<-shutdown.Done()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -315,7 +315,7 @@ func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector, cfg *config
|
||||
iServMngrCh <- anz.GetInternalCodec(srv, utils.ServiceManager)
|
||||
}
|
||||
|
||||
func cgrStartRPC(cfg *config.CGRConfig, registry *servmanager.ServiceRegistry, shutdown chan struct{}) {
|
||||
func cgrStartRPC(cfg *config.CGRConfig, registry *servmanager.ServiceRegistry, shutdown *utils.SyncedChan) {
|
||||
if cfg.DispatcherSCfg().Enabled { // wait only for dispatcher as cache is allways registered before this
|
||||
if utils.StructChanTimeout(
|
||||
registry.Lookup(utils.DispatcherS).StateChan(utils.StateServiceUP),
|
||||
@@ -327,7 +327,7 @@ func cgrStartRPC(cfg *config.CGRConfig, registry *servmanager.ServiceRegistry, s
|
||||
cl.StartServer(cfg, shutdown)
|
||||
}
|
||||
|
||||
func handleSignals(stopChan chan struct{}, cfg *config.CGRConfig, shdWg *sync.WaitGroup) {
|
||||
func handleSignals(shutdown *utils.SyncedChan, cfg *config.CGRConfig, shdWg *sync.WaitGroup) {
|
||||
defer shdWg.Done()
|
||||
shutdownSignal := make(chan os.Signal, 1)
|
||||
reloadSignal := make(chan os.Signal, 1)
|
||||
@@ -336,10 +336,10 @@ func handleSignals(stopChan chan struct{}, cfg *config.CGRConfig, shdWg *sync.Wa
|
||||
signal.Notify(reloadSignal, syscall.SIGHUP)
|
||||
for {
|
||||
select {
|
||||
case <-stopChan:
|
||||
case <-shutdown.Done():
|
||||
return
|
||||
case <-shutdownSignal:
|
||||
close(stopChan)
|
||||
shutdown.CloseOnce()
|
||||
case <-reloadSignal:
|
||||
// do it in its own goroutine in order to not block the signal handler with the reload functionality
|
||||
go func() {
|
||||
|
||||
@@ -123,8 +123,8 @@ func testServeJSON(t *testing.T) {
|
||||
|
||||
buff := new(bytes.Buffer)
|
||||
log.SetOutput(buff)
|
||||
shutdown := make(chan struct{})
|
||||
defer close(shutdown)
|
||||
shutdown := utils.NewSyncedChan()
|
||||
defer shutdown.CloseOnce()
|
||||
defer server.Stop()
|
||||
go server.ServeJSON(":88845", shutdown)
|
||||
runtime.Gosched()
|
||||
|
||||
@@ -133,10 +133,10 @@ func (c *CommonListenerS) handleWebSocket(ws *websocket.Conn) {
|
||||
c.rpcServer.ServeCodec(newCapsJSONCodec(ws, c.caps, c.anz))
|
||||
}
|
||||
|
||||
func (c *CommonListenerS) ServeJSON(addr string, shutdown chan struct{}) (err error) {
|
||||
func (c *CommonListenerS) ServeJSON(addr string, shutdown *utils.SyncedChan) (err error) {
|
||||
if c.rpcJSONl, err = net.Listen(utils.TCP, addr); err != nil {
|
||||
log.Printf("Serve%s listen error: %s", utils.JSONCaps, err)
|
||||
close(shutdown)
|
||||
shutdown.CloseOnce()
|
||||
return
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s server at <%s>.", utils.JSONCaps, addr))
|
||||
@@ -145,10 +145,10 @@ func (c *CommonListenerS) ServeJSON(addr string, shutdown chan struct{}) (err er
|
||||
})
|
||||
}
|
||||
|
||||
func (c *CommonListenerS) ServeGOB(addr string, shutdown chan struct{}) (err error) {
|
||||
func (c *CommonListenerS) ServeGOB(addr string, shutdown *utils.SyncedChan) (err error) {
|
||||
if c.rpcGOBl, err = net.Listen(utils.TCP, addr); err != nil {
|
||||
log.Printf("Serve%s listen error: %s", utils.GOBCaps, err)
|
||||
close(shutdown)
|
||||
shutdown.CloseOnce()
|
||||
return
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s server at <%s>.", utils.GOBCaps, addr))
|
||||
@@ -158,7 +158,7 @@ func (c *CommonListenerS) ServeGOB(addr string, shutdown chan struct{}) (err err
|
||||
}
|
||||
|
||||
func (c *CommonListenerS) ServeHTTP(addr, jsonRPCURL, wsRPCURL, promURL, pprofPath string,
|
||||
useBasicAuth bool, userList map[string]string, shutdown chan struct{}) {
|
||||
useBasicAuth bool, userList map[string]string, shutdown *utils.SyncedChan) {
|
||||
c.mu.Lock()
|
||||
c.httpEnabled = c.httpEnabled || jsonRPCURL != "" || wsRPCURL != "" || pprofPath != ""
|
||||
enabled := c.httpEnabled
|
||||
@@ -218,7 +218,7 @@ func (c *CommonListenerS) ServeHTTP(addr, jsonRPCURL, wsRPCURL, promURL, pprofPa
|
||||
c.httpServer.Addr = addr
|
||||
if err := c.httpServer.ListenAndServe(); err != nil {
|
||||
log.Println(fmt.Sprintf("<HTTP>Error: %s when listening ", err))
|
||||
close(shutdown)
|
||||
shutdown.CloseOnce()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -249,16 +249,16 @@ func (c *CommonListenerS) ServeBiRPC(addrJSON, addrGOB string, onConn, onDis fun
|
||||
}
|
||||
|
||||
func (c *CommonListenerS) ServeGOBTLS(addr, serverCrt, serverKey, caCert string, serverPolicy int,
|
||||
serverName string, shutdown chan struct{}) (err error) {
|
||||
serverName string, shutdown *utils.SyncedChan) (err error) {
|
||||
config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName)
|
||||
if err != nil {
|
||||
close(shutdown)
|
||||
shutdown.CloseOnce()
|
||||
return
|
||||
}
|
||||
c.rpcGOBlTLS, err = tls.Listen(utils.TCP, addr, config)
|
||||
if err != nil {
|
||||
log.Println(fmt.Sprintf("Error: %s when listening", err))
|
||||
close(shutdown)
|
||||
shutdown.CloseOnce()
|
||||
return
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s TLS server at <%s>.", utils.GOBCaps, addr))
|
||||
@@ -269,16 +269,16 @@ func (c *CommonListenerS) ServeGOBTLS(addr, serverCrt, serverKey, caCert string,
|
||||
}
|
||||
|
||||
func (c *CommonListenerS) ServeJSONTLS(addr, serverCrt, serverKey, caCert string, serverPolicy int,
|
||||
serverName string, shutdown chan struct{}) (err error) {
|
||||
serverName string, shutdown *utils.SyncedChan) (err error) {
|
||||
config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName)
|
||||
if err != nil {
|
||||
close(shutdown)
|
||||
shutdown.CloseOnce()
|
||||
return
|
||||
}
|
||||
c.rpcJSONlTLS, err = tls.Listen(utils.TCP, addr, config)
|
||||
if err != nil {
|
||||
log.Println(fmt.Sprintf("Error: %s when listening", err))
|
||||
close(shutdown)
|
||||
shutdown.CloseOnce()
|
||||
return
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s TLS server at <%s>.", utils.JSONCaps, addr))
|
||||
@@ -290,7 +290,7 @@ func (c *CommonListenerS) ServeJSONTLS(addr, serverCrt, serverKey, caCert string
|
||||
|
||||
func (c *CommonListenerS) ServeHTTPS(addr, serverCrt, serverKey, caCert string, serverPolicy int,
|
||||
serverName, jsonRPCURL, wsRPCURL, pprofPath string, useBasicAuth bool, userList map[string]string,
|
||||
shutdown chan struct{}) {
|
||||
shutdown *utils.SyncedChan) {
|
||||
c.mu.Lock()
|
||||
c.httpEnabled = c.httpEnabled || jsonRPCURL != "" || wsRPCURL != "" || pprofPath != ""
|
||||
enabled := c.httpEnabled
|
||||
@@ -339,7 +339,7 @@ func (c *CommonListenerS) ServeHTTPS(addr, serverCrt, serverKey, caCert string,
|
||||
}
|
||||
config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName)
|
||||
if err != nil {
|
||||
close(shutdown)
|
||||
shutdown.CloseOnce()
|
||||
return
|
||||
}
|
||||
c.httpsServer.Addr = addr
|
||||
@@ -348,7 +348,7 @@ func (c *CommonListenerS) ServeHTTPS(addr, serverCrt, serverKey, caCert string,
|
||||
|
||||
if err := c.httpsServer.ListenAndServeTLS(serverCrt, serverKey); err != nil {
|
||||
log.Println(fmt.Sprintf("<HTTPS>Error: %s when listening ", err))
|
||||
close(shutdown)
|
||||
shutdown.CloseOnce()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -380,7 +380,7 @@ func (c *CommonListenerS) StopBiRPC() {
|
||||
c.birpcSrv = birpc.NewBirpcServer()
|
||||
}
|
||||
|
||||
func (c *CommonListenerS) StartServer(cfg *config.CGRConfig, shutdown chan struct{}) {
|
||||
func (c *CommonListenerS) StartServer(cfg *config.CGRConfig, shutdown *utils.SyncedChan) {
|
||||
c.startSrv.Do(func() {
|
||||
go c.ServeJSON(cfg.ListenCfg().RPCJSONListen, shutdown)
|
||||
go c.ServeGOB(cfg.ListenCfg().RPCGOBListen, shutdown)
|
||||
|
||||
@@ -124,14 +124,14 @@ func loadTLSConfig(serverCrt, serverKey, caCert string, serverPolicy int,
|
||||
return
|
||||
}
|
||||
|
||||
func acceptRPC(shutdown chan struct{}, srv *birpc.Server, l net.Listener, codecName string, newCodec func(conn conn) birpc.ServerCodec) (err error) {
|
||||
func acceptRPC(shutdown *utils.SyncedChan, srv *birpc.Server, l net.Listener, codecName string, newCodec func(conn conn) birpc.ServerCodec) (err error) {
|
||||
var errCnt int
|
||||
var lastErrorTime time.Time
|
||||
for {
|
||||
var conn net.Conn
|
||||
if conn, err = l.Accept(); err != nil {
|
||||
select {
|
||||
case <-shutdown:
|
||||
case <-shutdown.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
@@ -143,7 +143,7 @@ func acceptRPC(shutdown chan struct{}, srv *birpc.Server, l net.Listener, codecN
|
||||
lastErrorTime = time.Now()
|
||||
errCnt++
|
||||
if errCnt > 50 { // Too many errors in short interval, network buffer failure most probably
|
||||
close(shutdown)
|
||||
shutdown.CloseOnce()
|
||||
return
|
||||
}
|
||||
continue
|
||||
|
||||
@@ -34,7 +34,7 @@ import (
|
||||
)
|
||||
|
||||
func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, fileCPU *os.File, stopChan chan struct{},
|
||||
shdWg *sync.WaitGroup, shutdown chan struct{}) *CoreS {
|
||||
shdWg *sync.WaitGroup, shutdown *utils.SyncedChan) *CoreS {
|
||||
var st *engine.CapsStats
|
||||
if caps.IsLimited() && cfg.CoreSCfg().CapsStatsInterval != 0 {
|
||||
st = engine.NewCapsStats(cfg.CoreSCfg().CapsStatsInterval, caps, stopChan)
|
||||
@@ -53,7 +53,7 @@ type CoreS struct {
|
||||
cfg *config.CGRConfig
|
||||
CapsStats *engine.CapsStats
|
||||
shdWg *sync.WaitGroup
|
||||
shutdown chan struct{}
|
||||
shutdown *utils.SyncedChan
|
||||
|
||||
memProfMux sync.Mutex
|
||||
finalMemProf string // full path of the final memory profile created on stop/shutdown
|
||||
@@ -66,7 +66,7 @@ type CoreS struct {
|
||||
}
|
||||
|
||||
func (cS *CoreS) ShutdownEngine() {
|
||||
close(cS.shutdown)
|
||||
cS.shutdown.CloseOnce()
|
||||
}
|
||||
|
||||
// Shutdown is called to shutdown the service
|
||||
|
||||
@@ -34,7 +34,7 @@ func TestNewCoreService(t *testing.T) {
|
||||
stopchan := make(chan struct{}, 1)
|
||||
caps := engine.NewCaps(1, utils.MetaBusy)
|
||||
sts := engine.NewCapsStats(cfgDflt.CoreSCfg().CapsStatsInterval, caps, stopchan)
|
||||
shutdown := make(chan struct{})
|
||||
shutdown := utils.NewSyncedChan()
|
||||
expected := &CoreS{
|
||||
cfg: cfgDflt,
|
||||
CapsStats: sts,
|
||||
@@ -49,7 +49,7 @@ func TestNewCoreService(t *testing.T) {
|
||||
rcv.Shutdown()
|
||||
rcv.ShutdownEngine()
|
||||
select {
|
||||
case <-shutdown:
|
||||
case <-shutdown.Done():
|
||||
default:
|
||||
t.Error("engine did not shut down")
|
||||
}
|
||||
|
||||
@@ -265,7 +265,7 @@ func (chS *CacheS) GetPrecacheChannel(chID string) chan struct{} {
|
||||
}
|
||||
|
||||
// Precache loads data from DataDB into cache at engine start
|
||||
func (chS *CacheS) Precache(shutdown chan struct{}) {
|
||||
func (chS *CacheS) Precache(shutdown *utils.SyncedChan) {
|
||||
for cacheID, cacheCfg := range chS.cfg.CacheCfg().Partitions {
|
||||
if !cacheCfg.Precache {
|
||||
close(chS.pcItems[cacheID]) // no need of precache
|
||||
@@ -278,7 +278,7 @@ func (chS *CacheS) Precache(shutdown chan struct{}) {
|
||||
false)
|
||||
if err != nil && err != context.Canceled {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> precaching cacheID <%s>, got error: %s", utils.CacheS, cacheID, err))
|
||||
close(shutdown)
|
||||
shutdown.CloseOnce()
|
||||
return
|
||||
}
|
||||
close(chS.pcItems[cacheID])
|
||||
|
||||
@@ -1373,7 +1373,7 @@ func TestCacheSPrecachePartitions(t *testing.T) {
|
||||
if _, err := dm.GetAttributeProfile(context.Background(), utils.CGRateSorg, "TEST_ATTRIBUTES_TEST", true, true, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
cacheS.Precache(make(chan struct{}))
|
||||
cacheS.Precache(utils.NewSyncedChan())
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
if rcv, ok := Cache.Get(utils.CacheAttributeProfiles, "cgrates.org:TEST_ATTRIBUTES_TEST"); !ok {
|
||||
@@ -1410,7 +1410,7 @@ func TestCacheSPrecacheErr(t *testing.T) {
|
||||
|
||||
cacheS := NewCacheS(cfg, nil, connMgr, nil)
|
||||
|
||||
cacheS.Precache(make(chan struct{}))
|
||||
cacheS.Precache(utils.NewSyncedChan())
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
expErr := "<CacheS> precaching cacheID <*accounts>, got error: NO_DATABASE_CONNECTION"
|
||||
|
||||
|
||||
@@ -59,7 +59,7 @@ type AccountService struct {
|
||||
}
|
||||
|
||||
// Start should handle the service start
|
||||
func (acts *AccountService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (acts *AccountService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
@@ -102,7 +102,7 @@ func (acts *AccountService) Start(shutdown chan struct{}, registry *servmanager.
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (acts *AccountService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (acts *AccountService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
acts.rldChan <- struct{}{}
|
||||
return // for the moment nothing to reload
|
||||
}
|
||||
|
||||
@@ -60,7 +60,7 @@ type ActionService struct {
|
||||
}
|
||||
|
||||
// Start should handle the service start
|
||||
func (acts *ActionService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (acts *ActionService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
@@ -103,7 +103,7 @@ func (acts *ActionService) Start(shutdown chan struct{}, registry *servmanager.S
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (acts *ActionService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (acts *ActionService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
acts.rldChan <- struct{}{}
|
||||
return // for the moment nothing to reload
|
||||
}
|
||||
|
||||
@@ -57,7 +57,7 @@ type AdminSv1Service struct {
|
||||
|
||||
// Start should handle the sercive start
|
||||
// For this service the start should be called from RAL Service
|
||||
func (apiService *AdminSv1Service) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (apiService *AdminSv1Service) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
@@ -100,7 +100,7 @@ func (apiService *AdminSv1Service) Start(_ chan struct{}, registry *servmanager.
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (apiService *AdminSv1Service) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (apiService *AdminSv1Service) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -57,7 +57,7 @@ type AnalyzerService struct {
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (anz *AnalyzerService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (anz *AnalyzerService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
cls, err := waitForServiceState(utils.StateServiceUP, utils.CommonListenerS, registry,
|
||||
anz.cfg.GeneralCfg().ConnectTimeout)
|
||||
if err != nil {
|
||||
@@ -75,7 +75,7 @@ func (anz *AnalyzerService) Start(shutdown chan struct{}, registry *servmanager.
|
||||
go func(a *analyzers.AnalyzerS) {
|
||||
if err := a.ListenAndServe(anzCtx); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Error: %s listening for packets", utils.AnalyzerS, err.Error()))
|
||||
close(shutdown)
|
||||
shutdown.CloseOnce()
|
||||
}
|
||||
}(anz.anz)
|
||||
anz.cl.SetAnalyzer(anz.anz)
|
||||
@@ -103,7 +103,7 @@ func (anz *AnalyzerService) start(registry *servmanager.ServiceRegistry) {
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (anz *AnalyzerService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (anz *AnalyzerService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
return // for the momment nothing to reload
|
||||
}
|
||||
|
||||
|
||||
@@ -55,14 +55,14 @@ type AsteriskAgent struct {
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (ast *AsteriskAgent) Start(shutdown chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (ast *AsteriskAgent) Start(shutdown *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
ast.Lock()
|
||||
defer ast.Unlock()
|
||||
|
||||
listenAndServe := func(sma *agents.AsteriskAgent, stopChan chan struct{}) {
|
||||
if err := sma.ListenAndServe(stopChan); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> runtime error: %s!", utils.AsteriskAgent, err))
|
||||
close(shutdown)
|
||||
shutdown.CloseOnce()
|
||||
}
|
||||
}
|
||||
ast.stopChan = make(chan struct{})
|
||||
@@ -75,7 +75,7 @@ func (ast *AsteriskAgent) Start(shutdown chan struct{}, _ *servmanager.ServiceRe
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (ast *AsteriskAgent) Reload(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (ast *AsteriskAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
ast.shutdown()
|
||||
return ast.Start(shutdown, registry)
|
||||
}
|
||||
|
||||
@@ -57,7 +57,7 @@ type AttributeService struct {
|
||||
}
|
||||
|
||||
// Start should handle the service start
|
||||
func (attrS *AttributeService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (attrS *AttributeService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
@@ -110,7 +110,7 @@ func (attrS *AttributeService) Start(shutdown chan struct{}, registry *servmanag
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (attrS *AttributeService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (attrS *AttributeService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
return // for the moment nothing to reload
|
||||
}
|
||||
|
||||
|
||||
@@ -53,7 +53,7 @@ type CacheService struct {
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (cS *CacheService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (cS *CacheService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
@@ -90,7 +90,7 @@ func (cS *CacheService) Start(shutdown chan struct{}, registry *servmanager.Serv
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (cS *CacheService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (_ error) {
|
||||
func (cS *CacheService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (_ error) {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -115,17 +115,17 @@ func (cS *CacheService) GetCacheSChan() chan *engine.CacheS {
|
||||
return cS.cacheCh
|
||||
}
|
||||
|
||||
func (cS *CacheService) WaitToPrecache(shutdown chan struct{}, cacheIDs ...string) (err error) {
|
||||
func (cS *CacheService) WaitToPrecache(shutdown *utils.SyncedChan, cacheIDs ...string) (err error) {
|
||||
var cacheS *engine.CacheS
|
||||
select {
|
||||
case <-shutdown:
|
||||
case <-shutdown.Done():
|
||||
return
|
||||
case cacheS = <-cS.cacheCh:
|
||||
cS.cacheCh <- cacheS
|
||||
}
|
||||
for _, cacheID := range cacheIDs {
|
||||
select {
|
||||
case <-shutdown:
|
||||
case <-shutdown.Done():
|
||||
return
|
||||
case <-cacheS.GetPrecacheChannel(cacheID):
|
||||
}
|
||||
|
||||
@@ -56,7 +56,7 @@ type CDRService struct {
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (cs *CDRService) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (cs *CDRService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
@@ -93,7 +93,7 @@ func (cs *CDRService) Start(_ chan struct{}, registry *servmanager.ServiceRegist
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (cs *CDRService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (cs *CDRService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -54,7 +54,7 @@ type ChargerService struct {
|
||||
}
|
||||
|
||||
// Start should handle the service start
|
||||
func (chrS *ChargerService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) error {
|
||||
func (chrS *ChargerService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
@@ -94,7 +94,7 @@ func (chrS *ChargerService) Start(shutdown chan struct{}, registry *servmanager.
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (chrS *ChargerService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (chrS *ChargerService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -53,7 +53,7 @@ type CommonListenerService struct {
|
||||
}
|
||||
|
||||
// Start handles the service start.
|
||||
func (cl *CommonListenerService) Start(_ chan struct{}, _ *servmanager.ServiceRegistry) error {
|
||||
func (cl *CommonListenerService) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error {
|
||||
cl.mu.Lock()
|
||||
defer cl.mu.Unlock()
|
||||
cl.cls = commonlisteners.NewCommonListenerS(cl.caps)
|
||||
@@ -67,7 +67,7 @@ func (cl *CommonListenerService) Start(_ chan struct{}, _ *servmanager.ServiceRe
|
||||
}
|
||||
|
||||
// Reload handles the config changes.
|
||||
func (cl *CommonListenerService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) error {
|
||||
func (cl *CommonListenerService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -47,7 +47,7 @@ type ConfigService struct {
|
||||
}
|
||||
|
||||
// Start handles the service start.
|
||||
func (s *ConfigService) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) error {
|
||||
func (s *ConfigService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
@@ -71,7 +71,7 @@ func (s *ConfigService) Start(_ chan struct{}, registry *servmanager.ServiceRegi
|
||||
}
|
||||
|
||||
// Reload handles the config changes.
|
||||
func (s *ConfigService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) error {
|
||||
func (s *ConfigService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -63,7 +63,7 @@ type CoreService struct {
|
||||
}
|
||||
|
||||
// Start should handle the service start
|
||||
func (cS *CoreService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) error {
|
||||
func (cS *CoreService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
@@ -96,7 +96,7 @@ func (cS *CoreService) Start(shutdown chan struct{}, registry *servmanager.Servi
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (cS *CoreService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) error {
|
||||
func (cS *CoreService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -58,7 +58,7 @@ type DataDBService struct {
|
||||
}
|
||||
|
||||
// Start handles the service start.
|
||||
func (db *DataDBService) Start(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (db *DataDBService) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
db.Lock()
|
||||
defer db.Unlock()
|
||||
db.oldDBCfg = db.cfg.DataDbCfg().Clone()
|
||||
@@ -86,7 +86,7 @@ func (db *DataDBService) Start(_ chan struct{}, _ *servmanager.ServiceRegistry)
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (db *DataDBService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (db *DataDBService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
db.Lock()
|
||||
defer db.Unlock()
|
||||
if db.needsConnectionReload() {
|
||||
|
||||
@@ -59,7 +59,7 @@ type DiameterAgent struct {
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (da *DiameterAgent) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) error {
|
||||
func (da *DiameterAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
|
||||
fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
|
||||
da.cfg.GeneralCfg().ConnectTimeout)
|
||||
if err != nil {
|
||||
@@ -71,7 +71,7 @@ func (da *DiameterAgent) Start(shutdown chan struct{}, registry *servmanager.Ser
|
||||
return da.start(fs.(*FilterService).FilterS(), da.caps, shutdown)
|
||||
}
|
||||
|
||||
func (da *DiameterAgent) start(filterS *engine.FilterS, caps *engine.Caps, shutdown chan struct{}) error {
|
||||
func (da *DiameterAgent) start(filterS *engine.FilterS, caps *engine.Caps, shutdown *utils.SyncedChan) error {
|
||||
var err error
|
||||
da.da, err = agents.NewDiameterAgent(da.cfg, filterS, da.connMgr, caps)
|
||||
if err != nil {
|
||||
@@ -86,14 +86,14 @@ func (da *DiameterAgent) start(filterS *engine.FilterS, caps *engine.Caps, shutd
|
||||
if err := d.ListenAndServe(da.stopChan); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!",
|
||||
utils.DiameterAgent, err))
|
||||
close(shutdown)
|
||||
shutdown.CloseOnce()
|
||||
}
|
||||
}(da.da)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (da *DiameterAgent) Reload(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (da *DiameterAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
da.Lock()
|
||||
defer da.Unlock()
|
||||
if da.lnet == da.cfg.DiameterAgentCfg().ListenNet &&
|
||||
|
||||
@@ -57,7 +57,7 @@ type DispatcherService struct {
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (dspS *DispatcherService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (dspS *DispatcherService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
@@ -103,7 +103,7 @@ func (dspS *DispatcherService) Start(shutdown chan struct{}, registry *servmanag
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (dspS *DispatcherService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (dspS *DispatcherService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
return // for the momment nothing to reload
|
||||
}
|
||||
|
||||
|
||||
@@ -55,7 +55,7 @@ type DNSAgent struct {
|
||||
}
|
||||
|
||||
// Start should handle the service start
|
||||
func (dns *DNSAgent) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (dns *DNSAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
|
||||
dns.cfg.GeneralCfg().ConnectTimeout)
|
||||
if err != nil {
|
||||
@@ -76,7 +76,7 @@ func (dns *DNSAgent) Start(shutdown chan struct{}, registry *servmanager.Service
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (dns *DNSAgent) Reload(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (dns *DNSAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
|
||||
dns.cfg.GeneralCfg().ConnectTimeout)
|
||||
if err != nil {
|
||||
@@ -104,12 +104,12 @@ func (dns *DNSAgent) Reload(shutdown chan struct{}, registry *servmanager.Servic
|
||||
return
|
||||
}
|
||||
|
||||
func (dns *DNSAgent) listenAndServe(stopChan chan struct{}, shutdown chan struct{}) (err error) {
|
||||
func (dns *DNSAgent) listenAndServe(stopChan chan struct{}, shutdown *utils.SyncedChan) (err error) {
|
||||
dns.dns.RLock()
|
||||
defer dns.dns.RUnlock()
|
||||
if err = dns.dns.ListenAndServe(stopChan); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error()))
|
||||
close(shutdown) // stop the engine here
|
||||
shutdown.CloseOnce() // stop the engine here
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -65,7 +65,7 @@ func (es *EventExporterService) ShouldRun() (should bool) {
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (es *EventExporterService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) error {
|
||||
func (es *EventExporterService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error {
|
||||
es.mu.Lock()
|
||||
defer es.mu.Unlock()
|
||||
es.eeS.ClearExporterCache()
|
||||
@@ -83,7 +83,7 @@ func (es *EventExporterService) Shutdown(_ *servmanager.ServiceRegistry) error {
|
||||
}
|
||||
|
||||
// Start should handle the service start
|
||||
func (es *EventExporterService) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) error {
|
||||
func (es *EventExporterService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
|
||||
@@ -56,7 +56,7 @@ func NewExportFailoverService(cfg *config.CGRConfig, connMgr *engine.ConnManager
|
||||
}
|
||||
|
||||
// Start should handle the service start
|
||||
func (efServ *ExportFailoverService) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (efServ *ExportFailoverService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
cls, err := waitForServiceState(utils.StateServiceUP, utils.CommonListenerS, registry,
|
||||
efServ.cfg.GeneralCfg().ConnectTimeout)
|
||||
if err != nil {
|
||||
@@ -75,7 +75,7 @@ func (efServ *ExportFailoverService) Start(_ chan struct{}, registry *servmanage
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (efServ *ExportFailoverService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (efServ *ExportFailoverService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -60,7 +60,7 @@ type EventReaderService struct {
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (erS *EventReaderService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (erS *EventReaderService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
@@ -96,16 +96,16 @@ func (erS *EventReaderService) Start(shutdown chan struct{}, registry *servmanag
|
||||
return
|
||||
}
|
||||
|
||||
func (erS *EventReaderService) listenAndServe(ers *ers.ERService, stopChan, rldChan, shutdown chan struct{}) (err error) {
|
||||
func (erS *EventReaderService) listenAndServe(ers *ers.ERService, stopChan, rldChan chan struct{}, shutdown *utils.SyncedChan) (err error) {
|
||||
if err = ers.ListenAndServe(stopChan, rldChan); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> error: <%v>", utils.ERs, err))
|
||||
close(shutdown)
|
||||
shutdown.CloseOnce()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (erS *EventReaderService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (erS *EventReaderService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
erS.RLock()
|
||||
erS.rldChan <- struct{}{}
|
||||
erS.RUnlock()
|
||||
|
||||
@@ -51,7 +51,7 @@ type FilterService struct {
|
||||
}
|
||||
|
||||
// Start handles the service start.
|
||||
func (s *FilterService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) error {
|
||||
func (s *FilterService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CacheS,
|
||||
@@ -75,7 +75,7 @@ func (s *FilterService) Start(shutdown chan struct{}, registry *servmanager.Serv
|
||||
}
|
||||
|
||||
// Reload handles the config changes.
|
||||
func (s *FilterService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) error {
|
||||
func (s *FilterService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -54,7 +54,7 @@ type FreeswitchAgent struct {
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (fS *FreeswitchAgent) Start(shutdown chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (fS *FreeswitchAgent) Start(shutdown *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
fS.Lock()
|
||||
defer fS.Unlock()
|
||||
|
||||
@@ -65,7 +65,7 @@ func (fS *FreeswitchAgent) Start(shutdown chan struct{}, _ *servmanager.ServiceR
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (fS *FreeswitchAgent) Reload(shutdown chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (fS *FreeswitchAgent) Reload(shutdown *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
fS.Lock()
|
||||
defer fS.Unlock()
|
||||
if err = fS.fS.Shutdown(); err != nil {
|
||||
@@ -76,10 +76,10 @@ func (fS *FreeswitchAgent) Reload(shutdown chan struct{}, _ *servmanager.Service
|
||||
return
|
||||
}
|
||||
|
||||
func (fS *FreeswitchAgent) connect(shutdown chan struct{}) {
|
||||
func (fS *FreeswitchAgent) connect(shutdown *utils.SyncedChan) {
|
||||
if err := fS.fS.Connect(); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.FreeSWITCHAgent, err))
|
||||
close(shutdown) // stop the engine here
|
||||
shutdown.CloseOnce() // stop the engine here
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -44,7 +44,7 @@ type GlobalVarS struct {
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (gv *GlobalVarS) Start(_ chan struct{}, _ *servmanager.ServiceRegistry) error {
|
||||
func (gv *GlobalVarS) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error {
|
||||
engine.SetHTTPPstrTransport(gv.cfg.HTTPCfg().ClientOpts)
|
||||
utils.DecimalContext.MaxScale = gv.cfg.GeneralCfg().DecimalMaxScale
|
||||
utils.DecimalContext.MinScale = gv.cfg.GeneralCfg().DecimalMinScale
|
||||
@@ -54,7 +54,7 @@ func (gv *GlobalVarS) Start(_ chan struct{}, _ *servmanager.ServiceRegistry) err
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (gv *GlobalVarS) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) error {
|
||||
func (gv *GlobalVarS) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error {
|
||||
engine.SetHTTPPstrTransport(gv.cfg.HTTPCfg().ClientOpts)
|
||||
utils.DecimalContext.MaxScale = gv.cfg.GeneralCfg().DecimalMaxScale
|
||||
utils.DecimalContext.MinScale = gv.cfg.GeneralCfg().DecimalMinScale
|
||||
|
||||
@@ -48,7 +48,7 @@ type GuardianService struct {
|
||||
}
|
||||
|
||||
// Start handles the service start.
|
||||
func (s *GuardianService) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) error {
|
||||
func (s *GuardianService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
@@ -75,7 +75,7 @@ func (s *GuardianService) Start(_ chan struct{}, registry *servmanager.ServiceRe
|
||||
}
|
||||
|
||||
// Reload handles the config changes.
|
||||
func (s *GuardianService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) error {
|
||||
func (s *GuardianService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -58,7 +58,7 @@ type HTTPAgent struct {
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (ha *HTTPAgent) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (ha *HTTPAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
@@ -85,7 +85,7 @@ func (ha *HTTPAgent) Start(_ chan struct{}, registry *servmanager.ServiceRegistr
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (ha *HTTPAgent) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (ha *HTTPAgent) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
return // no reload
|
||||
}
|
||||
|
||||
|
||||
@@ -59,7 +59,7 @@ type JanusAgent struct {
|
||||
}
|
||||
|
||||
// Start should jandle the sercive start
|
||||
func (ja *JanusAgent) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (ja *JanusAgent) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
@@ -99,7 +99,7 @@ func (ja *JanusAgent) Start(_ chan struct{}, registry *servmanager.ServiceRegist
|
||||
}
|
||||
|
||||
// Reload jandles the change of config
|
||||
func (ja *JanusAgent) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (ja *JanusAgent) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
return // no reload
|
||||
}
|
||||
|
||||
|
||||
@@ -54,7 +54,7 @@ type KamailioAgent struct {
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (kam *KamailioAgent) Start(shutdown chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (kam *KamailioAgent) Start(shutdown *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
kam.Lock()
|
||||
defer kam.Unlock()
|
||||
|
||||
@@ -66,7 +66,7 @@ func (kam *KamailioAgent) Start(shutdown chan struct{}, _ *servmanager.ServiceRe
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (kam *KamailioAgent) Reload(shutdown chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (kam *KamailioAgent) Reload(shutdown *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
kam.Lock()
|
||||
defer kam.Unlock()
|
||||
if err = kam.kam.Shutdown(); err != nil {
|
||||
@@ -77,13 +77,13 @@ func (kam *KamailioAgent) Reload(shutdown chan struct{}, _ *servmanager.ServiceR
|
||||
return
|
||||
}
|
||||
|
||||
func (kam *KamailioAgent) connect(k *agents.KamailioAgent, shutdown chan struct{}) (err error) {
|
||||
func (kam *KamailioAgent) connect(k *agents.KamailioAgent, shutdown *utils.SyncedChan) (err error) {
|
||||
if err = k.Connect(); err != nil {
|
||||
if !strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log
|
||||
if !strings.Contains(err.Error(), "KamEvapi") {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err))
|
||||
}
|
||||
close(shutdown)
|
||||
shutdown.CloseOnce()
|
||||
}
|
||||
}
|
||||
return
|
||||
|
||||
@@ -57,7 +57,7 @@ type LoaderService struct {
|
||||
}
|
||||
|
||||
// Start should handle the service start
|
||||
func (ldrs *LoaderService) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (ldrs *LoaderService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
@@ -97,7 +97,7 @@ func (ldrs *LoaderService) Start(_ chan struct{}, registry *servmanager.ServiceR
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (ldrs *LoaderService) Reload(_ chan struct{}, registry *servmanager.ServiceRegistry) error {
|
||||
func (ldrs *LoaderService) Reload(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.FilterS,
|
||||
|
||||
@@ -58,7 +58,7 @@ type RadiusAgent struct {
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (rad *RadiusAgent) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (rad *RadiusAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
|
||||
rad.cfg.GeneralCfg().ConnectTimeout)
|
||||
if err != nil {
|
||||
@@ -81,16 +81,16 @@ func (rad *RadiusAgent) Start(shutdown chan struct{}, registry *servmanager.Serv
|
||||
return
|
||||
}
|
||||
|
||||
func (rad *RadiusAgent) listenAndServe(r *agents.RadiusAgent, shutdown chan struct{}) (err error) {
|
||||
func (rad *RadiusAgent) listenAndServe(r *agents.RadiusAgent, shutdown *utils.SyncedChan) (err error) {
|
||||
if err = r.ListenAndServe(rad.stopChan); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.RadiusAgent, err.Error()))
|
||||
close(shutdown)
|
||||
shutdown.CloseOnce()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (rad *RadiusAgent) Reload(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (rad *RadiusAgent) Reload(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
if rad.lnet == rad.cfg.RadiusAgentCfg().ListenNet &&
|
||||
rad.lauth == rad.cfg.RadiusAgentCfg().ListenAuth &&
|
||||
rad.lacct == rad.cfg.RadiusAgentCfg().ListenAcct {
|
||||
|
||||
@@ -58,7 +58,7 @@ type RankingService struct {
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (ran *RankingService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (ran *RankingService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
ran.srvDep[utils.DataDB].Add(1)
|
||||
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
@@ -104,7 +104,7 @@ func (ran *RankingService) Start(shutdown chan struct{}, registry *servmanager.S
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (ran *RankingService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (ran *RankingService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
ran.Lock()
|
||||
ran.ran.Reload(context.TODO())
|
||||
ran.Unlock()
|
||||
|
||||
@@ -65,7 +65,7 @@ func (rs *RateService) ShouldRun() (should bool) {
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (rs *RateService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (_ error) {
|
||||
func (rs *RateService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (_ error) {
|
||||
rs.rldChan <- struct{}{}
|
||||
return
|
||||
}
|
||||
@@ -81,7 +81,7 @@ func (rs *RateService) Shutdown(_ *servmanager.ServiceRegistry) (err error) {
|
||||
}
|
||||
|
||||
// Start should handle the service start
|
||||
func (rs *RateService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (rs *RateService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
|
||||
@@ -54,7 +54,7 @@ type RegistrarCService struct {
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (dspS *RegistrarCService) Start(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (dspS *RegistrarCService) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
dspS.Lock()
|
||||
defer dspS.Unlock()
|
||||
|
||||
@@ -66,7 +66,7 @@ func (dspS *RegistrarCService) Start(_ chan struct{}, _ *servmanager.ServiceRegi
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (dspS *RegistrarCService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (dspS *RegistrarCService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
dspS.rldChan <- struct{}{}
|
||||
return // for the momment nothing to reload
|
||||
}
|
||||
|
||||
@@ -58,7 +58,7 @@ type ResourceService struct {
|
||||
}
|
||||
|
||||
// Start should handle the service start
|
||||
func (reS *ResourceService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (reS *ResourceService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
reS.srvDep[utils.DataDB].Add(1)
|
||||
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
@@ -102,7 +102,7 @@ func (reS *ResourceService) Start(shutdown chan struct{}, registry *servmanager.
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (reS *ResourceService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (reS *ResourceService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
reS.Lock()
|
||||
reS.reS.Reload(context.TODO())
|
||||
reS.Unlock()
|
||||
|
||||
@@ -54,7 +54,7 @@ type RouteService struct {
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (routeS *RouteService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (routeS *RouteService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
@@ -93,7 +93,7 @@ func (routeS *RouteService) Start(shutdown chan struct{}, registry *servmanager.
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (routeS *RouteService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (routeS *RouteService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -59,7 +59,7 @@ type SessionService struct {
|
||||
}
|
||||
|
||||
// Start should handle the service start
|
||||
func (smg *SessionService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (smg *SessionService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
utils.CommonListenerS,
|
||||
@@ -106,20 +106,20 @@ func (smg *SessionService) Start(shutdown chan struct{}, registry *servmanager.S
|
||||
return
|
||||
}
|
||||
|
||||
func (smg *SessionService) start(shutdown chan struct{}) (err error) {
|
||||
func (smg *SessionService) start(shutdown *utils.SyncedChan) (err error) {
|
||||
if err := smg.cl.ServeBiRPC(smg.cfg.SessionSCfg().ListenBijson,
|
||||
smg.cfg.SessionSCfg().ListenBigob, smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> serve BiRPC error: %s!", utils.SessionS, err))
|
||||
smg.Lock()
|
||||
smg.bircpEnabled = false
|
||||
smg.Unlock()
|
||||
close(shutdown)
|
||||
shutdown.CloseOnce()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (smg *SessionService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (smg *SessionService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -55,7 +55,7 @@ type SIPAgent struct {
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (sip *SIPAgent) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (sip *SIPAgent) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
fs, err := waitForServiceState(utils.StateServiceUP, utils.FilterS, registry,
|
||||
sip.cfg.GeneralCfg().ConnectTimeout)
|
||||
if err != nil {
|
||||
@@ -75,15 +75,15 @@ func (sip *SIPAgent) Start(shutdown chan struct{}, registry *servmanager.Service
|
||||
return
|
||||
}
|
||||
|
||||
func (sip *SIPAgent) listenAndServe(shutdown chan struct{}) {
|
||||
func (sip *SIPAgent) listenAndServe(shutdown *utils.SyncedChan) {
|
||||
if err := sip.sip.ListenAndServe(); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.SIPAgent, err.Error()))
|
||||
close(shutdown) // stop the engine here
|
||||
shutdown.CloseOnce() // stop the engine here
|
||||
}
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (sip *SIPAgent) Reload(shutdown chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (sip *SIPAgent) Reload(shutdown *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
if sip.oldListen == sip.cfg.SIPAgentCfg().Listen {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -56,7 +56,7 @@ type StatService struct {
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (sts *StatService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (sts *StatService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
sts.srvDep[utils.DataDB].Add(1)
|
||||
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
@@ -99,7 +99,7 @@ func (sts *StatService) Start(shutdown chan struct{}, registry *servmanager.Serv
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (sts *StatService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (sts *StatService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
sts.Lock()
|
||||
sts.sts.Reload(context.TODO())
|
||||
sts.Unlock()
|
||||
|
||||
@@ -52,7 +52,7 @@ type StorDBService struct {
|
||||
}
|
||||
|
||||
// Start should handle the service start
|
||||
func (db *StorDBService) Start(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (db *StorDBService) Start(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
db.Lock()
|
||||
defer db.Unlock()
|
||||
db.oldDBCfg = db.cfg.StorDbCfg().Clone()
|
||||
@@ -79,7 +79,7 @@ func (db *StorDBService) Start(_ chan struct{}, _ *servmanager.ServiceRegistry)
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (db *StorDBService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (db *StorDBService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
db.Lock()
|
||||
defer db.Unlock()
|
||||
if db.needsConnectionReload() {
|
||||
|
||||
@@ -58,7 +58,7 @@ type ThresholdService struct {
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (thrs *ThresholdService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (thrs *ThresholdService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
thrs.srvDep[utils.DataDB].Add(1)
|
||||
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
@@ -101,7 +101,7 @@ func (thrs *ThresholdService) Start(shutdown chan struct{}, registry *servmanage
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (thrs *ThresholdService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (_ error) {
|
||||
func (thrs *ThresholdService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (_ error) {
|
||||
thrs.Lock()
|
||||
thrs.thrs.Reload(context.TODO())
|
||||
thrs.Unlock()
|
||||
|
||||
@@ -56,7 +56,7 @@ type TPeService struct {
|
||||
}
|
||||
|
||||
// Start should handle the service start
|
||||
func (ts *TPeService) Start(_ chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (ts *TPeService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
[]string{
|
||||
@@ -78,7 +78,7 @@ func (ts *TPeService) Start(_ chan struct{}, registry *servmanager.ServiceRegist
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (ts *TPeService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (ts *TPeService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -57,7 +57,7 @@ type TrendService struct {
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (trs *TrendService) Start(shutdown chan struct{}, registry *servmanager.ServiceRegistry) (err error) {
|
||||
func (trs *TrendService) Start(shutdown *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
|
||||
trs.srvDep[utils.DataDB].Add(1)
|
||||
|
||||
srvDeps, err := waitForServicesToReachState(utils.StateServiceUP,
|
||||
@@ -103,7 +103,7 @@ func (trs *TrendService) Start(shutdown chan struct{}, registry *servmanager.Ser
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (trs *TrendService) Reload(_ chan struct{}, _ *servmanager.ServiceRegistry) (err error) {
|
||||
func (trs *TrendService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) (err error) {
|
||||
trs.Lock()
|
||||
trs.trs.Reload(context.TODO())
|
||||
trs.Unlock()
|
||||
|
||||
@@ -55,7 +55,7 @@ type ServiceManager struct {
|
||||
}
|
||||
|
||||
// StartServices starts all enabled services
|
||||
func (m *ServiceManager) StartServices(shutdown chan struct{}) {
|
||||
func (m *ServiceManager) StartServices(shutdown *utils.SyncedChan) {
|
||||
go m.handleReload(shutdown)
|
||||
for _, svc := range m.registry.List() {
|
||||
// TODO: verify if IsServiceInState check is needed. It should
|
||||
@@ -67,7 +67,7 @@ func (m *ServiceManager) StartServices(shutdown chan struct{}) {
|
||||
if err := svc.Start(shutdown, m.registry); err != nil &&
|
||||
err != utils.ErrServiceAlreadyRunning { // in case the service was started in another gorutine
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> failed to start <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err))
|
||||
close(shutdown)
|
||||
shutdown.CloseOnce()
|
||||
}
|
||||
close(svc.StateChan(utils.StateServiceUP))
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> started <%s> service", utils.ServiceManager, svc.ServiceName()))
|
||||
@@ -104,11 +104,11 @@ func (m *ServiceManager) AddServices(services ...Service) {
|
||||
m.Unlock()
|
||||
}
|
||||
|
||||
func (m *ServiceManager) handleReload(shutdown chan struct{}) {
|
||||
func (m *ServiceManager) handleReload(shutdown *utils.SyncedChan) {
|
||||
var serviceID string
|
||||
for {
|
||||
select {
|
||||
case <-shutdown:
|
||||
case <-shutdown.Done():
|
||||
m.ShutdownServices()
|
||||
return
|
||||
case serviceID = <-m.rldChan:
|
||||
@@ -123,7 +123,7 @@ func (m *ServiceManager) handleReload(shutdown chan struct{}) {
|
||||
}
|
||||
}
|
||||
|
||||
func (m *ServiceManager) reloadService(id string, shutdown chan struct{}) (err error) {
|
||||
func (m *ServiceManager) reloadService(id string, shutdown *utils.SyncedChan) (err error) {
|
||||
svc := m.registry.Lookup(id)
|
||||
isUp := IsServiceInState(svc, utils.StateServiceUP)
|
||||
if svc.ShouldRun() {
|
||||
@@ -131,7 +131,7 @@ func (m *ServiceManager) reloadService(id string, shutdown chan struct{}) (err e
|
||||
// TODO: state channels must be reinitiated for both SERVICE_UP and SERVICE_DOWN.
|
||||
if err = svc.Reload(shutdown, m.registry); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> failed to reload <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err))
|
||||
close(shutdown)
|
||||
shutdown.CloseOnce()
|
||||
return // stop if we encounter an error
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> reloaded <%s> service", utils.ServiceManager, svc.ServiceName()))
|
||||
@@ -139,7 +139,7 @@ func (m *ServiceManager) reloadService(id string, shutdown chan struct{}) (err e
|
||||
m.shdWg.Add(1)
|
||||
if err = svc.Start(shutdown, m.registry); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> failed to start <%s> serivce: %v", utils.ServiceManager, svc.ServiceName(), err))
|
||||
close(shutdown)
|
||||
shutdown.CloseOnce()
|
||||
return // stop if we encounter an error
|
||||
}
|
||||
close(svc.StateChan(utils.StateServiceUP))
|
||||
@@ -148,7 +148,7 @@ func (m *ServiceManager) reloadService(id string, shutdown chan struct{}) (err e
|
||||
} else if isUp {
|
||||
if err = svc.Shutdown(m.registry); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> failed to shut down <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err))
|
||||
close(shutdown)
|
||||
shutdown.CloseOnce()
|
||||
}
|
||||
close(svc.StateChan(utils.StateServiceDOWN))
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> stopped <%s> service", utils.ServiceManager, svc.ServiceName()))
|
||||
@@ -178,9 +178,9 @@ func (m *ServiceManager) ShutdownServices() {
|
||||
// Service interface that describes what functions should a service implement
|
||||
type Service interface {
|
||||
// Start should handle the service start
|
||||
Start(chan struct{}, *ServiceRegistry) error
|
||||
Start(*utils.SyncedChan, *ServiceRegistry) error
|
||||
// Reload handles the change of config
|
||||
Reload(chan struct{}, *ServiceRegistry) error
|
||||
Reload(*utils.SyncedChan, *ServiceRegistry) error
|
||||
// Shutdown stops the service
|
||||
Shutdown(*ServiceRegistry) error
|
||||
// ShouldRun returns if the service should be running
|
||||
|
||||
Reference in New Issue
Block a user