diff --git a/engine/cdrs.go b/engine/cdrs.go index 1dd7f2434..4017335f1 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -405,13 +405,66 @@ func (cdrS *CDRServer) exportCDRs(cdrs []*CDR) (err error) { return } -// processEvents processes a CGREvent based on arguments -func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, - chrgS, attrS, refund, ralS, store, reRate, export, thdS, stS bool) (err error) { - if reRate { - refund = true +// cdrProcessingArgs holds the arguments for processing CDR events. +type cdrProcessingArgs struct { + attrS bool + chrgS bool + refund bool + ralS bool + store bool + reRate bool + export bool + thdS bool + stS bool +} + +// newCDRProcessingArgs initializes processing arguments from config and overrides them with provided flags. +func newCDRProcessingArgs(cfg *config.CdrsCfg, flags utils.FlagsWithParams) cdrProcessingArgs { + args := cdrProcessingArgs{ + attrS: len(cfg.AttributeSConns) != 0, + chrgS: len(cfg.ChargerSConns) != 0, + store: cfg.StoreCdrs, + export: len(cfg.OnlineCDRExports) != 0, + thdS: len(cfg.ThresholdSConns) != 0, + stS: len(cfg.StatSConns) != 0, } - if attrS { + if flags.HasKey(utils.MetaAttributes) { + args.attrS = flags.GetBool(utils.MetaAttributes) + } + if flags.HasKey(utils.MetaChargers) { + args.chrgS = flags.GetBool(utils.MetaChargers) + } + if flags.HasKey(utils.MetaStore) { + args.store = flags.GetBool(utils.MetaStore) + } + if flags.HasKey(utils.MetaExport) { + args.export = flags.GetBool(utils.MetaExport) + } + if flags.HasKey(utils.MetaThresholds) { + args.thdS = flags.GetBool(utils.MetaThresholds) + } + if flags.HasKey(utils.MetaStats) { + args.stS = flags.GetBool(utils.MetaStats) + } + if flags.HasKey(utils.MetaRALs) { + args.ralS = flags.GetBool(utils.MetaRALs) + } + if flags.HasKey(utils.MetaRerate) { + args.reRate = flags.GetBool(utils.MetaRerate) + if args.reRate { + args.ralS = true + args.refund = true + } + } + if flags.HasKey(utils.MetaRefund) { + args.refund = flags.GetBool(utils.MetaRefund) + } + return args +} + +// processEvents processes a CGREvent based on arguments +func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, args cdrProcessingArgs) (err error) { + if args.attrS { for _, ev := range evs { if err = cdrS.attrSProcessEvent(ev); err != nil { utils.Logger.Warning( @@ -423,7 +476,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, } } var cgrEvs []*utils.CGREventWithArgDispatcher - if chrgS { + if args.chrgS { for _, ev := range evs { var chrgEvs []*utils.CGREventWithArgDispatcher if chrgEvs, err = cdrS.chrgrSProcessEvent(ev); err != nil { @@ -440,7 +493,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, cgrEvs = evs } // Check if the unique ID was not already processed - if !refund { + if !args.refund { for _, cgrEv := range cgrEvs { me := MapEvent(cgrEv.CGREvent.Event) if !me.HasField(utils.CGRID) { // try to compute the CGRID if missing @@ -453,7 +506,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, me.GetStringIgnoreErrors(utils.CGRID), me.GetStringIgnoreErrors(utils.RunID), ) - if Cache.HasItem(utils.CacheCDRIDs, uID) && !reRate { + if Cache.HasItem(utils.CacheCDRIDs, uID) && !args.reRate { utils.Logger.Warning( fmt.Sprintf("<%s> error: <%s> processing event %+v with %s", utils.CDRs, utils.ErrExists, utils.ToJSON(cgrEv), utils.CacheS)) @@ -465,9 +518,9 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, } // Populate CDR list out of events cdrs := make([]*CDR, len(cgrEvs)) - if refund || ralS || store || reRate || export { + if args.refund || args.ralS || args.store || args.reRate || args.export { for i, cgrEv := range cgrEvs { - if refund { + if args.refund { if _, has := cgrEv.Event[utils.CostDetails]; !has { // if CostDetails is not populated or is nil, look for it inside the previously stored cdr var cgrID string // prepare CGRID to filter for previous CDR @@ -490,6 +543,9 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, cgrEv.Event[utils.CostDetails] = prevCDRs[0].CostDetails } } + } else if args.reRate { + // Force rerate by removing CostDetails to avoid marking as already rated. + delete(cgrEv.Event, utils.CostDetails) } if cdrs[i], err = NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg, cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone); err != nil { @@ -501,7 +557,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, } } } - if refund { + if args.refund { for _, cdr := range cdrs { if rfnd, errRfd := cdrS.refundEventCost(cdr.CostDetails, cdr.RequestType, cdr.ToR); errRfd != nil { @@ -513,7 +569,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, } } } - if ralS { + if args.ralS { for i, cdr := range cdrs { for j, rtCDR := range cdrS.rateCDRWithErr( &CDRWithArgDispatcher{CDR: cdr, @@ -532,7 +588,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, } } } - if store { + if args.store { refundCDRCosts := func() { // will be used to refund all CDRs on errors for _, cdr := range cdrs { // refund what we have charged since duplicates are not allowed if _, errRfd := cdrS.refundEventCost(cdr.CostDetails, @@ -545,7 +601,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, } for _, cdr := range cdrs { if err = cdrS.cdrDb.SetCDR(cdr, false); err != nil { - if err != utils.ErrExists || !reRate { + if err != utils.ErrExists || !args.reRate { refundCDRCosts() return } @@ -560,7 +616,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, } } var partiallyExecuted bool // from here actions are optional and a general error is returned - if export { + if args.export { if err = cdrS.exportCDRs(cdrs); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> error: <%s> exporting CDRs %+v", @@ -568,7 +624,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, partiallyExecuted = true } } - if thdS { + if args.thdS { for _, cgrEv := range cgrEvs { if err = cdrS.thdSProcessEvent(cgrEv); err != nil { utils.Logger.Warning( @@ -578,7 +634,7 @@ func (cdrS *CDRServer) processEvents(evs []*utils.CGREventWithArgDispatcher, } } } - if stS { + if args.stS { for _, cgrEv := range cgrEvs { if err = cdrS.statSProcessEvent(cgrEv); err != nil { utils.Logger.Warning( @@ -660,21 +716,14 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDRWithArgDispatcher, reply *string) (e if cdr.RunID == utils.EmptyString { cdr.RunID = utils.MetaDefault } + procArgs := newCDRProcessingArgs(cdrS.cgrCfg.CdrsCfg(), nil) + procArgs.ralS = !cdr.PreRated + procArgs.chrgS = procArgs.chrgS && !cdr.PreRated cgrEv := &utils.CGREventWithArgDispatcher{ CGREvent: cdr.AsCGREvent(), ArgDispatcher: cdr.ArgDispatcher, } - - if err = cdrS.processEvents([]*utils.CGREventWithArgDispatcher{cgrEv}, - len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) != 0 && !cdr.PreRated, - len(cdrS.cgrCfg.CdrsCfg().AttributeSConns) != 0, - false, - !cdr.PreRated, // rate the CDR if is not PreRated - cdrS.cgrCfg.CdrsCfg().StoreCdrs, - false, // no rerate - len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0, - len(cdrS.cgrCfg.CdrsCfg().ThresholdSConns) != 0, - len(cdrS.cgrCfg.CdrsCfg().StatSConns) != 0); err != nil { + if err = cdrS.processEvents([]*utils.CGREventWithArgDispatcher{cgrEv}, procArgs); err != nil { return } *reply = utils.OK @@ -716,58 +765,18 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er } // end of RPC caching - // processing options + // Compute processing options based on flags and configuration. var flgs utils.FlagsWithParams if flgs, err = utils.FlagsWithParamsFromSlice(arg.Flags); err != nil { return } - attrS := len(cdrS.cgrCfg.CdrsCfg().AttributeSConns) != 0 - if flgs.HasKey(utils.MetaAttributes) { - attrS = flgs.GetBool(utils.MetaAttributes) - } - store := cdrS.cgrCfg.CdrsCfg().StoreCdrs - if flgs.HasKey(utils.MetaStore) { - store = flgs.GetBool(utils.MetaStore) - } - export := len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0 - if flgs.HasKey(utils.MetaExport) { - export = flgs.GetBool(utils.MetaExport) - } - thdS := len(cdrS.cgrCfg.CdrsCfg().ThresholdSConns) != 0 - if flgs.HasKey(utils.MetaThresholds) { - thdS = flgs.GetBool(utils.MetaThresholds) - } - stS := len(cdrS.cgrCfg.CdrsCfg().StatSConns) != 0 - if flgs.HasKey(utils.MetaStats) { - stS = flgs.GetBool(utils.MetaStats) - } - chrgS := len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) != 0 // activate charging for the Event - if flgs.HasKey(utils.MetaChargers) { - chrgS = flgs.GetBool(utils.MetaChargers) - } - var ralS bool // activate single rating for the CDR - if flgs.HasKey(utils.MetaRALs) { - ralS = flgs.GetBool(utils.MetaRALs) - } - var reRate bool - if flgs.HasKey(utils.MetaRerate) { - reRate = flgs.GetBool(utils.MetaRerate) - if reRate { - ralS = true - } - } - var refund bool - if flgs.HasKey(utils.MetaRefund) { - refund = flgs.GetBool(utils.MetaRefund) - } - // end of processing options + procArgs := newCDRProcessingArgs(cdrS.cgrCfg.CdrsCfg(), flgs) cgrEv := &utils.CGREventWithArgDispatcher{ CGREvent: &arg.CGREvent, ArgDispatcher: arg.ArgDispatcher, } - if err = cdrS.processEvents([]*utils.CGREventWithArgDispatcher{cgrEv}, chrgS, attrS, refund, - ralS, store, reRate, export, thdS, stS); err != nil { + if err = cdrS.processEvents([]*utils.CGREventWithArgDispatcher{cgrEv}, procArgs); err != nil { return } *reply = utils.OK @@ -895,42 +904,15 @@ func (cdrS *CDRServer) V1RateCDRs(arg *ArgRateCDRs, reply *string) (err error) { if err != nil { return } + + // Compute processing options based on flags and configuration. var flgs utils.FlagsWithParams if flgs, err = utils.FlagsWithParamsFromSlice(arg.Flags); err != nil { return } - store := cdrS.cgrCfg.CdrsCfg().StoreCdrs - if flgs.HasKey(utils.MetaStore) { - store = flgs.GetBool(utils.MetaStore) - } - export := len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0 - if flgs.HasKey(utils.MetaExport) { - export = flgs.GetBool(utils.MetaExport) - } - thdS := len(cdrS.cgrCfg.CdrsCfg().ThresholdSConns) != 0 - if flgs.HasKey(utils.MetaThresholds) { - thdS = flgs.GetBool(utils.MetaThresholds) - } - statS := len(cdrS.cgrCfg.CdrsCfg().StatSConns) != 0 - if flgs.HasKey(utils.MetaStatS) { - statS = flgs.GetBool(utils.MetaStatS) - } - chrgS := len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) != 0 - if flgs.HasKey(utils.MetaChargers) { - chrgS = flgs.GetBool(utils.MetaChargers) - } - attrS := len(cdrS.cgrCfg.CdrsCfg().AttributeSConns) != 0 - if flgs.HasKey(utils.MetaAttributes) { - attrS = flgs.GetBool(utils.MetaAttributes) - } - var reRate bool - if flgs.HasKey(utils.MetaRerate) { - reRate = flgs.GetBool(utils.MetaRerate) - } + procArgs := newCDRProcessingArgs(cdrS.cgrCfg.CdrsCfg(), flgs) + procArgs.ralS = true - if chrgS && len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) == 0 { - return utils.NewErrNotConnected(utils.ChargerS) - } cgrEvs := make([]*utils.CGREventWithArgDispatcher, len(cdrs)) for i, cdr := range cdrs { cdr.Cost = -1 // the cost will be recalculated @@ -942,8 +924,7 @@ func (cdrS *CDRServer) V1RateCDRs(arg *ArgRateCDRs, reply *string) (err error) { ArgDispatcher: arg.ArgDispatcher, } } - if err = cdrS.processEvents(cgrEvs, chrgS, attrS, false, - true, store, reRate, export, thdS, statS); err != nil { + if err = cdrS.processEvents(cgrEvs, procArgs); err != nil { return utils.NewErrServerError(err) } *reply = utils.OK diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 6c2391ada..04ae7a305 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -314,8 +314,5 @@ func (gm *GOBMarshaler) Unmarshal(data []byte, v any) error { // Decide the value of cacheCommit parameter based on transactionID func cacheCommit(transactionID string) bool { - if transactionID == utils.NonTransactional { - return true - } - return false + return transactionID == utils.NonTransactional }