From d1f8630d75098da598e4725fcd264b1976873c88 Mon Sep 17 00:00:00 2001 From: TeoV Date: Fri, 2 Oct 2020 11:34:40 +0300 Subject: [PATCH] Add support for options in Elasticsearch Request --- ees/elastic.go | 69 +++++++++++++++++++++++++++++++++++++++++++------ utils/consts.go | 13 +++++++++- 2 files changed, 73 insertions(+), 9 deletions(-) diff --git a/ees/elastic.go b/ees/elastic.go index e789d9bed..e2841b0d0 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -45,12 +45,12 @@ func NewElasticExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Fi type ElasticEe struct { id string eClnt *elasticsearch.Client - index string cgrCfg *config.CGRConfig cfgIdx int // index of config instance within ERsCfg.Readers filterS *engine.FilterS sync.RWMutex - dc utils.MapStorage + dc utils.MapStorage + opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap } // init will create all the necessary dependencies, including opening the file @@ -62,10 +62,52 @@ func (eEe *ElasticEe) init() (err error) { }); err != nil { return } + //parse opts if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.Index]; !has { - eEe.index = utils.CDRsTBL + eEe.opts.Index = utils.CDRsTBL } else { - eEe.index = utils.IfaceAsString(val) + eEe.opts.Index = utils.IfaceAsString(val) + } + if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.IfPrimaryTerm]; has { + var intVal int64 + if intVal, err = utils.IfaceAsTInt64(val); err != nil { + return + } + eEe.opts.IfPrimaryTerm = utils.IntPointer(int(intVal)) + } + if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.IfSeqNo]; has { + var intVal int64 + if intVal, err = utils.IfaceAsTInt64(val); err != nil { + return + } + eEe.opts.IfSeqNo = utils.IntPointer(int(intVal)) + } + if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.OpType]; has { + eEe.opts.OpType = utils.IfaceAsString(val) + } + if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.Pipeline]; has { + eEe.opts.Pipeline = utils.IfaceAsString(val) + } + if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.Routing]; has { + eEe.opts.Routing = utils.IfaceAsString(val) + } + if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.Timeout]; has { + if eEe.opts.Timeout, err = utils.IfaceAsDuration(val); err != nil { + return + } + } + if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.VersionLow]; has { + var intVal int64 + if intVal, err = utils.IfaceAsTInt64(val); err != nil { + return + } + eEe.opts.Version = utils.IntPointer(int(intVal)) + } + if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.VersionType]; has { + eEe.opts.VersionType = utils.IfaceAsString(val) + } + if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.WaitForActiveShards]; has { + eEe.opts.WaitForActiveShards = utils.IfaceAsString(val) } return } @@ -128,11 +170,22 @@ func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { cgrID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.CGRID), utils.GenUUID()) runID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RunID), utils.MetaDefault) eReq := esapi.IndexRequest{ - Index: eEe.index, - DocumentID: utils.ConcatenatedKey(cgrID, runID), - Body: strings.NewReader(utils.ToJSON(valMp)), - Refresh: "true", + Index: eEe.opts.Index, + DocumentID: utils.ConcatenatedKey(cgrID, runID), + Body: strings.NewReader(utils.ToJSON(valMp)), + Refresh: "true", + IfPrimaryTerm: eEe.opts.IfPrimaryTerm, + IfSeqNo: eEe.opts.IfSeqNo, + OpType: eEe.opts.OpType, + Parent: eEe.opts.Parent, + Pipeline: eEe.opts.Pipeline, + Routing: eEe.opts.Routing, + Timeout: eEe.opts.Timeout, + Version: eEe.opts.Version, + VersionType: eEe.opts.VersionType, + WaitForActiveShards: eEe.opts.WaitForActiveShards, } + var resp *esapi.Response if resp, err = eReq.Do(context.Background(), eEe.eClnt); err != nil { resp.Body.Close() diff --git a/utils/consts.go b/utils/consts.go index 1bc8ffbd1..6af64016a 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -865,7 +865,6 @@ const ( MetaMonthlyEstimated = "*monthly_estimated" ProcessRuns = "ProcessRuns" HashtagSep = "#" - Index = "index" ) // Migrator Action @@ -2382,6 +2381,18 @@ const ( OptsRouteID = "*routeID" // EEs OptsEEsVerbose = "*eesVerbose" + // EEs Elasticsearch options + Index = "index" + IfPrimaryTerm = "if_primary_term" + IfSeqNo = "if_seq_no" + OpType = "op_type" + Pipeline = "pipeline" + RequireAlias = "require_alias" + Routing = "routing" + Timeout = "timeout" + VersionLow = "version" + VersionType = "version_type" + WaitForActiveShards = "wait_for_active_shards" // Others OptsContext = "*context" Subsys = "*subsys"