Added CacheS to ServiceManager

This commit is contained in:
Trial97
2019-09-20 16:59:13 +03:00
committed by Dan Christian Bogos
parent 9824406198
commit 758074c4de
12 changed files with 166 additions and 55 deletions

View File

@@ -825,25 +825,6 @@ func startAnalyzerService(internalAnalyzerSChan chan rpcclient.RpcClientConnecti
internalAnalyzerSChan <- aSv1
}
// initCacheS inits the CacheS and starts precaching as well as populating internal channel for RPC conns
func initCacheS(internalCacheSChan chan rpcclient.RpcClientConnection,
server *utils.Server, dm *engine.DataManager, exitChan chan bool) (chS *engine.CacheS) {
chS = engine.NewCacheS(cfg, dm)
go func() {
if err := chS.Precache(); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> could not init, error: %s", utils.CacheS, err.Error()))
exitChan <- true
}
}()
chSv1 := v1.NewCacheSv1(chS)
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(chSv1)
}
internalCacheSChan <- chS
return
}
func initGuardianSv1(internalGuardianSChan chan rpcclient.RpcClientConnection, server *utils.Server) {
grdSv1 := v1.NewGuardianSv1()
if !cfg.DispatcherSCfg().Enabled {
@@ -1201,17 +1182,8 @@ func main() {
filterSChan := make(chan *engine.FilterS, 1)
internalDispatcherSChan := make(chan rpcclient.RpcClientConnection, 1)
internalRaterChan := make(chan rpcclient.RpcClientConnection, 1)
internalCdrSChan := make(chan rpcclient.RpcClientConnection, 1)
internalSMGChan := make(chan rpcclient.RpcClientConnection, 1)
internalAttributeSChan := make(chan rpcclient.RpcClientConnection, 1)
internalChargerSChan := make(chan rpcclient.RpcClientConnection, 1)
internalRsChan := make(chan rpcclient.RpcClientConnection, 1)
internalStatSChan := make(chan rpcclient.RpcClientConnection, 1)
internalThresholdSChan := make(chan rpcclient.RpcClientConnection, 1)
internalSupplierSChan := make(chan rpcclient.RpcClientConnection, 1)
internalAnalyzerSChan := make(chan rpcclient.RpcClientConnection, 1)
internalCacheSChan := make(chan rpcclient.RpcClientConnection, 1)
internalSchedSChan := make(chan rpcclient.RpcClientConnection, 1)
internalGuardianSChan := make(chan rpcclient.RpcClientConnection, 1)
internalLoaderSChan := make(chan rpcclient.RpcClientConnection, 1)
internalApierV1Chan := make(chan rpcclient.RpcClientConnection, 1)
@@ -1221,9 +1193,6 @@ func main() {
internalCoreSv1Chan := make(chan rpcclient.RpcClientConnection, 1)
internalRALsv1Chan := make(chan rpcclient.RpcClientConnection, 1)
// init CacheS
cacheS := initCacheS(internalCacheSChan, server, dm, exitChan)
// init GuardianSv1
initGuardianSv1(internalGuardianSChan, server)
@@ -1231,8 +1200,9 @@ func main() {
initCoreSv1(internalCoreSv1Chan, server)
// Start ServiceManager
srvManager := servmanager.NewServiceManager(cfg, dm, cacheS, cdrDb,
srvManager := servmanager.NewServiceManager(cfg, dm, cdrDb,
loadDb, filterSChan, server, internalDispatcherSChan, exitChan)
chS := services.NewCacheService()
attrS := services.NewAttributeService()
chrS := services.NewChargerService()
tS := services.NewThresholdService()
@@ -1241,16 +1211,19 @@ func main() {
supS := services.NewSupplierService()
schS := services.NewSchedulerService()
cdrS := services.NewCDRServer()
srvManager.AddService(attrS, chrS, tS, stS, reS, supS, schS, cdrS, services.NewResponderService(internalRaterChan))
internalAttributeSChan = attrS.GetIntenternalChan()
internalChargerSChan = chrS.GetIntenternalChan()
internalThresholdSChan = tS.GetIntenternalChan()
internalStatSChan = stS.GetIntenternalChan()
internalRsChan = reS.GetIntenternalChan()
internalSupplierSChan = supS.GetIntenternalChan()
internalSchedSChan = schS.GetIntenternalChan()
internalCdrSChan = cdrS.GetIntenternalChan()
go srvManager.StartServices()
srvManager.AddService(chS, attrS, chrS, tS, stS, reS, supS, schS, cdrS, services.NewResponderService(internalRaterChan))
internalAttributeSChan := attrS.GetIntenternalChan()
internalChargerSChan := chrS.GetIntenternalChan()
internalThresholdSChan := tS.GetIntenternalChan()
internalStatSChan := stS.GetIntenternalChan()
internalRsChan := reS.GetIntenternalChan()
internalSupplierSChan := supS.GetIntenternalChan()
internalSchedSChan := schS.GetIntenternalChan()
internalCdrSChan := cdrS.GetIntenternalChan()
internalCacheSChan := chS.GetIntenternalChan()
srvManager.StartServices()
cacheS := srvManager.GetCacheS()
initServiceManagerV1(internalServeManagerChan, srvManager, server)

View File

@@ -1207,7 +1207,10 @@ func (cfg *CGRConfig) FilterSCfg() *FilterSCfg {
return cfg.filterSCfg
}
// CacheCfg returns the config for Cache
func (cfg *CGRConfig) CacheCfg() CacheCfg {
cfg.lks[CACHE_JSN].Lock()
defer cfg.lks[CACHE_JSN].Unlock()
return cfg.cacheCfg
}
@@ -1483,7 +1486,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) {
break
}
fallthrough
case CACHE_JSN:
case CACHE_JSN: // no need to reload
if !fall {
break
}

View File

@@ -47,9 +47,10 @@ func TestAttributeSReload(t *testing.T) {
close(chS.GetPrecacheChannel(utils.CacheChargerFilterIndexes))
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil,
chS /*cdrStorage*/, nil,
/*cdrStorage*/ nil,
/*loadStorage*/ nil, filterSChan,
server, nil, engineShutdown)
srvMngr.SetCacheS(chS)
attrS := NewAttributeService()
srvMngr.AddService(attrS, NewChargerService())
if err = srvMngr.StartServices(); err != nil {

106
services/caches.go Normal file
View File

@@ -0,0 +1,106 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package services
import (
"fmt"
"sync"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// NewCacheService returns the Cache Service
func NewCacheService() servmanager.Service {
return &CacheService{
connChan: make(chan rpcclient.RpcClientConnection, 1),
}
}
// CacheService implements Service interface
type CacheService struct {
sync.RWMutex
chS *engine.CacheS
rpc *v1.CacheSv1
connChan chan rpcclient.RpcClientConnection
}
// Start should handle the sercive start
// inits the CacheS and starts precaching as well as populating internal channel for RPC conns
func (chS *CacheService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) {
// safe to not check CacheS should never be stoped and then started again
// if chS.IsRunning() {
// return fmt.Errorf("service aleady running")
// }
chS.Lock()
defer chS.Unlock()
chS.chS = engine.NewCacheS(sp.GetConfig(), sp.GetDM())
go func() {
if err := chS.chS.Precache(); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> could not init, error: %s", utils.CacheS, err.Error()))
sp.GetExitChan() <- true
}
}()
chS.rpc = v1.NewCacheSv1(chS.chS)
if !sp.GetConfig().DispatcherSCfg().Enabled {
sp.GetServer().RpcRegister(chS.rpc)
}
chS.connChan <- chS.rpc
// set the cache in ServiceManager
sp.SetCacheS(chS.chS)
return
}
// GetIntenternalChan returns the internal connection chanel
func (chS *CacheService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) {
return chS.connChan
}
// Reload handles the change of config
func (chS *CacheService) Reload(sp servmanager.ServiceProvider) (err error) {
return
}
// Shutdown stops the service
func (chS *CacheService) Shutdown() (err error) {
return
}
// GetRPCInterface returns the interface to register for server
func (chS *CacheService) GetRPCInterface() interface{} {
return chS.rpc
}
// IsRunning returns if the service is running
func (chS *CacheService) IsRunning() bool {
chS.RLock()
defer chS.RUnlock()
return chS != nil && chS.chS != nil
}
// ServiceName returns the service name
func (chS *CacheService) ServiceName() string {
return utils.CacheS
}

View File

@@ -52,9 +52,10 @@ func TestCdrsReload(t *testing.T) {
responderChan <- v1.NewResourceSv1(nil)
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil,
chS /*cdrStorage*/, nil,
/*cdrStorage*/ nil,
/*loadStorage*/ nil, filterSChan,
server, nil, engineShutdown)
srvMngr.SetCacheS(chS)
cdrS := NewCDRServer()
srvMngr.AddService(cdrS, NewResponderService(responderChan), NewChargerService())
if err = srvMngr.StartServices(); err != nil {

View File

@@ -49,9 +49,10 @@ func TestChargerSReload(t *testing.T) {
close(chS.GetPrecacheChannel(utils.CacheChargerFilterIndexes))
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil,
chS /*cdrStorage*/, nil,
/*cdrStorage*/ nil,
/*loadStorage*/ nil, filterSChan,
server, nil, engineShutdown)
srvMngr.SetCacheS(chS)
chrS := NewChargerService()
srvMngr.AddService(NewAttributeService(), chrS)
if err = srvMngr.StartServices(); err != nil {

View File

@@ -51,9 +51,10 @@ func TestResourceSReload(t *testing.T) {
close(chS.GetPrecacheChannel(utils.CacheResourceFilterIndexes))
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil,
chS /*cdrStorage*/, nil,
/*cdrStorage*/ nil,
/*loadStorage*/ nil, filterSChan,
server, nil, engineShutdown)
srvMngr.SetCacheS(chS)
reS := NewResourceService()
srvMngr.AddService(NewThresholdService(), reS)
if err = srvMngr.StartServices(); err != nil {

View File

@@ -53,9 +53,10 @@ func TestSchedulerSReload(t *testing.T) {
t.Fatal(err)
}
srvMngr := servmanager.NewServiceManager(cfg, dm,
chS /*cdrStorage*/, nil,
/*cdrStorage*/ nil,
/*loadStorage*/ nil, filterSChan,
server, nil, engineShutdown)
srvMngr.SetCacheS(chS)
schS := NewSchedulerService()
internalCdrSChan := make(chan rpcclient.RpcClientConnection, 1)
internalCdrSChan <- nil

View File

@@ -51,11 +51,12 @@ func TestStatSReload(t *testing.T) {
close(chS.GetPrecacheChannel(utils.CacheStatFilterIndexes))
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil,
chS /*cdrStorage*/, nil,
/*cdrStorage*/ nil,
/*loadStorage*/ nil, filterSChan,
server, nil, engineShutdown)
srvMngr.SetCacheS(chS)
sS := NewStatService()
srvMngr.AddService(NewThresholdService(), sS)
srvMngr.AddService(&CacheService{chS: chS}, NewThresholdService(), sS)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -49,9 +49,10 @@ func TestSupplierSReload(t *testing.T) {
close(chS.GetPrecacheChannel(utils.CacheStatFilterIndexes))
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil,
chS /*cdrStorage*/, nil,
/*cdrStorage*/ nil,
/*loadStorage*/ nil, filterSChan,
server, nil, engineShutdown)
srvMngr.SetCacheS(chS)
supS := NewSupplierService()
srvMngr.AddService(supS, NewStatService())
if err = srvMngr.StartServices(); err != nil {

View File

@@ -47,9 +47,10 @@ func TestThresholdSReload(t *testing.T) {
close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes))
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil,
chS /*cdrStorage*/, nil,
/*cdrStorage*/ nil,
/*loadStorage*/ nil, filterSChan,
server, nil, engineShutdown)
srvMngr.SetCacheS(chS)
tS := NewThresholdService()
srvMngr.AddService(tS)
if err = srvMngr.StartServices(); err != nil {

View File

@@ -33,7 +33,7 @@ import (
// NewServiceManager returns a service manager
func NewServiceManager(cfg *config.CGRConfig, dm *engine.DataManager,
cacheS *engine.CacheS, cdrStorage engine.CdrStorage,
cdrStorage engine.CdrStorage,
loadStorage engine.LoadStorage, filterSChan chan *engine.FilterS,
server *utils.Server, dispatcherSChan chan rpcclient.RpcClientConnection,
engineShutdown chan bool) *ServiceManager {
@@ -41,7 +41,6 @@ func NewServiceManager(cfg *config.CGRConfig, dm *engine.DataManager,
cfg: cfg,
dm: dm,
engineShutdown: engineShutdown,
cacheS: cacheS,
cdrStorage: cdrStorage,
loadStorage: loadStorage,
@@ -238,6 +237,16 @@ func (srvMngr *ServiceManager) GetConnection(subsystem string, conns []*config.R
// StartServices starts all enabled services
func (srvMngr *ServiceManager) StartServices() (err error) {
// start the cacheS
if srvMngr.GetCacheS() == nil {
var chS Service
chS, err = srvMngr.GetService(utils.CacheS)
if err != nil {
return
}
chS.Start(srvMngr, true)
}
go srvMngr.handleReload()
if srvMngr.cfg.AttributeSCfg().Enabled {
go func() {
@@ -461,12 +470,21 @@ func (srvMngr *ServiceManager) reloadService(srv Service, shouldRun bool) (err e
// GetService returns the named service
func (srvMngr *ServiceManager) GetService(subsystem string) (srv Service, err error) {
var has bool
srvMngr.RLock()
if srv, has = srvMngr.subsystems[subsystem]; !has {
return nil, utils.ErrNotFound
err = utils.ErrNotFound
}
srvMngr.RUnlock()
return
}
// SetCacheS sets the cacheS
func (srvMngr *ServiceManager) SetCacheS(chS *engine.CacheS) {
srvMngr.Lock()
srvMngr.cacheS = chS
srvMngr.Unlock()
}
// ServiceProvider should implement this to provide information for service
type ServiceProvider interface {
// GetDM returns the DataManager
@@ -491,6 +509,9 @@ type ServiceProvider interface {
GetService(subsystem string) (Service, error)
// AddService adds the given serices
AddService(services ...Service)
// SetCacheS sets the cacheS
// Called when starting Cache Service
SetCacheS(chS *engine.CacheS)
}
// Service interface that describes what functions should a service implement