mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
CDRS - injecting AccountDigest information in CDRS based on cdr_account_digest parameter
This commit is contained in:
@@ -301,7 +301,7 @@ func startSmOpenSIPS(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien
|
||||
exitChan <- true
|
||||
}
|
||||
|
||||
func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage,
|
||||
func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, dataDB engine.AccountingStorage,
|
||||
internalRaterChan chan rpcclient.RpcClientConnection, internalPubSubSChan chan rpcclient.RpcClientConnection,
|
||||
internalUserSChan chan rpcclient.RpcClientConnection, internalAliaseSChan chan rpcclient.RpcClientConnection,
|
||||
internalCdrStatSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) {
|
||||
@@ -353,7 +353,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, cdrDb engine
|
||||
return
|
||||
}
|
||||
}
|
||||
cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, ralConn, pubSubConn, usersConn, aliasesConn, statsConn)
|
||||
cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, dataDB, ralConn, pubSubConn, usersConn, aliasesConn, statsConn)
|
||||
cdrServer.SetTimeToLive(cfg.ResponseCacheTTL, nil)
|
||||
utils.Logger.Info("Registering CDRS HTTP Handlers.")
|
||||
cdrServer.RegisterHandlersToServer(server)
|
||||
@@ -632,7 +632,8 @@ func main() {
|
||||
|
||||
// Start CDR Server
|
||||
if cfg.CDRSEnabled {
|
||||
go startCDRS(internalCdrSChan, cdrDb, internalRaterChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalCdrStatSChan, server, exitChan)
|
||||
go startCDRS(internalCdrSChan, cdrDb, accountDb,
|
||||
internalRaterChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalCdrStatSChan, server, exitChan)
|
||||
}
|
||||
|
||||
// Start CDR Stats server
|
||||
|
||||
@@ -225,9 +225,10 @@ type CGRConfig struct {
|
||||
LcrSubjectPrefixMatching bool // enables prefix matching for the lcr subject
|
||||
BalancerEnabled bool
|
||||
SchedulerEnabled bool
|
||||
CDRSEnabled bool // Enable CDR Server service
|
||||
CDRSExtraFields []*utils.RSRField // Extra fields to store in CDRs
|
||||
CDRSStoreCdrs bool // store cdrs in storDb
|
||||
CDRSEnabled bool // Enable CDR Server service
|
||||
CDRSExtraFields []*utils.RSRField // Extra fields to store in CDRs
|
||||
CDRSStoreCdrs bool // store cdrs in storDb
|
||||
CDRScdrAccountDigest bool
|
||||
CDRSRaterConns []*HaPoolConfig // address where to reach the Rater for cost calculation: <""|internal|x.y.z.y:1234>
|
||||
CDRSPubSubSConns []*HaPoolConfig // address where to reach the pubsub service: <""|internal|x.y.z.y:1234>
|
||||
CDRSUserSConns []*HaPoolConfig // address where to reach the users service: <""|internal|x.y.z.y:1234>
|
||||
@@ -816,6 +817,9 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
|
||||
if jsnCdrsCfg.Store_cdrs != nil {
|
||||
self.CDRSStoreCdrs = *jsnCdrsCfg.Store_cdrs
|
||||
}
|
||||
if jsnCdrsCfg.Cdr_account_digest != nil {
|
||||
self.CDRScdrAccountDigest = *jsnCdrsCfg.Cdr_account_digest
|
||||
}
|
||||
if jsnCdrsCfg.Rals_conns != nil {
|
||||
self.CDRSRaterConns = make([]*HaPoolConfig, len(*jsnCdrsCfg.Rals_conns))
|
||||
for idx, jsnHaCfg := range *jsnCdrsCfg.Rals_conns {
|
||||
|
||||
@@ -132,6 +132,7 @@ const CGRATES_CFG_JSON = `
|
||||
"enabled": false, // start the CDR Server service: <true|false>
|
||||
"extra_fields": [], // extra fields to store in CDRs for non-generic CDRs
|
||||
"store_cdrs": true, // store cdrs in storDb
|
||||
"cdr_account_digest": false, // add account information from dataDB
|
||||
"rals_conns": [
|
||||
{"address": "*internal"} // address where to reach the Rater for cost calculation, empty to disable functionality: <""|*internal|x.y.z.y:1234>
|
||||
],
|
||||
|
||||
@@ -174,9 +174,10 @@ func TestDfSchedulerJsonCfg(t *testing.T) {
|
||||
|
||||
func TestDfCdrsJsonCfg(t *testing.T) {
|
||||
eCfg := &CdrsJsonCfg{
|
||||
Enabled: utils.BoolPointer(false),
|
||||
Extra_fields: utils.StringSlicePointer([]string{}),
|
||||
Store_cdrs: utils.BoolPointer(true),
|
||||
Enabled: utils.BoolPointer(false),
|
||||
Extra_fields: utils.StringSlicePointer([]string{}),
|
||||
Store_cdrs: utils.BoolPointer(true),
|
||||
Cdr_account_digest: utils.BoolPointer(false),
|
||||
Rals_conns: &[]*HaPoolJsonCfg{
|
||||
&HaPoolJsonCfg{
|
||||
Address: utils.StringPointer("*internal"),
|
||||
|
||||
@@ -85,15 +85,16 @@ type SchedulerJsonCfg struct {
|
||||
|
||||
// Cdrs config section
|
||||
type CdrsJsonCfg struct {
|
||||
Enabled *bool
|
||||
Extra_fields *[]string
|
||||
Store_cdrs *bool
|
||||
Rals_conns *[]*HaPoolJsonCfg
|
||||
Pubsubs_conns *[]*HaPoolJsonCfg
|
||||
Users_conns *[]*HaPoolJsonCfg
|
||||
Aliases_conns *[]*HaPoolJsonCfg
|
||||
Cdrstats_conns *[]*HaPoolJsonCfg
|
||||
Cdr_replication *[]*CdrReplicationJsonCfg
|
||||
Enabled *bool
|
||||
Extra_fields *[]string
|
||||
Store_cdrs *bool
|
||||
Cdr_account_digest *bool
|
||||
Rals_conns *[]*HaPoolJsonCfg
|
||||
Pubsubs_conns *[]*HaPoolJsonCfg
|
||||
Users_conns *[]*HaPoolJsonCfg
|
||||
Aliases_conns *[]*HaPoolJsonCfg
|
||||
Cdrstats_conns *[]*HaPoolJsonCfg
|
||||
Cdr_replication *[]*CdrReplicationJsonCfg
|
||||
}
|
||||
|
||||
type CdrReplicationJsonCfg struct {
|
||||
|
||||
@@ -70,7 +70,7 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater, pubsub, users, aliases, stats rpcclient.RpcClientConnection) (*CdrServer, error) {
|
||||
func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dataDB AccountingStorage, rater, pubsub, users, aliases, stats rpcclient.RpcClientConnection) (*CdrServer, error) {
|
||||
if rater == nil || reflect.ValueOf(rater).IsNil() { // Work around so we store actual nil instead of nil interface value, faster to check here than in CdrServer code
|
||||
rater = nil
|
||||
}
|
||||
@@ -92,6 +92,7 @@ func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater, pubsub, use
|
||||
type CdrServer struct {
|
||||
cgrCfg *config.CGRConfig
|
||||
cdrDb CdrStorage
|
||||
dataDB AccountingStorage
|
||||
rals rpcclient.RpcClientConnection
|
||||
pubsub rpcclient.RpcClientConnection
|
||||
users rpcclient.RpcClientConnection
|
||||
@@ -243,6 +244,16 @@ func (self *CdrServer) deriveRateStoreStatsReplicate(cdr *CDR, store, stats, rep
|
||||
if ratedCDR.CostDetails != nil {
|
||||
ratedCDR.CostDetails.UpdateCost()
|
||||
ratedCDR.CostDetails.UpdateRatedUsage()
|
||||
if utils.IsSliceMember([]string{utils.META_PREPAID, utils.PREPAID, utils.META_PSEUDOPREPAID, utils.PSEUDOPREPAID,
|
||||
utils.META_POSTPAID, utils.POSTPAID}, ratedCDR.RequestType) && self.cgrCfg.CDRScdrAccountDigest {
|
||||
acntID := utils.ConcatenatedKey(ratedCDR.Tenant, ratedCDR.Account)
|
||||
acnt, err := self.dataDB.GetAccount(acntID)
|
||||
if err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<CDRS> Querying AccountDigest for account: %s got error: %s", acntID, err.Error()))
|
||||
} else if acnt.ID != "" {
|
||||
ratedCDR.CostDetails.AccountDigest = acnt.AsAccountDigest()
|
||||
}
|
||||
}
|
||||
}
|
||||
if err := self.cdrDb.SetCDR(ratedCDR, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<CDRS> Storing rated CDR %+v, got error: %s", ratedCDR, err.Error()))
|
||||
|
||||
Reference in New Issue
Block a user