From 382a9707823ff95be96611209472f9a20cc312fa Mon Sep 17 00:00:00 2001 From: TeoV Date: Tue, 22 Dec 2020 10:59:27 +0200 Subject: [PATCH] Add AccountS in services and cmd/cgr-engine --- accounts/accounts.go | 2 - cmd/cgr-engine/cgr-engine.go | 13 ++- data/conf/samples/tutmysql/cgrates.json | 6 + services/accounts.go | 139 ++++++++++++++++++++++++ services/actions.go | 6 +- utils/consts.go | 5 + 6 files changed, 164 insertions(+), 7 deletions(-) create mode 100644 services/accounts.go diff --git a/accounts/accounts.go b/accounts/accounts.go index 1426e5372..019913453 100644 --- a/accounts/accounts.go +++ b/accounts/accounts.go @@ -44,8 +44,6 @@ type AccountS struct { // ListenAndServe keeps the service alive func (aS *AccountS) ListenAndServe(stopChan, cfgRld chan struct{}) { - utils.Logger.Info(fmt.Sprintf("<%s> starting <%s>", - utils.CoreS, utils.AccountS)) for { select { case <-stopChan: diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index b7a80b5bc..13b4acbb7 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -144,7 +144,8 @@ func startRPC(server *cores.Server, internalRaterChan, internalAttrSChan, internalChargerSChan, internalThdSChan, internalSuplSChan, internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan, internalLoaderSChan, internalRALsv1Chan, internalCacheSChan, - internalEEsChan, internalRateSChan, internalActionSChan chan rpcclient.ClientConnector, + internalEEsChan, internalRateSChan, internalActionSChan, + internalAccountSChan chan rpcclient.ClientConnector, shdChan *utils.SyncedChan) { if !cfg.DispatcherSCfg().Enabled { select { // Any of the rpc methods will unlock listening to rpc requests @@ -180,6 +181,8 @@ func startRPC(server *cores.Server, internalRaterChan, internalRateSChan <- rateS case actionS := <-internalActionSChan: internalActionSChan <- actionS + case accountS := <-internalAccountSChan: + internalAccountSChan <- accountS case <-shdChan.Done(): return } @@ -497,6 +500,7 @@ func main() { internalEEsChan := make(chan rpcclient.ClientConnector, 1) internalRateSChan := make(chan rpcclient.ClientConnector, 1) internalActionSChan := make(chan rpcclient.ClientConnector, 1) + internalAccountSChan := make(chan rpcclient.ClientConnector, 1) // initialize the connManager before creating the DMService // because we need to pass the connection to it @@ -524,6 +528,7 @@ func main() { utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRateS): internalRateSChan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions): internalActionSChan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers): internalDispatcherSChan, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts): internalAccountSChan, }) srvDep := map[string]*sync.WaitGroup{ utils.AnalyzerS: new(sync.WaitGroup), @@ -559,6 +564,7 @@ func main() { utils.StorDB: new(sync.WaitGroup), utils.ThresholdS: new(sync.WaitGroup), utils.ActionS: new(sync.WaitGroup), + utils.AccountS: new(sync.WaitGroup), } gvService := services.NewGlobalVarS(cfg, srvDep) shdWg.Add(1) @@ -673,6 +679,7 @@ func main() { server, internalRateSChan, anz, srvDep), services.NewSIPAgent(cfg, filterSChan, shdChan, connManager, srvDep), services.NewActionService(cfg, dmService, cacheS, filterSChan, server, internalActionSChan, anz, srvDep), + services.NewAccountService(cfg, dmService, cacheS, filterSChan, server, internalAccountSChan, anz, srvDep), ) srvManager.StartServices() // Start FilterS @@ -708,6 +715,7 @@ func main() { engine.IntRPC.AddInternalRPCClient(utils.ActionSv1, internalActionSChan) engine.IntRPC.AddInternalRPCClient(utils.EeSv1, internalEEsChan) engine.IntRPC.AddInternalRPCClient(utils.DispatcherSv1, internalDispatcherSChan) + engine.IntRPC.AddInternalRPCClient(utils.AccountSv1, internalAccountSChan) initConfigSv1(internalConfigChan, server, anz) @@ -721,7 +729,8 @@ func main() { internalAttributeSChan, internalChargerSChan, internalThresholdSChan, internalRouteSChan, internalSessionSChan, internalAnalyzerSChan, internalDispatcherSChan, internalLoaderSChan, internalRALsChan, - internalCacheSChan, internalEEsChan, internalRateSChan, internalActionSChan, shdChan) + internalCacheSChan, internalEEsChan, internalRateSChan, internalActionSChan, + internalAccountSChan, shdChan) <-shdChan.Done() shtdDone := make(chan struct{}) diff --git a/data/conf/samples/tutmysql/cgrates.json b/data/conf/samples/tutmysql/cgrates.json index 267827a14..49dfdd1b0 100644 --- a/data/conf/samples/tutmysql/cgrates.json +++ b/data/conf/samples/tutmysql/cgrates.json @@ -121,6 +121,12 @@ "enabled": true }, + +"accounts": { + "enabled": true +}, + + "filters": { "apiers_conns": ["*internal"], }, diff --git a/services/accounts.go b/services/accounts.go new file mode 100644 index 000000000..04828f638 --- /dev/null +++ b/services/accounts.go @@ -0,0 +1,139 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + "sync" + + "github.com/cgrates/cgrates/accounts" + + v1 "github.com/cgrates/cgrates/apier/v1" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/cores" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewAccountService returns the Account Service +func NewAccountService(cfg *config.CGRConfig, dm *DataDBService, + cacheS *engine.CacheS, filterSChan chan *engine.FilterS, + server *cores.Server, internalChan chan rpcclient.ClientConnector, + anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { + return &AccountService{ + connChan: internalChan, + cfg: cfg, + dm: dm, + cacheS: cacheS, + filterSChan: filterSChan, + server: server, + anz: anz, + srvDep: srvDep, + rldChan: make(chan struct{}), + } +} + +// AccountService implements Service interface +type AccountService struct { + sync.RWMutex + cfg *config.CGRConfig + dm *DataDBService + cacheS *engine.CacheS + filterSChan chan *engine.FilterS + server *cores.Server + + rldChan chan struct{} + stopChan chan struct{} + + acts *accounts.AccountS + rpc *v1.AccountSv1 // useful on restart + connChan chan rpcclient.ClientConnector // publish the internal Subsystem when available + anz *AnalyzerService + srvDep map[string]*sync.WaitGroup +} + +// Start should handle the sercive start +func (acts *AccountService) Start() (err error) { + if acts.IsRunning() { + return utils.ErrServiceAlreadyRunning + } + + <-acts.cacheS.GetPrecacheChannel(utils.CacheAccountProfiles) + <-acts.cacheS.GetPrecacheChannel(utils.CacheAccounts2) + <-acts.cacheS.GetPrecacheChannel(utils.CacheAccountProfilesFilterIndexes) + + filterS := <-acts.filterSChan + acts.filterSChan <- filterS + dbchan := acts.dm.GetDMChan() + datadb := <-dbchan + dbchan <- datadb + + acts.Lock() + defer acts.Unlock() + acts.acts = accounts.NewAccountS(acts.cfg, filterS, datadb) + acts.stopChan = make(chan struct{}) + go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan) + + utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.AccountS)) + acts.rpc = v1.NewAccountSv1(acts.acts) + if !acts.cfg.DispatcherSCfg().Enabled { + acts.server.RpcRegister(acts.rpc) + } + acts.connChan <- acts.anz.GetInternalCodec(acts.rpc, utils.AccountS) + return +} + +// Reload handles the change of config +func (acts *AccountService) Reload() (err error) { + acts.rldChan <- struct{}{} + return // for the moment nothing to reload +} + +// Shutdown stops the service +func (acts *AccountService) Shutdown() (err error) { + acts.Lock() + defer acts.Unlock() + close(acts.stopChan) + if err = acts.acts.Shutdown(); err != nil { + return + } + acts.acts = nil + acts.rpc = nil + <-acts.connChan + return +} + +// IsRunning returns if the service is running +func (acts *AccountService) IsRunning() bool { + acts.RLock() + defer acts.RUnlock() + return acts != nil && acts.acts != nil +} + +// ServiceName returns the service name +func (acts *AccountService) ServiceName() string { + return utils.AccountS +} + +// ShouldRun returns if the service should be running +func (acts *AccountService) ShouldRun() bool { + return acts.cfg.AccountSCfg().Enabled +} diff --git a/services/actions.go b/services/actions.go index 20ffd7e9e..1a590c2f1 100644 --- a/services/actions.go +++ b/services/actions.go @@ -70,7 +70,7 @@ type ActionService struct { srvDep map[string]*sync.WaitGroup } -// Start should handle the sercive start +// Start should handle the service start func (acts *ActionService) Start() (err error) { if acts.IsRunning() { return utils.ErrServiceAlreadyRunning @@ -91,12 +91,12 @@ func (acts *ActionService) Start() (err error) { acts.stopChan = make(chan struct{}) go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan) - utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.AttributeS)) + utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ActionS)) acts.rpc = v1.NewActionSv1(acts.acts) if !acts.cfg.DispatcherSCfg().Enabled { acts.server.RpcRegister(acts.rpc) } - acts.connChan <- acts.anz.GetInternalCodec(acts.rpc, utils.AttributeS) + acts.connChan <- acts.anz.GetInternalCodec(acts.rpc, utils.ActionS) return } diff --git a/utils/consts.go b/utils/consts.go index 5ccdf6b7b..e1c8ecfb4 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -1552,6 +1552,11 @@ const ( ActionSv1Ping = "ActionSv1.Ping" ) +const ( + AccountSv1 = "AccountSv1" + AccountSv1Ping = "AccountSv1.Ping" +) + const ( CoreS = "CoreS" CoreSv1 = "CoreSv1"