diff --git a/config/config.go b/config/config.go
index cc65d88e2..406e57f69 100755
--- a/config/config.go
+++ b/config/config.go
@@ -329,7 +329,7 @@ var possibleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV,
var possibleExporterTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.META_NONE, utils.MetaFileFWV,
utils.MetaHTTPPost, utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap,
- utils.MetaKafkajsonMap, utils.MetaS3jsonMap, utils.MetaVirt})
+ utils.MetaKafkajsonMap, utils.MetaS3jsonMap, utils.MetaElastic, utils.MetaVirt})
func (cfg *CGRConfig) LazySanityCheck() {
for _, expID := range cfg.cdrsCfg.OnlineCDRExports {
diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json
index b1eafb720..f584f5363 100644
--- a/data/conf/samples/ees/cgrates.json
+++ b/data/conf/samples/ees/cgrates.json
@@ -351,6 +351,30 @@
"fields":[
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"},
],
+ },
+ {
+ "id": "ElasticsearchExporter",
+ "type": "*elastic",
+ "export_path": "http://localhost:9200",
+ "tenant": "cgrates.org",
+ "attempts": 1,
+ "filters": ["*string:~*req.ExporterUsed:ElasticExporter"],
+ "fields":[
+ {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"},
+ {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"},
+ {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"},
+ {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"},
+ {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"},
+ {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"},
+ {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"},
+ {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
+ {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"},
+ {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"},
+ {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime"},
+ {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime"},
+ {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"},
+ {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"},
+ ],
}
]
},
diff --git a/ees/elastic.go b/ees/elastic.go
index 31503bd64..e789d9bed 100644
--- a/ees/elastic.go
+++ b/ees/elastic.go
@@ -19,9 +19,14 @@ along with this program. If not, see
package ees
import (
+ "context"
+ "encoding/json"
+ "fmt"
"strings"
"sync"
+ "github.com/elastic/go-elasticsearch/esapi"
+
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -40,6 +45,7 @@ func NewElasticExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Fi
type ElasticEe struct {
id string
eClnt *elasticsearch.Client
+ index string
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
@@ -56,6 +62,11 @@ func (eEe *ElasticEe) init() (err error) {
}); err != nil {
return
}
+ if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.Index]; !has {
+ eEe.index = utils.CDRsTBL
+ } else {
+ eEe.index = utils.IfaceAsString(val)
+ }
return
}
@@ -86,6 +97,7 @@ func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
for k, v := range cgrEv.Event {
req[k] = v
}
+ valMp := make(map[string]string)
eeReq := NewEventExporterRequest(req, eEe.dc,
eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Tenant,
eEe.cgrCfg.GeneralCfg().DefaultTenant,
@@ -95,8 +107,45 @@ func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
if err = eeReq.SetFields(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].ContentFields()); err != nil {
return
}
+ for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() {
+ var nmIt utils.NMInterface
+ if nmIt, err = eeReq.cnt.Field(el.Value); err != nil {
+ return
+ }
+ itm, isNMItem := nmIt.(*config.NMItem)
+ if !isNMItem {
+ err = fmt.Errorf("cannot encode reply value: %s, err: not NMItems", utils.ToJSON(el.Value))
+ return
+ }
+ if itm == nil {
+ continue // all attributes, not writable to diameter packet
+ }
+ valMp[strings.Join(itm.Path, utils.NestingSep)] = utils.IfaceAsString(itm.Data)
+ }
updateEEMetrics(eEe.dc, cgrEv.Event, utils.FirstNonEmpty(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Timezone,
eEe.cgrCfg.GeneralCfg().DefaultTimezone))
+ // Set up the request object
+ cgrID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.CGRID), utils.GenUUID())
+ runID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RunID), utils.MetaDefault)
+ eReq := esapi.IndexRequest{
+ Index: eEe.index,
+ DocumentID: utils.ConcatenatedKey(cgrID, runID),
+ Body: strings.NewReader(utils.ToJSON(valMp)),
+ Refresh: "true",
+ }
+ var resp *esapi.Response
+ if resp, err = eReq.Do(context.Background(), eEe.eClnt); err != nil {
+ resp.Body.Close()
+ return
+ } else if resp.IsError() {
+ var e map[string]interface{}
+ if err = json.NewDecoder(resp.Body).Decode(&e); err != nil {
+ return
+ } else {
+ utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%+v> when indexing document",
+ utils.EventExporterS, eEe.id, e))
+ }
+ }
return
}
diff --git a/ees/elastic_it_test.go b/ees/elastic_it_test.go
new file mode 100644
index 000000000..699bb8624
--- /dev/null
+++ b/ees/elastic_it_test.go
@@ -0,0 +1,312 @@
+// +build integration
+
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package ees
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "flag"
+ "net/rpc"
+ "os/exec"
+ "path"
+ "reflect"
+ "testing"
+ "time"
+
+ "github.com/cgrates/cgrates/utils"
+ elasticsearch "github.com/elastic/go-elasticsearch"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
+)
+
+var (
+ elasticConfigDir string
+ elasticCfgPath string
+ elasticCfg *config.CGRConfig
+ elasticRpc *rpc.Client
+ elasticServerPath = flag.Bool("elastic", false, "Run only if the user specify it")
+
+ sTestsElastic = []func(t *testing.T){
+ testCreateDirectory,
+ testElasticLoadConfig,
+ testElasticResetDataDB,
+ testElasticResetStorDb,
+ testElasticStartEngine,
+ testElasticRPCConn,
+ testElasticStartElasticsearch,
+ testElasticExportEvents,
+ testElasticVerifyExports,
+ testStopCgrEngine,
+ testElasticCloseElasticsearch,
+ testCleanDirectory,
+ }
+)
+
+// To run these tests first you need to install elasticsearch server locally as a daemon
+// https://www.elastic.co/guide/en/elasticsearch/reference/current/deb.html
+// and pass the elastic flag
+func TestElasticExport(t *testing.T) {
+ if !*elasticServerPath {
+ t.SkipNow()
+ }
+ elasticConfigDir = "ees"
+ for _, stest := range sTestsElastic {
+ t.Run(elasticConfigDir, stest)
+ }
+}
+
+func testElasticLoadConfig(t *testing.T) {
+ var err error
+ elasticCfgPath = path.Join(*dataDir, "conf", "samples", elasticConfigDir)
+ if elasticCfg, err = config.NewCGRConfigFromPath(elasticCfgPath); err != nil {
+ t.Error(err)
+ }
+}
+
+func testElasticResetDataDB(t *testing.T) {
+ if err := engine.InitDataDb(elasticCfg); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func testElasticResetStorDb(t *testing.T) {
+ if err := engine.InitStorDb(elasticCfg); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func testElasticStartEngine(t *testing.T) {
+ if _, err := engine.StopStartEngine(elasticCfgPath, *waitRater); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func testElasticRPCConn(t *testing.T) {
+ var err error
+ elasticRpc, err = newRPCClient(elasticCfg.ListenCfg())
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func testElasticStartElasticsearch(t *testing.T) {
+ if err := exec.Command("systemctl", "start", "elasticsearch.service").Run(); err != nil {
+ t.Error(err)
+ }
+ // give some time to elasticsearch server to become up
+ time.Sleep(5 * time.Second)
+}
+
+func testElasticExportEvents(t *testing.T) {
+ eventVoice := &utils.CGREventWithIDs{
+ CGREventWithOpts: &utils.CGREventWithOpts{
+ CGREvent: &utils.CGREvent{
+ Tenant: "cgrates.org",
+ ID: "voiceEvent",
+ Time: utils.TimePointer(time.Now()),
+ Event: map[string]interface{}{
+ utils.CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()),
+ utils.ToR: utils.VOICE,
+ utils.OriginID: "dsafdsaf",
+ utils.OriginHost: "192.168.1.1",
+ utils.RequestType: utils.META_RATED,
+ utils.Tenant: "cgrates.org",
+ utils.Category: "call",
+ utils.Account: "1001",
+ utils.Subject: "1001",
+ utils.Destination: "1002",
+ utils.SetupTime: time.Unix(1383813745, 0).UTC(),
+ utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
+ utils.Usage: time.Duration(10) * time.Second,
+ utils.RunID: utils.MetaDefault,
+ utils.Cost: 1.01,
+ "ExporterUsed": "ElasticExporter",
+ "ExtraFields": map[string]string{"extra1": "val_extra1",
+ "extra2": "val_extra2", "extra3": "val_extra3"},
+ },
+ },
+ },
+ }
+
+ eventData := &utils.CGREventWithIDs{
+ CGREventWithOpts: &utils.CGREventWithOpts{
+ CGREvent: &utils.CGREvent{
+ Tenant: "cgrates.org",
+ ID: "dataEvent",
+ Time: utils.TimePointer(time.Now()),
+ Event: map[string]interface{}{
+ utils.CGRID: utils.Sha1("abcdef", time.Unix(1383813745, 0).UTC().String()),
+ utils.ToR: utils.DATA,
+ utils.OriginID: "abcdef",
+ utils.OriginHost: "192.168.1.1",
+ utils.RequestType: utils.META_RATED,
+ utils.Tenant: "AnotherTenant",
+ utils.Category: "call", //for data CDR use different Tenant
+ utils.Account: "1001",
+ utils.Subject: "1001",
+ utils.Destination: "1002",
+ utils.SetupTime: time.Unix(1383813745, 0).UTC(),
+ utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
+ utils.Usage: time.Duration(10) * time.Nanosecond,
+ utils.RunID: utils.MetaDefault,
+ utils.Cost: 0.012,
+ "ExporterUsed": "ElasticExporter",
+ "ExtraFields": map[string]string{"extra1": "val_extra1",
+ "extra2": "val_extra2", "extra3": "val_extra3"},
+ },
+ },
+ },
+ }
+
+ eventSMS := &utils.CGREventWithOpts{
+ CGREvent: &utils.CGREvent{
+ Tenant: "cgrates.org",
+ ID: "SMSEvent",
+ Time: utils.TimePointer(time.Now()),
+ Event: map[string]interface{}{
+ utils.CGRID: utils.Sha1("sdfwer", time.Unix(1383813745, 0).UTC().String()),
+ utils.ToR: utils.SMS,
+ utils.OriginID: "sdfwer",
+ utils.OriginHost: "192.168.1.1",
+ utils.RequestType: utils.META_RATED,
+ utils.Tenant: "cgrates.org",
+ utils.Category: "call",
+ utils.Account: "1001",
+ utils.Subject: "1001",
+ utils.Destination: "1002",
+ utils.SetupTime: time.Unix(1383813745, 0).UTC(),
+ utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
+ utils.Usage: time.Duration(1),
+ utils.RunID: utils.MetaDefault,
+ utils.Cost: 0.15,
+ "ExporterUsed": "ElasticExporter",
+ "ExtraFields": map[string]string{"extra1": "val_extra1",
+ "extra2": "val_extra2", "extra3": "val_extra3"},
+ },
+ },
+ }
+ var reply map[string]utils.MapStorage
+ if err := elasticRpc.Call(utils.EventExporterSv1ProcessEvent, eventVoice, &reply); err != nil {
+ t.Error(err)
+ }
+ if err := elasticRpc.Call(utils.EventExporterSv1ProcessEvent, eventData, &reply); err != nil {
+ t.Error(err)
+ }
+ if err := elasticRpc.Call(utils.EventExporterSv1ProcessEvent, eventSMS, &reply); err != nil {
+ t.Error(err)
+ }
+}
+
+func testElasticVerifyExports(t *testing.T) {
+ es, err := elasticsearch.NewDefaultClient()
+ if err != nil {
+ t.Error(err)
+ }
+ var r map[string]interface{}
+ var buf bytes.Buffer
+ query := map[string]interface{}{
+ "query": map[string]interface{}{
+ "match": map[string]interface{}{
+ utils.Tenant: "cgrates.org",
+ },
+ },
+ }
+ if err := json.NewEncoder(&buf).Encode(query); err != nil {
+ t.Error(err)
+ }
+ // Perform the search request.
+ res, err := es.Search(
+ es.Search.WithContext(context.Background()),
+ es.Search.WithIndex("cdrs"),
+ es.Search.WithBody(&buf),
+ es.Search.WithTrackTotalHits(true),
+ es.Search.WithPretty(),
+ )
+ if err != nil {
+ t.Error(err)
+ }
+ defer res.Body.Close()
+
+ if res.IsError() {
+ var e map[string]interface{}
+ if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
+ t.Error(err)
+ } else {
+ t.Errorf("%+v", e)
+ }
+ }
+
+ if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
+ t.Error(err)
+ }
+ for _, hit := range r["hits"].(map[string]interface{})["hits"].([]interface{}) {
+ switch hit.(map[string]interface{})["_id"] {
+ case "2478e9f18ebcd3c684f3c14596b8bfeab2b0d6d4:*default":
+ eMp := map[string]interface{}{
+ utils.Account: "1001",
+ utils.AnswerTime: "2013-11-07T08:42:26Z",
+ utils.CGRID: "2478e9f18ebcd3c684f3c14596b8bfeab2b0d6d4",
+ utils.Category: "call",
+ utils.Cost: "0.15",
+ utils.Destination: "1002",
+ utils.OriginID: "sdfwer",
+ utils.RequestType: "*rated",
+ utils.RunID: "*default",
+ utils.SetupTime: "2013-11-07T08:42:25Z",
+ utils.Subject: "1001",
+ utils.Tenant: "cgrates.org",
+ utils.ToR: "*sms",
+ utils.Usage: "1",
+ }
+ if !reflect.DeepEqual(eMp, hit.(map[string]interface{})["_source"]) {
+ t.Errorf("Expected %+v, received: %+v", eMp, hit.(map[string]interface{})["_source"])
+ }
+ case "dbafe9c8614c785a65aabd116dd3959c3c56f7f6:*default":
+ eMp := map[string]interface{}{
+ utils.Account: "1001",
+ utils.AnswerTime: "2013-11-07T08:42:26Z",
+ utils.CGRID: "dbafe9c8614c785a65aabd116dd3959c3c56f7f6",
+ utils.Category: "call",
+ utils.Cost: "1.01",
+ utils.Destination: "1002",
+ utils.OriginID: "dsafdsaf",
+ utils.RequestType: "*rated",
+ utils.RunID: "*default",
+ utils.SetupTime: "2013-11-07T08:42:25Z",
+ utils.Subject: "1001",
+ utils.Tenant: "cgrates.org",
+ utils.ToR: "*voice",
+ utils.Usage: "10000000000",
+ }
+ if !reflect.DeepEqual(eMp, hit.(map[string]interface{})["_source"]) {
+ t.Errorf("Expected %+v, received: %+v", eMp, hit.(map[string]interface{})["_source"])
+ }
+ }
+ }
+}
+
+func testElasticCloseElasticsearch(t *testing.T) {
+ if err := exec.Command("systemctl", "stop", "elasticsearch.service").Run(); err != nil {
+ t.Error(err)
+ }
+}
diff --git a/ees/filecsv.go b/ees/filecsv.go
index 53b2d27ee..2e815ee29 100644
--- a/ees/filecsv.go
+++ b/ees/filecsv.go
@@ -126,8 +126,7 @@ func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
}
updateEEMetrics(fCsv.dc, cgrEv.Event, utils.FirstNonEmpty(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Timezone,
fCsv.cgrCfg.GeneralCfg().DefaultTimezone))
- fCsv.csvWriter.Write(csvRecord)
- return
+ return fCsv.csvWriter.Write(csvRecord)
}
// Compose and cache the header
diff --git a/packages/debian/changelog b/packages/debian/changelog
index cf8b85cf2..8aa3b4dfe 100644
--- a/packages/debian/changelog
+++ b/packages/debian/changelog
@@ -103,6 +103,7 @@ cgrates (0.11.0~dev) UNRELEASED; urgency=medium
* [AgentS] DiameterAgent return NOT_FOUND instead of "filter not passing" error and let other subsystem to handle this (e.g. FilterS)
* [StatS] Change format of metricID when specifying fields ( e.g. *sum#~*req.FieldName )
* [FilterS] Added *apiban filter
+ * [EEs] Add support for *elastic exporter
-- DanB Wed, 19 Feb 2020 13:25:52 +0200
diff --git a/utils/consts.go b/utils/consts.go
index ee2a7f3fa..63ce27e68 100755
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -865,6 +865,7 @@ const (
MetaMonthlyEstimated = "*monthly_estimated"
ProcessRuns = "ProcessRuns"
HashtagSep = "#"
+ Index = "index"
)
// Migrator Action