Adding EEs processing within CDRs

This commit is contained in:
DanB
2020-06-10 15:14:48 +02:00
parent e46f10b628
commit 8511d3c978

View File

@@ -420,6 +420,18 @@ func (cdrS *CDRServer) exportCDRs(cdrs []*CDR) (err error) {
return
}
// eeSProcessEvent will process the event with the EEs component
func (cdrS *CDRServer) eeSProcessEvent(cgrEv *utils.CGREventWithOpts) (err error) {
var reply string
if err = cdrS.connMgr.Call(cdrS.cgrCfg.CdrsCfg().EEsConns, nil,
utils.EventExporterSv1ProcessEvent,
cgrEv, &reply); err != nil &&
err.Error() == utils.ErrNotFound.Error() {
err = nil // NotFound is not considered error
}
return
}
// processEvent processes a CGREvent based on arguments
// in case of partially executed, both error and evs will be returned
func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithOpts,
@@ -571,11 +583,26 @@ func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithOpts,
}
var partiallyExecuted bool // from here actions are optional and a general error is returned
if export {
if err = cdrS.exportCDRs(cdrs); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> exporting CDRs %+v",
utils.CDRs, err.Error(), cdrs))
partiallyExecuted = true
if len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0 {
if err = cdrS.exportCDRs(cdrs); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> exporting CDRs %+v",
utils.CDRs, err.Error(), cdrs))
partiallyExecuted = true
}
}
if len(cdrS.cgrCfg.CdrsCfg().EEsConns) != 0 {
for _, cgrEv := range cgrEvs {
evWithOpts := &utils.CGREventWithOpts{
CGREvent: cgrEv.CGREvent,
ArgDispatcher: cgrEv.ArgDispatcher}
if err = cdrS.eeSProcessEvent(evWithOpts); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> exporting cdr %+v",
utils.CDRs, err.Error(), evWithOpts))
partiallyExecuted = true
}
}
}
}
if thdS {
@@ -690,7 +717,7 @@ func (cdrS *CDRServer) V1ProcessCDR(cdr *CDRWithOpts, reply *string) (err error)
!cdr.PreRated, // rate the CDR if is not PreRated
cdrS.cgrCfg.CdrsCfg().StoreCdrs,
false, // no rerate
len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0,
(len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0 || len(cdrS.cgrCfg.CdrsCfg().EEsConns) != 0),
len(cdrS.cgrCfg.CdrsCfg().ThresholdSConns) != 0,
len(cdrS.cgrCfg.CdrsCfg().StatSConns) != 0); err != nil {
return
@@ -745,7 +772,7 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er
if flgs.HasKey(utils.MetaStore) {
store = flgs.GetBool(utils.MetaStore)
}
export := len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0
export := len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0 || len(cdrS.cgrCfg.CdrsCfg().EEsConns) != 0
if flgs.HasKey(utils.MetaExport) {
export = flgs.GetBool(utils.MetaExport)
}
@@ -829,7 +856,7 @@ func (cdrS *CDRServer) V2ProcessEvent(arg *ArgV1ProcessEvent, evs *[]*utils.Even
if flgs.HasKey(utils.MetaStore) {
store = flgs.GetBool(utils.MetaStore)
}
export := len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0
export := len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0 || len(cdrS.cgrCfg.CdrsCfg().EEsConns) != 0
if flgs.HasKey(utils.MetaExport) {
export = flgs.GetBool(utils.MetaExport)
}
@@ -1000,7 +1027,7 @@ func (cdrS *CDRServer) V1RateCDRs(arg *ArgRateCDRs, reply *string) (err error) {
if flgs.HasKey(utils.MetaStore) {
store = flgs.GetBool(utils.MetaStore)
}
export := len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0
export := len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0 || len(cdrS.cgrCfg.CdrsCfg().EEsConns) != 0
if flgs.HasKey(utils.MetaExport) {
export = flgs.GetBool(utils.MetaExport)
}