diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 17d180bb3..01acd915e 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -1841,7 +1841,7 @@ func (apierSv1 *APIerSv1) ExportToFolder(arg *utils.ArgExportToFolder, reply *st return nil } -func (apierSv1 *APIerSv1) ExportCDRs(args *utils.ArgExportCDRs, reply *string) error { +func (apierSv1 *APIerSv1) ExportCDRs(args *utils.ArgExportCDRs, reply *map[string]utils.MapStorage) error { if len(apierSv1.Config.ApierCfg().EEsConns) == 0 { return utils.NewErrNotConnected(utils.EEs) } diff --git a/apier/v1/ees.go b/apier/v1/ees.go index 3242eb175..e3b26b11d 100644 --- a/apier/v1/ees.go +++ b/apier/v1/ees.go @@ -38,6 +38,6 @@ func (eSv1 *EventExporterSv1) Ping(ign *utils.CGREventWithOpts, reply *string) e // ProcessEvent triggers exports on EEs side func (eSv1 *EventExporterSv1) ProcessEvent(args *utils.CGREventWithIDs, - reply *string) error { + reply *map[string]utils.MapStorage) error { return eSv1.eeS.V1ProcessEvent(args, reply) } diff --git a/apier/v1/ees_it_test.go b/apier/v1/ees_it_test.go index 49988d5d4..db7423183 100644 --- a/apier/v1/ees_it_test.go +++ b/apier/v1/ees_it_test.go @@ -143,30 +143,30 @@ func testEEsAddCDRs(t *testing.T) { storedCdrs := []*engine.CDR{ {CGRID: "Cdr1", OrderID: 1, ToR: utils.VOICE, OriginID: "OriginCDR1", OriginHost: "192.168.1.1", Source: "test", - RequestType: utils.META_RATED, Tenant: "cgrates.org", - Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Now(), - AnswerTime: time.Now(), RunID: utils.MetaDefault, Usage: time.Duration(10) * time.Second, + RequestType: utils.META_NONE, Tenant: "cgrates.org", + Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Date(2018, 10, 4, 15, 3, 10, 0, time.UTC), + AnswerTime: time.Date(2018, 10, 4, 15, 3, 10, 0, time.UTC), RunID: utils.MetaDefault, Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, }, {CGRID: "Cdr2", OrderID: 2, ToR: utils.VOICE, OriginID: "OriginCDR2", OriginHost: "192.168.1.1", Source: "test2", - RequestType: utils.META_RATED, Tenant: "cgrates.org", Category: "call", - Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Now(), - AnswerTime: time.Now(), RunID: utils.MetaDefault, Usage: time.Duration(5) * time.Second, + RequestType: utils.META_NONE, Tenant: "cgrates.org", Category: "call", + Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Date(2018, 10, 4, 15, 3, 10, 0, time.UTC), + AnswerTime: time.Date(2018, 10, 4, 15, 3, 10, 0, time.UTC), RunID: utils.MetaDefault, Usage: time.Duration(5) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, }, {CGRID: "Cdr3", OrderID: 3, ToR: utils.VOICE, OriginID: "OriginCDR3", OriginHost: "192.168.1.1", Source: "test2", - RequestType: utils.META_RATED, Tenant: "cgrates.org", Category: "call", - Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Now(), - AnswerTime: time.Now(), RunID: utils.MetaDefault, Usage: time.Duration(30) * time.Second, + RequestType: utils.META_NONE, Tenant: "cgrates.org", Category: "call", + Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Date(2018, 10, 4, 15, 3, 10, 0, time.UTC), + AnswerTime: time.Date(2018, 10, 4, 15, 3, 10, 0, time.UTC), RunID: utils.MetaDefault, Usage: time.Duration(30) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, }, {CGRID: "Cdr4", OrderID: 4, ToR: utils.VOICE, OriginID: "OriginCDR4", OriginHost: "192.168.1.1", Source: "test3", - RequestType: utils.META_RATED, Tenant: "cgrates.org", Category: "call", - Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Now(), - AnswerTime: time.Time{}, RunID: utils.MetaDefault, Usage: time.Duration(0) * time.Second, + RequestType: utils.META_NONE, Tenant: "cgrates.org", Category: "call", + Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Date(2018, 10, 4, 15, 3, 10, 0, time.UTC), + AnswerTime: time.Date(2018, 10, 4, 15, 3, 10, 0, time.UTC), RunID: utils.MetaDefault, Usage: time.Duration(0) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, }, } @@ -185,11 +185,20 @@ func testEEsExportCDRs(t *testing.T) { attr := &utils.ArgExportCDRs{ ExporterIDs: []string{"CSVExporter"}, } - var rply string + var rply map[string]utils.MapStorage if err := eeSRPC.Call(utils.APIerSv1ExportCDRs, &attr, &rply); err != nil { t.Error("Unexpected error: ", err.Error()) } - time.Sleep(1 * time.Second) + time.Sleep(time.Second) + if rply["CSVExporter"]["FirstExpOrderID"] != 1.0 { + t.Errorf("Expected %+v, received: %+v", 1.0, rply["CSVExporter"]["FirstExpOrderID"]) + } else if rply["CSVExporter"]["LastExpOrderID"] != 4.0 { + t.Errorf("Expected %+v, received: %+v", 4.0, rply["CSVExporter"]["LastExpOrderID"]) + } else if rply["CSVExporter"]["NumberOfEvents"] != 4.0 { + t.Errorf("Expected %+v, received: %+v", 4.0, rply["CSVExporter"]["NumberOfEvents"]) + } else if rply["CSVExporter"]["TotalCost"] != 4.04 { + t.Errorf("Expected %+v, received: %+v", 4.04, rply["CSVExporter"]["TotalCost"]) + } } func testEEsVerifyExports(t *testing.T) { @@ -206,10 +215,10 @@ func testEEsVerifyExports(t *testing.T) { if len(files) != 1 { t.Errorf("Expected %+v, received: %+v", 1, len(files)) } - eCnt := "Cdr3,*raw,*voice,OriginCDR3,*rated,cgrates.org,call,1001,1001,+4986517174963,2020-08-30T14:40:32+03:00,2020-08-30T14:40:32+03:00,30s,-1\n" + - "Cdr4,*raw,*voice,OriginCDR4,*rated,cgrates.org,call,1001,1001,+4986517174963,2020-08-30T14:40:32+03:00,0001-01-01T00:00:00Z,0s,0\n" + - "Cdr1,*raw,*voice,OriginCDR1,*rated,cgrates.org,call,1001,1001,+4986517174963,2020-08-30T14:40:32+03:00,2020-08-30T14:40:32+03:00,10s,-1\n" + - "Cdr2,*raw,*voice,OriginCDR2,*rated,cgrates.org,call,1001,1001,+4986517174963,2020-08-30T14:40:32+03:00,2020-08-30T14:40:32+03:00,5s,-1\n" + eCnt := "Cdr1,*raw,*voice,OriginCDR1,*none,cgrates.org,call,1001,1001,+4986517174963,2018-10-04T18:03:10+03:00,2018-10-04T18:03:10+03:00,10000000000,1.01\n" + + "Cdr2,*raw,*voice,OriginCDR2,*none,cgrates.org,call,1001,1001,+4986517174963,2018-10-04T18:03:10+03:00,2018-10-04T18:03:10+03:00,5000000000,1.01\n" + + "Cdr3,*raw,*voice,OriginCDR3,*none,cgrates.org,call,1001,1001,+4986517174963,2018-10-04T18:03:10+03:00,2018-10-04T18:03:10+03:00,30000000000,1.01\n" + + "Cdr4,*raw,*voice,OriginCDR4,*none,cgrates.org,call,1001,1001,+4986517174963,2018-10-04T18:03:10+03:00,2018-10-04T18:03:10+03:00,0,1.01\n" if outContent1, err := ioutil.ReadFile(files[0]); err != nil { t.Error(err) } else if len(eCnt) != len(string(outContent1)) { diff --git a/data/conf/samples/ees_internal/cgrates.json b/data/conf/samples/ees_internal/cgrates.json index 6f3b65bc2..d56c77617 100644 --- a/data/conf/samples/ees_internal/cgrates.json +++ b/data/conf/samples/ees_internal/cgrates.json @@ -67,6 +67,7 @@ "tenant": "cgrates.org", "flags": ["*attributes"], "attribute_context": "customContext", + "synchronous": true, "attempts": 1, "field_separator": ",", "fields":[ @@ -93,7 +94,7 @@ "apiers": { "enabled": true, "scheduler_conns": ["*internal"], - "ees_conns": ["*internal"], + "ees_conns": ["*localhost"], }, diff --git a/data/conf/samples/ees_mongo/cgrates.json b/data/conf/samples/ees_mongo/cgrates.json index 80af20a80..cf5e5b9cf 100644 --- a/data/conf/samples/ees_mongo/cgrates.json +++ b/data/conf/samples/ees_mongo/cgrates.json @@ -71,6 +71,7 @@ "tenant": "cgrates.org", "flags": ["*attributes"], "attribute_context": "customContext", + "synchronous": true, "attempts": 1, "field_separator": ",", "fields":[ @@ -97,7 +98,7 @@ "apiers": { "enabled": true, "scheduler_conns": ["*internal"], - "ees_conns": ["*internal"], + "ees_conns": ["*localhost"], }, diff --git a/data/conf/samples/ees_mysql/cgrates.json b/data/conf/samples/ees_mysql/cgrates.json index afcf6d6ff..0b8ccac13 100644 --- a/data/conf/samples/ees_mysql/cgrates.json +++ b/data/conf/samples/ees_mysql/cgrates.json @@ -69,6 +69,7 @@ "tenant": "cgrates.org", "flags": ["*attributes"], "attribute_context": "customContext", + "synchronous": true, "attempts": 1, "field_separator": ",", "fields":[ @@ -95,7 +96,7 @@ "apiers": { "enabled": true, "scheduler_conns": ["*internal"], - "ees_conns": ["*internal"], + "ees_conns": ["*localhost"], }, diff --git a/ees/ees.go b/ees/ees.go index 1ff5e89bd..04a6f4734 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -153,6 +153,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma var withErr bool var metricMapLock sync.RWMutex metricsMap := make(map[string]utils.MapStorage) + isVerbose := cgrEv.HasField(utils.EEsVerbose) for cfgIdx, eeCfg := range eeS.cfg.EEsNoLksCfg().Exporters { if eeCfg.Type == utils.META_NONE || // ignore *none type exporter (lenExpIDs != 0 && !expIDs.Has(eeCfg.ID)) { @@ -206,7 +207,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma if eeCfg.Synchronous { wg.Add(1) // wait for synchronous or file ones since these need to be done before continuing } - go func(evict, sync bool, ee EventExporter) { + go func(evict, sync bool, ee EventExporter, eeCfg *config.EventExporterCfg) { if err := ee.ExportEvent(cgrEv.CGREvent); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> with id <%s>, error: <%s>", @@ -219,14 +220,21 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma metricMapLock.Lock() metricsMap[ee.ID()] = ee.GetMetrics() metricMapLock.Unlock() + if isVerbose && !eeCfg.Synchronous { + utils.Logger.Warning( + fmt.Sprintf("<%s> with id <%s>, running verbosed export with syncronous false", + utils.EventExporterS, ee.ID())) + withErr = true + } if sync { wg.Done() } - }(!hasCache, eeCfg.Synchronous, ee) + }(!hasCache, eeCfg.Synchronous, ee, eeCfg) } wg.Wait() if withErr { err = utils.ErrPartiallyExecuted + return } *rply = make(map[string]utils.MapStorage) @@ -235,7 +243,6 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma (*rply)[k] = v } metricMapLock.Unlock() - return } diff --git a/ees/filefwv.go b/ees/filefwv.go index df056e71d..95ed89a3e 100644 --- a/ees/filefwv.go +++ b/ees/filefwv.go @@ -168,3 +168,7 @@ func (fFwv *FileFWVee) composeTrailer() (err error) { } return } + +func (fFwv *FileFWVee) GetMetrics() utils.MapStorage { + return fFwv.dc.Clone() +} diff --git a/ees/httpjsonmap.go b/ees/httpjsonmap.go index 77f8ebd85..55670064c 100644 --- a/ees/httpjsonmap.go +++ b/ees/httpjsonmap.go @@ -141,3 +141,7 @@ func (httpJson *HTTPJsonMapEe) post(body []byte, key string) (err error) { } return } + +func (httpJson *HTTPJsonMapEe) GetMetrics() utils.MapStorage { + return httpJson.dc.Clone() +} diff --git a/ees/httppost.go b/ees/httppost.go index f6574b7c3..7e4f3bde4 100644 --- a/ees/httppost.go +++ b/ees/httppost.go @@ -109,3 +109,7 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) { } return } + +func (httpPost *HTTPPost) GetMetrics() utils.MapStorage { + return httpPost.dc.Clone() +} diff --git a/ees/virtualee.go b/ees/virtualee.go index c4c64e445..eaa4fe9e6 100644 --- a/ees/virtualee.go +++ b/ees/virtualee.go @@ -85,3 +85,7 @@ func (vEe *VirtualEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { updateEEMetrics(vEe.dc, cgrEv.Event, vEe.cgrCfg.GeneralCfg().DefaultTimezone) return } + +func (vEe *VirtualEe) GetMetrics() utils.MapStorage { + return vEe.dc.Clone() +} diff --git a/utils/consts.go b/utils/consts.go index cce5571b1..763459f8f 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -856,6 +856,7 @@ const ( RouteID = "RouteID" MetaMonthlyEstimated = "*monthly_estimated" ProcessRuns = "ProcessRuns" + EEsVerbose = "*eesVerbose" ) // Migrator Action