diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 1d84df05a..f24af5e1e 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -825,25 +825,6 @@ func startAnalyzerService(internalAnalyzerSChan chan rpcclient.RpcClientConnecti internalAnalyzerSChan <- aSv1 } -// initCacheS inits the CacheS and starts precaching as well as populating internal channel for RPC conns -func initCacheS(internalCacheSChan chan rpcclient.RpcClientConnection, - server *utils.Server, dm *engine.DataManager, exitChan chan bool) (chS *engine.CacheS) { - chS = engine.NewCacheS(cfg, dm) - go func() { - if err := chS.Precache(); err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> could not init, error: %s", utils.CacheS, err.Error())) - exitChan <- true - } - }() - - chSv1 := v1.NewCacheSv1(chS) - if !cfg.DispatcherSCfg().Enabled { - server.RpcRegister(chSv1) - } - internalCacheSChan <- chS - return -} - func initGuardianSv1(internalGuardianSChan chan rpcclient.RpcClientConnection, server *utils.Server) { grdSv1 := v1.NewGuardianSv1() if !cfg.DispatcherSCfg().Enabled { @@ -1201,17 +1182,8 @@ func main() { filterSChan := make(chan *engine.FilterS, 1) internalDispatcherSChan := make(chan rpcclient.RpcClientConnection, 1) internalRaterChan := make(chan rpcclient.RpcClientConnection, 1) - internalCdrSChan := make(chan rpcclient.RpcClientConnection, 1) internalSMGChan := make(chan rpcclient.RpcClientConnection, 1) - internalAttributeSChan := make(chan rpcclient.RpcClientConnection, 1) - internalChargerSChan := make(chan rpcclient.RpcClientConnection, 1) - internalRsChan := make(chan rpcclient.RpcClientConnection, 1) - internalStatSChan := make(chan rpcclient.RpcClientConnection, 1) - internalThresholdSChan := make(chan rpcclient.RpcClientConnection, 1) - internalSupplierSChan := make(chan rpcclient.RpcClientConnection, 1) internalAnalyzerSChan := make(chan rpcclient.RpcClientConnection, 1) - internalCacheSChan := make(chan rpcclient.RpcClientConnection, 1) - internalSchedSChan := make(chan rpcclient.RpcClientConnection, 1) internalGuardianSChan := make(chan rpcclient.RpcClientConnection, 1) internalLoaderSChan := make(chan rpcclient.RpcClientConnection, 1) internalApierV1Chan := make(chan rpcclient.RpcClientConnection, 1) @@ -1221,9 +1193,6 @@ func main() { internalCoreSv1Chan := make(chan rpcclient.RpcClientConnection, 1) internalRALsv1Chan := make(chan rpcclient.RpcClientConnection, 1) - // init CacheS - cacheS := initCacheS(internalCacheSChan, server, dm, exitChan) - // init GuardianSv1 initGuardianSv1(internalGuardianSChan, server) @@ -1231,8 +1200,9 @@ func main() { initCoreSv1(internalCoreSv1Chan, server) // Start ServiceManager - srvManager := servmanager.NewServiceManager(cfg, dm, cacheS, cdrDb, + srvManager := servmanager.NewServiceManager(cfg, dm, cdrDb, loadDb, filterSChan, server, internalDispatcherSChan, exitChan) + chS := services.NewCacheService() attrS := services.NewAttributeService() chrS := services.NewChargerService() tS := services.NewThresholdService() @@ -1241,16 +1211,19 @@ func main() { supS := services.NewSupplierService() schS := services.NewSchedulerService() cdrS := services.NewCDRServer() - srvManager.AddService(attrS, chrS, tS, stS, reS, supS, schS, cdrS, services.NewResponderService(internalRaterChan)) - internalAttributeSChan = attrS.GetIntenternalChan() - internalChargerSChan = chrS.GetIntenternalChan() - internalThresholdSChan = tS.GetIntenternalChan() - internalStatSChan = stS.GetIntenternalChan() - internalRsChan = reS.GetIntenternalChan() - internalSupplierSChan = supS.GetIntenternalChan() - internalSchedSChan = schS.GetIntenternalChan() - internalCdrSChan = cdrS.GetIntenternalChan() - go srvManager.StartServices() + srvManager.AddService(chS, attrS, chrS, tS, stS, reS, supS, schS, cdrS, services.NewResponderService(internalRaterChan)) + internalAttributeSChan := attrS.GetIntenternalChan() + internalChargerSChan := chrS.GetIntenternalChan() + internalThresholdSChan := tS.GetIntenternalChan() + internalStatSChan := stS.GetIntenternalChan() + internalRsChan := reS.GetIntenternalChan() + internalSupplierSChan := supS.GetIntenternalChan() + internalSchedSChan := schS.GetIntenternalChan() + internalCdrSChan := cdrS.GetIntenternalChan() + internalCacheSChan := chS.GetIntenternalChan() + srvManager.StartServices() + + cacheS := srvManager.GetCacheS() initServiceManagerV1(internalServeManagerChan, srvManager, server) diff --git a/config/config.go b/config/config.go index 34d5eae04..90ad13f3e 100755 --- a/config/config.go +++ b/config/config.go @@ -1207,7 +1207,10 @@ func (cfg *CGRConfig) FilterSCfg() *FilterSCfg { return cfg.filterSCfg } +// CacheCfg returns the config for Cache func (cfg *CGRConfig) CacheCfg() CacheCfg { + cfg.lks[CACHE_JSN].Lock() + defer cfg.lks[CACHE_JSN].Unlock() return cfg.cacheCfg } @@ -1483,7 +1486,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) { break } fallthrough - case CACHE_JSN: + case CACHE_JSN: // no need to reload if !fall { break } diff --git a/services/attributes_it_test.go b/services/attributes_it_test.go index ce2081db6..a75c269ac 100644 --- a/services/attributes_it_test.go +++ b/services/attributes_it_test.go @@ -47,9 +47,10 @@ func TestAttributeSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheChargerFilterIndexes)) server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil, - chS /*cdrStorage*/, nil, + /*cdrStorage*/ nil, /*loadStorage*/ nil, filterSChan, server, nil, engineShutdown) + srvMngr.SetCacheS(chS) attrS := NewAttributeService() srvMngr.AddService(attrS, NewChargerService()) if err = srvMngr.StartServices(); err != nil { diff --git a/services/caches.go b/services/caches.go new file mode 100644 index 000000000..6b71d827d --- /dev/null +++ b/services/caches.go @@ -0,0 +1,106 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + "sync" + + v1 "github.com/cgrates/cgrates/apier/v1" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewCacheService returns the Cache Service +func NewCacheService() servmanager.Service { + return &CacheService{ + connChan: make(chan rpcclient.RpcClientConnection, 1), + } +} + +// CacheService implements Service interface +type CacheService struct { + sync.RWMutex + chS *engine.CacheS + rpc *v1.CacheSv1 + connChan chan rpcclient.RpcClientConnection +} + +// Start should handle the sercive start +// inits the CacheS and starts precaching as well as populating internal channel for RPC conns +func (chS *CacheService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { + // safe to not check CacheS should never be stoped and then started again + // if chS.IsRunning() { + // return fmt.Errorf("service aleady running") + // } + + chS.Lock() + defer chS.Unlock() + chS.chS = engine.NewCacheS(sp.GetConfig(), sp.GetDM()) + go func() { + if err := chS.chS.Precache(); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> could not init, error: %s", utils.CacheS, err.Error())) + sp.GetExitChan() <- true + } + }() + + chS.rpc = v1.NewCacheSv1(chS.chS) + if !sp.GetConfig().DispatcherSCfg().Enabled { + sp.GetServer().RpcRegister(chS.rpc) + } + chS.connChan <- chS.rpc + + // set the cache in ServiceManager + sp.SetCacheS(chS.chS) + return +} + +// GetIntenternalChan returns the internal connection chanel +func (chS *CacheService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) { + return chS.connChan +} + +// Reload handles the change of config +func (chS *CacheService) Reload(sp servmanager.ServiceProvider) (err error) { + return +} + +// Shutdown stops the service +func (chS *CacheService) Shutdown() (err error) { + return +} + +// GetRPCInterface returns the interface to register for server +func (chS *CacheService) GetRPCInterface() interface{} { + return chS.rpc +} + +// IsRunning returns if the service is running +func (chS *CacheService) IsRunning() bool { + chS.RLock() + defer chS.RUnlock() + return chS != nil && chS.chS != nil +} + +// ServiceName returns the service name +func (chS *CacheService) ServiceName() string { + return utils.CacheS +} diff --git a/services/cdrs_it_test.go b/services/cdrs_it_test.go index d7274e59a..5ca497ad7 100644 --- a/services/cdrs_it_test.go +++ b/services/cdrs_it_test.go @@ -52,9 +52,10 @@ func TestCdrsReload(t *testing.T) { responderChan <- v1.NewResourceSv1(nil) server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil, - chS /*cdrStorage*/, nil, + /*cdrStorage*/ nil, /*loadStorage*/ nil, filterSChan, server, nil, engineShutdown) + srvMngr.SetCacheS(chS) cdrS := NewCDRServer() srvMngr.AddService(cdrS, NewResponderService(responderChan), NewChargerService()) if err = srvMngr.StartServices(); err != nil { diff --git a/services/chargers_it_test.go b/services/chargers_it_test.go index 77ca7b7a9..a29926679 100644 --- a/services/chargers_it_test.go +++ b/services/chargers_it_test.go @@ -49,9 +49,10 @@ func TestChargerSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheChargerFilterIndexes)) server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil, - chS /*cdrStorage*/, nil, + /*cdrStorage*/ nil, /*loadStorage*/ nil, filterSChan, server, nil, engineShutdown) + srvMngr.SetCacheS(chS) chrS := NewChargerService() srvMngr.AddService(NewAttributeService(), chrS) if err = srvMngr.StartServices(); err != nil { diff --git a/services/resources_it_test.go b/services/resources_it_test.go index e3c82d2c2..18c2493ab 100644 --- a/services/resources_it_test.go +++ b/services/resources_it_test.go @@ -51,9 +51,10 @@ func TestResourceSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheResourceFilterIndexes)) server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil, - chS /*cdrStorage*/, nil, + /*cdrStorage*/ nil, /*loadStorage*/ nil, filterSChan, server, nil, engineShutdown) + srvMngr.SetCacheS(chS) reS := NewResourceService() srvMngr.AddService(NewThresholdService(), reS) if err = srvMngr.StartServices(); err != nil { diff --git a/services/schedulers_it_test.go b/services/schedulers_it_test.go index 7448a8e91..cbfacabe1 100644 --- a/services/schedulers_it_test.go +++ b/services/schedulers_it_test.go @@ -53,9 +53,10 @@ func TestSchedulerSReload(t *testing.T) { t.Fatal(err) } srvMngr := servmanager.NewServiceManager(cfg, dm, - chS /*cdrStorage*/, nil, + /*cdrStorage*/ nil, /*loadStorage*/ nil, filterSChan, server, nil, engineShutdown) + srvMngr.SetCacheS(chS) schS := NewSchedulerService() internalCdrSChan := make(chan rpcclient.RpcClientConnection, 1) internalCdrSChan <- nil diff --git a/services/stats_it_test.go b/services/stats_it_test.go index ca7848665..f0d5b7a90 100644 --- a/services/stats_it_test.go +++ b/services/stats_it_test.go @@ -51,11 +51,12 @@ func TestStatSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheStatFilterIndexes)) server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil, - chS /*cdrStorage*/, nil, + /*cdrStorage*/ nil, /*loadStorage*/ nil, filterSChan, server, nil, engineShutdown) + srvMngr.SetCacheS(chS) sS := NewStatService() - srvMngr.AddService(NewThresholdService(), sS) + srvMngr.AddService(&CacheService{chS: chS}, NewThresholdService(), sS) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/suppliers_it_test.go b/services/suppliers_it_test.go index 59bd0721d..ac1df2902 100644 --- a/services/suppliers_it_test.go +++ b/services/suppliers_it_test.go @@ -49,9 +49,10 @@ func TestSupplierSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheStatFilterIndexes)) server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil, - chS /*cdrStorage*/, nil, + /*cdrStorage*/ nil, /*loadStorage*/ nil, filterSChan, server, nil, engineShutdown) + srvMngr.SetCacheS(chS) supS := NewSupplierService() srvMngr.AddService(supS, NewStatService()) if err = srvMngr.StartServices(); err != nil { diff --git a/services/thresholds_it_test.go b/services/thresholds_it_test.go index 59b1d1f26..e84f4afeb 100644 --- a/services/thresholds_it_test.go +++ b/services/thresholds_it_test.go @@ -47,9 +47,10 @@ func TestThresholdSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes)) server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil, - chS /*cdrStorage*/, nil, + /*cdrStorage*/ nil, /*loadStorage*/ nil, filterSChan, server, nil, engineShutdown) + srvMngr.SetCacheS(chS) tS := NewThresholdService() srvMngr.AddService(tS) if err = srvMngr.StartServices(); err != nil { diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 5f96337f0..94a91e827 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -33,7 +33,7 @@ import ( // NewServiceManager returns a service manager func NewServiceManager(cfg *config.CGRConfig, dm *engine.DataManager, - cacheS *engine.CacheS, cdrStorage engine.CdrStorage, + cdrStorage engine.CdrStorage, loadStorage engine.LoadStorage, filterSChan chan *engine.FilterS, server *utils.Server, dispatcherSChan chan rpcclient.RpcClientConnection, engineShutdown chan bool) *ServiceManager { @@ -41,7 +41,6 @@ func NewServiceManager(cfg *config.CGRConfig, dm *engine.DataManager, cfg: cfg, dm: dm, engineShutdown: engineShutdown, - cacheS: cacheS, cdrStorage: cdrStorage, loadStorage: loadStorage, @@ -238,6 +237,16 @@ func (srvMngr *ServiceManager) GetConnection(subsystem string, conns []*config.R // StartServices starts all enabled services func (srvMngr *ServiceManager) StartServices() (err error) { + // start the cacheS + if srvMngr.GetCacheS() == nil { + var chS Service + chS, err = srvMngr.GetService(utils.CacheS) + if err != nil { + return + } + chS.Start(srvMngr, true) + } + go srvMngr.handleReload() if srvMngr.cfg.AttributeSCfg().Enabled { go func() { @@ -461,12 +470,21 @@ func (srvMngr *ServiceManager) reloadService(srv Service, shouldRun bool) (err e // GetService returns the named service func (srvMngr *ServiceManager) GetService(subsystem string) (srv Service, err error) { var has bool + srvMngr.RLock() if srv, has = srvMngr.subsystems[subsystem]; !has { - return nil, utils.ErrNotFound + err = utils.ErrNotFound } + srvMngr.RUnlock() return } +// SetCacheS sets the cacheS +func (srvMngr *ServiceManager) SetCacheS(chS *engine.CacheS) { + srvMngr.Lock() + srvMngr.cacheS = chS + srvMngr.Unlock() +} + // ServiceProvider should implement this to provide information for service type ServiceProvider interface { // GetDM returns the DataManager @@ -491,6 +509,9 @@ type ServiceProvider interface { GetService(subsystem string) (Service, error) // AddService adds the given serices AddService(services ...Service) + // SetCacheS sets the cacheS + // Called when starting Cache Service + SetCacheS(chS *engine.CacheS) } // Service interface that describes what functions should a service implement