mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-15 13:19:53 +05:00
CDRS to work with *raw CDRs
This commit is contained in:
123
engine/cdrs.go
123
engine/cdrs.go
@@ -184,21 +184,8 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) {
|
||||
if cdr.Subject == "" { // Use account information as rating subject if missing
|
||||
cdr.Subject = cdr.Account
|
||||
}
|
||||
// replace aliases
|
||||
if err := LoadAlias(&AttrMatchingAlias{
|
||||
Destination: cdr.Destination,
|
||||
Direction: cdr.Direction,
|
||||
Tenant: cdr.Tenant,
|
||||
Category: cdr.Category,
|
||||
Account: cdr.Account,
|
||||
Subject: cdr.Subject,
|
||||
Context: utils.ALIAS_CONTEXT_RATING,
|
||||
}, cdr, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound {
|
||||
return err
|
||||
}
|
||||
// replace user profile fields
|
||||
if err := LoadUserProfile(cdr, utils.EXTRA_FIELDS); err != nil {
|
||||
return err
|
||||
if !cdr.Rated {
|
||||
cdr.RunID = utils.MetaRaw
|
||||
}
|
||||
if self.cgrCfg.CDRSStoreCdrs { // Store RawCDRs, this we do sync so we can reply with the status
|
||||
if err := self.cdrDb.SetCDR(cdr, false); err != nil { // Only original CDR stored in primary table, no derived
|
||||
@@ -226,21 +213,19 @@ func (self *CdrServer) deriveRateStoreStatsReplicate(cdr *CDR) error {
|
||||
}
|
||||
|
||||
func (self *CdrServer) rateStoreStatsReplicate(cdr *CDR) error {
|
||||
if cdr.RunID != utils.META_DEFAULT { // Process Aliases and Users for derived CDRs
|
||||
if err := LoadAlias(&AttrMatchingAlias{
|
||||
Destination: cdr.Destination,
|
||||
Direction: cdr.Direction,
|
||||
Tenant: cdr.Tenant,
|
||||
Category: cdr.Category,
|
||||
Account: cdr.Account,
|
||||
Subject: cdr.Subject,
|
||||
Context: utils.ALIAS_CONTEXT_RATING,
|
||||
}, cdr, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound {
|
||||
return err
|
||||
}
|
||||
if err := LoadUserProfile(cdr, utils.EXTRA_FIELDS); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := LoadAlias(&AttrMatchingAlias{
|
||||
Destination: cdr.Destination,
|
||||
Direction: cdr.Direction,
|
||||
Tenant: cdr.Tenant,
|
||||
Category: cdr.Category,
|
||||
Account: cdr.Account,
|
||||
Subject: cdr.Subject,
|
||||
Context: utils.ALIAS_CONTEXT_RATING,
|
||||
}, cdr, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound {
|
||||
return err
|
||||
}
|
||||
if err := LoadUserProfile(cdr, utils.EXTRA_FIELDS); err != nil {
|
||||
return err
|
||||
}
|
||||
// Rate CDR
|
||||
if self.rater != nil && !cdr.Rated {
|
||||
@@ -260,12 +245,6 @@ func (self *CdrServer) rateStoreStatsReplicate(cdr *CDR) error {
|
||||
if err := self.cdrDb.SetCDR(cdr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<CDRS> Storing rated CDR %+v, got error: %s", cdr, err.Error()))
|
||||
}
|
||||
// Store CostDetails
|
||||
if cdr.Rated || utils.IsSliceMember([]string{utils.RATED, utils.META_RATED}, cdr.RequestType) { // Account related CDRs are saved automatically, so save the others here if requested
|
||||
if err := self.cdrDb.LogCallCost(cdr.CGRID, utils.CDRS_SOURCE, cdr.RunID, cdr.CostDetails); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<CDRS> Storing costs for CDR %+v, costDetails: %+v, got error: %s", cdr, cdr.CostDetails, err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
// Attach CDR to stats
|
||||
if self.stats != nil { // Send CDR to stats
|
||||
@@ -280,11 +259,8 @@ func (self *CdrServer) rateStoreStatsReplicate(cdr *CDR) error {
|
||||
}
|
||||
|
||||
func (self *CdrServer) deriveCdrs(cdr *CDR) ([]*CDR, error) {
|
||||
if len(cdr.RunID) == 0 {
|
||||
cdr.RunID = utils.META_DEFAULT
|
||||
}
|
||||
cdrRuns := []*CDR{cdr}
|
||||
if cdr.Rated { // Do not derive already rated CDRs since they should be already derived
|
||||
if cdr.RunID != utils.MetaRaw { // Only derive *raw CDRs
|
||||
return cdrRuns, nil
|
||||
}
|
||||
attrsDC := &utils.AttrDerivedChargers{Tenant: cdr.Tenant, Category: cdr.Category, Direction: cdr.Direction,
|
||||
@@ -335,39 +311,6 @@ func (self *CdrServer) deriveCdrs(cdr *CDR) ([]*CDR, error) {
|
||||
return cdrRuns, nil
|
||||
}
|
||||
|
||||
// Retrive the cost from engine
|
||||
func (self *CdrServer) getCostFromRater(cdr *CDR) (*CallCost, error) {
|
||||
cc := new(CallCost)
|
||||
var err error
|
||||
timeStart := cdr.AnswerTime
|
||||
if timeStart.IsZero() { // Fix for FreeSWITCH unanswered calls
|
||||
timeStart = cdr.SetupTime
|
||||
}
|
||||
cd := &CallDescriptor{
|
||||
TOR: cdr.TOR,
|
||||
Direction: cdr.Direction,
|
||||
Tenant: cdr.Tenant,
|
||||
Category: cdr.Category,
|
||||
Subject: cdr.Subject,
|
||||
Account: cdr.Account,
|
||||
Destination: cdr.Destination,
|
||||
TimeStart: timeStart,
|
||||
TimeEnd: timeStart.Add(cdr.Usage),
|
||||
DurationIndex: cdr.Usage,
|
||||
}
|
||||
if utils.IsSliceMember([]string{utils.META_PSEUDOPREPAID, utils.META_POSTPAID, utils.META_PREPAID, utils.PSEUDOPREPAID, utils.POSTPAID, utils.PREPAID}, cdr.RequestType) { // Prepaid - Cost can be recalculated in case of missing records from SM
|
||||
if err = self.rater.Debit(cd, cc); err == nil { // Debit has occured, we are forced to write the log, even if CDR store is disabled
|
||||
self.cdrDb.LogCallCost(cdr.CGRID, utils.CDRS_SOURCE, cdr.RunID, cc)
|
||||
}
|
||||
} else {
|
||||
err = self.rater.GetCost(cd, cc)
|
||||
}
|
||||
if err != nil {
|
||||
return cc, err
|
||||
}
|
||||
return cc, nil
|
||||
}
|
||||
|
||||
func (self *CdrServer) rateCDR(cdr *CDR) error {
|
||||
var qryCC *CallCost
|
||||
var err error
|
||||
@@ -401,6 +344,39 @@ func (self *CdrServer) rateCDR(cdr *CDR) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Retrive the cost from engine
|
||||
func (self *CdrServer) getCostFromRater(cdr *CDR) (*CallCost, error) {
|
||||
cc := new(CallCost)
|
||||
var err error
|
||||
timeStart := cdr.AnswerTime
|
||||
if timeStart.IsZero() { // Fix for FreeSWITCH unanswered calls
|
||||
timeStart = cdr.SetupTime
|
||||
}
|
||||
cd := &CallDescriptor{
|
||||
TOR: cdr.TOR,
|
||||
Direction: cdr.Direction,
|
||||
Tenant: cdr.Tenant,
|
||||
Category: cdr.Category,
|
||||
Subject: cdr.Subject,
|
||||
Account: cdr.Account,
|
||||
Destination: cdr.Destination,
|
||||
TimeStart: timeStart,
|
||||
TimeEnd: timeStart.Add(cdr.Usage),
|
||||
DurationIndex: cdr.Usage,
|
||||
}
|
||||
if utils.IsSliceMember([]string{utils.META_PSEUDOPREPAID, utils.META_POSTPAID, utils.META_PREPAID, utils.PSEUDOPREPAID, utils.POSTPAID, utils.PREPAID}, cdr.RequestType) { // Prepaid - Cost can be recalculated in case of missing records from SM
|
||||
if err = self.rater.Debit(cd, cc); err == nil { // Debit has occured, we are forced to write the log, even if CDR store is disabled
|
||||
self.cdrDb.LogCallCost(cdr.CGRID, utils.CDRS_SOURCE, cdr.RunID, cc)
|
||||
}
|
||||
} else {
|
||||
err = self.rater.GetCost(cd, cc)
|
||||
}
|
||||
if err != nil {
|
||||
return cc, err
|
||||
}
|
||||
return cc, nil
|
||||
}
|
||||
|
||||
// ToDo: Add websocket support
|
||||
func (self *CdrServer) replicateCdr(cdr *CDR) error {
|
||||
for _, rplCfg := range self.cgrCfg.CDRSCdrReplication {
|
||||
@@ -424,7 +400,6 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error {
|
||||
content = utils.CONTENT_JSON
|
||||
body = cdr
|
||||
}
|
||||
|
||||
errChan := make(chan error)
|
||||
go func(body interface{}, rplCfg *config.CdrReplicationCfg, content string, errChan chan error) {
|
||||
fallbackPath := path.Join(
|
||||
|
||||
Reference in New Issue
Block a user