diff --git a/ees/ees.go b/ees/ees.go index 2d7851206..cc2f48360 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -102,24 +102,23 @@ func (eeS *EventExporterS) setupCache(chCfgs map[string]*config.CacheParamCfg) { eeS.eesMux.Unlock() } -func (eeS *EventExporterS) attrSProcessEvent(cgrEv *utils.CGREvent, attrIDs []string, ctx string) (err error) { +func (eeS *EventExporterS) attrSProcessEvent(cgrEv *utils.CGREvent, attrIDs []string, ctx string) (*utils.CGREvent, error) { var rplyEv engine.AttrSProcessEventReply - if cgrEv.APIOpts == nil { - cgrEv.APIOpts = make(map[string]any) - } cgrEv.APIOpts[utils.MetaSubsys] = utils.MetaEEs cgrEv.APIOpts[utils.OptsAttributesProfileIDs] = attrIDs cgrEv.APIOpts[utils.OptsContext] = utils.FirstNonEmpty(ctx, utils.IfaceAsString(cgrEv.APIOpts[utils.OptsContext]), utils.MetaEEs) - if err = eeS.connMgr.Call(context.TODO(), eeS.cfg.EEsNoLksCfg().AttributeSConns, - utils.AttributeSv1ProcessEvent, - cgrEv, &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 { - *cgrEv = *rplyEv.CGREvent - } else if err != nil && - err.Error() == utils.ErrNotFound.Error() { - err = nil // cancel ErrNotFound + err := eeS.connMgr.Call(context.TODO(), + eeS.cfg.EEsNoLksCfg().AttributeSConns, + utils.AttributeSv1ProcessEvent, cgrEv, &rplyEv) + + if err != nil && err.Error() != utils.ErrNotFound.Error() { + return nil, err } - return + if len(rplyEv.AlteredFields) != 0 { + return rplyEv.CGREvent, nil + } + return cgrEv, nil } // V1ProcessEvent will be called each time a new event is received from readers @@ -146,6 +145,11 @@ func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *engine.CG continue } + if cgrEv.APIOpts == nil { + cgrEv.APIOpts = make(map[string]any) + } + cgrEv.APIOpts[utils.MetaExporterID] = eeCfg.ID + if len(eeCfg.Filters) != 0 { tnt := utils.FirstNonEmpty(cgrEv.Tenant, eeS.cfg.GeneralCfg().DefaultTenant) if pass, errPass := eeS.filterS.Pass(tnt, @@ -156,19 +160,21 @@ func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *engine.CG } } - clonedCgrEv := cgrEv.CGREvent.Clone() - clonedCgrEv.APIOpts[utils.MetaExporterID] = eeCfg.ID + var exportEvent *utils.CGREvent if eeCfg.Flags.GetBool(utils.MetaAttributes) { - if err = eeS.attrSProcessEvent( - clonedCgrEv, + if exportEvent, err = eeS.attrSProcessEvent( + cgrEv.CGREvent, eeCfg.AttributeSIDs, utils.FirstNonEmpty( eeCfg.AttributeSCtx, - utils.IfaceAsString(clonedCgrEv.APIOpts[utils.OptsContext]), + utils.IfaceAsString(cgrEv.APIOpts[utils.OptsContext]), utils.MetaEEs)); err != nil { return } } + if exportEvent == nil { + exportEvent = cgrEv.CGREvent + } eeS.eesMux.RLock() eeCache, hasCache := eeS.eesChs[eeCfg.Type] @@ -206,7 +212,7 @@ func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *engine.CG utils.EEs, ee.Cfg().ID)) } go func(evict, sync bool, ee EventExporter) { - if err := exportEventWithExporter(ee, clonedCgrEv, evict, eeS.cfg, eeS.filterS); err != nil { + if err := exportEventWithExporter(ee, exportEvent, evict, eeS.cfg, eeS.filterS); err != nil { withErr = true } if sync { diff --git a/ees/ees_test.go b/ees/ees_test.go index 82bab57aa..4bca0bea9 100644 --- a/ees/ees_test.go +++ b/ees/ees_test.go @@ -128,9 +128,9 @@ func TestAttrSProcessEvent(t *testing.T) { eeS := NewEventExporterS(cfg, filterS, connMgr) // cgrEv := &utils.CGREvent{} exp := &utils.CGREvent{Event: map[string]any{"testcase": 1}} - if err := eeS.attrSProcessEvent(cgrEv, []string{}, utils.EmptyString); err != nil { + if rplyEv, err := eeS.attrSProcessEvent(cgrEv, []string{}, utils.EmptyString); err != nil { t.Error(err) - } else if !reflect.DeepEqual(exp, cgrEv) { + } else if !reflect.DeepEqual(exp, rplyEv) { t.Errorf("Expected %v but received %v", utils.ToJSON(exp), utils.ToJSON(cgrEv)) } } @@ -155,8 +155,10 @@ func TestAttrSProcessEvent2(t *testing.T) { utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes): clientConn, }) eeS := NewEventExporterS(cfg, filterS, connMgr) - cgrEv := &utils.CGREvent{} - if err := eeS.attrSProcessEvent(cgrEv, []string{}, utils.EmptyString); err != nil { + cgrEv := &utils.CGREvent{ + APIOpts: make(map[string]any), + } + if _, err := eeS.attrSProcessEvent(cgrEv, []string{}, utils.EmptyString); err != nil { t.Error(err) } }