From 2b5a3e5a5ecd48d9786a4826c31f9645c4aad178 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Tue, 26 Nov 2024 15:50:08 +0200 Subject: [PATCH] elasticsearch: switch to fully-typed API - index request options are now used directly during ExportEvent. They are passed to the request as options only if they were configured in the first place. - implement PrepareMap and PrepareOrderMap methods for the elastic exporter. bytePreparing methods are not needed anymore as the Event map can be exported directly. - elasticsearch.Client -> elasticsearch.TypedClient - rename prepareOpts -> parseClientOpts --- ees/elastic.go | 158 ++++++++++++----------- ees/elastic_it_test.go | 2 +- ees/elastic_test.go | 281 +---------------------------------------- 3 files changed, 84 insertions(+), 357 deletions(-) diff --git a/ees/elastic.go b/ees/elastic.go index e95cc3247..bfb297cd6 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -19,82 +19,49 @@ along with this program. If not, see package ees import ( - "bytes" "context" - "encoding/json" "fmt" "os" + "strconv" "strings" "sync" "github.com/elastic/elastic-transport-go/v8/elastictransport" - "github.com/elastic/go-elasticsearch/v8/esapi" + "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/optype" + "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/refresh" + "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/versiontype" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" elasticsearch "github.com/elastic/go-elasticsearch/v8" ) -func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*ElasticEE, error) { - el := &ElasticEE{ - cfg: cfg, - dc: dc, - reqs: newConcReq(cfg.ConcurrentRequests), - } - if err := el.prepareOpts(); err != nil { - return nil, err - } - return el, nil -} - // ElasticEE implements EventExporter interface for ElasticSearch export. type ElasticEE struct { mu sync.RWMutex cfg *config.EventExporterCfg dc *utils.SafeMapStorage reqs *concReq - bytePreparing - client *elasticsearch.Client + client *elasticsearch.TypedClient clientCfg elasticsearch.Config +} - // indexReqOpts is used to store IndexRequest options for convenience - // and does not represent the IndexRequest itself. - indexReqOpts esapi.IndexRequest +func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*ElasticEE, error) { + el := &ElasticEE{ + cfg: cfg, + dc: dc, + reqs: newConcReq(cfg.ConcurrentRequests), + } + if err := el.parseClientOpts(); err != nil { + return nil, err + } + return el, nil } // init will create all the necessary dependencies, including opening the file -func (e *ElasticEE) prepareOpts() error { +func (e *ElasticEE) parseClientOpts() error { opts := e.cfg.Opts.Els - - // Parse index request options. - e.indexReqOpts.Index = utils.CDRsTBL - if opts.Index != nil { - e.indexReqOpts.Index = *opts.Index - } - e.indexReqOpts.IfPrimaryTerm = opts.IfPrimaryTerm - e.indexReqOpts.IfSeqNo = opts.IfSeqNo - if opts.OpType != nil { - e.indexReqOpts.OpType = *opts.OpType - } - if opts.Pipeline != nil { - e.indexReqOpts.Pipeline = *opts.Pipeline - } - if opts.Routing != nil { - e.indexReqOpts.Routing = *opts.Routing - } - if opts.Timeout != nil { - e.indexReqOpts.Timeout = *opts.Timeout - } - e.indexReqOpts.Version = opts.Version - if opts.VersionType != nil { - e.indexReqOpts.VersionType = *opts.VersionType - } - if opts.WaitForActiveShards != nil { - e.indexReqOpts.WaitForActiveShards = *opts.WaitForActiveShards - } - - // Parse client config options. if opts.Cloud != nil && *opts.Cloud { e.clientCfg.CloudID = e.Cfg().ExportPath } else { @@ -183,12 +150,12 @@ func (e *ElasticEE) Connect() (err error) { if e.client != nil { // check if connection is cached return } - e.client, err = elasticsearch.NewClient(e.clientCfg) + e.client, err = elasticsearch.NewTypedClient(e.clientCfg) return } // ExportEvent implements EventExporter -func (e *ElasticEE) ExportEvent(ev any, key string) error { +func (e *ElasticEE) ExportEvent(event any, key string) error { e.reqs.get() e.mu.RLock() defer func() { @@ -198,43 +165,74 @@ func (e *ElasticEE) ExportEvent(ev any, key string) error { if e.client == nil { return utils.ErrDisconnected } - req := esapi.IndexRequest{ - DocumentID: key, - Body: bytes.NewReader(ev.([]byte)), - Refresh: "true", - Index: e.indexReqOpts.Index, - IfPrimaryTerm: e.indexReqOpts.IfPrimaryTerm, - IfSeqNo: e.indexReqOpts.IfSeqNo, - OpType: e.indexReqOpts.OpType, - Pipeline: e.indexReqOpts.Pipeline, - Routing: e.indexReqOpts.Routing, - Timeout: e.indexReqOpts.Timeout, - Version: e.indexReqOpts.Version, - VersionType: e.indexReqOpts.VersionType, - WaitForActiveShards: e.indexReqOpts.WaitForActiveShards, - } - resp, err := req.Do(context.Background(), e.client) - if err != nil { - return err + // Build and send index request. + opts := e.cfg.Opts.Els + indexName := utils.CDRsTBL + if opts.Index != nil { + indexName = *opts.Index } - defer resp.Body.Close() - if resp.IsError() { - var errResp map[string]any - if err := json.NewDecoder(resp.Body).Decode(&errResp); err != nil { - return err + req := e.client.Index(indexName). + Id(key). + Request(event). + Refresh(refresh.True) + + if opts.IfPrimaryTerm != nil { + req.IfPrimaryTerm(strconv.Itoa(*opts.IfPrimaryTerm)) + } + if opts.IfSeqNo != nil { + req.IfSeqNo(strconv.Itoa(*opts.IfSeqNo)) + } + if opts.OpType != nil { + req.OpType(optype.OpType{Name: *opts.OpType}) + } + if opts.Pipeline != nil { + req.Pipeline(*opts.Pipeline) + } + if opts.Routing != nil { + req.Routing(*opts.Routing) + } + if opts.Timeout != nil { + req.Timeout((*opts.Timeout).String()) + } + if opts.Version != nil { + req.Version(strconv.Itoa(*opts.Version)) + } + if opts.VersionType != nil { + req.VersionType(versiontype.VersionType{Name: *opts.VersionType}) + } + if opts.WaitForActiveShards != nil { + req.WaitForActiveShards(*opts.WaitForActiveShards) + } + _, err := req.Do(context.TODO()) + return err +} + +func (e *ElasticEE) PrepareMap(cgrEv *utils.CGREvent) (any, error) { + return cgrEv.Event, nil +} + +func (e *ElasticEE) PrepareOrderMap(onm *utils.OrderedNavigableMap) (any, error) { + preparedMap := make(map[string]any) + for el := onm.GetFirstElement(); el != nil; el = el.Next() { + path := el.Value + item, err := onm.Field(path) + if err != nil { + utils.Logger.Warning(fmt.Sprintf( + "<%s> exporter %q: failed to retrieve field at path %q", + utils.EEs, e.cfg.ID, path)) + continue } - utils.Logger.Warning(fmt.Sprintf( - "<%s> exporter %q: failed to index document: %+v", - utils.EEs, e.Cfg().ID, errResp)) + path = path[:len(path)-1] // remove the last index + preparedMap[strings.Join(path, utils.NestingSep)] = item.String() } - return nil + return preparedMap, nil } func (e *ElasticEE) Close() error { e.mu.Lock() + defer e.mu.Unlock() e.client = nil - e.mu.Unlock() return nil } diff --git a/ees/elastic_it_test.go b/ees/elastic_it_test.go index adf20af8b..5b6031110 100644 --- a/ees/elastic_it_test.go +++ b/ees/elastic_it_test.go @@ -192,7 +192,7 @@ func initElsClient(t *testing.T, cfg *config.CGRConfig, exporterType string) *el tmp := &ElasticEE{ cfg: eeCfg, } - if err := tmp.prepareOpts(); err != nil { + if err := tmp.parseClientOpts(); err != nil { t.Fatal(err) } client, err := elasticsearch.NewTypedClient(tmp.clientCfg) diff --git a/ees/elastic_test.go b/ees/elastic_test.go index b90426032..1cbbce3f8 100644 --- a/ees/elastic_test.go +++ b/ees/elastic_test.go @@ -18,9 +18,6 @@ along with this program. If not, see package ees import ( - "bytes" - "io" - "net/http" "reflect" "testing" @@ -53,7 +50,7 @@ func TestInitClient(t *testing.T) { }, }, } - if err := ee.prepareOpts(); err != nil { + if err := ee.parseClientOpts(); err != nil { t.Error(err) } errExpect := `cannot create client: cannot parse url: parse "/\x00": net/url: invalid control character in URL` @@ -61,190 +58,8 @@ func TestInitClient(t *testing.T) { t.Errorf("Expected %+v \n but got %+v", errExpect, err) } } -func TestInitCase1(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: &config.EventExporterOpts{ - Els: &config.ElsOpts{Index: utils.StringPointer("test")}, - RPC: &config.RPCOpts{}, - }, - }, - } - if err := ee.prepareOpts(); err != nil { - t.Error(err) - } - eeExpect := "test" - if !reflect.DeepEqual(ee.indexReqOpts.Index, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", eeExpect, ee.indexReqOpts.Index) - } -} -func TestInitCase2(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: &config.EventExporterOpts{ - Els: &config.ElsOpts{ - IfPrimaryTerm: utils.IntPointer(20)}, - RPC: &config.RPCOpts{}, - }, - }, - } - if err := ee.prepareOpts(); err != nil { - t.Error(err) - } - eeExpect := utils.IntPointer(20) - if !reflect.DeepEqual(ee.indexReqOpts.IfPrimaryTerm, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.IfPrimaryTerm)) - } -} - -func TestInitCase3(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: &config.EventExporterOpts{ - Els: &config.ElsOpts{ - IfSeqNo: utils.IntPointer(20)}, - RPC: &config.RPCOpts{}, - }, - }, - } - if err := ee.prepareOpts(); err != nil { - t.Error(err) - } - eeExpect := utils.IntPointer(20) - if !reflect.DeepEqual(ee.indexReqOpts.IfSeqNo, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.IfSeqNo)) - } -} - -func TestInitCase4(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: &config.EventExporterOpts{ - Els: &config.ElsOpts{ - OpType: utils.StringPointer("test")}, - RPC: &config.RPCOpts{}, - }, - }, - } - if err := ee.prepareOpts(); err != nil { - t.Error(err) - } - eeExpect := "test" - if !reflect.DeepEqual(ee.indexReqOpts.OpType, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.OpType)) - } -} - -func TestInitCase5(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: &config.EventExporterOpts{ - Els: &config.ElsOpts{ - Pipeline: utils.StringPointer("test")}, - RPC: &config.RPCOpts{}, - }, - }, - } - if err := ee.prepareOpts(); err != nil { - t.Error(err) - } - eeExpect := "test" - if !reflect.DeepEqual(ee.indexReqOpts.Pipeline, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Pipeline)) - } -} - -func TestInitCase6(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: &config.EventExporterOpts{ - Els: &config.ElsOpts{ - Routing: utils.StringPointer("test")}, - RPC: &config.RPCOpts{}, - }, - }, - } - if err := ee.prepareOpts(); err != nil { - t.Error(err) - } - eeExpect := "test" - if !reflect.DeepEqual(ee.indexReqOpts.Routing, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Routing)) - } -} - -func TestInitCase8(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: &config.EventExporterOpts{ - Els: &config.ElsOpts{ - Version: utils.IntPointer(20), - }, - RPC: &config.RPCOpts{}, - }, - }, - } - if err := ee.prepareOpts(); err != nil { - t.Error(err) - } - eeExpect := utils.IntPointer(20) - if !reflect.DeepEqual(ee.indexReqOpts.Version, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Version)) - } -} - -func TestInitCase9(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: &config.EventExporterOpts{ - Els: &config.ElsOpts{ - VersionType: utils.StringPointer("test"), - }, - RPC: &config.RPCOpts{}, - }, - }, - } - if err := ee.prepareOpts(); err != nil { - t.Error(err) - } - eeExpect := "test" - if !reflect.DeepEqual(ee.indexReqOpts.VersionType, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.VersionType)) - } -} - -func TestInitCase10(t *testing.T) { - ee := &ElasticEE{ - cfg: &config.EventExporterCfg{ - Opts: &config.EventExporterOpts{ - Els: &config.ElsOpts{ - WaitForActiveShards: utils.StringPointer("test")}, - RPC: &config.RPCOpts{}, - }, - }, - } - if err := ee.prepareOpts(); err != nil { - t.Error(err) - } - eeExpect := "test" - if !reflect.DeepEqual(ee.indexReqOpts.WaitForActiveShards, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.WaitForActiveShards)) - } -} - -type mockClientErr struct{} - -func (mockClientErr) Perform(req *http.Request) (res *http.Response, err error) { - res = &http.Response{ - StatusCode: 300, - Body: io.NopCloser(bytes.NewBuffer([]byte(`{"test":"test"}`))), - Header: http.Header{}, - } - return res, nil -} - -func TestElasticExportEvent(t *testing.T) { +func TestElasticExportEventErr(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() dc, err := newEEMetrics("Local") if err != nil { @@ -257,93 +72,7 @@ func TestElasticExportEvent(t *testing.T) { if err = eEe.Connect(); err != nil { t.Error(err) } - eEe.client.Transport = new(mockClientErr) - if err := eEe.ExportEvent([]byte{}, ""); err != nil { - t.Error(err) - } -} - -type mockClientErr2 struct{} - -func (mockClientErr2) Perform(req *http.Request) (res *http.Response, err error) { - res = &http.Response{ - StatusCode: 300, - Body: io.NopCloser(bytes.NewBuffer([]byte(""))), - Header: http.Header{}, - } - return res, nil -} - -func TestElasticExportEvent2(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } - eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc) - if err != nil { - t.Error(err) - } - if err = eEe.Connect(); err != nil { - t.Error(err) - } - eEe.client.Transport = new(mockClientErr2) - - errExpect := io.EOF - if err := eEe.ExportEvent([]byte{}, ""); err == nil || err != errExpect { - t.Errorf("Expected %v but received %v", errExpect, err) - } -} - -type mockClient struct{} - -func (mockClient) Perform(req *http.Request) (res *http.Response, err error) { - res = &http.Response{ - StatusCode: 200, - Body: io.NopCloser(bytes.NewBuffer([]byte(""))), - Header: http.Header{}, - } - return res, nil -} - -func TestElasticExportEvent3(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } - eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc) - if err != nil { - t.Error(err) - } - if err := eEe.prepareOpts(); err != nil { - t.Error(err) - } - if err = eEe.Connect(); err != nil { - t.Error(err) - } - eEe.client.Transport = new(mockClient) - errExpect := `the client noticed that the server is not Elasticsearch and we do not support this unknown product` - cgrCfg.EEsCfg().Exporters[0].ComputeFields() - if err := eEe.ExportEvent([]byte{}, ""); err == nil || err.Error() != errExpect { - t.Errorf("Expected %q but got %q", errExpect, err) - } -} - -func TestElasticExportEvent4(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } - eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc) - if err != nil { - t.Error(err) - } - if err = eEe.Connect(); err != nil { - t.Error(err) - } - errExpect := `unsupported protocol scheme ""` + errExpect := `an error happened during the Index query execution: unsupported protocol scheme ""` if err := eEe.ExportEvent([]byte{}, ""); err == nil || err.Error() != errExpect { t.Errorf("Expected %q but got %q", errExpect, err) } @@ -351,7 +80,7 @@ func TestElasticExportEvent4(t *testing.T) { func TestElasticClose(t *testing.T) { elasticEE := &ElasticEE{ - client: &elasticsearch.Client{}, + client: &elasticsearch.TypedClient{}, } err := elasticEE.Close() if elasticEE.client != nil { @@ -366,7 +95,7 @@ func TestElasticConnect(t *testing.T) { t.Run("ClientAlreadyExists", func(t *testing.T) { elasticEE := &ElasticEE{ - client: &elasticsearch.Client{}, + client: &elasticsearch.TypedClient{}, } err := elasticEE.Connect()