From 04fe577be1c78ac76a0127aa07f6d19bca254f3f Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 12 Sep 2019 14:37:39 +0300 Subject: [PATCH] Added AttributeS as service in ServiceManager --- cmd/cgr-engine/cgr-engine.go | 12 ++-- services/attributes.go | 105 +++++++++++++++++++++++++++++++++++ servmanager/servmanager.go | 11 ++++ 3 files changed, 124 insertions(+), 4 deletions(-) create mode 100644 services/attributes.go diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 6b6ea413d..c65a3f66a 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -42,6 +42,7 @@ import ( "github.com/cgrates/cgrates/ers" "github.com/cgrates/cgrates/loaders" "github.com/cgrates/cgrates/scheduler" + "github.com/cgrates/cgrates/services" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" @@ -1692,6 +1693,9 @@ func main() { // Start ServiceManager srvManager := servmanager.NewServiceManager(cfg, dm, cacheS, cdrDb, loadDb, filterSChan, server, exitChan) + attrS := services.NewAttributeService() + srvManager.AddService(attrS) + internalAttributeSChan = attrS.GetIntenternalChan() go srvManager.StartServices() initServiceManagerV1(internalServeManagerChan, srvManager, server) @@ -1801,10 +1805,10 @@ func main() { // Start FilterS go startFilterService(filterSChan, cacheS, internalStatSChan, internalRsChan, internalRaterChan, cfg, dm, exitChan) - if cfg.AttributeSCfg().Enabled { - go startAttributeService(internalAttributeSChan, cacheS, - cfg, dm, server, filterSChan, exitChan) - } + // if cfg.AttributeSCfg().Enabled { + // go startAttributeService(internalAttributeSChan, cacheS, + // cfg, dm, server, filterSChan, exitChan) + // } if cfg.ChargerSCfg().Enabled { go startChargerService(internalChargerSChan, internalAttributeSChan, internalDispatcherSChan, cacheS, cfg, dm, server, diff --git a/services/attributes.go b/services/attributes.go new file mode 100644 index 000000000..5b7b29219 --- /dev/null +++ b/services/attributes.go @@ -0,0 +1,105 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + + v1 "github.com/cgrates/cgrates/apier/v1" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +func NewAttributeService() servmanager.Service { + return &AttributeService{ + connChan: make(chan rpcclient.RpcClientConnection, 1), + } +} + +// AttributeService implements Service interface +type AttributeService struct { + attrS *engine.AttributeService + rpc *v1.AttributeSv1 + connChan chan rpcclient.RpcClientConnection +} + +// Start should handle the sercive start +func (attrS *AttributeService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { + if attrS.IsRunning() { + return fmt.Errorf("service aleady running") + } + + if waitCache { + <-sp.GetCacheS().GetPrecacheChannel(utils.CacheAttributeProfiles) + <-sp.GetCacheS().GetPrecacheChannel(utils.CacheAttributeFilterIndexes) + } + + attrS.attrS, err = engine.NewAttributeService(sp.GetDM(), + sp.GetFilterS(), sp.GetConfig()) + if err != nil { + utils.Logger.Crit( + fmt.Sprintf("<%s> Could not init, error: %s", + utils.AttributeS, err.Error())) + return + } + utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.ServiceManager, utils.AttributeS)) + aSv1 := v1.NewAttributeSv1(attrS.attrS) + if !sp.GetConfig().DispatcherSCfg().Enabled { + sp.GetServer().RpcRegister(aSv1) + } + attrS.connChan <- aSv1 + attrS.rpc = aSv1 + return +} + +// GetIntenternalChan returns the internal connection chanel +func (attrS *AttributeService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) { + return attrS.connChan +} + +// Reload handles the change of config +func (attrS *AttributeService) Reload(sp servmanager.ServiceProvider) (err error) { return } + +// Shutdown stops the service +func (attrS *AttributeService) Shutdown() (err error) { + if err = attrS.attrS.Shutdown(); err != nil { + return + } + attrS.attrS = nil + attrS.rpc = nil + <-attrS.connChan + return +} + +// GetRPCInterface returns the interface to register for server +func (attrS *AttributeService) GetRPCInterface() interface{} { + return attrS.rpc +} + +// IsRunning returns if the service is running +func (attrS *AttributeService) IsRunning() bool { + return attrS != nil && attrS.attrS != nil +} + +// ServiceName returns the service name +func (attrS *AttributeService) ServiceName() string { + return utils.AttributeS +} diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index c609fd1c9..3a8e32cbe 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -267,6 +267,17 @@ func (srvMngr *ServiceManager) GetConnection(subsystem string, cfg *config.Remot // StartServices starts all enabled services func (srvMngr *ServiceManager) StartServices() (err error) { // go hendleReloads() + if srvMngr.cfg.AttributeSCfg().Enabled { + go func() { + if attrS, has := srvMngr.subsystems[utils.AttributeS]; !has { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start", utils.AttributeS)) + srvMngr.engineShutdown <- true + } else if err = attrS.Start(srvMngr, true); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start because: %s", utils.AttributeS, err)) + srvMngr.engineShutdown <- true + } + }() + } // startServer() return