From 9dcbaaf06ac7e400a0ae2b2a24db970125276d89 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Fri, 20 Sep 2019 14:46:30 +0300 Subject: [PATCH] Added Responder service implementation --- apier/v1/apier.go | 13 +++---- cmd/cgr-engine/rater.go | 3 +- services/responders.go | 75 ++++++++++++++++++++++++++++---------- servmanager/servmanager.go | 4 ++ 4 files changed, 66 insertions(+), 29 deletions(-) diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 99ff2381b..ddc28be3d 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -48,13 +48,12 @@ type ApierV1 struct { Config *config.CGRConfig Responder *engine.Responder CDRs rpcclient.RpcClientConnection // FixMe: populate it from cgr-engine - Scheduler SchedulerGeter - // ServManager *servmanager.ServiceManager // Need to have them capitalize so we can export in V2 - HTTPPoster *engine.HTTPPoster - FilterS *engine.FilterS //Used for CDR Exporter - CacheS rpcclient.RpcClientConnection - SchedulerS rpcclient.RpcClientConnection - AttributeS rpcclient.RpcClientConnection + Scheduler SchedulerGeter // Need to have them capitalize so we can export in V2 + HTTPPoster *engine.HTTPPoster + FilterS *engine.FilterS //Used for CDR Exporter + CacheS rpcclient.RpcClientConnection + SchedulerS rpcclient.RpcClientConnection + AttributeS rpcclient.RpcClientConnection } // Call implements rpcclient.RpcClientConnection interface for internal RPC diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index 1aa28cd2a..427ce2219 100755 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -205,8 +205,7 @@ func startRater(internalRaterChan, internalApierv1, internalApierv2, internalThd CdrDb: cdrDb, Config: cfg, Responder: responder, - // ServManager: serviceManager, - Scheduler: schS, + Scheduler: schS, HTTPPoster: engine.NewHTTPPoster(cfg.GeneralCfg().HttpSkipTlsVerify, cfg.GeneralCfg().ReplyTimeout), FilterS: filterS, diff --git a/services/responders.go b/services/responders.go index bd92ee745..61c25e91c 100644 --- a/services/responders.go +++ b/services/responders.go @@ -19,33 +19,68 @@ along with this program. If not, see package services import ( + "fmt" + + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) // NewResponderService returns the Resonder Service -func NewResponderService(connChan chan rpcclient.RpcClientConnection) servmanager.Service { +func NewResponderService(connChan chan rpcclient.RpcClientConnection) *ResponderService { return &ResponderService{ connChan: connChan, } } // ResponderService implements Service interface -// ToDo: Add the rest of functionality -// only the chanel without reload functionality type ResponderService struct { - // resp *engine.ResponderService - // rpc *v1.respV1 + resp *engine.Responder connChan chan rpcclient.RpcClientConnection } // Start should handle the sercive start +// For this service the start should be called from RAL Service func (resp *ResponderService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { - // if resp.IsRunning() { - // return fmt.Errorf("service aleady running") - // } - return utils.ErrNotImplemented + if resp.IsRunning() { + return fmt.Errorf("service aleady running") + } + var waitTasks []chan struct{} + cacheTaskChan := make(chan struct{}) + waitTasks = append(waitTasks, cacheTaskChan) + + var thdS, stats rpcclient.RpcClientConnection + if thdS, err = sp.GetConnection(utils.ThresholdS, sp.GetConfig().RalsCfg().RALsThresholdSConns); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s, error: %s", + utils.RALService, utils.ThresholdS, err.Error())) + return + } + if stats, err = sp.GetConnection(utils.StatS, sp.GetConfig().RalsCfg().RALsStatSConns); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s, error: %s", + utils.RALService, utils.StatS, err.Error())) + return + } + if thdS != nil { + engine.SetThresholdS(thdS) // temporary architectural fix until we will have separate AccountS + } + if stats != nil { + engine.SetStatS(stats) + } + + resp.resp = &engine.Responder{ + ExitChan: sp.GetExitChan(), + MaxComputedUsage: sp.GetConfig().RalsCfg().RALsMaxComputedUsage, + } + + if !sp.GetConfig().DispatcherSCfg().Enabled { + sp.GetServer().RpcRegister(resp.resp) + } + + utils.RegisterRpcParams("", resp.resp) + + resp.connChan <- resp.resp // Rater done + return } // GetIntenternalChan returns the internal connection chanel @@ -55,32 +90,32 @@ func (resp *ResponderService) GetIntenternalChan() (conn chan rpcclient.RpcClien // Reload handles the change of config func (resp *ResponderService) Reload(sp servmanager.ServiceProvider) (err error) { - return utils.ErrNotImplemented + return } // Shutdown stops the service func (resp *ResponderService) Shutdown() (err error) { - return utils.ErrNotImplemented - // if err = resp.resp.Shutdown(); err != nil { - // return - // } - // resp.resp = nil - // resp.rpc = nil - // <-resp.connChan - // return + resp.resp = nil + <-resp.connChan + return } // GetRPCInterface returns the interface to register for server func (resp *ResponderService) GetRPCInterface() interface{} { - return nil //resp.rpc + return resp.resp } // IsRunning returns if the service is running func (resp *ResponderService) IsRunning() bool { - return resp != nil // && resp.resp != nil + return resp != nil && resp.resp != nil } // ServiceName returns the service name func (resp *ResponderService) ServiceName() string { return utils.ResponderS } + +// GetResponder returns the responder created +func (resp *ResponderService) GetResponder() *engine.Responder { + return resp.resp +} diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 5000a7a90..fb9b20cfa 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -478,6 +478,10 @@ type ServiceProvider interface { GetExitChan() chan bool // GetConnection creates a rpcClient to the specified subsystem GetConnection(subsystem string, cfg []*config.RemoteHost) (rpcclient.RpcClientConnection, error) + // GetService returns the named service + GetService(subsystem string) (Service, error) + // AddService adds the given serices + AddService(services ...Service) } // Service interface that describes what functions should a service implement