elasticsearch: switch to fully-typed API

- index request options are now used directly during ExportEvent. They are
  passed to the request as options only if they were configured in the first
  place.
- implement PrepareMap and PrepareOrderMap methods for the elastic exporter.
  bytePreparing methods are not needed anymore as the Event map can be exported
  directly.
- elasticsearch.Client -> elasticsearch.TypedClient
- rename prepareOpts -> parseClientOpts
This commit is contained in:
ionutboangiu
2025-01-27 19:31:20 +02:00
committed by Dan Christian Bogos
parent 3a0579a8ea
commit 035bac3688
2 changed files with 83 additions and 335 deletions

View File

@@ -19,15 +19,16 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ees
import (
"bytes"
"encoding/json"
"fmt"
"os"
"strconv"
"strings"
"sync"
"github.com/elastic/elastic-transport-go/v8/elastictransport"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/optype"
"github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/refresh"
"github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/versiontype"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
@@ -36,66 +37,31 @@ import (
elasticsearch "github.com/elastic/go-elasticsearch/v8"
)
func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*ElasticEE, error) {
el := &ElasticEE{
cfg: cfg,
dc: dc,
reqs: newConcReq(cfg.ConcurrentRequests),
}
if err := el.prepareOpts(); err != nil {
return nil, err
}
return el, nil
}
// ElasticEE implements EventExporter interface for ElasticSearch export.
type ElasticEE struct {
mu sync.RWMutex
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
reqs *concReq
bytePreparing
client *elasticsearch.Client
client *elasticsearch.TypedClient
clientCfg elasticsearch.Config
// indexReqOpts is used to store IndexRequest options for convenience
// and does not represent the IndexRequest itself.
indexReqOpts esapi.IndexRequest
}
// init will create all the necessary dependencies, including opening the file
func (e *ElasticEE) prepareOpts() error {
func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*ElasticEE, error) {
el := &ElasticEE{
cfg: cfg,
dc: dc,
reqs: newConcReq(cfg.ConcurrentRequests),
}
if err := el.parseClientOpts(); err != nil {
return nil, err
}
return el, nil
}
func (e *ElasticEE) parseClientOpts() error {
opts := e.cfg.Opts
// Parse index request options.
e.indexReqOpts.Index = utils.CDRsTBL
if opts.ElsIndex != nil {
e.indexReqOpts.Index = *opts.ElsIndex
}
e.indexReqOpts.IfPrimaryTerm = opts.ElsIfPrimaryTerm
e.indexReqOpts.IfSeqNo = opts.ElsIfSeqNo
if opts.ElsOpType != nil {
e.indexReqOpts.OpType = *opts.ElsOpType
}
if opts.ElsPipeline != nil {
e.indexReqOpts.Pipeline = *opts.ElsPipeline
}
if opts.ElsRouting != nil {
e.indexReqOpts.Routing = *opts.ElsRouting
}
if opts.ElsTimeout != nil {
e.indexReqOpts.Timeout = *opts.ElsTimeout
}
e.indexReqOpts.Version = opts.ElsVersion
if opts.ElsVersionType != nil {
e.indexReqOpts.VersionType = *opts.ElsVersionType
}
if opts.ElsWaitForActiveShards != nil {
e.indexReqOpts.WaitForActiveShards = *opts.ElsWaitForActiveShards
}
// Parse client config options.
if opts.ElsCloud != nil && *opts.ElsCloud {
e.clientCfg.CloudID = e.Cfg().ExportPath
} else {
@@ -184,12 +150,12 @@ func (e *ElasticEE) Connect() (err error) {
if e.client != nil { // check if connection is cached
return
}
e.client, err = elasticsearch.NewClient(e.clientCfg)
e.client, err = elasticsearch.NewTypedClient(e.clientCfg)
return
}
// ExportEvent implements EventExporter
func (e *ElasticEE) ExportEvent(ctx *context.Context, ev, extraData any) error {
func (e *ElasticEE) ExportEvent(ctx *context.Context, event, extraData any) error {
e.reqs.get()
e.mu.RLock()
defer func() {
@@ -199,44 +165,75 @@ func (e *ElasticEE) ExportEvent(ctx *context.Context, ev, extraData any) error {
if e.client == nil {
return utils.ErrDisconnected
}
key := extraData.(string)
req := esapi.IndexRequest{
DocumentID: key,
Body: bytes.NewReader(ev.([]byte)),
Refresh: "true",
Index: e.indexReqOpts.Index,
IfPrimaryTerm: e.indexReqOpts.IfPrimaryTerm,
IfSeqNo: e.indexReqOpts.IfSeqNo,
OpType: e.indexReqOpts.OpType,
Pipeline: e.indexReqOpts.Pipeline,
Routing: e.indexReqOpts.Routing,
Timeout: e.indexReqOpts.Timeout,
Version: e.indexReqOpts.Version,
VersionType: e.indexReqOpts.VersionType,
WaitForActiveShards: e.indexReqOpts.WaitForActiveShards,
}
resp, err := req.Do(ctx, e.client)
if err != nil {
return err
// Build and send index request.
key := extraData.(string)
opts := e.cfg.Opts
indexName := utils.CDRsTBL
if opts.ElsIndex != nil {
indexName = *opts.ElsIndex
}
defer resp.Body.Close()
if resp.IsError() {
var errResp map[string]any
if err := json.NewDecoder(resp.Body).Decode(&errResp); err != nil {
return err
req := e.client.Index(indexName).
Id(key).
Request(event).
Refresh(refresh.True)
if opts.ElsIfPrimaryTerm != nil {
req.IfPrimaryTerm(strconv.Itoa(*opts.ElsIfPrimaryTerm))
}
if opts.ElsIfSeqNo != nil {
req.IfSeqNo(strconv.Itoa(*opts.ElsIfSeqNo))
}
if opts.ElsOpType != nil {
req.OpType(optype.OpType{Name: *opts.ElsOpType})
}
if opts.ElsPipeline != nil {
req.Pipeline(*opts.ElsPipeline)
}
if opts.ElsRouting != nil {
req.Routing(*opts.ElsRouting)
}
if opts.ElsTimeout != nil {
req.Timeout((*opts.ElsTimeout).String())
}
if opts.ElsVersion != nil {
req.Version(strconv.Itoa(*opts.ElsVersion))
}
if opts.ElsVersionType != nil {
req.VersionType(versiontype.VersionType{Name: *opts.ElsVersionType})
}
if opts.ElsWaitForActiveShards != nil {
req.WaitForActiveShards(*opts.ElsWaitForActiveShards)
}
_, err := req.Do(context.TODO())
return err
}
func (e *ElasticEE) PrepareMap(cgrEv *utils.CGREvent) (any, error) {
return cgrEv.Event, nil
}
func (e *ElasticEE) PrepareOrderMap(onm *utils.OrderedNavigableMap) (any, error) {
preparedMap := make(map[string]any)
for el := onm.GetFirstElement(); el != nil; el = el.Next() {
path := el.Value
item, err := onm.Field(path)
if err != nil {
utils.Logger.Warning(fmt.Sprintf(
"<%s> exporter %q: failed to retrieve field at path %q",
utils.EEs, e.cfg.ID, path))
continue
}
utils.Logger.Warning(fmt.Sprintf(
"<%s> exporter %q: failed to index document: %+v",
utils.EEs, e.Cfg().ID, errResp))
path = path[:len(path)-1] // remove the last index
preparedMap[strings.Join(path, utils.NestingSep)] = item.String()
}
return nil
return preparedMap, nil
}
func (e *ElasticEE) Close() error {
e.mu.Lock()
defer e.mu.Unlock()
e.client = nil
e.mu.Unlock()
return nil
}

View File

@@ -18,9 +18,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ees
import (
"bytes"
"io"
"net/http"
"reflect"
"testing"
@@ -50,7 +47,7 @@ func TestInitClient(t *testing.T) {
Opts: &config.EventExporterOpts{},
},
}
if err := ee.prepareOpts(); err != nil {
if err := ee.parseClientOpts(); err != nil {
t.Error(err)
}
errExpect := `cannot create client: cannot parse url: parse "/\x00": net/url: invalid control character in URL`
@@ -58,171 +55,8 @@ func TestInitClient(t *testing.T) {
t.Errorf("Expected %+v \n but got %+v", errExpect, err)
}
}
func TestInitCase1(t *testing.T) {
ee := &ElasticEE{
cfg: &config.EventExporterCfg{
Opts: &config.EventExporterOpts{
ElsIndex: utils.StringPointer("test"),
},
},
}
if err := ee.prepareOpts(); err != nil {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.indexReqOpts.Index, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", eeExpect, ee.indexReqOpts.Index)
}
}
func TestInitCase2(t *testing.T) {
ee := &ElasticEE{
cfg: &config.EventExporterCfg{
Opts: &config.EventExporterOpts{
ElsIfPrimaryTerm: utils.IntPointer(20),
},
},
}
if err := ee.prepareOpts(); err != nil {
t.Error(err)
}
eeExpect := utils.IntPointer(20)
if !reflect.DeepEqual(ee.indexReqOpts.IfPrimaryTerm, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.IfPrimaryTerm))
}
}
func TestInitCase3(t *testing.T) {
ee := &ElasticEE{
cfg: &config.EventExporterCfg{
Opts: &config.EventExporterOpts{
ElsIfSeqNo: utils.IntPointer(20),
},
},
}
if err := ee.prepareOpts(); err != nil {
t.Error(err)
}
eeExpect := utils.IntPointer(20)
if !reflect.DeepEqual(ee.indexReqOpts.IfSeqNo, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.IfSeqNo))
}
}
func TestInitCase4(t *testing.T) {
ee := &ElasticEE{
cfg: &config.EventExporterCfg{
Opts: &config.EventExporterOpts{
ElsOpType: utils.StringPointer("test"),
},
},
}
if err := ee.prepareOpts(); err != nil {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.indexReqOpts.OpType, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.OpType))
}
}
func TestInitCase5(t *testing.T) {
ee := &ElasticEE{
cfg: &config.EventExporterCfg{
Opts: &config.EventExporterOpts{
ElsPipeline: utils.StringPointer("test"),
},
},
}
if err := ee.prepareOpts(); err != nil {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.indexReqOpts.Pipeline, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Pipeline))
}
}
func TestInitCase6(t *testing.T) {
ee := &ElasticEE{
cfg: &config.EventExporterCfg{
Opts: &config.EventExporterOpts{
ElsRouting: utils.StringPointer("test"),
},
},
}
if err := ee.prepareOpts(); err != nil {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.indexReqOpts.Routing, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Routing))
}
}
func TestInitCase8(t *testing.T) {
ee := &ElasticEE{
cfg: &config.EventExporterCfg{
Opts: &config.EventExporterOpts{
ElsVersion: utils.IntPointer(20),
},
},
}
if err := ee.prepareOpts(); err != nil {
t.Error(err)
}
eeExpect := utils.IntPointer(20)
if !reflect.DeepEqual(ee.indexReqOpts.Version, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Version))
}
}
func TestInitCase9(t *testing.T) {
ee := &ElasticEE{
cfg: &config.EventExporterCfg{
Opts: &config.EventExporterOpts{
ElsVersionType: utils.StringPointer("test"),
},
},
}
if err := ee.prepareOpts(); err != nil {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.indexReqOpts.VersionType, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.VersionType))
}
}
func TestInitCase10(t *testing.T) {
ee := &ElasticEE{
cfg: &config.EventExporterCfg{
Opts: &config.EventExporterOpts{
ElsWaitForActiveShards: utils.StringPointer("test"),
},
},
}
if err := ee.prepareOpts(); err != nil {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.indexReqOpts.WaitForActiveShards, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.WaitForActiveShards))
}
}
type mockClientErr struct{}
func (mockClientErr) Perform(req *http.Request) (res *http.Response, err error) {
res = &http.Response{
StatusCode: 300,
Body: io.NopCloser(bytes.NewBuffer([]byte(`{"test":"test"}`))),
Header: http.Header{},
}
return res, nil
}
func TestElasticExportEvent(t *testing.T) {
func TestElasticExportEventErr(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dc, err := newEEMetrics("Local")
if err != nil {
@@ -235,90 +69,7 @@ func TestElasticExportEvent(t *testing.T) {
if err = eEe.Connect(); err != nil {
t.Error(err)
}
eEe.client.Transport = new(mockClientErr)
if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err != nil {
t.Error(err)
}
}
type mockClientErr2 struct{}
func (mockClientErr2) Perform(req *http.Request) (res *http.Response, err error) {
res = &http.Response{
StatusCode: 300,
Body: io.NopCloser(bytes.NewBuffer([]byte(""))),
Header: http.Header{},
}
return res, nil
}
func TestElasticExportEvent2(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dc, err := newEEMetrics("Local")
if err != nil {
t.Error(err)
}
eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc)
if err != nil {
t.Error(err)
}
if err = eEe.Connect(); err != nil {
t.Error(err)
}
eEe.client.Transport = new(mockClientErr2)
errExpect := io.EOF
if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err == nil || err != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
}
type mockClient struct{}
func (mockClient) Perform(req *http.Request) (res *http.Response, err error) {
res = &http.Response{
StatusCode: 200,
Body: io.NopCloser(bytes.NewBuffer([]byte(""))),
Header: http.Header{},
}
return res, nil
}
func TestElasticExportEvent3(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dc, err := newEEMetrics("Local")
if err != nil {
t.Error(err)
}
eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc)
if err != nil {
t.Error(err)
}
if err = eEe.Connect(); err != nil {
t.Error(err)
}
eEe.client.Transport = new(mockClient)
errExpect := `the client noticed that the server is not Elasticsearch and we do not support this unknown product`
cgrCfg.EEsCfg().Exporters[0].ComputeFields()
if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err == nil || err.Error() != errExpect {
t.Error(err)
}
}
func TestElasticExportEvent4(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dc, err := newEEMetrics("Local")
if err != nil {
t.Error(err)
}
eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc)
if err != nil {
t.Error(err)
}
if err = eEe.Connect(); err != nil {
t.Error(err)
}
errExpect := `unsupported protocol scheme ""`
errExpect := `an error happened during the Index query execution: unsupported protocol scheme ""`
if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err == nil || err.Error() != errExpect {
t.Errorf("Expected %q but got %q", errExpect, err)
}