diff --git a/ees/elastic.go b/ees/elastic.go
index dec5e3cf2..c4d3f711e 100644
--- a/ees/elastic.go
+++ b/ees/elastic.go
@@ -19,15 +19,16 @@ along with this program. If not, see
package ees
import (
- "bytes"
- "encoding/json"
"fmt"
"os"
+ "strconv"
"strings"
"sync"
"github.com/elastic/elastic-transport-go/v8/elastictransport"
- "github.com/elastic/go-elasticsearch/v8/esapi"
+ "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/optype"
+ "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/refresh"
+ "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/versiontype"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
@@ -36,66 +37,31 @@ import (
elasticsearch "github.com/elastic/go-elasticsearch/v8"
)
-func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*ElasticEE, error) {
- el := &ElasticEE{
- cfg: cfg,
- dc: dc,
- reqs: newConcReq(cfg.ConcurrentRequests),
- }
- if err := el.prepareOpts(); err != nil {
- return nil, err
- }
- return el, nil
-}
-
// ElasticEE implements EventExporter interface for ElasticSearch export.
type ElasticEE struct {
mu sync.RWMutex
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
reqs *concReq
- bytePreparing
- client *elasticsearch.Client
+ client *elasticsearch.TypedClient
clientCfg elasticsearch.Config
-
- // indexReqOpts is used to store IndexRequest options for convenience
- // and does not represent the IndexRequest itself.
- indexReqOpts esapi.IndexRequest
}
-// init will create all the necessary dependencies, including opening the file
-func (e *ElasticEE) prepareOpts() error {
+func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*ElasticEE, error) {
+ el := &ElasticEE{
+ cfg: cfg,
+ dc: dc,
+ reqs: newConcReq(cfg.ConcurrentRequests),
+ }
+ if err := el.parseClientOpts(); err != nil {
+ return nil, err
+ }
+ return el, nil
+}
+
+func (e *ElasticEE) parseClientOpts() error {
opts := e.cfg.Opts
-
- // Parse index request options.
- e.indexReqOpts.Index = utils.CDRsTBL
- if opts.ElsIndex != nil {
- e.indexReqOpts.Index = *opts.ElsIndex
- }
- e.indexReqOpts.IfPrimaryTerm = opts.ElsIfPrimaryTerm
- e.indexReqOpts.IfSeqNo = opts.ElsIfSeqNo
- if opts.ElsOpType != nil {
- e.indexReqOpts.OpType = *opts.ElsOpType
- }
- if opts.ElsPipeline != nil {
- e.indexReqOpts.Pipeline = *opts.ElsPipeline
- }
- if opts.ElsRouting != nil {
- e.indexReqOpts.Routing = *opts.ElsRouting
- }
- if opts.ElsTimeout != nil {
- e.indexReqOpts.Timeout = *opts.ElsTimeout
- }
- e.indexReqOpts.Version = opts.ElsVersion
- if opts.ElsVersionType != nil {
- e.indexReqOpts.VersionType = *opts.ElsVersionType
- }
- if opts.ElsWaitForActiveShards != nil {
- e.indexReqOpts.WaitForActiveShards = *opts.ElsWaitForActiveShards
- }
-
- // Parse client config options.
if opts.ElsCloud != nil && *opts.ElsCloud {
e.clientCfg.CloudID = e.Cfg().ExportPath
} else {
@@ -184,12 +150,12 @@ func (e *ElasticEE) Connect() (err error) {
if e.client != nil { // check if connection is cached
return
}
- e.client, err = elasticsearch.NewClient(e.clientCfg)
+ e.client, err = elasticsearch.NewTypedClient(e.clientCfg)
return
}
// ExportEvent implements EventExporter
-func (e *ElasticEE) ExportEvent(ctx *context.Context, ev, extraData any) error {
+func (e *ElasticEE) ExportEvent(ctx *context.Context, event, extraData any) error {
e.reqs.get()
e.mu.RLock()
defer func() {
@@ -199,44 +165,75 @@ func (e *ElasticEE) ExportEvent(ctx *context.Context, ev, extraData any) error {
if e.client == nil {
return utils.ErrDisconnected
}
- key := extraData.(string)
- req := esapi.IndexRequest{
- DocumentID: key,
- Body: bytes.NewReader(ev.([]byte)),
- Refresh: "true",
- Index: e.indexReqOpts.Index,
- IfPrimaryTerm: e.indexReqOpts.IfPrimaryTerm,
- IfSeqNo: e.indexReqOpts.IfSeqNo,
- OpType: e.indexReqOpts.OpType,
- Pipeline: e.indexReqOpts.Pipeline,
- Routing: e.indexReqOpts.Routing,
- Timeout: e.indexReqOpts.Timeout,
- Version: e.indexReqOpts.Version,
- VersionType: e.indexReqOpts.VersionType,
- WaitForActiveShards: e.indexReqOpts.WaitForActiveShards,
- }
- resp, err := req.Do(ctx, e.client)
- if err != nil {
- return err
+ // Build and send index request.
+ key := extraData.(string)
+ opts := e.cfg.Opts
+ indexName := utils.CDRsTBL
+ if opts.ElsIndex != nil {
+ indexName = *opts.ElsIndex
}
- defer resp.Body.Close()
- if resp.IsError() {
- var errResp map[string]any
- if err := json.NewDecoder(resp.Body).Decode(&errResp); err != nil {
- return err
+ req := e.client.Index(indexName).
+ Id(key).
+ Request(event).
+ Refresh(refresh.True)
+
+ if opts.ElsIfPrimaryTerm != nil {
+ req.IfPrimaryTerm(strconv.Itoa(*opts.ElsIfPrimaryTerm))
+ }
+ if opts.ElsIfSeqNo != nil {
+ req.IfSeqNo(strconv.Itoa(*opts.ElsIfSeqNo))
+ }
+ if opts.ElsOpType != nil {
+ req.OpType(optype.OpType{Name: *opts.ElsOpType})
+ }
+ if opts.ElsPipeline != nil {
+ req.Pipeline(*opts.ElsPipeline)
+ }
+ if opts.ElsRouting != nil {
+ req.Routing(*opts.ElsRouting)
+ }
+ if opts.ElsTimeout != nil {
+ req.Timeout((*opts.ElsTimeout).String())
+ }
+ if opts.ElsVersion != nil {
+ req.Version(strconv.Itoa(*opts.ElsVersion))
+ }
+ if opts.ElsVersionType != nil {
+ req.VersionType(versiontype.VersionType{Name: *opts.ElsVersionType})
+ }
+ if opts.ElsWaitForActiveShards != nil {
+ req.WaitForActiveShards(*opts.ElsWaitForActiveShards)
+ }
+ _, err := req.Do(context.TODO())
+ return err
+}
+
+func (e *ElasticEE) PrepareMap(cgrEv *utils.CGREvent) (any, error) {
+ return cgrEv.Event, nil
+}
+
+func (e *ElasticEE) PrepareOrderMap(onm *utils.OrderedNavigableMap) (any, error) {
+ preparedMap := make(map[string]any)
+ for el := onm.GetFirstElement(); el != nil; el = el.Next() {
+ path := el.Value
+ item, err := onm.Field(path)
+ if err != nil {
+ utils.Logger.Warning(fmt.Sprintf(
+ "<%s> exporter %q: failed to retrieve field at path %q",
+ utils.EEs, e.cfg.ID, path))
+ continue
}
- utils.Logger.Warning(fmt.Sprintf(
- "<%s> exporter %q: failed to index document: %+v",
- utils.EEs, e.Cfg().ID, errResp))
+ path = path[:len(path)-1] // remove the last index
+ preparedMap[strings.Join(path, utils.NestingSep)] = item.String()
}
- return nil
+ return preparedMap, nil
}
func (e *ElasticEE) Close() error {
e.mu.Lock()
+ defer e.mu.Unlock()
e.client = nil
- e.mu.Unlock()
return nil
}
diff --git a/ees/elastic_test.go b/ees/elastic_test.go
index b990bf6bc..012f36dff 100644
--- a/ees/elastic_test.go
+++ b/ees/elastic_test.go
@@ -18,9 +18,6 @@ along with this program. If not, see
package ees
import (
- "bytes"
- "io"
- "net/http"
"reflect"
"testing"
@@ -50,7 +47,7 @@ func TestInitClient(t *testing.T) {
Opts: &config.EventExporterOpts{},
},
}
- if err := ee.prepareOpts(); err != nil {
+ if err := ee.parseClientOpts(); err != nil {
t.Error(err)
}
errExpect := `cannot create client: cannot parse url: parse "/\x00": net/url: invalid control character in URL`
@@ -58,171 +55,8 @@ func TestInitClient(t *testing.T) {
t.Errorf("Expected %+v \n but got %+v", errExpect, err)
}
}
-func TestInitCase1(t *testing.T) {
- ee := &ElasticEE{
- cfg: &config.EventExporterCfg{
- Opts: &config.EventExporterOpts{
- ElsIndex: utils.StringPointer("test"),
- },
- },
- }
- if err := ee.prepareOpts(); err != nil {
- t.Error(err)
- }
- eeExpect := "test"
- if !reflect.DeepEqual(ee.indexReqOpts.Index, eeExpect) {
- t.Errorf("Expected %+v \n but got %+v", eeExpect, ee.indexReqOpts.Index)
- }
-}
-func TestInitCase2(t *testing.T) {
- ee := &ElasticEE{
- cfg: &config.EventExporterCfg{
- Opts: &config.EventExporterOpts{
- ElsIfPrimaryTerm: utils.IntPointer(20),
- },
- },
- }
- if err := ee.prepareOpts(); err != nil {
- t.Error(err)
- }
- eeExpect := utils.IntPointer(20)
- if !reflect.DeepEqual(ee.indexReqOpts.IfPrimaryTerm, eeExpect) {
- t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.IfPrimaryTerm))
- }
-}
-
-func TestInitCase3(t *testing.T) {
- ee := &ElasticEE{
- cfg: &config.EventExporterCfg{
- Opts: &config.EventExporterOpts{
- ElsIfSeqNo: utils.IntPointer(20),
- },
- },
- }
- if err := ee.prepareOpts(); err != nil {
- t.Error(err)
- }
- eeExpect := utils.IntPointer(20)
- if !reflect.DeepEqual(ee.indexReqOpts.IfSeqNo, eeExpect) {
- t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.IfSeqNo))
- }
-}
-
-func TestInitCase4(t *testing.T) {
- ee := &ElasticEE{
- cfg: &config.EventExporterCfg{
- Opts: &config.EventExporterOpts{
- ElsOpType: utils.StringPointer("test"),
- },
- },
- }
- if err := ee.prepareOpts(); err != nil {
- t.Error(err)
- }
- eeExpect := "test"
- if !reflect.DeepEqual(ee.indexReqOpts.OpType, eeExpect) {
- t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.OpType))
- }
-}
-
-func TestInitCase5(t *testing.T) {
- ee := &ElasticEE{
- cfg: &config.EventExporterCfg{
- Opts: &config.EventExporterOpts{
- ElsPipeline: utils.StringPointer("test"),
- },
- },
- }
- if err := ee.prepareOpts(); err != nil {
- t.Error(err)
- }
- eeExpect := "test"
- if !reflect.DeepEqual(ee.indexReqOpts.Pipeline, eeExpect) {
- t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Pipeline))
- }
-}
-
-func TestInitCase6(t *testing.T) {
- ee := &ElasticEE{
- cfg: &config.EventExporterCfg{
- Opts: &config.EventExporterOpts{
- ElsRouting: utils.StringPointer("test"),
- },
- },
- }
- if err := ee.prepareOpts(); err != nil {
- t.Error(err)
- }
- eeExpect := "test"
- if !reflect.DeepEqual(ee.indexReqOpts.Routing, eeExpect) {
- t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Routing))
- }
-}
-
-func TestInitCase8(t *testing.T) {
- ee := &ElasticEE{
- cfg: &config.EventExporterCfg{
- Opts: &config.EventExporterOpts{
- ElsVersion: utils.IntPointer(20),
- },
- },
- }
- if err := ee.prepareOpts(); err != nil {
- t.Error(err)
- }
- eeExpect := utils.IntPointer(20)
- if !reflect.DeepEqual(ee.indexReqOpts.Version, eeExpect) {
- t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Version))
- }
-}
-
-func TestInitCase9(t *testing.T) {
- ee := &ElasticEE{
- cfg: &config.EventExporterCfg{
- Opts: &config.EventExporterOpts{
- ElsVersionType: utils.StringPointer("test"),
- },
- },
- }
- if err := ee.prepareOpts(); err != nil {
- t.Error(err)
- }
- eeExpect := "test"
- if !reflect.DeepEqual(ee.indexReqOpts.VersionType, eeExpect) {
- t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.VersionType))
- }
-}
-
-func TestInitCase10(t *testing.T) {
- ee := &ElasticEE{
- cfg: &config.EventExporterCfg{
- Opts: &config.EventExporterOpts{
- ElsWaitForActiveShards: utils.StringPointer("test"),
- },
- },
- }
- if err := ee.prepareOpts(); err != nil {
- t.Error(err)
- }
- eeExpect := "test"
- if !reflect.DeepEqual(ee.indexReqOpts.WaitForActiveShards, eeExpect) {
- t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.WaitForActiveShards))
- }
-}
-
-type mockClientErr struct{}
-
-func (mockClientErr) Perform(req *http.Request) (res *http.Response, err error) {
- res = &http.Response{
- StatusCode: 300,
- Body: io.NopCloser(bytes.NewBuffer([]byte(`{"test":"test"}`))),
- Header: http.Header{},
- }
- return res, nil
-}
-
-func TestElasticExportEvent(t *testing.T) {
+func TestElasticExportEventErr(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dc, err := newEEMetrics("Local")
if err != nil {
@@ -235,90 +69,7 @@ func TestElasticExportEvent(t *testing.T) {
if err = eEe.Connect(); err != nil {
t.Error(err)
}
- eEe.client.Transport = new(mockClientErr)
- if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err != nil {
- t.Error(err)
- }
-}
-
-type mockClientErr2 struct{}
-
-func (mockClientErr2) Perform(req *http.Request) (res *http.Response, err error) {
- res = &http.Response{
- StatusCode: 300,
- Body: io.NopCloser(bytes.NewBuffer([]byte(""))),
- Header: http.Header{},
- }
- return res, nil
-}
-
-func TestElasticExportEvent2(t *testing.T) {
- cgrCfg := config.NewDefaultCGRConfig()
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
- eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc)
- if err != nil {
- t.Error(err)
- }
- if err = eEe.Connect(); err != nil {
- t.Error(err)
- }
- eEe.client.Transport = new(mockClientErr2)
-
- errExpect := io.EOF
- if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err == nil || err != errExpect {
- t.Errorf("Expected %v but received %v", errExpect, err)
- }
-}
-
-type mockClient struct{}
-
-func (mockClient) Perform(req *http.Request) (res *http.Response, err error) {
- res = &http.Response{
- StatusCode: 200,
- Body: io.NopCloser(bytes.NewBuffer([]byte(""))),
- Header: http.Header{},
- }
- return res, nil
-}
-func TestElasticExportEvent3(t *testing.T) {
- cgrCfg := config.NewDefaultCGRConfig()
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
- eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc)
- if err != nil {
- t.Error(err)
- }
- if err = eEe.Connect(); err != nil {
- t.Error(err)
- }
- eEe.client.Transport = new(mockClient)
- errExpect := `the client noticed that the server is not Elasticsearch and we do not support this unknown product`
-
- cgrCfg.EEsCfg().Exporters[0].ComputeFields()
- if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err == nil || err.Error() != errExpect {
- t.Error(err)
- }
-}
-
-func TestElasticExportEvent4(t *testing.T) {
- cgrCfg := config.NewDefaultCGRConfig()
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
- eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc)
- if err != nil {
- t.Error(err)
- }
- if err = eEe.Connect(); err != nil {
- t.Error(err)
- }
- errExpect := `unsupported protocol scheme ""`
+ errExpect := `an error happened during the Index query execution: unsupported protocol scheme ""`
if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err == nil || err.Error() != errExpect {
t.Errorf("Expected %q but got %q", errExpect, err)
}