Add support for options in Elasticsearch Request

This commit is contained in:
TeoV
2020-10-02 11:34:40 +03:00
committed by Dan Christian Bogos
parent b34e482d0f
commit d1f8630d75
2 changed files with 73 additions and 9 deletions

View File

@@ -45,12 +45,12 @@ func NewElasticExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Fi
type ElasticEe struct {
id string
eClnt *elasticsearch.Client
index string
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
sync.RWMutex
dc utils.MapStorage
dc utils.MapStorage
opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap
}
// init will create all the necessary dependencies, including opening the file
@@ -62,10 +62,52 @@ func (eEe *ElasticEe) init() (err error) {
}); err != nil {
return
}
//parse opts
if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.Index]; !has {
eEe.index = utils.CDRsTBL
eEe.opts.Index = utils.CDRsTBL
} else {
eEe.index = utils.IfaceAsString(val)
eEe.opts.Index = utils.IfaceAsString(val)
}
if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.IfPrimaryTerm]; has {
var intVal int64
if intVal, err = utils.IfaceAsTInt64(val); err != nil {
return
}
eEe.opts.IfPrimaryTerm = utils.IntPointer(int(intVal))
}
if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.IfSeqNo]; has {
var intVal int64
if intVal, err = utils.IfaceAsTInt64(val); err != nil {
return
}
eEe.opts.IfSeqNo = utils.IntPointer(int(intVal))
}
if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.OpType]; has {
eEe.opts.OpType = utils.IfaceAsString(val)
}
if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.Pipeline]; has {
eEe.opts.Pipeline = utils.IfaceAsString(val)
}
if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.Routing]; has {
eEe.opts.Routing = utils.IfaceAsString(val)
}
if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.Timeout]; has {
if eEe.opts.Timeout, err = utils.IfaceAsDuration(val); err != nil {
return
}
}
if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.VersionLow]; has {
var intVal int64
if intVal, err = utils.IfaceAsTInt64(val); err != nil {
return
}
eEe.opts.Version = utils.IntPointer(int(intVal))
}
if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.VersionType]; has {
eEe.opts.VersionType = utils.IfaceAsString(val)
}
if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.WaitForActiveShards]; has {
eEe.opts.WaitForActiveShards = utils.IfaceAsString(val)
}
return
}
@@ -128,11 +170,22 @@ func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
cgrID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.CGRID), utils.GenUUID())
runID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RunID), utils.MetaDefault)
eReq := esapi.IndexRequest{
Index: eEe.index,
DocumentID: utils.ConcatenatedKey(cgrID, runID),
Body: strings.NewReader(utils.ToJSON(valMp)),
Refresh: "true",
Index: eEe.opts.Index,
DocumentID: utils.ConcatenatedKey(cgrID, runID),
Body: strings.NewReader(utils.ToJSON(valMp)),
Refresh: "true",
IfPrimaryTerm: eEe.opts.IfPrimaryTerm,
IfSeqNo: eEe.opts.IfSeqNo,
OpType: eEe.opts.OpType,
Parent: eEe.opts.Parent,
Pipeline: eEe.opts.Pipeline,
Routing: eEe.opts.Routing,
Timeout: eEe.opts.Timeout,
Version: eEe.opts.Version,
VersionType: eEe.opts.VersionType,
WaitForActiveShards: eEe.opts.WaitForActiveShards,
}
var resp *esapi.Response
if resp, err = eReq.Do(context.Background(), eEe.eClnt); err != nil {
resp.Body.Close()

View File

@@ -865,7 +865,6 @@ const (
MetaMonthlyEstimated = "*monthly_estimated"
ProcessRuns = "ProcessRuns"
HashtagSep = "#"
Index = "index"
)
// Migrator Action
@@ -2382,6 +2381,18 @@ const (
OptsRouteID = "*routeID"
// EEs
OptsEEsVerbose = "*eesVerbose"
// EEs Elasticsearch options
Index = "index"
IfPrimaryTerm = "if_primary_term"
IfSeqNo = "if_seq_no"
OpType = "op_type"
Pipeline = "pipeline"
RequireAlias = "require_alias"
Routing = "routing"
Timeout = "timeout"
VersionLow = "version"
VersionType = "version_type"
WaitForActiveShards = "wait_for_active_shards"
// Others
OptsContext = "*context"
Subsys = "*subsys"