mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-14 20:59:53 +05:00
servmanager: remove redundant subsystems map
ServiceIndexer can handle its usecases instead
This commit is contained in:
committed by
Dan Christian Bogos
parent
cfdb3e80ca
commit
82c985cdbe
@@ -20,7 +20,6 @@ package servmanager
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
@@ -36,7 +35,6 @@ func NewServiceManager(shdWg *sync.WaitGroup, connMgr *engine.ConnManager,
|
||||
cfg *config.CGRConfig, srvIndxr *ServiceIndexer, services []Service) (sM *ServiceManager) {
|
||||
sM = &ServiceManager{
|
||||
cfg: cfg,
|
||||
subsystems: make(map[string]Service),
|
||||
serviceIndexer: srvIndxr,
|
||||
shdWg: shdWg,
|
||||
connMgr: connMgr,
|
||||
@@ -48,44 +46,27 @@ func NewServiceManager(shdWg *sync.WaitGroup, connMgr *engine.ConnManager,
|
||||
|
||||
// ServiceManager handles service management ran by the engine
|
||||
type ServiceManager struct {
|
||||
sync.RWMutex // lock access to any shared data
|
||||
cfg *config.CGRConfig
|
||||
subsystems map[string]Service // active subsystems managed by SM
|
||||
|
||||
sync.RWMutex // lock access to any shared data
|
||||
cfg *config.CGRConfig
|
||||
serviceIndexer *ServiceIndexer // index here the services for accessing them by their IDs
|
||||
|
||||
shdWg *sync.WaitGroup // list of shutdown items
|
||||
|
||||
rldChan <-chan string // reload signals come over this channelc
|
||||
|
||||
connMgr *engine.ConnManager
|
||||
shdWg *sync.WaitGroup // list of shutdown items
|
||||
rldChan <-chan string // reload signals come over this channelc
|
||||
connMgr *engine.ConnManager
|
||||
}
|
||||
|
||||
// StartServices starts all enabled services
|
||||
func (srvMngr *ServiceManager) StartServices(ctx *context.Context, shtDwn context.CancelFunc) {
|
||||
go srvMngr.handleReload(ctx, shtDwn)
|
||||
for _, service := range srvMngr.subsystems {
|
||||
if service.ShouldRun() && !service.IsRunning() {
|
||||
srvMngr.shdWg.Add(1)
|
||||
go func(srv Service) {
|
||||
if err := srv.Start(ctx, shtDwn); err != nil &&
|
||||
err != utils.ErrServiceAlreadyRunning { // in case the service was started in another gorutine
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> failed to start %s because: %s", utils.ServiceManager, srv.ServiceName(), err))
|
||||
shtDwn()
|
||||
}
|
||||
}(service)
|
||||
}
|
||||
}
|
||||
for _, service := range srvMngr.serviceIndexer.GetServices() {
|
||||
if service.ShouldRun() && !service.IsRunning() {
|
||||
srvMngr.shdWg.Add(1)
|
||||
go func(srv Service) {
|
||||
if err := srv.Start(ctx, shtDwn); err != nil &&
|
||||
go func() {
|
||||
if err := service.Start(ctx, shtDwn); err != nil &&
|
||||
err != utils.ErrServiceAlreadyRunning { // in case the service was started in another gorutine
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> failed to start %s because: %s", utils.ServiceManager, srv.ServiceName(), err))
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> failed to start %s because: %s", utils.ServiceManager, service.ServiceName(), err))
|
||||
shtDwn()
|
||||
}
|
||||
}(service)
|
||||
}()
|
||||
}
|
||||
}
|
||||
// startServer()
|
||||
@@ -95,9 +76,7 @@ func (srvMngr *ServiceManager) StartServices(ctx *context.Context, shtDwn contex
|
||||
func (srvMngr *ServiceManager) AddServices(services ...Service) {
|
||||
srvMngr.Lock()
|
||||
for _, srv := range services {
|
||||
if _, has := srvMngr.subsystems[srv.ServiceName()]; !has { // do not rewrite the service
|
||||
srvMngr.subsystems[srv.ServiceName()] = srv
|
||||
}
|
||||
srvMngr.serviceIndexer.AddService(srv.ServiceName(), srv)
|
||||
if sAPIData, hasAPIData := serviceAPIData[srv.ServiceName()]; hasAPIData { // Add the internal connections
|
||||
rpcIntChan := make(chan birpc.ClientConnector, 1)
|
||||
srvMngr.connMgr.AddInternalConn(sAPIData[1], sAPIData[0], rpcIntChan)
|
||||
@@ -105,7 +84,6 @@ func (srvMngr *ServiceManager) AddServices(services ...Service) {
|
||||
srvMngr.connMgr.AddInternalConn(sAPIData[2], sAPIData[0], rpcIntChan)
|
||||
}
|
||||
go func() { // ToDo: centralize management into one single goroutine
|
||||
log.Printf("service name: %s", srv.ServiceName())
|
||||
if utils.StructChanTimeout(
|
||||
srvMngr.serviceIndexer.GetService(srv.ServiceName()).StateChan(utils.StateServiceUP),
|
||||
srvMngr.cfg.GeneralCfg().ConnectTimeout) {
|
||||
@@ -141,7 +119,7 @@ func (srvMngr *ServiceManager) handleReload(ctx *context.Context, shtDwn context
|
||||
}
|
||||
|
||||
func (srvMngr *ServiceManager) reloadService(srvName string, ctx *context.Context, shtDwn context.CancelFunc) (err error) {
|
||||
srv := srvMngr.GetService(srvName)
|
||||
srv := srvMngr.serviceIndexer.GetService(srvName)
|
||||
if srv.ShouldRun() {
|
||||
if srv.IsRunning() {
|
||||
if err = srv.Reload(ctx, shtDwn); err != nil {
|
||||
@@ -167,17 +145,9 @@ func (srvMngr *ServiceManager) reloadService(srvName string, ctx *context.Contex
|
||||
return
|
||||
}
|
||||
|
||||
// GetService returns the named service
|
||||
func (srvMngr *ServiceManager) GetService(subsystem string) (srv Service) {
|
||||
srvMngr.RLock()
|
||||
srv = srvMngr.subsystems[subsystem]
|
||||
srvMngr.RUnlock()
|
||||
return
|
||||
}
|
||||
|
||||
// ShutdownServices will stop all services
|
||||
func (srvMngr *ServiceManager) ShutdownServices() {
|
||||
for _, srv := range srvMngr.subsystems { // gracefully stop all running subsystems
|
||||
for _, srv := range srvMngr.serviceIndexer.GetServices() {
|
||||
if srv.IsRunning() {
|
||||
go func(srv Service) {
|
||||
if err := srv.Shutdown(); err != nil {
|
||||
@@ -241,7 +211,7 @@ func (srvMngr *ServiceManager) V1ServiceStatus(ctx *context.Context, args *ArgsS
|
||||
srvMngr.RLock()
|
||||
defer srvMngr.RUnlock()
|
||||
|
||||
srv := srvMngr.GetService(args.ServiceID)
|
||||
srv := srvMngr.serviceIndexer.GetService(args.ServiceID)
|
||||
if srv == nil {
|
||||
return utils.ErrUnsupportedServiceID
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user