Redesign of CDRs.V2ProcessCDR method

This commit is contained in:
DanB
2019-03-20 14:17:37 +01:00
parent eafaf12697
commit d01e9cf4ed
4 changed files with 263 additions and 99 deletions

View File

@@ -233,14 +233,43 @@ func (cdrS *CDRServer) getCostFromRater(cdr *CDR) (*CallCost, error) {
return cc, nil
}
// chrgRaStoReThStaCDR will process the CGREvent with ChargerS subsystem
// processEvent will process a CGREvent with the configured subsystems
func (cdrS *CDRServer) processEvent(cgrEv *utils.CGREvent,
attrS, store, export, thdS, statS bool) (err error) {
if attrS {
if err = cdrS.attrSProcessEvent(cgrEv); err != nil {
return
}
}
if thdS {
go cdrS.thdSProcessEvent(cgrEv)
}
if statS {
go cdrS.statSProcessEvent(cgrEv)
}
var cdr *CDR
if cdr, err = NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg,
cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone); err != nil {
return
}
if store {
if err = cdrS.cdrDb.SetCDR(cdr, false); err != nil {
return
}
}
if export {
go cdrS.exportCDRs([]*CDR{cdr})
}
return
}
// chrgProcessEvent will process the CGREvent with ChargerS subsystem
// it is designed to run in it's own goroutine
func (cdrS *CDRServer) chrgRaStoReThStaCDR(cgrEv *utils.CGREvent,
store, export, thdS, statS *bool) (err error) {
func (cdrS *CDRServer) chrgProcessEvent(cgrEv *utils.CGREvent,
attrS, store, export, thdS, statS bool) (err error) {
var chrgrs []*ChrgSProcessEventReply
if err = cdrS.chargerS.Call(utils.ChargerSv1ProcessEvent,
cgrEv, &chrgrs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
cgrEv, &chrgrs); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing CGR event %+v with %s.",
utils.CDRs, err.Error(), cgrEv, utils.ChargerS))
@@ -248,16 +277,14 @@ func (cdrS *CDRServer) chrgRaStoReThStaCDR(cgrEv *utils.CGREvent,
}
var partExec bool
for _, chrgr := range chrgrs {
cdr, errCdr := NewMapEvent(chrgr.CGREvent.Event).AsCDR(cdrS.cgrCfg,
cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
if errCdr != nil {
if errProc := cdrS.processEvent(chrgr.CGREvent,
attrS, store, export, thdS, statS); errProc != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s converting CDR event %+v with %s.",
utils.CDRs, errCdr.Error(), cgrEv, utils.ChargerS))
fmt.Sprintf("<%s> error: %s converting CDR event %+v with %s",
utils.CDRs, errProc.Error(), cgrEv, utils.ChargerS))
partExec = true
continue
}
cdrS.raStoReThStaCDR(cdr, store, export, thdS, statS)
}
if partExec {
err = utils.ErrPartiallyExecuted
@@ -265,55 +292,6 @@ func (cdrS *CDRServer) chrgRaStoReThStaCDR(cgrEv *utils.CGREvent,
return
}
// raStoReThStaCDR will RAte/STOtore/REplicate/THresholds/STAts the CDR received
// used by both chargerS as well as re-/rating
func (cdrS *CDRServer) raStoReThStaCDR(cdr *CDR,
store, export, thdS, statS *bool) {
ratedCDRs, err := cdrS.rateCDR(cdr)
if err != nil {
cdr.Cost = -1.0 // If there was an error, mark the CDR
cdr.ExtraInfo = err.Error()
ratedCDRs = []*CDR{cdr}
}
for _, rtCDR := range ratedCDRs {
shouldStore := cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs
if store != nil {
shouldStore = *store
}
if shouldStore { // Store CDR
go func(rtCDR *CDR) {
if err := cdrS.cdrDb.SetCDR(rtCDR, true); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s storing CDR %+v.",
utils.CDRs, err.Error(), rtCDR))
}
}(rtCDR)
}
shouldExport := len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0
if export != nil {
shouldExport = *export
}
if shouldExport {
go cdrS.exportCDRs([]*CDR{rtCDR})
}
cgrEv := rtCDR.AsCGREvent()
shouldThdS := cdrS.thdS != nil
if thdS != nil {
shouldThdS = *thdS
}
if shouldThdS {
go cdrS.thdSProcessEvent(cgrEv)
}
shouldStatS := cdrS.statS != nil
if statS != nil {
shouldStatS = *statS
}
if shouldStatS {
go cdrS.statSProcessEvent(cgrEv)
}
}
}
// statSProcessEvent will send the event to StatS if the connection is configured
func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREvent) (err error) {
var rplyEv AttrSProcessEventReply
@@ -478,7 +456,8 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDR, reply *string) (err error) {
}
if cdrS.chargerS != nil &&
utils.IsSliceMember([]string{"", utils.MetaRaw}, cdr.RunID) {
go cdrS.chrgRaStoReThStaCDR(cgrEv, nil, nil, nil, nil)
go cdrS.chrgProcessEvent(cgrEv, cdrS.attrS != nil, cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs,
len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0, cdrS.thdS != nil, cdrS.statS != nil)
}
*reply = utils.OK
@@ -519,61 +498,40 @@ func (cdrS *CDRServer) V2ProcessCDR(arg *ArgV2ProcessCDR, reply *string) (err er
nil, true, utils.NonTransactional)
}
// end of RPC caching
attrS := cdrS.attrS != nil
if arg.AttributeS != nil {
attrS = *arg.AttributeS
}
cgrEv := &arg.CGREvent
if attrS {
if err = cdrS.attrSProcessEvent(cgrEv); err != nil {
err = utils.NewErrServerError(err)
return
}
}
var rawCDR *CDR
if rawCDR, err = NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg,
cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone); err != nil {
err = utils.NewErrServerError(err)
return
}
store := cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs
if arg.Store != nil {
store = *arg.Store
}
if store { // Store *raw CDR
if err = cdrS.cdrDb.SetCDR(rawCDR, false); err != nil {
err = utils.NewErrServerError(err) // Cannot store CDR
return
}
}
export := len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0
if arg.Export != nil {
export = *arg.Export
}
if export {
cdrS.exportCDRs([]*CDR{rawCDR}) // Replicate raw CDR
}
thrdS := cdrS.thdS != nil
thdS := cdrS.thdS != nil
if arg.ThresholdS != nil {
thrdS = *arg.ThresholdS
}
if thrdS {
go cdrS.thdSProcessEvent(cgrEv)
thdS = *arg.ThresholdS
}
statS := cdrS.statS != nil
if arg.StatS != nil {
statS = *arg.StatS
}
if statS {
go cdrS.statSProcessEvent(cgrEv)
}
chrgS := cdrS.chargerS != nil
if arg.ChargerS != nil {
chrgS = *arg.ChargerS
}
cgrEv := &arg.CGREvent
if err = cdrS.processEvent(cgrEv,
attrS, store, export, thdS, statS); err != nil {
err = utils.NewErrServerError(err)
return
}
if chrgS {
go cdrS.chrgRaStoReThStaCDR(cgrEv,
arg.Store, arg.Export, arg.ThresholdS, arg.StatS)
go cdrS.chrgProcessEvent(cgrEv,
attrS, store, export, thdS, statS)
}
*reply = utils.OK
return nil
@@ -692,18 +650,31 @@ func (cdrS *CDRServer) V1RateCDRs(arg *ArgRateCDRs, reply *string) (err error) {
if err != nil {
return err
}
store := cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs
if arg.Store != nil {
store = *arg.Store
}
export := len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0
if arg.Export != nil {
export = *arg.Export
}
thdS := cdrS.thdS != nil
if arg.ThresholdS != nil {
thdS = *arg.ThresholdS
}
statS := cdrS.statS != nil
if arg.StatS != nil {
statS = *arg.StatS
}
for _, cdr := range cdrs {
if arg.ChargerS != nil && *arg.ChargerS {
if cdrS.chargerS == nil {
return utils.NewErrNotConnected(utils.ChargerS)
}
if err = cdrS.chrgRaStoReThStaCDR(cdr.AsCGREvent(),
arg.Store, arg.Export, arg.ThresholdS, arg.StatS); err != nil {
if err = cdrS.chrgProcessEvent(cdr.AsCGREvent(),
false, store, export, thdS, statS); err != nil {
return utils.NewErrServerError(err)
}
} else {
cdrS.raStoReThStaCDR(cdr, arg.Store,
arg.Export, arg.ThresholdS, arg.StatS)
}
}
*reply = utils.OK

View File

@@ -263,7 +263,13 @@ func (me MapEvent) AsCDR(cfg *config.CGRConfig, tnt, tmz string) (cdr *CDR, err
if cdr.Cost, err = utils.IfaceAsFloat64(v); err != nil {
return nil, err
}
case utils.CostDetails, utils.ExtraInfo, utils.OrderID:
case utils.CostDetails:
var canCast bool
if cdr.CostDetails, canCast = v.(*EventCost); !canCast {
return nil, fmt.Errorf("cannot cast field: %+v to *EventCost", v)
}
case utils.ExtraInfo, utils.OrderID:
}
}
if cfg != nil {

View File

@@ -400,6 +400,182 @@ func TestMapEventAsCDR(t *testing.T) {
} else if !reflect.DeepEqual(expected, rply) {
t.Errorf("Expecting %+v, received: %+v", expected, rply)
}
ec1 := &EventCost{
CGRID: "164b0422fdc6a5117031b427439482c6a4f90e41",
RunID: utils.META_DEFAULT,
StartTime: time.Date(2017, 1, 9, 16, 18, 21, 0, time.UTC),
Charges: []*ChargingInterval{
&ChargingInterval{
RatingID: "c1a5ab9",
Increments: []*ChargingIncrement{
&ChargingIncrement{
Usage: time.Duration(0),
Cost: 0.1,
AccountingID: "9bdad10",
CompressFactor: 1,
},
&ChargingIncrement{
Usage: time.Duration(1 * time.Second),
Cost: 0,
AccountingID: "3455b83",
CompressFactor: 10,
},
&ChargingIncrement{
Usage: time.Duration(10 * time.Second),
Cost: 0.01,
AccountingID: "a012888",
CompressFactor: 2,
},
&ChargingIncrement{
Usage: time.Duration(1 * time.Second),
Cost: 0.005,
AccountingID: "44d6c02",
CompressFactor: 30,
},
},
CompressFactor: 1,
},
&ChargingInterval{
RatingID: "c1a5ab9",
Increments: []*ChargingIncrement{
&ChargingIncrement{
Usage: time.Duration(1 * time.Second),
Cost: 0.01,
AccountingID: "a012888",
CompressFactor: 60,
},
},
CompressFactor: 4,
},
&ChargingInterval{
RatingID: "c1a5ab9",
Increments: []*ChargingIncrement{
&ChargingIncrement{
Usage: time.Duration(1 * time.Second),
Cost: 0,
AccountingID: "3455b83",
CompressFactor: 10,
},
&ChargingIncrement{
Usage: time.Duration(10 * time.Second),
Cost: 0.01,
AccountingID: "a012888",
CompressFactor: 2,
},
&ChargingIncrement{
Usage: time.Duration(1 * time.Second),
Cost: 0.005,
AccountingID: "44d6c02",
CompressFactor: 30,
},
},
CompressFactor: 5,
},
},
AccountSummary: &AccountSummary{
Tenant: "cgrates.org",
ID: "dan",
BalanceSummaries: []*BalanceSummary{
&BalanceSummary{
Type: "*monetary",
Value: 50,
Disabled: false},
&BalanceSummary{
ID: "4b8b53d7-c1a1-4159-b845-4623a00a0165",
Type: "*monetary",
Value: 25,
Disabled: false},
&BalanceSummary{
Type: "*voice",
Value: 200,
Disabled: false,
},
},
AllowNegative: false,
Disabled: false,
},
Rating: Rating{
"3cd6425": &RatingUnit{
RoundingMethod: "*up",
RoundingDecimals: 5,
TimingID: "7f324ab",
RatesID: "4910ecf",
RatingFiltersID: "43e77dc",
},
"c1a5ab9": &RatingUnit{
ConnectFee: 0.1,
RoundingMethod: "*up",
RoundingDecimals: 5,
TimingID: "7f324ab",
RatesID: "ec1a177",
RatingFiltersID: "43e77dc",
},
},
Accounting: Accounting{
"a012888": &BalanceCharge{
AccountID: "cgrates.org:dan",
BalanceUUID: "8c54a9e9-d610-4c82-bcb5-a315b9a65010",
Units: 0.01,
},
"188bfa6": &BalanceCharge{
AccountID: "cgrates.org:dan",
BalanceUUID: "8c54a9e9-d610-4c82-bcb5-a315b9a65010",
Units: 0.005,
},
"9bdad10": &BalanceCharge{
AccountID: "cgrates.org:dan",
BalanceUUID: "8c54a9e9-d610-4c82-bcb5-a315b9a65010",
Units: 0.1,
},
"44d6c02": &BalanceCharge{
AccountID: "cgrates.org:dan",
BalanceUUID: "7a54a9e9-d610-4c82-bcb5-a315b9a65010",
RatingID: "3cd6425",
Units: 1,
ExtraChargeID: "188bfa6",
},
"3455b83": &BalanceCharge{
AccountID: "cgrates.org:dan",
BalanceUUID: "9d54a9e9-d610-4c82-bcb5-a315b9a65089",
Units: 1,
ExtraChargeID: "*none",
},
},
RatingFilters: RatingFilters{
"43e77dc": RatingMatchedFilters{
"DestinationID": "GERMANY",
"DestinationPrefix": "+49",
"RatingPlanID": "RPL_RETAIL1",
"Subject": "*out:cgrates.org:call:*any",
},
},
Rates: ChargedRates{
"ec1a177": RateGroups{
&Rate{
GroupIntervalStart: time.Duration(0),
Value: 0.01,
RateIncrement: time.Duration(1 * time.Minute),
RateUnit: time.Duration(1 * time.Second)},
},
"4910ecf": RateGroups{
&Rate{
GroupIntervalStart: time.Duration(0),
Value: 0.005,
RateIncrement: time.Duration(1 * time.Second),
RateUnit: time.Duration(1 * time.Second)},
&Rate{
GroupIntervalStart: time.Duration(60 * time.Second),
Value: 0.005,
RateIncrement: time.Duration(1 * time.Second),
RateUnit: time.Duration(1 * time.Second)},
},
},
Timings: ChargedTimings{
"7f324ab": &ChargedTiming{
StartTime: "00:00:00",
},
},
}
me = MapEvent{
"ExtraField1": 5,
"Source": 1001,
@@ -409,6 +585,7 @@ func TestMapEventAsCDR(t *testing.T) {
"Usage": "42s",
"PreRated": "True",
"Cost": "42.3",
"CostDetails": ec1,
}
expected = &CDR{
CGRID: "da39a3ee5e6b4b0d3255bfef95601890afd80709",
@@ -427,6 +604,7 @@ func TestMapEventAsCDR(t *testing.T) {
ToR: utils.VOICE,
RequestType: cfg.GeneralCfg().DefaultReqType,
Category: cfg.GeneralCfg().DefaultCategory,
CostDetails: ec1,
}
if rply, err := me.AsCDR(cfg, "itsyscom.com", utils.EmptyString); err != nil {
t.Error(err)

View File

@@ -171,3 +171,12 @@ func (ev *CGREvent) RemFldsWithPrefix(prfx string) {
}
}
}
// CGREvents is a group of generic events processed by CGR services
// ie: derived CDRs
type CGREvents struct {
Tenant string
ID string
Time *time.Time // event time
Events []map[string]interface{}
}