Revise CDR rerating

The (*CDRServer).processEvent function is now called processEvents and can
be passed an array of CGREvents instead of only one. This was done because
when calling the RateCDRs API we want to first refund all CDRs before
starting to debit again.

The rerate parameter is now no longer hardcoded to true for the RateCDRs API.If
required, the "*rerate" flag must be provided by the caller.

Now, the refundEventCost function returns an additional boolean, that signals
whether the refund occured or didn't.

If the reRate parameter is set to true, also set refund to true.

In case CostDetails is not populated, retrieve it from StorDB if possible
and add it to the CGREvent before converting to CDRs. Set CostDetails back
to nil once the refund goes through.

Remove the refund logic from within the store block.

Now that the refund happens before the debit, revise the expected values for
the "testV1CDRsProcessEventWithRefund" subtest within the
apier/v1/cdrs_it_test.go file.

Add an integration test for the following scenario:
 -create one account with one balance of 1 free minute and rating for the rest.
 -send one CDR of two minutes with ProcessEvent. This should consume 60s out of
the free balance and charge 60s. The SetupTime in the CDR should be 1 hour after
the second CDR.
 -send the second CDR with an usage of 2m. This should be charged entirely.
 -send a RateCDR API call with OrderBy: "SetupTime". This should rerate the two
CDRs from above and change their order of rating.
This commit is contained in:
ionutboangiu
2023-04-19 03:49:51 -04:00
committed by Dan Christian Bogos
parent 425590d733
commit 4cd2dc3de8
7 changed files with 673 additions and 52 deletions

View File

@@ -178,7 +178,7 @@ func (cdrS *CDRServer) rateCDR(cdr *CDRWithArgDispatcher) ([]*CDR, error) {
if cdr.Usage == 0 {
cdrClone.Usage = smCost.Usage
} else if smCost.Usage != cdr.Usage {
if err = cdrS.refundEventCost(smCost.CostDetails,
if _, err = cdrS.refundEventCost(smCost.CostDetails,
cdrClone.RequestType, cdrClone.ToR); err != nil {
return nil, err
}
@@ -212,7 +212,7 @@ func (cdrS *CDRServer) rateCDR(cdr *CDRWithArgDispatcher) ([]*CDR, error) {
cdr.CostDetails.Compute()
return []*CDR{cdr.CDR}, nil
}
if err = cdrS.refundEventCost(cdr.CostDetails,
if _, err = cdrS.refundEventCost(cdr.CostDetails,
cdr.RequestType, cdr.ToR); err != nil {
return nil, err
}
@@ -287,9 +287,9 @@ func (cdrS *CDRServer) rateCDRWithErr(cdr *CDRWithArgDispatcher) (ratedCDRs []*C
}
// refundEventCost will refund the EventCost using RefundIncrements
func (cdrS *CDRServer) refundEventCost(ec *EventCost, reqType, tor string) (err error) {
func (cdrS *CDRServer) refundEventCost(ec *EventCost, reqType, tor string) (rfnd bool, err error) {
if len(cdrS.cgrCfg.CdrsCfg().RaterConns) == 0 {
return utils.NewErrNotConnected(utils.RALService)
return false, utils.NewErrNotConnected(utils.RALService)
}
if ec == nil || !utils.AccountableRequestTypes.Has(reqType) {
return // non refundable
@@ -304,7 +304,7 @@ func (cdrS *CDRServer) refundEventCost(ec *EventCost, reqType, tor string) (err
&CallDescriptorWithArgDispatcher{CallDescriptor: cd}, &acnt); err != nil {
return
}
return
return true, nil
}
// chrgrSProcessEvent forks CGREventWithArgDispatcher into multiples based on matching ChargerS profiles
@@ -404,29 +404,39 @@ func (cdrS *CDRServer) exportCDRs(cdrs []*CDR) (err error) {
return
}
// processEvent processes a CGREvent based on arguments
func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithArgDispatcher,
// processEvents processes a CGREvent based on arguments
func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher,
chrgS, attrS, refund, ralS, store, reRate, export, thdS, stS bool) (err error) {
if reRate {
refund = true
}
if attrS {
if err = cdrS.attrSProcessEvent(ev); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v with %s",
utils.CDRs, err.Error(), utils.ToJSON(ev), utils.AttributeS))
err = utils.ErrPartiallyExecuted
return
for _, ev := range evs {
if err = cdrS.attrSProcessEvent(ev); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v with %s",
utils.CDRs, err.Error(), utils.ToJSON(ev), utils.AttributeS))
err = utils.ErrPartiallyExecuted
return
}
}
}
var cgrEvs []*utils.CGREventWithArgDispatcher
if chrgS {
if cgrEvs, err = cdrS.chrgrSProcessEvent(ev); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v with %s",
utils.CDRs, err.Error(), utils.ToJSON(ev), utils.ChargerS))
err = utils.ErrPartiallyExecuted
return
for _, ev := range evs {
var chrgEvs []*utils.CGREventWithArgDispatcher
if chrgEvs, err = cdrS.chrgrSProcessEvent(ev); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v with %s",
utils.CDRs, err.Error(), utils.ToJSON(ev), utils.ChargerS))
err = utils.ErrPartiallyExecuted
return
} else {
cgrEvs = append(cgrEvs, chrgEvs...)
}
}
} else { // ChargerS not requested, charge the original event
cgrEvs = []*utils.CGREventWithArgDispatcher{ev}
cgrEvs = evs
}
// Check if the unique ID was not already processed
if !refund {
@@ -456,6 +466,30 @@ func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithArgDispatcher,
cdrs := make([]*CDR, len(cgrEvs))
if refund || ralS || store || reRate || export {
for i, cgrEv := range cgrEvs {
if refund {
if _, has := cgrEv.Event[utils.CostDetails]; !has {
// if CostDetails is not populated or is nil, look for it inside the previously stored cdr
var cgrID string // prepare CGRID to filter for previous CDR
if val, has := cgrEv.Event[utils.CGRID]; !has {
cgrID = utils.Sha1(utils.IfaceAsString(cgrEv.Event[utils.OriginID]),
utils.IfaceAsString(cgrEv.Event[utils.OriginHost]))
} else {
cgrID = utils.IfaceAsString(val)
}
var prevCDRs []*CDR // only one should be returned
if prevCDRs, _, err = cdrS.cdrDb.GetCDRs(
&utils.CDRsFilter{CGRIDs: []string{cgrID},
RunIDs: []string{utils.IfaceAsString(cgrEv.Event[utils.RunID])}}, false); err != nil {
utils.Logger.Err(
fmt.Sprintf("<%s> could not retrieve previously stored CDR, error: <%s>",
utils.CDRs, err.Error()))
err = utils.ErrPartiallyExecuted
return
} else {
cgrEv.Event[utils.CostDetails] = prevCDRs[0].CostDetails
}
}
}
if cdrs[i], err = NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg,
cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone); err != nil {
utils.Logger.Warning(
@@ -468,12 +502,13 @@ func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithArgDispatcher,
}
if refund {
for _, cdr := range cdrs {
if errRfd := cdrS.refundEventCost(cdr.CostDetails,
if rfnd, errRfd := cdrS.refundEventCost(cdr.CostDetails,
cdr.RequestType, cdr.ToR); errRfd != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> refunding CDR %+v",
utils.CDRs, errRfd.Error(), cdr))
} else if rfnd {
cdr.CostDetails = nil // this makes sure that the rater will recalculate (and debit) the cost
}
}
}
@@ -481,10 +516,10 @@ func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithArgDispatcher,
for i, cdr := range cdrs {
for j, rtCDR := range cdrS.rateCDRWithErr(
&CDRWithArgDispatcher{CDR: cdr,
ArgDispatcher: ev.ArgDispatcher}) {
ArgDispatcher: cgrEvs[i].ArgDispatcher}) {
cgrEv := &utils.CGREventWithArgDispatcher{
CGREvent: rtCDR.AsCGREvent(),
ArgDispatcher: ev.ArgDispatcher,
ArgDispatcher: cgrEvs[i].ArgDispatcher,
}
if j == 0 { // the first CDR will replace the events we got already as a small optimization
cdrs[i] = rtCDR
@@ -499,7 +534,7 @@ func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithArgDispatcher,
if store {
refundCDRCosts := func() { // will be used to refund all CDRs on errors
for _, cdr := range cdrs { // refund what we have charged since duplicates are not allowed
if errRfd := cdrS.refundEventCost(cdr.CostDetails,
if _, errRfd := cdrS.refundEventCost(cdr.CostDetails,
cdr.RequestType, cdr.ToR); errRfd != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> refunding CDR %+v",
@@ -513,21 +548,6 @@ func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithArgDispatcher,
refundCDRCosts()
return
}
// CDR was found in StorDB
// reRate is allowed, refund the previous CDR
var prevCDRs []*CDR // only one should be returned
if prevCDRs, _, err = cdrS.cdrDb.GetCDRs(
&utils.CDRsFilter{CGRIDs: []string{cdr.CGRID},
RunIDs: []string{cdr.RunID}}, false); err != nil {
refundCDRCosts()
return
}
if err = cdrS.refundEventCost(prevCDRs[0].CostDetails,
cdr.RequestType, cdr.ToR); err != nil {
refundCDRCosts()
return
}
// after refund we can force update
if err = cdrS.cdrDb.SetCDR(cdr, true); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> updating CDR %+v",
@@ -644,7 +664,7 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDRWithArgDispatcher, reply *string) (e
ArgDispatcher: cdr.ArgDispatcher,
}
if err = cdrS.processEvent(cgrEv,
if err = cdrS.processEvents([]*utils.CGREventWithArgDispatcher{cgrEv},
len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) != 0 && !cdr.PreRated,
len(cdrS.cgrCfg.CdrsCfg().AttributeSConns) != 0,
false,
@@ -745,7 +765,7 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er
CGREvent: &arg.CGREvent,
ArgDispatcher: arg.ArgDispatcher,
}
if err = cdrS.processEvent(cgrEv, chrgS, attrS, refund,
if err = cdrS.processEvents([]*utils.CGREventWithArgDispatcher{cgrEv}, chrgS, attrS, refund,
ralS, store, reRate, export, thdS, stS); err != nil {
return
}
@@ -902,23 +922,28 @@ func (cdrS *CDRServer) V1RateCDRs(arg *ArgRateCDRs, reply *string) (err error) {
if flgs.HasKey(utils.MetaAttributes) {
attrS = flgs.GetBool(utils.MetaAttributes)
}
var reRate bool
if flgs.HasKey(utils.MetaRerate) {
reRate = flgs.GetBool(utils.MetaRerate)
}
if chrgS && len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) == 0 {
return utils.NewErrNotConnected(utils.ChargerS)
}
for _, cdr := range cdrs {
cgrEvs := make([]*utils.CGREventWithArgDispatcher, len(cdrs))
for i, cdr := range cdrs {
cdr.Cost = -1 // the cost will be recalculated
if cdr.Tenant == utils.EmptyString {
cdr.Tenant = cdrS.cgrCfg.GeneralCfg().DefaultTenant
}
cgrEv := &utils.CGREventWithArgDispatcher{
cgrEvs[i] = &utils.CGREventWithArgDispatcher{
CGREvent: cdr.AsCGREvent(),
ArgDispatcher: arg.ArgDispatcher,
}
if err = cdrS.processEvent(cgrEv, chrgS, attrS, false,
true, store, true, export, thdS, statS); err != nil {
return utils.NewErrServerError(err)
}
}
if err = cdrS.processEvents(cgrEvs, chrgS, attrS, false,
true, store, reRate, export, thdS, statS); err != nil {
return utils.NewErrServerError(err)
}
*reply = utils.OK
return