diff --git a/config/config_defaults.go b/config/config_defaults.go index 5ae93dae2..b52858e12 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -507,11 +507,12 @@ const CGRATES_CFG_JSON = ` // Elasticsearch options - "elsCloudID":"", //ELS Cloud Endpoint - "elsApiKey": "", - "elsUsername":"", - "elsPassword":"", - "discoverNodesOnStart":false + // "elsCloud":false, //ELS Cloud Endpoint + // "elsApiKey": "", + // "elsUsername":"", + // "elsPassword":"", + // "discoverNodesOnStart":false, + // "discoverNodesOnInterval":"100s", // "elsIndex": "", // ElsIndex // "elsIfPrimaryTerm": 0, // ElsIfPrimaryTerm // "elsIfSeqNo": 0, // ElsIfSeqNo diff --git a/config/eescfg.go b/config/eescfg.go index 9fa89b0b2..db208caa8 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -157,8 +157,9 @@ type EventExporterOpts struct { CSVFieldSeparator *string ElsIndex *string ElsIfPrimaryTerm *int - DiscoverNodesOnStart *bool - ElsCloudID *string + ElsDiscoverNodesOnStart *bool + ElsDiscoverNodeInterval *time.Duration + ElsCloud *bool ElsAPIKey *string ElsUsername *string // Username for HTTP Basic Authentication. ElsPassword *string @@ -255,8 +256,8 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) if jsnCfg.CSVFieldSeparator != nil { eeOpts.CSVFieldSeparator = jsnCfg.CSVFieldSeparator } - if jsnCfg.ElsCloudID != nil { - eeOpts.ElsCloudID = jsnCfg.ElsCloudID + if jsnCfg.ElsCloud != nil { + eeOpts.ElsCloud = jsnCfg.ElsCloud } if jsnCfg.ElsAPIKey != nil { eeOpts.ElsAPIKey = jsnCfg.ElsAPIKey @@ -267,8 +268,15 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) if jsnCfg.ElsPassword != nil { eeOpts.ElsPassword = jsnCfg.ElsPassword } - if jsnCfg.DiscoverNodesOnStart != nil { - eeOpts.DiscoverNodesOnStart = jsnCfg.DiscoverNodesOnStart + if jsnCfg.ElsDiscoverNodesOnStart != nil { + eeOpts.ElsDiscoverNodesOnStart = jsnCfg.ElsDiscoverNodesOnStart + } + if jsnCfg.ElsDiscoverNodesInterval != nil { + var nodesInterval time.Duration + if nodesInterval, err = utils.ParseDurationWithSecs(*jsnCfg.ElsDiscoverNodesInterval); err != nil { + return + } + eeOpts.ElsDiscoverNodeInterval = utils.DurationPointer(nodesInterval) } if jsnCfg.ElsIndex != nil { eeOpts.ElsIndex = jsnCfg.ElsIndex diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 5e7d49740..b78ef00f8 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -299,11 +299,12 @@ type EEsJsonCfg struct { type EventExporterOptsJson struct { CSVFieldSeparator *string `json:"csvFieldSeparator"` - ElsCloudID *string `json:"elsCloudID"` + ElsCloud *bool `json:"elsCloud"` ElsAPIKey *string `json:"elsApiKey"` ElsUsername *string `json:"elsUsername"` ElsPassword *string `json:"elsPassword"` - DiscoverNodesOnStart *bool `json:"discoverNodesOnStart"` + ElsDiscoverNodesOnStart *bool `json:"elsDiscoverNodesOnStart"` + ElsDiscoverNodesInterval *string `json:"elsDiscoverNodesInterval"` ElsIndex *string `json:"elsIndex"` ElsIfPrimaryTerm *int `json:"elsIfPrimaryTerm"` ElsIfSeqNo *int `json:"elsIfSeqNo"` diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json index 9e994a681..67e06d885 100644 --- a/data/conf/samples/ees/cgrates.json +++ b/data/conf/samples/ees/cgrates.json @@ -363,7 +363,10 @@ "attempts": 1, "opts": { "elsIndex": "cdrs", - "discoverNodesOnStart":true, + "elsDiscoverNodesOnStart":true, + //"elsCloud":false, + // "elsUsername":"", + // "elsPassword":"", //"elsIfPrimaryTerm": 0, //"elsIfSeqNo": 0, "elsOpType": "", diff --git a/ees/elastic.go b/ees/elastic.go index fba3352eb..370dc494b 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -45,11 +45,12 @@ func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (eEe * // ElasticEE implements EventExporter interface for ElasticSearch export type ElasticEE struct { - cfg *config.EventExporterCfg - eClnt *elasticsearch.Client - dc *utils.SafeMapStorage - opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap - reqs *concReq + cfg *config.EventExporterCfg + eClnt *elasticsearch.Client + dc *utils.SafeMapStorage + indxOpts esapi.IndexRequest // this variable is used only for storing the options from OptsMap + reqs *concReq + clnOpts elasticsearch.Config sync.RWMutex bytePreparing } @@ -57,30 +58,52 @@ type ElasticEE struct { // init will create all the necessary dependencies, including opening the file func (eEe *ElasticEE) prepareOpts() (err error) { //parse opts - eEe.opts.Index = utils.CDRsTBL + eEe.indxOpts.Index = utils.CDRsTBL if eEe.Cfg().Opts.ElsIndex != nil { - eEe.opts.Index = *eEe.Cfg().Opts.ElsIndex + eEe.indxOpts.Index = *eEe.Cfg().Opts.ElsIndex } - eEe.opts.IfPrimaryTerm = eEe.Cfg().Opts.ElsIfPrimaryTerm - eEe.opts.IfSeqNo = eEe.Cfg().Opts.ElsIfSeqNo + eEe.indxOpts.IfPrimaryTerm = eEe.Cfg().Opts.ElsIfPrimaryTerm + eEe.indxOpts.IfSeqNo = eEe.Cfg().Opts.ElsIfSeqNo if eEe.Cfg().Opts.ElsOpType != nil { - eEe.opts.OpType = *eEe.Cfg().Opts.ElsOpType + eEe.indxOpts.OpType = *eEe.Cfg().Opts.ElsOpType } if eEe.Cfg().Opts.ElsPipeline != nil { - eEe.opts.Pipeline = *eEe.Cfg().Opts.ElsPipeline + eEe.indxOpts.Pipeline = *eEe.Cfg().Opts.ElsPipeline } if eEe.Cfg().Opts.ElsRouting != nil { - eEe.opts.Routing = *eEe.Cfg().Opts.ElsRouting + eEe.indxOpts.Routing = *eEe.Cfg().Opts.ElsRouting } if eEe.Cfg().Opts.ElsTimeout != nil { - eEe.opts.Timeout = *eEe.Cfg().Opts.ElsTimeout + eEe.indxOpts.Timeout = *eEe.Cfg().Opts.ElsTimeout } - eEe.opts.Version = eEe.Cfg().Opts.ElsVersion + eEe.indxOpts.Version = eEe.Cfg().Opts.ElsVersion if eEe.Cfg().Opts.ElsVersionType != nil { - eEe.opts.VersionType = *eEe.Cfg().Opts.ElsVersionType + eEe.indxOpts.VersionType = *eEe.Cfg().Opts.ElsVersionType } if eEe.Cfg().Opts.ElsWaitForActiveShards != nil { - eEe.opts.WaitForActiveShards = *eEe.Cfg().Opts.ElsWaitForActiveShards + eEe.indxOpts.WaitForActiveShards = *eEe.Cfg().Opts.ElsWaitForActiveShards + } + + //client config + if eEe.Cfg().Opts.ElsCloud != nil && *eEe.Cfg().Opts.ElsCloud { + eEe.clnOpts.CloudID = eEe.Cfg().ExportPath + } else { + eEe.clnOpts.Addresses = strings.Split(eEe.Cfg().ExportPath, utils.InfieldSep) + } + if eEe.Cfg().Opts.ElsUsername != nil { + eEe.clnOpts.Username = *eEe.Cfg().Opts.ElsUsername + } + if eEe.Cfg().Opts.ElsPassword != nil { + eEe.clnOpts.Password = *eEe.Cfg().Opts.ElsPassword + } + if eEe.Cfg().Opts.ElsAPIKey != nil { + eEe.clnOpts.APIKey = *eEe.Cfg().Opts.ElsAPIKey + } + if eEe.Cfg().Opts.ElsDiscoverNodesOnStart != nil { + eEe.clnOpts.DiscoverNodesOnStart = *eEe.Cfg().Opts.ElsDiscoverNodesOnStart + } + if eEe.Cfg().Opts.ElsDiscoverNodeInterval != nil { + eEe.clnOpts.DiscoverNodesInterval = *eEe.Cfg().Opts.ElsDiscoverNodeInterval } return } @@ -90,24 +113,10 @@ func (eEe *ElasticEE) Cfg() *config.EventExporterCfg { return eEe.cfg } func (eEe *ElasticEE) Connect() (err error) { eEe.Lock() // create the client - if eEe.eClnt == nil { - if *eEe.Cfg().Opts.ElsCloudID != "" { - eEe.eClnt, err = elasticsearch.NewClient( - elasticsearch.Config{ - CloudID: *eEe.Cfg().Opts.ElsCloudID, - APIKey: *eEe.Cfg().Opts.ElsAPIKey, - Username: *eEe.Cfg().Opts.ElsUsername, - Password: *eEe.Cfg().Opts.ElsPassword, - DiscoverNodesOnStart: *eEe.Cfg().Opts.DiscoverNodesOnStart}) - } else { - eEe.eClnt, err = elasticsearch.NewClient( - elasticsearch.Config{ - DiscoverNodesOnStart: *eEe.Cfg().Opts.DiscoverNodesOnStart, - Addresses: strings.Split(eEe.Cfg().ExportPath, utils.InfieldSep), - }, - ) - } + if eEe.eClnt != nil { + return } + eEe.eClnt, err = elasticsearch.NewClient(eEe.clnOpts) eEe.Unlock() return } @@ -124,19 +133,19 @@ func (eEe *ElasticEE) ExportEvent(ev any, key string) (err error) { return utils.ErrDisconnected } eReq := esapi.IndexRequest{ - Index: eEe.opts.Index, + Index: eEe.indxOpts.Index, DocumentID: key, Body: bytes.NewReader(ev.([]byte)), Refresh: "true", - IfPrimaryTerm: eEe.opts.IfPrimaryTerm, - IfSeqNo: eEe.opts.IfSeqNo, - OpType: eEe.opts.OpType, - Pipeline: eEe.opts.Pipeline, - Routing: eEe.opts.Routing, - Timeout: eEe.opts.Timeout, - Version: eEe.opts.Version, - VersionType: eEe.opts.VersionType, - WaitForActiveShards: eEe.opts.WaitForActiveShards, + IfPrimaryTerm: eEe.indxOpts.IfPrimaryTerm, + IfSeqNo: eEe.indxOpts.IfSeqNo, + OpType: eEe.indxOpts.OpType, + Pipeline: eEe.indxOpts.Pipeline, + Routing: eEe.indxOpts.Routing, + Timeout: eEe.indxOpts.Timeout, + Version: eEe.indxOpts.Version, + VersionType: eEe.indxOpts.VersionType, + WaitForActiveShards: eEe.indxOpts.WaitForActiveShards, } var resp *esapi.Response diff --git a/ees/elastic_test.go b/ees/elastic_test.go index a4d94dad4..c8a617404 100644 --- a/ees/elastic_test.go +++ b/ees/elastic_test.go @@ -65,8 +65,8 @@ func TestInitCase1(t *testing.T) { t.Error(err) } eeExpect := "test" - if !reflect.DeepEqual(ee.opts.Index, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", eeExpect, ee.opts.Index) + if !reflect.DeepEqual(ee.indxOpts.Index, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", eeExpect, ee.indxOpts.Index) } } @@ -82,8 +82,8 @@ func TestInitCase2(t *testing.T) { t.Error(err) } eeExpect := utils.IntPointer(20) - if !reflect.DeepEqual(ee.opts.IfPrimaryTerm, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.IfPrimaryTerm)) + if !reflect.DeepEqual(ee.indxOpts.IfPrimaryTerm, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.IfPrimaryTerm)) } } @@ -99,8 +99,8 @@ func TestInitCase3(t *testing.T) { t.Error(err) } eeExpect := utils.IntPointer(20) - if !reflect.DeepEqual(ee.opts.IfSeqNo, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.IfSeqNo)) + if !reflect.DeepEqual(ee.indxOpts.IfSeqNo, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.IfSeqNo)) } } @@ -116,8 +116,8 @@ func TestInitCase4(t *testing.T) { t.Error(err) } eeExpect := "test" - if !reflect.DeepEqual(ee.opts.OpType, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.OpType)) + if !reflect.DeepEqual(ee.indxOpts.OpType, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.OpType)) } } @@ -133,8 +133,8 @@ func TestInitCase5(t *testing.T) { t.Error(err) } eeExpect := "test" - if !reflect.DeepEqual(ee.opts.Pipeline, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.Pipeline)) + if !reflect.DeepEqual(ee.indxOpts.Pipeline, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.Pipeline)) } } @@ -150,8 +150,8 @@ func TestInitCase6(t *testing.T) { t.Error(err) } eeExpect := "test" - if !reflect.DeepEqual(ee.opts.Routing, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.Routing)) + if !reflect.DeepEqual(ee.indxOpts.Routing, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.Routing)) } } @@ -167,8 +167,8 @@ func TestInitCase8(t *testing.T) { t.Error(err) } eeExpect := utils.IntPointer(20) - if !reflect.DeepEqual(ee.opts.Version, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.Version)) + if !reflect.DeepEqual(ee.indxOpts.Version, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.Version)) } } @@ -184,8 +184,8 @@ func TestInitCase9(t *testing.T) { t.Error(err) } eeExpect := "test" - if !reflect.DeepEqual(ee.opts.VersionType, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.VersionType)) + if !reflect.DeepEqual(ee.indxOpts.VersionType, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.VersionType)) } } @@ -201,8 +201,8 @@ func TestInitCase10(t *testing.T) { t.Error(err) } eeExpect := "test" - if !reflect.DeepEqual(ee.opts.WaitForActiveShards, eeExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.opts.WaitForActiveShards)) + if !reflect.DeepEqual(ee.indxOpts.WaitForActiveShards, eeExpect) { + t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(ee.indxOpts.WaitForActiveShards)) } } @@ -279,26 +279,26 @@ func (mockClient) Perform(req *http.Request) (res *http.Response, err error) { return res, nil } -// func TestElasticExportEvent3(t *testing.T) { -// cgrCfg := config.NewDefaultCGRConfig() -// dc, err := newEEMetrics("Local") -// if err != nil { -// t.Error(err) -// } -// eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc) -// if err != nil { -// t.Error(err) -// } -// if err = eEe.Connect(); err != nil { -// t.Error(err) -// } -// eEe.eClnt.Transport = new(mockClient) -// // errExpect := `unsupported protocol scheme ""` -// cgrCfg.EEsCfg().Exporters[0].ComputeFields() -// if err := eEe.ExportEvent([]byte{}, ""); err != nil { -// t.Error(err) -// } -// } +func TestElasticExportEvent3(t *testing.T) { + cgrCfg := config.NewDefaultCGRConfig() + dc, err := newEEMetrics("Local") + if err != nil { + t.Error(err) + } + eEe, err := NewElasticEE(cgrCfg.EEsCfg().Exporters[0], dc) + if err != nil { + t.Error(err) + } + if err = eEe.Connect(); err != nil { + t.Error(err) + } + eEe.eClnt.Transport = new(mockClient) + // errExpect := `unsupported protocol scheme ""` + cgrCfg.EEsCfg().Exporters[0].ComputeFields() + if err := eEe.ExportEvent([]byte{}, ""); err != nil { + t.Error(err) + } +} func TestElasticExportEvent4(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig()