Update ees metrics

This commit is contained in:
Trial97
2020-08-13 11:55:25 +03:00
committed by Dan Christian Bogos
parent e2049ae318
commit e411f5e7da
10 changed files with 235 additions and 202 deletions

View File

@@ -231,11 +231,11 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *st
func newEEMetrics() utils.MapStorage {
return utils.MapStorage{
utils.NumberOfEvents: 0,
utils.TotalCost: 0.0,
utils.TotalCost: float64(0.0),
utils.PositiveExports: utils.StringSet{},
utils.NegativeExports: utils.StringSet{},
utils.FirstExpOrderID: 0,
utils.LastExpOrderID: 0,
utils.FirstExpOrderID: int64(0),
utils.LastExpOrderID: int64(0),
utils.FirstEventATime: time.Time{},
utils.LastEventATime: time.Time{},
utils.TimeNow: time.Now(),
@@ -246,3 +246,43 @@ func newEEMetrics() utils.MapStorage {
utils.TotalDataUsage: time.Duration(0),
}
}
func updateEEMetrics(dc utils.MapStorage, ev engine.MapEvent, timezone string) {
if aTime, err := ev.GetTime(utils.AnswerTime, timezone); err == nil {
if dc[utils.FirstEventATime].(time.Time).IsZero() ||
aTime.Before(dc[utils.FirstEventATime].(time.Time)) {
dc[utils.FirstEventATime] = aTime
}
if aTime.After(dc[utils.LastEventATime].(time.Time)) {
dc[utils.LastEventATime] = aTime
}
}
if oID, err := ev.GetTInt64(utils.OrderID); err == nil {
if dc[utils.FirstExpOrderID].(int64) == 0 ||
dc[utils.FirstExpOrderID].(int64) > oID {
dc[utils.FirstExpOrderID] = oID
}
if dc[utils.LastExpOrderID].(int64) < oID {
dc[utils.LastExpOrderID] = oID
}
}
if cost, err := ev.GetFloat64(utils.Cost); err == nil {
dc[utils.TotalCost] = dc[utils.TotalCost].(float64) + cost
}
if tor, err := ev.GetString(utils.ToR); err == nil {
if usage, err := ev.GetDuration(utils.Usage); err == nil {
switch tor {
case utils.VOICE:
dc[utils.TotalDuration] = dc[utils.TotalDuration].(time.Duration) + usage
case utils.SMS:
dc[utils.TotalSMSUsage] = dc[utils.TotalSMSUsage].(time.Duration) + usage
case utils.MMS:
dc[utils.TotalMMSUsage] = dc[utils.TotalMMSUsage].(time.Duration) + usage
case utils.GENERIC:
dc[utils.TotalGenericUsage] = dc[utils.TotalGenericUsage].(time.Duration) + usage
case utils.DATA:
dc[utils.TotalDataUsage] = dc[utils.TotalDataUsage].(time.Duration) + usage
}
}
}
}

115
ees/ees_test.go Normal file
View File

@@ -0,0 +1,115 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package ees
import (
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func TestUpdateEEMetrics(t *testing.T) {
dc := newEEMetrics()
tnow := time.Now()
ev := engine.MapEvent{
utils.AnswerTime: tnow,
utils.OrderID: 1,
utils.Cost: 5.5,
utils.ToR: utils.VOICE,
utils.Usage: time.Second,
}
exp := newEEMetrics()
exp[utils.FirstEventATime] = tnow
exp[utils.LastEventATime] = tnow
exp[utils.FirstExpOrderID] = int64(1)
exp[utils.LastExpOrderID] = int64(1)
exp[utils.TotalCost] = float64(5.5)
exp[utils.TotalDuration] = time.Second
exp[utils.TimeNow] = dc[utils.TimeNow]
if updateEEMetrics(dc, ev, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
}
tnow = tnow.Add(24 * time.Hour)
ev = engine.MapEvent{
utils.AnswerTime: tnow,
utils.OrderID: 2,
utils.Cost: 5.5,
utils.ToR: utils.SMS,
utils.Usage: time.Second,
}
exp[utils.LastEventATime] = tnow
exp[utils.LastExpOrderID] = int64(2)
exp[utils.TotalCost] = float64(11)
exp[utils.TotalSMSUsage] = time.Second
if updateEEMetrics(dc, ev, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
}
tnow = tnow.Add(24 * time.Hour)
ev = engine.MapEvent{
utils.AnswerTime: tnow,
utils.OrderID: 3,
utils.Cost: 5.5,
utils.ToR: utils.MMS,
utils.Usage: time.Second,
}
exp[utils.LastEventATime] = tnow
exp[utils.LastExpOrderID] = int64(3)
exp[utils.TotalCost] = float64(16.5)
exp[utils.TotalMMSUsage] = time.Second
if updateEEMetrics(dc, ev, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
}
tnow = tnow.Add(24 * time.Hour)
ev = engine.MapEvent{
utils.AnswerTime: tnow,
utils.OrderID: 4,
utils.Cost: 5.5,
utils.ToR: utils.GENERIC,
utils.Usage: time.Second,
}
exp[utils.LastEventATime] = tnow
exp[utils.LastExpOrderID] = int64(4)
exp[utils.TotalCost] = float64(22)
exp[utils.TotalGenericUsage] = time.Second
if updateEEMetrics(dc, ev, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
}
tnow = tnow.Add(24 * time.Hour)
ev = engine.MapEvent{
utils.AnswerTime: tnow,
utils.OrderID: 5,
utils.Cost: 5.5,
utils.ToR: utils.DATA,
utils.Usage: time.Second,
}
exp[utils.LastEventATime] = tnow
exp[utils.LastExpOrderID] = int64(5)
exp[utils.TotalCost] = float64(27.5)
exp[utils.TotalDataUsage] = time.Second
if updateEEMetrics(dc, ev, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
}
}

View File

@@ -24,7 +24,6 @@ import (
"os"
"path"
"sync"
"time"
"github.com/cgrates/cgrates/engine"
@@ -91,8 +90,14 @@ func (fCsv *FileCSVee) OnEvicted(_ string, _ interface{}) {
// ExportEvent implements EventExporter
func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
fCsv.Lock()
defer fCsv.Unlock()
defer func() {
if err != nil {
fCsv.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
} else {
fCsv.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
}
fCsv.Unlock()
}()
fCsv.dc[utils.NumberOfEvents] = fCsv.dc[utils.NumberOfEvents].(int) + 1
var csvRecord []string
@@ -104,7 +109,6 @@ func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
fCsv.filterS)
if err = eeReq.SetFields(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ContentFields()); err != nil {
fCsv.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
return
}
for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() {
@@ -114,42 +118,7 @@ func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
}
csvRecord = append(csvRecord, strVal)
}
if aTime, err := cgrEv.FieldAsTime(utils.AnswerTime, fCsv.cgrCfg.GeneralCfg().DefaultTimezone); err == nil {
if fCsv.dc[utils.FirstEventATime].(time.Time).IsZero() || fCsv.dc[utils.FirstEventATime].(time.Time).Before(aTime) {
fCsv.dc[utils.FirstEventATime] = aTime
}
if aTime.After(fCsv.dc[utils.LastEventATime].(time.Time)) {
fCsv.dc[utils.LastEventATime] = aTime
}
}
if oID, err := cgrEv.FieldAsInt64(utils.OrderID); err == nil {
if fCsv.dc[utils.FirstExpOrderID].(int64) > oID || fCsv.dc[utils.FirstExpOrderID].(int64) == 0 {
fCsv.dc[utils.FirstExpOrderID] = oID
}
if fCsv.dc[utils.LastExpOrderID].(int64) < oID {
fCsv.dc[utils.LastExpOrderID] = oID
}
}
if cost, err := cgrEv.FieldAsFloat64(utils.Cost); err == nil {
fCsv.dc[utils.TotalCost] = fCsv.dc[utils.TotalCost].(float64) + cost
}
if tor, err := cgrEv.FieldAsString(utils.ToR); err == nil {
if usage, err := cgrEv.FieldAsDuration(utils.Usage); err == nil {
switch tor {
case utils.VOICE:
fCsv.dc[utils.TotalDuration] = fCsv.dc[utils.TotalDuration].(time.Duration) + usage
case utils.SMS:
fCsv.dc[utils.TotalSMSUsage] = fCsv.dc[utils.TotalSMSUsage].(time.Duration) + usage
case utils.MMS:
fCsv.dc[utils.TotalMMSUsage] = fCsv.dc[utils.TotalMMSUsage].(time.Duration) + usage
case utils.GENERIC:
fCsv.dc[utils.TotalGenericUsage] = fCsv.dc[utils.TotalGenericUsage].(time.Duration) + usage
case utils.DATA:
fCsv.dc[utils.TotalDataUsage] = fCsv.dc[utils.TotalDataUsage].(time.Duration) + usage
}
}
}
fCsv.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
updateEEMetrics(fCsv.dc, cgrEv.Event, fCsv.cgrCfg.GeneralCfg().DefaultTimezone)
fCsv.csvWriter.Write(csvRecord)
return
}

View File

@@ -24,7 +24,6 @@ import (
"os"
"path"
"sync"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -82,7 +81,14 @@ func (fFwv *FileFWVee) OnEvicted(_ string, _ interface{}) {
// ExportEvent implements EventExporter
func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
fFwv.Lock()
defer fFwv.Unlock()
defer func() {
if err != nil {
fFwv.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
} else {
fFwv.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
}
fFwv.Unlock()
}()
fFwv.dc[utils.NumberOfEvents] = fFwv.dc[utils.NumberOfEvents].(int) + 1
var records []string
req := utils.MapStorage{}
@@ -93,7 +99,6 @@ func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
fFwv.filterS)
if err = eeReq.SetFields(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].ContentFields()); err != nil {
fFwv.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
return
}
for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() {
@@ -103,42 +108,7 @@ func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
}
records = append(records, strVal)
}
if aTime, err := cgrEv.FieldAsTime(utils.AnswerTime, fFwv.cgrCfg.GeneralCfg().DefaultTimezone); err == nil {
if fFwv.dc[utils.FirstEventATime].(time.Time).IsZero() || fFwv.dc[utils.FirstEventATime].(time.Time).Before(aTime) {
fFwv.dc[utils.FirstEventATime] = aTime
}
if aTime.After(fFwv.dc[utils.LastEventATime].(time.Time)) {
fFwv.dc[utils.LastEventATime] = aTime
}
}
if oID, err := cgrEv.FieldAsInt64(utils.OrderID); err == nil {
if fFwv.dc[utils.FirstExpOrderID].(int64) > oID || fFwv.dc[utils.FirstExpOrderID].(int64) == 0 {
fFwv.dc[utils.FirstExpOrderID] = oID
}
if fFwv.dc[utils.LastExpOrderID].(int64) < oID {
fFwv.dc[utils.LastExpOrderID] = oID
}
}
if cost, err := cgrEv.FieldAsFloat64(utils.Cost); err == nil {
fFwv.dc[utils.TotalCost] = fFwv.dc[utils.TotalCost].(float64) + cost
}
if tor, err := cgrEv.FieldAsString(utils.ToR); err == nil {
if usage, err := cgrEv.FieldAsDuration(utils.Usage); err == nil {
switch tor {
case utils.VOICE:
fFwv.dc[utils.TotalDuration] = fFwv.dc[utils.TotalDuration].(time.Duration) + usage
case utils.SMS:
fFwv.dc[utils.TotalSMSUsage] = fFwv.dc[utils.TotalSMSUsage].(time.Duration) + usage
case utils.MMS:
fFwv.dc[utils.TotalMMSUsage] = fFwv.dc[utils.TotalMMSUsage].(time.Duration) + usage
case utils.GENERIC:
fFwv.dc[utils.TotalGenericUsage] = fFwv.dc[utils.TotalGenericUsage].(time.Duration) + usage
case utils.DATA:
fFwv.dc[utils.TotalDataUsage] = fFwv.dc[utils.TotalDataUsage].(time.Duration) + usage
}
}
}
fFwv.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
updateEEMetrics(fFwv.dc, cgrEv.Event, fFwv.cgrCfg.GeneralCfg().DefaultTimezone)
for _, record := range append(records, "\n") {
if _, err = io.WriteString(fFwv.file, record); err != nil {
return

View File

@@ -23,7 +23,6 @@ import (
"fmt"
"strings"
"sync"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -33,14 +32,18 @@ import (
func NewHTTPJsonMapEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc utils.MapStorage) (httpJSON *HTTPJsonMapEe, err error) {
dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID
httpJSON = &HTTPJsonMapEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
httpJSON = &HTTPJsonMapEe{
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg,
cfgIdx: cfgIdx,
filterS: filterS,
dc: dc,
}
if cgrCfg.EEsCfg().Exporters[cfgIdx].Type == utils.MetaHTTPjsonMap {
httpJSON.httpPoster, err = engine.NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify,
cgrCfg.GeneralCfg().ReplyTimeout, cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath,
utils.PosterTransportContentTypes[cgrCfg.EEsCfg().Exporters[cfgIdx].Type], cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts)
}
return
}
@@ -51,8 +54,8 @@ type HTTPJsonMapEe struct {
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
httpPoster *engine.HTTPPoster
dc utils.MapStorage
sync.RWMutex
dc utils.MapStorage
}
// ID returns the identificator of this exporter
@@ -101,41 +104,7 @@ func (httpJson *HTTPJsonMapEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
}
valMp[strings.Join(itm.Path, utils.NestingSep)] = utils.IfaceAsString(itm.Data)
}
if aTime, err := cgrEv.FieldAsTime(utils.AnswerTime, httpJson.cgrCfg.GeneralCfg().DefaultTimezone); err == nil {
if httpJson.dc[utils.FirstEventATime].(time.Time).IsZero() || httpJson.dc[utils.FirstEventATime].(time.Time).Before(aTime) {
httpJson.dc[utils.FirstEventATime] = aTime
}
if aTime.After(httpJson.dc[utils.LastEventATime].(time.Time)) {
httpJson.dc[utils.LastEventATime] = aTime
}
}
if oID, err := cgrEv.FieldAsInt64(utils.OrderID); err == nil {
if httpJson.dc[utils.FirstExpOrderID].(int64) > oID || httpJson.dc[utils.FirstExpOrderID].(int64) == 0 {
httpJson.dc[utils.FirstExpOrderID] = oID
}
if httpJson.dc[utils.LastExpOrderID].(int64) < oID {
httpJson.dc[utils.LastExpOrderID] = oID
}
}
if cost, err := cgrEv.FieldAsFloat64(utils.Cost); err == nil {
httpJson.dc[utils.TotalCost] = httpJson.dc[utils.TotalCost].(float64) + cost
}
if tor, err := cgrEv.FieldAsString(utils.ToR); err == nil {
if usage, err := cgrEv.FieldAsDuration(utils.Usage); err == nil {
switch tor {
case utils.VOICE:
httpJson.dc[utils.TotalDuration] = httpJson.dc[utils.TotalDuration].(time.Duration) + usage
case utils.SMS:
httpJson.dc[utils.TotalSMSUsage] = httpJson.dc[utils.TotalSMSUsage].(time.Duration) + usage
case utils.MMS:
httpJson.dc[utils.TotalMMSUsage] = httpJson.dc[utils.TotalMMSUsage].(time.Duration) + usage
case utils.GENERIC:
httpJson.dc[utils.TotalGenericUsage] = httpJson.dc[utils.TotalGenericUsage].(time.Duration) + usage
case utils.DATA:
httpJson.dc[utils.TotalDataUsage] = httpJson.dc[utils.TotalDataUsage].(time.Duration) + usage
}
}
}
updateEEMetrics(httpJson.dc, cgrEv.Event, httpJson.cgrCfg.GeneralCfg().DefaultTimezone)
cgrID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.CGRID), utils.GenUUID())
runID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RunID), utils.MetaDefault)
var body []byte

View File

@@ -195,6 +195,7 @@ func testHTTPJsonMapExportEvent(t *testing.T) {
utils.Usage: time.Duration(1),
utils.RunID: utils.MetaDefault,
utils.Cost: 0.15,
utils.OrderID: 10,
"ExporterUsed": "HTTPJsonMapExporter",
"ExtraFields": map[string]string{"extra1": "val_extra1",
"extra2": "val_extra2", "extra3": "val_extra3"},

View File

@@ -23,7 +23,6 @@ import (
"net/url"
"strings"
"sync"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -65,8 +64,14 @@ func (httpPost *HTTPPost) OnEvicted(_ string, _ interface{}) {
// ExportEvent implements EventExporter
func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
httpPost.Lock()
defer httpPost.Unlock()
defer func() {
if err != nil {
httpPost.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
} else {
httpPost.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
}
httpPost.Unlock()
}()
httpPost.dc[utils.NumberOfEvents] = httpPost.dc[utils.NumberOfEvents].(int) + 1
var body interface{}
@@ -79,7 +84,6 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
httpPost.filterS)
if err = eeReq.SetFields(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ContentFields()); err != nil {
httpPost.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
return
}
for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() {
@@ -96,42 +100,7 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
}
urlVals.Set(strings.Join(itm.Path, utils.NestingSep), utils.IfaceAsString(itm.Data))
}
if aTime, err := cgrEv.FieldAsTime(utils.AnswerTime, httpPost.cgrCfg.GeneralCfg().DefaultTimezone); err == nil {
if httpPost.dc[utils.FirstEventATime].(time.Time).IsZero() || httpPost.dc[utils.FirstEventATime].(time.Time).Before(aTime) {
httpPost.dc[utils.FirstEventATime] = aTime
}
if aTime.After(httpPost.dc[utils.LastEventATime].(time.Time)) {
httpPost.dc[utils.LastEventATime] = aTime
}
}
if oID, err := cgrEv.FieldAsInt64(utils.OrderID); err == nil {
if httpPost.dc[utils.FirstExpOrderID].(int64) > oID || httpPost.dc[utils.FirstExpOrderID].(int64) == 0 {
httpPost.dc[utils.FirstExpOrderID] = oID
}
if httpPost.dc[utils.LastExpOrderID].(int64) < oID {
httpPost.dc[utils.LastExpOrderID] = oID
}
}
if cost, err := cgrEv.FieldAsFloat64(utils.Cost); err == nil {
httpPost.dc[utils.TotalCost] = httpPost.dc[utils.TotalCost].(float64) + cost
}
if tor, err := cgrEv.FieldAsString(utils.ToR); err == nil {
if usage, err := cgrEv.FieldAsDuration(utils.Usage); err == nil {
switch tor {
case utils.VOICE:
httpPost.dc[utils.TotalDuration] = httpPost.dc[utils.TotalDuration].(time.Duration) + usage
case utils.SMS:
httpPost.dc[utils.TotalSMSUsage] = httpPost.dc[utils.TotalSMSUsage].(time.Duration) + usage
case utils.MMS:
httpPost.dc[utils.TotalMMSUsage] = httpPost.dc[utils.TotalMMSUsage].(time.Duration) + usage
case utils.GENERIC:
httpPost.dc[utils.TotalGenericUsage] = httpPost.dc[utils.TotalGenericUsage].(time.Duration) + usage
case utils.DATA:
httpPost.dc[utils.TotalDataUsage] = httpPost.dc[utils.TotalDataUsage].(time.Duration) + usage
}
}
}
httpPost.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
updateEEMetrics(httpPost.dc, cgrEv.Event, httpPost.cgrCfg.GeneralCfg().DefaultTimezone)
body = urlVals
if err = httpPost.httpPoster.Post(body, utils.EmptyString); err != nil &&
httpPost.cgrCfg.GeneralCfg().FailedPostsDir != utils.META_NONE {

View File

@@ -20,7 +20,6 @@ package ees
import (
"sync"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -64,8 +63,14 @@ func (vEe *VirtualEe) OnEvicted(_ string, _ interface{}) {
// ExportEvent implements EventExporter
func (vEe *VirtualEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
vEe.Lock()
defer vEe.Unlock()
defer func() {
if err != nil {
vEe.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
} else {
vEe.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
}
vEe.Unlock()
}()
vEe.dc[utils.NumberOfEvents] = vEe.dc[utils.NumberOfEvents].(int) + 1
req := utils.MapStorage{}
@@ -75,44 +80,8 @@ func (vEe *VirtualEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
eeReq := NewEventExporterRequest(req, vEe.dc, cgrEv.Tenant, vEe.cgrCfg.GeneralCfg().DefaultTimezone,
vEe.filterS)
if err = eeReq.SetFields(vEe.cgrCfg.EEsCfg().Exporters[vEe.cfgIdx].ContentFields()); err != nil {
vEe.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
return
}
if aTime, err := cgrEv.FieldAsTime(utils.AnswerTime, vEe.cgrCfg.GeneralCfg().DefaultTimezone); err == nil {
if vEe.dc[utils.FirstEventATime].(time.Time).IsZero() || vEe.dc[utils.FirstEventATime].(time.Time).Before(aTime) {
vEe.dc[utils.FirstEventATime] = aTime
}
if aTime.After(vEe.dc[utils.LastEventATime].(time.Time)) {
vEe.dc[utils.LastEventATime] = aTime
}
}
if oID, err := cgrEv.FieldAsInt64(utils.OrderID); err == nil {
if vEe.dc[utils.FirstExpOrderID].(int64) > oID || vEe.dc[utils.FirstExpOrderID].(int64) == 0 {
vEe.dc[utils.FirstExpOrderID] = oID
}
if vEe.dc[utils.LastExpOrderID].(int64) < oID {
vEe.dc[utils.LastExpOrderID] = oID
}
}
if cost, err := cgrEv.FieldAsFloat64(utils.Cost); err == nil {
vEe.dc[utils.TotalCost] = vEe.dc[utils.TotalCost].(float64) + cost
}
if tor, err := cgrEv.FieldAsString(utils.ToR); err == nil {
if usage, err := cgrEv.FieldAsDuration(utils.Usage); err == nil {
switch tor {
case utils.VOICE:
vEe.dc[utils.TotalDuration] = vEe.dc[utils.TotalDuration].(time.Duration) + usage
case utils.SMS:
vEe.dc[utils.TotalSMSUsage] = vEe.dc[utils.TotalSMSUsage].(time.Duration) + usage
case utils.MMS:
vEe.dc[utils.TotalMMSUsage] = vEe.dc[utils.TotalMMSUsage].(time.Duration) + usage
case utils.GENERIC:
vEe.dc[utils.TotalGenericUsage] = vEe.dc[utils.TotalGenericUsage].(time.Duration) + usage
case utils.DATA:
vEe.dc[utils.TotalDataUsage] = vEe.dc[utils.TotalDataUsage].(time.Duration) + usage
}
}
}
vEe.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
updateEEMetrics(vEe.dc, cgrEv.Event, vEe.cgrCfg.GeneralCfg().DefaultTimezone)
return
}

View File

@@ -85,6 +85,15 @@ func (me MapEvent) GetTInt64(fldName string) (out int64, err error) {
return utils.IfaceAsTInt64(fldIface)
}
// GetFloat64 returns a field as float64 instance
func (me MapEvent) GetFloat64(fldName string) (f float64, err error) {
iface, has := me[fldName]
if !has {
return f, utils.ErrNotFound
}
return utils.IfaceAsFloat64(iface)
}
func (me MapEvent) GetStringIgnoreErrors(fldName string) (out string) {
out, _ = me.GetString(fldName)
return

View File

@@ -689,6 +689,28 @@ func TestMapEventGetTInt64(t *testing.T) {
}
}
func TestMapEventGetFloat64(t *testing.T) {
if rply, err := mapEv.GetFloat64("test2"); err != nil {
t.Error(err)
} else if rply != float64(42) {
t.Errorf("Expecting %+v, received: %+v", float64(42), rply)
}
if rply, err := mapEv.GetFloat64("test3"); err != nil {
t.Error(err)
} else if rply != float64(42.3) {
t.Errorf("Expecting %+v, received: %+v", float64(42.3), rply)
}
if rply, err := mapEv.GetFloat64("test4"); err == nil {
t.Errorf("Expecting error, received: %+v with error %v", rply, err)
}
if rply, err := mapEv.GetFloat64("0test"); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Errorf("Expecting error: %v, received: %+v with error %v", utils.ErrNotFound, rply, err)
}
}
func TestMapEventGetDurationPtr(t *testing.T) {
if rply, err := mapEv.GetDurationPtr("test4"); err == nil {
t.Errorf("Expecting error, received: %+v with error %v", rply, err)