From e411f5e7da6a510be6e3b6d712d69477076ee4ab Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 13 Aug 2020 11:55:25 +0300 Subject: [PATCH] Update ees metrics --- ees/ees.go | 46 ++++++++++++++- ees/ees_test.go | 115 +++++++++++++++++++++++++++++++++++++ ees/filecsv.go | 49 +++------------- ees/filefwv.go | 48 +++------------- ees/httpjsonmap.go | 49 +++------------- ees/httpjsonmap_it_test.go | 1 + ees/httppost.go | 49 +++------------- ees/virtualee.go | 49 +++------------- engine/mapevent.go | 9 +++ engine/mapevent_test.go | 22 +++++++ 10 files changed, 235 insertions(+), 202 deletions(-) create mode 100644 ees/ees_test.go diff --git a/ees/ees.go b/ees/ees.go index 21010b616..d68bd831c 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -231,11 +231,11 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *st func newEEMetrics() utils.MapStorage { return utils.MapStorage{ utils.NumberOfEvents: 0, - utils.TotalCost: 0.0, + utils.TotalCost: float64(0.0), utils.PositiveExports: utils.StringSet{}, utils.NegativeExports: utils.StringSet{}, - utils.FirstExpOrderID: 0, - utils.LastExpOrderID: 0, + utils.FirstExpOrderID: int64(0), + utils.LastExpOrderID: int64(0), utils.FirstEventATime: time.Time{}, utils.LastEventATime: time.Time{}, utils.TimeNow: time.Now(), @@ -246,3 +246,43 @@ func newEEMetrics() utils.MapStorage { utils.TotalDataUsage: time.Duration(0), } } + +func updateEEMetrics(dc utils.MapStorage, ev engine.MapEvent, timezone string) { + if aTime, err := ev.GetTime(utils.AnswerTime, timezone); err == nil { + if dc[utils.FirstEventATime].(time.Time).IsZero() || + aTime.Before(dc[utils.FirstEventATime].(time.Time)) { + dc[utils.FirstEventATime] = aTime + } + if aTime.After(dc[utils.LastEventATime].(time.Time)) { + dc[utils.LastEventATime] = aTime + } + } + if oID, err := ev.GetTInt64(utils.OrderID); err == nil { + if dc[utils.FirstExpOrderID].(int64) == 0 || + dc[utils.FirstExpOrderID].(int64) > oID { + dc[utils.FirstExpOrderID] = oID + } + if dc[utils.LastExpOrderID].(int64) < oID { + dc[utils.LastExpOrderID] = oID + } + } + if cost, err := ev.GetFloat64(utils.Cost); err == nil { + dc[utils.TotalCost] = dc[utils.TotalCost].(float64) + cost + } + if tor, err := ev.GetString(utils.ToR); err == nil { + if usage, err := ev.GetDuration(utils.Usage); err == nil { + switch tor { + case utils.VOICE: + dc[utils.TotalDuration] = dc[utils.TotalDuration].(time.Duration) + usage + case utils.SMS: + dc[utils.TotalSMSUsage] = dc[utils.TotalSMSUsage].(time.Duration) + usage + case utils.MMS: + dc[utils.TotalMMSUsage] = dc[utils.TotalMMSUsage].(time.Duration) + usage + case utils.GENERIC: + dc[utils.TotalGenericUsage] = dc[utils.TotalGenericUsage].(time.Duration) + usage + case utils.DATA: + dc[utils.TotalDataUsage] = dc[utils.TotalDataUsage].(time.Duration) + usage + } + } + } +} diff --git a/ees/ees_test.go b/ees/ees_test.go new file mode 100644 index 000000000..d906bb34a --- /dev/null +++ b/ees/ees_test.go @@ -0,0 +1,115 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ees + +import ( + "reflect" + "testing" + "time" + + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func TestUpdateEEMetrics(t *testing.T) { + dc := newEEMetrics() + tnow := time.Now() + ev := engine.MapEvent{ + utils.AnswerTime: tnow, + utils.OrderID: 1, + utils.Cost: 5.5, + utils.ToR: utils.VOICE, + utils.Usage: time.Second, + } + exp := newEEMetrics() + exp[utils.FirstEventATime] = tnow + exp[utils.LastEventATime] = tnow + exp[utils.FirstExpOrderID] = int64(1) + exp[utils.LastExpOrderID] = int64(1) + exp[utils.TotalCost] = float64(5.5) + exp[utils.TotalDuration] = time.Second + exp[utils.TimeNow] = dc[utils.TimeNow] + if updateEEMetrics(dc, ev, utils.EmptyString); !reflect.DeepEqual(dc, exp) { + t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc)) + } + + tnow = tnow.Add(24 * time.Hour) + ev = engine.MapEvent{ + utils.AnswerTime: tnow, + utils.OrderID: 2, + utils.Cost: 5.5, + utils.ToR: utils.SMS, + utils.Usage: time.Second, + } + exp[utils.LastEventATime] = tnow + exp[utils.LastExpOrderID] = int64(2) + exp[utils.TotalCost] = float64(11) + exp[utils.TotalSMSUsage] = time.Second + if updateEEMetrics(dc, ev, utils.EmptyString); !reflect.DeepEqual(dc, exp) { + t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc)) + } + + tnow = tnow.Add(24 * time.Hour) + ev = engine.MapEvent{ + utils.AnswerTime: tnow, + utils.OrderID: 3, + utils.Cost: 5.5, + utils.ToR: utils.MMS, + utils.Usage: time.Second, + } + exp[utils.LastEventATime] = tnow + exp[utils.LastExpOrderID] = int64(3) + exp[utils.TotalCost] = float64(16.5) + exp[utils.TotalMMSUsage] = time.Second + if updateEEMetrics(dc, ev, utils.EmptyString); !reflect.DeepEqual(dc, exp) { + t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc)) + } + + tnow = tnow.Add(24 * time.Hour) + ev = engine.MapEvent{ + utils.AnswerTime: tnow, + utils.OrderID: 4, + utils.Cost: 5.5, + utils.ToR: utils.GENERIC, + utils.Usage: time.Second, + } + exp[utils.LastEventATime] = tnow + exp[utils.LastExpOrderID] = int64(4) + exp[utils.TotalCost] = float64(22) + exp[utils.TotalGenericUsage] = time.Second + if updateEEMetrics(dc, ev, utils.EmptyString); !reflect.DeepEqual(dc, exp) { + t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc)) + } + + tnow = tnow.Add(24 * time.Hour) + ev = engine.MapEvent{ + utils.AnswerTime: tnow, + utils.OrderID: 5, + utils.Cost: 5.5, + utils.ToR: utils.DATA, + utils.Usage: time.Second, + } + exp[utils.LastEventATime] = tnow + exp[utils.LastExpOrderID] = int64(5) + exp[utils.TotalCost] = float64(27.5) + exp[utils.TotalDataUsage] = time.Second + if updateEEMetrics(dc, ev, utils.EmptyString); !reflect.DeepEqual(dc, exp) { + t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc)) + } +} diff --git a/ees/filecsv.go b/ees/filecsv.go index 4388b1176..acdf214a5 100644 --- a/ees/filecsv.go +++ b/ees/filecsv.go @@ -24,7 +24,6 @@ import ( "os" "path" "sync" - "time" "github.com/cgrates/cgrates/engine" @@ -91,8 +90,14 @@ func (fCsv *FileCSVee) OnEvicted(_ string, _ interface{}) { // ExportEvent implements EventExporter func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) { fCsv.Lock() - defer fCsv.Unlock() - + defer func() { + if err != nil { + fCsv.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID) + } else { + fCsv.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID) + } + fCsv.Unlock() + }() fCsv.dc[utils.NumberOfEvents] = fCsv.dc[utils.NumberOfEvents].(int) + 1 var csvRecord []string @@ -104,7 +109,6 @@ func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) { fCsv.filterS) if err = eeReq.SetFields(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ContentFields()); err != nil { - fCsv.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID) return } for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() { @@ -114,42 +118,7 @@ func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) { } csvRecord = append(csvRecord, strVal) } - if aTime, err := cgrEv.FieldAsTime(utils.AnswerTime, fCsv.cgrCfg.GeneralCfg().DefaultTimezone); err == nil { - if fCsv.dc[utils.FirstEventATime].(time.Time).IsZero() || fCsv.dc[utils.FirstEventATime].(time.Time).Before(aTime) { - fCsv.dc[utils.FirstEventATime] = aTime - } - if aTime.After(fCsv.dc[utils.LastEventATime].(time.Time)) { - fCsv.dc[utils.LastEventATime] = aTime - } - } - if oID, err := cgrEv.FieldAsInt64(utils.OrderID); err == nil { - if fCsv.dc[utils.FirstExpOrderID].(int64) > oID || fCsv.dc[utils.FirstExpOrderID].(int64) == 0 { - fCsv.dc[utils.FirstExpOrderID] = oID - } - if fCsv.dc[utils.LastExpOrderID].(int64) < oID { - fCsv.dc[utils.LastExpOrderID] = oID - } - } - if cost, err := cgrEv.FieldAsFloat64(utils.Cost); err == nil { - fCsv.dc[utils.TotalCost] = fCsv.dc[utils.TotalCost].(float64) + cost - } - if tor, err := cgrEv.FieldAsString(utils.ToR); err == nil { - if usage, err := cgrEv.FieldAsDuration(utils.Usage); err == nil { - switch tor { - case utils.VOICE: - fCsv.dc[utils.TotalDuration] = fCsv.dc[utils.TotalDuration].(time.Duration) + usage - case utils.SMS: - fCsv.dc[utils.TotalSMSUsage] = fCsv.dc[utils.TotalSMSUsage].(time.Duration) + usage - case utils.MMS: - fCsv.dc[utils.TotalMMSUsage] = fCsv.dc[utils.TotalMMSUsage].(time.Duration) + usage - case utils.GENERIC: - fCsv.dc[utils.TotalGenericUsage] = fCsv.dc[utils.TotalGenericUsage].(time.Duration) + usage - case utils.DATA: - fCsv.dc[utils.TotalDataUsage] = fCsv.dc[utils.TotalDataUsage].(time.Duration) + usage - } - } - } - fCsv.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID) + updateEEMetrics(fCsv.dc, cgrEv.Event, fCsv.cgrCfg.GeneralCfg().DefaultTimezone) fCsv.csvWriter.Write(csvRecord) return } diff --git a/ees/filefwv.go b/ees/filefwv.go index 87339b364..df056e71d 100644 --- a/ees/filefwv.go +++ b/ees/filefwv.go @@ -24,7 +24,6 @@ import ( "os" "path" "sync" - "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -82,7 +81,14 @@ func (fFwv *FileFWVee) OnEvicted(_ string, _ interface{}) { // ExportEvent implements EventExporter func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) { fFwv.Lock() - defer fFwv.Unlock() + defer func() { + if err != nil { + fFwv.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID) + } else { + fFwv.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID) + } + fFwv.Unlock() + }() fFwv.dc[utils.NumberOfEvents] = fFwv.dc[utils.NumberOfEvents].(int) + 1 var records []string req := utils.MapStorage{} @@ -93,7 +99,6 @@ func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) { fFwv.filterS) if err = eeReq.SetFields(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].ContentFields()); err != nil { - fFwv.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID) return } for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() { @@ -103,42 +108,7 @@ func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) { } records = append(records, strVal) } - if aTime, err := cgrEv.FieldAsTime(utils.AnswerTime, fFwv.cgrCfg.GeneralCfg().DefaultTimezone); err == nil { - if fFwv.dc[utils.FirstEventATime].(time.Time).IsZero() || fFwv.dc[utils.FirstEventATime].(time.Time).Before(aTime) { - fFwv.dc[utils.FirstEventATime] = aTime - } - if aTime.After(fFwv.dc[utils.LastEventATime].(time.Time)) { - fFwv.dc[utils.LastEventATime] = aTime - } - } - if oID, err := cgrEv.FieldAsInt64(utils.OrderID); err == nil { - if fFwv.dc[utils.FirstExpOrderID].(int64) > oID || fFwv.dc[utils.FirstExpOrderID].(int64) == 0 { - fFwv.dc[utils.FirstExpOrderID] = oID - } - if fFwv.dc[utils.LastExpOrderID].(int64) < oID { - fFwv.dc[utils.LastExpOrderID] = oID - } - } - if cost, err := cgrEv.FieldAsFloat64(utils.Cost); err == nil { - fFwv.dc[utils.TotalCost] = fFwv.dc[utils.TotalCost].(float64) + cost - } - if tor, err := cgrEv.FieldAsString(utils.ToR); err == nil { - if usage, err := cgrEv.FieldAsDuration(utils.Usage); err == nil { - switch tor { - case utils.VOICE: - fFwv.dc[utils.TotalDuration] = fFwv.dc[utils.TotalDuration].(time.Duration) + usage - case utils.SMS: - fFwv.dc[utils.TotalSMSUsage] = fFwv.dc[utils.TotalSMSUsage].(time.Duration) + usage - case utils.MMS: - fFwv.dc[utils.TotalMMSUsage] = fFwv.dc[utils.TotalMMSUsage].(time.Duration) + usage - case utils.GENERIC: - fFwv.dc[utils.TotalGenericUsage] = fFwv.dc[utils.TotalGenericUsage].(time.Duration) + usage - case utils.DATA: - fFwv.dc[utils.TotalDataUsage] = fFwv.dc[utils.TotalDataUsage].(time.Duration) + usage - } - } - } - fFwv.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID) + updateEEMetrics(fFwv.dc, cgrEv.Event, fFwv.cgrCfg.GeneralCfg().DefaultTimezone) for _, record := range append(records, "\n") { if _, err = io.WriteString(fFwv.file, record); err != nil { return diff --git a/ees/httpjsonmap.go b/ees/httpjsonmap.go index a809689dd..77f8ebd85 100644 --- a/ees/httpjsonmap.go +++ b/ees/httpjsonmap.go @@ -23,7 +23,6 @@ import ( "fmt" "strings" "sync" - "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -33,14 +32,18 @@ import ( func NewHTTPJsonMapEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, dc utils.MapStorage) (httpJSON *HTTPJsonMapEe, err error) { dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID - httpJSON = &HTTPJsonMapEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, - cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc} + httpJSON = &HTTPJsonMapEe{ + id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, + cgrCfg: cgrCfg, + cfgIdx: cfgIdx, + filterS: filterS, + dc: dc, + } if cgrCfg.EEsCfg().Exporters[cfgIdx].Type == utils.MetaHTTPjsonMap { httpJSON.httpPoster, err = engine.NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify, cgrCfg.GeneralCfg().ReplyTimeout, cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, utils.PosterTransportContentTypes[cgrCfg.EEsCfg().Exporters[cfgIdx].Type], cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts) } - return } @@ -51,8 +54,8 @@ type HTTPJsonMapEe struct { cfgIdx int // index of config instance within ERsCfg.Readers filterS *engine.FilterS httpPoster *engine.HTTPPoster + dc utils.MapStorage sync.RWMutex - dc utils.MapStorage } // ID returns the identificator of this exporter @@ -101,41 +104,7 @@ func (httpJson *HTTPJsonMapEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { } valMp[strings.Join(itm.Path, utils.NestingSep)] = utils.IfaceAsString(itm.Data) } - if aTime, err := cgrEv.FieldAsTime(utils.AnswerTime, httpJson.cgrCfg.GeneralCfg().DefaultTimezone); err == nil { - if httpJson.dc[utils.FirstEventATime].(time.Time).IsZero() || httpJson.dc[utils.FirstEventATime].(time.Time).Before(aTime) { - httpJson.dc[utils.FirstEventATime] = aTime - } - if aTime.After(httpJson.dc[utils.LastEventATime].(time.Time)) { - httpJson.dc[utils.LastEventATime] = aTime - } - } - if oID, err := cgrEv.FieldAsInt64(utils.OrderID); err == nil { - if httpJson.dc[utils.FirstExpOrderID].(int64) > oID || httpJson.dc[utils.FirstExpOrderID].(int64) == 0 { - httpJson.dc[utils.FirstExpOrderID] = oID - } - if httpJson.dc[utils.LastExpOrderID].(int64) < oID { - httpJson.dc[utils.LastExpOrderID] = oID - } - } - if cost, err := cgrEv.FieldAsFloat64(utils.Cost); err == nil { - httpJson.dc[utils.TotalCost] = httpJson.dc[utils.TotalCost].(float64) + cost - } - if tor, err := cgrEv.FieldAsString(utils.ToR); err == nil { - if usage, err := cgrEv.FieldAsDuration(utils.Usage); err == nil { - switch tor { - case utils.VOICE: - httpJson.dc[utils.TotalDuration] = httpJson.dc[utils.TotalDuration].(time.Duration) + usage - case utils.SMS: - httpJson.dc[utils.TotalSMSUsage] = httpJson.dc[utils.TotalSMSUsage].(time.Duration) + usage - case utils.MMS: - httpJson.dc[utils.TotalMMSUsage] = httpJson.dc[utils.TotalMMSUsage].(time.Duration) + usage - case utils.GENERIC: - httpJson.dc[utils.TotalGenericUsage] = httpJson.dc[utils.TotalGenericUsage].(time.Duration) + usage - case utils.DATA: - httpJson.dc[utils.TotalDataUsage] = httpJson.dc[utils.TotalDataUsage].(time.Duration) + usage - } - } - } + updateEEMetrics(httpJson.dc, cgrEv.Event, httpJson.cgrCfg.GeneralCfg().DefaultTimezone) cgrID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.CGRID), utils.GenUUID()) runID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RunID), utils.MetaDefault) var body []byte diff --git a/ees/httpjsonmap_it_test.go b/ees/httpjsonmap_it_test.go index d8edecb67..d984a1062 100644 --- a/ees/httpjsonmap_it_test.go +++ b/ees/httpjsonmap_it_test.go @@ -195,6 +195,7 @@ func testHTTPJsonMapExportEvent(t *testing.T) { utils.Usage: time.Duration(1), utils.RunID: utils.MetaDefault, utils.Cost: 0.15, + utils.OrderID: 10, "ExporterUsed": "HTTPJsonMapExporter", "ExtraFields": map[string]string{"extra1": "val_extra1", "extra2": "val_extra2", "extra3": "val_extra3"}, diff --git a/ees/httppost.go b/ees/httppost.go index 26e7ac08b..f6574b7c3 100644 --- a/ees/httppost.go +++ b/ees/httppost.go @@ -23,7 +23,6 @@ import ( "net/url" "strings" "sync" - "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -65,8 +64,14 @@ func (httpPost *HTTPPost) OnEvicted(_ string, _ interface{}) { // ExportEvent implements EventExporter func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) { httpPost.Lock() - defer httpPost.Unlock() - + defer func() { + if err != nil { + httpPost.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID) + } else { + httpPost.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID) + } + httpPost.Unlock() + }() httpPost.dc[utils.NumberOfEvents] = httpPost.dc[utils.NumberOfEvents].(int) + 1 var body interface{} @@ -79,7 +84,6 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) { httpPost.filterS) if err = eeReq.SetFields(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ContentFields()); err != nil { - httpPost.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID) return } for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() { @@ -96,42 +100,7 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) { } urlVals.Set(strings.Join(itm.Path, utils.NestingSep), utils.IfaceAsString(itm.Data)) } - if aTime, err := cgrEv.FieldAsTime(utils.AnswerTime, httpPost.cgrCfg.GeneralCfg().DefaultTimezone); err == nil { - if httpPost.dc[utils.FirstEventATime].(time.Time).IsZero() || httpPost.dc[utils.FirstEventATime].(time.Time).Before(aTime) { - httpPost.dc[utils.FirstEventATime] = aTime - } - if aTime.After(httpPost.dc[utils.LastEventATime].(time.Time)) { - httpPost.dc[utils.LastEventATime] = aTime - } - } - if oID, err := cgrEv.FieldAsInt64(utils.OrderID); err == nil { - if httpPost.dc[utils.FirstExpOrderID].(int64) > oID || httpPost.dc[utils.FirstExpOrderID].(int64) == 0 { - httpPost.dc[utils.FirstExpOrderID] = oID - } - if httpPost.dc[utils.LastExpOrderID].(int64) < oID { - httpPost.dc[utils.LastExpOrderID] = oID - } - } - if cost, err := cgrEv.FieldAsFloat64(utils.Cost); err == nil { - httpPost.dc[utils.TotalCost] = httpPost.dc[utils.TotalCost].(float64) + cost - } - if tor, err := cgrEv.FieldAsString(utils.ToR); err == nil { - if usage, err := cgrEv.FieldAsDuration(utils.Usage); err == nil { - switch tor { - case utils.VOICE: - httpPost.dc[utils.TotalDuration] = httpPost.dc[utils.TotalDuration].(time.Duration) + usage - case utils.SMS: - httpPost.dc[utils.TotalSMSUsage] = httpPost.dc[utils.TotalSMSUsage].(time.Duration) + usage - case utils.MMS: - httpPost.dc[utils.TotalMMSUsage] = httpPost.dc[utils.TotalMMSUsage].(time.Duration) + usage - case utils.GENERIC: - httpPost.dc[utils.TotalGenericUsage] = httpPost.dc[utils.TotalGenericUsage].(time.Duration) + usage - case utils.DATA: - httpPost.dc[utils.TotalDataUsage] = httpPost.dc[utils.TotalDataUsage].(time.Duration) + usage - } - } - } - httpPost.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID) + updateEEMetrics(httpPost.dc, cgrEv.Event, httpPost.cgrCfg.GeneralCfg().DefaultTimezone) body = urlVals if err = httpPost.httpPoster.Post(body, utils.EmptyString); err != nil && httpPost.cgrCfg.GeneralCfg().FailedPostsDir != utils.META_NONE { diff --git a/ees/virtualee.go b/ees/virtualee.go index fb3cf2dee..c4c64e445 100644 --- a/ees/virtualee.go +++ b/ees/virtualee.go @@ -20,7 +20,6 @@ package ees import ( "sync" - "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -64,8 +63,14 @@ func (vEe *VirtualEe) OnEvicted(_ string, _ interface{}) { // ExportEvent implements EventExporter func (vEe *VirtualEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { vEe.Lock() - defer vEe.Unlock() - + defer func() { + if err != nil { + vEe.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID) + } else { + vEe.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID) + } + vEe.Unlock() + }() vEe.dc[utils.NumberOfEvents] = vEe.dc[utils.NumberOfEvents].(int) + 1 req := utils.MapStorage{} @@ -75,44 +80,8 @@ func (vEe *VirtualEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { eeReq := NewEventExporterRequest(req, vEe.dc, cgrEv.Tenant, vEe.cgrCfg.GeneralCfg().DefaultTimezone, vEe.filterS) if err = eeReq.SetFields(vEe.cgrCfg.EEsCfg().Exporters[vEe.cfgIdx].ContentFields()); err != nil { - vEe.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID) return } - if aTime, err := cgrEv.FieldAsTime(utils.AnswerTime, vEe.cgrCfg.GeneralCfg().DefaultTimezone); err == nil { - if vEe.dc[utils.FirstEventATime].(time.Time).IsZero() || vEe.dc[utils.FirstEventATime].(time.Time).Before(aTime) { - vEe.dc[utils.FirstEventATime] = aTime - } - if aTime.After(vEe.dc[utils.LastEventATime].(time.Time)) { - vEe.dc[utils.LastEventATime] = aTime - } - } - if oID, err := cgrEv.FieldAsInt64(utils.OrderID); err == nil { - if vEe.dc[utils.FirstExpOrderID].(int64) > oID || vEe.dc[utils.FirstExpOrderID].(int64) == 0 { - vEe.dc[utils.FirstExpOrderID] = oID - } - if vEe.dc[utils.LastExpOrderID].(int64) < oID { - vEe.dc[utils.LastExpOrderID] = oID - } - } - if cost, err := cgrEv.FieldAsFloat64(utils.Cost); err == nil { - vEe.dc[utils.TotalCost] = vEe.dc[utils.TotalCost].(float64) + cost - } - if tor, err := cgrEv.FieldAsString(utils.ToR); err == nil { - if usage, err := cgrEv.FieldAsDuration(utils.Usage); err == nil { - switch tor { - case utils.VOICE: - vEe.dc[utils.TotalDuration] = vEe.dc[utils.TotalDuration].(time.Duration) + usage - case utils.SMS: - vEe.dc[utils.TotalSMSUsage] = vEe.dc[utils.TotalSMSUsage].(time.Duration) + usage - case utils.MMS: - vEe.dc[utils.TotalMMSUsage] = vEe.dc[utils.TotalMMSUsage].(time.Duration) + usage - case utils.GENERIC: - vEe.dc[utils.TotalGenericUsage] = vEe.dc[utils.TotalGenericUsage].(time.Duration) + usage - case utils.DATA: - vEe.dc[utils.TotalDataUsage] = vEe.dc[utils.TotalDataUsage].(time.Duration) + usage - } - } - } - vEe.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID) + updateEEMetrics(vEe.dc, cgrEv.Event, vEe.cgrCfg.GeneralCfg().DefaultTimezone) return } diff --git a/engine/mapevent.go b/engine/mapevent.go index e8accfbe8..7632bff85 100644 --- a/engine/mapevent.go +++ b/engine/mapevent.go @@ -85,6 +85,15 @@ func (me MapEvent) GetTInt64(fldName string) (out int64, err error) { return utils.IfaceAsTInt64(fldIface) } +// GetFloat64 returns a field as float64 instance +func (me MapEvent) GetFloat64(fldName string) (f float64, err error) { + iface, has := me[fldName] + if !has { + return f, utils.ErrNotFound + } + return utils.IfaceAsFloat64(iface) +} + func (me MapEvent) GetStringIgnoreErrors(fldName string) (out string) { out, _ = me.GetString(fldName) return diff --git a/engine/mapevent_test.go b/engine/mapevent_test.go index 5e9291984..c7f4a0a68 100644 --- a/engine/mapevent_test.go +++ b/engine/mapevent_test.go @@ -689,6 +689,28 @@ func TestMapEventGetTInt64(t *testing.T) { } } +func TestMapEventGetFloat64(t *testing.T) { + if rply, err := mapEv.GetFloat64("test2"); err != nil { + t.Error(err) + } else if rply != float64(42) { + t.Errorf("Expecting %+v, received: %+v", float64(42), rply) + } + + if rply, err := mapEv.GetFloat64("test3"); err != nil { + t.Error(err) + } else if rply != float64(42.3) { + t.Errorf("Expecting %+v, received: %+v", float64(42.3), rply) + } + + if rply, err := mapEv.GetFloat64("test4"); err == nil { + t.Errorf("Expecting error, received: %+v with error %v", rply, err) + } + + if rply, err := mapEv.GetFloat64("0test"); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Errorf("Expecting error: %v, received: %+v with error %v", utils.ErrNotFound, rply, err) + } +} + func TestMapEventGetDurationPtr(t *testing.T) { if rply, err := mapEv.GetDurationPtr("test4"); err == nil { t.Errorf("Expecting error, received: %+v with error %v", rply, err)