mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 10:06:24 +05:00
Adding service infrastructure for the RateS
This commit is contained in:
@@ -115,7 +115,7 @@ func startRpc(server *utils.Server, internalRaterChan,
|
||||
internalAttrSChan, internalChargerSChan, internalThdSChan, internalSuplSChan,
|
||||
internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan,
|
||||
internalLoaderSChan, internalRALsv1Chan, internalCacheSChan,
|
||||
internalEEsChan chan rpcclient.ClientConnector,
|
||||
internalEEsChan, internalRateSChan chan rpcclient.ClientConnector,
|
||||
exitChan chan bool) {
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
select { // Any of the rpc methods will unlock listening to rpc requests
|
||||
@@ -147,6 +147,8 @@ func startRpc(server *utils.Server, internalRaterChan,
|
||||
internalCacheSChan <- chS
|
||||
case eeS := <-internalEEsChan:
|
||||
internalEEsChan <- eeS
|
||||
case rateS := <-internalRateSChan:
|
||||
internalRateSChan <- rateS
|
||||
}
|
||||
} else {
|
||||
select {
|
||||
@@ -416,6 +418,7 @@ func main() {
|
||||
internalAPIerSv2Chan := make(chan rpcclient.ClientConnector, 1)
|
||||
internalLoaderSChan := make(chan rpcclient.ClientConnector, 1)
|
||||
internalEEsChan := make(chan rpcclient.ClientConnector, 1)
|
||||
internalRateSChan := make(chan rpcclient.ClientConnector, 1)
|
||||
|
||||
// initialize the connManager before creating the DMService
|
||||
// because we need to pass the connection to it
|
||||
@@ -440,6 +443,7 @@ func main() {
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore): internalCoreSv1Chan,
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRALs): internalRALsChan,
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs): internalEEsChan,
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRateS): internalRateSChan,
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers): internalDispatcherSChan,
|
||||
})
|
||||
|
||||
@@ -523,6 +527,8 @@ func main() {
|
||||
ldrs, anz, dspS, dmService, storDBService,
|
||||
services.NewEventExporterService(cfg, filterSChan,
|
||||
connManager, server, exitChan, internalEEsChan),
|
||||
services.NewRateService(cfg, filterSChan,
|
||||
server, exitChan, internalRateSChan),
|
||||
)
|
||||
srvManager.StartServices()
|
||||
// Start FilterS
|
||||
@@ -563,7 +569,7 @@ func main() {
|
||||
attrS.GetIntenternalChan(), chrS.GetIntenternalChan(), tS.GetIntenternalChan(),
|
||||
routeS.GetIntenternalChan(), smg.GetIntenternalChan(), anz.GetIntenternalChan(),
|
||||
dspS.GetIntenternalChan(), ldrs.GetIntenternalChan(), rals.GetIntenternalChan(),
|
||||
internalCacheSChan, internalEEsChan, exitChan)
|
||||
internalCacheSChan, internalEEsChan, internalRateSChan, exitChan)
|
||||
<-exitChan
|
||||
|
||||
if *cpuProfDir != "" { // wait to end cpuProfiling
|
||||
|
||||
@@ -303,6 +303,7 @@ type CGRConfig struct {
|
||||
apier *ApierCfg // APIer config
|
||||
ersCfg *ERsCfg // EventReader config
|
||||
eesCfg *EEsCfg // EventExporter config
|
||||
rateSCfg *RateSCfg // RateS config
|
||||
}
|
||||
|
||||
var posibleLoaderTypes = utils.NewStringSet([]string{utils.MetaAttributes,
|
||||
@@ -990,6 +991,12 @@ func (cfg *CGRConfig) EEsNoLksCfg() *EEsCfg {
|
||||
return cfg.eesCfg
|
||||
}
|
||||
|
||||
func (cfg *CGRConfig) RateSCfg() *RateSCfg {
|
||||
cfg.lks[RateSJson].RLock()
|
||||
defer cfg.lks[RateSJson].RUnlock()
|
||||
return cfg.rateSCfg
|
||||
}
|
||||
|
||||
// RPCConns reads the RPCConns configuration
|
||||
func (cfg *CGRConfig) RPCConns() map[string]*RPCConn {
|
||||
cfg.lks[RPCConnsJsonName].RLock()
|
||||
|
||||
@@ -59,6 +59,7 @@ const (
|
||||
DNSAgentJson = "dns_agent"
|
||||
ERsJson = "ers"
|
||||
EEsJson = "ees"
|
||||
RateSJson = "rates"
|
||||
RPCConnsJsonName = "rpc_conns"
|
||||
)
|
||||
|
||||
@@ -68,7 +69,7 @@ var (
|
||||
CDRS_JSN, CDRE_JSN, ERsJson, SessionSJson, AsteriskAgentJSN, FreeSWITCHAgentJSN, KamailioAgentJSN,
|
||||
DA_JSN, RA_JSN, HttpAgentJson, DNSAgentJson, ATTRIBUTE_JSN, ChargerSCfgJson, RESOURCES_JSON, STATS_JSON, THRESHOLDS_JSON,
|
||||
RouteSJson, LoaderJson, MAILER_JSN, SURETAX_JSON, CgrLoaderCfgJson, CgrMigratorCfgJson, DispatcherSJson,
|
||||
AnalyzerCfgJson, ApierS, EEsJson}
|
||||
AnalyzerCfgJson, ApierS, EEsJson, RateSJson}
|
||||
)
|
||||
|
||||
// Loads the json config out of io.Reader, eg other sources than file, maybe over http
|
||||
|
||||
23
config/ratescfg.go
Normal file
23
config/ratescfg.go
Normal file
@@ -0,0 +1,23 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package config
|
||||
|
||||
type RateSCfg struct {
|
||||
Enabled bool
|
||||
}
|
||||
74
rates/rates.go
Normal file
74
rates/rates.go
Normal file
@@ -0,0 +1,74 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OerS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package rates
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// NewRateS instantiates the RateS
|
||||
func NewRateS(cfg *config.CGRConfig, filterS *engine.FilterS) *RateS {
|
||||
return &RateS{
|
||||
cfg: cfg,
|
||||
filterS: filterS,
|
||||
}
|
||||
}
|
||||
|
||||
// RateS calculates costs for events
|
||||
type RateS struct {
|
||||
cfg *config.CGRConfig
|
||||
filterS *engine.FilterS
|
||||
}
|
||||
|
||||
// ListenAndServe keeps the service alive
|
||||
func (rS *RateS) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (err error) {
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s>",
|
||||
utils.CoreS, utils.RateS))
|
||||
for {
|
||||
select {
|
||||
case e := <-exitChan: // global exit
|
||||
rS.Shutdown()
|
||||
exitChan <- e // put back for the others listening for shutdown request
|
||||
break
|
||||
case rld := <-cfgRld: // configuration was reloaded, destroy the cache
|
||||
cfgRld <- rld
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Shutdown is called to shutdown the service
|
||||
func (rS *RateS) Shutdown() (err error) {
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.RateS))
|
||||
return
|
||||
}
|
||||
|
||||
// Call implements rpcclient.ClientConnector interface for internal RPC
|
||||
func (rS *RateS) Call(serviceMethod string, args interface{}, reply interface{}) error {
|
||||
return utils.RPCCall(rS, serviceMethod, args, reply)
|
||||
}
|
||||
|
||||
// V1CostForEvent will be called to calculate the cost for an event
|
||||
func (rS *RateS) V1CostForEvent(cgrEv *utils.CGREventWithOpts, rply *string) (err error) {
|
||||
return
|
||||
}
|
||||
@@ -83,7 +83,7 @@ func (es *EventExporterService) ShouldRun() (should bool) {
|
||||
func (es *EventExporterService) IsRunning() bool {
|
||||
es.RLock()
|
||||
defer es.RUnlock()
|
||||
return es != nil && es.eeS != nil
|
||||
return es.eeS != nil
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
|
||||
124
services/rates.go
Normal file
124
services/rates.go
Normal file
@@ -0,0 +1,124 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package services
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/cgrates/rates"
|
||||
//"github.com/cgrates/cgrates/apier/v1"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
// NewRateService constructs RateService
|
||||
func NewRateService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
|
||||
server *utils.Server, exitChan chan bool,
|
||||
intConnChan chan rpcclient.ClientConnector) servmanager.Service {
|
||||
return &RateService{
|
||||
cfg: cfg,
|
||||
filterSChan: filterSChan,
|
||||
server: server,
|
||||
exitChan: exitChan,
|
||||
intConnChan: intConnChan,
|
||||
rldChan: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// RateService is the service structure for RateS
|
||||
type RateService struct {
|
||||
sync.RWMutex
|
||||
|
||||
cfg *config.CGRConfig
|
||||
filterSChan chan *engine.FilterS
|
||||
server *utils.Server
|
||||
exitChan chan bool
|
||||
intConnChan chan rpcclient.ClientConnector
|
||||
rldChan chan struct{}
|
||||
|
||||
rateS *rates.RateS
|
||||
//rpc *v1.EventExporterSv1
|
||||
}
|
||||
|
||||
// GetIntenternalChan is deprecated and it will be removed shortly
|
||||
func (rs *RateService) GetIntenternalChan() (conn chan rpcclient.ClientConnector) {
|
||||
panic("deprecated method")
|
||||
}
|
||||
|
||||
|
||||
|
||||
// ServiceName returns the service name
|
||||
func (rs *RateService) ServiceName() string {
|
||||
return utils.RateS
|
||||
}
|
||||
|
||||
// ShouldRun returns if the service should be running
|
||||
func (rs *RateService) ShouldRun() (should bool) {
|
||||
return rs.cfg.RateSCfg().Enabled
|
||||
}
|
||||
|
||||
// IsRunning returns if the service is running
|
||||
func (rs *RateService) IsRunning() bool {
|
||||
rs.RLock()
|
||||
defer rs.RUnlock()
|
||||
return rs.rateS != nil
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (rs *RateService) Reload() (err error) {
|
||||
rs.rldChan <- struct{}{}
|
||||
return
|
||||
}
|
||||
|
||||
// Shutdown stops the service
|
||||
func (rs *RateService) Shutdown() (err error) {
|
||||
rs.Lock()
|
||||
defer rs.Unlock()
|
||||
if err = rs.rateS.Shutdown(); err != nil {
|
||||
return
|
||||
}
|
||||
rs.rateS = nil
|
||||
<-rs.intConnChan
|
||||
return
|
||||
}
|
||||
|
||||
// Start should handle the service start
|
||||
func (rs *RateService) Start() (err error) {
|
||||
if rs.IsRunning() {
|
||||
return fmt.Errorf("service aleady running")
|
||||
}
|
||||
|
||||
fltrS := <-rs.filterSChan
|
||||
rs.filterSChan <- fltrS
|
||||
|
||||
rs.Lock()
|
||||
rs.rateS = rates.NewRateS(rs.cfg, fltrS)
|
||||
rs.Unlock()
|
||||
/*rs.rpc = v1.NewEventExporterSv1(es.eeS)
|
||||
if !rs.cfg.DispatcherSCfg().Enabled {
|
||||
rs.server.RpcRegister(es.rpc)
|
||||
}
|
||||
*/
|
||||
rs.intConnChan <- rs.rateS
|
||||
return rs.rateS.ListenAndServe(rs.exitChan, rs.rldChan)
|
||||
}
|
||||
@@ -173,6 +173,7 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
|
||||
utils.AnalyzerS: srvMngr.GetConfig().AnalyzerSCfg().Enabled,
|
||||
utils.DispatcherS: srvMngr.GetConfig().DispatcherSCfg().Enabled,
|
||||
utils.EventExporterS: srvMngr.GetConfig().EEsCfg().Enabled,
|
||||
utils.RateS: srvMngr.GetConfig().RateSCfg().Enabled,
|
||||
} {
|
||||
if shouldRun {
|
||||
go srvMngr.startService(serviceName)
|
||||
@@ -309,6 +310,14 @@ func (srvMngr *ServiceManager) handleReload() {
|
||||
if err = srvMngr.reloadService(utils.StorDB); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.EEsJson):
|
||||
if err = srvMngr.reloadService(config.EEsJson); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.RateSJson):
|
||||
if err = srvMngr.reloadService(config.RateSJson); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.RPCConnsJsonName):
|
||||
engine.Cache.Clear([]string{utils.CacheRPCConnections})
|
||||
}
|
||||
|
||||
@@ -380,6 +380,7 @@ const (
|
||||
MetaCache = "*cache"
|
||||
MetaGuardian = "*guardians"
|
||||
MetaEEs = "*ees"
|
||||
MetaRateS = "*rates"
|
||||
MetaContinue = "*continue"
|
||||
Migrator = "migrator"
|
||||
UnsupportedMigrationTask = "unsupported migration task"
|
||||
@@ -725,6 +726,7 @@ const (
|
||||
MetaYearly = "*yearly"
|
||||
MetaDaily = "*daily"
|
||||
MetaWeekly = "*weekly"
|
||||
RateS = "RateS"
|
||||
)
|
||||
|
||||
// Migrator Action
|
||||
|
||||
Reference in New Issue
Block a user