mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Updated ServiceManager
This commit is contained in:
committed by
Dan Christian Bogos
parent
758074c4de
commit
1b50526fb5
@@ -217,9 +217,7 @@ func (srvMngr *ServiceManager) GetConnection(subsystem string, conns []*config.R
|
||||
if len(conns) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
// srvMngr.RLock()
|
||||
// defer srvMngr.RUnlock()
|
||||
service, has := srvMngr.subsystems[subsystem]
|
||||
service, has := srvMngr.GetService(subsystem)
|
||||
if !has { // used to not cause panics because of services that are not already migrated
|
||||
return nil, errors.New(utils.UnsupportedServiceIDCaps)
|
||||
}
|
||||
@@ -228,10 +226,10 @@ func (srvMngr *ServiceManager) GetConnection(subsystem string, conns []*config.R
|
||||
internalChan = srvMngr.dispatcherSChan
|
||||
}
|
||||
return engine.NewRPCPool(rpcclient.POOL_FIRST,
|
||||
srvMngr.cfg.TlsCfg().ClientKey,
|
||||
srvMngr.cfg.TlsCfg().ClientCerificate, srvMngr.cfg.TlsCfg().CaCertificate,
|
||||
srvMngr.cfg.GeneralCfg().ConnectAttempts, srvMngr.cfg.GeneralCfg().Reconnects,
|
||||
srvMngr.cfg.GeneralCfg().ConnectTimeout, srvMngr.cfg.GeneralCfg().ReplyTimeout,
|
||||
srvMngr.GetConfig().TlsCfg().ClientKey,
|
||||
srvMngr.GetConfig().TlsCfg().ClientCerificate, srvMngr.GetConfig().TlsCfg().CaCertificate,
|
||||
srvMngr.GetConfig().GeneralCfg().ConnectAttempts, srvMngr.GetConfig().GeneralCfg().Reconnects,
|
||||
srvMngr.GetConfig().GeneralCfg().ConnectTimeout, srvMngr.GetConfig().GeneralCfg().ReplyTimeout,
|
||||
conns, internalChan, false)
|
||||
}
|
||||
|
||||
@@ -239,102 +237,39 @@ func (srvMngr *ServiceManager) GetConnection(subsystem string, conns []*config.R
|
||||
func (srvMngr *ServiceManager) StartServices() (err error) {
|
||||
// start the cacheS
|
||||
if srvMngr.GetCacheS() == nil {
|
||||
var chS Service
|
||||
chS, err = srvMngr.GetService(utils.CacheS)
|
||||
if err != nil {
|
||||
chS, has := srvMngr.GetService(utils.CacheS)
|
||||
if !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>",
|
||||
utils.ServiceManager, utils.CacheS))
|
||||
return
|
||||
}
|
||||
chS.Start(srvMngr, true)
|
||||
}
|
||||
|
||||
go srvMngr.handleReload()
|
||||
if srvMngr.cfg.AttributeSCfg().Enabled {
|
||||
go func() {
|
||||
if srv, has := srvMngr.subsystems[utils.AttributeS]; !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.AttributeS))
|
||||
srvMngr.engineShutdown <- true
|
||||
} else if err = srv.Start(srvMngr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.AttributeS, err))
|
||||
srvMngr.engineShutdown <- true
|
||||
}
|
||||
}()
|
||||
if srvMngr.GetConfig().AttributeSCfg().Enabled {
|
||||
go srvMngr.startService(utils.AttributeS)
|
||||
}
|
||||
if srvMngr.cfg.ChargerSCfg().Enabled {
|
||||
go func() {
|
||||
if srv, has := srvMngr.subsystems[utils.ChargerS]; !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ChargerS))
|
||||
srvMngr.engineShutdown <- true
|
||||
} else if err = srv.Start(srvMngr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ChargerS, err))
|
||||
srvMngr.engineShutdown <- true
|
||||
}
|
||||
}()
|
||||
if srvMngr.GetConfig().ChargerSCfg().Enabled {
|
||||
go srvMngr.startService(utils.ChargerS)
|
||||
}
|
||||
if srvMngr.cfg.ThresholdSCfg().Enabled {
|
||||
go func() {
|
||||
if srv, has := srvMngr.subsystems[utils.ThresholdS]; !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ThresholdS))
|
||||
srvMngr.engineShutdown <- true
|
||||
} else if err = srv.Start(srvMngr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ThresholdS, err))
|
||||
srvMngr.engineShutdown <- true
|
||||
}
|
||||
}()
|
||||
if srvMngr.GetConfig().ThresholdSCfg().Enabled {
|
||||
go srvMngr.startService(utils.ThresholdS)
|
||||
}
|
||||
if srvMngr.cfg.StatSCfg().Enabled {
|
||||
go func() {
|
||||
if srv, has := srvMngr.subsystems[utils.StatS]; !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.StatS))
|
||||
srvMngr.engineShutdown <- true
|
||||
} else if err = srv.Start(srvMngr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.StatS, err))
|
||||
srvMngr.engineShutdown <- true
|
||||
}
|
||||
}()
|
||||
if srvMngr.GetConfig().StatSCfg().Enabled {
|
||||
go srvMngr.startService(utils.StatS)
|
||||
}
|
||||
if srvMngr.cfg.ResourceSCfg().Enabled {
|
||||
go func() {
|
||||
if srv, has := srvMngr.subsystems[utils.ResourceS]; !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ResourceS))
|
||||
srvMngr.engineShutdown <- true
|
||||
} else if err = srv.Start(srvMngr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ResourceS, err))
|
||||
srvMngr.engineShutdown <- true
|
||||
}
|
||||
}()
|
||||
if srvMngr.GetConfig().ResourceSCfg().Enabled {
|
||||
go srvMngr.startService(utils.ResourceS)
|
||||
}
|
||||
if srvMngr.cfg.SupplierSCfg().Enabled {
|
||||
go func() {
|
||||
if srv, has := srvMngr.subsystems[utils.SupplierS]; !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.SupplierS))
|
||||
srvMngr.engineShutdown <- true
|
||||
} else if err = srv.Start(srvMngr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.SupplierS, err))
|
||||
srvMngr.engineShutdown <- true
|
||||
}
|
||||
}()
|
||||
if srvMngr.GetConfig().SupplierSCfg().Enabled {
|
||||
go srvMngr.startService(utils.SupplierS)
|
||||
}
|
||||
if srvMngr.cfg.SchedulerCfg().Enabled {
|
||||
go func() {
|
||||
if srv, has := srvMngr.subsystems[utils.SchedulerS]; !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.SchedulerS))
|
||||
srvMngr.engineShutdown <- true
|
||||
} else if err = srv.Start(srvMngr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.SchedulerS, err))
|
||||
srvMngr.engineShutdown <- true
|
||||
}
|
||||
}()
|
||||
if srvMngr.GetConfig().SchedulerCfg().Enabled {
|
||||
go srvMngr.startService(utils.SchedulerS)
|
||||
}
|
||||
if srvMngr.cfg.CdrsCfg().Enabled {
|
||||
go func() {
|
||||
if srv, has := srvMngr.subsystems[utils.CDRServer]; !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.CDRServer))
|
||||
srvMngr.engineShutdown <- true
|
||||
} else if err = srv.Start(srvMngr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.CDRServer, err))
|
||||
srvMngr.engineShutdown <- true
|
||||
}
|
||||
}()
|
||||
if srvMngr.GetConfig().CdrsCfg().Enabled {
|
||||
go srvMngr.startService(utils.CDRServer)
|
||||
}
|
||||
// startServer()
|
||||
return
|
||||
@@ -342,12 +277,14 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
|
||||
|
||||
// AddService adds given services
|
||||
func (srvMngr *ServiceManager) AddService(services ...Service) {
|
||||
srvMngr.Lock()
|
||||
for _, srv := range services {
|
||||
if _, has := srvMngr.subsystems[srv.ServiceName()]; has { // do not rewrite the service
|
||||
continue
|
||||
}
|
||||
srvMngr.subsystems[srv.ServiceName()] = srv
|
||||
}
|
||||
srvMngr.Unlock()
|
||||
}
|
||||
|
||||
func (srvMngr *ServiceManager) handleReload() {
|
||||
@@ -358,83 +295,35 @@ func (srvMngr *ServiceManager) handleReload() {
|
||||
srvMngr.engineShutdown <- ext
|
||||
return
|
||||
case <-srvMngr.cfg.GetReloadChan(config.ATTRIBUTE_JSN):
|
||||
srv, has := srvMngr.subsystems[utils.AttributeS]
|
||||
if !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.AttributeS))
|
||||
srvMngr.engineShutdown <- true
|
||||
return // stop if we encounter an error
|
||||
}
|
||||
if err = srvMngr.reloadService(srv, srvMngr.cfg.AttributeSCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.AttributeS, srvMngr.cfg.AttributeSCfg().Enabled); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.cfg.GetReloadChan(config.ChargerSCfgJson):
|
||||
srv, has := srvMngr.subsystems[utils.ChargerS]
|
||||
if !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ChargerS))
|
||||
srvMngr.engineShutdown <- true
|
||||
return // stop if we encounter an error
|
||||
}
|
||||
if err = srvMngr.reloadService(srv, srvMngr.cfg.ChargerSCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.ChargerS, srvMngr.cfg.ChargerSCfg().Enabled); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.cfg.GetReloadChan(config.THRESHOLDS_JSON):
|
||||
srv, has := srvMngr.subsystems[utils.ThresholdS]
|
||||
if !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ThresholdS))
|
||||
srvMngr.engineShutdown <- true
|
||||
return // stop if we encounter an error
|
||||
}
|
||||
if err = srvMngr.reloadService(srv, srvMngr.cfg.ThresholdSCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.ThresholdS, srvMngr.cfg.ThresholdSCfg().Enabled); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.cfg.GetReloadChan(config.STATS_JSON):
|
||||
srv, has := srvMngr.subsystems[utils.StatS]
|
||||
if !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.StatS))
|
||||
srvMngr.engineShutdown <- true
|
||||
return // stop if we encounter an error
|
||||
}
|
||||
if err = srvMngr.reloadService(srv, srvMngr.cfg.StatSCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.StatS, srvMngr.cfg.StatSCfg().Enabled); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.cfg.GetReloadChan(config.RESOURCES_JSON):
|
||||
srv, has := srvMngr.subsystems[utils.ResourceS]
|
||||
if !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ResourceS))
|
||||
srvMngr.engineShutdown <- true
|
||||
return // stop if we encounter an error
|
||||
}
|
||||
if err = srvMngr.reloadService(srv, srvMngr.cfg.ResourceSCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.ResourceS, srvMngr.cfg.ResourceSCfg().Enabled); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.cfg.GetReloadChan(config.SupplierSJson):
|
||||
srv, has := srvMngr.subsystems[utils.SupplierS]
|
||||
if !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.SupplierS))
|
||||
srvMngr.engineShutdown <- true
|
||||
return // stop if we encounter an error
|
||||
}
|
||||
if err = srvMngr.reloadService(srv, srvMngr.cfg.SupplierSCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.SupplierS, srvMngr.cfg.SupplierSCfg().Enabled); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.cfg.GetReloadChan(config.SCHEDULER_JSN):
|
||||
srv, has := srvMngr.subsystems[utils.SchedulerS]
|
||||
if !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.SchedulerS))
|
||||
srvMngr.engineShutdown <- true
|
||||
return // stop if we encounter an error
|
||||
}
|
||||
if err = srvMngr.reloadService(srv, srvMngr.cfg.SchedulerCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.SchedulerS, srvMngr.cfg.SchedulerCfg().Enabled); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.cfg.GetReloadChan(config.CDRS_JSN):
|
||||
srv, has := srvMngr.subsystems[utils.CDRServer]
|
||||
if !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.CDRServer))
|
||||
srvMngr.engineShutdown <- true
|
||||
return // stop if we encounter an error
|
||||
}
|
||||
if err = srvMngr.reloadService(srv, srvMngr.cfg.CdrsCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.CDRServer, srvMngr.cfg.CdrsCfg().Enabled); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -442,7 +331,14 @@ func (srvMngr *ServiceManager) handleReload() {
|
||||
}
|
||||
}
|
||||
|
||||
func (srvMngr *ServiceManager) reloadService(srv Service, shouldRun bool) (err error) {
|
||||
func (srvMngr *ServiceManager) reloadService(srviceName string, shouldRun bool) (err error) {
|
||||
srv, has := srvMngr.GetService(srviceName)
|
||||
if !has { // this should not happen (check the added services)
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>",
|
||||
utils.ServiceManager, srviceName))
|
||||
srvMngr.engineShutdown <- true
|
||||
return
|
||||
}
|
||||
if shouldRun {
|
||||
if srv.IsRunning() {
|
||||
if err = srv.Reload(srvMngr); err != nil {
|
||||
@@ -467,13 +363,24 @@ func (srvMngr *ServiceManager) reloadService(srv Service, shouldRun bool) (err e
|
||||
return
|
||||
}
|
||||
|
||||
// GetService returns the named service
|
||||
func (srvMngr *ServiceManager) GetService(subsystem string) (srv Service, err error) {
|
||||
var has bool
|
||||
srvMngr.RLock()
|
||||
if srv, has = srvMngr.subsystems[subsystem]; !has {
|
||||
err = utils.ErrNotFound
|
||||
func (srvMngr *ServiceManager) startService(srviceName string) {
|
||||
srv, has := srvMngr.GetService(srviceName)
|
||||
if !has { // this should not happen (check the added services)
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>",
|
||||
utils.ServiceManager, srviceName))
|
||||
srvMngr.engineShutdown <- true
|
||||
return
|
||||
}
|
||||
if err := srv.Start(srvMngr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, srviceName, err))
|
||||
srvMngr.engineShutdown <- true
|
||||
}
|
||||
}
|
||||
|
||||
// GetService returns the named service
|
||||
func (srvMngr ServiceManager) GetService(subsystem string) (srv Service, has bool) {
|
||||
srvMngr.RLock()
|
||||
srv, has = srvMngr.subsystems[subsystem]
|
||||
srvMngr.RUnlock()
|
||||
return
|
||||
}
|
||||
@@ -506,7 +413,7 @@ type ServiceProvider interface {
|
||||
// GetConnection creates a rpcClient to the specified subsystem
|
||||
GetConnection(subsystem string, cfg []*config.RemoteHost) (rpcclient.RpcClientConnection, error)
|
||||
// GetService returns the named service
|
||||
GetService(subsystem string) (Service, error)
|
||||
GetService(subsystem string) (Service, bool)
|
||||
// AddService adds the given serices
|
||||
AddService(services ...Service)
|
||||
// SetCacheS sets the cacheS
|
||||
|
||||
Reference in New Issue
Block a user