diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 55a42fee3..1238de92f 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -26,7 +26,6 @@ import ( "path/filepath" "runtime" "runtime/pprof" - "strings" "sync" "syscall" "time" @@ -36,7 +35,6 @@ import ( "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" - "github.com/cgrates/cgrates/loaders" "github.com/cgrates/cgrates/services" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" @@ -142,7 +140,7 @@ func runCGREngine(fs []string) (err error) { coreS, services.NewCacheService(cfg), services.NewFilterService(cfg), - services.NewLoaderService(cfg), + services.NewLoaderService(cfg, *flags.Preload), services.NewExportFailoverService(cfg), services.NewAdminSv1Service(cfg), services.NewSessionService(cfg), @@ -205,12 +203,6 @@ func runCGREngine(fs []string) (err error) { srvManager.StartServices(shutdown) cgrInitServiceManagerV1(cfg, srvManager, registry) - if *flags.Preload != utils.EmptyString { - if err = cgrRunPreload(cfg, *flags.Preload, registry); err != nil { - return - } - } - // Serve rpc connections cgrStartRPC(cfg, registry, shutdown) @@ -230,33 +222,6 @@ func runCGREngine(fs []string) (err error) { return } -// TODO: merge with LoaderService -func cgrRunPreload(cfg *config.CGRConfig, loaderIDs string, - registry *servmanager.ServiceRegistry) (err error) { - if !cfg.LoaderCfg().Enabled() { - err = fmt.Errorf("<%s> not enabled but required by preload mechanism", utils.LoaderS) - return - } - loader := registry.Lookup(utils.LoaderS).(*services.LoaderService) - if utils.StructChanTimeout(loader.StateChan(utils.StateServiceUP), cfg.GeneralCfg().ConnectTimeout) { - return utils.NewServiceStateTimeoutError(utils.PreloadCgr, utils.LoaderS, utils.StateServiceUP) - } - var reply string - for _, loaderID := range strings.Split(loaderIDs, utils.FieldsSep) { - if err = loader.GetLoaderS().V1Run(context.TODO(), &loaders.ArgsProcessFolder{ - APIOpts: map[string]any{ - utils.MetaForceLock: true, // force lock will unlock the file in case is locked and return error - utils.MetaStopOnError: true, - }, - LoaderID: loaderID, - }, &reply); err != nil { - err = fmt.Errorf("<%s> preload failed on loadID <%s> , err: <%s>", utils.LoaderS, loaderID, err) - return - } - } - return -} - func cgrInitServiceManagerV1(cfg *config.CGRConfig, srvMngr *servmanager.ServiceManager, registry *servmanager.ServiceRegistry) { srvDeps, err := services.WaitForServicesToReachState(utils.StateServiceUP, diff --git a/services/loaders.go b/services/loaders.go index d474d4466..8297e6f4e 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -19,8 +19,11 @@ along with this program. If not, see package services import ( + "fmt" + "strings" "sync" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/commonlisteners" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -30,29 +33,28 @@ import ( ) // NewLoaderService returns the Loader Service -func NewLoaderService(cfg *config.CGRConfig) *LoaderService { +func NewLoaderService(cfg *config.CGRConfig, preloadIDs string) *LoaderService { return &LoaderService{ - cfg: cfg, - stopChan: make(chan struct{}), - stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), + cfg: cfg, + stopChan: make(chan struct{}), + preloadIDs: strings.Split(preloadIDs, utils.FieldsSep), + stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}), } } // LoaderService implements Service interface type LoaderService struct { sync.RWMutex - - ldrs *loaders.LoaderS - cl *commonlisteners.CommonListenerS - - stopChan chan struct{} - cfg *config.CGRConfig - - stateDeps *StateDependencies // channel subscriptions for state changes + cfg *config.CGRConfig + ldrs *loaders.LoaderS + cl *commonlisteners.CommonListenerS + preloadIDs []string + stopChan chan struct{} + stateDeps *StateDependencies // channel subscriptions for state changes } // Start should handle the service start -func (ldrs *LoaderService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) (err error) { +func (s *LoaderService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.CommonListenerS, @@ -60,86 +62,99 @@ func (ldrs *LoaderService) Start(_ *utils.SyncedChan, registry *servmanager.Serv utils.FilterS, utils.DataDB, }, - registry, ldrs.cfg.GeneralCfg().ConnectTimeout) + registry, s.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } - ldrs.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() + s.cl = srvDeps[utils.CommonListenerS].(*CommonListenerService).CLS() cms := srvDeps[utils.ConnManager].(*ConnManagerService) - fs := srvDeps[utils.FilterS].(*FilterService) - dbs := srvDeps[utils.DataDB].(*DataDBService) + fs := srvDeps[utils.FilterS].(*FilterService).FilterS() + dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager() - ldrs.Lock() - defer ldrs.Unlock() + s.Lock() + defer s.Unlock() - ldrs.ldrs = loaders.NewLoaderS(ldrs.cfg, dbs.DataManager(), fs.FilterS(), cms.ConnManager()) - - if !ldrs.ldrs.Enabled() { - return + s.ldrs = loaders.NewLoaderS(s.cfg, dbs, fs, cms.ConnManager()) + if !s.ldrs.Enabled() { + return nil } - if err = ldrs.ldrs.ListenAndServe(ldrs.stopChan); err != nil { - return + + var reply string + for _, loaderID := range s.preloadIDs { + if err = s.ldrs.V1Run(context.TODO(), + &loaders.ArgsProcessFolder{ + APIOpts: map[string]any{ + utils.MetaForceLock: true, + utils.MetaStopOnError: true, + }, LoaderID: loaderID, + }, &reply); err != nil { + return fmt.Errorf("could not preload loader with ID %q: %v", loaderID, err) + } } - srv, _ := engine.NewService(ldrs.ldrs) + + if err := s.ldrs.ListenAndServe(s.stopChan); err != nil { + return err + } + srv, _ := engine.NewService(s.ldrs) // srv, _ := birpc.NewService(apis.NewLoaderSv1(ldrs.ldrs), "", false) - for _, s := range srv { - ldrs.cl.RpcRegister(s) + for _, svc := range srv { + s.cl.RpcRegister(svc) } cms.AddInternalConn(utils.LoaderS, srv) - return + return nil } // Reload handles the change of config -func (ldrs *LoaderService) Reload(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { +func (s *LoaderService) Reload(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error { srvDeps, err := WaitForServicesToReachState(utils.StateServiceUP, []string{ utils.ConnManager, utils.FilterS, utils.DataDB, }, - registry, ldrs.cfg.GeneralCfg().ConnectTimeout) + registry, s.cfg.GeneralCfg().ConnectTimeout) if err != nil { return err } - cms := srvDeps[utils.ConnManager].(*ConnManagerService) - fs := srvDeps[utils.FilterS].(*FilterService) - dbs := srvDeps[utils.DataDB].(*DataDBService) - close(ldrs.stopChan) - ldrs.stopChan = make(chan struct{}) + cms := srvDeps[utils.ConnManager].(*ConnManagerService).ConnManager() + fs := srvDeps[utils.FilterS].(*FilterService).FilterS() + dbs := srvDeps[utils.DataDB].(*DataDBService).DataManager() + close(s.stopChan) + s.stopChan = make(chan struct{}) - ldrs.RLock() - defer ldrs.RUnlock() + s.RLock() + defer s.RUnlock() - ldrs.ldrs.Reload(dbs.DataManager(), fs.FilterS(), cms.ConnManager()) - return ldrs.ldrs.ListenAndServe(ldrs.stopChan) + s.ldrs.Reload(dbs, fs, cms) + return s.ldrs.ListenAndServe(s.stopChan) } // Shutdown stops the service -func (ldrs *LoaderService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) { - ldrs.Lock() - ldrs.ldrs = nil - close(ldrs.stopChan) - ldrs.cl.RpcUnregisterName(utils.LoaderSv1) - ldrs.Unlock() +func (s *LoaderService) Shutdown(_ *servmanager.ServiceRegistry) (_ error) { + s.Lock() + s.ldrs = nil + close(s.stopChan) + s.cl.RpcUnregisterName(utils.LoaderSv1) + s.Unlock() return } // ServiceName returns the service name -func (ldrs *LoaderService) ServiceName() string { +func (s *LoaderService) ServiceName() string { return utils.LoaderS } // ShouldRun returns if the service should be running -func (ldrs *LoaderService) ShouldRun() bool { - return ldrs.cfg.LoaderCfg().Enabled() +func (s *LoaderService) ShouldRun() bool { + return s.cfg.LoaderCfg().Enabled() } // GetLoaderS returns the initialized LoaderService -func (ldrs *LoaderService) GetLoaderS() *loaders.LoaderS { - return ldrs.ldrs +func (s *LoaderService) GetLoaderS() *loaders.LoaderS { + return s.ldrs } // StateChan returns signaling channel of specific state -func (ldrs *LoaderService) StateChan(stateID string) chan struct{} { - return ldrs.stateDeps.StateChan(stateID) +func (s *LoaderService) StateChan(stateID string) chan struct{} { + return s.stateDeps.StateChan(stateID) }