Added support for additional els features

This commit is contained in:
gezimbll
2023-06-12 10:48:49 -04:00
committed by Dan Christian Bogos
parent fcd393807a
commit 764d531beb
6 changed files with 117 additions and 95 deletions

View File

@@ -507,11 +507,12 @@ const CGRATES_CFG_JSON = `
// Elasticsearch options
"elsCloudID":"", //ELS Cloud Endpoint
"elsApiKey": "",
"elsUsername":"",
"elsPassword":"",
"discoverNodesOnStart":false
// "elsCloud":false, //ELS Cloud Endpoint
// "elsApiKey": "",
// "elsUsername":"",
// "elsPassword":"",
// "discoverNodesOnStart":false,
// "discoverNodesOnInterval":"100s",
// "elsIndex": "", // ElsIndex
// "elsIfPrimaryTerm": 0, // ElsIfPrimaryTerm
// "elsIfSeqNo": 0, // ElsIfSeqNo

View File

@@ -157,8 +157,9 @@ type EventExporterOpts struct {
CSVFieldSeparator *string
ElsIndex *string
ElsIfPrimaryTerm *int
DiscoverNodesOnStart *bool
ElsCloudID *string
ElsDiscoverNodesOnStart *bool
ElsDiscoverNodeInterval *time.Duration
ElsCloud *bool
ElsAPIKey *string
ElsUsername *string // Username for HTTP Basic Authentication.
ElsPassword *string
@@ -255,8 +256,8 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson)
if jsnCfg.CSVFieldSeparator != nil {
eeOpts.CSVFieldSeparator = jsnCfg.CSVFieldSeparator
}
if jsnCfg.ElsCloudID != nil {
eeOpts.ElsCloudID = jsnCfg.ElsCloudID
if jsnCfg.ElsCloud != nil {
eeOpts.ElsCloud = jsnCfg.ElsCloud
}
if jsnCfg.ElsAPIKey != nil {
eeOpts.ElsAPIKey = jsnCfg.ElsAPIKey
@@ -267,8 +268,15 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson)
if jsnCfg.ElsPassword != nil {
eeOpts.ElsPassword = jsnCfg.ElsPassword
}
if jsnCfg.DiscoverNodesOnStart != nil {
eeOpts.DiscoverNodesOnStart = jsnCfg.DiscoverNodesOnStart
if jsnCfg.ElsDiscoverNodesOnStart != nil {
eeOpts.ElsDiscoverNodesOnStart = jsnCfg.ElsDiscoverNodesOnStart
}
if jsnCfg.ElsDiscoverNodesInterval != nil {
var nodesInterval time.Duration
if nodesInterval, err = utils.ParseDurationWithSecs(*jsnCfg.ElsDiscoverNodesInterval); err != nil {
return
}
eeOpts.ElsDiscoverNodeInterval = utils.DurationPointer(nodesInterval)
}
if jsnCfg.ElsIndex != nil {
eeOpts.ElsIndex = jsnCfg.ElsIndex

View File

@@ -299,11 +299,12 @@ type EEsJsonCfg struct {
type EventExporterOptsJson struct {
CSVFieldSeparator *string `json:"csvFieldSeparator"`
ElsCloudID *string `json:"elsCloudID"`
ElsCloud *bool `json:"elsCloud"`
ElsAPIKey *string `json:"elsApiKey"`
ElsUsername *string `json:"elsUsername"`
ElsPassword *string `json:"elsPassword"`
DiscoverNodesOnStart *bool `json:"discoverNodesOnStart"`
ElsDiscoverNodesOnStart *bool `json:"elsDiscoverNodesOnStart"`
ElsDiscoverNodesInterval *string `json:"elsDiscoverNodesInterval"`
ElsIndex *string `json:"elsIndex"`
ElsIfPrimaryTerm *int `json:"elsIfPrimaryTerm"`
ElsIfSeqNo *int `json:"elsIfSeqNo"`

View File

@@ -363,7 +363,10 @@
"attempts": 1,
"opts": {
"elsIndex": "cdrs",
"discoverNodesOnStart":true,
"elsDiscoverNodesOnStart":true,
//"elsCloud":false,
// "elsUsername":"",
// "elsPassword":"",
//"elsIfPrimaryTerm": 0,
//"elsIfSeqNo": 0,
"elsOpType": "",

View File

@@ -45,11 +45,12 @@ func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (eEe *
// ElasticEE implements EventExporter interface for ElasticSearch export
type ElasticEE struct {
cfg *config.EventExporterCfg
eClnt *elasticsearch.Client
dc *utils.SafeMapStorage
opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap
reqs *concReq
cfg *config.EventExporterCfg
eClnt *elasticsearch.Client
dc *utils.SafeMapStorage
indxOpts esapi.IndexRequest // this variable is used only for storing the options from OptsMap
reqs *concReq
clnOpts elasticsearch.Config
sync.RWMutex
bytePreparing
}
@@ -57,30 +58,52 @@ type ElasticEE struct {
// init will create all the necessary dependencies, including opening the file
func (eEe *ElasticEE) prepareOpts() (err error) {
//parse opts
eEe.opts.Index = utils.CDRsTBL
eEe.indxOpts.Index = utils.CDRsTBL
if eEe.Cfg().Opts.ElsIndex != nil {
eEe.opts.Index = *eEe.Cfg().Opts.ElsIndex
eEe.indxOpts.Index = *eEe.Cfg().Opts.ElsIndex
}
eEe.opts.IfPrimaryTerm = eEe.Cfg().Opts.ElsIfPrimaryTerm
eEe.opts.IfSeqNo = eEe.Cfg().Opts.ElsIfSeqNo
eEe.indxOpts.IfPrimaryTerm = eEe.Cfg().Opts.ElsIfPrimaryTerm
eEe.indxOpts.IfSeqNo = eEe.Cfg().Opts.ElsIfSeqNo
if eEe.Cfg().Opts.ElsOpType != nil {
eEe.opts.OpType = *eEe.Cfg().Opts.ElsOpType
eEe.indxOpts.OpType = *eEe.Cfg().Opts.ElsOpType
}
if eEe.Cfg().Opts.ElsPipeline != nil {
eEe.opts.Pipeline = *eEe.Cfg().Opts.ElsPipeline
eEe.indxOpts.Pipeline = *eEe.Cfg().Opts.ElsPipeline
}
if eEe.Cfg().Opts.ElsRouting != nil {
eEe.opts.Routing = *eEe.Cfg().Opts.ElsRouting
eEe.indxOpts.Routing = *eEe.Cfg().Opts.ElsRouting
}
if eEe.Cfg().Opts.ElsTimeout != nil {
eEe.opts.Timeout = *eEe.Cfg().Opts.ElsTimeout
eEe.indxOpts.Timeout = *eEe.Cfg().Opts.ElsTimeout
}
eEe.opts.Version = eEe.Cfg().Opts.ElsVersion
eEe.indxOpts.Version = eEe.Cfg().Opts.ElsVersion
if eEe.Cfg().Opts.ElsVersionType != nil {
eEe.opts.VersionType = *eEe.Cfg().Opts.ElsVersionType
eEe.indxOpts.VersionType = *eEe.Cfg().Opts.ElsVersionType
}
if eEe.Cfg().Opts.ElsWaitForActiveShards != nil {
eEe.opts.WaitForActiveShards = *eEe.Cfg().Opts.ElsWaitForActiveShards
eEe.indxOpts.WaitForActiveShards = *eEe.Cfg().Opts.ElsWaitForActiveShards
}
//client config
if eEe.Cfg().Opts.ElsCloud != nil && *eEe.Cfg().Opts.ElsCloud {
eEe.clnOpts.CloudID = eEe.Cfg().ExportPath
} else {
eEe.clnOpts.Addresses = strings.Split(eEe.Cfg().ExportPath, utils.InfieldSep)
}
if eEe.Cfg().Opts.ElsUsername != nil {
eEe.clnOpts.Username = *eEe.Cfg().Opts.ElsUsername
}
if eEe.Cfg().Opts.ElsPassword != nil {
eEe.clnOpts.Password = *eEe.Cfg().Opts.ElsPassword
}
if eEe.Cfg().Opts.ElsAPIKey != nil {
eEe.clnOpts.APIKey = *eEe.Cfg().Opts.ElsAPIKey
}
if eEe.Cfg().Opts.ElsDiscoverNodesOnStart != nil {
eEe.clnOpts.DiscoverNodesOnStart = *eEe.Cfg().Opts.ElsDiscoverNodesOnStart
}
if eEe.Cfg().Opts.ElsDiscoverNodeInterval != nil {
eEe.clnOpts.DiscoverNodesInterval = *eEe.Cfg().Opts.ElsDiscoverNodeInterval
}
return
}
@@ -90,24 +113,10 @@ func (eEe *ElasticEE) Cfg() *config.EventExporterCfg { return eEe.cfg }
func (eEe *ElasticEE) Connect() (err error) {
eEe.Lock()
// create the client
if eEe.eClnt == nil {
if *eEe.Cfg().Opts.ElsCloudID != "" {
eEe.eClnt, err = elasticsearch.NewClient(
elasticsearch.Config{
CloudID: *eEe.Cfg().Opts.ElsCloudID,
APIKey: *eEe.Cfg().Opts.ElsAPIKey,
Username: *eEe.Cfg().Opts.ElsUsername,
Password: *eEe.Cfg().Opts.ElsPassword,
DiscoverNodesOnStart: *eEe.Cfg().Opts.DiscoverNodesOnStart})
} else {
eEe.eClnt, err = elasticsearch.NewClient(
elasticsearch.Config{
DiscoverNodesOnStart: *eEe.Cfg().Opts.DiscoverNodesOnStart,
Addresses: strings.Split(eEe.Cfg().ExportPath, utils.InfieldSep),
},
)
}
if eEe.eClnt != nil {
return
}
eEe.eClnt, err = elasticsearch.NewClient(eEe.clnOpts)
eEe.Unlock()
return
}
@@ -124,19 +133,19 @@ func (eEe *ElasticEE) ExportEvent(ev any, key string) (err error) {
return utils.ErrDisconnected
}
eReq := esapi.IndexRequest{
Index: eEe.opts.Index,
Index: eEe.indxOpts.Index,
DocumentID: key,
Body: bytes.NewReader(ev.([]byte)),
Refresh: "true",
IfPrimaryTerm: eEe.opts.IfPrimaryTerm,
IfSeqNo: eEe.opts.IfSeqNo,
OpType: eEe.opts.OpType,
Pipeline: eEe.opts.Pipeline,
Routing: eEe.opts.Routing,
Timeout: eEe.opts.Timeout,
Version: eEe.opts.Version,
VersionType: eEe.opts.VersionType,
WaitForActiveShards: eEe.opts.WaitForActiveShards,
IfPrimaryTerm: eEe.indxOpts.IfPrimaryTerm,
IfSeqNo: eEe.indxOpts.IfSeqNo,
OpType: eEe.indxOpts.OpType,
Pipeline: eEe.indxOpts.Pipeline,
Routing: eEe.indxOpts.Routing,
Timeout: eEe.indxOpts.Timeout,
Version: eEe.indxOpts.Version,
VersionType: eEe.indxOpts.VersionType,
WaitForActiveShards: eEe.indxOpts.WaitForActiveShards,
}
var resp *esapi.Response

View File

@@ -65,8 +65,8 @@ func TestInitCase1(t *testing.T) {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.opts.Index, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", eeExpect, ee.opts.Index)
if !reflect.DeepEqual(ee.indxOpts.Index, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", eeExpect, ee.indxOpts.Index)
}
}
@@ -82,8 +82,8 @@ func TestInitCase2(t *testing.T) {
t.Error(err)
}
eeExpect := utils.IntPointer(20)
if !reflect.DeepEqual(ee.opts.IfPrimaryTerm, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.IfPrimaryTerm))
if !reflect.DeepEqual(ee.indxOpts.IfPrimaryTerm, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.IfPrimaryTerm))
}
}
@@ -99,8 +99,8 @@ func TestInitCase3(t *testing.T) {
t.Error(err)
}
eeExpect := utils.IntPointer(20)
if !reflect.DeepEqual(ee.opts.IfSeqNo, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.IfSeqNo))
if !reflect.DeepEqual(ee.indxOpts.IfSeqNo, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.IfSeqNo))
}
}
@@ -116,8 +116,8 @@ func TestInitCase4(t *testing.T) {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.opts.OpType, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.OpType))
if !reflect.DeepEqual(ee.indxOpts.OpType, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.OpType))
}
}
@@ -133,8 +133,8 @@ func TestInitCase5(t *testing.T) {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.opts.Pipeline, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.Pipeline))
if !reflect.DeepEqual(ee.indxOpts.Pipeline, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.Pipeline))
}
}
@@ -150,8 +150,8 @@ func TestInitCase6(t *testing.T) {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.opts.Routing, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.Routing))
if !reflect.DeepEqual(ee.indxOpts.Routing, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.Routing))
}
}
@@ -167,8 +167,8 @@ func TestInitCase8(t *testing.T) {
t.Error(err)
}
eeExpect := utils.IntPointer(20)
if !reflect.DeepEqual(ee.opts.Version, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.Version))
if !reflect.DeepEqual(ee.indxOpts.Version, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.Version))
}
}
@@ -184,8 +184,8 @@ func TestInitCase9(t *testing.T) {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.opts.VersionType, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.VersionType))
if !reflect.DeepEqual(ee.indxOpts.VersionType, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.VersionType))
}
}
@@ -201,8 +201,8 @@ func TestInitCase10(t *testing.T) {
t.Error(err)
}
eeExpect := "test"
if !reflect.DeepEqual(ee.opts.WaitForActiveShards, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.WaitForActiveShards))
if !reflect.DeepEqual(ee.indxOpts.WaitForActiveShards, eeExpect) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.WaitForActiveShards))
}
}
@@ -279,26 +279,26 @@ func (mockClient) Perform(req *http.Request) (res *http.Response, err error) {
return res, nil
}
// func TestElasticExportEvent3(t *testing.T) {
// cgrCfg := config.NewDefaultCGRConfig()
// dc, err := newEEMetrics("Local")
// if err != nil {
// t.Error(err)
// }
// eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc)
// if err != nil {
// t.Error(err)
// }
// if err = eEe.Connect(); err != nil {
// t.Error(err)
// }
// eEe.eClnt.Transport = new(mockClient)
// // errExpect := `unsupported protocol scheme ""`
// cgrCfg.EEsCfg().Exporters[0].ComputeFields()
// if err := eEe.ExportEvent([]byte{}, ""); err != nil {
// t.Error(err)
// }
// }
func TestElasticExportEvent3(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dc, err := newEEMetrics("Local")
if err != nil {
t.Error(err)
}
eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc)
if err != nil {
t.Error(err)
}
if err = eEe.Connect(); err != nil {
t.Error(err)
}
eEe.eClnt.Transport = new(mockClient)
// errExpect := `unsupported protocol scheme ""`
cgrCfg.EEsCfg().Exporters[0].ComputeFields()
if err := eEe.ExportEvent([]byte{}, ""); err != nil {
t.Error(err)
}
}
func TestElasticExportEvent4(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()