From ab7f65d3bbf5a677223104d6e3c683204d7ddfbb Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 13 Dec 2015 18:03:14 +0100 Subject: [PATCH] CDRS to work with *raw CDRs --- engine/cdrs.go | 123 ++++++++++++++++++++----------------------------- 1 file changed, 49 insertions(+), 74 deletions(-) diff --git a/engine/cdrs.go b/engine/cdrs.go index f14a0e178..3648db2e7 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -184,21 +184,8 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) { if cdr.Subject == "" { // Use account information as rating subject if missing cdr.Subject = cdr.Account } - // replace aliases - if err := LoadAlias(&AttrMatchingAlias{ - Destination: cdr.Destination, - Direction: cdr.Direction, - Tenant: cdr.Tenant, - Category: cdr.Category, - Account: cdr.Account, - Subject: cdr.Subject, - Context: utils.ALIAS_CONTEXT_RATING, - }, cdr, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound { - return err - } - // replace user profile fields - if err := LoadUserProfile(cdr, utils.EXTRA_FIELDS); err != nil { - return err + if !cdr.Rated { + cdr.RunID = utils.MetaRaw } if self.cgrCfg.CDRSStoreCdrs { // Store RawCDRs, this we do sync so we can reply with the status if err := self.cdrDb.SetCDR(cdr, false); err != nil { // Only original CDR stored in primary table, no derived @@ -226,21 +213,19 @@ func (self *CdrServer) deriveRateStoreStatsReplicate(cdr *CDR) error { } func (self *CdrServer) rateStoreStatsReplicate(cdr *CDR) error { - if cdr.RunID != utils.META_DEFAULT { // Process Aliases and Users for derived CDRs - if err := LoadAlias(&AttrMatchingAlias{ - Destination: cdr.Destination, - Direction: cdr.Direction, - Tenant: cdr.Tenant, - Category: cdr.Category, - Account: cdr.Account, - Subject: cdr.Subject, - Context: utils.ALIAS_CONTEXT_RATING, - }, cdr, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound { - return err - } - if err := LoadUserProfile(cdr, utils.EXTRA_FIELDS); err != nil { - return err - } + if err := LoadAlias(&AttrMatchingAlias{ + Destination: cdr.Destination, + Direction: cdr.Direction, + Tenant: cdr.Tenant, + Category: cdr.Category, + Account: cdr.Account, + Subject: cdr.Subject, + Context: utils.ALIAS_CONTEXT_RATING, + }, cdr, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound { + return err + } + if err := LoadUserProfile(cdr, utils.EXTRA_FIELDS); err != nil { + return err } // Rate CDR if self.rater != nil && !cdr.Rated { @@ -260,12 +245,6 @@ func (self *CdrServer) rateStoreStatsReplicate(cdr *CDR) error { if err := self.cdrDb.SetCDR(cdr, true); err != nil { utils.Logger.Err(fmt.Sprintf(" Storing rated CDR %+v, got error: %s", cdr, err.Error())) } - // Store CostDetails - if cdr.Rated || utils.IsSliceMember([]string{utils.RATED, utils.META_RATED}, cdr.RequestType) { // Account related CDRs are saved automatically, so save the others here if requested - if err := self.cdrDb.LogCallCost(cdr.CGRID, utils.CDRS_SOURCE, cdr.RunID, cdr.CostDetails); err != nil { - utils.Logger.Err(fmt.Sprintf(" Storing costs for CDR %+v, costDetails: %+v, got error: %s", cdr, cdr.CostDetails, err.Error())) - } - } } // Attach CDR to stats if self.stats != nil { // Send CDR to stats @@ -280,11 +259,8 @@ func (self *CdrServer) rateStoreStatsReplicate(cdr *CDR) error { } func (self *CdrServer) deriveCdrs(cdr *CDR) ([]*CDR, error) { - if len(cdr.RunID) == 0 { - cdr.RunID = utils.META_DEFAULT - } cdrRuns := []*CDR{cdr} - if cdr.Rated { // Do not derive already rated CDRs since they should be already derived + if cdr.RunID != utils.MetaRaw { // Only derive *raw CDRs return cdrRuns, nil } attrsDC := &utils.AttrDerivedChargers{Tenant: cdr.Tenant, Category: cdr.Category, Direction: cdr.Direction, @@ -335,39 +311,6 @@ func (self *CdrServer) deriveCdrs(cdr *CDR) ([]*CDR, error) { return cdrRuns, nil } -// Retrive the cost from engine -func (self *CdrServer) getCostFromRater(cdr *CDR) (*CallCost, error) { - cc := new(CallCost) - var err error - timeStart := cdr.AnswerTime - if timeStart.IsZero() { // Fix for FreeSWITCH unanswered calls - timeStart = cdr.SetupTime - } - cd := &CallDescriptor{ - TOR: cdr.TOR, - Direction: cdr.Direction, - Tenant: cdr.Tenant, - Category: cdr.Category, - Subject: cdr.Subject, - Account: cdr.Account, - Destination: cdr.Destination, - TimeStart: timeStart, - TimeEnd: timeStart.Add(cdr.Usage), - DurationIndex: cdr.Usage, - } - if utils.IsSliceMember([]string{utils.META_PSEUDOPREPAID, utils.META_POSTPAID, utils.META_PREPAID, utils.PSEUDOPREPAID, utils.POSTPAID, utils.PREPAID}, cdr.RequestType) { // Prepaid - Cost can be recalculated in case of missing records from SM - if err = self.rater.Debit(cd, cc); err == nil { // Debit has occured, we are forced to write the log, even if CDR store is disabled - self.cdrDb.LogCallCost(cdr.CGRID, utils.CDRS_SOURCE, cdr.RunID, cc) - } - } else { - err = self.rater.GetCost(cd, cc) - } - if err != nil { - return cc, err - } - return cc, nil -} - func (self *CdrServer) rateCDR(cdr *CDR) error { var qryCC *CallCost var err error @@ -401,6 +344,39 @@ func (self *CdrServer) rateCDR(cdr *CDR) error { return nil } +// Retrive the cost from engine +func (self *CdrServer) getCostFromRater(cdr *CDR) (*CallCost, error) { + cc := new(CallCost) + var err error + timeStart := cdr.AnswerTime + if timeStart.IsZero() { // Fix for FreeSWITCH unanswered calls + timeStart = cdr.SetupTime + } + cd := &CallDescriptor{ + TOR: cdr.TOR, + Direction: cdr.Direction, + Tenant: cdr.Tenant, + Category: cdr.Category, + Subject: cdr.Subject, + Account: cdr.Account, + Destination: cdr.Destination, + TimeStart: timeStart, + TimeEnd: timeStart.Add(cdr.Usage), + DurationIndex: cdr.Usage, + } + if utils.IsSliceMember([]string{utils.META_PSEUDOPREPAID, utils.META_POSTPAID, utils.META_PREPAID, utils.PSEUDOPREPAID, utils.POSTPAID, utils.PREPAID}, cdr.RequestType) { // Prepaid - Cost can be recalculated in case of missing records from SM + if err = self.rater.Debit(cd, cc); err == nil { // Debit has occured, we are forced to write the log, even if CDR store is disabled + self.cdrDb.LogCallCost(cdr.CGRID, utils.CDRS_SOURCE, cdr.RunID, cc) + } + } else { + err = self.rater.GetCost(cd, cc) + } + if err != nil { + return cc, err + } + return cc, nil +} + // ToDo: Add websocket support func (self *CdrServer) replicateCdr(cdr *CDR) error { for _, rplCfg := range self.cgrCfg.CDRSCdrReplication { @@ -424,7 +400,6 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error { content = utils.CONTENT_JSON body = cdr } - errChan := make(chan error) go func(body interface{}, rplCfg *config.CdrReplicationCfg, content string, errChan chan error) { fallbackPath := path.Join(