From 1b50526fb560fa0af65aad2b90a18773a8d9e396 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Fri, 20 Sep 2019 18:01:49 +0300 Subject: [PATCH] Updated ServiceManager --- servmanager/servmanager.go | 215 +++++++++++-------------------------- 1 file changed, 61 insertions(+), 154 deletions(-) diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 94a91e827..72fd8f7cd 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -217,9 +217,7 @@ func (srvMngr *ServiceManager) GetConnection(subsystem string, conns []*config.R if len(conns) == 0 { return nil, nil } - // srvMngr.RLock() - // defer srvMngr.RUnlock() - service, has := srvMngr.subsystems[subsystem] + service, has := srvMngr.GetService(subsystem) if !has { // used to not cause panics because of services that are not already migrated return nil, errors.New(utils.UnsupportedServiceIDCaps) } @@ -228,10 +226,10 @@ func (srvMngr *ServiceManager) GetConnection(subsystem string, conns []*config.R internalChan = srvMngr.dispatcherSChan } return engine.NewRPCPool(rpcclient.POOL_FIRST, - srvMngr.cfg.TlsCfg().ClientKey, - srvMngr.cfg.TlsCfg().ClientCerificate, srvMngr.cfg.TlsCfg().CaCertificate, - srvMngr.cfg.GeneralCfg().ConnectAttempts, srvMngr.cfg.GeneralCfg().Reconnects, - srvMngr.cfg.GeneralCfg().ConnectTimeout, srvMngr.cfg.GeneralCfg().ReplyTimeout, + srvMngr.GetConfig().TlsCfg().ClientKey, + srvMngr.GetConfig().TlsCfg().ClientCerificate, srvMngr.GetConfig().TlsCfg().CaCertificate, + srvMngr.GetConfig().GeneralCfg().ConnectAttempts, srvMngr.GetConfig().GeneralCfg().Reconnects, + srvMngr.GetConfig().GeneralCfg().ConnectTimeout, srvMngr.GetConfig().GeneralCfg().ReplyTimeout, conns, internalChan, false) } @@ -239,102 +237,39 @@ func (srvMngr *ServiceManager) GetConnection(subsystem string, conns []*config.R func (srvMngr *ServiceManager) StartServices() (err error) { // start the cacheS if srvMngr.GetCacheS() == nil { - var chS Service - chS, err = srvMngr.GetService(utils.CacheS) - if err != nil { + chS, has := srvMngr.GetService(utils.CacheS) + if !has { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>", + utils.ServiceManager, utils.CacheS)) return } chS.Start(srvMngr, true) } go srvMngr.handleReload() - if srvMngr.cfg.AttributeSCfg().Enabled { - go func() { - if srv, has := srvMngr.subsystems[utils.AttributeS]; !has { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.AttributeS)) - srvMngr.engineShutdown <- true - } else if err = srv.Start(srvMngr, true); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.AttributeS, err)) - srvMngr.engineShutdown <- true - } - }() + if srvMngr.GetConfig().AttributeSCfg().Enabled { + go srvMngr.startService(utils.AttributeS) } - if srvMngr.cfg.ChargerSCfg().Enabled { - go func() { - if srv, has := srvMngr.subsystems[utils.ChargerS]; !has { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ChargerS)) - srvMngr.engineShutdown <- true - } else if err = srv.Start(srvMngr, true); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ChargerS, err)) - srvMngr.engineShutdown <- true - } - }() + if srvMngr.GetConfig().ChargerSCfg().Enabled { + go srvMngr.startService(utils.ChargerS) } - if srvMngr.cfg.ThresholdSCfg().Enabled { - go func() { - if srv, has := srvMngr.subsystems[utils.ThresholdS]; !has { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ThresholdS)) - srvMngr.engineShutdown <- true - } else if err = srv.Start(srvMngr, true); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ThresholdS, err)) - srvMngr.engineShutdown <- true - } - }() + if srvMngr.GetConfig().ThresholdSCfg().Enabled { + go srvMngr.startService(utils.ThresholdS) } - if srvMngr.cfg.StatSCfg().Enabled { - go func() { - if srv, has := srvMngr.subsystems[utils.StatS]; !has { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.StatS)) - srvMngr.engineShutdown <- true - } else if err = srv.Start(srvMngr, true); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.StatS, err)) - srvMngr.engineShutdown <- true - } - }() + if srvMngr.GetConfig().StatSCfg().Enabled { + go srvMngr.startService(utils.StatS) } - if srvMngr.cfg.ResourceSCfg().Enabled { - go func() { - if srv, has := srvMngr.subsystems[utils.ResourceS]; !has { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ResourceS)) - srvMngr.engineShutdown <- true - } else if err = srv.Start(srvMngr, true); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ResourceS, err)) - srvMngr.engineShutdown <- true - } - }() + if srvMngr.GetConfig().ResourceSCfg().Enabled { + go srvMngr.startService(utils.ResourceS) } - if srvMngr.cfg.SupplierSCfg().Enabled { - go func() { - if srv, has := srvMngr.subsystems[utils.SupplierS]; !has { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.SupplierS)) - srvMngr.engineShutdown <- true - } else if err = srv.Start(srvMngr, true); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.SupplierS, err)) - srvMngr.engineShutdown <- true - } - }() + if srvMngr.GetConfig().SupplierSCfg().Enabled { + go srvMngr.startService(utils.SupplierS) } - if srvMngr.cfg.SchedulerCfg().Enabled { - go func() { - if srv, has := srvMngr.subsystems[utils.SchedulerS]; !has { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.SchedulerS)) - srvMngr.engineShutdown <- true - } else if err = srv.Start(srvMngr, true); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.SchedulerS, err)) - srvMngr.engineShutdown <- true - } - }() + if srvMngr.GetConfig().SchedulerCfg().Enabled { + go srvMngr.startService(utils.SchedulerS) } - if srvMngr.cfg.CdrsCfg().Enabled { - go func() { - if srv, has := srvMngr.subsystems[utils.CDRServer]; !has { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.CDRServer)) - srvMngr.engineShutdown <- true - } else if err = srv.Start(srvMngr, true); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.CDRServer, err)) - srvMngr.engineShutdown <- true - } - }() + if srvMngr.GetConfig().CdrsCfg().Enabled { + go srvMngr.startService(utils.CDRServer) } // startServer() return @@ -342,12 +277,14 @@ func (srvMngr *ServiceManager) StartServices() (err error) { // AddService adds given services func (srvMngr *ServiceManager) AddService(services ...Service) { + srvMngr.Lock() for _, srv := range services { if _, has := srvMngr.subsystems[srv.ServiceName()]; has { // do not rewrite the service continue } srvMngr.subsystems[srv.ServiceName()] = srv } + srvMngr.Unlock() } func (srvMngr *ServiceManager) handleReload() { @@ -358,83 +295,35 @@ func (srvMngr *ServiceManager) handleReload() { srvMngr.engineShutdown <- ext return case <-srvMngr.cfg.GetReloadChan(config.ATTRIBUTE_JSN): - srv, has := srvMngr.subsystems[utils.AttributeS] - if !has { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.AttributeS)) - srvMngr.engineShutdown <- true - return // stop if we encounter an error - } - if err = srvMngr.reloadService(srv, srvMngr.cfg.AttributeSCfg().Enabled); err != nil { + if err = srvMngr.reloadService(utils.AttributeS, srvMngr.cfg.AttributeSCfg().Enabled); err != nil { return } case <-srvMngr.cfg.GetReloadChan(config.ChargerSCfgJson): - srv, has := srvMngr.subsystems[utils.ChargerS] - if !has { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ChargerS)) - srvMngr.engineShutdown <- true - return // stop if we encounter an error - } - if err = srvMngr.reloadService(srv, srvMngr.cfg.ChargerSCfg().Enabled); err != nil { + if err = srvMngr.reloadService(utils.ChargerS, srvMngr.cfg.ChargerSCfg().Enabled); err != nil { return } case <-srvMngr.cfg.GetReloadChan(config.THRESHOLDS_JSON): - srv, has := srvMngr.subsystems[utils.ThresholdS] - if !has { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ThresholdS)) - srvMngr.engineShutdown <- true - return // stop if we encounter an error - } - if err = srvMngr.reloadService(srv, srvMngr.cfg.ThresholdSCfg().Enabled); err != nil { + if err = srvMngr.reloadService(utils.ThresholdS, srvMngr.cfg.ThresholdSCfg().Enabled); err != nil { return } case <-srvMngr.cfg.GetReloadChan(config.STATS_JSON): - srv, has := srvMngr.subsystems[utils.StatS] - if !has { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.StatS)) - srvMngr.engineShutdown <- true - return // stop if we encounter an error - } - if err = srvMngr.reloadService(srv, srvMngr.cfg.StatSCfg().Enabled); err != nil { + if err = srvMngr.reloadService(utils.StatS, srvMngr.cfg.StatSCfg().Enabled); err != nil { return } case <-srvMngr.cfg.GetReloadChan(config.RESOURCES_JSON): - srv, has := srvMngr.subsystems[utils.ResourceS] - if !has { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ResourceS)) - srvMngr.engineShutdown <- true - return // stop if we encounter an error - } - if err = srvMngr.reloadService(srv, srvMngr.cfg.ResourceSCfg().Enabled); err != nil { + if err = srvMngr.reloadService(utils.ResourceS, srvMngr.cfg.ResourceSCfg().Enabled); err != nil { return } case <-srvMngr.cfg.GetReloadChan(config.SupplierSJson): - srv, has := srvMngr.subsystems[utils.SupplierS] - if !has { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.SupplierS)) - srvMngr.engineShutdown <- true - return // stop if we encounter an error - } - if err = srvMngr.reloadService(srv, srvMngr.cfg.SupplierSCfg().Enabled); err != nil { + if err = srvMngr.reloadService(utils.SupplierS, srvMngr.cfg.SupplierSCfg().Enabled); err != nil { return } case <-srvMngr.cfg.GetReloadChan(config.SCHEDULER_JSN): - srv, has := srvMngr.subsystems[utils.SchedulerS] - if !has { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.SchedulerS)) - srvMngr.engineShutdown <- true - return // stop if we encounter an error - } - if err = srvMngr.reloadService(srv, srvMngr.cfg.SchedulerCfg().Enabled); err != nil { + if err = srvMngr.reloadService(utils.SchedulerS, srvMngr.cfg.SchedulerCfg().Enabled); err != nil { return } case <-srvMngr.cfg.GetReloadChan(config.CDRS_JSN): - srv, has := srvMngr.subsystems[utils.CDRServer] - if !has { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.CDRServer)) - srvMngr.engineShutdown <- true - return // stop if we encounter an error - } - if err = srvMngr.reloadService(srv, srvMngr.cfg.CdrsCfg().Enabled); err != nil { + if err = srvMngr.reloadService(utils.CDRServer, srvMngr.cfg.CdrsCfg().Enabled); err != nil { return } } @@ -442,7 +331,14 @@ func (srvMngr *ServiceManager) handleReload() { } } -func (srvMngr *ServiceManager) reloadService(srv Service, shouldRun bool) (err error) { +func (srvMngr *ServiceManager) reloadService(srviceName string, shouldRun bool) (err error) { + srv, has := srvMngr.GetService(srviceName) + if !has { // this should not happen (check the added services) + utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>", + utils.ServiceManager, srviceName)) + srvMngr.engineShutdown <- true + return + } if shouldRun { if srv.IsRunning() { if err = srv.Reload(srvMngr); err != nil { @@ -467,13 +363,24 @@ func (srvMngr *ServiceManager) reloadService(srv Service, shouldRun bool) (err e return } -// GetService returns the named service -func (srvMngr *ServiceManager) GetService(subsystem string) (srv Service, err error) { - var has bool - srvMngr.RLock() - if srv, has = srvMngr.subsystems[subsystem]; !has { - err = utils.ErrNotFound +func (srvMngr *ServiceManager) startService(srviceName string) { + srv, has := srvMngr.GetService(srviceName) + if !has { // this should not happen (check the added services) + utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>", + utils.ServiceManager, srviceName)) + srvMngr.engineShutdown <- true + return } + if err := srv.Start(srvMngr, true); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, srviceName, err)) + srvMngr.engineShutdown <- true + } +} + +// GetService returns the named service +func (srvMngr ServiceManager) GetService(subsystem string) (srv Service, has bool) { + srvMngr.RLock() + srv, has = srvMngr.subsystems[subsystem] srvMngr.RUnlock() return } @@ -506,7 +413,7 @@ type ServiceProvider interface { // GetConnection creates a rpcClient to the specified subsystem GetConnection(subsystem string, cfg []*config.RemoteHost) (rpcclient.RpcClientConnection, error) // GetService returns the named service - GetService(subsystem string) (Service, error) + GetService(subsystem string) (Service, bool) // AddService adds the given serices AddService(services ...Service) // SetCacheS sets the cacheS