Files
cgrates/servmanager/servmanager.go
ionutboangiu dab152f976 Integrate StateServiceDOWN + registry related refactor
Removed redundant IsRunning service method
Removed registry from constructors
Pass registry to Start/Reload/Shutdown service methods
2025-01-14 19:00:37 +01:00

424 lines
14 KiB
Go

/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package servmanager
import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// NewServiceManager builds a ServiceManager around the given shutdown
// WaitGroup, connection manager, configuration and registry, and registers
// the supplied services through AddServices before returning it. The reload
// channel is the one exposed by cfg.GetReloadChan().
func NewServiceManager(shdWg *sync.WaitGroup, connMgr *engine.ConnManager,
	cfg *config.CGRConfig, registry *ServiceRegistry, services []Service) (sM *ServiceManager) {
	sM = new(ServiceManager)
	sM.cfg = cfg
	sM.registry = registry
	sM.shdWg = shdWg
	sM.connMgr = connMgr
	sM.rldChan = cfg.GetReloadChan()
	// Registration needs registry and connMgr, so it happens once the fields are set.
	sM.AddServices(services...)
	return sM
}
// ServiceManager drives the lifecycle (start, reload, shutdown) of the
// services run by the engine.
type ServiceManager struct {
	// Embedded lock guarding access to shared data.
	sync.RWMutex
	cfg *config.CGRConfig
	// registry indexes the services so they can be accessed by their IDs.
	registry *ServiceRegistry
	// shdWg is incremented before a service is started and released once it
	// has been shut down.
	shdWg *sync.WaitGroup
	// rldChan delivers the reload signals handled by handleReload.
	rldChan <-chan string
	connMgr *engine.ConnManager
}
// StartServices launches the reload handler and then starts, each in its own
// goroutine, every registered service that should run and is not already in
// the StateServiceUP state.
//
// shutdown is handed to the reload handler and to every started service; it
// is closed here when a service fails to start.
func (m *ServiceManager) StartServices(shutdown chan struct{}) {
	go m.handleReload(shutdown)
	for _, svc := range m.registry.List() {
		if !svc.ShouldRun() || IsServiceInState(svc, utils.StateServiceUP) {
			continue
		}
		m.shdWg.Add(1)
		// svc is passed explicitly so each goroutine works on its own service
		// regardless of the loop-variable semantics of the Go version in use.
		go func(svc Service) {
			err := svc.Start(shutdown, m.registry)
			// ErrServiceAlreadyRunning: the service was started in another goroutine.
			if err != nil && err != utils.ErrServiceAlreadyRunning {
				utils.Logger.Err(fmt.Sprintf("<%s> failed to start <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err))
				// NOTE(review): closing an already closed channel panics; if
				// several services can fail concurrently this close needs a
				// once-only guard shared with reloadService.
				close(shutdown)
				return // a service that failed to start must not be reported as started
			}
			utils.Logger.Info(fmt.Sprintf("<%s> started <%s> service", utils.ServiceManager, svc.ServiceName()))
		}(svc)
	}
	// startServer()
}
// AddServices registers the given services in the registry. For every service
// that has an entry in serviceAPIData it also adds the internal connection(s)
// to the connection manager and spawns a goroutine that waits for the service
// to reach StateServiceUP (bounded by the configured ConnectTimeout) before
// publishing the service's internal RPC connector on the connection channel.
func (m *ServiceManager) AddServices(services ...Service) {
	m.Lock()
	defer m.Unlock()
	for _, svc := range services {
		m.registry.Register(svc)
		sAPIData, hasAPIData := serviceAPIData[svc.ServiceName()]
		if !hasAPIData {
			continue
		}
		// Add the internal connections.
		rpcIntChan := make(chan birpc.ClientConnector, 1)
		m.connMgr.AddInternalConn(sAPIData[1], sAPIData[0], rpcIntChan)
		if len(sAPIData) > 2 { // Add the bidirectional API
			m.connMgr.AddInternalConn(sAPIData[2], sAPIData[0], rpcIntChan)
		}
		// svc is passed explicitly so each goroutine works on its own service
		// regardless of the loop-variable semantics of the Go version in use.
		go func(svc Service) { // ToDo: centralize management into one single goroutine
			if utils.StructChanTimeout(
				m.registry.Lookup(svc.ServiceName()).StateChan(utils.StateServiceUP),
				m.cfg.GeneralCfg().ConnectTimeout) {
				utils.Logger.Err(
					fmt.Sprintf("<%s> failed to register internal connection to service %s because of timeout waiting for ServiceUP state",
						utils.ServiceManager, svc.ServiceName()))
				// toDo: shutdown service
				// The service never reached ServiceUP, so its connector must
				// not be published as if registration had succeeded.
				return
			}
			rpcIntChan <- svc.IntRPCConn()
		}(svc)
	}
}
// handleReload loops until shutdown is signalled, dispatching every ID
// received on the reload channel: config.RPCConnsJSON reloads the connection
// manager, any other ID is passed to reloadService. Each dispatch runs in its
// own goroutine. When shutdown fires, all services are shut down and the loop
// ends.
func (m *ServiceManager) handleReload(shutdown chan struct{}) {
	for {
		select {
		case <-shutdown:
			m.ShutdownServices()
			return
		case id := <-m.rldChan:
			switch id {
			case config.RPCConnsJSON:
				go m.connMgr.Reload()
			default:
				go m.reloadService(id, shutdown)
			}
		}
		// handle RPC server
	}
}
// reloadService brings the service registered under id in line with its
// configuration:
//   - should run and is UP: the service is reloaded;
//   - should run and is not UP: the service is started;
//   - should not run and is UP: the service is shut down.
//
// Any failure is logged, closes shutdown and is returned. An id that is not
// present in the registry yields utils.ErrUnsupportedServiceID.
func (m *ServiceManager) reloadService(id string, shutdown chan struct{}) (err error) {
	svc := m.registry.Lookup(id)
	if svc == nil {
		// Lookup yields nil for unknown IDs; using it would panic below.
		utils.Logger.Err(fmt.Sprintf("<%s> cannot reload unknown service <%s>", utils.ServiceManager, id))
		return utils.ErrUnsupportedServiceID
	}
	isUp := IsServiceInState(svc, utils.StateServiceUP)
	shouldRun := svc.ShouldRun()
	switch {
	case shouldRun && isUp:
		if err = svc.Reload(shutdown, m.registry); err != nil {
			utils.Logger.Err(fmt.Sprintf("<%s> failed to reload <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err))
			// NOTE(review): closing an already closed channel panics; this
			// close is not guarded against concurrent failures elsewhere.
			close(shutdown)
			return err // stop if we encounter an error
		}
		utils.Logger.Info(fmt.Sprintf("<%s> reloaded <%s> service", utils.ServiceManager, svc.ServiceName()))
	case shouldRun:
		m.shdWg.Add(1)
		if err = svc.Start(shutdown, m.registry); err != nil {
			utils.Logger.Err(fmt.Sprintf("<%s> failed to start <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err))
			close(shutdown)
			return err // stop if we encounter an error
		}
		utils.Logger.Info(fmt.Sprintf("<%s> started <%s> service", utils.ServiceManager, svc.ServiceName()))
	case isUp:
		// The WaitGroup slot is released whether or not Shutdown succeeds.
		defer m.shdWg.Done()
		if err = svc.Shutdown(m.registry); err != nil {
			utils.Logger.Err(fmt.Sprintf("<%s> failed to shut down <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err))
			close(shutdown)
			return err // a failed shutdown must not be reported as stopped
		}
		utils.Logger.Info(fmt.Sprintf("<%s> stopped <%s> service", utils.ServiceManager, svc.ServiceName()))
	}
	return nil
}
// ShutdownServices shuts down, each in its own goroutine, every registered
// service currently in the StateServiceUP state. Each goroutine releases one
// slot of the shutdown WaitGroup when it finishes, whether or not the
// shutdown succeeded.
func (m *ServiceManager) ShutdownServices() {
	stop := func(target Service) {
		defer m.shdWg.Done()
		err := target.Shutdown(m.registry)
		if err == nil {
			utils.Logger.Info(fmt.Sprintf("<%s> stopped <%s> service", utils.ServiceManager, target.ServiceName()))
			return
		}
		utils.Logger.Err(fmt.Sprintf("<%s> failed to shut down <%s> service: %v",
			utils.ServiceManager, target.ServiceName(), err))
	}
	for _, svc := range m.registry.List() {
		if !IsServiceInState(svc, utils.StateServiceUP) {
			continue
		}
		go stop(svc)
	}
}
// Service is the contract every service handled by the ServiceManager has to
// implement.
type Service interface {
	// ServiceName returns the service name.
	ServiceName() string
	// ShouldRun reports whether the service should be running.
	ShouldRun() bool
	// Start handles the service start.
	Start(shutdown chan struct{}, registry *ServiceRegistry) error
	// Reload handles a change of configuration.
	Reload(shutdown chan struct{}, registry *ServiceRegistry) error
	// Shutdown stops the service.
	Shutdown(registry *ServiceRegistry) error
	// StateChan returns the channel used to subscribe to the given state.
	StateChan(stateID string) chan struct{}
	// IntRPCConn returns the connector needed for internal RPC connections.
	IntRPCConn() birpc.ClientConnector
}
// ArgsServiceID is the argument type of the Start/Stop/Status RPC methods.
type ArgsServiceID struct {
	// ServiceID identifies the service the call applies to.
	ServiceID string
	// APIOpts carries optional per-call API options.
	APIOpts map[string]any
}
// V1StartService enables the service identified by args.ServiceID through
// toggleService and sets reply to utils.OK on success.
func (m *ServiceManager) V1StartService(ctx *context.Context, args *ArgsServiceID, reply *string) (err error) {
	if err = toggleService(args.ServiceID, true, m); err == nil {
		*reply = utils.OK
	}
	return err
}
// V1StopService disables the service identified by args.ServiceID through
// toggleService and sets reply to utils.OK on success.
func (m *ServiceManager) V1StopService(ctx *context.Context, args *ArgsServiceID, reply *string) (err error) {
	if err = toggleService(args.ServiceID, false, m); err == nil {
		*reply = utils.OK
	}
	return err
}
// V1ServiceStatus sets reply to utils.RunningCaps when the service identified
// by args.ServiceID is in the StateServiceUP state and to utils.StoppedCaps
// otherwise. It returns utils.ErrUnsupportedServiceID when the registry has
// no such service.
func (m *ServiceManager) V1ServiceStatus(ctx *context.Context, args *ArgsServiceID, reply *string) error {
	m.RLock()
	defer m.RUnlock()
	svc := m.registry.Lookup(args.ServiceID)
	if svc == nil {
		return utils.ErrUnsupportedServiceID
	}
	status := utils.StoppedCaps
	if IsServiceInState(svc, utils.StateServiceUP) {
		status = utils.RunningCaps
	}
	*reply = status
	return nil
}
// GetConfig returns the configuration held by the manager, read under the
// read lock.
func (m *ServiceManager) GetConfig() *config.CGRConfig {
	m.RLock()
	cfg := m.cfg
	m.RUnlock()
	return cfg
}
// toggleService sets the Enabled flag of the configuration section belonging
// to the service id to status and then pushes id on the configuration reload
// channel. Both steps run while holding the manager's write lock. IDs without
// a matching case yield utils.ErrUnsupportedServiceID and send nothing.
func toggleService(id string, status bool, srvMngr *ServiceManager) (err error) {
	srvMngr.Lock()
	defer srvMngr.Unlock()
	cfg := srvMngr.cfg
	switch id {
	case utils.AccountS:
		cfg.AccountSCfg().Enabled = status
	case utils.ActionS:
		cfg.ActionSCfg().Enabled = status
	case utils.AdminS:
		cfg.AdminSCfg().Enabled = status
	case utils.AnalyzerS:
		cfg.AnalyzerSCfg().Enabled = status
	case utils.AttributeS:
		cfg.AttributeSCfg().Enabled = status
	case utils.CDRServer:
		cfg.CdrsCfg().Enabled = status
	case utils.ChargerS:
		cfg.ChargerSCfg().Enabled = status
	case utils.DispatcherS:
		cfg.DispatcherSCfg().Enabled = status
	case utils.EEs:
		cfg.EEsCfg().Enabled = status
	case utils.EFs:
		cfg.EFsCfg().Enabled = status
	case utils.ERs:
		cfg.ERsCfg().Enabled = status
	// case utils.LoaderS:
	// 	srvMngr.cfg.LoaderCfg()[0].Enabled = status
	// 	srvMngr.cfg.GetReloadChan() <- serviceID
	case utils.RateS:
		cfg.RateSCfg().Enabled = status
	case utils.TrendS:
		cfg.TrendSCfg().Enabled = status
	case utils.RankingS:
		cfg.RankingSCfg().Enabled = status
	case utils.ResourceS:
		cfg.ResourceSCfg().Enabled = status
	case utils.RouteS:
		cfg.RouteSCfg().Enabled = status
	case utils.SessionS:
		cfg.SessionSCfg().Enabled = status
	case utils.StatS:
		cfg.StatSCfg().Enabled = status
	case utils.ThresholdS:
		cfg.ThresholdSCfg().Enabled = status
	case utils.TPeS:
		cfg.TpeSCfg().Enabled = status
	case utils.AsteriskAgent:
		cfg.AsteriskAgentCfg().Enabled = status
	case utils.DiameterAgent:
		cfg.DiameterAgentCfg().Enabled = status
	case utils.DNSAgent:
		cfg.DNSAgentCfg().Enabled = status
	case utils.FreeSWITCHAgent:
		cfg.FsAgentCfg().Enabled = status
	case utils.KamailioAgent:
		cfg.KamAgentCfg().Enabled = status
	case utils.RadiusAgent:
		cfg.RadiusAgentCfg().Enabled = status
	case utils.SIPAgent:
		cfg.SIPAgentCfg().Enabled = status
	default:
		return utils.ErrUnsupportedServiceID
	}
	// Every supported service signals the reload the same way.
	cfg.GetReloadChan() <- id
	return nil
}
// serviceAPIData maps a service name to the data AddServices needs to
// register its internal connections. For each entry:
//   - element 0 is the API name passed as the second argument of
//     AddInternalConn;
//   - element 1 is the internal connection key;
//   - element 2, when present, is the key of the bidirectional connection.
var serviceAPIData = map[string][]string{
	utils.AnalyzerS: {
		utils.AnalyzerSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAnalyzerS),
	},
	utils.AdminS: {
		utils.AdminSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAdminS),
	},
	utils.AttributeS: {
		utils.AttributeSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes),
	},
	utils.CacheS: {
		utils.CacheSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches),
	},
	utils.CDRs: {
		utils.CDRsV1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs),
	},
	utils.ChargerS: {
		utils.ChargerSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers),
	},
	utils.GuardianS: {
		utils.GuardianSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian),
	},
	utils.LoaderS: {
		utils.LoaderSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaLoaders),
	},
	utils.ResourceS: {
		utils.ResourceSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources),
	},
	utils.SessionS: {
		utils.SessionSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS),
		utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS),
	},
	utils.StatS: {
		utils.StatSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats),
	},
	utils.RankingS: {
		utils.RankingSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRankings),
	},
	utils.TrendS: {
		utils.TrendSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTrends),
	},
	utils.RouteS: {
		utils.RouteSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRoutes),
	},
	utils.ThresholdS: {
		utils.ThresholdSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds),
	},
	utils.ServiceManagerS: {
		utils.ServiceManagerV1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaServiceManager),
	},
	utils.ConfigS: {
		utils.ConfigSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig),
	},
	utils.CoreS: {
		utils.CoreSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore),
	},
	utils.EEs: {
		utils.EeSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs),
	},
	utils.RateS: {
		utils.RateSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRates),
	},
	utils.DispatcherS: {
		utils.DispatcherSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers),
	},
	utils.AccountS: {
		utils.AccountSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts),
	},
	utils.ActionS: {
		utils.ActionSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions),
	},
	utils.TPeS: {
		utils.TPeSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTpes),
	},
	utils.EFs: {
		utils.EfSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEFs),
	},
	utils.ERs: {
		utils.ErSv1,
		utils.ConcatenatedKey(utils.MetaInternal, utils.MetaERs),
	},
}
// IsServiceInState reports, without blocking, whether a receive on the
// channel svc returns for state can proceed immediately.
func IsServiceInState(svc Service, state string) bool {
	stateCh := svc.StateChan(state)
	select {
	case <-stateCh:
		return true
	default:
	}
	return false
}