Added opts for connecing to Els CLoud

This commit is contained in:
gezimbll
2023-06-09 08:33:42 -04:00
committed by Dan Christian Bogos
parent f780621625
commit 0e4162328b
7 changed files with 63 additions and 39 deletions

View File

@@ -507,7 +507,10 @@ const CGRATES_CFG_JSON = `
// Elasticsearch options
"discoverNodesInterval":"",
"elsCloudID":"", //ELS Cloud Endpoint
"elsApiKey": "",
"elsUsername":"",
"elsPassword":"",
"discoverNodesOnStart":false
// "elsIndex": "", // ElsIndex
// "elsIfPrimaryTerm": 0, // ElsIfPrimaryTerm

View File

@@ -158,7 +158,10 @@ type EventExporterOpts struct {
ElsIndex *string
ElsIfPrimaryTerm *int
DiscoverNodesOnStart *bool
DiscoverNodesInterval *time.Duration
ElsCloudID *string
ElsAPIKey *string
ElsUsername *string // Username for HTTP Basic Authentication.
ElsPassword *string
ElsIfSeqNo *int
ElsOpType *string
ElsPipeline *string
@@ -252,16 +255,21 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson)
if jsnCfg.CSVFieldSeparator != nil {
eeOpts.CSVFieldSeparator = jsnCfg.CSVFieldSeparator
}
if jsnCfg.ElsCloudID != nil {
eeOpts.ElsCloudID = jsnCfg.ElsCloudID
}
if jsnCfg.ElsAPIKey != nil {
eeOpts.ElsAPIKey = jsnCfg.ElsAPIKey
}
if jsnCfg.ElsUsername != nil {
eeOpts.ElsUsername = jsnCfg.ElsUsername
}
if jsnCfg.ElsPassword != nil {
eeOpts.ElsPassword = jsnCfg.ElsPassword
}
if jsnCfg.DiscoverNodesOnStart != nil {
eeOpts.DiscoverNodesOnStart = jsnCfg.DiscoverNodesOnStart
}
if jsnCfg.DiscoverNodesInterval != nil {
var discoverNodesInterval time.Duration
if discoverNodesInterval, err = utils.ParseDurationWithSecs(*jsnCfg.DiscoverNodesInterval); err != nil {
return
}
eeOpts.DiscoverNodesInterval = utils.DurationPointer(discoverNodesInterval)
}
if jsnCfg.ElsIndex != nil {
eeOpts.ElsIndex = jsnCfg.ElsIndex
}

View File

@@ -299,7 +299,10 @@ type EEsJsonCfg struct {
type EventExporterOptsJson struct {
CSVFieldSeparator *string `json:"csvFieldSeparator"`
DiscoverNodesInterval *string `json:"discoverNodesInterval"`
ElsCloudID *string `json:"elsCloudID"`
ElsAPIKey *string `json:"elsApiKey"`
ElsUsername *string `json:"elsUsername"`
ElsPassword *string `json:"elsPassword"`
DiscoverNodesOnStart *bool `json:"discoverNodesOnStart"`
ElsIndex *string `json:"elsIndex"`
ElsIfPrimaryTerm *int `json:"elsIfPrimaryTerm"`

View File

@@ -363,8 +363,7 @@
"attempts": 1,
"opts": {
"elsIndex": "cdrs",
"discoverNodesInterval":"30s",
"discoverNodesOnStart":true,
"discoverNodesOnStart":false,
//"elsIfPrimaryTerm": 0,
//"elsIfSeqNo": 0,
"elsOpType": "",

View File

@@ -91,12 +91,22 @@ func (eEe *ElasticEE) Connect() (err error) {
eEe.Lock()
// create the client
if eEe.eClnt == nil {
eEe.eClnt, err = elasticsearch.NewClient(
elasticsearch.Config{
DiscoverNodesInterval: *eEe.Cfg().Opts.DiscoverNodesInterval,
DiscoverNodesOnStart: *eEe.Cfg().Opts.DiscoverNodesOnStart,
Addresses: strings.Split(eEe.Cfg().ExportPath, utils.InfieldSep)},
)
if *eEe.Cfg().Opts.ElsCloudID != "" {
eEe.eClnt, err = elasticsearch.NewClient(
elasticsearch.Config{
CloudID: *eEe.Cfg().Opts.ElsCloudID,
APIKey: *eEe.Cfg().Opts.ElsAPIKey,
Username: *eEe.Cfg().Opts.ElsUsername,
Password: *eEe.Cfg().Opts.ElsPassword,
DiscoverNodesOnStart: *eEe.Cfg().Opts.DiscoverNodesOnStart})
} else {
eEe.eClnt, err = elasticsearch.NewClient(
elasticsearch.Config{
DiscoverNodesOnStart: *eEe.Cfg().Opts.DiscoverNodesOnStart,
Addresses: strings.Split(eEe.Cfg().ExportPath, utils.InfieldSep),
},
)
}
}
eEe.Unlock()
return

View File

@@ -34,7 +34,7 @@ import (
"time"
"github.com/cgrates/cgrates/utils"
elasticsearch "github.com/elastic/go-elasticsearch"
elasticsearch "github.com/elastic/go-elasticsearch/v8"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"

View File

@@ -278,26 +278,27 @@ func (mockClient) Perform(req *http.Request) (res *http.Response, err error) {
}
return res, nil
}
func TestElasticExportEvent3(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dc, err := newEEMetrics("Local")
if err != nil {
t.Error(err)
}
eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc)
if err != nil {
t.Error(err)
}
if err = eEe.Connect(); err != nil {
t.Error(err)
}
eEe.eClnt.Transport = new(mockClient)
// errExpect := `unsupported protocol scheme ""`
cgrCfg.EEsCfg().Exporters[0].ComputeFields()
if err := eEe.ExportEvent([]byte{}, ""); err != nil {
t.Error(err)
}
}
// func TestElasticExportEvent3(t *testing.T) {
// cgrCfg := config.NewDefaultCGRConfig()
// dc, err := newEEMetrics("Local")
// if err != nil {
// t.Error(err)
// }
// eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc)
// if err != nil {
// t.Error(err)
// }
// if err = eEe.Connect(); err != nil {
// t.Error(err)
// }
// eEe.eClnt.Transport = new(mockClient)
// // errExpect := `unsupported protocol scheme ""`
// cgrCfg.EEsCfg().Exporters[0].ComputeFields()
// if err := eEe.ExportEvent([]byte{}, ""); err != nil {
// t.Error(err)
// }
// }
func TestElasticExportEvent4(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()