Slightly refactor elastic exporter

now returns error in case of invalid logger type. Could be removed
in the future in favor of handling it in configsanity.go.
This commit is contained in:
ionutboangiu
2025-01-27 17:21:26 +02:00
committed by Dan Christian Bogos
parent 39ef7df677
commit 3a0579a8ea
3 changed files with 154 additions and 135 deletions

View File

@@ -219,7 +219,7 @@ func TestNewEventExporterCase7(t *testing.T) {
newEE := ee.(*ElasticEE)
newEE.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
eeExpect.eClnt = newEE.eClnt
eeExpect.client = newEE.client
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", eeExpect, newEE)
}

View File

@@ -36,192 +36,211 @@ import (
elasticsearch "github.com/elastic/go-elasticsearch/v8"
)
func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (eEe *ElasticEE, err error) {
eEe = &ElasticEE{
func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*ElasticEE, error) {
el := &ElasticEE{
cfg: cfg,
dc: dc,
reqs: newConcReq(cfg.ConcurrentRequests),
}
err = eEe.prepareOpts()
return
if err := el.prepareOpts(); err != nil {
return nil, err
}
return el, nil
}
// ElasticEE implements EventExporter interface for ElasticSearch export
// ElasticEE implements EventExporter interface for ElasticSearch export.
type ElasticEE struct {
cfg *config.EventExporterCfg
eClnt *elasticsearch.Client
dc *utils.SafeMapStorage
opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap
clntOpts elasticsearch.Config
reqs *concReq
sync.RWMutex
mu sync.RWMutex
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
reqs *concReq
bytePreparing
client *elasticsearch.Client
clientCfg elasticsearch.Config
// indexReqOpts is used to store IndexRequest options for convenience
// and does not represent the IndexRequest itself.
indexReqOpts esapi.IndexRequest
}
// init will create all the necessary dependencies, including opening the file
func (eEe *ElasticEE) prepareOpts() (err error) {
//parse opts
eEe.opts.Index = utils.CDRsTBL
if eEe.Cfg().Opts.ElsIndex != nil {
eEe.opts.Index = *eEe.Cfg().Opts.ElsIndex
func (e *ElasticEE) prepareOpts() error {
opts := e.cfg.Opts
// Parse index request options.
e.indexReqOpts.Index = utils.CDRsTBL
if opts.ElsIndex != nil {
e.indexReqOpts.Index = *opts.ElsIndex
}
eEe.opts.IfPrimaryTerm = eEe.Cfg().Opts.ElsIfPrimaryTerm
eEe.opts.IfSeqNo = eEe.Cfg().Opts.ElsIfSeqNo
if eEe.Cfg().Opts.ElsOpType != nil {
eEe.opts.OpType = *eEe.Cfg().Opts.ElsOpType
e.indexReqOpts.IfPrimaryTerm = opts.ElsIfPrimaryTerm
e.indexReqOpts.IfSeqNo = opts.ElsIfSeqNo
if opts.ElsOpType != nil {
e.indexReqOpts.OpType = *opts.ElsOpType
}
if eEe.Cfg().Opts.ElsPipeline != nil {
eEe.opts.Pipeline = *eEe.Cfg().Opts.ElsPipeline
if opts.ElsPipeline != nil {
e.indexReqOpts.Pipeline = *opts.ElsPipeline
}
if eEe.Cfg().Opts.ElsRouting != nil {
eEe.opts.Routing = *eEe.Cfg().Opts.ElsRouting
if opts.ElsRouting != nil {
e.indexReqOpts.Routing = *opts.ElsRouting
}
if eEe.Cfg().Opts.ElsTimeout != nil {
eEe.opts.Timeout = *eEe.Cfg().Opts.ElsTimeout
if opts.ElsTimeout != nil {
e.indexReqOpts.Timeout = *opts.ElsTimeout
}
eEe.opts.Version = eEe.Cfg().Opts.ElsVersion
if eEe.Cfg().Opts.ElsVersionType != nil {
eEe.opts.VersionType = *eEe.Cfg().Opts.ElsVersionType
e.indexReqOpts.Version = opts.ElsVersion
if opts.ElsVersionType != nil {
e.indexReqOpts.VersionType = *opts.ElsVersionType
}
if eEe.Cfg().Opts.ElsWaitForActiveShards != nil {
eEe.opts.WaitForActiveShards = *eEe.Cfg().Opts.ElsWaitForActiveShards
if opts.ElsWaitForActiveShards != nil {
e.indexReqOpts.WaitForActiveShards = *opts.ElsWaitForActiveShards
}
//client opts
if eEe.Cfg().Opts.ElsCloud != nil && *eEe.Cfg().Opts.ElsCloud {
eEe.clntOpts.CloudID = eEe.Cfg().ExportPath
// Parse client config options.
if opts.ElsCloud != nil && *opts.ElsCloud {
e.clientCfg.CloudID = e.Cfg().ExportPath
} else {
eEe.clntOpts.Addresses = strings.Split(eEe.Cfg().ExportPath, utils.InfieldSep)
e.clientCfg.Addresses = strings.Split(e.Cfg().ExportPath, utils.InfieldSep)
}
if eEe.Cfg().Opts.ElsUsername != nil {
eEe.clntOpts.Username = *eEe.Cfg().Opts.ElsUsername
if opts.ElsUsername != nil {
e.clientCfg.Username = *opts.ElsUsername
}
if eEe.Cfg().Opts.ElsPassword != nil {
eEe.clntOpts.Password = *eEe.Cfg().Opts.ElsPassword
if opts.ElsPassword != nil {
e.clientCfg.Password = *opts.ElsPassword
}
if eEe.Cfg().Opts.ElsAPIKey != nil {
eEe.clntOpts.APIKey = *eEe.Cfg().Opts.ElsAPIKey
if opts.ElsAPIKey != nil {
e.clientCfg.APIKey = *opts.ElsAPIKey
}
if eEe.Cfg().Opts.CAPath != nil {
var cacert []byte
cacert, err = os.ReadFile(*eEe.Cfg().Opts.CAPath)
if opts.CAPath != nil {
cacert, err := os.ReadFile(*opts.CAPath)
if err != nil {
return
return err
}
eEe.clntOpts.CACert = cacert
e.clientCfg.CACert = cacert
}
if eEe.Cfg().Opts.ElsCertificateFingerprint != nil {
eEe.clntOpts.CertificateFingerprint = *eEe.Cfg().Opts.ElsCertificateFingerprint
if opts.ElsCertificateFingerprint != nil {
e.clientCfg.CertificateFingerprint = *opts.ElsCertificateFingerprint
}
if eEe.Cfg().Opts.ElsServiceToken != nil {
eEe.clntOpts.ServiceToken = *eEe.Cfg().Opts.ElsServiceToken
if opts.ElsServiceToken != nil {
e.clientCfg.ServiceToken = *opts.ElsServiceToken
}
if eEe.Cfg().Opts.ElsDiscoverNodesOnStart != nil {
eEe.clntOpts.DiscoverNodesOnStart = *eEe.Cfg().Opts.ElsDiscoverNodesOnStart
if opts.ElsDiscoverNodesOnStart != nil {
e.clientCfg.DiscoverNodesOnStart = *opts.ElsDiscoverNodesOnStart
}
if eEe.Cfg().Opts.ElsDiscoverNodeInterval != nil {
eEe.clntOpts.DiscoverNodesInterval = *eEe.Cfg().Opts.ElsDiscoverNodeInterval
if opts.ElsDiscoverNodeInterval != nil {
e.clientCfg.DiscoverNodesInterval = *opts.ElsDiscoverNodeInterval
}
if eEe.Cfg().Opts.ElsEnableDebugLogger != nil {
eEe.clntOpts.EnableDebugLogger = *eEe.Cfg().Opts.ElsEnableDebugLogger
if opts.ElsEnableDebugLogger != nil {
e.clientCfg.EnableDebugLogger = *opts.ElsEnableDebugLogger
}
if loggerType := eEe.Cfg().Opts.ElsLogger; loggerType != nil {
if loggerType := opts.ElsLogger; loggerType != nil {
var logger elastictransport.Logger
switch *loggerType {
case utils.ElsJson:
logger = &elastictransport.JSONLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true}
logger = &elastictransport.JSONLogger{
Output: os.Stdout,
EnableRequestBody: true,
EnableResponseBody: true,
}
case utils.ElsColor:
logger = &elastictransport.ColorLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true}
logger = &elastictransport.ColorLogger{
Output: os.Stdout,
EnableRequestBody: true,
EnableResponseBody: true,
}
case utils.ElsText:
logger = &elastictransport.TextLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true}
logger = &elastictransport.TextLogger{
Output: os.Stdout,
EnableRequestBody: true,
EnableResponseBody: true,
}
default:
return
return fmt.Errorf("invalid logger type: %q", *loggerType)
}
eEe.clntOpts.Logger = logger
e.clientCfg.Logger = logger
}
if eEe.Cfg().Opts.ElsCompressRequestBody != nil {
eEe.clntOpts.CompressRequestBody = *eEe.Cfg().Opts.ElsCompressRequestBody
if opts.ElsCompressRequestBody != nil {
e.clientCfg.CompressRequestBody = *opts.ElsCompressRequestBody
}
if eEe.Cfg().Opts.ElsRetryOnStatus != nil {
eEe.clntOpts.RetryOnStatus = *eEe.Cfg().Opts.ElsRetryOnStatus
if opts.ElsRetryOnStatus != nil {
e.clientCfg.RetryOnStatus = *opts.ElsRetryOnStatus
}
if eEe.Cfg().Opts.ElsMaxRetries != nil {
eEe.clntOpts.MaxRetries = *eEe.Cfg().Opts.ElsMaxRetries
if opts.ElsMaxRetries != nil {
e.clientCfg.MaxRetries = *opts.ElsMaxRetries
}
if eEe.Cfg().Opts.ElsDisableRetry != nil {
eEe.clntOpts.DisableRetry = *eEe.Cfg().Opts.ElsDisableRetry
if opts.ElsDisableRetry != nil {
e.clientCfg.DisableRetry = *opts.ElsDisableRetry
}
if eEe.Cfg().Opts.ElsCompressRequestBodyLevel != nil {
eEe.clntOpts.CompressRequestBodyLevel = *eEe.Cfg().Opts.ElsCompressRequestBodyLevel
if opts.ElsCompressRequestBodyLevel != nil {
e.clientCfg.CompressRequestBodyLevel = *opts.ElsCompressRequestBodyLevel
}
return
return nil
}
func (eEe *ElasticEE) Cfg() *config.EventExporterCfg { return eEe.cfg }
func (e *ElasticEE) Cfg() *config.EventExporterCfg { return e.cfg }
func (eEe *ElasticEE) Connect() (err error) {
eEe.Lock()
defer eEe.Unlock()
// create the client
if eEe.eClnt != nil {
func (e *ElasticEE) Connect() (err error) {
e.mu.Lock()
defer e.mu.Unlock()
if e.client != nil { // check if connection is cached
return
}
eEe.eClnt, err = elasticsearch.NewClient(eEe.clntOpts)
e.client, err = elasticsearch.NewClient(e.clientCfg)
return
}
// ExportEvent implements EventExporter
func (eEe *ElasticEE) ExportEvent(ctx *context.Context, ev, extraData any) (err error) {
eEe.reqs.get()
eEe.RLock()
func (e *ElasticEE) ExportEvent(ctx *context.Context, ev, extraData any) error {
e.reqs.get()
e.mu.RLock()
defer func() {
eEe.RUnlock()
eEe.reqs.done()
e.mu.RUnlock()
e.reqs.done()
}()
if eEe.eClnt == nil {
if e.client == nil {
return utils.ErrDisconnected
}
key := extraData.(string)
eReq := esapi.IndexRequest{
Index: eEe.opts.Index,
req := esapi.IndexRequest{
DocumentID: key,
Body: bytes.NewReader(ev.([]byte)),
Refresh: "true",
IfPrimaryTerm: eEe.opts.IfPrimaryTerm,
IfSeqNo: eEe.opts.IfSeqNo,
OpType: eEe.opts.OpType,
Pipeline: eEe.opts.Pipeline,
Routing: eEe.opts.Routing,
Timeout: eEe.opts.Timeout,
Version: eEe.opts.Version,
VersionType: eEe.opts.VersionType,
WaitForActiveShards: eEe.opts.WaitForActiveShards,
Index: e.indexReqOpts.Index,
IfPrimaryTerm: e.indexReqOpts.IfPrimaryTerm,
IfSeqNo: e.indexReqOpts.IfSeqNo,
OpType: e.indexReqOpts.OpType,
Pipeline: e.indexReqOpts.Pipeline,
Routing: e.indexReqOpts.Routing,
Timeout: e.indexReqOpts.Timeout,
Version: e.indexReqOpts.Version,
VersionType: e.indexReqOpts.VersionType,
WaitForActiveShards: e.indexReqOpts.WaitForActiveShards,
}
var resp *esapi.Response
if resp, err = eReq.Do(ctx, eEe.eClnt); err != nil {
return
resp, err := req.Do(ctx, e.client)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.IsError() {
var e map[string]any
if err = json.NewDecoder(resp.Body).Decode(&e); err != nil {
return
var errResp map[string]any
if err := json.NewDecoder(resp.Body).Decode(&errResp); err != nil {
return err
}
utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%+v> when indexing document",
utils.EEs, eEe.Cfg().ID, e))
utils.Logger.Warning(fmt.Sprintf(
"<%s> exporter %q: failed to index document: %+v",
utils.EEs, e.Cfg().ID, errResp))
}
return
return nil
}
func (eEe *ElasticEE) Close() (_ error) {
eEe.Lock()
eEe.eClnt = nil
eEe.Unlock()
return
func (e *ElasticEE) Close() error {
e.mu.Lock()
e.client = nil
e.mu.Unlock()
return nil
}
func (eEe *ElasticEE) GetMetrics() *utils.SafeMapStorage { return eEe.dc }
func (e *ElasticEE) GetMetrics() *utils.SafeMapStorage { return e.dc }
func (eEE *ElasticEE) ExtraData(ev *utils.CGREvent) any {
return utils.ConcatenatedKey(

View File

@@ -70,8 +70,8 @@ func TestInitCase1(t *testing.T) {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.opts.Index, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", eeExpect, ee.opts.Index)
if !reflect.DeepEqual(ee.indexReqOpts.Index, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", eeExpect, ee.indexReqOpts.Index)
}
}
@@ -87,8 +87,8 @@ func TestInitCase2(t *testing.T) {
t.Error(err)
}
eeExpect := utils.IntPointer(20)
if !reflect.DeepEqual(ee.opts.IfPrimaryTerm, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.IfPrimaryTerm))
if !reflect.DeepEqual(ee.indexReqOpts.IfPrimaryTerm, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.IfPrimaryTerm))
}
}
@@ -104,8 +104,8 @@ func TestInitCase3(t *testing.T) {
t.Error(err)
}
eeExpect := utils.IntPointer(20)
if !reflect.DeepEqual(ee.opts.IfSeqNo, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.IfSeqNo))
if !reflect.DeepEqual(ee.indexReqOpts.IfSeqNo, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.IfSeqNo))
}
}
@@ -121,8 +121,8 @@ func TestInitCase4(t *testing.T) {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.opts.OpType, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.OpType))
if !reflect.DeepEqual(ee.indexReqOpts.OpType, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.OpType))
}
}
@@ -138,8 +138,8 @@ func TestInitCase5(t *testing.T) {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.opts.Pipeline, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.Pipeline))
if !reflect.DeepEqual(ee.indexReqOpts.Pipeline, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Pipeline))
}
}
@@ -155,8 +155,8 @@ func TestInitCase6(t *testing.T) {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.opts.Routing, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.Routing))
if !reflect.DeepEqual(ee.indexReqOpts.Routing, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Routing))
}
}
@@ -172,8 +172,8 @@ func TestInitCase8(t *testing.T) {
t.Error(err)
}
eeExpect := utils.IntPointer(20)
if !reflect.DeepEqual(ee.opts.Version, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.Version))
if !reflect.DeepEqual(ee.indexReqOpts.Version, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Version))
}
}
@@ -189,8 +189,8 @@ func TestInitCase9(t *testing.T) {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.opts.VersionType, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.VersionType))
if !reflect.DeepEqual(ee.indexReqOpts.VersionType, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.VersionType))
}
}
@@ -206,8 +206,8 @@ func TestInitCase10(t *testing.T) {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.opts.WaitForActiveShards, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.WaitForActiveShards))
if !reflect.DeepEqual(ee.indexReqOpts.WaitForActiveShards, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.WaitForActiveShards))
}
}
@@ -235,7 +235,7 @@ func TestElasticExportEvent(t *testing.T) {
if err = eEe.Connect(); err != nil {
t.Error(err)
}
eEe.eClnt.Transport = new(mockClientErr)
eEe.client.Transport = new(mockClientErr)
if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err != nil {
t.Error(err)
}
@@ -265,7 +265,7 @@ func TestElasticExportEvent2(t *testing.T) {
if err = eEe.Connect(); err != nil {
t.Error(err)
}
eEe.eClnt.Transport = new(mockClientErr2)
eEe.client.Transport = new(mockClientErr2)
errExpect := io.EOF
if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err == nil || err != errExpect {
@@ -296,7 +296,7 @@ func TestElasticExportEvent3(t *testing.T) {
if err = eEe.Connect(); err != nil {
t.Error(err)
}
eEe.eClnt.Transport = new(mockClient)
eEe.client.Transport = new(mockClient)
errExpect := `the client noticed that the server is not Elasticsearch and we do not support this unknown product`
cgrCfg.EEsCfg().Exporters[0].ComputeFields()