From 4e6cbe26c5842e1d60bb7d7e2874f4c88109d3cd Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Fri, 5 Jul 2024 18:51:43 +0300 Subject: [PATCH] Ensure refund flag is not ignored when rerate is true By default setting rerate to true also sets refund to true, but flags should take precedence over defaults. If rerate is true and refund is false, remove any previous CostDetails from event to force rerate. Centralize the parsing of processing flags. --- engine/cdrs.go | 191 ++++++++++++++++-------------------- engine/storage_interface.go | 5 +- 2 files changed, 87 insertions(+), 109 deletions(-) diff --git a/engine/cdrs.go b/engine/cdrs.go index 1dd7f2434..4017335f1 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -405,13 +405,66 @@ func (cdrS *CDRServer) exportCDRs(cdrs []*CDR) (err error) { return } -// processEvents processes a CGREvent based on arguments -func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, - chrgS, attrS, refund, ralS, store, reRate, export, thdS, stS bool) (err error) { - if reRate { - refund = true +// cdrProcessingArgs holds the arguments for processing CDR events. +type cdrProcessingArgs struct { + attrS bool + chrgS bool + refund bool + ralS bool + store bool + reRate bool + export bool + thdS bool + stS bool +} + +// newCDRProcessingArgs initializes processing arguments from config and overrides them with provided flags. +func newCDRProcessingArgs(cfg *config.CdrsCfg, flags utils.FlagsWithParams) cdrProcessingArgs { + args := cdrProcessingArgs{ + attrS: len(cfg.AttributeSConns) != 0, + chrgS: len(cfg.ChargerSConns) != 0, + store: cfg.StoreCdrs, + export: len(cfg.OnlineCDRExports) != 0, + thdS: len(cfg.ThresholdSConns) != 0, + stS: len(cfg.StatSConns) != 0, } - if attrS { + if flags.HasKey(utils.MetaAttributes) { + args.attrS = flags.GetBool(utils.MetaAttributes) + } + if flags.HasKey(utils.MetaChargers) { + args.chrgS = flags.GetBool(utils.MetaChargers) + } + if flags.HasKey(utils.MetaStore) { + args.store = flags.GetBool(utils.MetaStore) + } + if flags.HasKey(utils.MetaExport) { + args.export = flags.GetBool(utils.MetaExport) + } + if flags.HasKey(utils.MetaThresholds) { + args.thdS = flags.GetBool(utils.MetaThresholds) + } + if flags.HasKey(utils.MetaStats) { + args.stS = flags.GetBool(utils.MetaStats) + } + if flags.HasKey(utils.MetaRALs) { + args.ralS = flags.GetBool(utils.MetaRALs) + } + if flags.HasKey(utils.MetaRerate) { + args.reRate = flags.GetBool(utils.MetaRerate) + if args.reRate { + args.ralS = true + args.refund = true + } + } + if flags.HasKey(utils.MetaRefund) { + args.refund = flags.GetBool(utils.MetaRefund) + } + return args +} + +// processEvents processes a CGREvent based on arguments +func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, args cdrProcessingArgs) (err error) { + if args.attrS { for _, ev := range evs { if err = cdrS.attrSProcessEvent(ev); err != nil { utils.Logger.Warning( @@ -423,7 +476,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, } } var cgrEvs []*utils.CGREventWithArgDispatcher - if chrgS { + if args.chrgS { for _, ev := range evs { var chrgEvs []*utils.CGREventWithArgDispatcher if chrgEvs, err = cdrS.chrgrSProcessEvent(ev); err != nil { @@ -440,7 +493,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, cgrEvs = evs } // Check if the unique ID was not already processed - if !refund { + if !args.refund { for _, cgrEv := range cgrEvs { me := MapEvent(cgrEv.CGREvent.Event) if !me.HasField(utils.CGRID) { // try to compute the CGRID if missing @@ -453,7 +506,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, me.GetStringIgnoreErrors(utils.CGRID), me.GetStringIgnoreErrors(utils.RunID), ) - if Cache.HasItem(utils.CacheCDRIDs, uID) && !reRate { + if Cache.HasItem(utils.CacheCDRIDs, uID) && !args.reRate { utils.Logger.Warning( fmt.Sprintf("<%s> error: <%s> processing event %+v with %s", utils.CDRs, utils.ErrExists, utils.ToJSON(cgrEv), utils.CacheS)) @@ -465,9 +518,9 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, } // Populate CDR list out of events cdrs := make([]*CDR, len(cgrEvs)) - if refund || ralS || store || reRate || export { + if args.refund || args.ralS || args.store || args.reRate || args.export { for i, cgrEv := range cgrEvs { - if refund { + if args.refund { if _, has := cgrEv.Event[utils.CostDetails]; !has { // if CostDetails is not populated or is nil, look for it inside the previously stored cdr var cgrID string // prepare CGRID to filter for previous CDR @@ -490,6 +543,9 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, cgrEv.Event[utils.CostDetails] = prevCDRs[0].CostDetails } } + } else if args.reRate { + // Force rerate by removing CostDetails to avoid marking as already rated. + delete(cgrEv.Event, utils.CostDetails) } if cdrs[i], err = NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg, cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone); err != nil { @@ -501,7 +557,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, } } } - if refund { + if args.refund { for _, cdr := range cdrs { if rfnd, errRfd := cdrS.refundEventCost(cdr.CostDetails, cdr.RequestType, cdr.ToR); errRfd != nil { @@ -513,7 +569,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, } } } - if ralS { + if args.ralS { for i, cdr := range cdrs { for j, rtCDR := range cdrS.rateCDRWithErr( &CDRWithArgDispatcher{CDR: cdr, @@ -532,7 +588,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, } } } - if store { + if args.store { refundCDRCosts := func() { // will be used to refund all CDRs on errors for _, cdr := range cdrs { // refund what we have charged since duplicates are not allowed if _, errRfd := cdrS.refundEventCost(cdr.CostDetails, @@ -545,7 +601,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, } for _, cdr := range cdrs { if err = cdrS.cdrDb.SetCDR(cdr, false); err != nil { - if err != utils.ErrExists || !reRate { + if err != utils.ErrExists || !args.reRate { refundCDRCosts() return } @@ -560,7 +616,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, } } var partiallyExecuted bool // from here actions are optional and a general error is returned - if export { + if args.export { if err = cdrS.exportCDRs(cdrs); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> error: <%s> exporting CDRs %+v", @@ -568,7 +624,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, partiallyExecuted = true } } - if thdS { + if args.thdS { for _, cgrEv := range cgrEvs { if err = cdrS.thdSProcessEvent(cgrEv); err != nil { utils.Logger.Warning( @@ -578,7 +634,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, } } } - if stS { + if args.stS { for _, cgrEv := range cgrEvs { if err = cdrS.statSProcessEvent(cgrEv); err != nil { utils.Logger.Warning( @@ -660,21 +716,14 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDRWithArgDispatcher, reply *string) (e if cdr.RunID == utils.EmptyString { cdr.RunID = utils.MetaDefault } + procArgs := newCDRProcessingArgs(cdrS.cgrCfg.CdrsCfg(), nil) + procArgs.ralS = !cdr.PreRated + procArgs.chrgS = procArgs.chrgS && !cdr.PreRated cgrEv := &utils.CGREventWithArgDispatcher{ CGREvent: cdr.AsCGREvent(), ArgDispatcher: cdr.ArgDispatcher, } - - if err = cdrS.processEvents([]*utils.CGREventWithArgDispatcher{cgrEv}, - len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) != 0 && !cdr.PreRated, - len(cdrS.cgrCfg.CdrsCfg().AttributeSConns) != 0, - false, - !cdr.PreRated, // rate the CDR if is not PreRated - cdrS.cgrCfg.CdrsCfg().StoreCdrs, - false, // no rerate - len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0, - len(cdrS.cgrCfg.CdrsCfg().ThresholdSConns) != 0, - len(cdrS.cgrCfg.CdrsCfg().StatSConns) != 0); err != nil { + if err = cdrS.processEvents([]*utils.CGREventWithArgDispatcher{cgrEv}, procArgs); err != nil { return } *reply = utils.OK @@ -716,58 +765,18 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er } // end of RPC caching - // processing options + // Compute processing options based on flags and configuration. var flgs utils.FlagsWithParams if flgs, err = utils.FlagsWithParamsFromSlice(arg.Flags); err != nil { return } - attrS := len(cdrS.cgrCfg.CdrsCfg().AttributeSConns) != 0 - if flgs.HasKey(utils.MetaAttributes) { - attrS = flgs.GetBool(utils.MetaAttributes) - } - store := cdrS.cgrCfg.CdrsCfg().StoreCdrs - if flgs.HasKey(utils.MetaStore) { - store = flgs.GetBool(utils.MetaStore) - } - export := len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0 - if flgs.HasKey(utils.MetaExport) { - export = flgs.GetBool(utils.MetaExport) - } - thdS := len(cdrS.cgrCfg.CdrsCfg().ThresholdSConns) != 0 - if flgs.HasKey(utils.MetaThresholds) { - thdS = flgs.GetBool(utils.MetaThresholds) - } - stS := len(cdrS.cgrCfg.CdrsCfg().StatSConns) != 0 - if flgs.HasKey(utils.MetaStats) { - stS = flgs.GetBool(utils.MetaStats) - } - chrgS := len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) != 0 // activate charging for the Event - if flgs.HasKey(utils.MetaChargers) { - chrgS = flgs.GetBool(utils.MetaChargers) - } - var ralS bool // activate single rating for the CDR - if flgs.HasKey(utils.MetaRALs) { - ralS = flgs.GetBool(utils.MetaRALs) - } - var reRate bool - if flgs.HasKey(utils.MetaRerate) { - reRate = flgs.GetBool(utils.MetaRerate) - if reRate { - ralS = true - } - } - var refund bool - if flgs.HasKey(utils.MetaRefund) { - refund = flgs.GetBool(utils.MetaRefund) - } - // end of processing options + procArgs := newCDRProcessingArgs(cdrS.cgrCfg.CdrsCfg(), flgs) cgrEv := &utils.CGREventWithArgDispatcher{ CGREvent: &arg.CGREvent, ArgDispatcher: arg.ArgDispatcher, } - if err = cdrS.processEvents([]*utils.CGREventWithArgDispatcher{cgrEv}, chrgS, attrS, refund, - ralS, store, reRate, export, thdS, stS); err != nil { + if err = cdrS.processEvents([]*utils.CGREventWithArgDispatcher{cgrEv}, procArgs); err != nil { return } *reply = utils.OK @@ -895,42 +904,15 @@ func (cdrS *CDRServer) V1RateCDRs(arg *ArgRateCDRs, reply *string) (err error) { if err != nil { return } + + // Compute processing options based on flags and configuration. var flgs utils.FlagsWithParams if flgs, err = utils.FlagsWithParamsFromSlice(arg.Flags); err != nil { return } - store := cdrS.cgrCfg.CdrsCfg().StoreCdrs - if flgs.HasKey(utils.MetaStore) { - store = flgs.GetBool(utils.MetaStore) - } - export := len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0 - if flgs.HasKey(utils.MetaExport) { - export = flgs.GetBool(utils.MetaExport) - } - thdS := len(cdrS.cgrCfg.CdrsCfg().ThresholdSConns) != 0 - if flgs.HasKey(utils.MetaThresholds) { - thdS = flgs.GetBool(utils.MetaThresholds) - } - statS := len(cdrS.cgrCfg.CdrsCfg().StatSConns) != 0 - if flgs.HasKey(utils.MetaStatS) { - statS = flgs.GetBool(utils.MetaStatS) - } - chrgS := len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) != 0 - if flgs.HasKey(utils.MetaChargers) { - chrgS = flgs.GetBool(utils.MetaChargers) - } - attrS := len(cdrS.cgrCfg.CdrsCfg().AttributeSConns) != 0 - if flgs.HasKey(utils.MetaAttributes) { - attrS = flgs.GetBool(utils.MetaAttributes) - } - var reRate bool - if flgs.HasKey(utils.MetaRerate) { - reRate = flgs.GetBool(utils.MetaRerate) - } + procArgs := newCDRProcessingArgs(cdrS.cgrCfg.CdrsCfg(), flgs) + procArgs.ralS = true - if chrgS && len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) == 0 { - return utils.NewErrNotConnected(utils.ChargerS) - } cgrEvs := make([]*utils.CGREventWithArgDispatcher, len(cdrs)) for i, cdr := range cdrs { cdr.Cost = -1 // the cost will be recalculated @@ -942,8 +924,7 @@ func (cdrS *CDRServer) V1RateCDRs(arg *ArgRateCDRs, reply *string) (err error) { ArgDispatcher: arg.ArgDispatcher, } } - if err = cdrS.processEvents(cgrEvs, chrgS, attrS, false, - true, store, reRate, export, thdS, statS); err != nil { + if err = cdrS.processEvents(cgrEvs, procArgs); err != nil { return utils.NewErrServerError(err) } *reply = utils.OK diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 6c2391ada..04ae7a305 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -314,8 +314,5 @@ func (gm *GOBMarshaler) Unmarshal(data []byte, v any) error { // Decide the value of cacheCommit parameter based on transactionID func cacheCommit(transactionID string) bool { - if transactionID == utils.NonTransactional { - return true - } - return false + return transactionID == utils.NonTransactional }