diff --git a/engine/cdrs.go b/engine/cdrs.go index aa73a0b6f..89471b1f8 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -275,6 +275,7 @@ func (cdrS *CDRServer) attrStoExpThdStat(cgrEv *utils.CGREventWithArgDispatcher, return } +// rateCDRWithErr rates a CDR including errors func (cdrS *CDRServer) rateCDRWithErr(cdr *CDRWithArgDispatcher) (ratedCDRs []*CDR) { var err error ratedCDRs, err = cdrS.rateCDR(cdr) @@ -286,20 +287,21 @@ func (cdrS *CDRServer) rateCDRWithErr(cdr *CDRWithArgDispatcher) (ratedCDRs []*C return } -// refundCDR will refund the EventCost within the CDR -func (cdrS *CDRServer) refundCDR(cdr *CDR) (err error) { - if !AccountableRequestTypes.HasField(cdr.RequestType) || cdr.CostDetails == nil { +// refundEventCost will refund the EventCost using RefundIncrements +func (cdrS *CDRServer) refundEventCost(ec *EventCost, reqType, tor string) (err error) { + if ec == nil || !utils.AccountableRequestTypes.Has(reqType) { return // non refundable } - cd := cdr.CostDetails.AsRefundIncrements(cdr.ToR) + cd := ec.AsRefundIncrements(tor) if cd == nil || len(cd.Increments) == 0 { return } - var acnt engine.Account + var acnt Account if err = cdrS.rals.Call(utils.ResponderRefundIncrements, - &engine.CallDescriptorWithArgDispatcher{CallDescriptor: cd}, &acnt); err != nil { + &CallDescriptorWithArgDispatcher{CallDescriptor: cd}, &acnt); err != nil { return } + return } // chrgProcessEvent will process the CGREvent with ChargerS subsystem @@ -325,7 +327,8 @@ func (cdrS *CDRServer) chrgProcessEvent(cgrEv *utils.CGREventWithArgDispatcher, partExec = true continue } - for _, rtCDR := range cdrS.rateCDRWithErr(&CDRWithArgDispatcher{CDR: cdr, ArgDispatcher: cgrEv.ArgDispatcher}) { + for _, rtCDR := range cdrS.rateCDRWithErr( + &CDRWithArgDispatcher{CDR: cdr, ArgDispatcher: cgrEv.ArgDispatcher}) { arg := &utils.CGREventWithArgDispatcher{ CGREvent: rtCDR.AsCGREvent(), ArgDispatcher: cgrEv.ArgDispatcher, @@ -346,6 +349,29 @@ func (cdrS *CDRServer) chrgProcessEvent(cgrEv *utils.CGREventWithArgDispatcher, return } +// chrgrSProcessEvent forks CGREventWithArgDispatcher into multiples based on matching ChargerS profiles +func (cdrS *CDRServer) chrgrSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) (cgrEvs []*utils.CGREventWithArgDispatcher, err error) { + var chrgrs []*ChrgSProcessEventReply + if err = cdrS.chargerS.Call(utils.ChargerSv1ProcessEvent, + cgrEv, &chrgrs); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s processing CGR event %+v with %s.", + utils.CDRs, err.Error(), cgrEv, utils.ChargerS)) + return + } + if len(chrgrs) == 0 { + return + } + cgrEvs = make([]*utils.CGREventWithArgDispatcher, len(chrgrs)) + for i, cgrPrfl := range chrgrs { + cgrEvs[i] = &utils.CGREventWithArgDispatcher{ + cgrPrfl.CGREvent, + cgrEv.ArgDispatcher, + } + } + return +} + // statSProcessEvent will send the event to StatS if the connection is configured func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) (err error) { var rplyEv AttrSProcessEventReply @@ -369,40 +395,36 @@ func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) return } -// thdSProcessEvent will send the event to ThresholdS if the connection is configured -func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) { +// thdSProcessEvent will send the event to ThresholdS +func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) (err error) { var tIDs []string - // we clone the CGREvent so we can add EventType without it to be propagated + // we clone the CGREvent so we can add EventType without being propagated thArgs := &ArgsProcessEvent{CGREvent: cgrEv.CGREvent.Clone()} thArgs.CGREvent.Event[utils.EventType] = utils.CDR if cgrEv.ArgDispatcher != nil { thArgs.ArgDispatcher = cgrEv.ArgDispatcher } - if err := cdrS.thdS.Call(utils.ThresholdSv1ProcessEvent, + if err = cdrS.thdS.Call(utils.ThresholdSv1ProcessEvent, thArgs, &tIDs); err != nil && - err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s processing CDR event %+v with thdS.", - utils.CDRs, err.Error(), cgrEv)) - return + err.Error() == utils.ErrNotFound.Error() { + err = nil // NotFound is not considered error } + return } -// statSProcessEvent will send the event to StatS if the connection is configured -func (cdrS *CDRServer) statSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) { +// statSProcessEvent will send the event to StatS +func (cdrS *CDRServer) statSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) (err error) { var reply []string statArgs := &StatsArgsProcessEvent{CGREvent: cgrEv.CGREvent} if cgrEv.ArgDispatcher != nil { statArgs.ArgDispatcher = cgrEv.ArgDispatcher } - if err := cdrS.statS.Call(utils.StatSv1ProcessEvent, + if err = cdrS.statS.Call(utils.StatSv1ProcessEvent, statArgs, &reply); err != nil && - err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s processing CDR event %+v with %s.", - utils.CDRs, err.Error(), cgrEv, utils.StatS)) - return + err.Error() == utils.ErrNotFound.Error() { + err = nil // NotFound is not considered error } + return } // exportCDRs will export the CDRs received @@ -426,6 +448,122 @@ func (cdrS *CDRServer) exportCDRs(cdrs []*CDR) (err error) { return } +// processEvent processes a CGREvent based on arguments +func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithArgDispatcher, + chrgS, attrS, ralS, store, reRate, export, thdS, stS bool) (err error) { + var cgrEvs []*utils.CGREventWithArgDispatcher + if chrgS { + if cgrEvs, err = cdrS.chrgrSProcessEvent(ev); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: <%s> processing event %+v with %s", + utils.CDRs, err.Error(), ev, utils.ChargerS)) + err = utils.ErrPartiallyExecuted + return + } + } else { // ChargerS not requested, charge the original event + cgrEvs = []*utils.CGREventWithArgDispatcher{ev} + } + if attrS { + for _, cgrEv := range cgrEvs { + if err = cdrS.attrSProcessEvent(cgrEv); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: <%s> processing event %+v with %s", + utils.CDRs, err.Error(), cgrEv, utils.AttributeS)) + err = utils.ErrPartiallyExecuted + return + } + } + } + cdrs := make([]*CDR, len(cgrEvs)) + if ralS || store || reRate || export { + for i, cgrEv := range cgrEvs { + if cdrs[i], err = NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg, + cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: <%s> converting event %+v to CDR", + utils.CDRs, err.Error(), cgrEv)) + err = utils.ErrPartiallyExecuted + return + } + } + } + if ralS { + for i, cdr := range cdrs { + for j, rtCDR := range cdrS.rateCDRWithErr( + &CDRWithArgDispatcher{CDR: cdr, + ArgDispatcher: ev.ArgDispatcher}) { + cgrEv := &utils.CGREventWithArgDispatcher{ + CGREvent: rtCDR.AsCGREvent(), + ArgDispatcher: ev.ArgDispatcher, + } + if j == 0 { // the first CDR will replace the events we got already as a small optimization + cdrs[i] = rtCDR + cgrEvs[i] = cgrEv + } else { + cdrs = append(cdrs, cdr) + cgrEvs = append(cgrEvs, cgrEv) + } + } + } + } + if store { + for _, cdr := range cdrs { + if err = cdrS.cdrDb.SetCDR(cdr, false); err != nil { + if err.Error() == "duplicate" && reRate { // fix error name here + if err = cdrS.refundEventCost(cdr.CostDetails, + cdr.RequestType, cdr.ToR); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: <%s> refunding CDR %+v", + utils.CDRs, err.Error(), cdr)) + err = utils.ErrPartiallyExecuted + return + } + // after refund we can force update + if err = cdrS.cdrDb.SetCDR(cdr, true); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: <%s> updating CDR %+v", + utils.CDRs, err.Error(), cdr)) + err = utils.ErrPartiallyExecuted + return + } + } + return + } + } + } + var partiallyExecuted bool // from here actions are optional and a general error is returned + if export { + if err = cdrS.exportCDRs(cdrs); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: <%s> exporting CDRs %+v", + utils.CDRs, err.Error(), cdrs)) + + } + } + if thdS { + for _, cgrEv := range cgrEvs { + if err = cdrS.thdSProcessEvent(cgrEv); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: <%s> processing event %+v with %s", + utils.CDRs, err.Error(), cgrEv, utils.ThresholdS)) + } + } + } + if stS { + for _, cgrEv := range cgrEvs { + if err = cdrS.statSProcessEvent(cgrEv); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: <%s> processing event %+v with %s", + utils.CDRs, err.Error(), cgrEv, utils.StatS)) + } + } + } + if partiallyExecuted { + err = utils.ErrPartiallyExecuted + } + return +} + // Call implements the rpcclient.RpcClientConnection interface func (cdrS *CDRServer) Call(serviceMethod string, args interface{}, reply interface{}) error { parts := strings.Split(serviceMethod, ".") @@ -539,14 +677,8 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDRWithArgDispatcher, reply *string) (e } type ArgV1ProcessEvent struct { + Flags []string utils.CGREvent - AttributeS *bool // control AttributeS processing - RALs *bool // control if we rate the event - ChargerS *bool // control ChargerS processing - Store *bool // control storing of the CDR - Export *bool // control online exports for the CDR - ThresholdS *bool // control ThresholdS - StatS *bool // control sending the CDR to StatS for aggregation *utils.ArgDispatcher } @@ -575,34 +707,41 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er } // end of RPC caching + // processing options + var flgs utils.FlagsWithParams + if flgs, err = utils.FlagsWithParamsFromSlice(arg.Flags); err != nil { + return + } attrS := cdrS.attrS != nil - if arg.AttributeS != nil { - attrS = *arg.AttributeS + if flgs.HasKey(utils.MetaAttributes) { + attrS = flgs.GetBool(utils.MetaAttributes) } store := cdrS.cgrCfg.CdrsCfg().StoreCdrs - if arg.Store != nil { - store = *arg.Store + if flgs.HasKey(utils.MetaStore) { + store = flgs.GetBool(utils.MetaStore) } export := len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0 - if arg.Export != nil { - export = *arg.Export + if flgs.HasKey(utils.MetaExport) { + export = flgs.GetBool(utils.MetaExport) } thdS := cdrS.thdS != nil - if arg.ThresholdS != nil { - thdS = *arg.ThresholdS + if flgs.HasKey(utils.MetaThresholds) { + thdS = flgs.GetBool(utils.MetaThresholds) } statS := cdrS.statS != nil - if arg.StatS != nil { - statS = *arg.StatS + if flgs.HasKey(utils.MetaStats) { + statS = flgs.GetBool(utils.MetaStats) } - chrgS := cdrS.chargerS != nil - if arg.ChargerS != nil { - chrgS = *arg.ChargerS + chrgS := cdrS.chargerS != nil // activate charging for the Event + if flgs.HasKey(utils.MetaChargers) { + chrgS = flgs.GetBool(utils.MetaChargers) } - var ralS bool // by default we don't extra charge the received CDR - if arg.RALs != nil { - ralS = *arg.RALs + var ralS bool // activate single rating for the CDR + if flgs.HasKey(utils.MetaRALs) { + ralS = flgs.GetBool(utils.MetaRALs) } + // end of processing options + cgrEv := &utils.CGREventWithArgDispatcher{ CGREvent: &arg.CGREvent, } diff --git a/engine/eventcost.go b/engine/eventcost.go index 10d174b09..8c4e85d65 100644 --- a/engine/eventcost.go +++ b/engine/eventcost.go @@ -288,7 +288,7 @@ func (ec *EventCost) AsRefundIncrements(tor string) (cd *CallDescriptor) { blncSmry := ec.AccountSummary.BalanceSummaries.BalanceSummaryWithUUD(ec.Accounting[cIcrm.AccountingID].BalanceUUID) if blncSmry.Type == utils.MONETARY { cd.Increments[iIdx].BalanceInfo.Monetary = &MonetaryInfo{UUID: blncSmry.UUID} - } else if NonMonetaryBalances.HasField(blncSmry.Type) { + } else if utils.NonMonetaryBalances.Has(blncSmry.Type) { cd.Increments[iIdx].BalanceInfo.Unit = &UnitInfo{UUID: blncSmry.UUID} } if ec.Accounting[cIcrm.AccountingID].ExtraChargeID == utils.META_NONE || @@ -301,7 +301,7 @@ func (ec *EventCost) AsRefundIncrements(tor string) (cd *CallDescriptor) { ec.Accounting[ec.Accounting[cIcrm.AccountingID].ExtraChargeID].BalanceUUID) if extraSmry.Type == utils.MONETARY { cd.Increments[iIdx].BalanceInfo.Monetary = &MonetaryInfo{UUID: extraSmry.UUID} - } else if NonMonetaryBalances.HasField(blncSmry.Type) { + } else if utils.NonMonetaryBalances.Has(blncSmry.Type) { cd.Increments[iIdx].BalanceInfo.Unit = &UnitInfo{UUID: extraSmry.UUID} } } diff --git a/engine/libeventcost.go b/engine/libeventcost.go index 22bd28328..5d9c48109 100644 --- a/engine/libeventcost.go +++ b/engine/libeventcost.go @@ -27,20 +27,6 @@ import ( "github.com/cgrates/cgrates/utils" ) -// NonMonetaryBalances are types of balances which are not handled as monetary -var NonMonetaryBalances = MapEvent{ - utils.VOICE: struct{}{}, - utils.SMS: struct{}{}, - utils.DATA: struct{}{}, - utils.GENERIC: struct{}{}, -} - -var AccountableRequestTypes = MapEvent{ - utils.MetaPrepaid: struct{}{}, - utils.MetaPostpaid: struct{}{}, - utils.MetaPseudoprepaid: struct{}{}, -} - // ChargingInterval represents one interval out of Usage providing charging info // eg: PEAK vs OFFPEAK type ChargingInterval struct { diff --git a/sessions/sessions.go b/sessions/sessions.go index 3efbd4902..635e44aee 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -397,14 +397,14 @@ func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUs var reply string for _, cgrEv := range cgrEvs { argsProc := &engine.ArgV1ProcessEvent{ + Flags: []string{fmt.Sprintf("%s:false", utils.MetaChargers), + fmt.Sprintf("%s:false", utils.MetaAttributes)}, CGREvent: *cgrEv, - ChargerS: utils.BoolPointer(false), - AttributeS: utils.BoolPointer(false), ArgDispatcher: s.ArgDispatcher, } if unratedReqs.HasField( // order additional rating for unrated request types engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RequestType)) { - argsProc.RALs = utils.BoolPointer(true) + argsProc.Flags = append(argsProc.Flags, fmt.Sprintf("%s:true", utils.MetaRALs)) } if err = sS.cdrS.Call(utils.CDRsV1ProcessEvent, argsProc, &reply); err != nil { utils.Logger.Warning( @@ -2637,14 +2637,14 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection, var withErrors bool for _, cgrEv := range cgrEvs { argsProc := &engine.ArgV1ProcessEvent{ + Flags: []string{fmt.Sprintf("%s:false", utils.MetaChargers), + fmt.Sprintf("%s:false", utils.MetaAttributes)}, CGREvent: *cgrEv, - ChargerS: utils.BoolPointer(false), - AttributeS: utils.BoolPointer(false), ArgDispatcher: cgrEvWithArgDisp.ArgDispatcher, } if mp := engine.MapEvent(cgrEv.Event); mp.GetStringIgnoreErrors(utils.RunID) != utils.MetaRaw && // check if is *raw unratedReqs.HasField(mp.GetStringIgnoreErrors(utils.RequestType)) { // order additional rating for unrated request types - argsProc.RALs = utils.BoolPointer(true) + argsProc.Flags = append(argsProc.Flags, fmt.Sprintf("%s:true", utils.MetaRALs)) } if err = sS.cdrS.Call(utils.CDRsV1ProcessEvent, argsProc, rply); err != nil { @@ -2990,8 +2990,8 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, dbtItvl := sS.cgrCfg.SessionSCfg().DebitInterval //convert from Flags []string to utils.FlagsWithParams - argsFlagsWithParams, err := utils.FlagsWithParamsFromSlice(args.Flags) - if err != nil { + var argsFlagsWithParams utils.FlagsWithParams + if argsFlagsWithParams, err = utils.FlagsWithParamsFromSlice(args.Flags); err != nil { return } // check for *attribute diff --git a/utils/consts.go b/utils/consts.go index 5b8e10f7a..220926923 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -102,6 +102,12 @@ var ( DispatcherProfilePrefix: CacheDispatcherFilterIndexes, } CacheIndexesToPrefix map[string]string // will be built on init + + // NonMonetaryBalances are types of balances which are not handled as monetary + NonMonetaryBalances = NewStringSet([]string{VOICE, SMS, DATA, GENERIC}) + + // AccountableRequestTypes are the ones handled by Accounting subsystem + AccountableRequestTypes = NewStringSet([]string{META_PREPAID, META_POSTPAID, META_PSEUDOPREPAID}) ) const ( @@ -508,6 +514,7 @@ const ( MetaRemove = "*remove" MetaStore = "*store" MetaClear = "*clear" + MetaExport = "*export" LoadIDs = "load_ids" DNSAgent = "DNSAgent" TLSNoCaps = "tls" diff --git a/utils/map.go b/utils/map.go index 68d471972..59ca21736 100644 --- a/utils/map.go +++ b/utils/map.go @@ -288,3 +288,15 @@ func (fWp FlagsWithParams) SliceFlags() (sls []string) { } return } + +// GetBool returns the flag as boolean +func (fWp FlagsWithParams) GetBool(key string) (b bool) { + var v interface{} + if _, b = fWp[key]; !b { + return // not present means false + } + if v.(string) != "" && v.(string) != "true" { + return // not empty nor true means false again + } + return true +} diff --git a/utils/set.go b/utils/set.go index 15d8245a4..2995691ba 100644 --- a/utils/set.go +++ b/utils/set.go @@ -24,6 +24,7 @@ func NewStringSet(dataSlice []string) (s *StringSet) { return s } +// StringSet will manage data within a set type StringSet struct { data map[string]struct{} }