diff --git a/ees/ee_test.go b/ees/ee_test.go index db7a9f04f..2934ad0c7 100644 --- a/ees/ee_test.go +++ b/ees/ee_test.go @@ -204,7 +204,7 @@ func TestNewEventExporterCase7(t *testing.T) { newEE := ee.(*ElasticEE) newEE.dc.MapStorage[utils.TimeNow] = nil eeExpect.dc.MapStorage[utils.TimeNow] = nil - eeExpect.eClnt = newEE.eClnt + eeExpect.client = newEE.client if !reflect.DeepEqual(eeExpect, newEE) { t.Errorf("Expected %+v \n but got %+v", eeExpect, newEE) } diff --git a/ees/elastic.go b/ees/elastic.go index e71cc977a..e95cc3247 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -35,188 +35,207 @@ import ( elasticsearch "github.com/elastic/go-elasticsearch/v8" ) -func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (eEe *ElasticEE, err error) { - eEe = &ElasticEE{ +func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*ElasticEE, error) { + el := &ElasticEE{ cfg: cfg, dc: dc, reqs: newConcReq(cfg.ConcurrentRequests), } - err = eEe.prepareOpts() - return + if err := el.prepareOpts(); err != nil { + return nil, err + } + return el, nil } -// ElasticEE implements EventExporter interface for ElasticSearch export +// ElasticEE implements EventExporter interface for ElasticSearch export. type ElasticEE struct { - cfg *config.EventExporterCfg - eClnt *elasticsearch.Client - dc *utils.SafeMapStorage - indxOpts esapi.IndexRequest // this variable is used only for storing the options from OptsMap - reqs *concReq - clnOpts elasticsearch.Config - sync.RWMutex + mu sync.RWMutex + cfg *config.EventExporterCfg + dc *utils.SafeMapStorage + reqs *concReq bytePreparing + + client *elasticsearch.Client + clientCfg elasticsearch.Config + + // indexReqOpts is used to store IndexRequest options for convenience + // and does not represent the IndexRequest itself. + indexReqOpts esapi.IndexRequest } // init will create all the necessary dependencies, including opening the file -func (eEe *ElasticEE) prepareOpts() (err error) { - //parse opts - eEe.indxOpts.Index = utils.CDRsTBL - if eEe.Cfg().Opts.Els.Index != nil { - eEe.indxOpts.Index = *eEe.Cfg().Opts.Els.Index +func (e *ElasticEE) prepareOpts() error { + opts := e.cfg.Opts.Els + + // Parse index request options. + e.indexReqOpts.Index = utils.CDRsTBL + if opts.Index != nil { + e.indexReqOpts.Index = *opts.Index } - eEe.indxOpts.IfPrimaryTerm = eEe.Cfg().Opts.Els.IfPrimaryTerm - eEe.indxOpts.IfSeqNo = eEe.Cfg().Opts.Els.IfSeqNo - if eEe.Cfg().Opts.Els.OpType != nil { - eEe.indxOpts.OpType = *eEe.Cfg().Opts.Els.OpType + e.indexReqOpts.IfPrimaryTerm = opts.IfPrimaryTerm + e.indexReqOpts.IfSeqNo = opts.IfSeqNo + if opts.OpType != nil { + e.indexReqOpts.OpType = *opts.OpType } - if eEe.Cfg().Opts.Els.Pipeline != nil { - eEe.indxOpts.Pipeline = *eEe.Cfg().Opts.Els.Pipeline + if opts.Pipeline != nil { + e.indexReqOpts.Pipeline = *opts.Pipeline } - if eEe.Cfg().Opts.Els.Routing != nil { - eEe.indxOpts.Routing = *eEe.Cfg().Opts.Els.Routing + if opts.Routing != nil { + e.indexReqOpts.Routing = *opts.Routing } - if eEe.Cfg().Opts.Els.Timeout != nil { - eEe.indxOpts.Timeout = *eEe.Cfg().Opts.Els.Timeout + if opts.Timeout != nil { + e.indexReqOpts.Timeout = *opts.Timeout } - eEe.indxOpts.Version = eEe.Cfg().Opts.Els.Version - if eEe.Cfg().Opts.Els.VersionType != nil { - eEe.indxOpts.VersionType = *eEe.Cfg().Opts.Els.VersionType + e.indexReqOpts.Version = opts.Version + if opts.VersionType != nil { + e.indexReqOpts.VersionType = *opts.VersionType } - if eEe.Cfg().Opts.Els.WaitForActiveShards != nil { - eEe.indxOpts.WaitForActiveShards = *eEe.Cfg().Opts.Els.WaitForActiveShards + if opts.WaitForActiveShards != nil { + e.indexReqOpts.WaitForActiveShards = *opts.WaitForActiveShards } - //client config - if eEe.Cfg().Opts.Els.Cloud != nil && *eEe.Cfg().Opts.Els.Cloud { - eEe.clnOpts.CloudID = eEe.Cfg().ExportPath + // Parse client config options. + if opts.Cloud != nil && *opts.Cloud { + e.clientCfg.CloudID = e.Cfg().ExportPath } else { - eEe.clnOpts.Addresses = strings.Split(eEe.Cfg().ExportPath, utils.InfieldSep) + e.clientCfg.Addresses = strings.Split(e.Cfg().ExportPath, utils.InfieldSep) } - if eEe.Cfg().Opts.Els.Username != nil { - eEe.clnOpts.Username = *eEe.Cfg().Opts.Els.Username + if opts.Username != nil { + e.clientCfg.Username = *opts.Username } - if eEe.Cfg().Opts.Els.Password != nil { - eEe.clnOpts.Password = *eEe.Cfg().Opts.Els.Password + if opts.Password != nil { + e.clientCfg.Password = *opts.Password } - if eEe.Cfg().Opts.Els.APIKey != nil { - eEe.clnOpts.APIKey = *eEe.Cfg().Opts.Els.APIKey + if opts.APIKey != nil { + e.clientCfg.APIKey = *opts.APIKey } - if eEe.Cfg().Opts.RPC.CAPath != nil { - var cacert []byte - cacert, err = os.ReadFile(*eEe.Cfg().Opts.RPC.CAPath) + if e.Cfg().Opts.RPC.CAPath != nil { + cacert, err := os.ReadFile(*e.Cfg().Opts.RPC.CAPath) if err != nil { - return + return err } - eEe.clnOpts.CACert = cacert + e.clientCfg.CACert = cacert } - if eEe.Cfg().Opts.Els.CertificateFingerprint != nil { - eEe.clnOpts.CertificateFingerprint = *eEe.Cfg().Opts.Els.CertificateFingerprint + if opts.CertificateFingerprint != nil { + e.clientCfg.CertificateFingerprint = *opts.CertificateFingerprint } - if eEe.Cfg().Opts.Els.ServiceToken != nil { - eEe.clnOpts.ServiceToken = *eEe.Cfg().Opts.Els.ServiceToken + if opts.ServiceToken != nil { + e.clientCfg.ServiceToken = *opts.ServiceToken } - if eEe.Cfg().Opts.Els.DiscoverNodesOnStart != nil { - eEe.clnOpts.DiscoverNodesOnStart = *eEe.Cfg().Opts.Els.DiscoverNodesOnStart + if opts.DiscoverNodesOnStart != nil { + e.clientCfg.DiscoverNodesOnStart = *opts.DiscoverNodesOnStart } - if eEe.Cfg().Opts.Els.DiscoverNodeInterval != nil { - eEe.clnOpts.DiscoverNodesInterval = *eEe.Cfg().Opts.Els.DiscoverNodeInterval + if opts.DiscoverNodeInterval != nil { + e.clientCfg.DiscoverNodesInterval = *opts.DiscoverNodeInterval } - if eEe.Cfg().Opts.Els.EnableDebugLogger != nil { - eEe.clnOpts.EnableDebugLogger = *eEe.Cfg().Opts.Els.EnableDebugLogger + if opts.EnableDebugLogger != nil { + e.clientCfg.EnableDebugLogger = *opts.EnableDebugLogger } - if loggerType := eEe.Cfg().Opts.Els.Logger; loggerType != nil { + if loggerType := opts.Logger; loggerType != nil { var logger elastictransport.Logger switch *loggerType { case utils.ElsJson: - logger = &elastictransport.JSONLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true} + logger = &elastictransport.JSONLogger{ + Output: os.Stdout, + EnableRequestBody: true, + EnableResponseBody: true, + } case utils.ElsColor: - logger = &elastictransport.ColorLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true} + logger = &elastictransport.ColorLogger{ + Output: os.Stdout, + EnableRequestBody: true, + EnableResponseBody: true, + } case utils.ElsText: - logger = &elastictransport.TextLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true} + logger = &elastictransport.TextLogger{ + Output: os.Stdout, + EnableRequestBody: true, + EnableResponseBody: true, + } default: - return + return fmt.Errorf("invalid logger type: %q", *loggerType) } - eEe.clnOpts.Logger = logger + e.clientCfg.Logger = logger } - if eEe.Cfg().Opts.Els.CompressRequestBody != nil { - eEe.clnOpts.CompressRequestBody = *eEe.Cfg().Opts.Els.CompressRequestBody + if opts.CompressRequestBody != nil { + e.clientCfg.CompressRequestBody = *opts.CompressRequestBody } - if eEe.Cfg().Opts.Els.RetryOnStatus != nil { - eEe.clnOpts.RetryOnStatus = *eEe.Cfg().Opts.Els.RetryOnStatus + if opts.RetryOnStatus != nil { + e.clientCfg.RetryOnStatus = *opts.RetryOnStatus } - if eEe.Cfg().Opts.Els.MaxRetries != nil { - eEe.clnOpts.MaxRetries = *eEe.Cfg().Opts.Els.MaxRetries + if opts.MaxRetries != nil { + e.clientCfg.MaxRetries = *opts.MaxRetries } - if eEe.Cfg().Opts.Els.DisableRetry != nil { - eEe.clnOpts.DisableRetry = *eEe.Cfg().Opts.Els.DisableRetry + if opts.DisableRetry != nil { + e.clientCfg.DisableRetry = *opts.DisableRetry } - if eEe.Cfg().Opts.Els.CompressRequestBodyLevel != nil { - eEe.clnOpts.CompressRequestBodyLevel = *eEe.Cfg().Opts.Els.CompressRequestBodyLevel + if opts.CompressRequestBodyLevel != nil { + e.clientCfg.CompressRequestBodyLevel = *opts.CompressRequestBodyLevel } - return + return nil } -func (eEe *ElasticEE) Cfg() *config.EventExporterCfg { return eEe.cfg } +func (e *ElasticEE) Cfg() *config.EventExporterCfg { return e.cfg } -func (eEe *ElasticEE) Connect() (err error) { - eEe.Lock() - defer eEe.Unlock() - // create the client - if eEe.eClnt != nil { +func (e *ElasticEE) Connect() (err error) { + e.mu.Lock() + defer e.mu.Unlock() + if e.client != nil { // check if connection is cached return } - eEe.eClnt, err = elasticsearch.NewClient(eEe.clnOpts) + e.client, err = elasticsearch.NewClient(e.clientCfg) return } // ExportEvent implements EventExporter -func (eEe *ElasticEE) ExportEvent(ev any, key string) (err error) { - eEe.reqs.get() - eEe.RLock() +func (e *ElasticEE) ExportEvent(ev any, key string) error { + e.reqs.get() + e.mu.RLock() defer func() { - eEe.RUnlock() - eEe.reqs.done() + e.mu.RUnlock() + e.reqs.done() }() - if eEe.eClnt == nil { + if e.client == nil { return utils.ErrDisconnected } - eReq := esapi.IndexRequest{ - Index: eEe.indxOpts.Index, + req := esapi.IndexRequest{ DocumentID: key, Body: bytes.NewReader(ev.([]byte)), Refresh: "true", - IfPrimaryTerm: eEe.indxOpts.IfPrimaryTerm, - IfSeqNo: eEe.indxOpts.IfSeqNo, - OpType: eEe.indxOpts.OpType, - Pipeline: eEe.indxOpts.Pipeline, - Routing: eEe.indxOpts.Routing, - Timeout: eEe.indxOpts.Timeout, - Version: eEe.indxOpts.Version, - VersionType: eEe.indxOpts.VersionType, - WaitForActiveShards: eEe.indxOpts.WaitForActiveShards, + Index: e.indexReqOpts.Index, + IfPrimaryTerm: e.indexReqOpts.IfPrimaryTerm, + IfSeqNo: e.indexReqOpts.IfSeqNo, + OpType: e.indexReqOpts.OpType, + Pipeline: e.indexReqOpts.Pipeline, + Routing: e.indexReqOpts.Routing, + Timeout: e.indexReqOpts.Timeout, + Version: e.indexReqOpts.Version, + VersionType: e.indexReqOpts.VersionType, + WaitForActiveShards: e.indexReqOpts.WaitForActiveShards, } - var resp *esapi.Response - if resp, err = eReq.Do(context.Background(), eEe.eClnt); err != nil { - return + resp, err := req.Do(context.Background(), e.client) + if err != nil { + return err } defer resp.Body.Close() if resp.IsError() { - var e map[string]any - if err = json.NewDecoder(resp.Body).Decode(&e); err != nil { - return + var errResp map[string]any + if err := json.NewDecoder(resp.Body).Decode(&errResp); err != nil { + return err } - utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%+v> when indexing document", - utils.EEs, eEe.Cfg().ID, e)) + utils.Logger.Warning(fmt.Sprintf( + "<%s> exporter %q: failed to index document: %+v", + utils.EEs, e.Cfg().ID, errResp)) } - return + return nil } -func (eEe *ElasticEE) Close() (_ error) { - eEe.Lock() - eEe.eClnt = nil - eEe.Unlock() - return +func (e *ElasticEE) Close() error { + e.mu.Lock() + e.client = nil + e.mu.Unlock() + return nil } -func (eEe *ElasticEE) GetMetrics() *utils.SafeMapStorage { return eEe.dc } +func (e *ElasticEE) GetMetrics() *utils.SafeMapStorage { return e.dc } diff --git a/ees/elastic_test.go b/ees/elastic_test.go index 8744cde1e..b90426032 100644 --- a/ees/elastic_test.go +++ b/ees/elastic_test.go @@ -74,8 +74,8 @@ func TestInitCase1(t *testing.T) { t.Error(err) } eeExpect := "test" - if !reflect.DeepEqual(ee.indxOpts.Index, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", eeExpect, ee.indxOpts.Index) + if !reflect.DeepEqual(ee.indexReqOpts.Index, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", eeExpect, ee.indexReqOpts.Index) } } @@ -93,8 +93,8 @@ func TestInitCase2(t *testing.T) { t.Error(err) } eeExpect := utils.IntPointer(20) - if !reflect.DeepEqual(ee.indxOpts.IfPrimaryTerm, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.IfPrimaryTerm)) + if !reflect.DeepEqual(ee.indexReqOpts.IfPrimaryTerm, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.IfPrimaryTerm)) } } @@ -112,8 +112,8 @@ func TestInitCase3(t *testing.T) { t.Error(err) } eeExpect := utils.IntPointer(20) - if !reflect.DeepEqual(ee.indxOpts.IfSeqNo, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.IfSeqNo)) + if !reflect.DeepEqual(ee.indexReqOpts.IfSeqNo, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.IfSeqNo)) } } @@ -131,8 +131,8 @@ func TestInitCase4(t *testing.T) { t.Error(err) } eeExpect := "test" - if !reflect.DeepEqual(ee.indxOpts.OpType, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.OpType)) + if !reflect.DeepEqual(ee.indexReqOpts.OpType, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.OpType)) } } @@ -150,8 +150,8 @@ func TestInitCase5(t *testing.T) { t.Error(err) } eeExpect := "test" - if !reflect.DeepEqual(ee.indxOpts.Pipeline, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.Pipeline)) + if !reflect.DeepEqual(ee.indexReqOpts.Pipeline, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Pipeline)) } } @@ -169,8 +169,8 @@ func TestInitCase6(t *testing.T) { t.Error(err) } eeExpect := "test" - if !reflect.DeepEqual(ee.indxOpts.Routing, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.Routing)) + if !reflect.DeepEqual(ee.indexReqOpts.Routing, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Routing)) } } @@ -189,8 +189,8 @@ func TestInitCase8(t *testing.T) { t.Error(err) } eeExpect := utils.IntPointer(20) - if !reflect.DeepEqual(ee.indxOpts.Version, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.Version)) + if !reflect.DeepEqual(ee.indexReqOpts.Version, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Version)) } } @@ -209,8 +209,8 @@ func TestInitCase9(t *testing.T) { t.Error(err) } eeExpect := "test" - if !reflect.DeepEqual(ee.indxOpts.VersionType, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.VersionType)) + if !reflect.DeepEqual(ee.indexReqOpts.VersionType, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.VersionType)) } } @@ -228,8 +228,8 @@ func TestInitCase10(t *testing.T) { t.Error(err) } eeExpect := "test" - if !reflect.DeepEqual(ee.indxOpts.WaitForActiveShards, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.WaitForActiveShards)) + if !reflect.DeepEqual(ee.indexReqOpts.WaitForActiveShards, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.WaitForActiveShards)) } } @@ -257,7 +257,7 @@ func TestElasticExportEvent(t *testing.T) { if err = eEe.Connect(); err != nil { t.Error(err) } - eEe.eClnt.Transport = new(mockClientErr) + eEe.client.Transport = new(mockClientErr) if err := eEe.ExportEvent([]byte{}, ""); err != nil { t.Error(err) } @@ -287,7 +287,7 @@ func TestElasticExportEvent2(t *testing.T) { if err = eEe.Connect(); err != nil { t.Error(err) } - eEe.eClnt.Transport = new(mockClientErr2) + eEe.client.Transport = new(mockClientErr2) errExpect := io.EOF if err := eEe.ExportEvent([]byte{}, ""); err == nil || err != errExpect { @@ -322,7 +322,7 @@ func TestElasticExportEvent3(t *testing.T) { if err = eEe.Connect(); err != nil { t.Error(err) } - eEe.eClnt.Transport = new(mockClient) + eEe.client.Transport = new(mockClient) errExpect := `the client noticed that the server is not Elasticsearch and we do not support this unknown product` cgrCfg.EEsCfg().Exporters[0].ComputeFields() if err := eEe.ExportEvent([]byte{}, ""); err == nil || err.Error() != errExpect { @@ -351,11 +351,11 @@ func TestElasticExportEvent4(t *testing.T) { func TestElasticClose(t *testing.T) { elasticEE := &ElasticEE{ - eClnt: &elasticsearch.Client{}, + client: &elasticsearch.Client{}, } err := elasticEE.Close() - if elasticEE.eClnt != nil { - t.Errorf("expected eClnt to be nil, got %v", elasticEE.eClnt) + if elasticEE.client != nil { + t.Errorf("expected eClnt to be nil, got %v", elasticEE.client) } if err != nil { t.Errorf("expected no error, got %v", err) @@ -366,7 +366,7 @@ func TestElasticConnect(t *testing.T) { t.Run("ClientAlreadyExists", func(t *testing.T) { elasticEE := &ElasticEE{ - eClnt: &elasticsearch.Client{}, + client: &elasticsearch.Client{}, } err := elasticEE.Connect() @@ -374,7 +374,7 @@ func TestElasticConnect(t *testing.T) { if err != nil { t.Errorf("expected no error, got %v", err) } - if elasticEE.eClnt == nil { + if elasticEE.client == nil { t.Error("expected existing client to remain initialized") } }) @@ -388,7 +388,7 @@ func TestElasticConnect(t *testing.T) { if err != nil { t.Errorf("expected no error, got %v", err) } - if elasticEE.eClnt == nil { + if elasticEE.client == nil { t.Error("expected client to be initialized") } })