diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go
index eeb3269ce..2bb60e4df 100644
--- a/cmd/cgr-engine/cgr-engine.go
+++ b/cmd/cgr-engine/cgr-engine.go
@@ -115,7 +115,7 @@ func startRpc(server *utils.Server, internalRaterChan,
internalAttrSChan, internalChargerSChan, internalThdSChan, internalSuplSChan,
internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan,
internalLoaderSChan, internalRALsv1Chan, internalCacheSChan,
- internalEEsChan chan rpcclient.ClientConnector,
+ internalEEsChan, internalRateSChan chan rpcclient.ClientConnector,
exitChan chan bool) {
if !cfg.DispatcherSCfg().Enabled {
select { // Any of the rpc methods will unlock listening to rpc requests
@@ -147,6 +147,8 @@ func startRpc(server *utils.Server, internalRaterChan,
internalCacheSChan <- chS
case eeS := <-internalEEsChan:
internalEEsChan <- eeS
+ case rateS := <-internalRateSChan:
+ internalRateSChan <- rateS
}
} else {
select {
@@ -416,6 +418,7 @@ func main() {
internalAPIerSv2Chan := make(chan rpcclient.ClientConnector, 1)
internalLoaderSChan := make(chan rpcclient.ClientConnector, 1)
internalEEsChan := make(chan rpcclient.ClientConnector, 1)
+ internalRateSChan := make(chan rpcclient.ClientConnector, 1)
// initialize the connManager before creating the DMService
// because we need to pass the connection to it
@@ -440,6 +443,7 @@ func main() {
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore): internalCoreSv1Chan,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRALs): internalRALsChan,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs): internalEEsChan,
+ utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRateS): internalRateSChan,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers): internalDispatcherSChan,
})
@@ -523,6 +527,8 @@ func main() {
ldrs, anz, dspS, dmService, storDBService,
services.NewEventExporterService(cfg, filterSChan,
connManager, server, exitChan, internalEEsChan),
+ services.NewRateService(cfg, filterSChan,
+ server, exitChan, internalRateSChan),
)
srvManager.StartServices()
// Start FilterS
@@ -563,7 +569,7 @@ func main() {
attrS.GetIntenternalChan(), chrS.GetIntenternalChan(), tS.GetIntenternalChan(),
routeS.GetIntenternalChan(), smg.GetIntenternalChan(), anz.GetIntenternalChan(),
dspS.GetIntenternalChan(), ldrs.GetIntenternalChan(), rals.GetIntenternalChan(),
- internalCacheSChan, internalEEsChan, exitChan)
+ internalCacheSChan, internalEEsChan, internalRateSChan, exitChan)
<-exitChan
if *cpuProfDir != "" { // wait to end cpuProfiling
diff --git a/config/config.go b/config/config.go
index 1e5b44ae2..e4ee118a4 100755
--- a/config/config.go
+++ b/config/config.go
@@ -303,6 +303,7 @@ type CGRConfig struct {
apier *ApierCfg // APIer config
ersCfg *ERsCfg // EventReader config
eesCfg *EEsCfg // EventExporter config
+ rateSCfg *RateSCfg // RateS config
}
var posibleLoaderTypes = utils.NewStringSet([]string{utils.MetaAttributes,
@@ -990,6 +991,12 @@ func (cfg *CGRConfig) EEsNoLksCfg() *EEsCfg {
return cfg.eesCfg
}
+func (cfg *CGRConfig) RateSCfg() *RateSCfg {
+ cfg.lks[RateSJson].RLock()
+ defer cfg.lks[RateSJson].RUnlock()
+ return cfg.rateSCfg
+}
+
// RPCConns reads the RPCConns configuration
func (cfg *CGRConfig) RPCConns() map[string]*RPCConn {
cfg.lks[RPCConnsJsonName].RLock()
diff --git a/config/config_json.go b/config/config_json.go
index ae815ba0d..4fad842ba 100644
--- a/config/config_json.go
+++ b/config/config_json.go
@@ -59,6 +59,7 @@ const (
DNSAgentJson = "dns_agent"
ERsJson = "ers"
EEsJson = "ees"
+ RateSJson = "rates"
RPCConnsJsonName = "rpc_conns"
)
@@ -68,7 +69,7 @@ var (
CDRS_JSN, CDRE_JSN, ERsJson, SessionSJson, AsteriskAgentJSN, FreeSWITCHAgentJSN, KamailioAgentJSN,
DA_JSN, RA_JSN, HttpAgentJson, DNSAgentJson, ATTRIBUTE_JSN, ChargerSCfgJson, RESOURCES_JSON, STATS_JSON, THRESHOLDS_JSON,
RouteSJson, LoaderJson, MAILER_JSN, SURETAX_JSON, CgrLoaderCfgJson, CgrMigratorCfgJson, DispatcherSJson,
- AnalyzerCfgJson, ApierS, EEsJson}
+ AnalyzerCfgJson, ApierS, EEsJson, RateSJson}
)
// Loads the json config out of io.Reader, eg other sources than file, maybe over http
diff --git a/config/ratescfg.go b/config/ratescfg.go
new file mode 100644
index 000000000..f945311a2
--- /dev/null
+++ b/config/ratescfg.go
@@ -0,0 +1,23 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package config
+
+type RateSCfg struct {
+ Enabled bool
+}
diff --git a/rates/rates.go b/rates/rates.go
new file mode 100644
index 000000000..e0398d591
--- /dev/null
+++ b/rates/rates.go
@@ -0,0 +1,74 @@
+/*
+Real-time Online/Offline Charging System (OerS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package rates
+
+import (
+ "fmt"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+)
+
+// NewRateS instantiates the RateS
+func NewRateS(cfg *config.CGRConfig, filterS *engine.FilterS) *RateS {
+ return &RateS{
+ cfg: cfg,
+ filterS: filterS,
+ }
+}
+
+// RateS calculates costs for events
+type RateS struct {
+ cfg *config.CGRConfig
+ filterS *engine.FilterS
+}
+
+// ListenAndServe keeps the service alive
+func (rS *RateS) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (err error) {
+ utils.Logger.Info(fmt.Sprintf("<%s> starting <%s>",
+ utils.CoreS, utils.RateS))
+ for {
+ select {
+ case e := <-exitChan: // global exit
+ rS.Shutdown()
+ exitChan <- e // put back for the others listening for shutdown request
+ break
+ case rld := <-cfgRld: // configuration was reloaded, destroy the cache
+ cfgRld <- rld
+ }
+ }
+ return
+}
+
+// Shutdown is called to shutdown the service
+func (rS *RateS) Shutdown() (err error) {
+ utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.RateS))
+ return
+}
+
+// Call implements rpcclient.ClientConnector interface for internal RPC
+func (rS *RateS) Call(serviceMethod string, args interface{}, reply interface{}) error {
+ return utils.RPCCall(rS, serviceMethod, args, reply)
+}
+
+// V1CostForEvent will be called to calculate the cost for an event
+func (rS *RateS) V1CostForEvent(cgrEv *utils.CGREventWithOpts, rply *string) (err error) {
+ return
+}
diff --git a/services/ees.go b/services/ees.go
index 5f907ba8f..f1b32f902 100644
--- a/services/ees.go
+++ b/services/ees.go
@@ -83,7 +83,7 @@ func (es *EventExporterService) ShouldRun() (should bool) {
func (es *EventExporterService) IsRunning() bool {
es.RLock()
defer es.RUnlock()
- return es != nil && es.eeS != nil
+ return es.eeS != nil
}
// Reload handles the change of config
diff --git a/services/rates.go b/services/rates.go
new file mode 100644
index 000000000..4c7e1b3db
--- /dev/null
+++ b/services/rates.go
@@ -0,0 +1,124 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package services
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/servmanager"
+ "github.com/cgrates/cgrates/utils"
+ "github.com/cgrates/cgrates/rates"
+ //"github.com/cgrates/cgrates/apier/v1"
+ "github.com/cgrates/rpcclient"
+)
+
+// NewRateService constructs RateService
+func NewRateService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
+ server *utils.Server, exitChan chan bool,
+ intConnChan chan rpcclient.ClientConnector) servmanager.Service {
+ return &RateService{
+ cfg: cfg,
+ filterSChan: filterSChan,
+ server: server,
+ exitChan: exitChan,
+ intConnChan: intConnChan,
+ rldChan: make(chan struct{}),
+ }
+}
+
+// RateService is the service structure for RateS
+type RateService struct {
+ sync.RWMutex
+
+ cfg *config.CGRConfig
+ filterSChan chan *engine.FilterS
+ server *utils.Server
+ exitChan chan bool
+ intConnChan chan rpcclient.ClientConnector
+ rldChan chan struct{}
+
+ rateS *rates.RateS
+ //rpc *v1.EventExporterSv1
+}
+
+// GetIntenternalChan is deprecated and it will be removed shortly
+func (rs *RateService) GetIntenternalChan() (conn chan rpcclient.ClientConnector) {
+ panic("deprecated method")
+}
+
+
+
+// ServiceName returns the service name
+func (rs *RateService) ServiceName() string {
+ return utils.RateS
+}
+
+// ShouldRun returns if the service should be running
+func (rs *RateService) ShouldRun() (should bool) {
+ return rs.cfg.RateSCfg().Enabled
+}
+
+// IsRunning returns if the service is running
+func (rs *RateService) IsRunning() bool {
+ rs.RLock()
+ defer rs.RUnlock()
+ return rs.rateS != nil
+}
+
+// Reload handles the change of config
+func (rs *RateService) Reload() (err error) {
+ rs.rldChan <- struct{}{}
+ return
+}
+
+// Shutdown stops the service
+func (rs *RateService) Shutdown() (err error) {
+ rs.Lock()
+ defer rs.Unlock()
+ if err = rs.rateS.Shutdown(); err != nil {
+ return
+ }
+ rs.rateS = nil
+ <-rs.intConnChan
+ return
+}
+
+// Start should handle the service start
+func (rs *RateService) Start() (err error) {
+ if rs.IsRunning() {
+ return fmt.Errorf("service aleady running")
+ }
+
+ fltrS := <-rs.filterSChan
+ rs.filterSChan <- fltrS
+
+ rs.Lock()
+ rs.rateS = rates.NewRateS(rs.cfg, fltrS)
+ rs.Unlock()
+ /*rs.rpc = v1.NewEventExporterSv1(es.eeS)
+ if !rs.cfg.DispatcherSCfg().Enabled {
+ rs.server.RpcRegister(es.rpc)
+ }
+ */
+ rs.intConnChan <- rs.rateS
+ return rs.rateS.ListenAndServe(rs.exitChan, rs.rldChan)
+}
diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go
index fec0e5011..bf40534a8 100644
--- a/servmanager/servmanager.go
+++ b/servmanager/servmanager.go
@@ -173,6 +173,7 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
utils.AnalyzerS: srvMngr.GetConfig().AnalyzerSCfg().Enabled,
utils.DispatcherS: srvMngr.GetConfig().DispatcherSCfg().Enabled,
utils.EventExporterS: srvMngr.GetConfig().EEsCfg().Enabled,
+ utils.RateS: srvMngr.GetConfig().RateSCfg().Enabled,
} {
if shouldRun {
go srvMngr.startService(serviceName)
@@ -309,6 +310,14 @@ func (srvMngr *ServiceManager) handleReload() {
if err = srvMngr.reloadService(utils.StorDB); err != nil {
return
}
+ case <-srvMngr.GetConfig().GetReloadChan(config.EEsJson):
+ if err = srvMngr.reloadService(config.EEsJson); err != nil {
+ return
+ }
+ case <-srvMngr.GetConfig().GetReloadChan(config.RateSJson):
+ if err = srvMngr.reloadService(config.RateSJson); err != nil {
+ return
+ }
case <-srvMngr.GetConfig().GetReloadChan(config.RPCConnsJsonName):
engine.Cache.Clear([]string{utils.CacheRPCConnections})
}
diff --git a/utils/consts.go b/utils/consts.go
index 9da3ede6f..79fd4f056 100755
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -380,6 +380,7 @@ const (
MetaCache = "*cache"
MetaGuardian = "*guardians"
MetaEEs = "*ees"
+ MetaRateS = "*rates"
MetaContinue = "*continue"
Migrator = "migrator"
UnsupportedMigrationTask = "unsupported migration task"
@@ -725,6 +726,7 @@ const (
MetaYearly = "*yearly"
MetaDaily = "*daily"
MetaWeekly = "*weekly"
+ RateS = "RateS"
)
// Migrator Action