Ensure refund flag is not ignored when rerate is true

By default setting rerate to true also sets refund to true, but
flags should take precedence over defaults.

If rerate is true and refund is false, remove any previous
CostDetails from event to force rerate.

Centralize the parsing of processing flags.
This commit is contained in:
ionutboangiu
2024-07-05 18:51:43 +03:00
committed by Dan Christian Bogos
parent 0860642d97
commit 4e6cbe26c5
2 changed files with 87 additions and 109 deletions

View File

@@ -405,13 +405,66 @@ func (cdrS *CDRServer) exportCDRs(cdrs []*CDR) (err error) {
return
}
// processEvents processes a CGREvent based on arguments
func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher,
chrgS, attrS, refund, ralS, store, reRate, export, thdS, stS bool) (err error) {
if reRate {
refund = true
// cdrProcessingArgs holds the arguments for processing CDR events.
type cdrProcessingArgs struct {
attrS bool
chrgS bool
refund bool
ralS bool
store bool
reRate bool
export bool
thdS bool
stS bool
}
// newCDRProcessingArgs initializes processing arguments from config and overrides them with provided flags.
func newCDRProcessingArgs(cfg *config.CdrsCfg, flags utils.FlagsWithParams) cdrProcessingArgs {
args := cdrProcessingArgs{
attrS: len(cfg.AttributeSConns) != 0,
chrgS: len(cfg.ChargerSConns) != 0,
store: cfg.StoreCdrs,
export: len(cfg.OnlineCDRExports) != 0,
thdS: len(cfg.ThresholdSConns) != 0,
stS: len(cfg.StatSConns) != 0,
}
if attrS {
if flags.HasKey(utils.MetaAttributes) {
args.attrS = flags.GetBool(utils.MetaAttributes)
}
if flags.HasKey(utils.MetaChargers) {
args.chrgS = flags.GetBool(utils.MetaChargers)
}
if flags.HasKey(utils.MetaStore) {
args.store = flags.GetBool(utils.MetaStore)
}
if flags.HasKey(utils.MetaExport) {
args.export = flags.GetBool(utils.MetaExport)
}
if flags.HasKey(utils.MetaThresholds) {
args.thdS = flags.GetBool(utils.MetaThresholds)
}
if flags.HasKey(utils.MetaStats) {
args.stS = flags.GetBool(utils.MetaStats)
}
if flags.HasKey(utils.MetaRALs) {
args.ralS = flags.GetBool(utils.MetaRALs)
}
if flags.HasKey(utils.MetaRerate) {
args.reRate = flags.GetBool(utils.MetaRerate)
if args.reRate {
args.ralS = true
args.refund = true
}
}
if flags.HasKey(utils.MetaRefund) {
args.refund = flags.GetBool(utils.MetaRefund)
}
return args
}
// processEvents processes a CGREvent based on arguments
func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, args cdrProcessingArgs) (err error) {
if args.attrS {
for _, ev := range evs {
if err = cdrS.attrSProcessEvent(ev); err != nil {
utils.Logger.Warning(
@@ -423,7 +476,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher,
}
}
var cgrEvs []*utils.CGREventWithArgDispatcher
if chrgS {
if args.chrgS {
for _, ev := range evs {
var chrgEvs []*utils.CGREventWithArgDispatcher
if chrgEvs, err = cdrS.chrgrSProcessEvent(ev); err != nil {
@@ -440,7 +493,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher,
cgrEvs = evs
}
// Check if the unique ID was not already processed
if !refund {
if !args.refund {
for _, cgrEv := range cgrEvs {
me := MapEvent(cgrEv.CGREvent.Event)
if !me.HasField(utils.CGRID) { // try to compute the CGRID if missing
@@ -453,7 +506,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher,
me.GetStringIgnoreErrors(utils.CGRID),
me.GetStringIgnoreErrors(utils.RunID),
)
if Cache.HasItem(utils.CacheCDRIDs, uID) && !reRate {
if Cache.HasItem(utils.CacheCDRIDs, uID) && !args.reRate {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v with %s",
utils.CDRs, utils.ErrExists, utils.ToJSON(cgrEv), utils.CacheS))
@@ -465,9 +518,9 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher,
}
// Populate CDR list out of events
cdrs := make([]*CDR, len(cgrEvs))
if refund || ralS || store || reRate || export {
if args.refund || args.ralS || args.store || args.reRate || args.export {
for i, cgrEv := range cgrEvs {
if refund {
if args.refund {
if _, has := cgrEv.Event[utils.CostDetails]; !has {
// if CostDetails is not populated or is nil, look for it inside the previously stored cdr
var cgrID string // prepare CGRID to filter for previous CDR
@@ -490,6 +543,9 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher,
cgrEv.Event[utils.CostDetails] = prevCDRs[0].CostDetails
}
}
} else if args.reRate {
// Force rerate by removing CostDetails to avoid marking as already rated.
delete(cgrEv.Event, utils.CostDetails)
}
if cdrs[i], err = NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg,
cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone); err != nil {
@@ -501,7 +557,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher,
}
}
}
if refund {
if args.refund {
for _, cdr := range cdrs {
if rfnd, errRfd := cdrS.refundEventCost(cdr.CostDetails,
cdr.RequestType, cdr.ToR); errRfd != nil {
@@ -513,7 +569,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher,
}
}
}
if ralS {
if args.ralS {
for i, cdr := range cdrs {
for j, rtCDR := range cdrS.rateCDRWithErr(
&CDRWithArgDispatcher{CDR: cdr,
@@ -532,7 +588,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher,
}
}
}
if store {
if args.store {
refundCDRCosts := func() { // will be used to refund all CDRs on errors
for _, cdr := range cdrs { // refund what we have charged since duplicates are not allowed
if _, errRfd := cdrS.refundEventCost(cdr.CostDetails,
@@ -545,7 +601,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher,
}
for _, cdr := range cdrs {
if err = cdrS.cdrDb.SetCDR(cdr, false); err != nil {
if err != utils.ErrExists || !reRate {
if err != utils.ErrExists || !args.reRate {
refundCDRCosts()
return
}
@@ -560,7 +616,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher,
}
}
var partiallyExecuted bool // from here actions are optional and a general error is returned
if export {
if args.export {
if err = cdrS.exportCDRs(cdrs); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> exporting CDRs %+v",
@@ -568,7 +624,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher,
partiallyExecuted = true
}
}
if thdS {
if args.thdS {
for _, cgrEv := range cgrEvs {
if err = cdrS.thdSProcessEvent(cgrEv); err != nil {
utils.Logger.Warning(
@@ -578,7 +634,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher,
}
}
}
if stS {
if args.stS {
for _, cgrEv := range cgrEvs {
if err = cdrS.statSProcessEvent(cgrEv); err != nil {
utils.Logger.Warning(
@@ -660,21 +716,14 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDRWithArgDispatcher, reply *string) (e
if cdr.RunID == utils.EmptyString {
cdr.RunID = utils.MetaDefault
}
procArgs := newCDRProcessingArgs(cdrS.cgrCfg.CdrsCfg(), nil)
procArgs.ralS = !cdr.PreRated
procArgs.chrgS = procArgs.chrgS && !cdr.PreRated
cgrEv := &utils.CGREventWithArgDispatcher{
CGREvent: cdr.AsCGREvent(),
ArgDispatcher: cdr.ArgDispatcher,
}
if err = cdrS.processEvents([]*utils.CGREventWithArgDispatcher{cgrEv},
len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) != 0 && !cdr.PreRated,
len(cdrS.cgrCfg.CdrsCfg().AttributeSConns) != 0,
false,
!cdr.PreRated, // rate the CDR if is not PreRated
cdrS.cgrCfg.CdrsCfg().StoreCdrs,
false, // no rerate
len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0,
len(cdrS.cgrCfg.CdrsCfg().ThresholdSConns) != 0,
len(cdrS.cgrCfg.CdrsCfg().StatSConns) != 0); err != nil {
if err = cdrS.processEvents([]*utils.CGREventWithArgDispatcher{cgrEv}, procArgs); err != nil {
return
}
*reply = utils.OK
@@ -716,58 +765,18 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er
}
// end of RPC caching
// processing options
// Compute processing options based on flags and configuration.
var flgs utils.FlagsWithParams
if flgs, err = utils.FlagsWithParamsFromSlice(arg.Flags); err != nil {
return
}
attrS := len(cdrS.cgrCfg.CdrsCfg().AttributeSConns) != 0
if flgs.HasKey(utils.MetaAttributes) {
attrS = flgs.GetBool(utils.MetaAttributes)
}
store := cdrS.cgrCfg.CdrsCfg().StoreCdrs
if flgs.HasKey(utils.MetaStore) {
store = flgs.GetBool(utils.MetaStore)
}
export := len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0
if flgs.HasKey(utils.MetaExport) {
export = flgs.GetBool(utils.MetaExport)
}
thdS := len(cdrS.cgrCfg.CdrsCfg().ThresholdSConns) != 0
if flgs.HasKey(utils.MetaThresholds) {
thdS = flgs.GetBool(utils.MetaThresholds)
}
stS := len(cdrS.cgrCfg.CdrsCfg().StatSConns) != 0
if flgs.HasKey(utils.MetaStats) {
stS = flgs.GetBool(utils.MetaStats)
}
chrgS := len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) != 0 // activate charging for the Event
if flgs.HasKey(utils.MetaChargers) {
chrgS = flgs.GetBool(utils.MetaChargers)
}
var ralS bool // activate single rating for the CDR
if flgs.HasKey(utils.MetaRALs) {
ralS = flgs.GetBool(utils.MetaRALs)
}
var reRate bool
if flgs.HasKey(utils.MetaRerate) {
reRate = flgs.GetBool(utils.MetaRerate)
if reRate {
ralS = true
}
}
var refund bool
if flgs.HasKey(utils.MetaRefund) {
refund = flgs.GetBool(utils.MetaRefund)
}
// end of processing options
procArgs := newCDRProcessingArgs(cdrS.cgrCfg.CdrsCfg(), flgs)
cgrEv := &utils.CGREventWithArgDispatcher{
CGREvent: &arg.CGREvent,
ArgDispatcher: arg.ArgDispatcher,
}
if err = cdrS.processEvents([]*utils.CGREventWithArgDispatcher{cgrEv}, chrgS, attrS, refund,
ralS, store, reRate, export, thdS, stS); err != nil {
if err = cdrS.processEvents([]*utils.CGREventWithArgDispatcher{cgrEv}, procArgs); err != nil {
return
}
*reply = utils.OK
@@ -895,42 +904,15 @@ func (cdrS *CDRServer) V1RateCDRs(arg *ArgRateCDRs, reply *string) (err error) {
if err != nil {
return
}
// Compute processing options based on flags and configuration.
var flgs utils.FlagsWithParams
if flgs, err = utils.FlagsWithParamsFromSlice(arg.Flags); err != nil {
return
}
store := cdrS.cgrCfg.CdrsCfg().StoreCdrs
if flgs.HasKey(utils.MetaStore) {
store = flgs.GetBool(utils.MetaStore)
}
export := len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0
if flgs.HasKey(utils.MetaExport) {
export = flgs.GetBool(utils.MetaExport)
}
thdS := len(cdrS.cgrCfg.CdrsCfg().ThresholdSConns) != 0
if flgs.HasKey(utils.MetaThresholds) {
thdS = flgs.GetBool(utils.MetaThresholds)
}
statS := len(cdrS.cgrCfg.CdrsCfg().StatSConns) != 0
if flgs.HasKey(utils.MetaStatS) {
statS = flgs.GetBool(utils.MetaStatS)
}
chrgS := len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) != 0
if flgs.HasKey(utils.MetaChargers) {
chrgS = flgs.GetBool(utils.MetaChargers)
}
attrS := len(cdrS.cgrCfg.CdrsCfg().AttributeSConns) != 0
if flgs.HasKey(utils.MetaAttributes) {
attrS = flgs.GetBool(utils.MetaAttributes)
}
var reRate bool
if flgs.HasKey(utils.MetaRerate) {
reRate = flgs.GetBool(utils.MetaRerate)
}
procArgs := newCDRProcessingArgs(cdrS.cgrCfg.CdrsCfg(), flgs)
procArgs.ralS = true
if chrgS && len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) == 0 {
return utils.NewErrNotConnected(utils.ChargerS)
}
cgrEvs := make([]*utils.CGREventWithArgDispatcher, len(cdrs))
for i, cdr := range cdrs {
cdr.Cost = -1 // the cost will be recalculated
@@ -942,8 +924,7 @@ func (cdrS *CDRServer) V1RateCDRs(arg *ArgRateCDRs, reply *string) (err error) {
ArgDispatcher: arg.ArgDispatcher,
}
}
if err = cdrS.processEvents(cgrEvs, chrgS, attrS, false,
true, store, reRate, export, thdS, statS); err != nil {
if err = cdrS.processEvents(cgrEvs, procArgs); err != nil {
return utils.NewErrServerError(err)
}
*reply = utils.OK

View File

@@ -314,8 +314,5 @@ func (gm *GOBMarshaler) Unmarshal(data []byte, v any) error {
// Decide the value of cacheCommit parameter based on transactionID
func cacheCommit(transactionID string) bool {
if transactionID == utils.NonTransactional {
return true
}
return false
return transactionID == utils.NonTransactional
}