Updated EvenType in cdrs for ThresholdSv1ProcessEvent

This commit is contained in:
Tripon Alexandru-Ionut
2019-07-04 16:02:30 +03:00
committed by Dan Christian Bogos
parent a7e65058a1
commit 633a11cd69
3 changed files with 18 additions and 21 deletions

View File

@@ -239,14 +239,14 @@ func (cdrS *CDRServer) getCostFromRater(cdr *CDRWithArgDispatcher) (*CallCost, e
// attrStoExpThdStat will process a CGREvent with the configured subsystems
func (cdrS *CDRServer) attrStoExpThdStat(cgrEv *utils.CGREventWithArgDispatcher,
attrS, store, allowUpdate, export, thdS, statS bool) (err error) {
attrS, store, allowUpdate, export, thdS, statS bool, eventType string) (err error) {
if attrS {
if err = cdrS.attrSProcessEvent(cgrEv); err != nil {
return
}
}
if thdS {
go cdrS.thdSProcessEvent(cgrEv)
go cdrS.thdSProcessEvent(cgrEv, eventType)
}
if statS {
go cdrS.statSProcessEvent(cgrEv)
@@ -281,7 +281,7 @@ func (cdrS *CDRServer) rateCDRWithErr(cdr *CDRWithArgDispatcher) (ratedCDRs []*C
// chrgProcessEvent will process the CGREvent with ChargerS subsystem
// it is designed to run in it's own goroutine
func (cdrS *CDRServer) chrgProcessEvent(cgrEv *utils.CGREventWithArgDispatcher,
attrS, store, allowUpdate, export, thdS, statS bool) (err error) {
attrS, store, allowUpdate, export, thdS, statS bool, eventType string) (err error) {
var chrgrs []*ChrgSProcessEventReply
if err = cdrS.chargerS.Call(utils.ChargerSv1ProcessEvent,
cgrEv, &chrgrs); err != nil {
@@ -307,7 +307,7 @@ func (cdrS *CDRServer) chrgProcessEvent(cgrEv *utils.CGREventWithArgDispatcher,
ArgDispatcher: cgrEv.ArgDispatcher,
}
if errProc := cdrS.attrStoExpThdStat(arg,
attrS, store, allowUpdate, export, thdS, statS); errProc != nil {
attrS, store, allowUpdate, export, thdS, statS, eventType); errProc != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing CDR event %+v with %s",
utils.CDRs, errProc.Error(), cgrEv, utils.ChargerS))
@@ -346,9 +346,11 @@ func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher)
}
// thdSProcessEvent will send the event to ThresholdS if the connection is configured
func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) {
func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher, eventType string) {
var tIDs []string
thArgs := &ArgsProcessEvent{CGREvent: cgrEv.CGREvent}
// we clone the CGREvent so we can add EventType without it to be propagated
thArgs := &ArgsProcessEvent{CGREvent: cgrEv.CGREvent.Clone()}
thArgs.CGREvent.Event[utils.EventType] = eventType
if cgrEv.ArgDispatcher != nil {
thArgs.ArgDispatcher = cgrEv.ArgDispatcher
}
@@ -477,8 +479,7 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDRWithArgDispatcher, reply *string) (e
},
ArgDispatcher: cdr.ArgDispatcher,
}
//Add event type as cdr
cgrEv.CGREvent.Event[utils.EventType] = utils.CDRPoster
if cdrS.attrS != nil {
if err = cdrS.attrSProcessEvent(cgrEv); err != nil {
err = utils.NewErrServerError(err)
@@ -498,7 +499,7 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDRWithArgDispatcher, reply *string) (e
cdrS.exportCDRs([]*CDR{cdr.CDR}) // Replicate raw CDR
}
if cdrS.thdS != nil {
go cdrS.thdSProcessEvent(cgrEv)
go cdrS.thdSProcessEvent(cgrEv, utils.CDRPoster)
}
if cdrS.statS != nil {
go cdrS.statSProcessEvent(cgrEv)
@@ -506,7 +507,7 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDRWithArgDispatcher, reply *string) (e
if cdrS.chargerS != nil &&
utils.IsSliceMember([]string{"", utils.MetaRaw}, cdr.RunID) {
go cdrS.chrgProcessEvent(cgrEv, cdrS.attrS != nil, cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs, false,
len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0, cdrS.thdS != nil, cdrS.statS != nil)
len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0, cdrS.thdS != nil, cdrS.statS != nil, utils.CDRPoster)
}
*reply = utils.OK
@@ -584,12 +585,10 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er
if arg.ArgDispatcher != nil {
cgrEv.ArgDispatcher = arg.ArgDispatcher
}
//Add event type as event
cgrEv.CGREvent.Event[utils.EventType] = utils.Event
if !ralS {
if err = cdrS.attrStoExpThdStat(cgrEv,
attrS, store, false, export, thdS, statS); err != nil {
attrS, store, false, export, thdS, statS, utils.Event); err != nil {
err = utils.NewErrServerError(err)
return
}
@@ -611,7 +610,7 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er
ArgDispatcher: arg.ArgDispatcher,
}
if errProc := cdrS.attrStoExpThdStat(cgrEv,
attrS, store, false, export, thdS, statS); err != nil {
attrS, store, false, export, thdS, statS, utils.Event); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing event %+v ",
utils.CDRs, errProc.Error(), cgrEv))
@@ -626,7 +625,7 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er
}
if chrgS {
go cdrS.chrgProcessEvent(cgrEv,
attrS, store, false, export, thdS, statS)
attrS, store, false, export, thdS, statS, utils.Event)
}
*reply = utils.OK
return nil
@@ -773,10 +772,8 @@ func (cdrS *CDRServer) V1RateCDRs(arg *ArgRateCDRs, reply *string) (err error) {
CGREvent: cdr.AsCGREvent(),
ArgDispatcher: arg.ArgDispatcher,
}
//Add event type as cdr
argCharger.CGREvent.Event[utils.EventType] = utils.CDRPoster
if err = cdrS.chrgProcessEvent(argCharger,
false, store, true, export, thdS, statS); err != nil {
false, store, true, export, thdS, statS, utils.CDRPoster); err != nil {
return utils.NewErrServerError(err)
}
}