mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Clone export event only if processed by attributes
This commit is contained in:
committed by
Dan Christian Bogos
parent
3ff7007421
commit
3637e4364e
42
ees/ees.go
42
ees/ees.go
@@ -102,24 +102,23 @@ func (eeS *EventExporterS) setupCache(chCfgs map[string]*config.CacheParamCfg) {
|
||||
eeS.eesMux.Unlock()
|
||||
}
|
||||
|
||||
func (eeS *EventExporterS) attrSProcessEvent(cgrEv *utils.CGREvent, attrIDs []string, ctx string) (err error) {
|
||||
func (eeS *EventExporterS) attrSProcessEvent(cgrEv *utils.CGREvent, attrIDs []string, ctx string) (*utils.CGREvent, error) {
|
||||
var rplyEv engine.AttrSProcessEventReply
|
||||
if cgrEv.APIOpts == nil {
|
||||
cgrEv.APIOpts = make(map[string]any)
|
||||
}
|
||||
cgrEv.APIOpts[utils.MetaSubsys] = utils.MetaEEs
|
||||
cgrEv.APIOpts[utils.OptsAttributesProfileIDs] = attrIDs
|
||||
cgrEv.APIOpts[utils.OptsContext] = utils.FirstNonEmpty(ctx,
|
||||
utils.IfaceAsString(cgrEv.APIOpts[utils.OptsContext]), utils.MetaEEs)
|
||||
if err = eeS.connMgr.Call(context.TODO(), eeS.cfg.EEsNoLksCfg().AttributeSConns,
|
||||
utils.AttributeSv1ProcessEvent,
|
||||
cgrEv, &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 {
|
||||
*cgrEv = *rplyEv.CGREvent
|
||||
} else if err != nil &&
|
||||
err.Error() == utils.ErrNotFound.Error() {
|
||||
err = nil // cancel ErrNotFound
|
||||
err := eeS.connMgr.Call(context.TODO(),
|
||||
eeS.cfg.EEsNoLksCfg().AttributeSConns,
|
||||
utils.AttributeSv1ProcessEvent, cgrEv, &rplyEv)
|
||||
|
||||
if err != nil && err.Error() != utils.ErrNotFound.Error() {
|
||||
return nil, err
|
||||
}
|
||||
return
|
||||
if len(rplyEv.AlteredFields) != 0 {
|
||||
return rplyEv.CGREvent, nil
|
||||
}
|
||||
return cgrEv, nil
|
||||
}
|
||||
|
||||
// V1ProcessEvent will be called each time a new event is received from readers
|
||||
@@ -146,6 +145,11 @@ func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *engine.CG
|
||||
continue
|
||||
}
|
||||
|
||||
if cgrEv.APIOpts == nil {
|
||||
cgrEv.APIOpts = make(map[string]any)
|
||||
}
|
||||
cgrEv.APIOpts[utils.MetaExporterID] = eeCfg.ID
|
||||
|
||||
if len(eeCfg.Filters) != 0 {
|
||||
tnt := utils.FirstNonEmpty(cgrEv.Tenant, eeS.cfg.GeneralCfg().DefaultTenant)
|
||||
if pass, errPass := eeS.filterS.Pass(tnt,
|
||||
@@ -156,19 +160,21 @@ func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *engine.CG
|
||||
}
|
||||
}
|
||||
|
||||
clonedCgrEv := cgrEv.CGREvent.Clone()
|
||||
clonedCgrEv.APIOpts[utils.MetaExporterID] = eeCfg.ID
|
||||
var exportEvent *utils.CGREvent
|
||||
if eeCfg.Flags.GetBool(utils.MetaAttributes) {
|
||||
if err = eeS.attrSProcessEvent(
|
||||
clonedCgrEv,
|
||||
if exportEvent, err = eeS.attrSProcessEvent(
|
||||
cgrEv.CGREvent,
|
||||
eeCfg.AttributeSIDs,
|
||||
utils.FirstNonEmpty(
|
||||
eeCfg.AttributeSCtx,
|
||||
utils.IfaceAsString(clonedCgrEv.APIOpts[utils.OptsContext]),
|
||||
utils.IfaceAsString(cgrEv.APIOpts[utils.OptsContext]),
|
||||
utils.MetaEEs)); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
if exportEvent == nil {
|
||||
exportEvent = cgrEv.CGREvent
|
||||
}
|
||||
|
||||
eeS.eesMux.RLock()
|
||||
eeCache, hasCache := eeS.eesChs[eeCfg.Type]
|
||||
@@ -206,7 +212,7 @@ func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *engine.CG
|
||||
utils.EEs, ee.Cfg().ID))
|
||||
}
|
||||
go func(evict, sync bool, ee EventExporter) {
|
||||
if err := exportEventWithExporter(ee, clonedCgrEv, evict, eeS.cfg, eeS.filterS); err != nil {
|
||||
if err := exportEventWithExporter(ee, exportEvent, evict, eeS.cfg, eeS.filterS); err != nil {
|
||||
withErr = true
|
||||
}
|
||||
if sync {
|
||||
|
||||
@@ -128,9 +128,9 @@ func TestAttrSProcessEvent(t *testing.T) {
|
||||
eeS := NewEventExporterS(cfg, filterS, connMgr)
|
||||
// cgrEv := &utils.CGREvent{}
|
||||
exp := &utils.CGREvent{Event: map[string]any{"testcase": 1}}
|
||||
if err := eeS.attrSProcessEvent(cgrEv, []string{}, utils.EmptyString); err != nil {
|
||||
if rplyEv, err := eeS.attrSProcessEvent(cgrEv, []string{}, utils.EmptyString); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(exp, cgrEv) {
|
||||
} else if !reflect.DeepEqual(exp, rplyEv) {
|
||||
t.Errorf("Expected %v but received %v", utils.ToJSON(exp), utils.ToJSON(cgrEv))
|
||||
}
|
||||
}
|
||||
@@ -155,8 +155,10 @@ func TestAttrSProcessEvent2(t *testing.T) {
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes): clientConn,
|
||||
})
|
||||
eeS := NewEventExporterS(cfg, filterS, connMgr)
|
||||
cgrEv := &utils.CGREvent{}
|
||||
if err := eeS.attrSProcessEvent(cgrEv, []string{}, utils.EmptyString); err != nil {
|
||||
cgrEv := &utils.CGREvent{
|
||||
APIOpts: make(map[string]any),
|
||||
}
|
||||
if _, err := eeS.attrSProcessEvent(cgrEv, []string{}, utils.EmptyString); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user