From a3ebbe38ec9fc8aeb2adb41740e2f7b24477060e Mon Sep 17 00:00:00 2001 From: Trial97 Date: Mon, 6 Sep 2021 17:03:34 +0300 Subject: [PATCH] Added EeSV1 APIs back --- actions/export.go | 6 +- actions/export_test.go | 9 +- actions/log.go | 2 +- apis/ees.go | 36 +++++++ data/conf/samples/ees/cgrates.json | 9 ++ dispatchers/cdrs_test.go | 148 ----------------------------- ees/amqp.go | 3 +- ees/amqpv1.go | 5 +- ees/ee.go | 28 ++---- ees/ees.go | 37 ++++---- ees/ees_test.go | 26 ++--- ees/elastic.go | 6 +- ees/elastic_test.go | 9 +- ees/filecsv.go | 7 +- ees/filecsv_test.go | 3 +- ees/filefwv.go | 7 +- ees/filefwv_test.go | 5 +- ees/httpjsonmap.go | 11 ++- ees/httpjsonmap_test.go | 9 +- ees/httppost.go | 7 +- ees/httppost_test.go | 9 +- ees/kafka.go | 6 +- ees/libcdre.go | 2 +- ees/log.go | 3 +- ees/nats.go | 5 +- ees/nats_it_test.go | 5 +- ees/poster_it_test.go | 12 +-- ees/s3.go | 5 +- ees/sql.go | 3 +- ees/sql_it_test.go | 3 +- ees/sqs.go | 5 +- ees/virtualee.go | 13 +-- ees/virtualee_test.go | 3 +- engine/exportrequest.go | 4 +- ers/amqp.go | 2 +- ers/amqpv1.go | 2 +- ers/ers.go | 2 +- ers/kafka.go | 2 +- ers/nats.go | 2 +- ers/s3.go | 2 +- ers/sqs.go | 2 +- services/ees.go | 18 ++-- 42 files changed, 195 insertions(+), 288 deletions(-) create mode 100644 apis/ees.go diff --git a/actions/export.go b/actions/export.go index eb9383e08..3db8418bd 100644 --- a/actions/export.go +++ b/actions/export.go @@ -63,7 +63,7 @@ func (aL *actHTTPPost) cfg() *engine.APAction { } // execute implements actioner interface -func (aL *actHTTPPost) execute(_ *context.Context, data utils.MapStorage, _ string) (err error) { +func (aL *actHTTPPost) execute(ctx *context.Context, data utils.MapStorage, _ string) (err error) { var body []byte if body, err = json.Marshal(data); err != nil { return @@ -71,8 +71,8 @@ func (aL *actHTTPPost) execute(_ *context.Context, data utils.MapStorage, _ stri var partExec bool for _, pstr := range aL.pstrs { if async, has := aL.cfg().Opts[utils.MetaAsync]; has && utils.IfaceAsString(async) == utils.TrueStr { - go ees.ExportWithAttempts(pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString) - } else if err = ees.ExportWithAttempts(pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString); err != nil { + go ees.ExportWithAttempts(context.Background(), pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString) + } else if err = ees.ExportWithAttempts(ctx, pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString); err != nil { if pstr.Cfg().FailedPostsDir != utils.MetaNone { err = nil } else { diff --git a/actions/export_test.go b/actions/export_test.go index 487c9bbcc..9668f672b 100644 --- a/actions/export_test.go +++ b/actions/export_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -65,7 +66,7 @@ func TestACHTTPPostExecute(t *testing.T) { log.SetOutput(buff) expected := ` Exporter could not export because err: ` - if err := http.execute(nil, dataStorage, utils.EmptyString); err != nil { + if err := http.execute(context.Background(), dataStorage, utils.EmptyString); err != nil { t.Error(err) } else if rcv := buff.String(); !strings.Contains(rcv, expected) { t.Errorf("Expected %+v, received %+v", expected, rcv) @@ -76,7 +77,7 @@ func TestACHTTPPostExecute(t *testing.T) { // channels cannot be marshaled dataStorage[utils.MetaReq] = make(chan struct{}, 1) expected = "json: unsupported type: chan struct {}" - if err := http.execute(nil, dataStorage, utils.EmptyString); err == nil || err.Error() != expected { + if err := http.execute(context.Background(), dataStorage, utils.EmptyString); err == nil || err.Error() != expected { t.Errorf("Expected %+v, received %+v", expected, err) } @@ -88,7 +89,7 @@ func TestACHTTPPostExecute(t *testing.T) { http.aCfg.Opts = make(map[string]interface{}) http.aCfg.Opts[utils.MetaAsync] = true http.config.GeneralCfg().FailedPostsDir = utils.MetaNone - if err := http.execute(nil, dataStorage, utils.EmptyString); err != nil { + if err := http.execute(context.Background(), dataStorage, utils.EmptyString); err != nil { t.Error(err) } } @@ -114,7 +115,7 @@ func TestACHTTPPostValues(t *testing.T) { }, } - if err := http.execute(nil, dataStorage, + if err := http.execute(context.Background(), dataStorage, utils.EmptyString); err == nil || err != utils.ErrPartiallyExecuted { t.Errorf("Expected %+v, received %+v", utils.ErrPartiallyExecuted, err) } diff --git a/actions/log.go b/actions/log.go index 4dfec3f3c..3c5a024c3 100644 --- a/actions/log.go +++ b/actions/log.go @@ -89,7 +89,7 @@ func (aL *actCDRLog) execute(ctx *context.Context, data utils.MapStorage, _ stri }, aL.config.GeneralCfg().DefaultTenant, aL.filterS, oNm) - if err = cdrLogReq.SetFields(template); err != nil { + if err = cdrLogReq.SetFields(ctx, template); err != nil { return } var rply string diff --git a/apis/ees.go b/apis/ees.go new file mode 100644 index 000000000..62eba4dcc --- /dev/null +++ b/apis/ees.go @@ -0,0 +1,36 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package apis + +import ( + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/ees" + "github.com/cgrates/cgrates/utils" +) + +func NewEeSv1(eesV1 *ees.EventExporterS) *EeSv1 { + return &EeSv1{ees: eesV1} +} + +// EeSv1 exports RPC from RLs +type EeSv1 struct { + ees *ees.EventExporterS + ping +} + +func (cS *EeSv1) ProcessEvent(ctx *context.Context, cgrEv *utils.CGREventWithEeIDs, rply *map[string]map[string]interface{}) error { + return cS.ees.V1ProcessEvent(ctx, cgrEv, rply) +} diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json index e1c35f026..e559a1315 100644 --- a/data/conf/samples/ees/cgrates.json +++ b/data/conf/samples/ees/cgrates.json @@ -423,6 +423,15 @@ "natsSubject": "processed_cdrs", } }, + { + "id": "NatsJsonMapExporter2", + "type": "*natsJSONMap", + "export_path": "nats://localhost:4222", + "attempts": 1, + "opts": { + "natsSubject": "processed_cdrs", + } + }, { "id": "SQLExportCDR", "type": "*sql", diff --git a/dispatchers/cdrs_test.go b/dispatchers/cdrs_test.go index 96a842337..f5e6d1829 100644 --- a/dispatchers/cdrs_test.go +++ b/dispatchers/cdrs_test.go @@ -65,120 +65,6 @@ func TestDspCDRsV1PingNilError(t *testing.T) { } } -func TestDspCDRsV1GetCDRsError(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil) - cgrCfg.DispatcherSCfg().AttributeSConns = []string{"test"} - CGREvent := &utils.RPCCDRsFilterWithAPIOpts{} - var reply *[]*engine.CDR - result := dspSrv.CDRsV1GetCDRs(CGREvent, reply) - expected := "MANDATORY_IE_MISSING: [ApiKey]" - if result == nil || result.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) - } -} - -func TestDspCDRsV1GetCDRsNil(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil) - CGREvent := &utils.RPCCDRsFilterWithAPIOpts{ - Tenant: "tenant", - } - var reply *[]*engine.CDR - result := dspSrv.CDRsV1GetCDRs(CGREvent, reply) - expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" - if result == nil || result.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) - } -} - -func TestDspCDRsV1GetCDRsCountError(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil) - cgrCfg.DispatcherSCfg().AttributeSConns = []string{"test"} - CGREvent := &utils.RPCCDRsFilterWithAPIOpts{} - var reply *int64 - result := dspSrv.CDRsV1GetCDRsCount(CGREvent, reply) - expected := "MANDATORY_IE_MISSING: [ApiKey]" - if result == nil || result.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) - } -} - -func TestDspCDRsV1GetCDRsCountNil(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil) - CGREvent := &utils.RPCCDRsFilterWithAPIOpts{ - Tenant: "tenant", - } - var reply *int64 - result := dspSrv.CDRsV1GetCDRsCount(CGREvent, reply) - expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" - if result == nil || result.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) - } -} - -func TestDspCDRsV1RateCDRsError(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil) - cgrCfg.DispatcherSCfg().AttributeSConns = []string{"test"} - CGREvent := &engine.ArgRateCDRs{} - var reply *string - result := dspSrv.CDRsV1RateCDRs(CGREvent, reply) - expected := "MANDATORY_IE_MISSING: [ApiKey]" - if result == nil || result.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) - } -} - -func TestDspCDRsV1RateCDRsNil(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil) - CGREvent := &engine.ArgRateCDRs{ - Tenant: "tenant", - } - var reply *string - result := dspSrv.CDRsV1RateCDRs(CGREvent, reply) - expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" - if result == nil || result.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) - } -} - -func TestDspCDRsV1ProcessExternalCDRError(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil) - cgrCfg.DispatcherSCfg().AttributeSConns = []string{"test"} - CGREvent := &engine.ExternalCDRWithAPIOpts{ - ExternalCDR: &engine.ExternalCDR{ - Tenant: "tenant", - }, - } - var reply *string - result := dspSrv.CDRsV1ProcessExternalCDR(CGREvent, reply) - expected := "MANDATORY_IE_MISSING: [ApiKey]" - if result == nil || result.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) - } -} - -func TestDspCDRsV1ProcessExternalCDRNil(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil) - CGREvent := &engine.ExternalCDRWithAPIOpts{ - ExternalCDR: &engine.ExternalCDR{ - Tenant: "tenant", - }, - } - var reply *string - result := dspSrv.CDRsV1ProcessExternalCDR(CGREvent, reply) - expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" - if result == nil || result.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) - } -} - func TestDspCDRsV1ProcessEventError(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil) @@ -211,40 +97,6 @@ func TestDspCDRsV1ProcessEventNil(t *testing.T) { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) } } - -func TestDspCDRsV1ProcessCDRError(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil) - cgrCfg.DispatcherSCfg().AttributeSConns = []string{"test"} - CGREvent := &engine.CDRWithAPIOpts{ - CDR: &engine.CDR{ - Tenant: "tenant", - }, - } - var reply *string - result := dspSrv.CDRsV1ProcessCDR(CGREvent, reply) - expected := "MANDATORY_IE_MISSING: [ApiKey]" - if result == nil || result.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) - } -} - -func TestDspCDRsV1ProcessCDRNil(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil) - CGREvent := &engine.CDRWithAPIOpts{ - CDR: &engine.CDR{ - Tenant: "tenant", - }, - } - var reply *string - result := dspSrv.CDRsV1ProcessCDR(CGREvent, reply) - expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" - if result == nil || result.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) - } -} - func TestDspCDRsV2ProcessEventError(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil) diff --git a/ees/amqp.go b/ees/amqp.go index d03e2b071..b5d627346 100644 --- a/ees/amqp.go +++ b/ees/amqp.go @@ -22,6 +22,7 @@ import ( "fmt" "sync" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" "github.com/streadway/amqp" @@ -136,7 +137,7 @@ func (pstr *AMQPee) Connect() (err error) { return } -func (pstr *AMQPee) ExportEvent(content interface{}, _ string) (err error) { +func (pstr *AMQPee) ExportEvent(_ *context.Context, content interface{}, _ string) (err error) { pstr.reqs.get() pstr.RLock() if pstr.postChan == nil { diff --git a/ees/amqpv1.go b/ees/amqpv1.go index bda9aca6e..163e1e1f7 100644 --- a/ees/amqpv1.go +++ b/ees/amqpv1.go @@ -19,10 +19,10 @@ along with this program. If not, see package ees import ( - "context" "sync" amqpv1 "github.com/Azure/go-amqp" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) @@ -78,7 +78,7 @@ func (pstr *AMQPv1EE) Connect() (err error) { return } -func (pstr *AMQPv1EE) ExportEvent(content interface{}, _ string) (err error) { +func (pstr *AMQPv1EE) ExportEvent(ctx *context.Context, content interface{}, _ string) (err error) { pstr.reqs.get() pstr.RLock() defer func() { @@ -95,7 +95,6 @@ func (pstr *AMQPv1EE) ExportEvent(content interface{}, _ string) (err error) { return } // Send message - ctx := context.Background() err = sender.Send(ctx, amqpv1.NewMessage(content.([]byte))) sender.Close(ctx) return diff --git a/ees/ee.go b/ees/ee.go index 175c917cf..62ee42919 100644 --- a/ees/ee.go +++ b/ees/ee.go @@ -24,17 +24,18 @@ import ( "strings" "time" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) type EventExporter interface { - Cfg() *config.EventExporterCfg // return the config - Connect() error // called before exporting an event to make sure it is connected - ExportEvent(interface{}, string) error // called on each event to be exported - Close() error // called when the exporter needs to terminate - GetMetrics() *utils.SafeMapStorage // called to get metrics + Cfg() *config.EventExporterCfg // return the config + Connect() error // called before exporting an event to make sure it is connected + ExportEvent(*context.Context, interface{}, string) error // called on each event to be exported + Close() error // called when the exporter needs to terminate + GetMetrics() *utils.SafeMapStorage // called to get metrics PrepareMap(map[string]interface{}) (interface{}, error) PrepareOrderMap(*utils.OrderedNavigableMap) (interface{}, error) } @@ -110,26 +111,13 @@ func (c *concReq) done() { } // composeHeaderTrailer will return the orderNM for *hdr or *trl -func composeHeaderTrailer(prfx string, fields []*config.FCTemplate, dc utils.DataStorage, cfg *config.CGRConfig, fltS *engine.FilterS) (r *utils.OrderedNavigableMap, err error) { +func composeHeaderTrailer(ctx *context.Context, prfx string, fields []*config.FCTemplate, dc utils.DataStorage, cfg *config.CGRConfig, fltS *engine.FilterS) (r *utils.OrderedNavigableMap, err error) { r = utils.NewOrderedNavigableMap() err = engine.NewExportRequest(map[string]utils.DataStorage{ utils.MetaDC: dc, utils.MetaCfg: cfg.GetDataProvider(), }, cfg.GeneralCfg().DefaultTenant, fltS, - map[string]*utils.OrderedNavigableMap{prfx: r}).SetFields(fields) - return -} - -func composeExp(fields []*config.FCTemplate, cgrEv *utils.CGREvent, dc utils.DataStorage, cfg *config.CGRConfig, fltS *engine.FilterS) (r *utils.OrderedNavigableMap, err error) { - r = utils.NewOrderedNavigableMap() - err = engine.NewExportRequest(map[string]utils.DataStorage{ - utils.MetaReq: utils.MapStorage(cgrEv.Event), - utils.MetaDC: dc, - utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts), - utils.MetaCfg: cfg.GetDataProvider(), - }, utils.FirstNonEmpty(cgrEv.Tenant, cfg.GeneralCfg().DefaultTenant), - fltS, - map[string]*utils.OrderedNavigableMap{utils.MetaExp: r}).SetFields(fields) + map[string]*utils.OrderedNavigableMap{prfx: r}).SetFields(ctx, fields) return } diff --git a/ees/ees.go b/ees/ees.go index 7bbdf21ed..bb48de5cb 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -98,18 +98,21 @@ func (eeS *EventExporterS) setupCache(chCfgs map[string]*config.CacheParamCfg) { eeS.eesMux.Unlock() } -func (eeS *EventExporterS) attrSProcessEvent(cgrEv *utils.CGREvent, attrIDs []string, ctx string) (err error) { +func (eeS *EventExporterS) attrSProcessEvent(ctx *context.Context, cgrEv *utils.CGREvent, attrIDs []string, attributeSCtx string) (err error) { var rplyEv engine.AttrSProcessEventReply if cgrEv.APIOpts == nil { cgrEv.APIOpts = make(map[string]interface{}) } cgrEv.APIOpts[utils.Subsys] = utils.MetaEEs - cgrEv.APIOpts[utils.OptsContext] = ctx + cgrEv.APIOpts[utils.OptsContext] = utils.FirstNonEmpty( + attributeSCtx, + utils.IfaceAsString(cgrEv.APIOpts[utils.OptsContext]), + utils.MetaEEs) attrArgs := &engine.AttrArgsProcessEvent{ AttributeIDs: attrIDs, CGREvent: cgrEv, } - if err = eeS.connMgr.Call(context.TODO(), + if err = eeS.connMgr.Call(ctx, eeS.cfg.EEsNoLksCfg().AttributeSConns, utils.AttributeSv1ProcessEvent, attrArgs, &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 { @@ -122,7 +125,7 @@ func (eeS *EventExporterS) attrSProcessEvent(cgrEv *utils.CGREvent, attrIDs []st // V1ProcessEvent will be called each time a new event is received from readers // rply -> map[string]map[string]interface{} -func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply *map[string]map[string]interface{}) (err error) { +func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *utils.CGREventWithEeIDs, rply *map[string]map[string]interface{}) (err error) { eeS.cfg.RLocks(config.EEsJSON) defer eeS.cfg.RUnlocks(config.EEsJSON) @@ -146,7 +149,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply * if len(eeCfg.Filters) != 0 { tnt := utils.FirstNonEmpty(cgrEv.Tenant, eeS.cfg.GeneralCfg().DefaultTenant) - if pass, errPass := eeS.filterS.Pass(context.TODO(), tnt, + if pass, errPass := eeS.filterS.Pass(ctx, tnt, eeCfg.Filters, cgrDp); errPass != nil { return errPass } else if !pass { @@ -155,12 +158,8 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply * } if eeCfg.Flags.GetBool(utils.MetaAttributes) { - if err = eeS.attrSProcessEvent( - cgrEv.CGREvent, - eeCfg.AttributeSIDs, utils.FirstNonEmpty( - eeCfg.AttributeSCtx, - utils.IfaceAsString(cgrEv.APIOpts[utils.OptsContext]), - utils.MetaEEs)); err != nil { + if err = eeS.attrSProcessEvent(ctx, cgrEv.CGREvent, + eeCfg.AttributeSIDs, eeCfg.AttributeSCtx); err != nil { return } } @@ -188,9 +187,11 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply * metricMapLock.Lock() metricsMap[ee.Cfg().ID] = utils.MapStorage{} // will return the ID for all processed exporters metricMapLock.Unlock() - + ctx := ctx if eeCfg.Synchronous { wg.Add(1) // wait for synchronous or file ones since these need to be done before continuing + } else { + ctx = context.Background() // is async so lose the API context } // log the message before starting the gorutine, but still execute the exporter if hasVerbose && !eeCfg.Synchronous { @@ -199,7 +200,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply * utils.EEs, ee.Cfg().ID)) } go func(evict, sync bool, ee EventExporter) { - if err := exportEventWithExporter(ee, cgrEv.CGREvent, evict, eeS.cfg, eeS.filterS); err != nil { + if err := exportEventWithExporter(ctx, ee, cgrEv.CGREvent, evict, eeS.cfg, eeS.filterS); err != nil { withErr = true } if sync { @@ -243,7 +244,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply * return } -func exportEventWithExporter(exp EventExporter, ev *utils.CGREvent, oneTime bool, cfg *config.CGRConfig, filterS *engine.FilterS) (err error) { +func exportEventWithExporter(ctx *context.Context, exp EventExporter, ev *utils.CGREvent, oneTime bool, cfg *config.CGRConfig, filterS *engine.FilterS) (err error) { defer func() { updateEEMetrics(exp.GetMetrics(), ev.ID, ev.Event, err != nil, utils.FirstNonEmpty(exp.Cfg().Timezone, cfg.GeneralCfg().DefaultTimezone)) @@ -269,7 +270,7 @@ func exportEventWithExporter(exp EventExporter, ev *utils.CGREvent, oneTime bool utils.MetaCfg: cfg.GetDataProvider(), }, utils.FirstNonEmpty(ev.Tenant, cfg.GeneralCfg().DefaultTenant), filterS, - map[string]*utils.OrderedNavigableMap{utils.MetaExp: expNM}).SetFields(exp.Cfg().ContentFields()) + map[string]*utils.OrderedNavigableMap{utils.MetaExp: expNM}).SetFields(ctx, exp.Cfg().ContentFields()) if eEv, err = exp.PrepareOrderMap(expNM); err != nil { return } @@ -277,10 +278,10 @@ func exportEventWithExporter(exp EventExporter, ev *utils.CGREvent, oneTime bool key := utils.ConcatenatedKey(utils.FirstNonEmpty(engine.MapEvent(ev.Event).GetStringIgnoreErrors(utils.CGRID), utils.GenUUID()), utils.FirstNonEmpty(engine.MapEvent(ev.Event).GetStringIgnoreErrors(utils.RunID), utils.MetaDefault)) - return ExportWithAttempts(exp, eEv, key) + return ExportWithAttempts(ctx, exp, eEv, key) } -func ExportWithAttempts(exp EventExporter, eEv interface{}, key string) (err error) { +func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{}, key string) (err error) { if exp.Cfg().FailedPostsDir != utils.MetaNone { defer func() { if err != nil { @@ -307,7 +308,7 @@ func ExportWithAttempts(exp EventExporter, eEv interface{}, key string) (err err return } for i := 0; i < exp.Cfg().Attempts; i++ { - if err = exp.ExportEvent(eEv, key); err == nil || + if err = exp.ExportEvent(ctx, eEv, key); err == nil || err == utils.ErrDisconnected { // special error in case the exporter was closed break } diff --git a/ees/ees_test.go b/ees/ees_test.go index 528c022c9..c053bc397 100644 --- a/ees/ees_test.go +++ b/ees/ees_test.go @@ -115,7 +115,7 @@ func TestAttrSProcessEvent(t *testing.T) { }) eeS := NewEventExporterS(cgrCfg, filterS, connMgr) // cgrEv := &utils.CGREvent{} - if err := eeS.attrSProcessEvent(cgrEv, []string{}, utils.EmptyString); err != nil { + if err := eeS.attrSProcessEvent(context.TODO(), cgrEv, []string{}, utils.EmptyString); err != nil { t.Error(err) } } @@ -141,7 +141,7 @@ func TestAttrSProcessEvent2(t *testing.T) { }) eeS := NewEventExporterS(cgrCfg, filterS, connMgr) cgrEv := &utils.CGREvent{} - if err := eeS.attrSProcessEvent(cgrEv, []string{}, utils.EmptyString); err != nil { + if err := eeS.attrSProcessEvent(context.TODO(), cgrEv, []string{}, utils.EmptyString); err != nil { t.Error(err) } } @@ -189,7 +189,7 @@ func TestV1ProcessEvent(t *testing.T) { rplyExpect := map[string]map[string]interface{}{ "SQLExporterFull": {}, } - if err := eeS.V1ProcessEvent(cgrEv, &rply); err != nil { + if err := eeS.V1ProcessEvent(context.TODO(), cgrEv, &rply); err != nil { t.Error(err) } else if !reflect.DeepEqual(rply, rplyExpect) { t.Errorf("Expected %q but received %q", rplyExpect, rply) @@ -225,13 +225,13 @@ func TestV1ProcessEvent2(t *testing.T) { } var rply map[string]map[string]interface{} errExpect := "NOT_FOUND" - if err := eeS.V1ProcessEvent(cgrEv, &rply); err == nil || err.Error() != errExpect { + if err := eeS.V1ProcessEvent(context.TODO(), cgrEv, &rply); err == nil || err.Error() != errExpect { t.Errorf("Expecting %q but received %q", errExpect, err) } errExpect = "NOT_FOUND:test" eeS.cfg.EEsCfg().Exporters[0].Filters = []string{"test"} - if err := eeS.V1ProcessEvent(cgrEv, &rply); err == nil || err.Error() != errExpect { + if err := eeS.V1ProcessEvent(context.TODO(), cgrEv, &rply); err == nil || err.Error() != errExpect { t.Errorf("Expecting %q but received %q", errExpect, err) } } @@ -257,7 +257,7 @@ func TestV1ProcessEvent3(t *testing.T) { } var rply map[string]map[string]interface{} errExpect := "MANDATORY_IE_MISSING: [connIDs]" - if err := eeS.V1ProcessEvent(cgrEv, &rply); err == nil || err.Error() != errExpect { + if err := eeS.V1ProcessEvent(context.TODO(), cgrEv, &rply); err == nil || err.Error() != errExpect { t.Errorf("Expecting %q but received %q", errExpect, err) } } @@ -293,7 +293,7 @@ func TestV1ProcessEvent4(t *testing.T) { } var rply map[string]map[string]interface{} errExpect := "PARTIALLY_EXECUTED" - if err := eeS.V1ProcessEvent(cgrEv, &rply); err == nil || err.Error() != errExpect { + if err := eeS.V1ProcessEvent(context.TODO(), cgrEv, &rply); err == nil || err.Error() != errExpect { t.Errorf("Expecting %q but received %q", errExpect, err) } else if len(rply) != 0 { t.Error("Unexpected reply result") @@ -318,9 +318,9 @@ func (m mockEventExporter) GetMetrics() *utils.SafeMapStorage { return m.dc } -func (mockEventExporter) Cfg() *config.EventExporterCfg { return new(config.EventExporterCfg) } -func (mockEventExporter) Connect() error { return nil } -func (mockEventExporter) ExportEvent(interface{}, string) error { return nil } +func (mockEventExporter) Cfg() *config.EventExporterCfg { return new(config.EventExporterCfg) } +func (mockEventExporter) Connect() error { return nil } +func (mockEventExporter) ExportEvent(*context.Context, interface{}, string) error { return nil } func (mockEventExporter) Close() error { utils.Logger.Warning("NOT IMPLEMENTED") return nil @@ -354,7 +354,7 @@ func TestV1ProcessEventMockMetrics(t *testing.T) { } var rply map[string]map[string]interface{} errExpect := "cannot cast to map[string]interface{} 5 for positive exports" - if err := eeS.V1ProcessEvent(cgrEv, &rply); err == nil || err.Error() != errExpect { + if err := eeS.V1ProcessEvent(context.TODO(), cgrEv, &rply); err == nil || err.Error() != errExpect { t.Errorf("Expecting %q but received %q", errExpect, err) } } @@ -386,7 +386,7 @@ func TestV1ProcessEvent5(t *testing.T) { eeS := NewEventExporterS(cgrCfg, filterS, nil) var rply map[string]map[string]interface{} errExpect := "unsupported exporter type: " - if err := eeS.V1ProcessEvent(cgrEv, &rply); err == nil || err.Error() != errExpect { + if err := eeS.V1ProcessEvent(context.TODO(), cgrEv, &rply); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } } @@ -411,7 +411,7 @@ func TestV1ProcessEvent6(t *testing.T) { }, } var rply map[string]map[string]interface{} - if err := eeS.V1ProcessEvent(cgrEv, &rply); err != nil { + if err := eeS.V1ProcessEvent(context.TODO(), cgrEv, &rply); err != nil { t.Error(err) } } diff --git a/ees/elastic.go b/ees/elastic.go index 47ba58221..b50ce50e2 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -20,7 +20,6 @@ package ees import ( "bytes" - "context" "encoding/json" "fmt" "strings" @@ -28,6 +27,7 @@ import ( "github.com/elastic/go-elasticsearch/esapi" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" elasticsearch "github.com/elastic/go-elasticsearch" @@ -120,7 +120,7 @@ func (eEe *ElasticEE) Connect() (err error) { } // ExportEvent implements EventExporter -func (eEe *ElasticEE) ExportEvent(ev interface{}, key string) (err error) { +func (eEe *ElasticEE) ExportEvent(ctx *context.Context, ev interface{}, key string) (err error) { eEe.reqs.get() eEe.RLock() defer func() { @@ -148,7 +148,7 @@ func (eEe *ElasticEE) ExportEvent(ev interface{}, key string) (err error) { } var resp *esapi.Response - if resp, err = eReq.Do(context.Background(), eEe.eClnt); err != nil { + if resp, err = eReq.Do(ctx, eEe.eClnt); err != nil { return } defer resp.Body.Close() diff --git a/ees/elastic_test.go b/ees/elastic_test.go index 308f4a20a..80ff341ed 100644 --- a/ees/elastic_test.go +++ b/ees/elastic_test.go @@ -24,6 +24,7 @@ import ( "reflect" "testing" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) @@ -261,7 +262,7 @@ func TestElasticExportEvent(t *testing.T) { t.Error(err) } eEe.eClnt.Transport = new(mockClientErr) - if err := eEe.ExportEvent([]byte{}, ""); err != nil { + if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err != nil { t.Error(err) } } @@ -293,7 +294,7 @@ func TestElasticExportEvent2(t *testing.T) { eEe.eClnt.Transport = new(mockClientErr2) errExpect := io.EOF - if err := eEe.ExportEvent([]byte{}, ""); err == nil || err != errExpect { + if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err == nil || err != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } } @@ -324,7 +325,7 @@ func TestElasticExportEvent3(t *testing.T) { eEe.eClnt.Transport = new(mockClient) // errExpect := `unsupported protocol scheme ""` cgrCfg.EEsCfg().Exporters[0].ComputeFields() - if err := eEe.ExportEvent([]byte{}, ""); err != nil { + if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err != nil { t.Error(err) } } @@ -343,7 +344,7 @@ func TestElasticExportEvent4(t *testing.T) { t.Error(err) } errExpect := `unsupported protocol scheme ""` - if err := eEe.ExportEvent([]byte{}, ""); err == nil || err.Error() != errExpect { + if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err == nil || err.Error() != errExpect { t.Errorf("Expected %q but got %q", errExpect, err) } } diff --git a/ees/filecsv.go b/ees/filecsv.go index 5b14b88a1..0a909d22a 100644 --- a/ees/filecsv.go +++ b/ees/filecsv.go @@ -26,6 +26,7 @@ import ( "path" "sync" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/config" @@ -83,7 +84,7 @@ func (fCsv *FileCSVee) init() (err error) { func (fCsv *FileCSVee) composeHeader() (err error) { if len(fCsv.Cfg().HeaderFields()) != 0 { var exp *utils.OrderedNavigableMap - if exp, err = composeHeaderTrailer(utils.MetaHdr, fCsv.Cfg().HeaderFields(), fCsv.dc, fCsv.cgrCfg, fCsv.filterS); err != nil { + if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, fCsv.Cfg().HeaderFields(), fCsv.dc, fCsv.cgrCfg, fCsv.filterS); err != nil { return } return fCsv.csvWriter.Write(exp.OrderedFieldsAsStrings()) @@ -95,7 +96,7 @@ func (fCsv *FileCSVee) composeHeader() (err error) { func (fCsv *FileCSVee) composeTrailer() (err error) { if len(fCsv.Cfg().TrailerFields()) != 0 { var exp *utils.OrderedNavigableMap - if exp, err = composeHeaderTrailer(utils.MetaTrl, fCsv.Cfg().TrailerFields(), fCsv.dc, fCsv.cgrCfg, fCsv.filterS); err != nil { + if exp, err = composeHeaderTrailer(context.Background(), utils.MetaTrl, fCsv.Cfg().TrailerFields(), fCsv.dc, fCsv.cgrCfg, fCsv.filterS); err != nil { return } return fCsv.csvWriter.Write(exp.OrderedFieldsAsStrings()) @@ -107,7 +108,7 @@ func (fCsv *FileCSVee) Cfg() *config.EventExporterCfg { return fCsv.cfg } func (fCsv *FileCSVee) Connect() (_ error) { return } -func (fCsv *FileCSVee) ExportEvent(ev interface{}, _ string) error { +func (fCsv *FileCSVee) ExportEvent(_ *context.Context, ev interface{}, _ string) error { fCsv.Lock() // make sure that only one event is writen in file at once defer fCsv.Unlock() return fCsv.csvWriter.Write(ev.([]string)) diff --git a/ees/filecsv_test.go b/ees/filecsv_test.go index c1fe001e6..8c3b473bd 100644 --- a/ees/filecsv_test.go +++ b/ees/filecsv_test.go @@ -25,6 +25,7 @@ import ( "reflect" "testing" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -200,7 +201,7 @@ func TestFileCsvExportEvent(t *testing.T) { dc: dc, } - if err := fCsv.ExportEvent([]string{"value", "3"}, ""); err != nil { + if err := fCsv.ExportEvent(context.Background(), []string{"value", "3"}, ""); err != nil { t.Error(err) } csvNW.Flush() diff --git a/ees/filefwv.go b/ees/filefwv.go index 157051551..991475831 100644 --- a/ees/filefwv.go +++ b/ees/filefwv.go @@ -25,6 +25,7 @@ import ( "path" "sync" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -75,7 +76,7 @@ func (fFwv *FileFWVee) composeHeader() (err error) { return } var exp *utils.OrderedNavigableMap - if exp, err = composeHeaderTrailer(utils.MetaHdr, fFwv.Cfg().HeaderFields(), fFwv.dc, fFwv.cgrCfg, fFwv.filterS); err != nil { + if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, fFwv.Cfg().HeaderFields(), fFwv.dc, fFwv.cgrCfg, fFwv.filterS); err != nil { return } for _, record := range exp.OrderedFieldsAsStrings() { @@ -93,7 +94,7 @@ func (fFwv *FileFWVee) composeTrailer() (err error) { return } var exp *utils.OrderedNavigableMap - if exp, err = composeHeaderTrailer(utils.MetaTrl, fFwv.Cfg().TrailerFields(), fFwv.dc, fFwv.cgrCfg, fFwv.filterS); err != nil { + if exp, err = composeHeaderTrailer(context.Background(), utils.MetaTrl, fFwv.Cfg().TrailerFields(), fFwv.dc, fFwv.cgrCfg, fFwv.filterS); err != nil { return } for _, record := range exp.OrderedFieldsAsStrings() { @@ -109,7 +110,7 @@ func (fFwv *FileFWVee) Cfg() *config.EventExporterCfg { return fFwv.cfg } func (fFwv *FileFWVee) Connect() (_ error) { return } -func (fFwv *FileFWVee) ExportEvent(records interface{}, _ string) (err error) { +func (fFwv *FileFWVee) ExportEvent(_ *context.Context, records interface{}, _ string) (err error) { fFwv.Lock() // make sure that only one event is writen in file at once defer fFwv.Unlock() for _, record := range records.([]string) { diff --git a/ees/filefwv_test.go b/ees/filefwv_test.go index 294101f92..97c7d47a3 100644 --- a/ees/filefwv_test.go +++ b/ees/filefwv_test.go @@ -25,6 +25,7 @@ import ( "reflect" "testing" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -184,7 +185,7 @@ func TestFileFwvExportEvent(t *testing.T) { file: nopCloser{byteBuff}, dc: dc, } - if err := fFwv.ExportEvent([]string{"value", "3"}, ""); err != nil { + if err := fFwv.ExportEvent(context.Background(), []string{"value", "3"}, ""); err != nil { t.Error(err) } csvNW.Flush() @@ -220,7 +221,7 @@ func TestFileFwvExportEventWriteError(t *testing.T) { file: nopCloserWrite{byteBuff}, dc: dc, } - if err := fFwv.ExportEvent([]string{""}, ""); err == nil || err != utils.ErrNotImplemented { + if err := fFwv.ExportEvent(context.Background(), []string{""}, ""); err == nil || err != utils.ErrNotImplemented { t.Errorf("Expected %q but received %q", utils.ErrNotImplemented, err) } } diff --git a/ees/httpjsonmap.go b/ees/httpjsonmap.go index ff82ee741..0bd7fccf9 100644 --- a/ees/httpjsonmap.go +++ b/ees/httpjsonmap.go @@ -27,6 +27,7 @@ import ( "net/url" "strings" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -61,7 +62,7 @@ func (httpEE *HTTPjsonMapEE) composeHeader(cgrCfg *config.CGRConfig, filterS *en return } var exp *utils.OrderedNavigableMap - if exp, err = composeHeaderTrailer(utils.MetaHdr, httpEE.Cfg().HeaderFields(), httpEE.dc, cgrCfg, filterS); err != nil { + if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, httpEE.Cfg().HeaderFields(), httpEE.dc, cgrCfg, filterS); err != nil { return } for el := exp.GetFirstElement(); el != nil; el = el.Next() { @@ -77,12 +78,12 @@ func (httpEE *HTTPjsonMapEE) Cfg() *config.EventExporterCfg { return httpEE.cfg func (httpEE *HTTPjsonMapEE) Connect() (_ error) { return } -func (httpEE *HTTPjsonMapEE) ExportEvent(content interface{}, _ string) (err error) { +func (httpEE *HTTPjsonMapEE) ExportEvent(ctx *context.Context, content interface{}, _ string) (err error) { httpEE.reqs.get() defer httpEE.reqs.done() pReq := content.(*HTTPPosterRequest) var req *http.Request - if req, err = prepareRequest(httpEE.Cfg().ExportPath, utils.ContentJSON, pReq.Body, pReq.Header); err != nil { + if req, err = prepareRequest(ctx, httpEE.Cfg().ExportPath, utils.ContentJSON, pReq.Body, pReq.Header); err != nil { return } _, err = sendHTTPReq(httpEE.client, req) @@ -116,7 +117,7 @@ func (httpEE *HTTPjsonMapEE) PrepareOrderMap(mp *utils.OrderedNavigableMap) (int }, err } -func prepareRequest(addr, cType string, content interface{}, hdr http.Header) (req *http.Request, err error) { +func prepareRequest(ctx *context.Context, addr, cType string, content interface{}, hdr http.Header) (req *http.Request, err error) { var body io.Reader if cType == utils.ContentForm { body = strings.NewReader(content.(url.Values).Encode()) @@ -128,7 +129,7 @@ func prepareRequest(addr, cType string, content interface{}, hdr http.Header) (r contentType = "application/json" } hdr.Set("Content-Type", contentType) - if req, err = http.NewRequest(http.MethodPost, addr, body); err != nil { + if req, err = http.NewRequestWithContext(ctx, http.MethodPost, addr, body); err != nil { return } req.Header = hdr diff --git a/ees/httpjsonmap_test.go b/ees/httpjsonmap_test.go index e32f71415..c6a1e907a 100644 --- a/ees/httpjsonmap_test.go +++ b/ees/httpjsonmap_test.go @@ -27,6 +27,7 @@ import ( "testing" "time" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) @@ -54,7 +55,7 @@ func TestHttpJsonMapExportEvent1(t *testing.T) { t.Error(err) } errExpect := `Post "/var/spool/cgrates/ees": unsupported protocol scheme ""` - if err := httpEE.ExportEvent(&HTTPPosterRequest{Body: []byte{}, Header: make(http.Header)}, ""); err == nil || err.Error() != errExpect { + if err := httpEE.ExportEvent(context.Background(), &HTTPPosterRequest{Body: []byte{}, Header: make(http.Header)}, ""); err == nil || err.Error() != errExpect { t.Errorf("Expected %q but received %q", errExpect, err) } } @@ -84,7 +85,7 @@ func TestHttpJsonMapExportEvent2(t *testing.T) { t.Error(err) } - if err := httpEE.ExportEvent(&HTTPPosterRequest{Body: []byte(`{"2": "*req.field2"}`), Header: make(http.Header)}, ""); err != nil { + if err := httpEE.ExportEvent(context.Background(), &HTTPPosterRequest{Body: []byte(`{"2": "*req.field2"}`), Header: make(http.Header)}, ""); err != nil { t.Error(err) } } @@ -119,7 +120,7 @@ func TestHttpJsonMapSync(t *testing.T) { } for i := 0; i < 3; i++ { - go exp.ExportEvent(&HTTPPosterRequest{Body: []byte(`{"2": "*req.field2"}`), Header: make(http.Header)}, "") + go exp.ExportEvent(context.Background(), &HTTPPosterRequest{Body: []byte(`{"2": "*req.field2"}`), Header: make(http.Header)}, "") } select { @@ -160,7 +161,7 @@ func TestHttpJsonMapSyncLimit(t *testing.T) { } for i := 0; i < 3; i++ { - go exp.ExportEvent(&HTTPPosterRequest{Body: []byte(`{"2": "*req.field2"}`), Header: make(http.Header)}, "") + go exp.ExportEvent(context.Background(), &HTTPPosterRequest{Body: []byte(`{"2": "*req.field2"}`), Header: make(http.Header)}, "") } select { diff --git a/ees/httppost.go b/ees/httppost.go index e116632dd..0b5553efa 100644 --- a/ees/httppost.go +++ b/ees/httppost.go @@ -23,6 +23,7 @@ import ( "net/url" "strings" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -61,7 +62,7 @@ func (httpPost *HTTPPostEE) composeHeader(cgrCfg *config.CGRConfig, filterS *eng return } var exp *utils.OrderedNavigableMap - if exp, err = composeHeaderTrailer(utils.MetaHdr, httpPost.Cfg().HeaderFields(), httpPost.dc, cgrCfg, filterS); err != nil { + if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, httpPost.Cfg().HeaderFields(), httpPost.dc, cgrCfg, filterS); err != nil { return } for el := exp.GetFirstElement(); el != nil; el = el.Next() { @@ -77,12 +78,12 @@ func (httpPost *HTTPPostEE) Cfg() *config.EventExporterCfg { return httpPost.cfg func (httpPost *HTTPPostEE) Connect() (_ error) { return } -func (httpPost *HTTPPostEE) ExportEvent(content interface{}, _ string) (err error) { +func (httpPost *HTTPPostEE) ExportEvent(ctx *context.Context, content interface{}, _ string) (err error) { httpPost.reqs.get() defer httpPost.reqs.done() pReq := content.(*HTTPPosterRequest) var req *http.Request - if req, err = prepareRequest(httpPost.Cfg().ExportPath, utils.ContentForm, pReq.Body, pReq.Header); err != nil { + if req, err = prepareRequest(ctx, httpPost.Cfg().ExportPath, utils.ContentForm, pReq.Body, pReq.Header); err != nil { return } _, err = sendHTTPReq(httpPost.client, req) diff --git a/ees/httppost_test.go b/ees/httppost_test.go index 919667ead..f16af4e83 100644 --- a/ees/httppost_test.go +++ b/ees/httppost_test.go @@ -28,6 +28,7 @@ import ( "testing" "time" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) @@ -61,7 +62,7 @@ func TestHttpPostExportEvent(t *testing.T) { "Test1": 3, } errExpect := `Post "/var/spool/cgrates/ees": unsupported protocol scheme ""` - if err := httpPost.ExportEvent(&HTTPPosterRequest{Body: url.Values{}, Header: make(http.Header)}, ""); err == nil || err.Error() != errExpect { + if err := httpPost.ExportEvent(context.Background(), &HTTPPosterRequest{Body: url.Values{}, Header: make(http.Header)}, ""); err == nil || err.Error() != errExpect { t.Errorf("Expected %q but received %q", errExpect, err) } } @@ -91,7 +92,7 @@ func TestHttpPostExportEvent2(t *testing.T) { if err != nil { t.Fatal(err) } - if err := httpPost.ExportEvent(vals, ""); err != nil { + if err := httpPost.ExportEvent(context.Background(), vals, ""); err != nil { t.Error(err) } } @@ -135,7 +136,7 @@ func TestHttpPostSync(t *testing.T) { } for i := 0; i < 3; i++ { - go exp.ExportEvent(vals, "") + go exp.ExportEvent(context.Background(), vals, "") } select { @@ -188,7 +189,7 @@ func TestHttpPostSyncLimit(t *testing.T) { } for i := 0; i < 3; i++ { - go exp.ExportEvent(vals, "") + go exp.ExportEvent(context.Background(), vals, "") } select { case <-test: diff --git a/ees/kafka.go b/ees/kafka.go index 47dd98f9a..755646601 100644 --- a/ees/kafka.go +++ b/ees/kafka.go @@ -18,9 +18,9 @@ along with this program. If not, see package ees import ( - "context" "sync" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" kafka "github.com/segmentio/kafka-go" @@ -72,7 +72,7 @@ func (pstr *KafkaEE) Connect() (_ error) { return } -func (pstr *KafkaEE) ExportEvent(content interface{}, key string) (err error) { +func (pstr *KafkaEE) ExportEvent(ctx *context.Context, content interface{}, key string) (err error) { pstr.reqs.get() pstr.RLock() if pstr.writer == nil { @@ -80,7 +80,7 @@ func (pstr *KafkaEE) ExportEvent(content interface{}, key string) (err error) { pstr.reqs.done() return utils.ErrDisconnected } - err = pstr.writer.WriteMessages(context.Background(), kafka.Message{ + err = pstr.writer.WriteMessages(ctx, kafka.Message{ Key: []byte(key), Value: content.([]byte), }) diff --git a/ees/libcdre.go b/ees/libcdre.go index 10b5217a1..ee8fe8a01 100644 --- a/ees/libcdre.go +++ b/ees/libcdre.go @@ -173,7 +173,7 @@ func (expEv *ExportEvents) ReplayFailedPosts(attempts int) (failedEvents *Export keyFunc = utils.UUIDSha1Prefix } for _, ev := range expEv.Events { - if err = ExportWithAttempts(ee, ev, keyFunc()); err != nil { + if err = ExportWithAttempts(context.Background(), ee, ev, keyFunc()); err != nil { failedEvents.AddEvent(ev) } } diff --git a/ees/log.go b/ees/log.go index af276ee19..1c0d18863 100644 --- a/ees/log.go +++ b/ees/log.go @@ -22,6 +22,7 @@ import ( "fmt" "strings" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) @@ -41,7 +42,7 @@ type LogEE struct { func (vEe *LogEE) Cfg() *config.EventExporterCfg { return vEe.cfg } func (vEe *LogEE) Connect() error { return nil } -func (vEe *LogEE) ExportEvent(mp interface{}, _ string) error { +func (vEe *LogEE) ExportEvent(_ *context.Context, mp interface{}, _ string) error { utils.Logger.Info( fmt.Sprintf("<%s> <%s> exported: <%s>", utils.EEs, vEe.Cfg().ID, utils.ToJSON(mp))) diff --git a/ees/nats.go b/ees/nats.go index fc9929864..b71e792ad 100644 --- a/ees/nats.go +++ b/ees/nats.go @@ -26,6 +26,7 @@ import ( "sync" "time" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" "github.com/nats-io/nats.go" @@ -99,7 +100,7 @@ func (pstr *NatsEE) Connect() (err error) { return } -func (pstr *NatsEE) ExportEvent(content interface{}, _ string) (err error) { +func (pstr *NatsEE) ExportEvent(ctx *context.Context, content interface{}, _ string) (err error) { pstr.reqs.get() pstr.RLock() if pstr.poster == nil { @@ -108,7 +109,7 @@ func (pstr *NatsEE) ExportEvent(content interface{}, _ string) (err error) { return utils.ErrDisconnected } if pstr.jetStream { - _, err = pstr.posterJS.Publish(pstr.subject, content.([]byte)) + _, err = pstr.posterJS.Publish(pstr.subject, content.([]byte), nats.Context(ctx)) } else { err = pstr.poster.Publish(pstr.subject, content.([]byte)) } diff --git a/ees/nats_it_test.go b/ees/nats_it_test.go index 4360abe7e..7ad50157c 100644 --- a/ees/nats_it_test.go +++ b/ees/nats_it_test.go @@ -27,6 +27,7 @@ import ( "testing" "time" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -105,7 +106,7 @@ func TestNatsEE(t *testing.T) { "Destination": "1002", }, } - if err := exportEventWithExporter(evExp, cgrEv, true, cgrCfg, new(engine.FilterS)); err != nil { + if err := exportEventWithExporter(context.Background(), evExp, cgrEv, true, cgrCfg, new(engine.FilterS)); err != nil { t.Fatal(err) } testCleanDirectory(t) @@ -171,7 +172,7 @@ func TestNatsEE2(t *testing.T) { "Destination": "1002", }, } - if err := exportEventWithExporter(evExp, cgrEv, true, cgrCfg, new(engine.FilterS)); err != nil { + if err := exportEventWithExporter(context.Background(), evExp, cgrEv, true, cgrCfg, new(engine.FilterS)); err != nil { t.Fatal(err) } testCleanDirectory(t) diff --git a/ees/poster_it_test.go b/ees/poster_it_test.go index 868ee8af4..23d4f5a18 100644 --- a/ees/poster_it_test.go +++ b/ees/poster_it_test.go @@ -21,7 +21,6 @@ along with this program. If not, see package ees import ( - "context" "encoding/json" "flag" "net/http" @@ -38,6 +37,7 @@ import ( "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/aws/aws-sdk-go/service/sqs" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) @@ -76,7 +76,7 @@ func TestHttpJsonPoster(t *testing.T) { if err != nil { t.Error(err) } - if err = ExportWithAttempts(pstr, &HTTPPosterRequest{Body: jsn, Header: make(http.Header)}, ""); err == nil { + if err = ExportWithAttempts(context.Background(), pstr, &HTTPPosterRequest{Body: jsn, Header: make(http.Header)}, ""); err == nil { t.Error("Expected error") } AddFailedPost("/tmp", "http://localhost:8080/invalid", utils.MetaHTTPjsonMap, "test1", jsn, make(map[string]interface{})) @@ -112,7 +112,7 @@ func TestHttpBytesPoster(t *testing.T) { if err != nil { t.Error(err) } - if err = ExportWithAttempts(pstr, &HTTPPosterRequest{Body: content, Header: make(http.Header)}, ""); err == nil { + if err = ExportWithAttempts(context.Background(), pstr, &HTTPPosterRequest{Body: content, Header: make(http.Header)}, ""); err == nil { t.Error("Expected error") } AddFailedPost("/tmp", "http://localhost:8080/invalid", utils.ContentJSON, "test2", content, make(map[string]interface{})) @@ -166,7 +166,7 @@ func TestSQSPoster(t *testing.T) { Attempts: 5, Opts: opts, }, nil) - if err := ExportWithAttempts(pstr, []byte(body), ""); err != nil { + if err := ExportWithAttempts(context.Background(), pstr, []byte(body), ""); err != nil { t.Fatal(err) } @@ -249,7 +249,7 @@ func TestS3Poster(t *testing.T) { Attempts: 5, Opts: opts, }, nil) - if err := ExportWithAttempts(pstr, []byte(body), key); err != nil { + if err := ExportWithAttempts(context.Background(), pstr, []byte(body), key); err != nil { t.Fatal(err) } key += ".json" @@ -308,7 +308,7 @@ func TestAMQPv1Poster(t *testing.T) { Attempts: 5, Opts: opts, }, nil) - if err := ExportWithAttempts(pstr, []byte(body), ""); err != nil { + if err := ExportWithAttempts(context.Background(), pstr, []byte(body), ""); err != nil { t.Fatal(err) } // Create client diff --git a/ees/s3.go b/ees/s3.go index 8aedd8c29..6aad90547 100644 --- a/ees/s3.go +++ b/ees/s3.go @@ -27,6 +27,7 @@ import ( "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) @@ -111,10 +112,10 @@ func (pstr *S3EE) Connect() (err error) { return } -func (pstr *S3EE) ExportEvent(message interface{}, key string) (err error) { +func (pstr *S3EE) ExportEvent(ctx *context.Context, message interface{}, key string) (err error) { pstr.reqs.get() pstr.RLock() - _, err = pstr.up.Upload(&s3manager.UploadInput{ + _, err = pstr.up.UploadWithContext(ctx, &s3manager.UploadInput{ Bucket: aws.String(pstr.bucket), // Can also use the `filepath` standard library package to modify the diff --git a/ees/sql.go b/ees/sql.go index bf8b011ec..a1c4d5cc1 100644 --- a/ees/sql.go +++ b/ees/sql.go @@ -29,6 +29,7 @@ import ( "gorm.io/driver/postgres" "gorm.io/gorm" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) @@ -142,7 +143,7 @@ func (sqlEe *SQLEe) Connect() (err error) { return } -func (sqlEe *SQLEe) ExportEvent(req interface{}, _ string) error { +func (sqlEe *SQLEe) ExportEvent(_ *context.Context, req interface{}, _ string) error { sqlEe.reqs.get() sqlEe.RLock() defer func() { diff --git a/ees/sql_it_test.go b/ees/sql_it_test.go index 1e01b9043..d8318a230 100644 --- a/ees/sql_it_test.go +++ b/ees/sql_it_test.go @@ -28,6 +28,7 @@ import ( "testing" "time" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/utils" "gorm.io/driver/mysql" @@ -297,7 +298,7 @@ func TestSQLExportEvent1(t *testing.T) { if err := sqlEe.Connect(); err != nil { t.Fatal(err) } - if err := sqlEe.ExportEvent(&sqlPosterRequest{Querry: "INSERT INTO cdrs VALUES (); ", Values: []interface{}{}}, ""); err != nil { + if err := sqlEe.ExportEvent(context.Background(), &sqlPosterRequest{Querry: "INSERT INTO cdrs VALUES (); ", Values: []interface{}{}}, ""); err != nil { t.Error(err) } sqlEe.Close() diff --git a/ees/sqs.go b/ees/sqs.go index 83f97d65f..3b9bd5b34 100644 --- a/ees/sqs.go +++ b/ees/sqs.go @@ -26,6 +26,7 @@ import ( "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) @@ -129,10 +130,10 @@ func (pstr *SQSee) Connect() (err error) { return } -func (pstr *SQSee) ExportEvent(message interface{}, _ string) (err error) { +func (pstr *SQSee) ExportEvent(ctx *context.Context, message interface{}, _ string) (err error) { pstr.reqs.get() pstr.RLock() - _, err = pstr.svc.SendMessage( + _, err = pstr.svc.SendMessageWithContext(ctx, &sqs.SendMessageInput{ MessageBody: aws.String(string(message.([]byte))), QueueUrl: pstr.queueURL, diff --git a/ees/virtualee.go b/ees/virtualee.go index 5852b560a..a89a97580 100644 --- a/ees/virtualee.go +++ b/ees/virtualee.go @@ -19,6 +19,7 @@ along with this program. If not, see package ees import ( + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) @@ -36,12 +37,12 @@ type VirtualEE struct { dc *utils.SafeMapStorage } -func (vEe *VirtualEE) Cfg() *config.EventExporterCfg { return vEe.cfg } -func (vEe *VirtualEE) Connect() error { return nil } -func (vEe *VirtualEE) ExportEvent(interface{}, string) error { return nil } -func (vEe *VirtualEE) Close() error { return nil } -func (vEe *VirtualEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc } -func (vEe *VirtualEE) PrepareMap(map[string]interface{}) (interface{}, error) { return nil, nil } +func (vEe *VirtualEE) Cfg() *config.EventExporterCfg { return vEe.cfg } +func (vEe *VirtualEE) Connect() error { return nil } +func (vEe *VirtualEE) ExportEvent(*context.Context, interface{}, string) error { return nil } +func (vEe *VirtualEE) Close() error { return nil } +func (vEe *VirtualEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc } +func (vEe *VirtualEE) PrepareMap(map[string]interface{}) (interface{}, error) { return nil, nil } func (vEe *VirtualEE) PrepareOrderMap(*utils.OrderedNavigableMap) (interface{}, error) { return nil, nil } diff --git a/ees/virtualee_test.go b/ees/virtualee_test.go index 95d9ac56e..9324a6966 100644 --- a/ees/virtualee_test.go +++ b/ees/virtualee_test.go @@ -22,6 +22,7 @@ import ( "reflect" "testing" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/utils" ) @@ -40,7 +41,7 @@ func TestVirtualEeGetMetrics(t *testing.T) { } func TestVirtualEeExportEvent(t *testing.T) { vEe := &VirtualEE{} - if err := vEe.ExportEvent([]byte{}, ""); err != nil { + if err := vEe.ExportEvent(context.Background(), []byte{}, ""); err != nil { t.Error(err) } vEe.Close() diff --git a/engine/exportrequest.go b/engine/exportrequest.go index a356564df..3db790c60 100644 --- a/engine/exportrequest.go +++ b/engine/exportrequest.go @@ -96,9 +96,9 @@ func (eeR *ExportRequest) FieldAsString(fldPath []string) (val string, err error } //SetFields will populate fields of AgentRequest out of templates -func (eeR *ExportRequest) SetFields(tplFlds []*config.FCTemplate) (err error) { +func (eeR *ExportRequest) SetFields(ctx *context.Context, tplFlds []*config.FCTemplate) (err error) { for _, tplFld := range tplFlds { - if pass, err := eeR.filterS.Pass(context.TODO(), eeR.tnt, + if pass, err := eeR.filterS.Pass(ctx, eeR.tnt, tplFld.Filters, eeR); err != nil { return err } else if !pass { diff --git a/ers/amqp.go b/ers/amqp.go index 1e141e4f1..74a51e681 100644 --- a/ers/amqp.go +++ b/ers/amqp.go @@ -167,7 +167,7 @@ func (rdr *AMQPER) readLoop(msgChan <-chan amqp.Delivery) { utils.ERs, msg.MessageId, err.Error())) } if rdr.poster != nil { // post it - if err := ees.ExportWithAttempts(rdr.poster, msg.Body, utils.EmptyString); err != nil { + if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Body, utils.EmptyString); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> writing message %s error: %s", utils.ERs, msg.MessageId, err.Error())) diff --git a/ers/amqpv1.go b/ers/amqpv1.go index d816af5a6..186939e33 100644 --- a/ers/amqpv1.go +++ b/ers/amqpv1.go @@ -145,7 +145,7 @@ func (rdr *AMQPv1ER) readLoop(recv *amqpv1.Receiver) (err error) { utils.ERs, err.Error())) } if rdr.poster != nil { // post it - if err := ees.ExportWithAttempts(rdr.poster, body, utils.EmptyString); err != nil { + if err := ees.ExportWithAttempts(context.Background(), rdr.poster, body, utils.EmptyString); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> writing message error: %s", utils.ERs, err.Error())) diff --git a/ers/ers.go b/ers/ers.go index 496351a88..c963534b6 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -370,7 +370,7 @@ func (erS *ERService) onEvicted(id string, value interface{}) { utils.MetaExp: utils.NewOrderedNavigableMap(), }) - if err = eeReq.SetFields(eEvs.rdrCfg.CacheDumpFields); err != nil { + if err = eeReq.SetFields(context.Background(), eEvs.rdrCfg.CacheDumpFields); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> Converting CDR with CGRID: <%s> to record , ignoring due to error: <%s>", utils.ERs, id, err.Error())) diff --git a/ers/kafka.go b/ers/kafka.go index 992338b26..83b0a8b6f 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -141,7 +141,7 @@ func (rdr *KafkaER) readLoop(r *kafka.Reader) { utils.ERs, string(msg.Key), err.Error())) } if rdr.poster != nil { // post it - if err := ees.ExportWithAttempts(rdr.poster, msg.Value, string(msg.Key)); err != nil { + if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Value, string(msg.Key)); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> writing message %s error: %s", utils.ERs, string(msg.Key), err.Error())) diff --git a/ers/nats.go b/ers/nats.go index 8e87c6335..cda095d17 100644 --- a/ers/nats.go +++ b/ers/nats.go @@ -136,7 +136,7 @@ func (rdr *NatsER) Serve() (err error) { utils.ERs, string(msg.Data), err.Error())) } if rdr.poster != nil { // post it - if err := ees.ExportWithAttempts(rdr.poster, msg.Data, utils.EmptyString); err != nil { + if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Data, utils.EmptyString); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> writing message %s error: %s", utils.ERs, string(msg.Data), err.Error())) diff --git a/ers/s3.go b/ers/s3.go index ecea6bd8f..ca8e8a133 100644 --- a/ers/s3.go +++ b/ers/s3.go @@ -246,7 +246,7 @@ func (rdr *S3ER) readMsg(scv *s3.S3, key string) (err error) { } if rdr.poster != nil { // post it - if err = ees.ExportWithAttempts(rdr.poster, msg, key); err != nil { + if err = ees.ExportWithAttempts(context.Background(), rdr.poster, msg, key); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> writing message %s error: %s", utils.ERs, key, err.Error())) diff --git a/ers/sqs.go b/ers/sqs.go index d7f2923ed..547d121c2 100644 --- a/ers/sqs.go +++ b/ers/sqs.go @@ -261,7 +261,7 @@ func (rdr *SQSER) readMsg(scv sqsClient, msg *sqs.Message) (err error) { } if rdr.poster != nil { // post it - if err = ees.ExportWithAttempts(rdr.poster, body, key); err != nil { + if err = ees.ExportWithAttempts(context.Background(), rdr.poster, body, key); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> writing message %s error: %s", utils.ERs, key, err.Error())) diff --git a/services/ees.go b/services/ees.go index 42e80e057..3f968e3ca 100644 --- a/services/ees.go +++ b/services/ees.go @@ -23,6 +23,7 @@ import ( "sync" "github.com/cgrates/birpc" + "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/ees" @@ -60,8 +61,8 @@ type EventExporterService struct { rldChan chan struct{} stopChan chan struct{} - eeS *ees.EventExporterS - // rpc *v1.EeSv1 + eeS *ees.EventExporterS + rpc *apis.EeSv1 anz *AnalyzerService srvDep map[string]*sync.WaitGroup } @@ -96,7 +97,7 @@ func (es *EventExporterService) Shutdown() (err error) { close(es.stopChan) es.eeS.Shutdown() es.eeS = nil - //<-es.intConnChan + <-es.intConnChan return } @@ -118,10 +119,11 @@ func (es *EventExporterService) Start() (err error) { es.stopChan = make(chan struct{}) go es.eeS.ListenAndServe(es.stopChan, es.rldChan) - // es.rpc = v1.NewEeSv1(es.eeS) - // if !es.cfg.DispatcherSCfg().Enabled { - // es.server.RpcRegister(es.rpc) - // } - // es.intConnChan <- es.anz.GetInternalCodec(es.eeS, utils.EventExporterS) + es.rpc = apis.NewEeSv1(es.eeS) + srv, _ := birpc.NewService(es.rpc, "", false) + if !es.cfg.DispatcherSCfg().Enabled { + es.server.RpcRegister(srv) + } + es.intConnChan <- es.anz.GetInternalCodec(srv, utils.EEs) return }