From 035bac3688be7779ff82fa0d3ff37a599968a286 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Mon, 27 Jan 2025 19:31:20 +0200 Subject: [PATCH] elasticsearch: switch to fully-typed API - index request options are now used directly during ExportEvent. They are passed to the request as options only if they were configured in the first place. - implement PrepareMap and PrepareOrderMap methods for the elastic exporter. bytePreparing methods are not needed anymore as the Event map can be exported directly. - elasticsearch.Client -> elasticsearch.TypedClient - rename prepareOpts -> parseClientOpts --- ees/elastic.go | 163 ++++++++++++++-------------- ees/elastic_test.go | 255 +------------------------------------------- 2 files changed, 83 insertions(+), 335 deletions(-) diff --git a/ees/elastic.go b/ees/elastic.go index dec5e3cf2..c4d3f711e 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -19,15 +19,16 @@ along with this program. If not, see package ees import ( - "bytes" - "encoding/json" "fmt" "os" + "strconv" "strings" "sync" "github.com/elastic/elastic-transport-go/v8/elastictransport" - "github.com/elastic/go-elasticsearch/v8/esapi" + "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/optype" + "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/refresh" + "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/versiontype" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" @@ -36,66 +37,31 @@ import ( elasticsearch "github.com/elastic/go-elasticsearch/v8" ) -func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*ElasticEE, error) { - el := &ElasticEE{ - cfg: cfg, - dc: dc, - reqs: newConcReq(cfg.ConcurrentRequests), - } - if err := el.prepareOpts(); err != nil { - return nil, err - } - return el, nil -} - // ElasticEE implements EventExporter interface for ElasticSearch export. type ElasticEE struct { mu sync.RWMutex cfg *config.EventExporterCfg dc *utils.SafeMapStorage reqs *concReq - bytePreparing - client *elasticsearch.Client + client *elasticsearch.TypedClient clientCfg elasticsearch.Config - - // indexReqOpts is used to store IndexRequest options for convenience - // and does not represent the IndexRequest itself. - indexReqOpts esapi.IndexRequest } -// init will create all the necessary dependencies, including opening the file -func (e *ElasticEE) prepareOpts() error { +func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*ElasticEE, error) { + el := &ElasticEE{ + cfg: cfg, + dc: dc, + reqs: newConcReq(cfg.ConcurrentRequests), + } + if err := el.parseClientOpts(); err != nil { + return nil, err + } + return el, nil +} + +func (e *ElasticEE) parseClientOpts() error { opts := e.cfg.Opts - - // Parse index request options. - e.indexReqOpts.Index = utils.CDRsTBL - if opts.ElsIndex != nil { - e.indexReqOpts.Index = *opts.ElsIndex - } - e.indexReqOpts.IfPrimaryTerm = opts.ElsIfPrimaryTerm - e.indexReqOpts.IfSeqNo = opts.ElsIfSeqNo - if opts.ElsOpType != nil { - e.indexReqOpts.OpType = *opts.ElsOpType - } - if opts.ElsPipeline != nil { - e.indexReqOpts.Pipeline = *opts.ElsPipeline - } - if opts.ElsRouting != nil { - e.indexReqOpts.Routing = *opts.ElsRouting - } - if opts.ElsTimeout != nil { - e.indexReqOpts.Timeout = *opts.ElsTimeout - } - e.indexReqOpts.Version = opts.ElsVersion - if opts.ElsVersionType != nil { - e.indexReqOpts.VersionType = *opts.ElsVersionType - } - if opts.ElsWaitForActiveShards != nil { - e.indexReqOpts.WaitForActiveShards = *opts.ElsWaitForActiveShards - } - - // Parse client config options. if opts.ElsCloud != nil && *opts.ElsCloud { e.clientCfg.CloudID = e.Cfg().ExportPath } else { @@ -184,12 +150,12 @@ func (e *ElasticEE) Connect() (err error) { if e.client != nil { // check if connection is cached return } - e.client, err = elasticsearch.NewClient(e.clientCfg) + e.client, err = elasticsearch.NewTypedClient(e.clientCfg) return } // ExportEvent implements EventExporter -func (e *ElasticEE) ExportEvent(ctx *context.Context, ev, extraData any) error { +func (e *ElasticEE) ExportEvent(ctx *context.Context, event, extraData any) error { e.reqs.get() e.mu.RLock() defer func() { @@ -199,44 +165,75 @@ func (e *ElasticEE) ExportEvent(ctx *context.Context, ev, extraData any) error { if e.client == nil { return utils.ErrDisconnected } - key := extraData.(string) - req := esapi.IndexRequest{ - DocumentID: key, - Body: bytes.NewReader(ev.([]byte)), - Refresh: "true", - Index: e.indexReqOpts.Index, - IfPrimaryTerm: e.indexReqOpts.IfPrimaryTerm, - IfSeqNo: e.indexReqOpts.IfSeqNo, - OpType: e.indexReqOpts.OpType, - Pipeline: e.indexReqOpts.Pipeline, - Routing: e.indexReqOpts.Routing, - Timeout: e.indexReqOpts.Timeout, - Version: e.indexReqOpts.Version, - VersionType: e.indexReqOpts.VersionType, - WaitForActiveShards: e.indexReqOpts.WaitForActiveShards, - } - resp, err := req.Do(ctx, e.client) - if err != nil { - return err + // Build and send index request. + key := extraData.(string) + opts := e.cfg.Opts + indexName := utils.CDRsTBL + if opts.ElsIndex != nil { + indexName = *opts.ElsIndex } - defer resp.Body.Close() - if resp.IsError() { - var errResp map[string]any - if err := json.NewDecoder(resp.Body).Decode(&errResp); err != nil { - return err + req := e.client.Index(indexName). + Id(key). + Request(event). + Refresh(refresh.True) + + if opts.ElsIfPrimaryTerm != nil { + req.IfPrimaryTerm(strconv.Itoa(*opts.ElsIfPrimaryTerm)) + } + if opts.ElsIfSeqNo != nil { + req.IfSeqNo(strconv.Itoa(*opts.ElsIfSeqNo)) + } + if opts.ElsOpType != nil { + req.OpType(optype.OpType{Name: *opts.ElsOpType}) + } + if opts.ElsPipeline != nil { + req.Pipeline(*opts.ElsPipeline) + } + if opts.ElsRouting != nil { + req.Routing(*opts.ElsRouting) + } + if opts.ElsTimeout != nil { + req.Timeout((*opts.ElsTimeout).String()) + } + if opts.ElsVersion != nil { + req.Version(strconv.Itoa(*opts.ElsVersion)) + } + if opts.ElsVersionType != nil { + req.VersionType(versiontype.VersionType{Name: *opts.ElsVersionType}) + } + if opts.ElsWaitForActiveShards != nil { + req.WaitForActiveShards(*opts.ElsWaitForActiveShards) + } + _, err := req.Do(context.TODO()) + return err +} + +func (e *ElasticEE) PrepareMap(cgrEv *utils.CGREvent) (any, error) { + return cgrEv.Event, nil +} + +func (e *ElasticEE) PrepareOrderMap(onm *utils.OrderedNavigableMap) (any, error) { + preparedMap := make(map[string]any) + for el := onm.GetFirstElement(); el != nil; el = el.Next() { + path := el.Value + item, err := onm.Field(path) + if err != nil { + utils.Logger.Warning(fmt.Sprintf( + "<%s> exporter %q: failed to retrieve field at path %q", + utils.EEs, e.cfg.ID, path)) + continue } - utils.Logger.Warning(fmt.Sprintf( - "<%s> exporter %q: failed to index document: %+v", - utils.EEs, e.Cfg().ID, errResp)) + path = path[:len(path)-1] // remove the last index + preparedMap[strings.Join(path, utils.NestingSep)] = item.String() } - return nil + return preparedMap, nil } func (e *ElasticEE) Close() error { e.mu.Lock() + defer e.mu.Unlock() e.client = nil - e.mu.Unlock() return nil } diff --git a/ees/elastic_test.go b/ees/elastic_test.go index b990bf6bc..012f36dff 100644 --- a/ees/elastic_test.go +++ b/ees/elastic_test.go @@ -18,9 +18,6 @@ along with this program. If not, see package ees import ( - "bytes" - "io" - "net/http" "reflect" "testing" @@ -50,7 +47,7 @@ func TestInitClient(t *testing.T) { Opts: &config.EventExporterOpts{}, }, } - if err := ee.prepareOpts(); err != nil { + if err := ee.parseClientOpts(); err != nil { t.Error(err) } errExpect := `cannot create client: cannot parse url: parse "/\x00": net/url: invalid control character in URL` @@ -58,171 +55,8 @@ func TestInitClient(t *testing.T) { t.Errorf("Expected %+v \n but got %+v", errExpect, err) } } -func TestInitCase1(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: &config.EventExporterOpts{ - ElsIndex: utils.StringPointer("test"), - }, - }, - } - if err := ee.prepareOpts(); err != nil { - t.Error(err) - } - eeExpect := "test" - if !reflect.DeepEqual(ee.indexReqOpts.Index, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", eeExpect, ee.indexReqOpts.Index) - } -} -func TestInitCase2(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: &config.EventExporterOpts{ - ElsIfPrimaryTerm: utils.IntPointer(20), - }, - }, - } - if err := ee.prepareOpts(); err != nil { - t.Error(err) - } - eeExpect := utils.IntPointer(20) - if !reflect.DeepEqual(ee.indexReqOpts.IfPrimaryTerm, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.IfPrimaryTerm)) - } -} - -func TestInitCase3(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: &config.EventExporterOpts{ - ElsIfSeqNo: utils.IntPointer(20), - }, - }, - } - if err := ee.prepareOpts(); err != nil { - t.Error(err) - } - eeExpect := utils.IntPointer(20) - if !reflect.DeepEqual(ee.indexReqOpts.IfSeqNo, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.IfSeqNo)) - } -} - -func TestInitCase4(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: &config.EventExporterOpts{ - ElsOpType: utils.StringPointer("test"), - }, - }, - } - if err := ee.prepareOpts(); err != nil { - t.Error(err) - } - eeExpect := "test" - if !reflect.DeepEqual(ee.indexReqOpts.OpType, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.OpType)) - } -} - -func TestInitCase5(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: &config.EventExporterOpts{ - ElsPipeline: utils.StringPointer("test"), - }, - }, - } - if err := ee.prepareOpts(); err != nil { - t.Error(err) - } - eeExpect := "test" - if !reflect.DeepEqual(ee.indexReqOpts.Pipeline, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Pipeline)) - } -} - -func TestInitCase6(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: &config.EventExporterOpts{ - ElsRouting: utils.StringPointer("test"), - }, - }, - } - if err := ee.prepareOpts(); err != nil { - t.Error(err) - } - eeExpect := "test" - if !reflect.DeepEqual(ee.indexReqOpts.Routing, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Routing)) - } -} - -func TestInitCase8(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: &config.EventExporterOpts{ - ElsVersion: utils.IntPointer(20), - }, - }, - } - if err := ee.prepareOpts(); err != nil { - t.Error(err) - } - eeExpect := utils.IntPointer(20) - if !reflect.DeepEqual(ee.indexReqOpts.Version, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Version)) - } -} - -func TestInitCase9(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: &config.EventExporterOpts{ - ElsVersionType: utils.StringPointer("test"), - }, - }, - } - if err := ee.prepareOpts(); err != nil { - t.Error(err) - } - eeExpect := "test" - if !reflect.DeepEqual(ee.indexReqOpts.VersionType, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.VersionType)) - } -} - -func TestInitCase10(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: &config.EventExporterOpts{ - ElsWaitForActiveShards: utils.StringPointer("test"), - }, - }, - } - if err := ee.prepareOpts(); err != nil { - t.Error(err) - } - eeExpect := "test" - if !reflect.DeepEqual(ee.indexReqOpts.WaitForActiveShards, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.WaitForActiveShards)) - } -} - -type mockClientErr struct{} - -func (mockClientErr) Perform(req *http.Request) (res *http.Response, err error) { - res = &http.Response{ - StatusCode: 300, - Body: io.NopCloser(bytes.NewBuffer([]byte(`{"test":"test"}`))), - Header: http.Header{}, - } - return res, nil -} - -func TestElasticExportEvent(t *testing.T) { +func TestElasticExportEventErr(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() dc, err := newEEMetrics("Local") if err != nil { @@ -235,90 +69,7 @@ func TestElasticExportEvent(t *testing.T) { if err = eEe.Connect(); err != nil { t.Error(err) } - eEe.client.Transport = new(mockClientErr) - if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err != nil { - t.Error(err) - } -} - -type mockClientErr2 struct{} - -func (mockClientErr2) Perform(req *http.Request) (res *http.Response, err error) { - res = &http.Response{ - StatusCode: 300, - Body: io.NopCloser(bytes.NewBuffer([]byte(""))), - Header: http.Header{}, - } - return res, nil -} - -func TestElasticExportEvent2(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } - eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc) - if err != nil { - t.Error(err) - } - if err = eEe.Connect(); err != nil { - t.Error(err) - } - eEe.client.Transport = new(mockClientErr2) - - errExpect := io.EOF - if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err == nil || err != errExpect { - t.Errorf("Expected %v but received %v", errExpect, err) - } -} - -type mockClient struct{} - -func (mockClient) Perform(req *http.Request) (res *http.Response, err error) { - res = &http.Response{ - StatusCode: 200, - Body: io.NopCloser(bytes.NewBuffer([]byte(""))), - Header: http.Header{}, - } - return res, nil -} -func TestElasticExportEvent3(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } - eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc) - if err != nil { - t.Error(err) - } - if err = eEe.Connect(); err != nil { - t.Error(err) - } - eEe.client.Transport = new(mockClient) - errExpect := `the client noticed that the server is not Elasticsearch and we do not support this unknown product` - - cgrCfg.EEsCfg().Exporters[0].ComputeFields() - if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err == nil || err.Error() != errExpect { - t.Error(err) - } -} - -func TestElasticExportEvent4(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } - eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc) - if err != nil { - t.Error(err) - } - if err = eEe.Connect(); err != nil { - t.Error(err) - } - errExpect := `unsupported protocol scheme ""` + errExpect := `an error happened during the Index query execution: unsupported protocol scheme ""` if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err == nil || err.Error() != errExpect { t.Errorf("Expected %q but got %q", errExpect, err) }