diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json index 41695aaac..28756a60e 100644 --- a/data/conf/samples/ees/cgrates.json +++ b/data/conf/samples/ees/cgrates.json @@ -321,186 +321,6 @@ {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}, ], }, - { - "id": "ElasticsearchExporter", - "type": "*els", - "export_path": "http://localhost:9200", - "attempts": 1, - "opts": { - "elsIndex": "cdrs", - //"elsIfPrimaryTerm": 0, - //"elsIfSeqNo": 0, - "elsOpType": "", - "elsPipeline": "", - "require_alias": "", - "elsRouting": "", - "elsTimeout": "0", - //"elsVersion": 0, - "elsVersionType": "", - "elsWaitForActiveShards": "", - }, - "fields":[ - {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, - {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, - {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, - {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, - {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, - {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*vars.*tenant"}, - {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, - {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, - {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, - {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, - {"tag": "@timestamp", "path": "*exp.@timestamp", "type": "*datetime", "value": "*now"}, - {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime"}, - {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime"}, - {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, - {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, - ], - }, - { - "id": "ElasticsearchExporterCluster", - "type": "*els", - "export_path": "http://192.168.56.22:9200;http://192.168.56.64:9200", - "attempts": 1, - "opts": { - "elsIndex": "cdrs", - "elsDiscoverNodesOnStart":true, - //"elsDiscoverNodesInterval":"10s", - // "elsLogger":"elsJson", - // "elsEnableDebugLogger":false, - // "elsCompressRequestBody":false, - // "elsCompressRequestBodyLevel":0, - // "elsRetryOnStatus":[], - // "elsMaxRetries": 0, - // "elsDisableRetry": false, - - //"elsIfPrimaryTerm": 0, - //"elsIfSeqNo": 0, - "elsOpType": "", - "elsPipeline": "", - "require_alias": "", - "elsRouting": "", - "elsTimeout": "0", - //"elsVersion": 0, - "elsVersionType": "", - "elsWaitForActiveShards": "", - }, - "fields":[ - {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, - {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, - {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, - {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, - {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, - {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*vars.*tenant"}, - {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, - {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, - {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, - {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, - {"tag": "@timestamp", "path": "*exp.@timestamp", "type": "*datetime", "value": "*now"}, - {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime"}, - {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime"}, - {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, - {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, - ], - }, - { - "id": "ElasticsearchExporterCloud", - "type": "*els", - "export_path": "Deployment:dXMtY2VudHJh0YW==", - "attempts": 1, - "opts": { - "elsIndex": "cdrs", - "elsUsername":"elastic_user", - "elsPassword":"", - "elsCloud":true, - //"elsApiKey":"aZmd2UQ==", - // "elsLogger":"elsJson", - // "elsEnableDebugLogger":false, - // "elsCompressRequestBody":false, - // "elsCompressRequestBodyLevel":0, - // "elsRetryOnStatus":[], - // "elsMaxRetries": 0, - // "elsDisableRetry": false, - //"elsIfPrimaryTerm": 0, - //"elsIfSeqNo": 0, - "elsOpType": "", - "elsPipeline": "", - "require_alias": "", - "elsRouting": "", - "elsTimeout": "0", - //"elsVersion": 0, - "elsVersionType": "", - "elsWaitForActiveShards": "", - }, - "fields":[ - {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, - {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, - {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, - {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, - {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, - {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*vars.*tenant"}, - {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, - {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, - {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, - {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, - {"tag": "@timestamp", "path": "*exp.@timestamp", "type": "*datetime", "value": "*now"}, - {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime"}, - {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime"}, - {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, - {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, - ], - }, - { - "id": "ElasticsearchExporterHttps", - "type": "*els", - "export_path": "https://192.168.56.29:9200", - "attempts": 1, - "opts": { - "elsIndex": "cdrs", - "elsUsername":"elastic", - "elsPassword":"", - "caPath":"/path/to/http_ca.crt", - //"elsCertificateFingerPrint":"", - // "elsServiceToken":"" - // "elsLogger":"elsJson", - // "elsDiscoverNodesOnStart":true, - // "elsDiscoverNodesInterval":"10s", - // "elsEnableDebugLogger":false, - // "elsCompressRequestBody":true, - // "elsCompressRequestBodyLevel":0, - // "elsRetryOnStatus":[], - // "elsMaxRetries": 0, - // "elsDisableRetry": false, - - //"elsIfPrimaryTerm": 0, - //"elsIfSeqNo": 0, - "elsOpType": "", - "elsPipeline": "", - "require_alias": "", - "elsRouting": "", - "elsTimeout": "0", - //"elsVersion": 0, - "elsVersionType": "", - "elsWaitForActiveShards": "", - }, - "fields":[ - {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, - {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, - {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, - {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, - {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, - {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*vars.*tenant"}, - {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, - {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, - {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, - {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, - {"tag": "@timestamp", "path": "*exp.@timestamp", "type": "*datetime", "value": "*now"}, - {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime"}, - {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime"}, - {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, - {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, - ], - }, { "id": "HTTPJsonMapExporterWithNoFields", "type": "*http_json_map", @@ -509,12 +329,6 @@ "attribute_context": "customContext", "attempts": 1, }, - { - "id": "ElasticExporterWithNoFields", - "type": "*els", - "export_path": "http://localhost:9200", - "attempts": 1, - }, { "id": "HTTPPostExporterWithNoFields", "type": "*http_post", diff --git a/data/conf/samples/ees_elastic/cgrates.json b/data/conf/samples/ees_elastic/cgrates.json new file mode 100644 index 000000000..ce15e5ca4 --- /dev/null +++ b/data/conf/samples/ees_elastic/cgrates.json @@ -0,0 +1,116 @@ +{ + +"ees": { + "enabled": true, + "cache": { + "*els": { + "limit": -1, + "ttl": "10s", + "precache": false + } + }, + "exporters": [{ + "id": "els_basic", + "type": "*els", + "export_path": "http://localhost:9200", + "synchronous": true, + "failed_posts_dir": "*none", + "opts": { + "elsIndex": "cdrs_basic" + } + }, + { + "id": "els_fields", + "type": "*els", + "export_path": "http://localhost:9200", + "synchronous": true, + "failed_posts_dir": "*none", + "opts": { + "elsIndex": "cdrs_fields" + }, + "fields": [ + { "tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID" }, + { "tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account" }, + { "tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage" }, + { "tag": "Source", "path": "*exp.Source", "type": "*constant", "value": "test" }, + { "tag": "timestamp", "path": "*exp.@timestamp", "type": "*datetime", "value": "*now" } + ] + }, + { + "id": "els_cloud", + "type": "*els", + "export_path": "ELASTIC_DEPLOYMENT", + "synchronous": true, + "failed_posts_dir": "*none", + "opts": { + "elsIndex": "cdrs", + "elsCloud": true, + "elsApiKey": "ELASTIC_APIKEY" + } + }, + { + "id": "els_cluster", + "type": "*els", + "export_path": "http://192.168.56.22:9200;http://192.168.56.64:9200", + "synchronous": true, + "opts": { + "elsIndex": "cdrs", + "elsDiscoverNodesOnStart": true, + "elsDiscoverNodesInterval": "10s" + } + }, + { + "id": "els_https", + "type": "*els", + "export_path": "https://192.168.56.29:9200", + "synchronous": true, + "opts": { + "elsIndex": "cdrs", + "elsUsername": "elastic", + // "elsPassword":"", + "caPath": "/path/to/http_ca.crt" + // "elsCloud":true, + // "elsApiKey": "", + // "elsServiceToken": "", + // "elsCertificateFingerPrint": "", + // "elsDiscoverNodesOnStart": false, + // "elsDiscoverNodesInterval": "10s", + // "elsEnableDebugLogger": "false", + // "elsLogger": "", + // "elsCompressRequestBody": false, + // "elsCompressRequestBodyLevel": 0, + // "elsRetryOnStatus": [], + // "elsMaxRetries": 0, + // "elsDisableRetry": false, + // "elsIfPrimaryTerm": 0, + // "elsIfSeqNo": 0, + // "elsOpType": "", + // "elsPipeline": "", + // "elsRouting": "", + // "elsTimeout": "", + // "elsVersion": 0, + // "elsVersionType": "", + // "elsWaitForActiveShards": "" + }, + "fields": [ + { "tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID" }, + { "tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID" }, + { "tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR" }, + { "tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID" }, + { "tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType" }, + { "tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*vars.*tenant" }, + { "tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category" }, + { "tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account" }, + { "tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject" }, + { "tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination" }, + { "tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime" }, + { "tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime" }, + { "tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage" }, + { "tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}" }, + { "tag": "@timestamp", "path": "*exp.@timestamp", "type": "*datetime", "value": "*now" } + ] + } + ] +} + +} diff --git a/ees/elastic_it_test.go b/ees/elastic_it_test.go index c34a6b8e2..adf20af8b 100644 --- a/ees/elastic_it_test.go +++ b/ees/elastic_it_test.go @@ -22,328 +22,209 @@ along with this program. If not, see package ees import ( - "bytes" "encoding/json" - "flag" - "os/exec" - "path" - "reflect" + "fmt" + "path/filepath" + "slices" + "strconv" + "strings" + "sync" "testing" "time" "github.com/cgrates/birpc/context" + "github.com/google/go-cmp/cmp" "github.com/cgrates/birpc" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" elasticsearch "github.com/elastic/go-elasticsearch/v8" + "github.com/elastic/go-elasticsearch/v8/typedapi/core/search" + "github.com/elastic/go-elasticsearch/v8/typedapi/types" - "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" ) -var ( - elasticConfigDir string - elasticCfgPath string - elasticCfg *config.CGRConfig - elasticRpc *birpc.Client - elasticServerPath = flag.Bool("elastic", false, "Run only if the user specify it") - - sTestsElastic = []func(t *testing.T){ - testCreateDirectory, - testElasticLoadConfig, - testElasticResetDataDB, - testElasticResetStorDb, - testElasticStartEngine, - testElasticRPCConn, - testElasticStartElasticsearch, - testElasticExportEvents, - testElasticVerifyExports, - testStopCgrEngine, - testElasticCloseElasticsearch, - testCleanDirectory, - } -) - -// To run these tests first you need to install elasticsearch server locally as a daemon -// https://www.elastic.co/guide/en/elasticsearch/reference/current/deb.html -// and pass the elastic flag -func TestElasticExport(t *testing.T) { - if !*elasticServerPath { +func TestElasticsearchIT(t *testing.T) { + switch *utils.DBType { + case utils.MetaInternal: + case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres: t.SkipNow() + default: + t.Fatal("unsupported dbtype value") } - elasticConfigDir = "ees" - for _, stest := range sTestsElastic { - t.Run(elasticConfigDir, stest) + + ng := engine.TestEngine{ + ConfigPath: filepath.Join(*utils.DataDir, "conf", "samples", "ees_elastic"), + DBCfg: engine.InternalDBCfg, + // LogBuffer: &bytes.Buffer{}, } + // defer fmt.Println(ng.LogBuffer) + client, cfg := ng.Run(t) + + // Initialize separate clients for each exporter. + esClBasic := initElsClient(t, cfg, "basic") + esClFields := initElsClient(t, cfg, "fields") + + n := 2 // number of events to export + var wg sync.WaitGroup + for i := range n { + wg.Add(2) + go func() { + defer wg.Done() + exportElsEvent(t, client, "basic", i+1) + }() + go func() { + defer wg.Done() + exportElsEvent(t, client, "fields", i+1) + }() + } + wg.Wait() + verifyElsExports(t, esClBasic, "basic", n, map[string]any{ + utils.AccountField: "1001", + utils.ToR: utils.MetaData, + utils.RequestType: utils.MetaPostpaid, + }) + verifyElsExports(t, esClFields, "fields", n, map[string]any{ + utils.AccountField: "1001", + utils.Source: "test", + }) } -func testElasticLoadConfig(t *testing.T) { - var err error - elasticCfgPath = path.Join(*utils.DataDir, "conf", "samples", elasticConfigDir) - if elasticCfg, err = config.NewCGRConfigFromPath(elasticCfgPath); err != nil { - t.Error(err) - } -} - -func testElasticResetDataDB(t *testing.T) { - if err := engine.InitDataDb(elasticCfg); err != nil { +func exportElsEvent(t *testing.T, client *birpc.Client, exporterSuffix string, i int) { + t.Helper() + var reply map[string]map[string]any + if err := client.Call(context.Background(), utils.EeSv1ProcessEvent, + &engine.CGREventWithEeIDs{ + EeIDs: []string{fmt.Sprintf("els_%s", exporterSuffix)}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + utils.CGRID: fmt.Sprintf("%s%03d", exporterSuffix, i), + utils.AccountField: "1001", + utils.ToR: utils.MetaData, + utils.RequestType: utils.MetaPostpaid, + utils.Usage: i, + }, + }, + }, &reply); err != nil { t.Fatal(err) } } -func testElasticResetStorDb(t *testing.T) { - if err := engine.InitStorDb(elasticCfg); err != nil { - t.Fatal(err) +// To check via CLI: +// +// Get document count +// curl localhost:9200/cdrs_basic/_count +// +// Read all documents (default limit is 10) +// curl localhost:9200/cdrs_basic/_search +func verifyElsExports(t *testing.T, client *elasticsearch.TypedClient, exporterType string, n int, expSource map[string]any) { + t.Helper() + req := search.Request{ + Query: &types.Query{MatchAll: &types.MatchAllQuery{}}, } -} - -func testElasticStartEngine(t *testing.T) { - if _, err := engine.StopStartEngine(elasticCfgPath, *utils.WaitRater); err != nil { - t.Fatal(err) + if n > 10 && n <= 10_000 { + // Return more than the default 10 results limit if needed. + // Max limit is 10_000. + req.Size = &n } -} - -func testElasticRPCConn(t *testing.T) { - var err error - elasticRpc, err = newRPCClient(elasticCfg.ListenCfg()) + index := fmt.Sprintf("cdrs_%s", exporterType) + resp, err := client.Search(). + Index(index). + Request(&req). + Do(context.TODO()) if err != nil { t.Fatal(err) } -} + if hc := len(resp.Hits.Hits); hc != n { + t.Fatalf("len(resp.Hits.Hits)=%d, want %d", hc, n) + } + slices.SortFunc(resp.Hits.Hits, func(a, b types.Hit) int { + switch { + case *a.Id_ < *b.Id_: + return -1 + case *a.Id_ > *b.Id_: + return 1 + } + return 0 + }) + for i, hit := range resp.Hits.Hits { + wantUsage := i + 1 + wantCGRID := fmt.Sprintf("%s%03d", exporterType, wantUsage) -func testElasticStartElasticsearch(t *testing.T) { - if err := exec.Command("systemctl", "start", "elasticsearch.service").Run(); err != nil { - t.Error(err) - } - // give some time to elasticsearch server to become up - time.Sleep(5 * time.Second) -} - -func testElasticExportEvents(t *testing.T) { - eventVoice := &engine.CGREventWithEeIDs{ - EeIDs: []string{"ElasticsearchExporter"}, - CGREvent: &utils.CGREvent{ - Tenant: "cgrates.org", - ID: "voiceEvent", - Time: utils.TimePointer(time.Now()), - Event: map[string]any{ - utils.CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()), - utils.ToR: utils.MetaVoice, - utils.OriginID: "dsafdsaf", - utils.OriginHost: "192.168.1.1", - utils.RequestType: utils.MetaRated, - utils.Tenant: "cgrates.org", - utils.Category: "call", - utils.AccountField: "1001", - utils.Subject: "1001", - utils.Destination: "1002", - utils.SetupTime: time.Unix(1383813745, 0).UTC(), - utils.AnswerTime: time.Unix(1383813746, 0).UTC(), - utils.Usage: 10 * time.Second, - utils.RunID: utils.MetaDefault, - utils.Cost: 1.01, - "ExtraFields": map[string]string{"extra1": "val_extra1", - "extra2": "val_extra2", "extra3": "val_extra3"}, - }, - }, - } - - eventData := &engine.CGREventWithEeIDs{ - EeIDs: []string{"ElasticsearchExporter"}, - CGREvent: &utils.CGREvent{ - Tenant: "cgrates.org", - ID: "dataEvent", - Time: utils.TimePointer(time.Now()), - Event: map[string]any{ - utils.CGRID: utils.Sha1("abcdef", time.Unix(1383813745, 0).UTC().String()), - utils.ToR: utils.MetaData, - utils.OriginID: "abcdef", - utils.OriginHost: "192.168.1.1", - utils.RequestType: utils.MetaRated, - utils.Tenant: "AnotherTenant", - utils.Category: "call", //for data CDR use different Tenant - utils.AccountField: "1001", - utils.Subject: "1001", - utils.Destination: "1002", - utils.SetupTime: time.Unix(1383813745, 0).UTC(), - utils.AnswerTime: time.Unix(1383813746, 0).UTC(), - utils.Usage: 10 * time.Nanosecond, - utils.RunID: utils.MetaDefault, - utils.Cost: 0.012, - "ExtraFields": map[string]string{"extra1": "val_extra1", - "extra2": "val_extra2", "extra3": "val_extra3"}, - }, - }, - } - - eventSMS := &engine.CGREventWithEeIDs{ - EeIDs: []string{"ElasticsearchExporter"}, - CGREvent: &utils.CGREvent{ - Tenant: "cgrates.org", - ID: "SMSEvent", - Time: utils.TimePointer(time.Now()), - Event: map[string]any{ - utils.CGRID: utils.Sha1("sdfwer", time.Unix(1383813745, 0).UTC().String()), - utils.ToR: utils.MetaSMS, - utils.OriginID: "sdfwer", - utils.OriginHost: "192.168.1.1", - utils.RequestType: utils.MetaRated, - utils.Tenant: "cgrates.org", - utils.Category: "call", - utils.AccountField: "1001", - utils.Subject: "1001", - utils.Destination: "1002", - utils.SetupTime: time.Unix(1383813745, 0).UTC(), - utils.AnswerTime: time.Unix(1383813746, 0).UTC(), - utils.Usage: time.Duration(1), - utils.RunID: utils.MetaDefault, - utils.Cost: 0.15, - "ExtraFields": map[string]string{"extra1": "val_extra1", - "extra2": "val_extra2", "extra3": "val_extra3"}, - }, - }, - } - - eventSMSNoFields := &engine.CGREventWithEeIDs{ - EeIDs: []string{"ElasticExporterWithNoFields"}, - CGREvent: &utils.CGREvent{ - Tenant: "cgrates.org", - ID: "SMSEvent", - Time: utils.TimePointer(time.Now()), - Event: map[string]any{ - utils.CGRID: utils.Sha1("sms2", time.Unix(1383813745, 0).UTC().String()), - utils.ToR: utils.MetaSMS, - utils.Tenant: "cgrates.org", - utils.Category: "call", - utils.AccountField: "1001", - utils.Subject: "1001", - utils.Destination: "1002", - utils.RunID: utils.MetaDefault, - }, - APIOpts: map[string]any{ - "ExporterUsed": "ElasticExporterWithNoFields", - }, - }, - } - var reply map[string]utils.MapStorage - if err := elasticRpc.Call(context.Background(), utils.EeSv1ProcessEvent, eventVoice, &reply); err != nil { - t.Error(err) - } - if err := elasticRpc.Call(context.Background(), utils.EeSv1ProcessEvent, eventData, &reply); err != nil { - t.Error(err) - } - if err := elasticRpc.Call(context.Background(), utils.EeSv1ProcessEvent, eventSMS, &reply); err != nil { - t.Error(err) - } - if err := elasticRpc.Call(context.Background(), utils.EeSv1ProcessEvent, eventSMSNoFields, &reply); err != nil { - t.Error(err) - } -} - -func testElasticVerifyExports(t *testing.T) { - es, err := elasticsearch.NewDefaultClient() - if err != nil { - t.Error(err) - } - var r map[string]any - var buf bytes.Buffer - query := map[string]any{ - "query": map[string]any{ - "match": map[string]any{ - utils.Tenant: "cgrates.org", - }, - }, - } - if err := json.NewEncoder(&buf).Encode(query); err != nil { - t.Error(err) - } - // Perform the search request. - res, err := es.Search( - es.Search.WithContext(context.Background()), - es.Search.WithIndex("cdrs"), - es.Search.WithBody(&buf), - es.Search.WithTrackTotalHits(true), - es.Search.WithPretty(), - ) - if err != nil { - t.Error(err) - } - defer res.Body.Close() - - if res.IsError() { - var e map[string]any - if err := json.NewDecoder(res.Body).Decode(&e); err != nil { - t.Error(err) + if strings.HasPrefix(*hit.Id_, "basic") { + expSource[utils.Usage] = float64(wantUsage) } else { - t.Errorf("%+v", e) + expSource[utils.Usage] = strconv.Itoa(wantUsage) + } + expSource[utils.CGRID] = wantCGRID + wantDocID := wantCGRID + ":*default" + if *hit.Id_ != wantDocID { + t.Errorf("hit.Id_ = %s, want %s", *hit.Id_, wantDocID) + } + var got map[string]any + if err := json.Unmarshal(hit.Source_, &got); err != nil { + t.Error(err) } - } - if err := json.NewDecoder(res.Body).Decode(&r); err != nil { - t.Error(err) - } - for _, hit := range r["hits"].(map[string]any)["hits"].([]any) { - switch hit.(map[string]any)["_id"] { - case "2478e9f18ebcd3c684f3c14596b8bfeab2b0d6d4:*default": - eMp := map[string]any{ - utils.AccountField: "1001", - utils.AnswerTime: "2013-11-07T08:42:26Z", - utils.CGRID: "2478e9f18ebcd3c684f3c14596b8bfeab2b0d6d4", - utils.Category: "call", - utils.Cost: "0.15", - utils.Destination: "1002", - utils.OriginID: "sdfwer", - utils.RequestType: "*rated", - utils.RunID: "*default", - utils.SetupTime: "2013-11-07T08:42:25Z", - utils.Subject: "1001", - utils.Tenant: "cgrates.org", - utils.ToR: "*sms", - utils.Usage: "1", + if strings.HasPrefix(*hit.Id_, "fields") { + // Check if @timestamp field exists and has the correct format. + // No need to test the exact value. + timestamp, has := got["@timestamp"] + if !has { + t.Fatalf("timestamp missing in document with ID %s", *hit.Id_) } - if !reflect.DeepEqual(eMp, hit.(map[string]any)["_source"]) { - t.Errorf("Expected %+v, received: %+v", eMp, hit.(map[string]any)["_source"]) - } - case "dbafe9c8614c785a65aabd116dd3959c3c56f7f6:*default": - eMp := map[string]any{ - utils.AccountField: "1001", - utils.AnswerTime: "2013-11-07T08:42:26Z", - utils.CGRID: "dbafe9c8614c785a65aabd116dd3959c3c56f7f6", - utils.Category: "call", - utils.Cost: "1.01", - utils.Destination: "1002", - utils.OriginID: "dsafdsaf", - utils.RequestType: "*rated", - utils.RunID: "*default", - utils.SetupTime: "2013-11-07T08:42:25Z", - utils.Subject: "1001", - utils.Tenant: "cgrates.org", - utils.ToR: "*voice", - utils.Usage: "10000000000", - } - if !reflect.DeepEqual(eMp, hit.(map[string]any)["_source"]) { - t.Errorf("Expected %+v, received: %+v", eMp, hit.(map[string]any)["_source"]) - } - case utils.Sha1("sms2", time.Unix(1383813745, 0).UTC().String()) + ":*default": - eMp := map[string]any{ - utils.CGRID: utils.Sha1("sms2", time.Unix(1383813745, 0).UTC().String()), - utils.ToR: utils.MetaSMS, - utils.Tenant: "cgrates.org", - utils.Category: "call", - utils.AccountField: "1001", - utils.Subject: "1001", - utils.Destination: "1002", - utils.RunID: utils.MetaDefault, - } - if !reflect.DeepEqual(eMp, hit.(map[string]any)["_source"]) { - t.Errorf("Expected %+v, received: %+v", eMp, hit.(map[string]any)["_source"]) + if _, err := time.Parse(time.RFC3339, utils.IfaceAsString(timestamp)); err != nil { + t.Fatalf("failed to parse @timestamp field in document with ID %s", *hit.Id_) } + expSource["@timestamp"] = timestamp + } + + if diff := cmp.Diff(expSource, got); diff != "" { + t.Errorf("SearchAll(index=%q) returned unexpected result (-want +got): \n%s", index, diff) } } } -func testElasticCloseElasticsearch(t *testing.T) { - if err := exec.Command("systemctl", "stop", "elasticsearch.service").Run(); err != nil { - t.Error(err) +func initElsClient(t *testing.T, cfg *config.CGRConfig, exporterType string) *elasticsearch.TypedClient { + eeCfg := cfg.EEsCfg().ExporterCfg(fmt.Sprintf("els_%s", exporterType)) + tmp := &ElasticEE{ + cfg: eeCfg, } + if err := tmp.prepareOpts(); err != nil { + t.Fatal(err) + } + client, err := elasticsearch.NewTypedClient(tmp.clientCfg) + if err != nil { + t.Fatal(err) + } + + // info, err := client.Info().Do(context.TODO()) + // if err != nil { + // t.Fatal(err) + // } + // fmt.Println(utils.ToJSON(info)) + + // Ensure index is removed at the end. No need to create beforehand, as + // it gets created automatically. + if eeCfg.Opts.Els.Index == nil { + t.Fatal("elsIndex opt cannot be nil") + } + index := *eeCfg.Opts.Els.Index + + // resp, err := client.Indices.Create(index).Do(context.TODO()) + // if err != nil { + // t.Fatal(err) + // } + // fmt.Println(utils.ToJSON(resp)) + + t.Cleanup(func() { + resp, err := client.Indices.Delete(index).Do(context.TODO()) + if err != nil || !resp.Acknowledged { + t.Errorf("failed to delete index %s: %v", index, err) + } + + }) + return client }