diff --git a/ees/elastic.go b/ees/elastic.go
index e95cc3247..bfb297cd6 100644
--- a/ees/elastic.go
+++ b/ees/elastic.go
@@ -19,82 +19,49 @@ along with this program. If not, see
package ees
import (
- "bytes"
"context"
- "encoding/json"
"fmt"
"os"
+ "strconv"
"strings"
"sync"
"github.com/elastic/elastic-transport-go/v8/elastictransport"
- "github.com/elastic/go-elasticsearch/v8/esapi"
+ "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/optype"
+ "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/refresh"
+ "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/versiontype"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
elasticsearch "github.com/elastic/go-elasticsearch/v8"
)
-func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*ElasticEE, error) {
- el := &ElasticEE{
- cfg: cfg,
- dc: dc,
- reqs: newConcReq(cfg.ConcurrentRequests),
- }
- if err := el.prepareOpts(); err != nil {
- return nil, err
- }
- return el, nil
-}
-
// ElasticEE implements EventExporter interface for ElasticSearch export.
type ElasticEE struct {
mu sync.RWMutex
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
reqs *concReq
- bytePreparing
- client *elasticsearch.Client
+ client *elasticsearch.TypedClient
clientCfg elasticsearch.Config
+}
- // indexReqOpts is used to store IndexRequest options for convenience
- // and does not represent the IndexRequest itself.
- indexReqOpts esapi.IndexRequest
+func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*ElasticEE, error) {
+ el := &ElasticEE{
+ cfg: cfg,
+ dc: dc,
+ reqs: newConcReq(cfg.ConcurrentRequests),
+ }
+ if err := el.parseClientOpts(); err != nil {
+ return nil, err
+ }
+ return el, nil
}
// init will create all the necessary dependencies, including opening the file
-func (e *ElasticEE) prepareOpts() error {
+func (e *ElasticEE) parseClientOpts() error {
opts := e.cfg.Opts.Els
-
- // Parse index request options.
- e.indexReqOpts.Index = utils.CDRsTBL
- if opts.Index != nil {
- e.indexReqOpts.Index = *opts.Index
- }
- e.indexReqOpts.IfPrimaryTerm = opts.IfPrimaryTerm
- e.indexReqOpts.IfSeqNo = opts.IfSeqNo
- if opts.OpType != nil {
- e.indexReqOpts.OpType = *opts.OpType
- }
- if opts.Pipeline != nil {
- e.indexReqOpts.Pipeline = *opts.Pipeline
- }
- if opts.Routing != nil {
- e.indexReqOpts.Routing = *opts.Routing
- }
- if opts.Timeout != nil {
- e.indexReqOpts.Timeout = *opts.Timeout
- }
- e.indexReqOpts.Version = opts.Version
- if opts.VersionType != nil {
- e.indexReqOpts.VersionType = *opts.VersionType
- }
- if opts.WaitForActiveShards != nil {
- e.indexReqOpts.WaitForActiveShards = *opts.WaitForActiveShards
- }
-
- // Parse client config options.
if opts.Cloud != nil && *opts.Cloud {
e.clientCfg.CloudID = e.Cfg().ExportPath
} else {
@@ -183,12 +150,12 @@ func (e *ElasticEE) Connect() (err error) {
if e.client != nil { // check if connection is cached
return
}
- e.client, err = elasticsearch.NewClient(e.clientCfg)
+ e.client, err = elasticsearch.NewTypedClient(e.clientCfg)
return
}
// ExportEvent implements EventExporter
-func (e *ElasticEE) ExportEvent(ev any, key string) error {
+func (e *ElasticEE) ExportEvent(event any, key string) error {
e.reqs.get()
e.mu.RLock()
defer func() {
@@ -198,43 +165,74 @@ func (e *ElasticEE) ExportEvent(ev any, key string) error {
if e.client == nil {
return utils.ErrDisconnected
}
- req := esapi.IndexRequest{
- DocumentID: key,
- Body: bytes.NewReader(ev.([]byte)),
- Refresh: "true",
- Index: e.indexReqOpts.Index,
- IfPrimaryTerm: e.indexReqOpts.IfPrimaryTerm,
- IfSeqNo: e.indexReqOpts.IfSeqNo,
- OpType: e.indexReqOpts.OpType,
- Pipeline: e.indexReqOpts.Pipeline,
- Routing: e.indexReqOpts.Routing,
- Timeout: e.indexReqOpts.Timeout,
- Version: e.indexReqOpts.Version,
- VersionType: e.indexReqOpts.VersionType,
- WaitForActiveShards: e.indexReqOpts.WaitForActiveShards,
- }
- resp, err := req.Do(context.Background(), e.client)
- if err != nil {
- return err
+ // Build and send index request.
+ opts := e.cfg.Opts.Els
+ indexName := utils.CDRsTBL
+ if opts.Index != nil {
+ indexName = *opts.Index
}
- defer resp.Body.Close()
- if resp.IsError() {
- var errResp map[string]any
- if err := json.NewDecoder(resp.Body).Decode(&errResp); err != nil {
- return err
+ req := e.client.Index(indexName).
+ Id(key).
+ Request(event).
+ Refresh(refresh.True)
+
+ if opts.IfPrimaryTerm != nil {
+ req.IfPrimaryTerm(strconv.Itoa(*opts.IfPrimaryTerm))
+ }
+ if opts.IfSeqNo != nil {
+ req.IfSeqNo(strconv.Itoa(*opts.IfSeqNo))
+ }
+ if opts.OpType != nil {
+ req.OpType(optype.OpType{Name: *opts.OpType})
+ }
+ if opts.Pipeline != nil {
+ req.Pipeline(*opts.Pipeline)
+ }
+ if opts.Routing != nil {
+ req.Routing(*opts.Routing)
+ }
+ if opts.Timeout != nil {
+ req.Timeout((*opts.Timeout).String())
+ }
+ if opts.Version != nil {
+ req.Version(strconv.Itoa(*opts.Version))
+ }
+ if opts.VersionType != nil {
+ req.VersionType(versiontype.VersionType{Name: *opts.VersionType})
+ }
+ if opts.WaitForActiveShards != nil {
+ req.WaitForActiveShards(*opts.WaitForActiveShards)
+ }
+ _, err := req.Do(context.TODO())
+ return err
+}
+
+func (e *ElasticEE) PrepareMap(cgrEv *utils.CGREvent) (any, error) {
+ return cgrEv.Event, nil
+}
+
+func (e *ElasticEE) PrepareOrderMap(onm *utils.OrderedNavigableMap) (any, error) {
+ preparedMap := make(map[string]any)
+ for el := onm.GetFirstElement(); el != nil; el = el.Next() {
+ path := el.Value
+ item, err := onm.Field(path)
+ if err != nil {
+ utils.Logger.Warning(fmt.Sprintf(
+ "<%s> exporter %q: failed to retrieve field at path %q",
+ utils.EEs, e.cfg.ID, path))
+ continue
}
- utils.Logger.Warning(fmt.Sprintf(
- "<%s> exporter %q: failed to index document: %+v",
- utils.EEs, e.Cfg().ID, errResp))
+ path = path[:len(path)-1] // remove the last index
+ preparedMap[strings.Join(path, utils.NestingSep)] = item.String()
}
- return nil
+ return preparedMap, nil
}
func (e *ElasticEE) Close() error {
e.mu.Lock()
+ defer e.mu.Unlock()
e.client = nil
- e.mu.Unlock()
return nil
}
diff --git a/ees/elastic_it_test.go b/ees/elastic_it_test.go
index adf20af8b..5b6031110 100644
--- a/ees/elastic_it_test.go
+++ b/ees/elastic_it_test.go
@@ -192,7 +192,7 @@ func initElsClient(t *testing.T, cfg *config.CGRConfig, exporterType string) *el
tmp := &ElasticEE{
cfg: eeCfg,
}
- if err := tmp.prepareOpts(); err != nil {
+ if err := tmp.parseClientOpts(); err != nil {
t.Fatal(err)
}
client, err := elasticsearch.NewTypedClient(tmp.clientCfg)
diff --git a/ees/elastic_test.go b/ees/elastic_test.go
index b90426032..1cbbce3f8 100644
--- a/ees/elastic_test.go
+++ b/ees/elastic_test.go
@@ -18,9 +18,6 @@ along with this program. If not, see
package ees
import (
- "bytes"
- "io"
- "net/http"
"reflect"
"testing"
@@ -53,7 +50,7 @@ func TestInitClient(t *testing.T) {
},
},
}
- if err := ee.prepareOpts(); err != nil {
+ if err := ee.parseClientOpts(); err != nil {
t.Error(err)
}
errExpect := `cannot create client: cannot parse url: parse "/\x00": net/url: invalid control character in URL`
@@ -61,190 +58,8 @@ func TestInitClient(t *testing.T) {
t.Errorf("Expected %+v \n but got %+v", errExpect, err)
}
}
-func TestInitCase1(t *testing.T) {
- ee := &ElasticEE{
- cfg: &config.EventExporterCfg{
- Opts: &config.EventExporterOpts{
- Els: &config.ElsOpts{Index: utils.StringPointer("test")},
- RPC: &config.RPCOpts{},
- },
- },
- }
- if err := ee.prepareOpts(); err != nil {
- t.Error(err)
- }
- eeExpect := "test"
- if !reflect.DeepEqual(ee.indexReqOpts.Index, eeExpect) {
- t.Errorf("Expected %+v \n but got %+v", eeExpect, ee.indexReqOpts.Index)
- }
-}
-func TestInitCase2(t *testing.T) {
- ee := &ElasticEE{
- cfg: &config.EventExporterCfg{
- Opts: &config.EventExporterOpts{
- Els: &config.ElsOpts{
- IfPrimaryTerm: utils.IntPointer(20)},
- RPC: &config.RPCOpts{},
- },
- },
- }
- if err := ee.prepareOpts(); err != nil {
- t.Error(err)
- }
- eeExpect := utils.IntPointer(20)
- if !reflect.DeepEqual(ee.indexReqOpts.IfPrimaryTerm, eeExpect) {
- t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.IfPrimaryTerm))
- }
-}
-
-func TestInitCase3(t *testing.T) {
- ee := &ElasticEE{
- cfg: &config.EventExporterCfg{
- Opts: &config.EventExporterOpts{
- Els: &config.ElsOpts{
- IfSeqNo: utils.IntPointer(20)},
- RPC: &config.RPCOpts{},
- },
- },
- }
- if err := ee.prepareOpts(); err != nil {
- t.Error(err)
- }
- eeExpect := utils.IntPointer(20)
- if !reflect.DeepEqual(ee.indexReqOpts.IfSeqNo, eeExpect) {
- t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.IfSeqNo))
- }
-}
-
-func TestInitCase4(t *testing.T) {
- ee := &ElasticEE{
- cfg: &config.EventExporterCfg{
- Opts: &config.EventExporterOpts{
- Els: &config.ElsOpts{
- OpType: utils.StringPointer("test")},
- RPC: &config.RPCOpts{},
- },
- },
- }
- if err := ee.prepareOpts(); err != nil {
- t.Error(err)
- }
- eeExpect := "test"
- if !reflect.DeepEqual(ee.indexReqOpts.OpType, eeExpect) {
- t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.OpType))
- }
-}
-
-func TestInitCase5(t *testing.T) {
- ee := &ElasticEE{
- cfg: &config.EventExporterCfg{
- Opts: &config.EventExporterOpts{
- Els: &config.ElsOpts{
- Pipeline: utils.StringPointer("test")},
- RPC: &config.RPCOpts{},
- },
- },
- }
- if err := ee.prepareOpts(); err != nil {
- t.Error(err)
- }
- eeExpect := "test"
- if !reflect.DeepEqual(ee.indexReqOpts.Pipeline, eeExpect) {
- t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Pipeline))
- }
-}
-
-func TestInitCase6(t *testing.T) {
- ee := &ElasticEE{
- cfg: &config.EventExporterCfg{
- Opts: &config.EventExporterOpts{
- Els: &config.ElsOpts{
- Routing: utils.StringPointer("test")},
- RPC: &config.RPCOpts{},
- },
- },
- }
- if err := ee.prepareOpts(); err != nil {
- t.Error(err)
- }
- eeExpect := "test"
- if !reflect.DeepEqual(ee.indexReqOpts.Routing, eeExpect) {
- t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Routing))
- }
-}
-
-func TestInitCase8(t *testing.T) {
- ee := &ElasticEE{
- cfg: &config.EventExporterCfg{
- Opts: &config.EventExporterOpts{
- Els: &config.ElsOpts{
- Version: utils.IntPointer(20),
- },
- RPC: &config.RPCOpts{},
- },
- },
- }
- if err := ee.prepareOpts(); err != nil {
- t.Error(err)
- }
- eeExpect := utils.IntPointer(20)
- if !reflect.DeepEqual(ee.indexReqOpts.Version, eeExpect) {
- t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Version))
- }
-}
-
-func TestInitCase9(t *testing.T) {
- ee := &ElasticEE{
- cfg: &config.EventExporterCfg{
- Opts: &config.EventExporterOpts{
- Els: &config.ElsOpts{
- VersionType: utils.StringPointer("test"),
- },
- RPC: &config.RPCOpts{},
- },
- },
- }
- if err := ee.prepareOpts(); err != nil {
- t.Error(err)
- }
- eeExpect := "test"
- if !reflect.DeepEqual(ee.indexReqOpts.VersionType, eeExpect) {
- t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.VersionType))
- }
-}
-
-func TestInitCase10(t *testing.T) {
- ee := &ElasticEE{
- cfg: &config.EventExporterCfg{
- Opts: &config.EventExporterOpts{
- Els: &config.ElsOpts{
- WaitForActiveShards: utils.StringPointer("test")},
- RPC: &config.RPCOpts{},
- },
- },
- }
- if err := ee.prepareOpts(); err != nil {
- t.Error(err)
- }
- eeExpect := "test"
- if !reflect.DeepEqual(ee.indexReqOpts.WaitForActiveShards, eeExpect) {
- t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.WaitForActiveShards))
- }
-}
-
-type mockClientErr struct{}
-
-func (mockClientErr) Perform(req *http.Request) (res *http.Response, err error) {
- res = &http.Response{
- StatusCode: 300,
- Body: io.NopCloser(bytes.NewBuffer([]byte(`{"test":"test"}`))),
- Header: http.Header{},
- }
- return res, nil
-}
-
-func TestElasticExportEvent(t *testing.T) {
+func TestElasticExportEventErr(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dc, err := newEEMetrics("Local")
if err != nil {
@@ -257,93 +72,7 @@ func TestElasticExportEvent(t *testing.T) {
if err = eEe.Connect(); err != nil {
t.Error(err)
}
- eEe.client.Transport = new(mockClientErr)
- if err := eEe.ExportEvent([]byte{}, ""); err != nil {
- t.Error(err)
- }
-}
-
-type mockClientErr2 struct{}
-
-func (mockClientErr2) Perform(req *http.Request) (res *http.Response, err error) {
- res = &http.Response{
- StatusCode: 300,
- Body: io.NopCloser(bytes.NewBuffer([]byte(""))),
- Header: http.Header{},
- }
- return res, nil
-}
-
-func TestElasticExportEvent2(t *testing.T) {
- cgrCfg := config.NewDefaultCGRConfig()
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
- eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc)
- if err != nil {
- t.Error(err)
- }
- if err = eEe.Connect(); err != nil {
- t.Error(err)
- }
- eEe.client.Transport = new(mockClientErr2)
-
- errExpect := io.EOF
- if err := eEe.ExportEvent([]byte{}, ""); err == nil || err != errExpect {
- t.Errorf("Expected %v but received %v", errExpect, err)
- }
-}
-
-type mockClient struct{}
-
-func (mockClient) Perform(req *http.Request) (res *http.Response, err error) {
- res = &http.Response{
- StatusCode: 200,
- Body: io.NopCloser(bytes.NewBuffer([]byte(""))),
- Header: http.Header{},
- }
- return res, nil
-}
-
-func TestElasticExportEvent3(t *testing.T) {
- cgrCfg := config.NewDefaultCGRConfig()
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
- eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc)
- if err != nil {
- t.Error(err)
- }
- if err := eEe.prepareOpts(); err != nil {
- t.Error(err)
- }
- if err = eEe.Connect(); err != nil {
- t.Error(err)
- }
- eEe.client.Transport = new(mockClient)
- errExpect := `the client noticed that the server is not Elasticsearch and we do not support this unknown product`
- cgrCfg.EEsCfg().Exporters[0].ComputeFields()
- if err := eEe.ExportEvent([]byte{}, ""); err == nil || err.Error() != errExpect {
- t.Errorf("Expected %q but got %q", errExpect, err)
- }
-}
-
-func TestElasticExportEvent4(t *testing.T) {
- cgrCfg := config.NewDefaultCGRConfig()
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
- eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc)
- if err != nil {
- t.Error(err)
- }
- if err = eEe.Connect(); err != nil {
- t.Error(err)
- }
- errExpect := `unsupported protocol scheme ""`
+ errExpect := `an error happened during the Index query execution: unsupported protocol scheme ""`
if err := eEe.ExportEvent([]byte{}, ""); err == nil || err.Error() != errExpect {
t.Errorf("Expected %q but got %q", errExpect, err)
}
@@ -351,7 +80,7 @@ func TestElasticExportEvent4(t *testing.T) {
func TestElasticClose(t *testing.T) {
elasticEE := &ElasticEE{
- client: &elasticsearch.Client{},
+ client: &elasticsearch.TypedClient{},
}
err := elasticEE.Close()
if elasticEE.client != nil {
@@ -366,7 +95,7 @@ func TestElasticConnect(t *testing.T) {
t.Run("ClientAlreadyExists", func(t *testing.T) {
elasticEE := &ElasticEE{
- client: &elasticsearch.Client{},
+ client: &elasticsearch.TypedClient{},
}
err := elasticEE.Connect()