From 31752db840ab7a1fa4c21ddaf477fd1fa84be354 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 16 Jul 2018 20:27:47 +0200 Subject: [PATCH] Adding chargers_conns configuration in CDRS --- cmd/cgr-engine/cgr-engine.go | 24 +++++++++++++++++++----- config/config.go | 13 +++++++++++++ config/config_defaults.go | 1 + config/config_json_test.go | 1 + config/config_test.go | 3 +++ config/libconfig_json.go | 1 + engine/cdrs.go | 9 +++++++-- 7 files changed, 45 insertions(+), 7 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index e27a14d58..b3696e082 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -389,12 +389,25 @@ func startHTTPAgent(internalSMGChan chan rpcclient.RpcClientConnection, func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, dm *engine.DataManager, - internalRaterChan, internalPubSubSChan, internalAttributeSChan, internalUserSChan, internalAliaseSChan, - internalCdrStatSChan, internalThresholdSChan, internalStatSChan chan rpcclient.RpcClientConnection, + internalRaterChan, internalPubSubSChan, internalAttributeSChan, + internalUserSChan, internalAliaseSChan, + internalCdrStatSChan, internalThresholdSChan, internalStatSChan, + internalChargerSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) { var err error utils.Logger.Info("Starting CGRateS CDRS service.") - var ralConn, pubSubConn, usersConn, attrSConn, aliasesConn, cdrstatsConn, thresholdSConn, statsConn *rpcclient.RpcClientPool + var ralConn, pubSubConn, usersConn, attrSConn, aliasesConn, cdrstatsConn, + thresholdSConn, statsConn, chargerSConn *rpcclient.RpcClientPool + if len(cfg.CDRSChargerSConns) != 0 { // Conn pool towards RAL + chargerSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate, + cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.CDRSChargerSConns, internalChargerSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", utils.ChargerS, err.Error())) + exitChan <- true + return + } + } if len(cfg.CDRSRaterConns) != 0 { // Conn pool towards RAL ralConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, @@ -477,7 +490,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, } } cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, dm, ralConn, pubSubConn, - attrSConn, usersConn, aliasesConn, cdrstatsConn, thresholdSConn, statsConn) + attrSConn, usersConn, aliasesConn, cdrstatsConn, thresholdSConn, statsConn, chargerSConn) cdrServer.SetTimeToLive(cfg.ResponseCacheTTL, nil) utils.Logger.Info("Registering CDRS HTTP Handlers.") cdrServer.RegisterHandlersToServer(server) @@ -1205,7 +1218,8 @@ func main() { go startCDRS(internalCdrSChan, cdrDb, dm, internalRaterChan, internalPubSubSChan, internalAttributeSChan, internalUserSChan, internalAliaseSChan, internalCdrStatSChan, - internalThresholdSChan, internalStatSChan, server, exitChan) + internalThresholdSChan, internalStatSChan, internalChargerSChan, + server, exitChan) } // Start CDR Stats server diff --git a/config/config.go b/config/config.go index f9f1b6083..a1e0ce252 100755 --- a/config/config.go +++ b/config/config.go @@ -327,6 +327,7 @@ type CGRConfig struct { CDRSStoreCdrs bool // store cdrs in storDb CDRScdrAccountSummary bool CDRSSMCostRetries int + CDRSChargerSConns []*HaPoolConfig CDRSRaterConns []*HaPoolConfig // address where to reach the Rater for cost calculation: <""|internal|x.y.z.y:1234> CDRSPubSubSConns []*HaPoolConfig // address where to reach the pubsub service: <""|internal|x.y.z.y:1234> CDRSAttributeSConns []*HaPoolConfig // address where to reach the users service: <""|internal|x.y.z.y:1234> @@ -417,6 +418,11 @@ func (self *CGRConfig) checkConfigSanity() error { } // CDRServer checks if self.CDRSEnabled { + for _, conn := range self.CDRSChargerSConns { + if conn.Address == utils.MetaInternal && !self.chargerSCfg.Enabled { + return errors.New("ChargerS not enabled but requested by CDRS component.") + } + } for _, cdrsRaterConn := range self.CDRSRaterConns { if cdrsRaterConn.Address == utils.MetaInternal && !self.RALsEnabled { return errors.New("RALs not enabled but requested by CDRS component.") @@ -1180,6 +1186,13 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) (err error) { if jsnCdrsCfg.Sessions_cost_retries != nil { self.CDRSSMCostRetries = *jsnCdrsCfg.Sessions_cost_retries } + if jsnCdrsCfg.Chargers_conns != nil { + self.CDRSChargerSConns = make([]*HaPoolConfig, len(*jsnCdrsCfg.Chargers_conns)) + for idx, jsnHaCfg := range *jsnCdrsCfg.Chargers_conns { + self.CDRSChargerSConns[idx] = NewDfltHaPoolConfig() + self.CDRSChargerSConns[idx].loadFromJsonCfg(jsnHaCfg) + } + } if jsnCdrsCfg.Rals_conns != nil { self.CDRSRaterConns = make([]*HaPoolConfig, len(*jsnCdrsCfg.Rals_conns)) for idx, jsnHaCfg := range *jsnCdrsCfg.Rals_conns { diff --git a/config/config_defaults.go b/config/config_defaults.go index 96c761d8b..56ddf3cc3 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -180,6 +180,7 @@ const CGRATES_CFG_JSON = ` "extra_fields": [], // extra fields to store in CDRs for non-generic CDRs "store_cdrs": true, // store cdrs in storDb "sessions_cost_retries": 5, // number of queries to sessions_costs before recalculating CDR + "chargers_conns": [], // address where to reach the charger service, empty to disable charger functionality: <""|*internal|x.y.z.y:1234> "rals_conns": [ {"address": "*internal"} // address where to reach the Rater for cost calculation, empty to disable functionality: <""|*internal|x.y.z.y:1234> ], diff --git a/config/config_json_test.go b/config/config_json_test.go index 28c6aa289..7307e774f 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -277,6 +277,7 @@ func TestDfCdrsJsonCfg(t *testing.T) { Extra_fields: &[]string{}, Store_cdrs: utils.BoolPointer(true), Sessions_cost_retries: utils.IntPointer(5), + Chargers_conns: &[]*HaPoolJsonCfg{}, Rals_conns: &[]*HaPoolJsonCfg{ &HaPoolJsonCfg{ Address: utils.StringPointer("*internal"), diff --git a/config/config_test.go b/config/config_test.go index 4b23d1e5f..a185d53c6 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -506,6 +506,9 @@ func TestCgrCfgJSONDefaultsCDRS(t *testing.T) { if !reflect.DeepEqual(cgrCfg.CDRSRaterConns, []*HaPoolConfig{&HaPoolConfig{Address: "*internal"}}) { t.Error(cgrCfg.CDRSRaterConns) } + if !reflect.DeepEqual(cgrCfg.CDRSChargerSConns, eHaPoolCfg) { + t.Error(cgrCfg.CDRSChargerSConns) + } if !reflect.DeepEqual(cgrCfg.CDRSPubSubSConns, eHaPoolCfg) { t.Error(cgrCfg.CDRSPubSubSConns) } diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 9e27d5d83..4ac8452a5 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -115,6 +115,7 @@ type CdrsJsonCfg struct { Extra_fields *[]string Store_cdrs *bool Sessions_cost_retries *int + Chargers_conns *[]*HaPoolJsonCfg Rals_conns *[]*HaPoolJsonCfg Pubsubs_conns *[]*HaPoolJsonCfg Attributes_conns *[]*HaPoolJsonCfg diff --git a/engine/cdrs.go b/engine/cdrs.go index c805acb98..acf54ad08 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -69,7 +69,7 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) { } func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, rater, pubsub, - attrs, users, aliases, cdrstats, thdS, stats rpcclient.RpcClientConnection) (*CdrServer, error) { + attrs, users, aliases, cdrstats, thdS, stats, chargerS rpcclient.RpcClientConnection) (*CdrServer, error) { if rater != nil && reflect.ValueOf(rater).IsNil() { // Work around so we store actual nil instead of nil interface value, faster to check here than in CdrServer code rater = nil } @@ -94,9 +94,13 @@ func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, r if stats != nil && reflect.ValueOf(stats).IsNil() { stats = nil } + if chargerS != nil && reflect.ValueOf(chargerS).IsNil() { + chargerS = nil + } return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, dm: dm, rals: rater, pubsub: pubsub, users: users, aliases: aliases, - cdrstats: cdrstats, stats: stats, thdS: thdS, guard: guardian.Guardian, + cdrstats: cdrstats, stats: stats, thdS: thdS, + chargerS: chargerS, guard: guardian.Guardian, httpPoster: NewHTTPPoster(cgrCfg.HttpSkipTlsVerify, cgrCfg.ReplyTimeout)}, nil } @@ -112,6 +116,7 @@ type CdrServer struct { cdrstats rpcclient.RpcClientConnection thdS rpcclient.RpcClientConnection stats rpcclient.RpcClientConnection + chargerS rpcclient.RpcClientConnection guard *guardian.GuardianLocker responseCache *utils.ResponseCache httpPoster *HTTPPoster // used for replication