From a72f62ecf122475a35065c8cfbe729548d817bc3 Mon Sep 17 00:00:00 2001 From: TeoV Date: Thu, 1 Oct 2020 17:40:10 +0300 Subject: [PATCH] Add support in EEs for *elastic exporter ( + integration test ) --- config/config.go | 2 +- data/conf/samples/ees/cgrates.json | 24 +++ ees/elastic.go | 49 +++++ ees/elastic_it_test.go | 312 +++++++++++++++++++++++++++++ ees/filecsv.go | 3 +- packages/debian/changelog | 1 + utils/consts.go | 1 + 7 files changed, 389 insertions(+), 3 deletions(-) create mode 100644 ees/elastic_it_test.go diff --git a/config/config.go b/config/config.go index cc65d88e2..406e57f69 100755 --- a/config/config.go +++ b/config/config.go @@ -329,7 +329,7 @@ var possibleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV, var possibleExporterTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.META_NONE, utils.MetaFileFWV, utils.MetaHTTPPost, utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap, - utils.MetaKafkajsonMap, utils.MetaS3jsonMap, utils.MetaVirt}) + utils.MetaKafkajsonMap, utils.MetaS3jsonMap, utils.MetaElastic, utils.MetaVirt}) func (cfg *CGRConfig) LazySanityCheck() { for _, expID := range cfg.cdrsCfg.OnlineCDRExports { diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json index b1eafb720..f584f5363 100644 --- a/data/conf/samples/ees/cgrates.json +++ b/data/conf/samples/ees/cgrates.json @@ -351,6 +351,30 @@ "fields":[ {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, ], + }, + { + "id": "ElasticsearchExporter", + "type": "*elastic", + "export_path": "http://localhost:9200", + "tenant": "cgrates.org", + "attempts": 1, + "filters": ["*string:~*req.ExporterUsed:ElasticExporter"], + "fields":[ + {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, + {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, + {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, + {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, + {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, + {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"}, + {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, + {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, + {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime"}, + {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime"}, + {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, + {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, + ], } ] }, diff --git a/ees/elastic.go b/ees/elastic.go index 31503bd64..e789d9bed 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -19,9 +19,14 @@ along with this program. If not, see package ees import ( + "context" + "encoding/json" + "fmt" "strings" "sync" + "github.com/elastic/go-elasticsearch/esapi" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -40,6 +45,7 @@ func NewElasticExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Fi type ElasticEe struct { id string eClnt *elasticsearch.Client + index string cgrCfg *config.CGRConfig cfgIdx int // index of config instance within ERsCfg.Readers filterS *engine.FilterS @@ -56,6 +62,11 @@ func (eEe *ElasticEe) init() (err error) { }); err != nil { return } + if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.Index]; !has { + eEe.index = utils.CDRsTBL + } else { + eEe.index = utils.IfaceAsString(val) + } return } @@ -86,6 +97,7 @@ func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { for k, v := range cgrEv.Event { req[k] = v } + valMp := make(map[string]string) eeReq := NewEventExporterRequest(req, eEe.dc, eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Tenant, eEe.cgrCfg.GeneralCfg().DefaultTenant, @@ -95,8 +107,45 @@ func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { if err = eeReq.SetFields(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].ContentFields()); err != nil { return } + for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() { + var nmIt utils.NMInterface + if nmIt, err = eeReq.cnt.Field(el.Value); err != nil { + return + } + itm, isNMItem := nmIt.(*config.NMItem) + if !isNMItem { + err = fmt.Errorf("cannot encode reply value: %s, err: not NMItems", utils.ToJSON(el.Value)) + return + } + if itm == nil { + continue // all attributes, not writable to diameter packet + } + valMp[strings.Join(itm.Path, utils.NestingSep)] = utils.IfaceAsString(itm.Data) + } updateEEMetrics(eEe.dc, cgrEv.Event, utils.FirstNonEmpty(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Timezone, eEe.cgrCfg.GeneralCfg().DefaultTimezone)) + // Set up the request object + cgrID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.CGRID), utils.GenUUID()) + runID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RunID), utils.MetaDefault) + eReq := esapi.IndexRequest{ + Index: eEe.index, + DocumentID: utils.ConcatenatedKey(cgrID, runID), + Body: strings.NewReader(utils.ToJSON(valMp)), + Refresh: "true", + } + var resp *esapi.Response + if resp, err = eReq.Do(context.Background(), eEe.eClnt); err != nil { + resp.Body.Close() + return + } else if resp.IsError() { + var e map[string]interface{} + if err = json.NewDecoder(resp.Body).Decode(&e); err != nil { + return + } else { + utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%+v> when indexing document", + utils.EventExporterS, eEe.id, e)) + } + } return } diff --git a/ees/elastic_it_test.go b/ees/elastic_it_test.go new file mode 100644 index 000000000..699bb8624 --- /dev/null +++ b/ees/elastic_it_test.go @@ -0,0 +1,312 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ees + +import ( + "bytes" + "context" + "encoding/json" + "flag" + "net/rpc" + "os/exec" + "path" + "reflect" + "testing" + "time" + + "github.com/cgrates/cgrates/utils" + elasticsearch "github.com/elastic/go-elasticsearch" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" +) + +var ( + elasticConfigDir string + elasticCfgPath string + elasticCfg *config.CGRConfig + elasticRpc *rpc.Client + elasticServerPath = flag.Bool("elastic", false, "Run only if the user specify it") + + sTestsElastic = []func(t *testing.T){ + testCreateDirectory, + testElasticLoadConfig, + testElasticResetDataDB, + testElasticResetStorDb, + testElasticStartEngine, + testElasticRPCConn, + testElasticStartElasticsearch, + testElasticExportEvents, + testElasticVerifyExports, + testStopCgrEngine, + testElasticCloseElasticsearch, + testCleanDirectory, + } +) + +// To run these tests first you need to install elasticsearch server locally as a daemon +// https://www.elastic.co/guide/en/elasticsearch/reference/current/deb.html +// and pass the elastic flag +func TestElasticExport(t *testing.T) { + if !*elasticServerPath { + t.SkipNow() + } + elasticConfigDir = "ees" + for _, stest := range sTestsElastic { + t.Run(elasticConfigDir, stest) + } +} + +func testElasticLoadConfig(t *testing.T) { + var err error + elasticCfgPath = path.Join(*dataDir, "conf", "samples", elasticConfigDir) + if elasticCfg, err = config.NewCGRConfigFromPath(elasticCfgPath); err != nil { + t.Error(err) + } +} + +func testElasticResetDataDB(t *testing.T) { + if err := engine.InitDataDb(elasticCfg); err != nil { + t.Fatal(err) + } +} + +func testElasticResetStorDb(t *testing.T) { + if err := engine.InitStorDb(elasticCfg); err != nil { + t.Fatal(err) + } +} + +func testElasticStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(elasticCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +func testElasticRPCConn(t *testing.T) { + var err error + elasticRpc, err = newRPCClient(elasticCfg.ListenCfg()) + if err != nil { + t.Fatal(err) + } +} + +func testElasticStartElasticsearch(t *testing.T) { + if err := exec.Command("systemctl", "start", "elasticsearch.service").Run(); err != nil { + t.Error(err) + } + // give some time to elasticsearch server to become up + time.Sleep(5 * time.Second) +} + +func testElasticExportEvents(t *testing.T) { + eventVoice := &utils.CGREventWithIDs{ + CGREventWithOpts: &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "voiceEvent", + Time: utils.TimePointer(time.Now()), + Event: map[string]interface{}{ + utils.CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()), + utils.ToR: utils.VOICE, + utils.OriginID: "dsafdsaf", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.META_RATED, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: time.Duration(10) * time.Second, + utils.RunID: utils.MetaDefault, + utils.Cost: 1.01, + "ExporterUsed": "ElasticExporter", + "ExtraFields": map[string]string{"extra1": "val_extra1", + "extra2": "val_extra2", "extra3": "val_extra3"}, + }, + }, + }, + } + + eventData := &utils.CGREventWithIDs{ + CGREventWithOpts: &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "dataEvent", + Time: utils.TimePointer(time.Now()), + Event: map[string]interface{}{ + utils.CGRID: utils.Sha1("abcdef", time.Unix(1383813745, 0).UTC().String()), + utils.ToR: utils.DATA, + utils.OriginID: "abcdef", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.META_RATED, + utils.Tenant: "AnotherTenant", + utils.Category: "call", //for data CDR use different Tenant + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: time.Duration(10) * time.Nanosecond, + utils.RunID: utils.MetaDefault, + utils.Cost: 0.012, + "ExporterUsed": "ElasticExporter", + "ExtraFields": map[string]string{"extra1": "val_extra1", + "extra2": "val_extra2", "extra3": "val_extra3"}, + }, + }, + }, + } + + eventSMS := &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "SMSEvent", + Time: utils.TimePointer(time.Now()), + Event: map[string]interface{}{ + utils.CGRID: utils.Sha1("sdfwer", time.Unix(1383813745, 0).UTC().String()), + utils.ToR: utils.SMS, + utils.OriginID: "sdfwer", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.META_RATED, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: time.Duration(1), + utils.RunID: utils.MetaDefault, + utils.Cost: 0.15, + "ExporterUsed": "ElasticExporter", + "ExtraFields": map[string]string{"extra1": "val_extra1", + "extra2": "val_extra2", "extra3": "val_extra3"}, + }, + }, + } + var reply map[string]utils.MapStorage + if err := elasticRpc.Call(utils.EventExporterSv1ProcessEvent, eventVoice, &reply); err != nil { + t.Error(err) + } + if err := elasticRpc.Call(utils.EventExporterSv1ProcessEvent, eventData, &reply); err != nil { + t.Error(err) + } + if err := elasticRpc.Call(utils.EventExporterSv1ProcessEvent, eventSMS, &reply); err != nil { + t.Error(err) + } +} + +func testElasticVerifyExports(t *testing.T) { + es, err := elasticsearch.NewDefaultClient() + if err != nil { + t.Error(err) + } + var r map[string]interface{} + var buf bytes.Buffer + query := map[string]interface{}{ + "query": map[string]interface{}{ + "match": map[string]interface{}{ + utils.Tenant: "cgrates.org", + }, + }, + } + if err := json.NewEncoder(&buf).Encode(query); err != nil { + t.Error(err) + } + // Perform the search request. + res, err := es.Search( + es.Search.WithContext(context.Background()), + es.Search.WithIndex("cdrs"), + es.Search.WithBody(&buf), + es.Search.WithTrackTotalHits(true), + es.Search.WithPretty(), + ) + if err != nil { + t.Error(err) + } + defer res.Body.Close() + + if res.IsError() { + var e map[string]interface{} + if err := json.NewDecoder(res.Body).Decode(&e); err != nil { + t.Error(err) + } else { + t.Errorf("%+v", e) + } + } + + if err := json.NewDecoder(res.Body).Decode(&r); err != nil { + t.Error(err) + } + for _, hit := range r["hits"].(map[string]interface{})["hits"].([]interface{}) { + switch hit.(map[string]interface{})["_id"] { + case "2478e9f18ebcd3c684f3c14596b8bfeab2b0d6d4:*default": + eMp := map[string]interface{}{ + utils.Account: "1001", + utils.AnswerTime: "2013-11-07T08:42:26Z", + utils.CGRID: "2478e9f18ebcd3c684f3c14596b8bfeab2b0d6d4", + utils.Category: "call", + utils.Cost: "0.15", + utils.Destination: "1002", + utils.OriginID: "sdfwer", + utils.RequestType: "*rated", + utils.RunID: "*default", + utils.SetupTime: "2013-11-07T08:42:25Z", + utils.Subject: "1001", + utils.Tenant: "cgrates.org", + utils.ToR: "*sms", + utils.Usage: "1", + } + if !reflect.DeepEqual(eMp, hit.(map[string]interface{})["_source"]) { + t.Errorf("Expected %+v, received: %+v", eMp, hit.(map[string]interface{})["_source"]) + } + case "dbafe9c8614c785a65aabd116dd3959c3c56f7f6:*default": + eMp := map[string]interface{}{ + utils.Account: "1001", + utils.AnswerTime: "2013-11-07T08:42:26Z", + utils.CGRID: "dbafe9c8614c785a65aabd116dd3959c3c56f7f6", + utils.Category: "call", + utils.Cost: "1.01", + utils.Destination: "1002", + utils.OriginID: "dsafdsaf", + utils.RequestType: "*rated", + utils.RunID: "*default", + utils.SetupTime: "2013-11-07T08:42:25Z", + utils.Subject: "1001", + utils.Tenant: "cgrates.org", + utils.ToR: "*voice", + utils.Usage: "10000000000", + } + if !reflect.DeepEqual(eMp, hit.(map[string]interface{})["_source"]) { + t.Errorf("Expected %+v, received: %+v", eMp, hit.(map[string]interface{})["_source"]) + } + } + } +} + +func testElasticCloseElasticsearch(t *testing.T) { + if err := exec.Command("systemctl", "stop", "elasticsearch.service").Run(); err != nil { + t.Error(err) + } +} diff --git a/ees/filecsv.go b/ees/filecsv.go index 53b2d27ee..2e815ee29 100644 --- a/ees/filecsv.go +++ b/ees/filecsv.go @@ -126,8 +126,7 @@ func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) { } updateEEMetrics(fCsv.dc, cgrEv.Event, utils.FirstNonEmpty(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Timezone, fCsv.cgrCfg.GeneralCfg().DefaultTimezone)) - fCsv.csvWriter.Write(csvRecord) - return + return fCsv.csvWriter.Write(csvRecord) } // Compose and cache the header diff --git a/packages/debian/changelog b/packages/debian/changelog index cf8b85cf2..8aa3b4dfe 100644 --- a/packages/debian/changelog +++ b/packages/debian/changelog @@ -103,6 +103,7 @@ cgrates (0.11.0~dev) UNRELEASED; urgency=medium * [AgentS] DiameterAgent return NOT_FOUND instead of "filter not passing" error and let other subsystem to handle this (e.g. FilterS) * [StatS] Change format of metricID when specifying fields ( e.g. *sum#~*req.FieldName ) * [FilterS] Added *apiban filter + * [EEs] Add support for *elastic exporter -- DanB Wed, 19 Feb 2020 13:25:52 +0200 diff --git a/utils/consts.go b/utils/consts.go index ee2a7f3fa..63ce27e68 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -865,6 +865,7 @@ const ( MetaMonthlyEstimated = "*monthly_estimated" ProcessRuns = "ProcessRuns" HashtagSep = "#" + Index = "index" ) // Migrator Action