diff --git a/ees/ee_test.go b/ees/ee_test.go index 942dec38c..a9c23738c 100644 --- a/ees/ee_test.go +++ b/ees/ee_test.go @@ -219,7 +219,7 @@ func TestNewEventExporterCase7(t *testing.T) { newEE := ee.(*ElasticEE) newEE.dc.MapStorage[utils.TimeNow] = nil eeExpect.dc.MapStorage[utils.TimeNow] = nil - eeExpect.eClnt = newEE.eClnt + eeExpect.client = newEE.client if !reflect.DeepEqual(eeExpect, newEE) { t.Errorf("Expected %+v \n but got %+v", eeExpect, newEE) } diff --git a/ees/elastic.go b/ees/elastic.go index 8aa845c7f..dec5e3cf2 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -36,192 +36,211 @@ import ( elasticsearch "github.com/elastic/go-elasticsearch/v8" ) -func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (eEe *ElasticEE, err error) { - eEe = &ElasticEE{ +func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (*ElasticEE, error) { + el := &ElasticEE{ cfg: cfg, dc: dc, reqs: newConcReq(cfg.ConcurrentRequests), } - err = eEe.prepareOpts() - return + if err := el.prepareOpts(); err != nil { + return nil, err + } + return el, nil } -// ElasticEE implements EventExporter interface for ElasticSearch export +// ElasticEE implements EventExporter interface for ElasticSearch export. type ElasticEE struct { - cfg *config.EventExporterCfg - eClnt *elasticsearch.Client - dc *utils.SafeMapStorage - opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap - clntOpts elasticsearch.Config - reqs *concReq - sync.RWMutex + mu sync.RWMutex + cfg *config.EventExporterCfg + dc *utils.SafeMapStorage + reqs *concReq bytePreparing + + client *elasticsearch.Client + clientCfg elasticsearch.Config + + // indexReqOpts is used to store IndexRequest options for convenience + // and does not represent the IndexRequest itself. + indexReqOpts esapi.IndexRequest } // init will create all the necessary dependencies, including opening the file -func (eEe *ElasticEE) prepareOpts() (err error) { - //parse opts - eEe.opts.Index = utils.CDRsTBL - if eEe.Cfg().Opts.ElsIndex != nil { - eEe.opts.Index = *eEe.Cfg().Opts.ElsIndex +func (e *ElasticEE) prepareOpts() error { + opts := e.cfg.Opts + + // Parse index request options. + e.indexReqOpts.Index = utils.CDRsTBL + if opts.ElsIndex != nil { + e.indexReqOpts.Index = *opts.ElsIndex } - eEe.opts.IfPrimaryTerm = eEe.Cfg().Opts.ElsIfPrimaryTerm - eEe.opts.IfSeqNo = eEe.Cfg().Opts.ElsIfSeqNo - if eEe.Cfg().Opts.ElsOpType != nil { - eEe.opts.OpType = *eEe.Cfg().Opts.ElsOpType + e.indexReqOpts.IfPrimaryTerm = opts.ElsIfPrimaryTerm + e.indexReqOpts.IfSeqNo = opts.ElsIfSeqNo + if opts.ElsOpType != nil { + e.indexReqOpts.OpType = *opts.ElsOpType } - if eEe.Cfg().Opts.ElsPipeline != nil { - eEe.opts.Pipeline = *eEe.Cfg().Opts.ElsPipeline + if opts.ElsPipeline != nil { + e.indexReqOpts.Pipeline = *opts.ElsPipeline } - if eEe.Cfg().Opts.ElsRouting != nil { - eEe.opts.Routing = *eEe.Cfg().Opts.ElsRouting + if opts.ElsRouting != nil { + e.indexReqOpts.Routing = *opts.ElsRouting } - if eEe.Cfg().Opts.ElsTimeout != nil { - eEe.opts.Timeout = *eEe.Cfg().Opts.ElsTimeout + if opts.ElsTimeout != nil { + e.indexReqOpts.Timeout = *opts.ElsTimeout } - eEe.opts.Version = eEe.Cfg().Opts.ElsVersion - if eEe.Cfg().Opts.ElsVersionType != nil { - eEe.opts.VersionType = *eEe.Cfg().Opts.ElsVersionType + e.indexReqOpts.Version = opts.ElsVersion + if opts.ElsVersionType != nil { + e.indexReqOpts.VersionType = *opts.ElsVersionType } - if eEe.Cfg().Opts.ElsWaitForActiveShards != nil { - eEe.opts.WaitForActiveShards = *eEe.Cfg().Opts.ElsWaitForActiveShards + if opts.ElsWaitForActiveShards != nil { + e.indexReqOpts.WaitForActiveShards = *opts.ElsWaitForActiveShards } - //client opts - if eEe.Cfg().Opts.ElsCloud != nil && *eEe.Cfg().Opts.ElsCloud { - eEe.clntOpts.CloudID = eEe.Cfg().ExportPath + // Parse client config options. + if opts.ElsCloud != nil && *opts.ElsCloud { + e.clientCfg.CloudID = e.Cfg().ExportPath } else { - eEe.clntOpts.Addresses = strings.Split(eEe.Cfg().ExportPath, utils.InfieldSep) + e.clientCfg.Addresses = strings.Split(e.Cfg().ExportPath, utils.InfieldSep) } - if eEe.Cfg().Opts.ElsUsername != nil { - eEe.clntOpts.Username = *eEe.Cfg().Opts.ElsUsername + if opts.ElsUsername != nil { + e.clientCfg.Username = *opts.ElsUsername } - if eEe.Cfg().Opts.ElsPassword != nil { - eEe.clntOpts.Password = *eEe.Cfg().Opts.ElsPassword + if opts.ElsPassword != nil { + e.clientCfg.Password = *opts.ElsPassword } - if eEe.Cfg().Opts.ElsAPIKey != nil { - eEe.clntOpts.APIKey = *eEe.Cfg().Opts.ElsAPIKey + if opts.ElsAPIKey != nil { + e.clientCfg.APIKey = *opts.ElsAPIKey } - if eEe.Cfg().Opts.CAPath != nil { - var cacert []byte - cacert, err = os.ReadFile(*eEe.Cfg().Opts.CAPath) + if opts.CAPath != nil { + cacert, err := os.ReadFile(*opts.CAPath) if err != nil { - return + return err } - eEe.clntOpts.CACert = cacert + e.clientCfg.CACert = cacert } - if eEe.Cfg().Opts.ElsCertificateFingerprint != nil { - eEe.clntOpts.CertificateFingerprint = *eEe.Cfg().Opts.ElsCertificateFingerprint + if opts.ElsCertificateFingerprint != nil { + e.clientCfg.CertificateFingerprint = *opts.ElsCertificateFingerprint } - if eEe.Cfg().Opts.ElsServiceToken != nil { - eEe.clntOpts.ServiceToken = *eEe.Cfg().Opts.ElsServiceToken + if opts.ElsServiceToken != nil { + e.clientCfg.ServiceToken = *opts.ElsServiceToken } - if eEe.Cfg().Opts.ElsDiscoverNodesOnStart != nil { - eEe.clntOpts.DiscoverNodesOnStart = *eEe.Cfg().Opts.ElsDiscoverNodesOnStart + if opts.ElsDiscoverNodesOnStart != nil { + e.clientCfg.DiscoverNodesOnStart = *opts.ElsDiscoverNodesOnStart } - if eEe.Cfg().Opts.ElsDiscoverNodeInterval != nil { - eEe.clntOpts.DiscoverNodesInterval = *eEe.Cfg().Opts.ElsDiscoverNodeInterval + if opts.ElsDiscoverNodeInterval != nil { + e.clientCfg.DiscoverNodesInterval = *opts.ElsDiscoverNodeInterval } - if eEe.Cfg().Opts.ElsEnableDebugLogger != nil { - eEe.clntOpts.EnableDebugLogger = *eEe.Cfg().Opts.ElsEnableDebugLogger + if opts.ElsEnableDebugLogger != nil { + e.clientCfg.EnableDebugLogger = *opts.ElsEnableDebugLogger } - if loggerType := eEe.Cfg().Opts.ElsLogger; loggerType != nil { + if loggerType := opts.ElsLogger; loggerType != nil { var logger elastictransport.Logger switch *loggerType { case utils.ElsJson: - logger = &elastictransport.JSONLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true} + logger = &elastictransport.JSONLogger{ + Output: os.Stdout, + EnableRequestBody: true, + EnableResponseBody: true, + } case utils.ElsColor: - logger = &elastictransport.ColorLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true} + logger = &elastictransport.ColorLogger{ + Output: os.Stdout, + EnableRequestBody: true, + EnableResponseBody: true, + } case utils.ElsText: - logger = &elastictransport.TextLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true} + logger = &elastictransport.TextLogger{ + Output: os.Stdout, + EnableRequestBody: true, + EnableResponseBody: true, + } default: - return + return fmt.Errorf("invalid logger type: %q", *loggerType) } - eEe.clntOpts.Logger = logger + e.clientCfg.Logger = logger } - if eEe.Cfg().Opts.ElsCompressRequestBody != nil { - eEe.clntOpts.CompressRequestBody = *eEe.Cfg().Opts.ElsCompressRequestBody + if opts.ElsCompressRequestBody != nil { + e.clientCfg.CompressRequestBody = *opts.ElsCompressRequestBody } - if eEe.Cfg().Opts.ElsRetryOnStatus != nil { - eEe.clntOpts.RetryOnStatus = *eEe.Cfg().Opts.ElsRetryOnStatus + if opts.ElsRetryOnStatus != nil { + e.clientCfg.RetryOnStatus = *opts.ElsRetryOnStatus } - if eEe.Cfg().Opts.ElsMaxRetries != nil { - eEe.clntOpts.MaxRetries = *eEe.Cfg().Opts.ElsMaxRetries + if opts.ElsMaxRetries != nil { + e.clientCfg.MaxRetries = *opts.ElsMaxRetries } - if eEe.Cfg().Opts.ElsDisableRetry != nil { - eEe.clntOpts.DisableRetry = *eEe.Cfg().Opts.ElsDisableRetry + if opts.ElsDisableRetry != nil { + e.clientCfg.DisableRetry = *opts.ElsDisableRetry } - if eEe.Cfg().Opts.ElsCompressRequestBodyLevel != nil { - eEe.clntOpts.CompressRequestBodyLevel = *eEe.Cfg().Opts.ElsCompressRequestBodyLevel + if opts.ElsCompressRequestBodyLevel != nil { + e.clientCfg.CompressRequestBodyLevel = *opts.ElsCompressRequestBodyLevel } - return + return nil } -func (eEe *ElasticEE) Cfg() *config.EventExporterCfg { return eEe.cfg } +func (e *ElasticEE) Cfg() *config.EventExporterCfg { return e.cfg } -func (eEe *ElasticEE) Connect() (err error) { - eEe.Lock() - defer eEe.Unlock() - // create the client - if eEe.eClnt != nil { +func (e *ElasticEE) Connect() (err error) { + e.mu.Lock() + defer e.mu.Unlock() + if e.client != nil { // check if connection is cached return } - eEe.eClnt, err = elasticsearch.NewClient(eEe.clntOpts) + e.client, err = elasticsearch.NewClient(e.clientCfg) return } // ExportEvent implements EventExporter -func (eEe *ElasticEE) ExportEvent(ctx *context.Context, ev, extraData any) (err error) { - eEe.reqs.get() - eEe.RLock() +func (e *ElasticEE) ExportEvent(ctx *context.Context, ev, extraData any) error { + e.reqs.get() + e.mu.RLock() defer func() { - eEe.RUnlock() - eEe.reqs.done() + e.mu.RUnlock() + e.reqs.done() }() - if eEe.eClnt == nil { + if e.client == nil { return utils.ErrDisconnected } key := extraData.(string) - eReq := esapi.IndexRequest{ - Index: eEe.opts.Index, + req := esapi.IndexRequest{ DocumentID: key, Body: bytes.NewReader(ev.([]byte)), Refresh: "true", - IfPrimaryTerm: eEe.opts.IfPrimaryTerm, - IfSeqNo: eEe.opts.IfSeqNo, - OpType: eEe.opts.OpType, - Pipeline: eEe.opts.Pipeline, - Routing: eEe.opts.Routing, - Timeout: eEe.opts.Timeout, - Version: eEe.opts.Version, - VersionType: eEe.opts.VersionType, - WaitForActiveShards: eEe.opts.WaitForActiveShards, + Index: e.indexReqOpts.Index, + IfPrimaryTerm: e.indexReqOpts.IfPrimaryTerm, + IfSeqNo: e.indexReqOpts.IfSeqNo, + OpType: e.indexReqOpts.OpType, + Pipeline: e.indexReqOpts.Pipeline, + Routing: e.indexReqOpts.Routing, + Timeout: e.indexReqOpts.Timeout, + Version: e.indexReqOpts.Version, + VersionType: e.indexReqOpts.VersionType, + WaitForActiveShards: e.indexReqOpts.WaitForActiveShards, } - var resp *esapi.Response - if resp, err = eReq.Do(ctx, eEe.eClnt); err != nil { - return + resp, err := req.Do(ctx, e.client) + if err != nil { + return err } defer resp.Body.Close() if resp.IsError() { - var e map[string]any - if err = json.NewDecoder(resp.Body).Decode(&e); err != nil { - return + var errResp map[string]any + if err := json.NewDecoder(resp.Body).Decode(&errResp); err != nil { + return err } - utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%+v> when indexing document", - utils.EEs, eEe.Cfg().ID, e)) + utils.Logger.Warning(fmt.Sprintf( + "<%s> exporter %q: failed to index document: %+v", + utils.EEs, e.Cfg().ID, errResp)) } - return + return nil } -func (eEe *ElasticEE) Close() (_ error) { - eEe.Lock() - eEe.eClnt = nil - eEe.Unlock() - return +func (e *ElasticEE) Close() error { + e.mu.Lock() + e.client = nil + e.mu.Unlock() + return nil } -func (eEe *ElasticEE) GetMetrics() *utils.SafeMapStorage { return eEe.dc } +func (e *ElasticEE) GetMetrics() *utils.SafeMapStorage { return e.dc } func (eEE *ElasticEE) ExtraData(ev *utils.CGREvent) any { return utils.ConcatenatedKey( diff --git a/ees/elastic_test.go b/ees/elastic_test.go index f589ae3b1..b990bf6bc 100644 --- a/ees/elastic_test.go +++ b/ees/elastic_test.go @@ -70,8 +70,8 @@ func TestInitCase1(t *testing.T) { t.Error(err) } eeExpect := "test" - if !reflect.DeepEqual(ee.opts.Index, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", eeExpect, ee.opts.Index) + if !reflect.DeepEqual(ee.indexReqOpts.Index, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", eeExpect, ee.indexReqOpts.Index) } } @@ -87,8 +87,8 @@ func TestInitCase2(t *testing.T) { t.Error(err) } eeExpect := utils.IntPointer(20) - if !reflect.DeepEqual(ee.opts.IfPrimaryTerm, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.IfPrimaryTerm)) + if !reflect.DeepEqual(ee.indexReqOpts.IfPrimaryTerm, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.IfPrimaryTerm)) } } @@ -104,8 +104,8 @@ func TestInitCase3(t *testing.T) { t.Error(err) } eeExpect := utils.IntPointer(20) - if !reflect.DeepEqual(ee.opts.IfSeqNo, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.IfSeqNo)) + if !reflect.DeepEqual(ee.indexReqOpts.IfSeqNo, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.IfSeqNo)) } } @@ -121,8 +121,8 @@ func TestInitCase4(t *testing.T) { t.Error(err) } eeExpect := "test" - if !reflect.DeepEqual(ee.opts.OpType, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.OpType)) + if !reflect.DeepEqual(ee.indexReqOpts.OpType, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.OpType)) } } @@ -138,8 +138,8 @@ func TestInitCase5(t *testing.T) { t.Error(err) } eeExpect := "test" - if !reflect.DeepEqual(ee.opts.Pipeline, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.Pipeline)) + if !reflect.DeepEqual(ee.indexReqOpts.Pipeline, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Pipeline)) } } @@ -155,8 +155,8 @@ func TestInitCase6(t *testing.T) { t.Error(err) } eeExpect := "test" - if !reflect.DeepEqual(ee.opts.Routing, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.Routing)) + if !reflect.DeepEqual(ee.indexReqOpts.Routing, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Routing)) } } @@ -172,8 +172,8 @@ func TestInitCase8(t *testing.T) { t.Error(err) } eeExpect := utils.IntPointer(20) - if !reflect.DeepEqual(ee.opts.Version, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.Version)) + if !reflect.DeepEqual(ee.indexReqOpts.Version, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.Version)) } } @@ -189,8 +189,8 @@ func TestInitCase9(t *testing.T) { t.Error(err) } eeExpect := "test" - if !reflect.DeepEqual(ee.opts.VersionType, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.VersionType)) + if !reflect.DeepEqual(ee.indexReqOpts.VersionType, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.VersionType)) } } @@ -206,8 +206,8 @@ func TestInitCase10(t *testing.T) { t.Error(err) } eeExpect := "test" - if !reflect.DeepEqual(ee.opts.WaitForActiveShards, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.WaitForActiveShards)) + if !reflect.DeepEqual(ee.indexReqOpts.WaitForActiveShards, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indexReqOpts.WaitForActiveShards)) } } @@ -235,7 +235,7 @@ func TestElasticExportEvent(t *testing.T) { if err = eEe.Connect(); err != nil { t.Error(err) } - eEe.eClnt.Transport = new(mockClientErr) + eEe.client.Transport = new(mockClientErr) if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err != nil { t.Error(err) } @@ -265,7 +265,7 @@ func TestElasticExportEvent2(t *testing.T) { if err = eEe.Connect(); err != nil { t.Error(err) } - eEe.eClnt.Transport = new(mockClientErr2) + eEe.client.Transport = new(mockClientErr2) errExpect := io.EOF if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err == nil || err != errExpect { @@ -296,7 +296,7 @@ func TestElasticExportEvent3(t *testing.T) { if err = eEe.Connect(); err != nil { t.Error(err) } - eEe.eClnt.Transport = new(mockClient) + eEe.client.Transport = new(mockClient) errExpect := `the client noticed that the server is not Elasticsearch and we do not support this unknown product` cgrCfg.EEsCfg().Exporters[0].ComputeFields()