From 8340e65ebd5e0ba74571ec31584f05f9483287c0 Mon Sep 17 00:00:00 2001 From: TeoV Date: Wed, 2 Sep 2020 17:08:18 +0300 Subject: [PATCH] Update EventExporter ProcessEvent to include metrics informations --- apier/v1/apier.go | 31 ++++++++++++++++---- apier/v1/ees.go | 2 +- apier/v1/ees_it_test.go | 19 ++++++------ data/conf/samples/ees_internal/cgrates.json | 2 +- data/conf/samples/ees_mongo/cgrates.json | 2 +- data/conf/samples/ees_mysql/cgrates.json | 2 +- ees/ees.go | 32 ++++++++++++--------- ees/filecsv.go | 2 +- ees/filefwv.go | 2 +- ees/httpjsonmap.go | 2 +- ees/httppost.go | 2 +- ees/virtualee.go | 2 +- engine/cdrs.go | 5 ++-- utils/consts.go | 6 ++-- 14 files changed, 71 insertions(+), 40 deletions(-) diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 01acd915e..1353eaa76 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -1841,7 +1841,7 @@ func (apierSv1 *APIerSv1) ExportToFolder(arg *utils.ArgExportToFolder, reply *st return nil } -func (apierSv1 *APIerSv1) ExportCDRs(args *utils.ArgExportCDRs, reply *map[string]utils.MapStorage) error { +func (apierSv1 *APIerSv1) ExportCDRs(args *utils.ArgExportCDRs, reply *map[string]interface{}) error { if len(apierSv1.Config.ApierCfg().EEsConns) == 0 { return utils.NewErrNotConnected(utils.EEs) } @@ -1856,12 +1856,20 @@ func (apierSv1 *APIerSv1) ExportCDRs(args *utils.ArgExportCDRs, reply *map[strin return utils.ErrNotFound } withErros := false + var rplyCdr map[string]map[string]interface{} for _, cdr := range cdrs { + argCdr := &utils.CGREventWithIDs{ + IDs: args.ExporterIDs, + CGREventWithOpts: &utils.CGREventWithOpts{ + CGREvent: cdr.AsCGREvent(), + Opts: make(map[string]interface{}), + }, + } + if args.Verbose { + argCdr.CGREventWithOpts.Opts[utils.EEsVerbose] = struct{}{} + } if err := apierSv1.ConnMgr.Call(apierSv1.Config.ApierCfg().EEsConns, nil, utils.EventExporterSv1ProcessEvent, - &utils.CGREventWithIDs{ - IDs: args.ExporterIDs, - CGREventWithOpts: &utils.CGREventWithOpts{CGREvent: cdr.AsCGREvent()}, - }, reply); err != nil { + argCdr, &rplyCdr); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> error: <%s> processing event: <%s> with <%s>", utils.ApierS, err.Error(), utils.ToJSON(cdr.AsCGREvent()), utils.EventExporterS)) withErros = true @@ -1870,5 +1878,18 @@ func (apierSv1 *APIerSv1) ExportCDRs(args *utils.ArgExportCDRs, reply *map[strin if withErros { return utils.ErrPartiallyExecuted } + // we consider only the last reply because it should have the metrics updated + if !args.Verbose { + (*reply)[utils.ExporterIDs] = make([]string, 0, len(rplyCdr)) + } + for exporterID, metrics := range rplyCdr { + if !args.Verbose { + (*reply)[utils.ExporterIDs] = append((*reply)[utils.ExporterIDs].([]string), exporterID) + } else { + for k, v := range metrics { + (*reply)[k] = v + } + } + } return nil } diff --git a/apier/v1/ees.go b/apier/v1/ees.go index e3b26b11d..abd6ea07c 100644 --- a/apier/v1/ees.go +++ b/apier/v1/ees.go @@ -38,6 +38,6 @@ func (eSv1 *EventExporterSv1) Ping(ign *utils.CGREventWithOpts, reply *string) e // ProcessEvent triggers exports on EEs side func (eSv1 *EventExporterSv1) ProcessEvent(args *utils.CGREventWithIDs, - reply *map[string]utils.MapStorage) error { + reply *map[string]map[string]interface{}) error { return eSv1.eeS.V1ProcessEvent(args, reply) } diff --git a/apier/v1/ees_it_test.go b/apier/v1/ees_it_test.go index db7423183..69a47cd67 100644 --- a/apier/v1/ees_it_test.go +++ b/apier/v1/ees_it_test.go @@ -184,20 +184,21 @@ func testEEsAddCDRs(t *testing.T) { func testEEsExportCDRs(t *testing.T) { attr := &utils.ArgExportCDRs{ ExporterIDs: []string{"CSVExporter"}, + Verbose: true, } - var rply map[string]utils.MapStorage + var rply map[string]interface{} if err := eeSRPC.Call(utils.APIerSv1ExportCDRs, &attr, &rply); err != nil { t.Error("Unexpected error: ", err.Error()) } time.Sleep(time.Second) - if rply["CSVExporter"]["FirstExpOrderID"] != 1.0 { - t.Errorf("Expected %+v, received: %+v", 1.0, rply["CSVExporter"]["FirstExpOrderID"]) - } else if rply["CSVExporter"]["LastExpOrderID"] != 4.0 { - t.Errorf("Expected %+v, received: %+v", 4.0, rply["CSVExporter"]["LastExpOrderID"]) - } else if rply["CSVExporter"]["NumberOfEvents"] != 4.0 { - t.Errorf("Expected %+v, received: %+v", 4.0, rply["CSVExporter"]["NumberOfEvents"]) - } else if rply["CSVExporter"]["TotalCost"] != 4.04 { - t.Errorf("Expected %+v, received: %+v", 4.04, rply["CSVExporter"]["TotalCost"]) + if rply["FirstExpOrderID"] != 1.0 { + t.Errorf("Expected %+v, received: %+v", 1.0, rply["FirstExpOrderID"]) + } else if rply["LastExpOrderID"] != 4.0 { + t.Errorf("Expected %+v, received: %+v", 4.0, rply["LastExpOrderID"]) + } else if rply["NumberOfEvents"] != 4.0 { + t.Errorf("Expected %+v, received: %+v", 4.0, rply["NumberOfEvents"]) + } else if rply["TotalCost"] != 4.04 { + t.Errorf("Expected %+v, received: %+v", 4.04, rply["TotalCost"]) } } diff --git a/data/conf/samples/ees_internal/cgrates.json b/data/conf/samples/ees_internal/cgrates.json index d56c77617..a6787035f 100644 --- a/data/conf/samples/ees_internal/cgrates.json +++ b/data/conf/samples/ees_internal/cgrates.json @@ -57,7 +57,7 @@ "enabled": true, "attributes_conns":["*internal"], "cache": { - "*file_csv": {"limit": -1, "ttl": "500ms", "static_ttl": false}, + "*file_csv": {"limit": -1, "ttl": "1s", "static_ttl": false}, }, "exporters": [ { diff --git a/data/conf/samples/ees_mongo/cgrates.json b/data/conf/samples/ees_mongo/cgrates.json index cf5e5b9cf..33b7b49e4 100644 --- a/data/conf/samples/ees_mongo/cgrates.json +++ b/data/conf/samples/ees_mongo/cgrates.json @@ -61,7 +61,7 @@ "enabled": true, "attributes_conns":["*internal"], "cache": { - "*file_csv": {"limit": -1, "ttl": "500ms", "static_ttl": false}, + "*file_csv": {"limit": -1, "ttl": "1s", "static_ttl": false}, }, "exporters": [ { diff --git a/data/conf/samples/ees_mysql/cgrates.json b/data/conf/samples/ees_mysql/cgrates.json index 0b8ccac13..f6a228f47 100644 --- a/data/conf/samples/ees_mysql/cgrates.json +++ b/data/conf/samples/ees_mysql/cgrates.json @@ -59,7 +59,7 @@ "enabled": true, "attributes_conns":["*internal"], "cache": { - "*file_csv": {"limit": -1, "ttl": "500ms", "static_ttl": false}, + "*file_csv": {"limit": -1, "ttl": "1s", "static_ttl": false}, }, "exporters": [ { diff --git a/ees/ees.go b/ees/ees.go index 04a6f4734..16ac5d911 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -141,7 +141,8 @@ func (eeS *EventExporterS) attrSProcessEvent(cgrEv *utils.CGREventWithOpts, attr } // V1ProcessEvent will be called each time a new event is received from readers -func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *map[string]utils.MapStorage) (err error) { +// rply -> map[string]map[string]interface{} +func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *map[string]map[string]interface{}) (err error) { eeS.cfg.RLocks(config.EEsJson) defer eeS.cfg.RUnlocks(config.EEsJson) @@ -153,7 +154,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma var withErr bool var metricMapLock sync.RWMutex metricsMap := make(map[string]utils.MapStorage) - isVerbose := cgrEv.HasField(utils.EEsVerbose) + _, hasVerbose := cgrEv.Opts[utils.OptsEEsVerbose] for cfgIdx, eeCfg := range eeS.cfg.EEsNoLksCfg().Exporters { if eeCfg.Type == utils.META_NONE || // ignore *none type exporter (lenExpIDs != 0 && !expIDs.Has(eeCfg.ID)) { @@ -207,7 +208,16 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma if eeCfg.Synchronous { wg.Add(1) // wait for synchronous or file ones since these need to be done before continuing } - go func(evict, sync bool, ee EventExporter, eeCfg *config.EventExporterCfg) { + metricMapLock.Lock() + metricsMap[ee.ID()] = utils.MapStorage{} + metricMapLock.Unlock() + // log the message before starting the gorutine, but still execute the exporter + if hasVerbose && !eeCfg.Synchronous { + utils.Logger.Warning( + fmt.Sprintf("<%s> with id <%s>, running verbosed exporter with syncronous false", + utils.EventExporterS, ee.ID())) + } + go func(evict, sync bool, ee EventExporter) { if err := ee.ExportEvent(cgrEv.CGREvent); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> with id <%s>, error: <%s>", @@ -217,19 +227,15 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma if evict { ee.OnEvicted("", nil) // so we can close ie the file } - metricMapLock.Lock() - metricsMap[ee.ID()] = ee.GetMetrics() - metricMapLock.Unlock() - if isVerbose && !eeCfg.Synchronous { - utils.Logger.Warning( - fmt.Sprintf("<%s> with id <%s>, running verbosed export with syncronous false", - utils.EventExporterS, ee.ID())) - withErr = true + if hasVerbose && eeCfg.Synchronous { + metricMapLock.Lock() + metricsMap[ee.ID()] = ee.GetMetrics() + metricMapLock.Unlock() } if sync { wg.Done() } - }(!hasCache, eeCfg.Synchronous, ee, eeCfg) + }(!hasCache, eeCfg.Synchronous, ee) } wg.Wait() if withErr { @@ -237,7 +243,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma return } - *rply = make(map[string]utils.MapStorage) + *rply = make(map[string]map[string]interface{}) metricMapLock.Lock() for k, v := range metricsMap { (*rply)[k] = v diff --git a/ees/filecsv.go b/ees/filecsv.go index b11ab6f43..e18f470f5 100644 --- a/ees/filecsv.go +++ b/ees/filecsv.go @@ -33,7 +33,7 @@ import ( func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, dc utils.MapStorage) (fCsv *FileCSVee, err error) { - dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID + dc[utils.ExporterID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID fCsv = &FileCSVee{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc} err = fCsv.init() diff --git a/ees/filefwv.go b/ees/filefwv.go index 95ed89a3e..0e776a278 100644 --- a/ees/filefwv.go +++ b/ees/filefwv.go @@ -31,7 +31,7 @@ import ( ) func NewFileFWVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, dc utils.MapStorage) (fFwv *FileFWVee, err error) { - dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID + dc[utils.ExporterID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID fFwv = &FileFWVee{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc} err = fFwv.init() diff --git a/ees/httpjsonmap.go b/ees/httpjsonmap.go index 55670064c..80f6c485b 100644 --- a/ees/httpjsonmap.go +++ b/ees/httpjsonmap.go @@ -31,7 +31,7 @@ import ( func NewHTTPJsonMapEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, dc utils.MapStorage) (httpJSON *HTTPJsonMapEe, err error) { - dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID + dc[utils.ExporterID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID httpJSON = &HTTPJsonMapEe{ id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, cgrCfg: cgrCfg, diff --git a/ees/httppost.go b/ees/httppost.go index 7e4f3bde4..d505da5ed 100644 --- a/ees/httppost.go +++ b/ees/httppost.go @@ -31,7 +31,7 @@ import ( func NewHTTPPostEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, dc utils.MapStorage) (httpPost *HTTPPost, err error) { - dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID + dc[utils.ExporterID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID httpPost = &HTTPPost{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc} httpPost.httpPoster, err = engine.NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify, diff --git a/ees/virtualee.go b/ees/virtualee.go index eaa4fe9e6..82d797e71 100644 --- a/ees/virtualee.go +++ b/ees/virtualee.go @@ -28,7 +28,7 @@ import ( func NewVirtualExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, dc utils.MapStorage) (vEe *VirtualEe, err error) { - dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID + dc[utils.ExporterID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID vEe = &VirtualEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc} err = vEe.init() diff --git a/engine/cdrs.go b/engine/cdrs.go index 53a848ec0..366918aaf 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -150,7 +150,8 @@ func (cdrS *CDRServer) rateCDR(cdr *CDRWithOpts) ([]*CDR, error) { var cdrsRated []*CDR _, hasLastUsed := cdr.ExtraFields[utils.LastUsed] if utils.SliceHasMember([]string{utils.META_PREPAID, utils.PREPAID}, cdr.RequestType) && - (cdr.Usage != 0 || hasLastUsed) && cdr.CostDetails == nil { // ToDo: Get rid of PREPAID as soon as we don't want to support it backwards + (cdr.Usage != 0 || hasLastUsed) && cdr.CostDetails == nil { + // ToDo: Get rid of PREPAID as soon as we don't want to support it backwards // Should be previously calculated and stored in DB fib := utils.Fib() var smCosts []*SMCost @@ -418,7 +419,7 @@ func (cdrS *CDRServer) statSProcessEvent(cgrEv *utils.CGREventWithOpts) (err err // eeSProcessEvent will process the event with the EEs component func (cdrS *CDRServer) eeSProcessEvent(cgrEv *utils.CGREventWithIDs) (err error) { - var reply string + var reply map[string]map[string]interface{} if err = cdrS.connMgr.Call(cdrS.cgrCfg.CdrsCfg().EEsConns, nil, utils.EventExporterSv1ProcessEvent, cgrEv, &reply); err != nil && diff --git a/utils/consts.go b/utils/consts.go index 763459f8f..5c1287ee0 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -806,7 +806,8 @@ const ( Attempts = "Attempts" FieldSeparator = "FieldSeparator" ExportPath = "ExportPath" - ExportID = "ExportID" + ExporterID = "ExporterID" + ExporterIDs = "ExporterIDs" TimeNow = "TimeNow" ExportFileName = "ExportFileName" GroupID = "GroupID" @@ -856,7 +857,6 @@ const ( RouteID = "RouteID" MetaMonthlyEstimated = "*monthly_estimated" ProcessRuns = "ProcessRuns" - EEsVerbose = "*eesVerbose" ) // Migrator Action @@ -2365,6 +2365,8 @@ const ( // DispatcherS OptsAPIKey = "*apiKey" OptsRouteID = "*routeID" + // EEs + OptsEEsVerbose = "*eesVerbose" // Others OptsContext = "*context" Subsys = "*subsys"