/* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments Copyright (C) ITsysCOM GmbH This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see */ package servmanager import ( "fmt" "sync" "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) // NewServiceManager returns a service manager func NewServiceManager(shdWg *sync.WaitGroup, connMgr *engine.ConnManager, cfg *config.CGRConfig, registry *ServiceRegistry, services []Service) (sM *ServiceManager) { sM = &ServiceManager{ cfg: cfg, registry: registry, shdWg: shdWg, connMgr: connMgr, rldChan: cfg.GetReloadChan(), } sM.AddServices(services...) return } // ServiceManager handles service management ran by the engine type ServiceManager struct { sync.RWMutex // lock access to any shared data cfg *config.CGRConfig registry *ServiceRegistry // index here the services for accessing them by their IDs shdWg *sync.WaitGroup // list of shutdown items rldChan <-chan string // reload signals come over this channelc connMgr *engine.ConnManager } // StartServices starts all enabled services func (m *ServiceManager) StartServices(shutdown chan struct{}) { go m.handleReload(shutdown) for _, svc := range m.registry.List() { // TODO: verify if IsServiceInState check is needed. It should // be redundant since ServManager manages all services and this // runs only at startup if svc.ShouldRun() && !IsServiceInState(svc, utils.StateServiceUP) { m.shdWg.Add(1) go func() { if err := svc.Start(shutdown, m.registry); err != nil && err != utils.ErrServiceAlreadyRunning { // in case the service was started in another gorutine utils.Logger.Err(fmt.Sprintf("<%s> failed to start <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err)) close(shutdown) } close(svc.StateChan(utils.StateServiceUP)) utils.Logger.Info(fmt.Sprintf("<%s> started <%s> service", utils.ServiceManager, svc.ServiceName())) }() } } // startServer() } // AddServices adds given services func (m *ServiceManager) AddServices(services ...Service) { m.Lock() for _, svc := range services { m.registry.Register(svc) if sAPIData, hasAPIData := serviceAPIData[svc.ServiceName()]; hasAPIData { // Add the internal connections rpcIntChan := make(chan birpc.ClientConnector, 1) m.connMgr.AddInternalConn(sAPIData[1], sAPIData[0], rpcIntChan) if len(sAPIData) > 2 { // Add the bidirectional API m.connMgr.AddInternalConn(sAPIData[2], sAPIData[0], rpcIntChan) } go func() { // ToDo: centralize management into one single goroutine if utils.StructChanTimeout( m.registry.Lookup(svc.ServiceName()).StateChan(utils.StateServiceUP), m.cfg.GeneralCfg().ConnectTimeout) { utils.Logger.Err( fmt.Sprintf("<%s> failed to register internal connection to service %s because of timeout waiting for ServiceUP state", utils.ServiceManager, svc.ServiceName())) // toDo: shutdown service } rpcIntChan <- svc.IntRPCConn() }() } } m.Unlock() } func (m *ServiceManager) handleReload(shutdown chan struct{}) { var serviceID string for { select { case <-shutdown: m.ShutdownServices() return case serviceID = <-m.rldChan: } if serviceID == config.RPCConnsJSON { go m.connMgr.Reload() } else { go m.reloadService(serviceID, shutdown) } // handle RPC server } } func (m *ServiceManager) reloadService(id string, shutdown chan struct{}) (err error) { svc := m.registry.Lookup(id) isUp := IsServiceInState(svc, utils.StateServiceUP) if svc.ShouldRun() { if isUp { // TODO: state channels must be reinitiated for both SERVICE_UP and SERVICE_DOWN. if err = svc.Reload(shutdown, m.registry); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to reload <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err)) close(shutdown) return // stop if we encounter an error } utils.Logger.Info(fmt.Sprintf("<%s> reloaded <%s> service", utils.ServiceManager, svc.ServiceName())) } else { m.shdWg.Add(1) if err = svc.Start(shutdown, m.registry); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to start <%s> serivce: %v", utils.ServiceManager, svc.ServiceName(), err)) close(shutdown) return // stop if we encounter an error } close(svc.StateChan(utils.StateServiceUP)) utils.Logger.Info(fmt.Sprintf("<%s> started <%s> service", utils.ServiceManager, svc.ServiceName())) } } else if isUp { if err = svc.Shutdown(m.registry); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to shut down <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err)) close(shutdown) } close(svc.StateChan(utils.StateServiceDOWN)) utils.Logger.Info(fmt.Sprintf("<%s> stopped <%s> service", utils.ServiceManager, svc.ServiceName())) m.shdWg.Done() } return } // ShutdownServices will stop all services func (m *ServiceManager) ShutdownServices() { for _, svc := range m.registry.List() { if IsServiceInState(svc, utils.StateServiceUP) { go func() { defer m.shdWg.Done() if err := svc.Shutdown(m.registry); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to shut down <%s> service: %v", utils.ServiceManager, svc.ServiceName(), err)) return } close(svc.StateChan(utils.StateServiceDOWN)) utils.Logger.Info(fmt.Sprintf("<%s> stopped <%s> service", utils.ServiceManager, svc.ServiceName())) }() } } } // Service interface that describes what functions should a service implement type Service interface { // Start should handle the service start Start(chan struct{}, *ServiceRegistry) error // Reload handles the change of config Reload(chan struct{}, *ServiceRegistry) error // Shutdown stops the service Shutdown(*ServiceRegistry) error // ShouldRun returns if the service should be running ShouldRun() bool // ServiceName returns the service name ServiceName() string // StateChan returns the channel for specific state subscription StateChan(stateID string) chan struct{} // IntRPCConn returns the connector needed for internal RPC connections IntRPCConn() birpc.ClientConnector } // ArgsServiceID are passed to Start/Stop/Status RPC methods type ArgsServiceID struct { ServiceID string APIOpts map[string]any } // V1StartService starts a service with ID func (m *ServiceManager) V1StartService(ctx *context.Context, args *ArgsServiceID, reply *string) (err error) { err = toggleService(args.ServiceID, true, m) if err != nil { return } *reply = utils.OK return } // V1StopService shuts-down a service with ID func (m *ServiceManager) V1StopService(ctx *context.Context, args *ArgsServiceID, reply *string) (err error) { err = toggleService(args.ServiceID, false, m) if err != nil { return } *reply = utils.OK return } // V1ServiceStatus returns the service status func (m *ServiceManager) V1ServiceStatus(ctx *context.Context, args *ArgsServiceID, reply *string) error { m.RLock() defer m.RUnlock() svc := m.registry.Lookup(args.ServiceID) if svc == nil { return utils.ErrUnsupportedServiceID } if IsServiceInState(svc, utils.StateServiceUP) { *reply = utils.RunningCaps } else { *reply = utils.StoppedCaps } return nil } // GetConfig returns the Configuration func (m *ServiceManager) GetConfig() *config.CGRConfig { m.RLock() defer m.RUnlock() return m.cfg } func toggleService(id string, status bool, srvMngr *ServiceManager) (err error) { srvMngr.Lock() defer srvMngr.Unlock() switch id { case utils.AccountS: srvMngr.cfg.AccountSCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.ActionS: srvMngr.cfg.ActionSCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.AdminS: srvMngr.cfg.AdminSCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.AnalyzerS: srvMngr.cfg.AnalyzerSCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.AttributeS: srvMngr.cfg.AttributeSCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.CDRServer: srvMngr.cfg.CdrsCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.ChargerS: srvMngr.cfg.ChargerSCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.DispatcherS: srvMngr.cfg.DispatcherSCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.EEs: srvMngr.cfg.EEsCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.EFs: srvMngr.cfg.EFsCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.ERs: srvMngr.cfg.ERsCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id // case utils.LoaderS: // srvMngr.cfg.LoaderCfg()[0].Enabled = status // srvMngr.cfg.GetReloadChan() <- serviceID case utils.RateS: srvMngr.cfg.RateSCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.TrendS: srvMngr.cfg.TrendSCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.RankingS: srvMngr.cfg.RankingSCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.ResourceS: srvMngr.cfg.ResourceSCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.RouteS: srvMngr.cfg.RouteSCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.SessionS: srvMngr.cfg.SessionSCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.StatS: srvMngr.cfg.StatSCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.ThresholdS: srvMngr.cfg.ThresholdSCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.TPeS: srvMngr.cfg.TpeSCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.AsteriskAgent: srvMngr.cfg.AsteriskAgentCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.DiameterAgent: srvMngr.cfg.DiameterAgentCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.DNSAgent: srvMngr.cfg.DNSAgentCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.FreeSWITCHAgent: srvMngr.cfg.FsAgentCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.KamailioAgent: srvMngr.cfg.KamAgentCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.RadiusAgent: srvMngr.cfg.RadiusAgentCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id case utils.SIPAgent: srvMngr.cfg.SIPAgentCfg().Enabled = status srvMngr.cfg.GetReloadChan() <- id default: err = utils.ErrUnsupportedServiceID } return } var serviceAPIData = map[string][]string{ utils.AnalyzerS: { utils.AnalyzerSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAnalyzerS)}, utils.AdminS: { utils.AdminSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAdminS)}, utils.AttributeS: { utils.AttributeSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes)}, utils.CacheS: { utils.CacheSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)}, utils.CDRs: { utils.CDRsV1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs)}, utils.ChargerS: { utils.ChargerSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers)}, utils.GuardianS: { utils.GuardianSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian)}, utils.LoaderS: { utils.LoaderSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaLoaders)}, utils.ResourceS: { utils.ResourceSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources)}, utils.SessionS: { utils.SessionSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS), utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS)}, utils.StatS: { utils.StatSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats)}, utils.RankingS: { utils.RankingSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRankings)}, utils.TrendS: { utils.TrendSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTrends)}, utils.RouteS: { utils.RouteSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRoutes)}, utils.ThresholdS: { utils.ThresholdSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds)}, utils.ServiceManagerS: { utils.ServiceManagerV1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaServiceManager)}, utils.ConfigS: { utils.ConfigSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig)}, utils.CoreS: { utils.CoreSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore)}, utils.EEs: { utils.EeSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs)}, utils.RateS: { utils.RateSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRates)}, utils.DispatcherS: { utils.DispatcherSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers)}, utils.AccountS: { utils.AccountSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts)}, utils.ActionS: { utils.ActionSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions)}, utils.TPeS: { utils.TPeSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTpes)}, utils.EFs: { utils.EfSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEFs)}, utils.ERs: { utils.ErSv1, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaERs)}, } // IsServiceInState performs a non-blocking check to determine if a service is in the specified state. func IsServiceInState(svc Service, state string) bool { select { case <-svc.StateChan(state): return true default: return false } }