diff --git a/engine/cdrs.go b/engine/cdrs.go index d07476731..0ae92a4e1 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -233,14 +233,43 @@ func (cdrS *CDRServer) getCostFromRater(cdr *CDR) (*CallCost, error) { return cc, nil } -// chrgRaStoReThStaCDR will process the CGREvent with ChargerS subsystem +// processEvent will process a CGREvent with the configured subsystems +func (cdrS *CDRServer) processEvent(cgrEv *utils.CGREvent, + attrS, store, export, thdS, statS bool) (err error) { + if attrS { + if err = cdrS.attrSProcessEvent(cgrEv); err != nil { + return + } + } + if thdS { + go cdrS.thdSProcessEvent(cgrEv) + } + if statS { + go cdrS.statSProcessEvent(cgrEv) + } + var cdr *CDR + if cdr, err = NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg, + cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone); err != nil { + return + } + if store { + if err = cdrS.cdrDb.SetCDR(cdr, false); err != nil { + return + } + } + if export { + go cdrS.exportCDRs([]*CDR{cdr}) + } + return +} + +// chrgProcessEvent will process the CGREvent with ChargerS subsystem // it is designed to run in it's own goroutine -func (cdrS *CDRServer) chrgRaStoReThStaCDR(cgrEv *utils.CGREvent, - store, export, thdS, statS *bool) (err error) { +func (cdrS *CDRServer) chrgProcessEvent(cgrEv *utils.CGREvent, + attrS, store, export, thdS, statS bool) (err error) { var chrgrs []*ChrgSProcessEventReply if err = cdrS.chargerS.Call(utils.ChargerSv1ProcessEvent, - cgrEv, &chrgrs); err != nil && - err.Error() != utils.ErrNotFound.Error() { + cgrEv, &chrgrs); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> error: %s processing CGR event %+v with %s.", utils.CDRs, err.Error(), cgrEv, utils.ChargerS)) @@ -248,16 +277,14 @@ func (cdrS *CDRServer) chrgRaStoReThStaCDR(cgrEv *utils.CGREvent, } var partExec bool for _, chrgr := range chrgrs { - cdr, errCdr := NewMapEvent(chrgr.CGREvent.Event).AsCDR(cdrS.cgrCfg, - cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone) - if errCdr != nil { + if errProc := cdrS.processEvent(chrgr.CGREvent, + attrS, store, export, thdS, statS); errProc != nil { utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s converting CDR event %+v with %s.", - utils.CDRs, errCdr.Error(), cgrEv, utils.ChargerS)) + fmt.Sprintf("<%s> error: %s converting CDR event %+v with %s", + utils.CDRs, errProc.Error(), cgrEv, utils.ChargerS)) partExec = true continue } - cdrS.raStoReThStaCDR(cdr, store, export, thdS, statS) } if partExec { err = utils.ErrPartiallyExecuted @@ -265,55 +292,6 @@ func (cdrS *CDRServer) chrgRaStoReThStaCDR(cgrEv *utils.CGREvent, return } -// raStoReThStaCDR will RAte/STOtore/REplicate/THresholds/STAts the CDR received -// used by both chargerS as well as re-/rating -func (cdrS *CDRServer) raStoReThStaCDR(cdr *CDR, - store, export, thdS, statS *bool) { - ratedCDRs, err := cdrS.rateCDR(cdr) - if err != nil { - cdr.Cost = -1.0 // If there was an error, mark the CDR - cdr.ExtraInfo = err.Error() - ratedCDRs = []*CDR{cdr} - } - for _, rtCDR := range ratedCDRs { - shouldStore := cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs - if store != nil { - shouldStore = *store - } - if shouldStore { // Store CDR - go func(rtCDR *CDR) { - if err := cdrS.cdrDb.SetCDR(rtCDR, true); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s storing CDR %+v.", - utils.CDRs, err.Error(), rtCDR)) - } - }(rtCDR) - } - shouldExport := len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0 - if export != nil { - shouldExport = *export - } - if shouldExport { - go cdrS.exportCDRs([]*CDR{rtCDR}) - } - cgrEv := rtCDR.AsCGREvent() - shouldThdS := cdrS.thdS != nil - if thdS != nil { - shouldThdS = *thdS - } - if shouldThdS { - go cdrS.thdSProcessEvent(cgrEv) - } - shouldStatS := cdrS.statS != nil - if statS != nil { - shouldStatS = *statS - } - if shouldStatS { - go cdrS.statSProcessEvent(cgrEv) - } - } -} - // statSProcessEvent will send the event to StatS if the connection is configured func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREvent) (err error) { var rplyEv AttrSProcessEventReply @@ -478,7 +456,8 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDR, reply *string) (err error) { } if cdrS.chargerS != nil && utils.IsSliceMember([]string{"", utils.MetaRaw}, cdr.RunID) { - go cdrS.chrgRaStoReThStaCDR(cgrEv, nil, nil, nil, nil) + go cdrS.chrgProcessEvent(cgrEv, cdrS.attrS != nil, cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs, + len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0, cdrS.thdS != nil, cdrS.statS != nil) } *reply = utils.OK @@ -519,61 +498,40 @@ func (cdrS *CDRServer) V2ProcessCDR(arg *ArgV2ProcessCDR, reply *string) (err er nil, true, utils.NonTransactional) } // end of RPC caching + attrS := cdrS.attrS != nil if arg.AttributeS != nil { attrS = *arg.AttributeS } - cgrEv := &arg.CGREvent - if attrS { - if err = cdrS.attrSProcessEvent(cgrEv); err != nil { - err = utils.NewErrServerError(err) - return - } - } - var rawCDR *CDR - if rawCDR, err = NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg, - cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone); err != nil { - err = utils.NewErrServerError(err) - return - } store := cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs if arg.Store != nil { store = *arg.Store } - if store { // Store *raw CDR - if err = cdrS.cdrDb.SetCDR(rawCDR, false); err != nil { - err = utils.NewErrServerError(err) // Cannot store CDR - return - } - } export := len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0 if arg.Export != nil { export = *arg.Export } - if export { - cdrS.exportCDRs([]*CDR{rawCDR}) // Replicate raw CDR - } - thrdS := cdrS.thdS != nil + thdS := cdrS.thdS != nil if arg.ThresholdS != nil { - thrdS = *arg.ThresholdS - } - if thrdS { - go cdrS.thdSProcessEvent(cgrEv) + thdS = *arg.ThresholdS } statS := cdrS.statS != nil if arg.StatS != nil { statS = *arg.StatS } - if statS { - go cdrS.statSProcessEvent(cgrEv) - } chrgS := cdrS.chargerS != nil if arg.ChargerS != nil { chrgS = *arg.ChargerS } + cgrEv := &arg.CGREvent + if err = cdrS.processEvent(cgrEv, + attrS, store, export, thdS, statS); err != nil { + err = utils.NewErrServerError(err) + return + } if chrgS { - go cdrS.chrgRaStoReThStaCDR(cgrEv, - arg.Store, arg.Export, arg.ThresholdS, arg.StatS) + go cdrS.chrgProcessEvent(cgrEv, + attrS, store, export, thdS, statS) } *reply = utils.OK return nil @@ -692,18 +650,31 @@ func (cdrS *CDRServer) V1RateCDRs(arg *ArgRateCDRs, reply *string) (err error) { if err != nil { return err } + store := cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs + if arg.Store != nil { + store = *arg.Store + } + export := len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0 + if arg.Export != nil { + export = *arg.Export + } + thdS := cdrS.thdS != nil + if arg.ThresholdS != nil { + thdS = *arg.ThresholdS + } + statS := cdrS.statS != nil + if arg.StatS != nil { + statS = *arg.StatS + } for _, cdr := range cdrs { if arg.ChargerS != nil && *arg.ChargerS { if cdrS.chargerS == nil { return utils.NewErrNotConnected(utils.ChargerS) } - if err = cdrS.chrgRaStoReThStaCDR(cdr.AsCGREvent(), - arg.Store, arg.Export, arg.ThresholdS, arg.StatS); err != nil { + if err = cdrS.chrgProcessEvent(cdr.AsCGREvent(), + false, store, export, thdS, statS); err != nil { return utils.NewErrServerError(err) } - } else { - cdrS.raStoReThStaCDR(cdr, arg.Store, - arg.Export, arg.ThresholdS, arg.StatS) } } *reply = utils.OK diff --git a/engine/mapevent.go b/engine/mapevent.go index 723120f79..881ea7d45 100644 --- a/engine/mapevent.go +++ b/engine/mapevent.go @@ -263,7 +263,13 @@ func (me MapEvent) AsCDR(cfg *config.CGRConfig, tnt, tmz string) (cdr *CDR, err if cdr.Cost, err = utils.IfaceAsFloat64(v); err != nil { return nil, err } - case utils.CostDetails, utils.ExtraInfo, utils.OrderID: + case utils.CostDetails: + var canCast bool + if cdr.CostDetails, canCast = v.(*EventCost); !canCast { + return nil, fmt.Errorf("cannot cast field: %+v to *EventCost", v) + } + case utils.ExtraInfo, utils.OrderID: + } } if cfg != nil { diff --git a/engine/mapevent_test.go b/engine/mapevent_test.go index bc879c441..15bdc7bd4 100644 --- a/engine/mapevent_test.go +++ b/engine/mapevent_test.go @@ -400,6 +400,182 @@ func TestMapEventAsCDR(t *testing.T) { } else if !reflect.DeepEqual(expected, rply) { t.Errorf("Expecting %+v, received: %+v", expected, rply) } + ec1 := &EventCost{ + CGRID: "164b0422fdc6a5117031b427439482c6a4f90e41", + RunID: utils.META_DEFAULT, + StartTime: time.Date(2017, 1, 9, 16, 18, 21, 0, time.UTC), + Charges: []*ChargingInterval{ + &ChargingInterval{ + RatingID: "c1a5ab9", + Increments: []*ChargingIncrement{ + &ChargingIncrement{ + Usage: time.Duration(0), + Cost: 0.1, + AccountingID: "9bdad10", + CompressFactor: 1, + }, + &ChargingIncrement{ + Usage: time.Duration(1 * time.Second), + Cost: 0, + AccountingID: "3455b83", + CompressFactor: 10, + }, + &ChargingIncrement{ + Usage: time.Duration(10 * time.Second), + Cost: 0.01, + AccountingID: "a012888", + CompressFactor: 2, + }, + &ChargingIncrement{ + Usage: time.Duration(1 * time.Second), + Cost: 0.005, + AccountingID: "44d6c02", + CompressFactor: 30, + }, + }, + CompressFactor: 1, + }, + &ChargingInterval{ + RatingID: "c1a5ab9", + Increments: []*ChargingIncrement{ + &ChargingIncrement{ + Usage: time.Duration(1 * time.Second), + Cost: 0.01, + AccountingID: "a012888", + CompressFactor: 60, + }, + }, + CompressFactor: 4, + }, + &ChargingInterval{ + RatingID: "c1a5ab9", + Increments: []*ChargingIncrement{ + &ChargingIncrement{ + Usage: time.Duration(1 * time.Second), + Cost: 0, + AccountingID: "3455b83", + CompressFactor: 10, + }, + &ChargingIncrement{ + Usage: time.Duration(10 * time.Second), + Cost: 0.01, + AccountingID: "a012888", + CompressFactor: 2, + }, + &ChargingIncrement{ + Usage: time.Duration(1 * time.Second), + Cost: 0.005, + AccountingID: "44d6c02", + CompressFactor: 30, + }, + }, + CompressFactor: 5, + }, + }, + AccountSummary: &AccountSummary{ + Tenant: "cgrates.org", + ID: "dan", + BalanceSummaries: []*BalanceSummary{ + &BalanceSummary{ + Type: "*monetary", + Value: 50, + Disabled: false}, + &BalanceSummary{ + ID: "4b8b53d7-c1a1-4159-b845-4623a00a0165", + Type: "*monetary", + Value: 25, + Disabled: false}, + &BalanceSummary{ + Type: "*voice", + Value: 200, + Disabled: false, + }, + }, + AllowNegative: false, + Disabled: false, + }, + Rating: Rating{ + "3cd6425": &RatingUnit{ + RoundingMethod: "*up", + RoundingDecimals: 5, + TimingID: "7f324ab", + RatesID: "4910ecf", + RatingFiltersID: "43e77dc", + }, + "c1a5ab9": &RatingUnit{ + ConnectFee: 0.1, + RoundingMethod: "*up", + RoundingDecimals: 5, + TimingID: "7f324ab", + RatesID: "ec1a177", + RatingFiltersID: "43e77dc", + }, + }, + Accounting: Accounting{ + "a012888": &BalanceCharge{ + AccountID: "cgrates.org:dan", + BalanceUUID: "8c54a9e9-d610-4c82-bcb5-a315b9a65010", + Units: 0.01, + }, + "188bfa6": &BalanceCharge{ + AccountID: "cgrates.org:dan", + BalanceUUID: "8c54a9e9-d610-4c82-bcb5-a315b9a65010", + Units: 0.005, + }, + "9bdad10": &BalanceCharge{ + AccountID: "cgrates.org:dan", + BalanceUUID: "8c54a9e9-d610-4c82-bcb5-a315b9a65010", + Units: 0.1, + }, + "44d6c02": &BalanceCharge{ + AccountID: "cgrates.org:dan", + BalanceUUID: "7a54a9e9-d610-4c82-bcb5-a315b9a65010", + RatingID: "3cd6425", + Units: 1, + ExtraChargeID: "188bfa6", + }, + "3455b83": &BalanceCharge{ + AccountID: "cgrates.org:dan", + BalanceUUID: "9d54a9e9-d610-4c82-bcb5-a315b9a65089", + Units: 1, + ExtraChargeID: "*none", + }, + }, + RatingFilters: RatingFilters{ + "43e77dc": RatingMatchedFilters{ + "DestinationID": "GERMANY", + "DestinationPrefix": "+49", + "RatingPlanID": "RPL_RETAIL1", + "Subject": "*out:cgrates.org:call:*any", + }, + }, + Rates: ChargedRates{ + "ec1a177": RateGroups{ + &Rate{ + GroupIntervalStart: time.Duration(0), + Value: 0.01, + RateIncrement: time.Duration(1 * time.Minute), + RateUnit: time.Duration(1 * time.Second)}, + }, + "4910ecf": RateGroups{ + &Rate{ + GroupIntervalStart: time.Duration(0), + Value: 0.005, + RateIncrement: time.Duration(1 * time.Second), + RateUnit: time.Duration(1 * time.Second)}, + &Rate{ + GroupIntervalStart: time.Duration(60 * time.Second), + Value: 0.005, + RateIncrement: time.Duration(1 * time.Second), + RateUnit: time.Duration(1 * time.Second)}, + }, + }, + Timings: ChargedTimings{ + "7f324ab": &ChargedTiming{ + StartTime: "00:00:00", + }, + }, + } me = MapEvent{ "ExtraField1": 5, "Source": 1001, @@ -409,6 +585,7 @@ func TestMapEventAsCDR(t *testing.T) { "Usage": "42s", "PreRated": "True", "Cost": "42.3", + "CostDetails": ec1, } expected = &CDR{ CGRID: "da39a3ee5e6b4b0d3255bfef95601890afd80709", @@ -427,6 +604,7 @@ func TestMapEventAsCDR(t *testing.T) { ToR: utils.VOICE, RequestType: cfg.GeneralCfg().DefaultReqType, Category: cfg.GeneralCfg().DefaultCategory, + CostDetails: ec1, } if rply, err := me.AsCDR(cfg, "itsyscom.com", utils.EmptyString); err != nil { t.Error(err) diff --git a/utils/cgrevent.go b/utils/cgrevent.go index 041e26ee3..398b80d1a 100644 --- a/utils/cgrevent.go +++ b/utils/cgrevent.go @@ -171,3 +171,12 @@ func (ev *CGREvent) RemFldsWithPrefix(prfx string) { } } } + +// CGREvents is a group of generic events processed by CGR services +// ie: derived CDRs +type CGREvents struct { + Tenant string + ID string + Time *time.Time // event time + Events []map[string]interface{} +}