Integrate cgrRunPreload inside LoaderService.Start

This commit is contained in:
ionutboangiu
2025-01-17 17:42:27 +02:00
committed by Dan Christian Bogos
parent 4ce506faa9
commit ba9c2e1e3f
2 changed files with 69 additions and 89 deletions

View File

@@ -26,7 +26,6 @@ import (
"path/filepath"
"runtime"
"runtime/pprof"
"strings"
"sync"
"syscall"
"time"
@@ -36,7 +35,6 @@ import (
"github.com/cgrates/cgrates/apis"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/loaders"
"github.com/cgrates/cgrates/services"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
@@ -142,7 +140,7 @@ func runCGREngine(fs []string) (err error) {
coreS,
services.NewCacheService(cfg),
services.NewFilterService(cfg),
services.NewLoaderService(cfg),
services.NewLoaderService(cfg, *flags.Preload),
services.NewExportFailoverService(cfg),
services.NewAdminSv1Service(cfg),
services.NewSessionService(cfg),
@@ -205,12 +203,6 @@ func runCGREngine(fs []string) (err error) {
srvManager.StartServices(shutdown)
cgrInitServiceManagerV1(cfg, srvManager, registry)
if *flags.Preload != utils.EmptyString {
if err = cgrRunPreload(cfg, *flags.Preload, registry); err != nil {
return
}
}
// Serve rpc connections
cgrStartRPC(cfg, registry, shutdown)
@@ -230,33 +222,6 @@ func runCGREngine(fs []string) (err error) {
return
}
// TODO: merge with LoaderService
func cgrRunPreload(cfg *config.CGRConfig, loaderIDs string,
registry *servmanager.ServiceRegistry) (err error) {
if !cfg.LoaderCfg().Enabled() {
err = fmt.Errorf("<%s> not enabled but required by preload mechanism", utils.LoaderS)
return
}
loader := registry.Lookup(utils.LoaderS).(*services.LoaderService)
if utils.StructChanTimeout(loader.StateChan(utils.StateServiceUP), cfg.GeneralCfg().ConnectTimeout) {
return utils.NewServiceStateTimeoutError(utils.PreloadCgr, utils.LoaderS, utils.StateServiceUP)
}
var reply string
for _, loaderID := range strings.Split(loaderIDs, utils.FieldsSep) {
if err = loader.GetLoaderS().V1Run(context.TODO(), &loaders.ArgsProcessFolder{
APIOpts: map[string]any{
utils.MetaForceLock: true, // force lock will unlock the file in case is locked and return error
utils.MetaStopOnError: true,
},
LoaderID: loaderID,
}, &reply); err != nil {
err = fmt.Errorf("<%s> preload failed on loadID <%s> , err: <%s>", utils.LoaderS, loaderID, err)
return
}
}
return
}
func cgrInitServiceManagerV1(cfg *config.CGRConfig, srvMngr *servmanager.ServiceManager,
registry *servmanager.ServiceRegistry) {
srvDeps, err := services.WaitForServicesToReachState(utils.StateServiceUP,

View File

@@ -19,8 +19,11 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package services
import (
"fmt"
"strings"
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/commonlisteners"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -30,29 +33,28 @@ import (
)
// NewLoaderService returns the Loader Service
func NewLoaderService(cfg *config.CGRConfig) *LoaderService {
func NewLoaderService(cfg *config.CGRConfig, preloadIDs string) *LoaderService {
return &LoaderService{
cfg: cfg,
stopChan: make(chan struct{}),
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
cfg: cfg,
stopChan: make(chan struct{}),
preloadIDs: strings.Split(preloadIDs, utils.FieldsSep),
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
}
}
// LoaderService implements Service interface
type LoaderService struct {
sync.RWMutex
ldrs *loaders.LoaderS
cl *commonlisteners.CommonListenerS
stopChan chan struct{}
cfg *config.CGRConfig
stateDeps *StateDependencies // channel subscriptions for state changes
cfg *config.CGRConfig
ldrs *loaders.LoaderS
cl *commonlisteners.CommonListenerS
preloadIDs []string
stopChan chan struct{}
stateDeps *StateDependencies // channel subscriptions for state changes
}
// Start should handle the service start
func (ldrs *LoaderService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) {
func (s *LoaderService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.CommonListenerS,
@@ -60,86 +62,99 @@ func (ldrs *LoaderService) Start(_ *utils.SyncedChan, registry *servmanager.Serv
utils.FilterS,
utils.DataDB,
},
registry, ldrs.cfg.GeneralCfg().ConnectTimeout)
registry, s.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
ldrs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS()
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager()
ldrs.Lock()
defer ldrs.Unlock()
s.Lock()
defer s.Unlock()
ldrs.ldrs = loaders.NewLoaderS(ldrs.cfg, dbs.DataManager(), fs.FilterS(), cms.ConnManager())
if !ldrs.ldrs.Enabled() {
return
s.ldrs = loaders.NewLoaderS(s.cfg, dbs, fs, cms.ConnManager())
if !s.ldrs.Enabled() {
return nil
}
if err = ldrs.ldrs.ListenAndServe(ldrs.stopChan); err != nil {
return
var reply string
for _, loaderID := range s.preloadIDs {
if err = s.ldrs.V1Run(context.TODO(),
&loaders.ArgsProcessFolder{
APIOpts: map[string]any{
utils.MetaForceLock: true,
utils.MetaStopOnError: true,
}, LoaderID: loaderID,
}, &reply); err != nil {
return fmt.Errorf("could not preload loader with ID %q: %v", loaderID, err)
}
}
srv, _ := engine.NewService(ldrs.ldrs)
if err := s.ldrs.ListenAndServe(s.stopChan); err != nil {
return err
}
srv, _ := engine.NewService(s.ldrs)
// srv, _ := birpc.NewService(apis.NewLoaderSv1(ldrs.ldrs), "", false)
for _, s := range srv {
ldrs.cl.RpcRegister(s)
for _, svc := range srv {
s.cl.RpcRegister(svc)
}
cms.AddInternalConn(utils.LoaderS, srv)
return
return nil
}
// Reload handles the change of config
func (ldrs *LoaderService) Reload(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
func (s *LoaderService) Reload(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP,
[]string{
utils.ConnManager,
utils.FilterS,
utils.DataDB,
},
registry, ldrs.cfg.GeneralCfg().ConnectTimeout)
registry, s.cfg.GeneralCfg().ConnectTimeout)
if err != nil {
return err
}
cms := srvDeps[utils.ConnManager].(*ConnManagerService)
fs := srvDeps[utils.FilterS].(*FilterService)
dbs := srvDeps[utils.DataDB].(*DataDBService)
close(ldrs.stopChan)
ldrs.stopChan = make(chan struct{})
cms := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager()
fs := srvDeps[utils.FilterS].(*FilterService).FilterS()
dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager()
close(s.stopChan)
s.stopChan = make(chan struct{})
ldrs.RLock()
defer ldrs.RUnlock()
s.RLock()
defer s.RUnlock()
ldrs.ldrs.Reload(dbs.DataManager(), fs.FilterS(), cms.ConnManager())
return ldrs.ldrs.ListenAndServe(ldrs.stopChan)
s.ldrs.Reload(dbs, fs, cms)
return s.ldrs.ListenAndServe(s.stopChan)
}
// Shutdown stops the service
func (ldrs *LoaderService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) {
ldrs.Lock()
ldrs.ldrs = nil
close(ldrs.stopChan)
ldrs.cl.RpcUnregisterName(utils.LoaderSv1)
ldrs.Unlock()
func (s *LoaderService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) {
s.Lock()
s.ldrs = nil
close(s.stopChan)
s.cl.RpcUnregisterName(utils.LoaderSv1)
s.Unlock()
return
}
// ServiceName returns the service name
func (ldrs *LoaderService) ServiceName() string {
func (s *LoaderService) ServiceName() string {
return utils.LoaderS
}
// ShouldRun returns if the service should be running
func (ldrs *LoaderService) ShouldRun() bool {
return ldrs.cfg.LoaderCfg().Enabled()
func (s *LoaderService) ShouldRun() bool {
return s.cfg.LoaderCfg().Enabled()
}
// GetLoaderS returns the initialized LoaderService
func (ldrs *LoaderService) GetLoaderS() *loaders.LoaderS {
return ldrs.ldrs
func (s *LoaderService) GetLoaderS() *loaders.LoaderS {
return s.ldrs
}
// StateChan returns signaling channel of specific state
func (ldrs *LoaderService) StateChan(stateID string) chan struct{} {
return ldrs.stateDeps.StateChan(stateID)
func (s *LoaderService) StateChan(stateID string) chan struct{} {
return s.stateDeps.StateChan(stateID)
}