From 633a11cd697b3b87e430422fbe30c4128bca84e1 Mon Sep 17 00:00:00 2001 From: Tripon Alexandru-Ionut Date: Thu, 4 Jul 2019 16:02:30 +0300 Subject: [PATCH] Updated EvenType in cdrs for ThresholdSv1ProcessEvent --- apier/v1/accounts_it_test.go | 4 ++-- apier/v1/cdre_it_test.go | 2 +- engine/cdrs.go | 33 +++++++++++++++------------------ 3 files changed, 18 insertions(+), 21 deletions(-) diff --git a/apier/v1/accounts_it_test.go b/apier/v1/accounts_it_test.go index 8dc3b2fd6..3013e51d0 100644 --- a/apier/v1/accounts_it_test.go +++ b/apier/v1/accounts_it_test.go @@ -250,7 +250,7 @@ func testAccITSetBalanceWithExtraData(t *testing.T) { t.Error("Unexpected error: ", err.Error()) } else if len(cdrs) != 1 { t.Error("Unexpected number of CDRs returned: ", len(cdrs)) - } else if len(cdrs[0].ExtraFields) != 3 { // included EventType + } else if len(cdrs[0].ExtraFields) != 2 { t.Error("Unexpected number of ExtraFields returned: ", len(cdrs[0].ExtraFields)) } } @@ -278,7 +278,7 @@ func testAccITSetBalanceWithExtraData2(t *testing.T) { t.Error("Unexpected error: ", err.Error()) } else if len(cdrs) != 1 { t.Error("Unexpected number of CDRs returned: ", len(cdrs)) - } else if len(cdrs[0].ExtraFields) != 3 { // included EventType + } else if len(cdrs[0].ExtraFields) != 2 { t.Error("Unexpected number of ExtraFields returned: ", len(cdrs[0].ExtraFields)) } else if cdrs[0].ExtraFields["ActionVal"] != "1.5" { t.Error("Unexpected value of ExtraFields[ActionVal] returned: ", cdrs[0].ExtraFields["ActionVal"]) diff --git a/apier/v1/cdre_it_test.go b/apier/v1/cdre_it_test.go index 48ec62e2f..86cb5ab90 100755 --- a/apier/v1/cdre_it_test.go +++ b/apier/v1/cdre_it_test.go @@ -194,7 +194,7 @@ func testCDReProcessExternalCdr(t *testing.T) { SetupTime: "2014-08-04T13:00:00Z", AnswerTime: "2014-08-04T13:00:07Z", Usage: "1s", - ExtraFields: map[string]string{"EventType": "cdr", "field_extr1": "val_extr1", "fieldextr2": "valextr2"}, + ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, } var reply string if err := cdreRPC.Call("CDRsV1.ProcessExternalCDR", cdr, &reply); err != nil { diff --git a/engine/cdrs.go b/engine/cdrs.go index 662120e33..fb92bd001 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -239,14 +239,14 @@ func (cdrS *CDRServer) getCostFromRater(cdr *CDRWithArgDispatcher) (*CallCost, e // attrStoExpThdStat will process a CGREvent with the configured subsystems func (cdrS *CDRServer) attrStoExpThdStat(cgrEv *utils.CGREventWithArgDispatcher, - attrS, store, allowUpdate, export, thdS, statS bool) (err error) { + attrS, store, allowUpdate, export, thdS, statS bool, eventType string) (err error) { if attrS { if err = cdrS.attrSProcessEvent(cgrEv); err != nil { return } } if thdS { - go cdrS.thdSProcessEvent(cgrEv) + go cdrS.thdSProcessEvent(cgrEv, eventType) } if statS { go cdrS.statSProcessEvent(cgrEv) @@ -281,7 +281,7 @@ func (cdrS *CDRServer) rateCDRWithErr(cdr *CDRWithArgDispatcher) (ratedCDRs []*C // chrgProcessEvent will process the CGREvent with ChargerS subsystem // it is designed to run in it's own goroutine func (cdrS *CDRServer) chrgProcessEvent(cgrEv *utils.CGREventWithArgDispatcher, - attrS, store, allowUpdate, export, thdS, statS bool) (err error) { + attrS, store, allowUpdate, export, thdS, statS bool, eventType string) (err error) { var chrgrs []*ChrgSProcessEventReply if err = cdrS.chargerS.Call(utils.ChargerSv1ProcessEvent, cgrEv, &chrgrs); err != nil { @@ -307,7 +307,7 @@ func (cdrS *CDRServer) chrgProcessEvent(cgrEv *utils.CGREventWithArgDispatcher, ArgDispatcher: cgrEv.ArgDispatcher, } if errProc := cdrS.attrStoExpThdStat(arg, - attrS, store, allowUpdate, export, thdS, statS); errProc != nil { + attrS, store, allowUpdate, export, thdS, statS, eventType); errProc != nil { utils.Logger.Warning( fmt.Sprintf("<%s> error: %s processing CDR event %+v with %s", utils.CDRs, errProc.Error(), cgrEv, utils.ChargerS)) @@ -346,9 +346,11 @@ func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) } // thdSProcessEvent will send the event to ThresholdS if the connection is configured -func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) { +func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher, eventType string) { var tIDs []string - thArgs := &ArgsProcessEvent{CGREvent: cgrEv.CGREvent} + // we clone the CGREvent so we can add EventType without it to be propagated + thArgs := &ArgsProcessEvent{CGREvent: cgrEv.CGREvent.Clone()} + thArgs.CGREvent.Event[utils.EventType] = eventType if cgrEv.ArgDispatcher != nil { thArgs.ArgDispatcher = cgrEv.ArgDispatcher } @@ -477,8 +479,7 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDRWithArgDispatcher, reply *string) (e }, ArgDispatcher: cdr.ArgDispatcher, } - //Add event type as cdr - cgrEv.CGREvent.Event[utils.EventType] = utils.CDRPoster + if cdrS.attrS != nil { if err = cdrS.attrSProcessEvent(cgrEv); err != nil { err = utils.NewErrServerError(err) @@ -498,7 +499,7 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDRWithArgDispatcher, reply *string) (e cdrS.exportCDRs([]*CDR{cdr.CDR}) // Replicate raw CDR } if cdrS.thdS != nil { - go cdrS.thdSProcessEvent(cgrEv) + go cdrS.thdSProcessEvent(cgrEv, utils.CDRPoster) } if cdrS.statS != nil { go cdrS.statSProcessEvent(cgrEv) @@ -506,7 +507,7 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDRWithArgDispatcher, reply *string) (e if cdrS.chargerS != nil && utils.IsSliceMember([]string{"", utils.MetaRaw}, cdr.RunID) { go cdrS.chrgProcessEvent(cgrEv, cdrS.attrS != nil, cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs, false, - len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0, cdrS.thdS != nil, cdrS.statS != nil) + len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0, cdrS.thdS != nil, cdrS.statS != nil, utils.CDRPoster) } *reply = utils.OK @@ -584,12 +585,10 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er if arg.ArgDispatcher != nil { cgrEv.ArgDispatcher = arg.ArgDispatcher } - //Add event type as event - cgrEv.CGREvent.Event[utils.EventType] = utils.Event if !ralS { if err = cdrS.attrStoExpThdStat(cgrEv, - attrS, store, false, export, thdS, statS); err != nil { + attrS, store, false, export, thdS, statS, utils.Event); err != nil { err = utils.NewErrServerError(err) return } @@ -611,7 +610,7 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er ArgDispatcher: arg.ArgDispatcher, } if errProc := cdrS.attrStoExpThdStat(cgrEv, - attrS, store, false, export, thdS, statS); err != nil { + attrS, store, false, export, thdS, statS, utils.Event); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> error: %s processing event %+v ", utils.CDRs, errProc.Error(), cgrEv)) @@ -626,7 +625,7 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er } if chrgS { go cdrS.chrgProcessEvent(cgrEv, - attrS, store, false, export, thdS, statS) + attrS, store, false, export, thdS, statS, utils.Event) } *reply = utils.OK return nil @@ -773,10 +772,8 @@ func (cdrS *CDRServer) V1RateCDRs(arg *ArgRateCDRs, reply *string) (err error) { CGREvent: cdr.AsCGREvent(), ArgDispatcher: arg.ArgDispatcher, } - //Add event type as cdr - argCharger.CGREvent.Event[utils.EventType] = utils.CDRPoster if err = cdrS.chrgProcessEvent(argCharger, - false, store, true, export, thdS, statS); err != nil { + false, store, true, export, thdS, statS, utils.CDRPoster); err != nil { return utils.NewErrServerError(err) } }