From 8cc0af4981315c20c82ddfc36db37e2a514f2eda Mon Sep 17 00:00:00 2001 From: TeoV Date: Thu, 3 Sep 2020 14:57:11 +0300 Subject: [PATCH] Add integration test for ExportCDRs with multiple exporters --- apier/v1/apier.go | 16 ++- apier/v1/ees_it_test.go | 63 ++++++++++-- data/conf/samples/ees_internal/cgrates.json | 54 +++++++++- data/conf/samples/ees_mongo/cgrates.json | 54 +++++++++- data/conf/samples/ees_mysql/cgrates.json | 54 +++++++++- ees/ees.go | 108 +++++++++++++++++++- ees/filecsv.go | 2 +- ees/filefwv.go | 2 +- ees/httpjsonmap.go | 2 +- ees/httppost.go | 2 +- ees/virtualee.go | 2 +- 11 files changed, 330 insertions(+), 29 deletions(-) diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 7cf525e7d..6e731a612 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -29,6 +29,8 @@ import ( "strings" "time" + "github.com/cgrates/cgrates/ees" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/guardian" @@ -1841,7 +1843,7 @@ func (apierSv1 *APIerSv1) ExportToFolder(arg *utils.ArgExportToFolder, reply *st return nil } -func (apierSv1 *APIerSv1) ExportCDRs(args *utils.ArgExportCDRs, reply *map[string]interface{}) error { +func (apierSv1 *APIerSv1) ExportCDRs(args *utils.ArgExportCDRs, reply *map[string]interface{}) (err error) { if len(apierSv1.Config.ApierCfg().EEsConns) == 0 { return utils.NewErrNotConnected(utils.EEs) } @@ -1881,15 +1883,11 @@ func (apierSv1 *APIerSv1) ExportCDRs(args *utils.ArgExportCDRs, reply *map[strin // we consider only the last reply because it should have the metrics updated if !args.Verbose { (*reply)[utils.ExporterIDs] = make([]string, 0, len(rplyCdr)) - } - for exporterID, metrics := range rplyCdr { - if !args.Verbose { + for exporterID := range rplyCdr { (*reply)[utils.ExporterIDs] = append((*reply)[utils.ExporterIDs].([]string), exporterID) - } else { - for k, v := range metrics { - (*reply)[k] = v - } } + } else if *reply, err = ees.MergeEEMetrics(rplyCdr); err != nil { + return } - return nil + return } diff --git a/apier/v1/ees_it_test.go b/apier/v1/ees_it_test.go index 69a47cd67..8ca5fb29b 100644 --- a/apier/v1/ees_it_test.go +++ b/apier/v1/ees_it_test.go @@ -48,6 +48,8 @@ var ( testEEsAddCDRs, testEEsExportCDRs, testEEsVerifyExports, + testEEsExportCDRsMultipleExporters, + testEEsVerifyExportsMultipleExporters, testEEsKillEngine, testEEsCleanFolder, } @@ -73,7 +75,7 @@ func TestExportCDRs(t *testing.T) { } func testEEsPrepareFolder(t *testing.T) { - for _, dir := range []string{"/tmp/testCSV"} { + for _, dir := range []string{"/tmp/testCSV", "/tmp/testCSV2", "/tmp/testCSV3"} { if err := os.RemoveAll(dir); err != nil { t.Fatal("Error removing folder: ", dir, err) } @@ -190,7 +192,7 @@ func testEEsExportCDRs(t *testing.T) { if err := eeSRPC.Call(utils.APIerSv1ExportCDRs, &attr, &rply); err != nil { t.Error("Unexpected error: ", err.Error()) } - time.Sleep(time.Second) + time.Sleep(2 * time.Second) if rply["FirstExpOrderID"] != 1.0 { t.Errorf("Expected %+v, received: %+v", 1.0, rply["FirstExpOrderID"]) } else if rply["LastExpOrderID"] != 4.0 { @@ -216,10 +218,57 @@ func testEEsVerifyExports(t *testing.T) { if len(files) != 1 { t.Errorf("Expected %+v, received: %+v", 1, len(files)) } - eCnt := "Cdr1,*raw,*voice,OriginCDR1,*none,cgrates.org,call,1001,1001,+4986517174963,2018-10-04T18:03:10+03:00,2018-10-04T18:03:10+03:00,10000000000,1.01\n" + - "Cdr2,*raw,*voice,OriginCDR2,*none,cgrates.org,call,1001,1001,+4986517174963,2018-10-04T18:03:10+03:00,2018-10-04T18:03:10+03:00,5000000000,1.01\n" + - "Cdr3,*raw,*voice,OriginCDR3,*none,cgrates.org,call,1001,1001,+4986517174963,2018-10-04T18:03:10+03:00,2018-10-04T18:03:10+03:00,30000000000,1.01\n" + - "Cdr4,*raw,*voice,OriginCDR4,*none,cgrates.org,call,1001,1001,+4986517174963,2018-10-04T18:03:10+03:00,2018-10-04T18:03:10+03:00,0,1.01\n" + eCnt := "Cdr1,*raw,*voice,OriginCDR1,*none,cgrates.org,call,1001,1001,+4986517174963,2018-10-04T15:03:10Z,2018-10-04T15:03:10Z,10000000000,1.01\n" + + "Cdr2,*raw,*voice,OriginCDR2,*none,cgrates.org,call,1001,1001,+4986517174963,2018-10-04T15:03:10Z,2018-10-04T15:03:10Z,5000000000,1.01\n" + + "Cdr3,*raw,*voice,OriginCDR3,*none,cgrates.org,call,1001,1001,+4986517174963,2018-10-04T15:03:10Z,2018-10-04T15:03:10Z,30000000000,1.01\n" + + "Cdr4,*raw,*voice,OriginCDR4,*none,cgrates.org,call,1001,1001,+4986517174963,2018-10-04T15:03:10Z,2018-10-04T15:03:10Z,0,1.01\n" + if outContent1, err := ioutil.ReadFile(files[0]); err != nil { + t.Error(err) + } else if len(eCnt) != len(string(outContent1)) { + t.Errorf("Expecting: \n<%+v>, \nreceived: \n<%+v>", len(eCnt), len(string(outContent1))) + t.Errorf("Expecting: \n<%q>, \nreceived: \n<%q>", eCnt, string(outContent1)) + } +} + +func testEEsExportCDRsMultipleExporters(t *testing.T) { + attr := &utils.ArgExportCDRs{ + ExporterIDs: []string{"CSVExporter", "CSVExporter2"}, + Verbose: true, + } + var rply map[string]interface{} + if err := eeSRPC.Call(utils.APIerSv1ExportCDRs, &attr, &rply); err != nil { + t.Error("Unexpected error: ", err.Error()) + } + time.Sleep(2 * time.Second) + if rply["FirstExpOrderID"] != 1.0 { + t.Errorf("Expected %+v, received: %+v", 1.0, rply["FirstExpOrderID"]) + } else if rply["LastExpOrderID"] != 4.0 { + t.Errorf("Expected %+v, received: %+v", 4.0, rply["LastExpOrderID"]) + } else if rply["NumberOfEvents"] != 8.0 { + t.Errorf("Expected %+v, received: %+v", 8.0, rply["NumberOfEvents"]) + } else if rply["TotalCost"] != 8.08 { + t.Errorf("Expected %+v, received: %+v", 8.08, rply["TotalCost"]) + } +} + +func testEEsVerifyExportsMultipleExporters(t *testing.T) { + var files []string + err := filepath.Walk("/tmp/testCSV2/", func(path string, info os.FileInfo, err error) error { + if strings.HasSuffix(path, utils.CSVSuffix) { + files = append(files, path) + } + return nil + }) + if err != nil { + t.Error(err) + } + if len(files) != 1 { + t.Errorf("Expected %+v, received: %+v", 1, len(files)) + } + eCnt := "Cdr1,*raw,*voice,OriginCDR1,*none,cgrates.org,call,1001,1001,+4986517174963,2018-10-04T15:03:10Z,2018-10-04T15:03:10Z,10000000000,1.01\n" + + "Cdr2,*raw,*voice,OriginCDR2,*none,cgrates.org,call,1001,1001,+4986517174963,2018-10-04T15:03:10Z,2018-10-04T15:03:10Z,5000000000,1.01\n" + + "Cdr3,*raw,*voice,OriginCDR3,*none,cgrates.org,call,1001,1001,+4986517174963,2018-10-04T15:03:10Z,2018-10-04T15:03:10Z,30000000000,1.01\n" + + "Cdr4,*raw,*voice,OriginCDR4,*none,cgrates.org,call,1001,1001,+4986517174963,2018-10-04T15:03:10Z,2018-10-04T15:03:10Z,0,1.01\n" if outContent1, err := ioutil.ReadFile(files[0]); err != nil { t.Error(err) } else if len(eCnt) != len(string(outContent1)) { @@ -234,7 +283,7 @@ func testEEsKillEngine(t *testing.T) { } func testEEsCleanFolder(t *testing.T) { - for _, dir := range []string{"/tmp/testCSV"} { + for _, dir := range []string{"/tmp/testCSV", "/tmp/testCSV2", "/tmp/testCSV3"} { if err := os.RemoveAll(dir); err != nil { t.Fatal("Error removing folder: ", dir, err) } diff --git a/data/conf/samples/ees_internal/cgrates.json b/data/conf/samples/ees_internal/cgrates.json index a6787035f..358c02ae8 100644 --- a/data/conf/samples/ees_internal/cgrates.json +++ b/data/conf/samples/ees_internal/cgrates.json @@ -65,8 +65,6 @@ "type": "*file_csv", "export_path": "/tmp/testCSV", "tenant": "cgrates.org", - "flags": ["*attributes"], - "attribute_context": "customContext", "synchronous": true, "attempts": 1, "field_separator": ",", @@ -87,6 +85,58 @@ {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, ], }, + { + "id": "CSVExporter2", + "type": "*file_csv", + "export_path": "/tmp/testCSV2", + "tenant": "cgrates.org", + "synchronous": true, + "attempts": 1, + "field_separator": ",", + "filters": ["*string:~*req.RunID:*raw"], + "fields":[ + {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, + {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, + {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, + {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, + {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, + {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"}, + {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, + {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, + {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime{*time_string}" }, + {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime{*time_string}"}, + {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, + {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, + ], + }, + { + "id": "CSVExporter3", + "type": "*file_csv", + "export_path": "/tmp/testCSV3", + "tenant": "cgrates.org", + "synchronous": false, + "attempts": 1, + "field_separator": ",", + "filters": ["*string:~*req.CGRID:Cdr1;Cdr2"], + "fields":[ + {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, + {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, + {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, + {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, + {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, + {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"}, + {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, + {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, + {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime{*time_string}" }, + {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime{*time_string}"}, + {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, + {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, + ], + } ] }, diff --git a/data/conf/samples/ees_mongo/cgrates.json b/data/conf/samples/ees_mongo/cgrates.json index 33b7b49e4..784e156c5 100644 --- a/data/conf/samples/ees_mongo/cgrates.json +++ b/data/conf/samples/ees_mongo/cgrates.json @@ -69,8 +69,6 @@ "type": "*file_csv", "export_path": "/tmp/testCSV", "tenant": "cgrates.org", - "flags": ["*attributes"], - "attribute_context": "customContext", "synchronous": true, "attempts": 1, "field_separator": ",", @@ -91,6 +89,58 @@ {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, ], }, + { + "id": "CSVExporter2", + "type": "*file_csv", + "export_path": "/tmp/testCSV2", + "tenant": "cgrates.org", + "synchronous": true, + "attempts": 1, + "field_separator": ",", + "filters": ["*string:~*req.RunID:*raw"], + "fields":[ + {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, + {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, + {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, + {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, + {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, + {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"}, + {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, + {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, + {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime{*time_string}" }, + {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime{*time_string}"}, + {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, + {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, + ], + }, + { + "id": "CSVExporter3", + "type": "*file_csv", + "export_path": "/tmp/testCSV3", + "tenant": "cgrates.org", + "synchronous": false, + "attempts": 1, + "field_separator": ",", + "filters": ["*string:~*req.CGRID:Cdr1;Cdr2"], + "fields":[ + {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, + {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, + {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, + {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, + {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, + {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"}, + {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, + {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, + {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime{*time_string}" }, + {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime{*time_string}"}, + {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, + {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, + ], + } ] }, diff --git a/data/conf/samples/ees_mysql/cgrates.json b/data/conf/samples/ees_mysql/cgrates.json index f6a228f47..f254a1fd5 100644 --- a/data/conf/samples/ees_mysql/cgrates.json +++ b/data/conf/samples/ees_mysql/cgrates.json @@ -67,8 +67,6 @@ "type": "*file_csv", "export_path": "/tmp/testCSV", "tenant": "cgrates.org", - "flags": ["*attributes"], - "attribute_context": "customContext", "synchronous": true, "attempts": 1, "field_separator": ",", @@ -89,6 +87,58 @@ {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, ], }, + { + "id": "CSVExporter2", + "type": "*file_csv", + "export_path": "/tmp/testCSV2", + "tenant": "cgrates.org", + "synchronous": true, + "attempts": 1, + "field_separator": ",", + "filters": ["*string:~*req.RunID:*raw"], + "fields":[ + {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, + {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, + {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, + {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, + {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, + {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"}, + {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, + {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, + {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime{*time_string}" }, + {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime{*time_string}"}, + {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, + {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, + ], + }, + { + "id": "CSVExporter3", + "type": "*file_csv", + "export_path": "/tmp/testCSV3", + "tenant": "cgrates.org", + "synchronous": false, + "attempts": 1, + "field_separator": ",", + "filters": ["*string:~*req.CGRID:Cdr1;Cdr2"], + "fields":[ + {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, + {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, + {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, + {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, + {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, + {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"}, + {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, + {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, + {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime{*time_string}" }, + {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime{*time_string}"}, + {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, + {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, + ], + } ] }, diff --git a/ees/ees.go b/ees/ees.go index 16ac5d911..97f064bc6 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -227,7 +227,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma if evict { ee.OnEvicted("", nil) // so we can close ie the file } - if hasVerbose && eeCfg.Synchronous { + if hasVerbose && sync { metricMapLock.Lock() metricsMap[ee.ID()] = ee.GetMetrics() metricMapLock.Unlock() @@ -254,7 +254,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma func newEEMetrics() utils.MapStorage { return utils.MapStorage{ - utils.NumberOfEvents: 0, + utils.NumberOfEvents: int64(0), utils.PositiveExports: utils.StringSet{}, utils.NegativeExports: utils.StringSet{}, utils.TimeNow: time.Now(), @@ -330,3 +330,107 @@ func updateEEMetrics(dc utils.MapStorage, ev engine.MapEvent, timezone string) { } } } + +func MergeEEMetrics(dc map[string]map[string]interface{}) (reply map[string]interface{}, err error) { + positiveExp := make(utils.StringSet) + negativeExp := make(utils.StringSet) + reply = map[string]interface{}{ + utils.PositiveExports: positiveExp, + utils.NegativeExports: negativeExp, + } + for _, metrics := range dc { + for k, v := range metrics { + switch k { + case utils.NumberOfEvents: + val, err := utils.IfaceAsTInt64(v) + if err != nil { + return nil, err + } + if _, has := reply[k]; !has { + reply[k] = val + } else { + reply[k] = reply[k].(int64) + val + } + case utils.PositiveExports: + val, canCast := v.(map[string]interface{}) + if !canCast { + return nil, fmt.Errorf("cannot cast to map[string]interface{} %+v for positive exports", v) + } + for pos := range val { + positiveExp.Add(pos) + } + case utils.NegativeExports: + val, canCast := v.(map[string]interface{}) + if !canCast { + return nil, fmt.Errorf("cannot cast to map[string]interface{} %+v for negative exports", v) + } + for neg := range val { + negativeExp.Add(neg) + } + case utils.FirstEventATime: + val, err := utils.IfaceAsTime(v, config.CgrConfig().GeneralCfg().DefaultTimezone) + if err != nil { + return nil, err + } + if _, has := reply[k]; !has { + reply[k] = val + } else if val.Before(reply[k].(time.Time)) { + reply[k] = val + } + case utils.LastEventATime: + val, err := utils.IfaceAsTime(v, config.CgrConfig().GeneralCfg().DefaultTimezone) + if err != nil { + return nil, err + } + if _, has := reply[k]; !has { + reply[k] = val + } else if val.After(reply[k].(time.Time)) { + reply[k] = val + } + case utils.FirstExpOrderID: + val, err := utils.IfaceAsTInt64(v) + if err != nil { + return nil, err + } + if _, has := reply[k]; !has { + reply[k] = val + } else if reply[k].(int64) > val { + reply[k] = val + } + case utils.LastExpOrderID: + val, err := utils.IfaceAsTInt64(v) + if err != nil { + return nil, err + } + if _, has := reply[k]; !has { + reply[k] = val + } else if reply[k].(int64) < val { + reply[k] = val + } + case utils.TotalCost: + val, err := utils.IfaceAsFloat64(v) + if err != nil { + return nil, err + } + if _, has := reply[k]; !has { + reply[k] = val + } else { + reply[k] = reply[k].(float64) + val + } + case utils.TotalDuration, utils.TotalSMSUsage, + utils.TotalMMSUsage, utils.TotalGenericUsage, + utils.TotalDataUsage: + val, err := utils.IfaceAsDuration(v) + if err != nil { + return nil, err + } + if _, has := reply[k]; !has { + reply[k] = val + } else { + reply[k] = reply[k].(time.Duration) + val + } + } + } + } + return +} diff --git a/ees/filecsv.go b/ees/filecsv.go index e18f470f5..818aedd2f 100644 --- a/ees/filecsv.go +++ b/ees/filecsv.go @@ -98,7 +98,7 @@ func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) { } fCsv.Unlock() }() - fCsv.dc[utils.NumberOfEvents] = fCsv.dc[utils.NumberOfEvents].(int) + 1 + fCsv.dc[utils.NumberOfEvents] = fCsv.dc[utils.NumberOfEvents].(int64) + 1 var csvRecord []string req := utils.MapStorage{} diff --git a/ees/filefwv.go b/ees/filefwv.go index 0e776a278..af917b769 100644 --- a/ees/filefwv.go +++ b/ees/filefwv.go @@ -89,7 +89,7 @@ func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) { } fFwv.Unlock() }() - fFwv.dc[utils.NumberOfEvents] = fFwv.dc[utils.NumberOfEvents].(int) + 1 + fFwv.dc[utils.NumberOfEvents] = fFwv.dc[utils.NumberOfEvents].(int64) + 1 var records []string req := utils.MapStorage{} for k, v := range cgrEv.Event { diff --git a/ees/httpjsonmap.go b/ees/httpjsonmap.go index 80f6c485b..ba7d6038b 100644 --- a/ees/httpjsonmap.go +++ b/ees/httpjsonmap.go @@ -80,7 +80,7 @@ func (httpJson *HTTPJsonMapEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { httpJson.Unlock() }() - httpJson.dc[utils.NumberOfEvents] = httpJson.dc[utils.NumberOfEvents].(int) + 1 + httpJson.dc[utils.NumberOfEvents] = httpJson.dc[utils.NumberOfEvents].(int64) + 1 valMp := make(map[string]string) eeReq := NewEventExporterRequest(utils.MapStorage(cgrEv.Event), httpJson.dc, diff --git a/ees/httppost.go b/ees/httppost.go index d505da5ed..ab5ade3e9 100644 --- a/ees/httppost.go +++ b/ees/httppost.go @@ -72,7 +72,7 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) { } httpPost.Unlock() }() - httpPost.dc[utils.NumberOfEvents] = httpPost.dc[utils.NumberOfEvents].(int) + 1 + httpPost.dc[utils.NumberOfEvents] = httpPost.dc[utils.NumberOfEvents].(int64) + 1 var body interface{} urlVals := url.Values{} diff --git a/ees/virtualee.go b/ees/virtualee.go index 82d797e71..d4d5b22d8 100644 --- a/ees/virtualee.go +++ b/ees/virtualee.go @@ -71,7 +71,7 @@ func (vEe *VirtualEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { } vEe.Unlock() }() - vEe.dc[utils.NumberOfEvents] = vEe.dc[utils.NumberOfEvents].(int) + 1 + vEe.dc[utils.NumberOfEvents] = vEe.dc[utils.NumberOfEvents].(int64) + 1 req := utils.MapStorage{} for k, v := range cgrEv.Event {