mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-17 14:19:54 +05:00
Adding chargers_conns configuration in CDRS
This commit is contained in:
@@ -389,12 +389,25 @@ func startHTTPAgent(internalSMGChan chan rpcclient.RpcClientConnection,
|
||||
|
||||
func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
|
||||
cdrDb engine.CdrStorage, dm *engine.DataManager,
|
||||
internalRaterChan, internalPubSubSChan, internalAttributeSChan, internalUserSChan, internalAliaseSChan,
|
||||
internalCdrStatSChan, internalThresholdSChan, internalStatSChan chan rpcclient.RpcClientConnection,
|
||||
internalRaterChan, internalPubSubSChan, internalAttributeSChan,
|
||||
internalUserSChan, internalAliaseSChan,
|
||||
internalCdrStatSChan, internalThresholdSChan, internalStatSChan,
|
||||
internalChargerSChan chan rpcclient.RpcClientConnection,
|
||||
server *utils.Server, exitChan chan bool) {
|
||||
var err error
|
||||
utils.Logger.Info("Starting CGRateS CDRS service.")
|
||||
var ralConn, pubSubConn, usersConn, attrSConn, aliasesConn, cdrstatsConn, thresholdSConn, statsConn *rpcclient.RpcClientPool
|
||||
var ralConn, pubSubConn, usersConn, attrSConn, aliasesConn, cdrstatsConn,
|
||||
thresholdSConn, statsConn, chargerSConn *rpcclient.RpcClientPool
|
||||
if len(cfg.CDRSChargerSConns) != 0 { // Conn pool towards RAL
|
||||
chargerSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate,
|
||||
cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
|
||||
cfg.CDRSChargerSConns, internalChargerSChan, cfg.InternalTtl)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s", utils.ChargerS, err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
if len(cfg.CDRSRaterConns) != 0 { // Conn pool towards RAL
|
||||
ralConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate,
|
||||
cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
|
||||
@@ -477,7 +490,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
|
||||
}
|
||||
}
|
||||
cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, dm, ralConn, pubSubConn,
|
||||
attrSConn, usersConn, aliasesConn, cdrstatsConn, thresholdSConn, statsConn)
|
||||
attrSConn, usersConn, aliasesConn, cdrstatsConn, thresholdSConn, statsConn, chargerSConn)
|
||||
cdrServer.SetTimeToLive(cfg.ResponseCacheTTL, nil)
|
||||
utils.Logger.Info("Registering CDRS HTTP Handlers.")
|
||||
cdrServer.RegisterHandlersToServer(server)
|
||||
@@ -1205,7 +1218,8 @@ func main() {
|
||||
go startCDRS(internalCdrSChan, cdrDb, dm,
|
||||
internalRaterChan, internalPubSubSChan, internalAttributeSChan,
|
||||
internalUserSChan, internalAliaseSChan, internalCdrStatSChan,
|
||||
internalThresholdSChan, internalStatSChan, server, exitChan)
|
||||
internalThresholdSChan, internalStatSChan, internalChargerSChan,
|
||||
server, exitChan)
|
||||
}
|
||||
|
||||
// Start CDR Stats server
|
||||
|
||||
@@ -327,6 +327,7 @@ type CGRConfig struct {
|
||||
CDRSStoreCdrs bool // store cdrs in storDb
|
||||
CDRScdrAccountSummary bool
|
||||
CDRSSMCostRetries int
|
||||
CDRSChargerSConns []*HaPoolConfig
|
||||
CDRSRaterConns []*HaPoolConfig // address where to reach the Rater for cost calculation: <""|internal|x.y.z.y:1234>
|
||||
CDRSPubSubSConns []*HaPoolConfig // address where to reach the pubsub service: <""|internal|x.y.z.y:1234>
|
||||
CDRSAttributeSConns []*HaPoolConfig // address where to reach the users service: <""|internal|x.y.z.y:1234>
|
||||
@@ -417,6 +418,11 @@ func (self *CGRConfig) checkConfigSanity() error {
|
||||
}
|
||||
// CDRServer checks
|
||||
if self.CDRSEnabled {
|
||||
for _, conn := range self.CDRSChargerSConns {
|
||||
if conn.Address == utils.MetaInternal && !self.chargerSCfg.Enabled {
|
||||
return errors.New("ChargerS not enabled but requested by CDRS component.")
|
||||
}
|
||||
}
|
||||
for _, cdrsRaterConn := range self.CDRSRaterConns {
|
||||
if cdrsRaterConn.Address == utils.MetaInternal && !self.RALsEnabled {
|
||||
return errors.New("RALs not enabled but requested by CDRS component.")
|
||||
@@ -1180,6 +1186,13 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) (err error) {
|
||||
if jsnCdrsCfg.Sessions_cost_retries != nil {
|
||||
self.CDRSSMCostRetries = *jsnCdrsCfg.Sessions_cost_retries
|
||||
}
|
||||
if jsnCdrsCfg.Chargers_conns != nil {
|
||||
self.CDRSChargerSConns = make([]*HaPoolConfig, len(*jsnCdrsCfg.Chargers_conns))
|
||||
for idx, jsnHaCfg := range *jsnCdrsCfg.Chargers_conns {
|
||||
self.CDRSChargerSConns[idx] = NewDfltHaPoolConfig()
|
||||
self.CDRSChargerSConns[idx].loadFromJsonCfg(jsnHaCfg)
|
||||
}
|
||||
}
|
||||
if jsnCdrsCfg.Rals_conns != nil {
|
||||
self.CDRSRaterConns = make([]*HaPoolConfig, len(*jsnCdrsCfg.Rals_conns))
|
||||
for idx, jsnHaCfg := range *jsnCdrsCfg.Rals_conns {
|
||||
|
||||
@@ -180,6 +180,7 @@ const CGRATES_CFG_JSON = `
|
||||
"extra_fields": [], // extra fields to store in CDRs for non-generic CDRs
|
||||
"store_cdrs": true, // store cdrs in storDb
|
||||
"sessions_cost_retries": 5, // number of queries to sessions_costs before recalculating CDR
|
||||
"chargers_conns": [], // address where to reach the charger service, empty to disable charger functionality: <""|*internal|x.y.z.y:1234>
|
||||
"rals_conns": [
|
||||
{"address": "*internal"} // address where to reach the Rater for cost calculation, empty to disable functionality: <""|*internal|x.y.z.y:1234>
|
||||
],
|
||||
|
||||
@@ -277,6 +277,7 @@ func TestDfCdrsJsonCfg(t *testing.T) {
|
||||
Extra_fields: &[]string{},
|
||||
Store_cdrs: utils.BoolPointer(true),
|
||||
Sessions_cost_retries: utils.IntPointer(5),
|
||||
Chargers_conns: &[]*HaPoolJsonCfg{},
|
||||
Rals_conns: &[]*HaPoolJsonCfg{
|
||||
&HaPoolJsonCfg{
|
||||
Address: utils.StringPointer("*internal"),
|
||||
|
||||
@@ -506,6 +506,9 @@ func TestCgrCfgJSONDefaultsCDRS(t *testing.T) {
|
||||
if !reflect.DeepEqual(cgrCfg.CDRSRaterConns, []*HaPoolConfig{&HaPoolConfig{Address: "*internal"}}) {
|
||||
t.Error(cgrCfg.CDRSRaterConns)
|
||||
}
|
||||
if !reflect.DeepEqual(cgrCfg.CDRSChargerSConns, eHaPoolCfg) {
|
||||
t.Error(cgrCfg.CDRSChargerSConns)
|
||||
}
|
||||
if !reflect.DeepEqual(cgrCfg.CDRSPubSubSConns, eHaPoolCfg) {
|
||||
t.Error(cgrCfg.CDRSPubSubSConns)
|
||||
}
|
||||
|
||||
@@ -115,6 +115,7 @@ type CdrsJsonCfg struct {
|
||||
Extra_fields *[]string
|
||||
Store_cdrs *bool
|
||||
Sessions_cost_retries *int
|
||||
Chargers_conns *[]*HaPoolJsonCfg
|
||||
Rals_conns *[]*HaPoolJsonCfg
|
||||
Pubsubs_conns *[]*HaPoolJsonCfg
|
||||
Attributes_conns *[]*HaPoolJsonCfg
|
||||
|
||||
@@ -69,7 +69,7 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, rater, pubsub,
|
||||
attrs, users, aliases, cdrstats, thdS, stats rpcclient.RpcClientConnection) (*CdrServer, error) {
|
||||
attrs, users, aliases, cdrstats, thdS, stats, chargerS rpcclient.RpcClientConnection) (*CdrServer, error) {
|
||||
if rater != nil && reflect.ValueOf(rater).IsNil() { // Work around so we store actual nil instead of nil interface value, faster to check here than in CdrServer code
|
||||
rater = nil
|
||||
}
|
||||
@@ -94,9 +94,13 @@ func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, r
|
||||
if stats != nil && reflect.ValueOf(stats).IsNil() {
|
||||
stats = nil
|
||||
}
|
||||
if chargerS != nil && reflect.ValueOf(chargerS).IsNil() {
|
||||
chargerS = nil
|
||||
}
|
||||
return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, dm: dm,
|
||||
rals: rater, pubsub: pubsub, users: users, aliases: aliases,
|
||||
cdrstats: cdrstats, stats: stats, thdS: thdS, guard: guardian.Guardian,
|
||||
cdrstats: cdrstats, stats: stats, thdS: thdS,
|
||||
chargerS: chargerS, guard: guardian.Guardian,
|
||||
httpPoster: NewHTTPPoster(cgrCfg.HttpSkipTlsVerify, cgrCfg.ReplyTimeout)}, nil
|
||||
}
|
||||
|
||||
@@ -112,6 +116,7 @@ type CdrServer struct {
|
||||
cdrstats rpcclient.RpcClientConnection
|
||||
thdS rpcclient.RpcClientConnection
|
||||
stats rpcclient.RpcClientConnection
|
||||
chargerS rpcclient.RpcClientConnection
|
||||
guard *guardian.GuardianLocker
|
||||
responseCache *utils.ResponseCache
|
||||
httpPoster *HTTPPoster // used for replication
|
||||
|
||||
Reference in New Issue
Block a user