Implement GuardianService and ConfigService

And use ServiceIndexer to sync with them
This commit is contained in:
ionutboangiu
2024-12-12 18:15:32 +02:00
committed by Dan Christian Bogos
parent dd299361e6
commit d9359a4005
3 changed files with 254 additions and 72 deletions

View File

@@ -39,7 +39,6 @@ import (
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/efs"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/loaders"
"github.com/cgrates/cgrates/services"
"github.com/cgrates/cgrates/servmanager"
@@ -129,10 +128,6 @@ func runCGREngine(fs []string) (err error) {
iServeManagerCh := make(chan birpc.ClientConnector, 1)
connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaServiceManager), utils.ServiceManagerV1, iServeManagerCh)
iConfigCh := make(chan birpc.ClientConnector, 1)
connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig), utils.ConfigSv1, iConfigCh)
iGuardianSCh := make(chan birpc.ClientConnector, 1)
connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian), utils.GuardianSv1, iGuardianSCh)
// ServiceIndexer will share service references to all services
srvIdxr := servmanager.NewServiceIndexer()
@@ -141,6 +136,8 @@ func runCGREngine(fs []string) (err error) {
sdbS := services.NewStorDBService(cfg, *flags.SetVersions, srvIdxr)
cls := services.NewCommonListenerService(cfg, caps, srvIdxr)
anzS := services.NewAnalyzerService(cfg, srvIdxr)
configS := services.NewConfigService(cfg, srvIdxr)
guardianS := services.NewGuardianService(cfg, srvIdxr)
coreS := services.NewCoreService(cfg, caps, cpuPrfF, shdWg, srvIdxr)
cacheS := services.NewCacheService(cfg, connMgr, srvIdxr)
fltrS := services.NewFilterService(cfg, connMgr, srvIdxr)
@@ -175,47 +172,48 @@ func runCGREngine(fs []string) (err error) {
accS := services.NewAccountService(cfg, connMgr, srvIdxr)
tpeS := services.NewTPeService(cfg, connMgr, srvIdxr)
srvManager := servmanager.NewServiceManager(shdWg, connMgr, cfg, srvIdxr,
[]servmanager.Service{
gvS,
dmS,
sdbS,
cls,
anzS,
coreS,
cacheS,
fltrS,
dspS,
ldrs,
efs,
adminS,
sessionS,
attrS,
chrgS,
routeS,
resourceS,
trendS,
rankingS,
thS,
stS,
erS,
dnsAgent,
fsAgent,
kamAgent,
janusAgent,
astAgent,
radAgent,
diamAgent,
httpAgent,
sipAgent,
eeS,
cdrS,
registrarcS,
rateS,
actionS,
accS,
tpeS,
})
srvManager := servmanager.NewServiceManager(shdWg, connMgr, cfg, srvIdxr, []servmanager.Service{
gvS,
dmS,
sdbS,
cls,
anzS,
configS,
guardianS,
coreS,
cacheS,
fltrS,
dspS,
ldrs,
efs,
adminS,
sessionS,
attrS,
chrgS,
routeS,
resourceS,
trendS,
rankingS,
thS,
stS,
erS,
dnsAgent,
fsAgent,
kamAgent,
janusAgent,
astAgent,
radAgent,
diamAgent,
httpAgent,
sipAgent,
eeS,
cdrS,
registrarcS,
rateS,
actionS,
accS,
tpeS,
})
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), cfg.CoreSCfg().ShutdownTimeout*10)
@@ -309,10 +307,7 @@ func runCGREngine(fs []string) (err error) {
return
}
srvManager.StartServices(ctx, cancel)
cgrInitServiceManagerV1(iServeManagerCh, srvManager, cfg, cls.CLS(), anzS)
cgrInitGuardianSv1(iGuardianSCh, cfg, cls.CLS(), anzS)
cgrInitConfigSv1(iConfigCh, cfg, cls.CLS(), anzS)
if *flags.Preload != utils.EmptyString {
if err = cgrRunPreload(ctx, cfg, *flags.Preload, srvIdxr); err != nil {
@@ -368,17 +363,6 @@ func cgrRunPreload(ctx *context.Context, cfg *config.CGRConfig, loaderIDs string
return
}
func cgrInitGuardianSv1(iGuardianSCh chan birpc.ClientConnector, cfg *config.CGRConfig,
cl *commonlisteners.CommonListenerS, anz *services.AnalyzerService) {
srv, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true)
if !cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
cl.RpcRegister(s)
}
}
iGuardianSCh <- anz.GetInternalCodec(srv, utils.GuardianS)
}
func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector,
srvMngr *servmanager.ServiceManager, cfg *config.CGRConfig,
cl *commonlisteners.CommonListenerS, anz *services.AnalyzerService) {
@@ -389,18 +373,6 @@ func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector,
iServMngrCh <- anz.GetInternalCodec(srv, utils.ServiceManager)
}
func cgrInitConfigSv1(iConfigCh chan birpc.ClientConnector,
cfg *config.CGRConfig, cl *commonlisteners.CommonListenerS, anz *services.AnalyzerService) {
srv, _ := engine.NewServiceWithName(cfg, utils.ConfigS, true)
// srv, _ := birpc.NewService(apis.NewConfigSv1(cfg), "", false)
if !cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
cl.RpcRegister(s)
}
}
iConfigCh <- anz.GetInternalCodec(srv, utils.ConfigSv1)
}
func cgrStartRPC(ctx *context.Context, shtdwnEngine context.CancelFunc,
cfg *config.CGRConfig, sIdxr *servmanager.ServiceIndexer) {
if cfg.DispatcherSCfg().Enabled { // wait only for dispatcher as cache is allways registered before this

105
services/config.go Normal file
View File

@@ -0,0 +1,105 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
// NewConfigService instantiates a new ConfigService.
func NewConfigService(cfg *config.CGRConfig, srvIndexer *servmanager.ServiceIndexer) *ConfigService {
return &ConfigService{
cfg: cfg,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
// ConfigService implements Service interface.
type ConfigService struct {
mu sync.RWMutex
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start handles the service start.
func (s *ConfigService) Start(_ *context.Context, _ context.CancelFunc) error {
cls := s.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.GuardianS, utils.CommonListenerS, utils.StateServiceUP)
}
anz := s.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.GuardianS, utils.AnalyzerS, utils.StateServiceUP)
}
srv, _ := engine.NewServiceWithName(s.cfg, utils.ConfigS, true)
if !s.cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
cls.CLS().RpcRegister(s)
}
}
s.intRPCconn = anz.GetInternalCodec(srv, utils.ConfigSv1)
close(s.stateDeps.StateChan(utils.StateServiceUP))
return nil
}
// Reload handles the config changes.
func (s *ConfigService) Reload(*context.Context, context.CancelFunc) error {
return nil
}
// Shutdown stops the service.
func (s *ConfigService) Shutdown() error {
return nil
}
// IsRunning returns whether the service is running or not.
func (s *ConfigService) IsRunning() bool {
return true
}
// ServiceName returns the service name
func (s *ConfigService) ServiceName() string {
return utils.ConfigS
}
// ShouldRun returns if the service should be running.
func (s *ConfigService) ShouldRun() bool {
return true
}
// StateChan returns signaling channel of specific state
func (s *ConfigService) StateChan(stateID string) chan struct{} {
return s.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (s *ConfigService) IntRPCConn() birpc.ClientConnector {
return s.intRPCconn
}

105
services/guardian.go Normal file
View File

@@ -0,0 +1,105 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package services
import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
// NewGuardianService instantiates a new GuardianService.
func NewGuardianService(cfg *config.CGRConfig, srvIndexer *servmanager.ServiceIndexer) *GuardianService {
return &GuardianService{
cfg: cfg,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
// GuardianService implements Service interface.
type GuardianService struct {
mu sync.RWMutex
cfg *config.CGRConfig
intRPCconn birpc.ClientConnector // expose API methods over internal connection
srvIndexer *servmanager.ServiceIndexer // access directly services from here
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start handles the service start.
func (s *GuardianService) Start(_ *context.Context, _ context.CancelFunc) error {
cls := s.srvIndexer.GetService(utils.CommonListenerS).(*CommonListenerService)
if utils.StructChanTimeout(cls.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.GuardianS, utils.CommonListenerS, utils.StateServiceUP)
}
anz := s.srvIndexer.GetService(utils.AnalyzerS).(*AnalyzerService)
if utils.StructChanTimeout(anz.StateChan(utils.StateServiceUP), s.cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.GuardianS, utils.AnalyzerS, utils.StateServiceUP)
}
srv, _ := engine.NewServiceWithName(guardian.Guardian, utils.GuardianS, true)
if !s.cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
cls.CLS().RpcRegister(s)
}
}
s.intRPCconn = anz.GetInternalCodec(srv, utils.GuardianS)
close(s.stateDeps.StateChan(utils.StateServiceUP))
return nil
}
// Reload handles the config changes.
func (s *GuardianService) Reload(*context.Context, context.CancelFunc) error {
return nil
}
// Shutdown stops the service.
func (s *GuardianService) Shutdown() error {
return nil
}
// IsRunning returns whether the service is running or not.
func (s *GuardianService) IsRunning() bool {
return true
}
// ServiceName returns the service name
func (s *GuardianService) ServiceName() string {
return utils.FilterS
}
// ShouldRun returns if the service should be running.
func (s *GuardianService) ShouldRun() bool {
return true
}
// StateChan returns signaling channel of specific state
func (s *GuardianService) StateChan(stateID string) chan struct{} {
return s.stateDeps.StateChan(stateID)
}
// IntRPCConn returns the internal connection used by RPCClient
func (s *GuardianService) IntRPCConn() birpc.ClientConnector {
return s.intRPCconn
}