Updated Scheduler Service

This commit is contained in:
Trial97
2019-10-03 14:22:05 +03:00
committed by Dan Christian Bogos
parent 64661546bd
commit 7f88e7e153
4 changed files with 82 additions and 54 deletions

View File

@@ -655,9 +655,11 @@ func main() {
internalServeManagerChan := make(chan rpcclient.RpcClientConnection, 1)
internalConfigChan := make(chan rpcclient.RpcClientConnection, 1)
internalCoreSv1Chan := make(chan rpcclient.RpcClientConnection, 1)
internalCacheSChan := make(chan rpcclient.RpcClientConnection, 1)
internalRaterChan := make(chan rpcclient.RpcClientConnection, 1)
// internalRALsv1Chan := make(chan rpcclient.RpcClientConnection, 1)
// internalSMGChan := make(chan rpcclient.RpcClientConnection, 1)
// init CacheS
cacheS := initCacheS(internalCacheSChan, server, dm, exitChan)
@@ -679,16 +681,22 @@ func main() {
reS := services.NewResourceService(cfg, dm, cacheS, filterSChan, server,
tS.GetIntenternalChan(), internalDispatcherSChan)
supS := services.NewSupplierService(cfg, dm, cacheS, filterSChan, server,
attrS.GetIntenternalChan(), stS.GetIntenternalChan(), reS.GetIntenternalChan(), internalDispatcherSChan)
/* schS := services.NewSchedulerService()
cdrS := services.NewCDRServer()
rals := services.NewRalService(srvManager)
apiv1, _ := srvManager.GetService(utils.ApierV1)
apiv2, _ := srvManager.GetService(utils.ApierV2)
resp, _ := srvManager.GetService(utils.ResponderS)
smg := services.NewSessionService()
grd := services.NewGuardianService()*/
srvManager.AddServices( /*chS, */ attrS, chrS, tS, stS, reS, supS) /*, schS, cdrS, rals, smg, grd,
attrS.GetIntenternalChan(), stS.GetIntenternalChan(),
reS.GetIntenternalChan(), internalDispatcherSChan)
schS := services.NewSchedulerService(cfg, dm, cacheS, server, internalDispatcherSChan)
// cdrS := services.NewCDRServer(cfg, dm, cdrDb, filterSChan, server,
// chrS.GetIntenternalChan(),internalRALsv1Chan,
// attrS.GetIntenternalChan(), tS.GetIntenternalChan(),
// stS.GetIntenternalChan(), internalDispatcherSChan)
// schS.SetCdrsConns(cdrS.GetIntenternalChan())
/*
rals := services.NewRalService(srvManager)
apiv1, _ := srvManager.GetService(utils.ApierV1)
apiv2, _ := srvManager.GetService(utils.ApierV2)
resp, _ := srvManager.GetService(utils.ResponderS)
smg := services.NewSessionService()
grd := services.NewGuardianService()*/
srvManager.AddServices( /*chS, */ attrS, chrS, tS, stS, reS, supS, schS) /*, cdrS, rals, smg, grd,
services.NewEventReaderService(),
services.NewDNSAgent(),
services.NewFreeswitchAgent(),
@@ -701,7 +709,6 @@ func main() {
/*
internalSchedSChan := schS.GetIntenternalChan()
internalCdrSChan := cdrS.GetIntenternalChan()
internalCacheSChan := chS.GetIntenternalChan()
internalApierV1Chan := apiv1.GetIntenternalChan()
internalApierV2Chan := apiv2.GetIntenternalChan()
@@ -727,14 +734,14 @@ func main() {
// engine.IntRPC.AddInternalRPCClient(utils.ApierV2, internalApierV2Chan)
engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, attrS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.CacheSv1, internalCacheSChan) // server or from apier
// engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, internalCdrSChan)
// engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, internalCdrSChan)
// engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, cdrS.GetIntenternalChan())
// engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, cdrS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.ChargerSv1, chrS.GetIntenternalChan())
// engine.IntRPC.AddInternalRPCClient(utils.GuardianSv1, internalGuardianSChan)
engine.IntRPC.AddInternalRPCClient(utils.LoaderSv1, internalLoaderSChan)
engine.IntRPC.AddInternalRPCClient(utils.ResourceSv1, reS.GetIntenternalChan())
// engine.IntRPC.AddInternalRPCClient(utils.Responder, internalRaterChan)
// engine.IntRPC.AddInternalRPCClient(utils.SchedulerSv1, internalSchedSChan) // server or from apier
engine.IntRPC.AddInternalRPCClient(utils.SchedulerSv1, schS.GetIntenternalChan())
// engine.IntRPC.AddInternalRPCClient(utils.SessionSv1, internalSMGChan) // server or from apier
engine.IntRPC.AddInternalRPCClient(utils.StatSv1, stS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.SupplierSv1, supS.GetIntenternalChan())
@@ -748,7 +755,7 @@ func main() {
initConfigSv1(internalConfigChan, server)
// Start CDRC components if necessary
// go startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan, filterSChan, exitChan)
// go startCdrcs(cdrS.GetIntenternalChan(), internalRaterChan, internalDispatcherSChan, filterSChan, exitChan)
if cfg.DispatcherSCfg().Enabled {
go startDispatcherService(internalDispatcherSChan,
@@ -764,7 +771,7 @@ func main() {
// Serve rpc connections
/*
go startRpc(server, internalRaterChan, internalCdrSChan,
go startRpc(server, internalRaterChan, cdrS.GetIntenternalChan(),
reS.GetIntenternalChan(), stS.GetIntenternalChan(),
attrS.GetIntenternalChan(), chrS.GetIntenternalChan(), tS.GetIntenternalChan(),
supS.GetIntenternalChan(), internalSMGChan, internalAnalyzerSChan,

View File

@@ -23,53 +23,65 @@ import (
"sync"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/scheduler"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// NewSchedulerService returns the Scheduler Service
func NewSchedulerService() *SchedulerService {
func NewSchedulerService(cfg *config.CGRConfig, dm *engine.DataManager,
cacheS *engine.CacheS, server *utils.Server,
dispatcherChan chan rpcclient.RpcClientConnection) *SchedulerService {
return &SchedulerService{
connChan: make(chan rpcclient.RpcClientConnection, 1),
connChan: make(chan rpcclient.RpcClientConnection, 1),
cfg: cfg,
dm: dm,
cacheS: cacheS,
server: server,
dispatcherChan: dispatcherChan,
}
}
// SchedulerService implements Service interface
type SchedulerService struct {
sync.RWMutex
cfg *config.CGRConfig
dm *engine.DataManager
cacheS *engine.CacheS
server *utils.Server
cdrSChan chan rpcclient.RpcClientConnection
dispatcherChan chan rpcclient.RpcClientConnection
schS *scheduler.Scheduler
rpc *v1.SchedulerSv1
connChan chan rpcclient.RpcClientConnection
}
// Start should handle the sercive start
func (schS *SchedulerService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) {
func (schS *SchedulerService) Start() (err error) {
if schS.IsRunning() {
return fmt.Errorf("service aleady running")
}
if waitCache { // Wait for cache to load data before starting
<-sp.GetCacheS().GetPrecacheChannel(utils.CacheActionPlans) // wait for ActionPlans to be cached
}
<-schS.cacheS.GetPrecacheChannel(utils.CacheActionPlans) // wait for ActionPlans to be cached
schS.Lock()
defer schS.Unlock()
utils.Logger.Info("<ServiceManager> Starting CGRateS Scheduler.")
schS.schS = scheduler.NewScheduler(sp.GetDM())
schS.schS = scheduler.NewScheduler(schS.dm)
go schS.schS.Loop()
schS.rpc = v1.NewSchedulerSv1(sp.GetConfig())
if !sp.GetConfig().DispatcherSCfg().Enabled {
sp.GetServer().RpcRegister(schS.rpc)
schS.rpc = v1.NewSchedulerSv1(schS.cfg)
if !schS.cfg.DispatcherSCfg().Enabled {
schS.server.RpcRegister(schS.rpc)
}
schS.connChan <- schS.rpc
// Create connection to CDR Server and share it in engine(used for *cdrlog action)
cdrsConn, err := sp.NewConnection(utils.CDRServer, sp.GetConfig().SchedulerCfg().CDRsConns)
cdrsConn, err := NewConnection(schS.cfg, schS.cdrSChan, schS.dispatcherChan, schS.cfg.SchedulerCfg().CDRsConns)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to CDRServer: %s", utils.SchedulerS, err.Error()))
return
@@ -87,7 +99,14 @@ func (schS *SchedulerService) GetIntenternalChan() (conn chan rpcclient.RpcClien
}
// Reload handles the change of config
func (schS *SchedulerService) Reload(sp servmanager.ServiceProvider) (err error) {
func (schS *SchedulerService) Reload() (err error) {
cdrsConn, err := NewConnection(schS.cfg, schS.cdrSChan, schS.dispatcherChan, schS.cfg.SchedulerCfg().CDRsConns)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to CDRServer: %s", utils.SchedulerS, err.Error()))
return
}
// ToDo: this should be send to scheduler
engine.SetSchedCdrsConns(cdrsConn)
schS.Lock()
schS.schS.Reload()
schS.Unlock()
@@ -105,13 +124,6 @@ func (schS *SchedulerService) Shutdown() (err error) {
return
}
// GetRPCInterface returns the interface to register for server
func (schS *SchedulerService) GetRPCInterface() interface{} {
schS.RLock()
defer schS.RUnlock()
return schS.rpc
}
// IsRunning returns if the service is running
func (schS *SchedulerService) IsRunning() bool {
schS.RLock()
@@ -130,3 +142,14 @@ func (schS *SchedulerService) GetScheduler() *scheduler.Scheduler {
defer schS.RUnlock()
return schS.schS
}
// ShouldRun returns if the service should be running
func (schS *SchedulerService) ShouldRun() bool {
return schS.cfg.SchedulerCfg().Enabled
}
// SetCdrsConns sets the value for cdrSChan
// this needs to be called before StartServices
func (schS *SchedulerService) SetCdrsConns(cdrSChan chan rpcclient.RpcClientConnection) {
schS.cdrSChan = cdrSChan
}

View File

@@ -38,8 +38,6 @@ func TestSchedulerSReload(t *testing.T) {
}
utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID)
utils.Logger.SetLogLevel(7)
filterSChan := make(chan *engine.FilterS, 1)
filterSChan <- nil
engineShutdown := make(chan bool, 1)
chS := engine.NewCacheS(cfg, nil)
close(chS.GetPrecacheChannel(utils.CacheActionPlans))
@@ -52,15 +50,15 @@ func TestSchedulerSReload(t *testing.T) {
if err != nil {
t.Fatal(err)
}
srvMngr := servmanager.NewServiceManager(cfg, dm,
/*cdrStorage*/ nil,
/*loadStorage*/ nil, filterSChan,
srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil,
/*cdrStorage*/ nil /*loadStorage*/, nil /*filterSChan*/, nil,
server, nil, engineShutdown)
srvMngr.SetCacheS(chS)
schS := NewSchedulerService()
schS := NewSchedulerService(cfg, dm, chS, server, nil)
internalCdrSChan := make(chan rpcclient.RpcClientConnection, 1)
internalCdrSChan <- nil
srvMngr.AddService(schS, &CDRServer{connChan: internalCdrSChan})
schS.SetCdrsConns(internalCdrSChan)
srvMngr.AddServices(schS)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -258,10 +258,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
}
if srvMngr.GetConfig().SupplierSCfg().Enabled {
go srvMngr.startService(utils.SupplierS)
}
if srvMngr.GetConfig().SchedulerCfg().Enabled {
go srvMngr.startService(utils.SchedulerS)
} /*
if srvMngr.GetConfig().SchedulerCfg().Enabled {
go srvMngr.startService(utils.SchedulerS)
}
if srvMngr.GetConfig().CdrsCfg().Enabled {
go srvMngr.startService(utils.CDRServer)
}
@@ -351,15 +351,15 @@ func (srvMngr *ServiceManager) handleReload() {
case <-srvMngr.GetConfig().GetReloadChan(config.SupplierSJson):
if err = srvMngr.reloadService(utils.SupplierS); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.SCHEDULER_JSN):
if err = srvMngr.reloadService(utils.SchedulerS); err != nil {
return
} /*
case <-srvMngr.GetConfig().GetReloadChan(config.SCHEDULER_JSN):
if err = srvMngr.reloadService(utils.SchedulerS); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.CDRS_JSN):
if err = srvMngr.reloadService(utils.CDRServer); err != nil {
return
}
if err = srvMngr.reloadService(utils.CDRServer); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.RALS_JSN):
if err = srvMngr.reloadService(utils.RALService); err != nil {
return