Slightly refactor elastic exporter

now returns error in case of invalid logger type. Could be removed
in the future in favor of handling it in configsanity.go.
This commit is contained in:
ionutboangiu
2024-11-22 22:32:43 +02:00
committed by Dan Christian Bogos
parent 248bf792d5
commit ae264deadb
3 changed files with 160 additions and 141 deletions

View File

@@ -204,7 +204,7 @@ func TestNewEventExporterCase7(t *testing.T) {
newEE := ee.(*ElasticEE)
newEE.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
eeExpect.eClnt = newEE.eClnt
eeExpect.client = newEE.client
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", eeExpect, newEE)
}

View File

@@ -35,188 +35,207 @@ import (
elasticsearch "github.com/elastic/go-elasticsearch/v8"
)
func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (eEe *ElasticEE, err error) {
eEe = &ElasticEE{
func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*ElasticEE, error) {
el := &ElasticEE{
cfg: cfg,
dc: dc,
reqs: newConcReq(cfg.ConcurrentRequests),
}
err = eEe.prepareOpts()
return
if err := el.prepareOpts(); err != nil {
return nil, err
}
return el, nil
}
// ElasticEE implements EventExporter interface for ElasticSearch export
// ElasticEE implements EventExporter interface for ElasticSearch export.
type ElasticEE struct {
cfg *config.EventExporterCfg
eClnt *elasticsearch.Client
dc *utils.SafeMapStorage
indxOpts esapi.IndexRequest // this variable is used only for storing the options from OptsMap
reqs *concReq
clnOpts elasticsearch.Config
sync.RWMutex
mu sync.RWMutex
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
reqs *concReq
bytePreparing
client *elasticsearch.Client
clientCfg elasticsearch.Config
// indexReqOpts is used to store IndexRequest options for convenience
// and does not represent the IndexRequest itself.
indexReqOpts esapi.IndexRequest
}
// init will create all the necessary dependencies, including opening the file
func (eEe *ElasticEE) prepareOpts() (err error) {
//parse opts
eEe.indxOpts.Index = utils.CDRsTBL
if eEe.Cfg().Opts.Els.Index != nil {
eEe.indxOpts.Index = *eEe.Cfg().Opts.Els.Index
func (e *ElasticEE) prepareOpts() error {
opts := e.cfg.Opts.Els
// Parse index request options.
e.indexReqOpts.Index = utils.CDRsTBL
if opts.Index != nil {
e.indexReqOpts.Index = *opts.Index
}
eEe.indxOpts.IfPrimaryTerm = eEe.Cfg().Opts.Els.IfPrimaryTerm
eEe.indxOpts.IfSeqNo = eEe.Cfg().Opts.Els.IfSeqNo
if eEe.Cfg().Opts.Els.OpType != nil {
eEe.indxOpts.OpType = *eEe.Cfg().Opts.Els.OpType
e.indexReqOpts.IfPrimaryTerm = opts.IfPrimaryTerm
e.indexReqOpts.IfSeqNo = opts.IfSeqNo
if opts.OpType != nil {
e.indexReqOpts.OpType = *opts.OpType
}
if eEe.Cfg().Opts.Els.Pipeline != nil {
eEe.indxOpts.Pipeline = *eEe.Cfg().Opts.Els.Pipeline
if opts.Pipeline != nil {
e.indexReqOpts.Pipeline = *opts.Pipeline
}
if eEe.Cfg().Opts.Els.Routing != nil {
eEe.indxOpts.Routing = *eEe.Cfg().Opts.Els.Routing
if opts.Routing != nil {
e.indexReqOpts.Routing = *opts.Routing
}
if eEe.Cfg().Opts.Els.Timeout != nil {
eEe.indxOpts.Timeout = *eEe.Cfg().Opts.Els.Timeout
if opts.Timeout != nil {
e.indexReqOpts.Timeout = *opts.Timeout
}
eEe.indxOpts.Version = eEe.Cfg().Opts.Els.Version
if eEe.Cfg().Opts.Els.VersionType != nil {
eEe.indxOpts.VersionType = *eEe.Cfg().Opts.Els.VersionType
e.indexReqOpts.Version = opts.Version
if opts.VersionType != nil {
e.indexReqOpts.VersionType = *opts.VersionType
}
if eEe.Cfg().Opts.Els.WaitForActiveShards != nil {
eEe.indxOpts.WaitForActiveShards = *eEe.Cfg().Opts.Els.WaitForActiveShards
if opts.WaitForActiveShards != nil {
e.indexReqOpts.WaitForActiveShards = *opts.WaitForActiveShards
}
//client config
if eEe.Cfg().Opts.Els.Cloud != nil && *eEe.Cfg().Opts.Els.Cloud {
eEe.clnOpts.CloudID = eEe.Cfg().ExportPath
// Parse client config options.
if opts.Cloud != nil && *opts.Cloud {
e.clientCfg.CloudID = e.Cfg().ExportPath
} else {
eEe.clnOpts.Addresses = strings.Split(eEe.Cfg().ExportPath, utils.InfieldSep)
e.clientCfg.Addresses = strings.Split(e.Cfg().ExportPath, utils.InfieldSep)
}
if eEe.Cfg().Opts.Els.Username != nil {
eEe.clnOpts.Username = *eEe.Cfg().Opts.Els.Username
if opts.Username != nil {
e.clientCfg.Username = *opts.Username
}
if eEe.Cfg().Opts.Els.Password != nil {
eEe.clnOpts.Password = *eEe.Cfg().Opts.Els.Password
if opts.Password != nil {
e.clientCfg.Password = *opts.Password
}
if eEe.Cfg().Opts.Els.APIKey != nil {
eEe.clnOpts.APIKey = *eEe.Cfg().Opts.Els.APIKey
if opts.APIKey != nil {
e.clientCfg.APIKey = *opts.APIKey
}
if eEe.Cfg().Opts.RPC.CAPath != nil {
var cacert []byte
cacert, err = os.ReadFile(*eEe.Cfg().Opts.RPC.CAPath)
if e.Cfg().Opts.RPC.CAPath != nil {
cacert, err := os.ReadFile(*e.Cfg().Opts.RPC.CAPath)
if err != nil {
return
return err
}
eEe.clnOpts.CACert = cacert
e.clientCfg.CACert = cacert
}
if eEe.Cfg().Opts.Els.CertificateFingerprint != nil {
eEe.clnOpts.CertificateFingerprint = *eEe.Cfg().Opts.Els.CertificateFingerprint
if opts.CertificateFingerprint != nil {
e.clientCfg.CertificateFingerprint = *opts.CertificateFingerprint
}
if eEe.Cfg().Opts.Els.ServiceToken != nil {
eEe.clnOpts.ServiceToken = *eEe.Cfg().Opts.Els.ServiceToken
if opts.ServiceToken != nil {
e.clientCfg.ServiceToken = *opts.ServiceToken
}
if eEe.Cfg().Opts.Els.DiscoverNodesOnStart != nil {
eEe.clnOpts.DiscoverNodesOnStart = *eEe.Cfg().Opts.Els.DiscoverNodesOnStart
if opts.DiscoverNodesOnStart != nil {
e.clientCfg.DiscoverNodesOnStart = *opts.DiscoverNodesOnStart
}
if eEe.Cfg().Opts.Els.DiscoverNodeInterval != nil {
eEe.clnOpts.DiscoverNodesInterval = *eEe.Cfg().Opts.Els.DiscoverNodeInterval
if opts.DiscoverNodeInterval != nil {
e.clientCfg.DiscoverNodesInterval = *opts.DiscoverNodeInterval
}
if eEe.Cfg().Opts.Els.EnableDebugLogger != nil {
eEe.clnOpts.EnableDebugLogger = *eEe.Cfg().Opts.Els.EnableDebugLogger
if opts.EnableDebugLogger != nil {
e.clientCfg.EnableDebugLogger = *opts.EnableDebugLogger
}
if loggerType := eEe.Cfg().Opts.Els.Logger; loggerType != nil {
if loggerType := opts.Logger; loggerType != nil {
var logger elastictransport.Logger
switch *loggerType {
case utils.ElsJson:
logger = &elastictransport.JSONLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true}
logger = &elastictransport.JSONLogger{
Output: os.Stdout,
EnableRequestBody: true,
EnableResponseBody: true,
}
case utils.ElsColor:
logger = &elastictransport.ColorLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true}
logger = &elastictransport.ColorLogger{
Output: os.Stdout,
EnableRequestBody: true,
EnableResponseBody: true,
}
case utils.ElsText:
logger = &elastictransport.TextLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true}
logger = &elastictransport.TextLogger{
Output: os.Stdout,
EnableRequestBody: true,
EnableResponseBody: true,
}
default:
return
return fmt.Errorf("invalid logger type: %q", *loggerType)
}
eEe.clnOpts.Logger = logger
e.clientCfg.Logger = logger
}
if eEe.Cfg().Opts.Els.CompressRequestBody != nil {
eEe.clnOpts.CompressRequestBody = *eEe.Cfg().Opts.Els.CompressRequestBody
if opts.CompressRequestBody != nil {
e.clientCfg.CompressRequestBody = *opts.CompressRequestBody
}
if eEe.Cfg().Opts.Els.RetryOnStatus != nil {
eEe.clnOpts.RetryOnStatus = *eEe.Cfg().Opts.Els.RetryOnStatus
if opts.RetryOnStatus != nil {
e.clientCfg.RetryOnStatus = *opts.RetryOnStatus
}
if eEe.Cfg().Opts.Els.MaxRetries != nil {
eEe.clnOpts.MaxRetries = *eEe.Cfg().Opts.Els.MaxRetries
if opts.MaxRetries != nil {
e.clientCfg.MaxRetries = *opts.MaxRetries
}
if eEe.Cfg().Opts.Els.DisableRetry != nil {
eEe.clnOpts.DisableRetry = *eEe.Cfg().Opts.Els.DisableRetry
if opts.DisableRetry != nil {
e.clientCfg.DisableRetry = *opts.DisableRetry
}
if eEe.Cfg().Opts.Els.CompressRequestBodyLevel != nil {
eEe.clnOpts.CompressRequestBodyLevel = *eEe.Cfg().Opts.Els.CompressRequestBodyLevel
if opts.CompressRequestBodyLevel != nil {
e.clientCfg.CompressRequestBodyLevel = *opts.CompressRequestBodyLevel
}
return
return nil
}
func (eEe *ElasticEE) Cfg() *config.EventExporterCfg { return eEe.cfg }
func (e *ElasticEE) Cfg() *config.EventExporterCfg { return e.cfg }
func (eEe *ElasticEE) Connect() (err error) {
eEe.Lock()
defer eEe.Unlock()
// create the client
if eEe.eClnt != nil {
func (e *ElasticEE) Connect() (err error) {
e.mu.Lock()
defer e.mu.Unlock()
if e.client != nil { // check if connection is cached
return
}
eEe.eClnt, err = elasticsearch.NewClient(eEe.clnOpts)
e.client, err = elasticsearch.NewClient(e.clientCfg)
return
}
// ExportEvent implements EventExporter
func (eEe *ElasticEE) ExportEvent(ev any, key string) (err error) {
eEe.reqs.get()
eEe.RLock()
func (e *ElasticEE) ExportEvent(ev any, key string) error {
e.reqs.get()
e.mu.RLock()
defer func() {
eEe.RUnlock()
eEe.reqs.done()
e.mu.RUnlock()
e.reqs.done()
}()
if eEe.eClnt == nil {
if e.client == nil {
return utils.ErrDisconnected
}
eReq := esapi.IndexRequest{
Index: eEe.indxOpts.Index,
req := esapi.IndexRequest{
DocumentID: key,
Body: bytes.NewReader(ev.([]byte)),
Refresh: "true",
IfPrimaryTerm: eEe.indxOpts.IfPrimaryTerm,
IfSeqNo: eEe.indxOpts.IfSeqNo,
OpType: eEe.indxOpts.OpType,
Pipeline: eEe.indxOpts.Pipeline,
Routing: eEe.indxOpts.Routing,
Timeout: eEe.indxOpts.Timeout,
Version: eEe.indxOpts.Version,
VersionType: eEe.indxOpts.VersionType,
WaitForActiveShards: eEe.indxOpts.WaitForActiveShards,
Index: e.indexReqOpts.Index,
IfPrimaryTerm: e.indexReqOpts.IfPrimaryTerm,
IfSeqNo: e.indexReqOpts.IfSeqNo,
OpType: e.indexReqOpts.OpType,
Pipeline: e.indexReqOpts.Pipeline,
Routing: e.indexReqOpts.Routing,
Timeout: e.indexReqOpts.Timeout,
Version: e.indexReqOpts.Version,
VersionType: e.indexReqOpts.VersionType,
WaitForActiveShards: e.indexReqOpts.WaitForActiveShards,
}
var resp *esapi.Response
if resp, err = eReq.Do(context.Background(), eEe.eClnt); err != nil {
return
resp, err := req.Do(context.Background(), e.client)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.IsError() {
var e map[string]any
if err = json.NewDecoder(resp.Body).Decode(&e); err != nil {
return
var errResp map[string]any
if err := json.NewDecoder(resp.Body).Decode(&errResp); err != nil {
return err
}
utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%+v> when indexing document",
utils.EEs, eEe.Cfg().ID, e))
utils.Logger.Warning(fmt.Sprintf(
"<%s> exporter %q: failed to index document: %+v",
utils.EEs, e.Cfg().ID, errResp))
}
return
return nil
}
func (eEe *ElasticEE) Close() (_ error) {
eEe.Lock()
eEe.eClnt = nil
eEe.Unlock()
return
func (e *ElasticEE) Close() error {
e.mu.Lock()
e.client = nil
e.mu.Unlock()
return nil
}
func (eEe *ElasticEE) GetMetrics() *utils.SafeMapStorage { return eEe.dc }
func (e *ElasticEE) GetMetrics() *utils.SafeMapStorage { return e.dc }

View File

@@ -74,8 +74,8 @@ func TestInitCase1(t *testing.T) {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.indxOpts.Index, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", eeExpect, ee.indxOpts.Index)
if !reflect.DeepEqual(ee.indexReqOpts.Index, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", eeExpect, ee.indexReqOpts.Index)
}
}
@@ -93,8 +93,8 @@ func TestInitCase2(t *testing.T) {
t.Error(err)
}
eeExpect := utils.IntPointer(20)
if !reflect.DeepEqual(ee.indxOpts.IfPrimaryTerm, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.IfPrimaryTerm))
if !reflect.DeepEqual(ee.indexReqOpts.IfPrimaryTerm, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.IfPrimaryTerm))
}
}
@@ -112,8 +112,8 @@ func TestInitCase3(t *testing.T) {
t.Error(err)
}
eeExpect := utils.IntPointer(20)
if !reflect.DeepEqual(ee.indxOpts.IfSeqNo, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.IfSeqNo))
if !reflect.DeepEqual(ee.indexReqOpts.IfSeqNo, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.IfSeqNo))
}
}
@@ -131,8 +131,8 @@ func TestInitCase4(t *testing.T) {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.indxOpts.OpType, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.OpType))
if !reflect.DeepEqual(ee.indexReqOpts.OpType, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.OpType))
}
}
@@ -150,8 +150,8 @@ func TestInitCase5(t *testing.T) {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.indxOpts.Pipeline, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.Pipeline))
if !reflect.DeepEqual(ee.indexReqOpts.Pipeline, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Pipeline))
}
}
@@ -169,8 +169,8 @@ func TestInitCase6(t *testing.T) {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.indxOpts.Routing, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.Routing))
if !reflect.DeepEqual(ee.indexReqOpts.Routing, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Routing))
}
}
@@ -189,8 +189,8 @@ func TestInitCase8(t *testing.T) {
t.Error(err)
}
eeExpect := utils.IntPointer(20)
if !reflect.DeepEqual(ee.indxOpts.Version, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.Version))
if !reflect.DeepEqual(ee.indexReqOpts.Version, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Version))
}
}
@@ -209,8 +209,8 @@ func TestInitCase9(t *testing.T) {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.indxOpts.VersionType, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.VersionType))
if !reflect.DeepEqual(ee.indexReqOpts.VersionType, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.VersionType))
}
}
@@ -228,8 +228,8 @@ func TestInitCase10(t *testing.T) {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.indxOpts.WaitForActiveShards, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.WaitForActiveShards))
if !reflect.DeepEqual(ee.indexReqOpts.WaitForActiveShards, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.WaitForActiveShards))
}
}
@@ -257,7 +257,7 @@ func TestElasticExportEvent(t *testing.T) {
if err = eEe.Connect(); err != nil {
t.Error(err)
}
eEe.eClnt.Transport = new(mockClientErr)
eEe.client.Transport = new(mockClientErr)
if err := eEe.ExportEvent([]byte{}, ""); err != nil {
t.Error(err)
}
@@ -287,7 +287,7 @@ func TestElasticExportEvent2(t *testing.T) {
if err = eEe.Connect(); err != nil {
t.Error(err)
}
eEe.eClnt.Transport = new(mockClientErr2)
eEe.client.Transport = new(mockClientErr2)
errExpect := io.EOF
if err := eEe.ExportEvent([]byte{}, ""); err == nil || err != errExpect {
@@ -322,7 +322,7 @@ func TestElasticExportEvent3(t *testing.T) {
if err = eEe.Connect(); err != nil {
t.Error(err)
}
eEe.eClnt.Transport = new(mockClient)
eEe.client.Transport = new(mockClient)
errExpect := `the client noticed that the server is not Elasticsearch and we do not support this unknown product`
cgrCfg.EEsCfg().Exporters[0].ComputeFields()
if err := eEe.ExportEvent([]byte{}, ""); err == nil || err.Error() != errExpect {
@@ -351,11 +351,11 @@ func TestElasticExportEvent4(t *testing.T) {
func TestElasticClose(t *testing.T) {
elasticEE := &ElasticEE{
eClnt: &elasticsearch.Client{},
client: &elasticsearch.Client{},
}
err := elasticEE.Close()
if elasticEE.eClnt != nil {
t.Errorf("expected eClnt to be nil, got %v", elasticEE.eClnt)
if elasticEE.client != nil {
t.Errorf("expected eClnt to be nil, got %v", elasticEE.client)
}
if err != nil {
t.Errorf("expected no error, got %v", err)
@@ -366,7 +366,7 @@ func TestElasticConnect(t *testing.T) {
t.Run("ClientAlreadyExists", func(t *testing.T) {
elasticEE := &ElasticEE{
eClnt: &elasticsearch.Client{},
client: &elasticsearch.Client{},
}
err := elasticEE.Connect()
@@ -374,7 +374,7 @@ func TestElasticConnect(t *testing.T) {
if err != nil {
t.Errorf("expected no error, got %v", err)
}
if elasticEE.eClnt == nil {
if elasticEE.client == nil {
t.Error("expected existing client to remain initialized")
}
})
@@ -388,7 +388,7 @@ func TestElasticConnect(t *testing.T) {
if err != nil {
t.Errorf("expected no error, got %v", err)
}
if elasticEE.eClnt == nil {
if elasticEE.client == nil {
t.Error("expected client to be initialized")
}
})