diff --git a/ees/ee.go b/ees/ee.go index cb7dc5a7f..ae694aa92 100644 --- a/ees/ee.go +++ b/ees/ee.go @@ -52,6 +52,8 @@ func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Filt return NewPosterJSONMapEE(cgrCfg, cfgIdx, filterS, dc) case utils.MetaVirt: return NewVirtualExporter(cgrCfg, cfgIdx, filterS, dc) + case utils.MetaElastic: + return NewElasticExporter(cgrCfg, cfgIdx, filterS, dc) default: return nil, fmt.Errorf("unsupported exporter type: <%s>", cgrCfg.EEsCfg().Exporters[cfgIdx].Type) } diff --git a/ees/elastic.go b/ees/elastic.go new file mode 100644 index 000000000..e03068e2b --- /dev/null +++ b/ees/elastic.go @@ -0,0 +1,102 @@ +/* +Real-time Online/Offline Charging System (OerS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ees + +import ( + "sync" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + elasticsearch "github.com/elastic/go-elasticsearch" +) + +func NewElasticExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, + dc utils.MapStorage) (eEe *ElasticEe, err error) { + eEe = &ElasticEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, + cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc} + err = eEe.init() + return +} + +// ElasticEe implements EventExporter interface for ElasticSearch export +type ElasticEe struct { + id string + eClnt *elasticsearch.Client + cgrCfg *config.CGRConfig + cfgIdx int // index of config instance within ERsCfg.Readers + filterS *engine.FilterS + sync.RWMutex + dc utils.MapStorage +} + +// init will create all the necessary dependencies, including opening the file +func (eEe *ElasticEe) init() (err error) { + // compose the config out of opts + // create the client + if eEe.eClnt, err = elasticsearch.NewDefaultClient(); err != nil { + return + } + return +} + +// ID returns the identificator of this exporter +func (eEe *ElasticEe) ID() string { + return eEe.id +} + +// OnEvicted implements EventExporter, doing the cleanup before exit +func (eEe *ElasticEe) OnEvicted(_ string, _ interface{}) { + return +} + +// ExportEvent implements EventExporter +func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { + eEe.Lock() + defer func() { + if err != nil { + eEe.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID) + } else { + eEe.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID) + } + eEe.Unlock() + }() + eEe.dc[utils.NumberOfEvents] = eEe.dc[utils.NumberOfEvents].(int64) + 1 + + req := utils.MapStorage{} + for k, v := range cgrEv.Event { + req[k] = v + } + eeReq := NewEventExporterRequest(req, eEe.dc, + eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Tenant, + eEe.cgrCfg.GeneralCfg().DefaultTenant, + utils.FirstNonEmpty(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Timezone, + eEe.cgrCfg.GeneralCfg().DefaultTimezone), + eEe.filterS) + if err = eeReq.SetFields(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].ContentFields()); err != nil { + return + } + updateEEMetrics(eEe.dc, cgrEv.Event, utils.FirstNonEmpty(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Timezone, + eEe.cgrCfg.GeneralCfg().DefaultTimezone)) + return +} + +func (eEe *ElasticEe) GetMetrics() utils.MapStorage { + return eEe.dc.Clone() +} diff --git a/go.mod b/go.mod index eecf21ae7..606511887 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/cgrates/sipingo v1.0.1-0.20200514112313-699ebc1cdb8e github.com/creack/pty v1.1.11 github.com/dgrijalva/jwt-go v3.2.0+incompatible + github.com/elastic/go-elasticsearch v0.0.0 github.com/ericlagergren/decimal v0.0.0-20191206042408-88212e6cfca9 github.com/fiorix/go-diameter/v4 v4.0.1 github.com/fortytw2/leaktest v1.3.0 // indirect diff --git a/go.sum b/go.sum index 2a072a4ed..f2253684d 100644 --- a/go.sum +++ b/go.sum @@ -99,6 +99,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWnBO0y+TZaA= +github.com/elastic/go-elasticsearch v0.0.0/go.mod h1:TkBSJBuTyFdBnrNqoPc54FN0vKf5c04IdM4zuStJ7xg= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= diff --git a/utils/consts.go b/utils/consts.go index 8c71a2979..a65c1a7a1 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -486,7 +486,6 @@ const ( TxtSuffix = ".txt" JSNSuffix = ".json" GOBSuffix = ".gob" - FormSuffix = ".form" XMLSuffix = ".xml" CSVSuffix = ".csv" FWVSuffix = ".fwv" @@ -498,9 +497,9 @@ const ( CDRPoster = "cdr" MetaFileCSV = "*file_csv" MetaVirt = "*virt" + MetaElastic = "*elastic" MetaFileFWV = "*file_fwv" MetaFile = "*file" - MetaFScsv = "*freeswitch_csv" Accounts = "Accounts" AccountService = "AccountS" Actions = "Actions"