mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Updated Service interface
This commit is contained in:
committed by
Dan Christian Bogos
parent
910782b605
commit
c9029b0f5c
@@ -94,12 +94,12 @@ func (srvMngr *ServiceManager) Call(serviceMethod string, args interface{}, repl
|
||||
return err
|
||||
}
|
||||
|
||||
// ArgShutdownService are passed to Start/StopService/Status RPC methods
|
||||
// ArgStartService are passed to Start/StopService/Status RPC methods
|
||||
type ArgStartService struct {
|
||||
ServiceID string
|
||||
}
|
||||
|
||||
// ShutdownService shuts-down a service with ID
|
||||
// V1StartService starts a service with ID
|
||||
func (srvMngr *ServiceManager) V1StartService(args ArgStartService, reply *string) (err error) {
|
||||
switch args.ServiceID {
|
||||
case utils.MetaScheduler:
|
||||
@@ -118,7 +118,7 @@ func (srvMngr *ServiceManager) V1StartService(args ArgStartService, reply *strin
|
||||
return
|
||||
}
|
||||
|
||||
// ShutdownService shuts-down a service with ID
|
||||
// V1StopService shuts-down a service with ID
|
||||
func (srvMngr *ServiceManager) V1StopService(args ArgStartService, reply *string) (err error) {
|
||||
switch args.ServiceID {
|
||||
case utils.MetaScheduler:
|
||||
@@ -137,7 +137,7 @@ func (srvMngr *ServiceManager) V1StopService(args ArgStartService, reply *string
|
||||
return
|
||||
}
|
||||
|
||||
// ShutdownService shuts-down a service with ID
|
||||
// V1ServiceStatus returns the service status
|
||||
func (srvMngr *ServiceManager) V1ServiceStatus(args ArgStartService, reply *string) error {
|
||||
srvMngr.RLock()
|
||||
defer srvMngr.RUnlock()
|
||||
@@ -217,11 +217,7 @@ func (srvMngr *ServiceManager) NewConnection(subsystem string, conns []*config.R
|
||||
if len(conns) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
service, has := srvMngr.GetService(subsystem)
|
||||
if !has { // used to not cause panics because of services that are not already migrated
|
||||
return nil, errors.New(utils.UnsupportedServiceIDCaps)
|
||||
}
|
||||
internalChan := service.GetIntenternalChan()
|
||||
internalChan := srvMngr.GetService(subsystem).GetIntenternalChan()
|
||||
if srvMngr.GetConfig().DispatcherSCfg().Enabled {
|
||||
internalChan = srvMngr.dispatcherSChan
|
||||
}
|
||||
@@ -317,6 +313,7 @@ func (srvMngr *ServiceManager) handleReload() {
|
||||
for {
|
||||
select {
|
||||
case ext := <-srvMngr.engineShutdown:
|
||||
srvMngr.engineShutdown <- ext
|
||||
for srviceName, srv := range srvMngr.subsystems { // gracefully stop all running subsystems
|
||||
if !srv.IsRunning() {
|
||||
continue
|
||||
@@ -326,82 +323,81 @@ func (srvMngr *ServiceManager) handleReload() {
|
||||
utils.ServiceManager, srviceName, err))
|
||||
}
|
||||
}
|
||||
srvMngr.engineShutdown <- ext
|
||||
return
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.ATTRIBUTE_JSN):
|
||||
if err = srvMngr.reloadService(utils.AttributeS, srvMngr.GetConfig().AttributeSCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.AttributeS); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.ChargerSCfgJson):
|
||||
if err = srvMngr.reloadService(utils.ChargerS, srvMngr.GetConfig().ChargerSCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.ChargerS); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.THRESHOLDS_JSON):
|
||||
if err = srvMngr.reloadService(utils.ThresholdS, srvMngr.GetConfig().ThresholdSCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.ThresholdS); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.STATS_JSON):
|
||||
if err = srvMngr.reloadService(utils.StatS, srvMngr.GetConfig().StatSCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.StatS); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.RESOURCES_JSON):
|
||||
if err = srvMngr.reloadService(utils.ResourceS, srvMngr.GetConfig().ResourceSCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.ResourceS); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.SupplierSJson):
|
||||
if err = srvMngr.reloadService(utils.SupplierS, srvMngr.GetConfig().SupplierSCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.SupplierS); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.SCHEDULER_JSN):
|
||||
if err = srvMngr.reloadService(utils.SchedulerS, srvMngr.GetConfig().SchedulerCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.SchedulerS); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.CDRS_JSN):
|
||||
if err = srvMngr.reloadService(utils.CDRServer, srvMngr.GetConfig().CdrsCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.CDRServer); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.RALS_JSN):
|
||||
if err = srvMngr.reloadService(utils.RALService, srvMngr.GetConfig().RalsCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.RALService); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.Apier):
|
||||
if err = srvMngr.reloadService(utils.ApierV1, srvMngr.GetConfig().RalsCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.ApierV1); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.SessionSJson):
|
||||
if err = srvMngr.reloadService(utils.SessionS, srvMngr.GetConfig().SessionSCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.SessionS); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.ERsJson):
|
||||
if err = srvMngr.reloadService(utils.ERs, srvMngr.GetConfig().ERsCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.ERs); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.DNSAgentJson):
|
||||
if err = srvMngr.reloadService(utils.DNSAgent, srvMngr.GetConfig().DNSAgentCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.DNSAgent); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.FreeSWITCHAgentJSN):
|
||||
if err = srvMngr.reloadService(utils.FreeSWITCHAgent, srvMngr.GetConfig().FsAgentCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.FreeSWITCHAgent); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.KamailioAgentJSN):
|
||||
if err = srvMngr.reloadService(utils.KamailioAgent, srvMngr.GetConfig().KamAgentCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.KamailioAgent); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.AsteriskAgentJSN):
|
||||
if err = srvMngr.reloadService(utils.AsteriskAgent, srvMngr.GetConfig().AsteriskAgentCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.AsteriskAgent); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.RA_JSN):
|
||||
if err = srvMngr.reloadService(utils.RadiusAgent, srvMngr.GetConfig().RadiusAgentCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.RadiusAgent); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.DA_JSN):
|
||||
if err = srvMngr.reloadService(utils.DiameterAgent, srvMngr.GetConfig().DiameterAgentCfg().Enabled); err != nil {
|
||||
if err = srvMngr.reloadService(utils.DiameterAgent); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.HttpAgentJson):
|
||||
if err = srvMngr.reloadService(utils.HTTPAgent, len(srvMngr.GetConfig().HttpAgentCfg()) != 0); err != nil {
|
||||
if err = srvMngr.reloadService(utils.HTTPAgent); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -409,23 +405,18 @@ func (srvMngr *ServiceManager) handleReload() {
|
||||
}
|
||||
}
|
||||
|
||||
func (srvMngr *ServiceManager) reloadService(srviceName string, shouldRun bool) (err error) {
|
||||
srv, has := srvMngr.GetService(srviceName)
|
||||
if !has { // this should not happen (check the added services)
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>",
|
||||
utils.ServiceManager, srviceName))
|
||||
srvMngr.engineShutdown <- true
|
||||
return
|
||||
}
|
||||
if shouldRun {
|
||||
func (srvMngr *ServiceManager) reloadService(srviceName string) (err error) {
|
||||
srv := srvMngr.GetService(srviceName)
|
||||
|
||||
if srv.ShouldRun() {
|
||||
if srv.IsRunning() {
|
||||
if err = srv.Reload(srvMngr); err != nil {
|
||||
if err = srv.Reload(); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to reload <%s>", utils.ServiceManager, srv.ServiceName()))
|
||||
srvMngr.engineShutdown <- true
|
||||
return // stop if we encounter an error
|
||||
}
|
||||
} else {
|
||||
if err = srv.Start(srvMngr, true); err != nil {
|
||||
if err = srv.Start(); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, srv.ServiceName()))
|
||||
srvMngr.engineShutdown <- true
|
||||
return // stop if we encounter an error
|
||||
@@ -442,24 +433,23 @@ func (srvMngr *ServiceManager) reloadService(srviceName string, shouldRun bool)
|
||||
}
|
||||
|
||||
func (srvMngr *ServiceManager) startService(srviceName string) {
|
||||
srv, has := srvMngr.GetService(srviceName)
|
||||
if !has { // this should not happen (check the added services)
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>",
|
||||
utils.ServiceManager, srviceName))
|
||||
srvMngr.engineShutdown <- true
|
||||
return
|
||||
}
|
||||
if err := srv.Start(srvMngr, true); err != nil {
|
||||
srv := srvMngr.GetService(srviceName)
|
||||
if err := srv.Start(); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, srviceName, err))
|
||||
srvMngr.engineShutdown <- true
|
||||
}
|
||||
}
|
||||
|
||||
// GetService returns the named service
|
||||
func (srvMngr ServiceManager) GetService(subsystem string) (srv Service, has bool) {
|
||||
func (srvMngr ServiceManager) GetService(subsystem string) (srv Service) {
|
||||
var has bool
|
||||
srvMngr.RLock()
|
||||
srv, has = srvMngr.subsystems[subsystem]
|
||||
srvMngr.RUnlock()
|
||||
if !has { // this should not happen (check the added services)
|
||||
panic(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>",
|
||||
utils.ServiceManager, subsystem)) // because this is not dinamic this should not happen
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -470,49 +460,20 @@ func (srvMngr *ServiceManager) SetCacheS(chS *engine.CacheS) {
|
||||
srvMngr.Unlock()
|
||||
}
|
||||
|
||||
// ServiceProvider should implement this to provide information for service
|
||||
type ServiceProvider interface {
|
||||
// GetDM returns the DataManager
|
||||
GetDM() *engine.DataManager
|
||||
// GetCDRStorage returns the CdrStorage
|
||||
GetCDRStorage() engine.CdrStorage
|
||||
// GetLoadStorage returns the LoadStorage
|
||||
GetLoadStorage() engine.LoadStorage
|
||||
// GetConfig returns the Configuration
|
||||
GetConfig() *config.CGRConfig
|
||||
// GetCacheS returns the CacheS
|
||||
GetCacheS() *engine.CacheS
|
||||
// GetFilterS returns the FilterS
|
||||
GetFilterS() *engine.FilterS
|
||||
// GetServer returns the Server
|
||||
GetServer() *utils.Server
|
||||
// GetExitChan returns the exit chanel
|
||||
GetExitChan() chan bool
|
||||
// NewConnection creates a rpcClient to the specified subsystem
|
||||
NewConnection(subsystem string, cfg []*config.RemoteHost) (rpcclient.RpcClientConnection, error)
|
||||
// GetService returns the named service
|
||||
GetService(subsystem string) (Service, bool)
|
||||
// AddService adds the given serices
|
||||
AddService(services ...Service)
|
||||
// SetCacheS sets the cacheS
|
||||
// Called when starting Cache Service
|
||||
SetCacheS(chS *engine.CacheS)
|
||||
}
|
||||
|
||||
// Service interface that describes what functions should a service implement
|
||||
type Service interface {
|
||||
// Start should handle the sercive start
|
||||
Start(sp ServiceProvider, waitCache bool) error
|
||||
Start() error
|
||||
// Reload handles the change of config
|
||||
Reload(sp ServiceProvider) error
|
||||
// GetRPCInterface returns the interface to register for server
|
||||
GetRPCInterface() interface{}
|
||||
Reload() error
|
||||
// Shutdown stops the service
|
||||
Shutdown() error
|
||||
// GetIntenternalChan returns the internal connection chanel
|
||||
GetIntenternalChan() chan rpcclient.RpcClientConnection
|
||||
// IsRunning returns if the service is running
|
||||
IsRunning() bool
|
||||
// ShouldRun returns if the service should be running
|
||||
ShouldRun() bool
|
||||
// ServiceName returns the service name
|
||||
ServiceName() string
|
||||
// Shutdown stops the service
|
||||
Shutdown() error
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user