From 465848a5dd38fc3e126042feb86de3ff4019ffb7 Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 14 May 2020 12:35:05 +0200 Subject: [PATCH] Adding service infrastructure for the RateS --- cmd/cgr-engine/cgr-engine.go | 10 ++- config/config.go | 7 ++ config/config_json.go | 3 +- config/ratescfg.go | 23 +++++++ rates/rates.go | 74 +++++++++++++++++++++ services/ees.go | 2 +- services/rates.go | 124 +++++++++++++++++++++++++++++++++++ servmanager/servmanager.go | 9 +++ utils/consts.go | 2 + 9 files changed, 250 insertions(+), 4 deletions(-) create mode 100644 config/ratescfg.go create mode 100644 rates/rates.go create mode 100644 services/rates.go diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index eeb3269ce..2bb60e4df 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -115,7 +115,7 @@ func startRpc(server *utils.Server, internalRaterChan, internalAttrSChan, internalChargerSChan, internalThdSChan, internalSuplSChan, internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan, internalLoaderSChan, internalRALsv1Chan, internalCacheSChan, - internalEEsChan chan rpcclient.ClientConnector, + internalEEsChan, internalRateSChan chan rpcclient.ClientConnector, exitChan chan bool) { if !cfg.DispatcherSCfg().Enabled { select { // Any of the rpc methods will unlock listening to rpc requests @@ -147,6 +147,8 @@ func startRpc(server *utils.Server, internalRaterChan, internalCacheSChan <- chS case eeS := <-internalEEsChan: internalEEsChan <- eeS + case rateS := <-internalRateSChan: + internalRateSChan <- rateS } } else { select { @@ -416,6 +418,7 @@ func main() { internalAPIerSv2Chan := make(chan rpcclient.ClientConnector, 1) internalLoaderSChan := make(chan rpcclient.ClientConnector, 1) internalEEsChan := make(chan rpcclient.ClientConnector, 1) + internalRateSChan := make(chan rpcclient.ClientConnector, 1) // initialize the connManager before creating the DMService // because we need to pass the connection to it @@ -440,6 +443,7 @@ func main() { utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore): internalCoreSv1Chan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRALs): internalRALsChan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs): internalEEsChan, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRateS): internalRateSChan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers): internalDispatcherSChan, }) @@ -523,6 +527,8 @@ func main() { ldrs, anz, dspS, dmService, storDBService, services.NewEventExporterService(cfg, filterSChan, connManager, server, exitChan, internalEEsChan), + services.NewRateService(cfg, filterSChan, + server, exitChan, internalRateSChan), ) srvManager.StartServices() // Start FilterS @@ -563,7 +569,7 @@ func main() { attrS.GetIntenternalChan(), chrS.GetIntenternalChan(), tS.GetIntenternalChan(), routeS.GetIntenternalChan(), smg.GetIntenternalChan(), anz.GetIntenternalChan(), dspS.GetIntenternalChan(), ldrs.GetIntenternalChan(), rals.GetIntenternalChan(), - internalCacheSChan, internalEEsChan, exitChan) + internalCacheSChan, internalEEsChan, internalRateSChan, exitChan) <-exitChan if *cpuProfDir != "" { // wait to end cpuProfiling diff --git a/config/config.go b/config/config.go index 1e5b44ae2..e4ee118a4 100755 --- a/config/config.go +++ b/config/config.go @@ -303,6 +303,7 @@ type CGRConfig struct { apier *ApierCfg // APIer config ersCfg *ERsCfg // EventReader config eesCfg *EEsCfg // EventExporter config + rateSCfg *RateSCfg // RateS config } var posibleLoaderTypes = utils.NewStringSet([]string{utils.MetaAttributes, @@ -990,6 +991,12 @@ func (cfg *CGRConfig) EEsNoLksCfg() *EEsCfg { return cfg.eesCfg } +func (cfg *CGRConfig) RateSCfg() *RateSCfg { + cfg.lks[RateSJson].RLock() + defer cfg.lks[RateSJson].RUnlock() + return cfg.rateSCfg +} + // RPCConns reads the RPCConns configuration func (cfg *CGRConfig) RPCConns() map[string]*RPCConn { cfg.lks[RPCConnsJsonName].RLock() diff --git a/config/config_json.go b/config/config_json.go index ae815ba0d..4fad842ba 100644 --- a/config/config_json.go +++ b/config/config_json.go @@ -59,6 +59,7 @@ const ( DNSAgentJson = "dns_agent" ERsJson = "ers" EEsJson = "ees" + RateSJson = "rates" RPCConnsJsonName = "rpc_conns" ) @@ -68,7 +69,7 @@ var ( CDRS_JSN, CDRE_JSN, ERsJson, SessionSJson, AsteriskAgentJSN, FreeSWITCHAgentJSN, KamailioAgentJSN, DA_JSN, RA_JSN, HttpAgentJson, DNSAgentJson, ATTRIBUTE_JSN, ChargerSCfgJson, RESOURCES_JSON, STATS_JSON, THRESHOLDS_JSON, RouteSJson, LoaderJson, MAILER_JSN, SURETAX_JSON, CgrLoaderCfgJson, CgrMigratorCfgJson, DispatcherSJson, - AnalyzerCfgJson, ApierS, EEsJson} + AnalyzerCfgJson, ApierS, EEsJson, RateSJson} ) // Loads the json config out of io.Reader, eg other sources than file, maybe over http diff --git a/config/ratescfg.go b/config/ratescfg.go new file mode 100644 index 000000000..f945311a2 --- /dev/null +++ b/config/ratescfg.go @@ -0,0 +1,23 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package config + +type RateSCfg struct { + Enabled bool +} diff --git a/rates/rates.go b/rates/rates.go new file mode 100644 index 000000000..e0398d591 --- /dev/null +++ b/rates/rates.go @@ -0,0 +1,74 @@ +/* +Real-time Online/Offline Charging System (OerS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package rates + +import ( + "fmt" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +// NewRateS instantiates the RateS +func NewRateS(cfg *config.CGRConfig, filterS *engine.FilterS) *RateS { + return &RateS{ + cfg: cfg, + filterS: filterS, + } +} + +// RateS calculates costs for events +type RateS struct { + cfg *config.CGRConfig + filterS *engine.FilterS +} + +// ListenAndServe keeps the service alive +func (rS *RateS) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (err error) { + utils.Logger.Info(fmt.Sprintf("<%s> starting <%s>", + utils.CoreS, utils.RateS)) + for { + select { + case e := <-exitChan: // global exit + rS.Shutdown() + exitChan <- e // put back for the others listening for shutdown request + break + case rld := <-cfgRld: // configuration was reloaded, destroy the cache + cfgRld <- rld + } + } + return +} + +// Shutdown is called to shutdown the service +func (rS *RateS) Shutdown() (err error) { + utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.RateS)) + return +} + +// Call implements rpcclient.ClientConnector interface for internal RPC +func (rS *RateS) Call(serviceMethod string, args interface{}, reply interface{}) error { + return utils.RPCCall(rS, serviceMethod, args, reply) +} + +// V1CostForEvent will be called to calculate the cost for an event +func (rS *RateS) V1CostForEvent(cgrEv *utils.CGREventWithOpts, rply *string) (err error) { + return +} diff --git a/services/ees.go b/services/ees.go index 5f907ba8f..f1b32f902 100644 --- a/services/ees.go +++ b/services/ees.go @@ -83,7 +83,7 @@ func (es *EventExporterService) ShouldRun() (should bool) { func (es *EventExporterService) IsRunning() bool { es.RLock() defer es.RUnlock() - return es != nil && es.eeS != nil + return es.eeS != nil } // Reload handles the change of config diff --git a/services/rates.go b/services/rates.go new file mode 100644 index 000000000..4c7e1b3db --- /dev/null +++ b/services/rates.go @@ -0,0 +1,124 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + "sync" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/cgrates/rates" + //"github.com/cgrates/cgrates/apier/v1" + "github.com/cgrates/rpcclient" +) + +// NewRateService constructs RateService +func NewRateService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, + server *utils.Server, exitChan chan bool, + intConnChan chan rpcclient.ClientConnector) servmanager.Service { + return &RateService{ + cfg: cfg, + filterSChan: filterSChan, + server: server, + exitChan: exitChan, + intConnChan: intConnChan, + rldChan: make(chan struct{}), + } +} + +// RateService is the service structure for RateS +type RateService struct { + sync.RWMutex + + cfg *config.CGRConfig + filterSChan chan *engine.FilterS + server *utils.Server + exitChan chan bool + intConnChan chan rpcclient.ClientConnector + rldChan chan struct{} + + rateS *rates.RateS + //rpc *v1.EventExporterSv1 +} + +// GetIntenternalChan is deprecated and it will be removed shortly +func (rs *RateService) GetIntenternalChan() (conn chan rpcclient.ClientConnector) { + panic("deprecated method") +} + + + +// ServiceName returns the service name +func (rs *RateService) ServiceName() string { + return utils.RateS +} + +// ShouldRun returns if the service should be running +func (rs *RateService) ShouldRun() (should bool) { + return rs.cfg.RateSCfg().Enabled +} + +// IsRunning returns if the service is running +func (rs *RateService) IsRunning() bool { + rs.RLock() + defer rs.RUnlock() + return rs.rateS != nil +} + +// Reload handles the change of config +func (rs *RateService) Reload() (err error) { + rs.rldChan <- struct{}{} + return +} + +// Shutdown stops the service +func (rs *RateService) Shutdown() (err error) { + rs.Lock() + defer rs.Unlock() + if err = rs.rateS.Shutdown(); err != nil { + return + } + rs.rateS = nil + <-rs.intConnChan + return +} + +// Start should handle the service start +func (rs *RateService) Start() (err error) { + if rs.IsRunning() { + return fmt.Errorf("service aleady running") + } + + fltrS := <-rs.filterSChan + rs.filterSChan <- fltrS + + rs.Lock() + rs.rateS = rates.NewRateS(rs.cfg, fltrS) + rs.Unlock() + /*rs.rpc = v1.NewEventExporterSv1(es.eeS) + if !rs.cfg.DispatcherSCfg().Enabled { + rs.server.RpcRegister(es.rpc) + } + */ + rs.intConnChan <- rs.rateS + return rs.rateS.ListenAndServe(rs.exitChan, rs.rldChan) +} diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index fec0e5011..bf40534a8 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -173,6 +173,7 @@ func (srvMngr *ServiceManager) StartServices() (err error) { utils.AnalyzerS: srvMngr.GetConfig().AnalyzerSCfg().Enabled, utils.DispatcherS: srvMngr.GetConfig().DispatcherSCfg().Enabled, utils.EventExporterS: srvMngr.GetConfig().EEsCfg().Enabled, + utils.RateS: srvMngr.GetConfig().RateSCfg().Enabled, } { if shouldRun { go srvMngr.startService(serviceName) @@ -309,6 +310,14 @@ func (srvMngr *ServiceManager) handleReload() { if err = srvMngr.reloadService(utils.StorDB); err != nil { return } + case <-srvMngr.GetConfig().GetReloadChan(config.EEsJson): + if err = srvMngr.reloadService(config.EEsJson); err != nil { + return + } + case <-srvMngr.GetConfig().GetReloadChan(config.RateSJson): + if err = srvMngr.reloadService(config.RateSJson); err != nil { + return + } case <-srvMngr.GetConfig().GetReloadChan(config.RPCConnsJsonName): engine.Cache.Clear([]string{utils.CacheRPCConnections}) } diff --git a/utils/consts.go b/utils/consts.go index 9da3ede6f..79fd4f056 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -380,6 +380,7 @@ const ( MetaCache = "*cache" MetaGuardian = "*guardians" MetaEEs = "*ees" + MetaRateS = "*rates" MetaContinue = "*continue" Migrator = "migrator" UnsupportedMigrationTask = "unsupported migration task" @@ -725,6 +726,7 @@ const ( MetaYearly = "*yearly" MetaDaily = "*daily" MetaWeekly = "*weekly" + RateS = "RateS" ) // Migrator Action