diff --git a/actions/log.go b/actions/log.go
index 93f473faf..fd69e41e5 100644
--- a/actions/log.go
+++ b/actions/log.go
@@ -79,7 +79,7 @@ func (aL *actCDRLog) execute(ctx *context.Context, data utils.MapStorage, _ stri
utils.MetaCDR: utils.NewOrderedNavigableMap(),
}
// construct an AgentRequest so we can build the reply and send it to CDRServer
- cdrLogReq := engine.NewExportRequest(map[string]utils.MapStorage{
+ cdrLogReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: reqNm,
utils.MetaOpts: optsMS,
utils.MetaCfg: aL.config.GetDataProvider(),
diff --git a/ees/ee.go b/ees/ee.go
index c33a4c531..33c6de161 100644
--- a/ees/ee.go
+++ b/ees/ee.go
@@ -30,12 +30,12 @@ type EventExporter interface {
ID() string // return the exporter identificator
ExportEvent(cgrEv *utils.CGREvent) (err error) // called on each event to be exported
OnEvicted(itmID string, value interface{}) // called when the exporter needs to terminate
- GetMetrics() utils.MapStorage // called to get metrics
+ GetMetrics() *utils.SafeMapStorage // called to get metrics
}
// NewEventExporter produces exporters
func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS) (ee EventExporter, err error) {
- var dc utils.MapStorage
+ var dc *utils.SafeMapStorage
if dc, err = newEEMetrics(utils.FirstNonEmpty(
cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone,
cgrCfg.GeneralCfg().DefaultTimezone)); err != nil {
diff --git a/ees/ee_test.go b/ees/ee_test.go
index 5cfd530d6..736bd525e 100644
--- a/ees/ee_test.go
+++ b/ees/ee_test.go
@@ -53,11 +53,11 @@ func TestNewEventExporter(t *testing.T) {
t.Error("\nExpected an error")
}
newEE := ee.(*FileCSVee)
- newEE.dc[utils.TimeNow] = nil
- newEE.dc[utils.ExportPath] = nil
+ newEE.dc.MapStorage[utils.TimeNow] = nil
+ newEE.dc.MapStorage[utils.ExportPath] = nil
eeExpect.csvWriter = nil
- eeExpect.dc[utils.TimeNow] = nil
- eeExpect.dc[utils.ExportPath] = nil
+ eeExpect.dc.MapStorage[utils.TimeNow] = nil
+ eeExpect.dc.MapStorage[utils.ExportPath] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE))
}
@@ -89,10 +89,10 @@ func TestNewEventExporterCase2(t *testing.T) {
t.Error("\nExpected an error")
}
newEE := ee.(*FileFWVee)
- newEE.dc[utils.TimeNow] = nil
- newEE.dc[utils.ExportPath] = nil
- eeExpect.dc[utils.TimeNow] = nil
- eeExpect.dc[utils.ExportPath] = nil
+ newEE.dc.MapStorage[utils.TimeNow] = nil
+ newEE.dc.MapStorage[utils.ExportPath] = nil
+ eeExpect.dc.MapStorage[utils.TimeNow] = nil
+ eeExpect.dc.MapStorage[utils.ExportPath] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE))
}
@@ -118,8 +118,8 @@ func TestNewEventExporterCase3(t *testing.T) {
t.Error(err)
}
newEE := ee.(*HTTPPost)
- newEE.dc[utils.TimeNow] = nil
- eeExpect.dc[utils.TimeNow] = nil
+ newEE.dc.MapStorage[utils.TimeNow] = nil
+ eeExpect.dc.MapStorage[utils.TimeNow] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE))
}
@@ -145,8 +145,8 @@ func TestNewEventExporterCase4(t *testing.T) {
t.Error(err)
}
newEE := ee.(*HTTPjsonMapEE)
- newEE.dc[utils.TimeNow] = nil
- eeExpect.dc[utils.TimeNow] = nil
+ newEE.dc.MapStorage[utils.TimeNow] = nil
+ eeExpect.dc.MapStorage[utils.TimeNow] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE))
}
@@ -172,8 +172,8 @@ func TestNewEventExporterCase5(t *testing.T) {
t.Error(err)
}
newEE := ee.(*PosterJSONMapEE)
- newEE.dc[utils.TimeNow] = nil
- eeExpect.dc[utils.TimeNow] = nil
+ newEE.dc.MapStorage[utils.TimeNow] = nil
+ eeExpect.dc.MapStorage[utils.TimeNow] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE))
}
@@ -199,8 +199,8 @@ func TestNewEventExporterCase6(t *testing.T) {
t.Error(err)
}
newEE := ee.(*VirtualEe)
- newEE.dc[utils.TimeNow] = nil
- eeExpect.dc[utils.TimeNow] = nil
+ newEE.dc.MapStorage[utils.TimeNow] = nil
+ eeExpect.dc.MapStorage[utils.TimeNow] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE))
}
@@ -239,8 +239,8 @@ func TestNewEventExporterCase7(t *testing.T) {
t.Error(err)
}
newEE := ee.(*ElasticEe)
- newEE.dc[utils.TimeNow] = nil
- eeExpect.dc[utils.TimeNow] = nil
+ newEE.dc.MapStorage[utils.TimeNow] = nil
+ eeExpect.dc.MapStorage[utils.TimeNow] = nil
eeExpect.eClnt = newEE.eClnt
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", eeExpect, newEE)
diff --git a/ees/ees.go b/ees/ees.go
index 66566bf65..cf9124e15 100644
--- a/ees/ees.go
+++ b/ees/ees.go
@@ -216,7 +216,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply *
if sync {
if hasVerbose {
metricMapLock.Lock()
- metricsMap[ee.ID()] = ee.GetMetrics()
+ metricsMap[ee.ID()] = ee.GetMetrics().MapStorage
metricMapLock.Unlock()
}
wg.Done()
@@ -254,86 +254,93 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply *
return
}
-func newEEMetrics(location string) (utils.MapStorage, error) {
+func newEEMetrics(location string) (*utils.SafeMapStorage, error) {
tNow := time.Now()
loc, err := time.LoadLocation(location)
if err != nil {
return nil, err
}
- return utils.MapStorage{
+ return &utils.SafeMapStorage{MapStorage: utils.MapStorage{
utils.NumberOfEvents: int64(0),
utils.PositiveExports: utils.StringSet{},
utils.NegativeExports: utils.StringSet{},
utils.TimeNow: time.Date(tNow.Year(), tNow.Month(), tNow.Day(),
tNow.Hour(), tNow.Minute(), tNow.Second(), tNow.Nanosecond(), loc),
- }, nil
+ }}, nil
}
-func updateEEMetrics(dc utils.MapStorage, ev engine.MapEvent, timezone string) {
+func updateEEMetrics(dc *utils.SafeMapStorage, cgrID string, ev engine.MapEvent, hasError bool, timezone string) {
+ dc.Lock()
+ defer dc.Unlock()
+ if hasError {
+ dc.MapStorage[utils.NegativeExports].(utils.StringSet).Add(cgrID)
+ } else {
+ dc.MapStorage[utils.PositiveExports].(utils.StringSet).Add(cgrID)
+ }
if aTime, err := ev.GetTime(utils.AnswerTime, timezone); err == nil {
- if _, has := dc[utils.FirstEventATime]; !has {
- dc[utils.FirstEventATime] = time.Time{}
+ if _, has := dc.MapStorage[utils.FirstEventATime]; !has {
+ dc.MapStorage[utils.FirstEventATime] = time.Time{}
}
- if _, has := dc[utils.LastEventATime]; !has {
- dc[utils.LastEventATime] = time.Time{}
+ if _, has := dc.MapStorage[utils.LastEventATime]; !has {
+ dc.MapStorage[utils.LastEventATime] = time.Time{}
}
- if dc[utils.FirstEventATime].(time.Time).IsZero() ||
- aTime.Before(dc[utils.FirstEventATime].(time.Time)) {
- dc[utils.FirstEventATime] = aTime
+ if dc.MapStorage[utils.FirstEventATime].(time.Time).IsZero() ||
+ aTime.Before(dc.MapStorage[utils.FirstEventATime].(time.Time)) {
+ dc.MapStorage[utils.FirstEventATime] = aTime
}
- if aTime.After(dc[utils.LastEventATime].(time.Time)) {
- dc[utils.LastEventATime] = aTime
+ if aTime.After(dc.MapStorage[utils.LastEventATime].(time.Time)) {
+ dc.MapStorage[utils.LastEventATime] = aTime
}
}
if oID, err := ev.GetTInt64(utils.OrderID); err == nil {
- if _, has := dc[utils.FirstExpOrderID]; !has {
- dc[utils.FirstExpOrderID] = int64(0)
+ if _, has := dc.MapStorage[utils.FirstExpOrderID]; !has {
+ dc.MapStorage[utils.FirstExpOrderID] = int64(0)
}
- if _, has := dc[utils.LastExpOrderID]; !has {
- dc[utils.LastExpOrderID] = int64(0)
+ if _, has := dc.MapStorage[utils.LastExpOrderID]; !has {
+ dc.MapStorage[utils.LastExpOrderID] = int64(0)
}
- if dc[utils.FirstExpOrderID].(int64) == 0 ||
- dc[utils.FirstExpOrderID].(int64) > oID {
- dc[utils.FirstExpOrderID] = oID
+ if dc.MapStorage[utils.FirstExpOrderID].(int64) == 0 ||
+ dc.MapStorage[utils.FirstExpOrderID].(int64) > oID {
+ dc.MapStorage[utils.FirstExpOrderID] = oID
}
- if dc[utils.LastExpOrderID].(int64) < oID {
- dc[utils.LastExpOrderID] = oID
+ if dc.MapStorage[utils.LastExpOrderID].(int64) < oID {
+ dc.MapStorage[utils.LastExpOrderID] = oID
}
}
if cost, err := ev.GetFloat64(utils.Cost); err == nil {
- if _, has := dc[utils.TotalCost]; !has {
- dc[utils.TotalCost] = float64(0.0)
+ if _, has := dc.MapStorage[utils.TotalCost]; !has {
+ dc.MapStorage[utils.TotalCost] = float64(0.0)
}
- dc[utils.TotalCost] = dc[utils.TotalCost].(float64) + cost
+ dc.MapStorage[utils.TotalCost] = dc.MapStorage[utils.TotalCost].(float64) + cost
}
if tor, err := ev.GetString(utils.ToR); err == nil {
if usage, err := ev.GetDuration(utils.Usage); err == nil {
switch tor {
case utils.MetaVoice:
- if _, has := dc[utils.TotalDuration]; !has {
- dc[utils.TotalDuration] = time.Duration(0)
+ if _, has := dc.MapStorage[utils.TotalDuration]; !has {
+ dc.MapStorage[utils.TotalDuration] = time.Duration(0)
}
- dc[utils.TotalDuration] = dc[utils.TotalDuration].(time.Duration) + usage
+ dc.MapStorage[utils.TotalDuration] = dc.MapStorage[utils.TotalDuration].(time.Duration) + usage
case utils.MetaSMS:
- if _, has := dc[utils.TotalSMSUsage]; !has {
- dc[utils.TotalSMSUsage] = time.Duration(0)
+ if _, has := dc.MapStorage[utils.TotalSMSUsage]; !has {
+ dc.MapStorage[utils.TotalSMSUsage] = time.Duration(0)
}
- dc[utils.TotalSMSUsage] = dc[utils.TotalSMSUsage].(time.Duration) + usage
+ dc.MapStorage[utils.TotalSMSUsage] = dc.MapStorage[utils.TotalSMSUsage].(time.Duration) + usage
case utils.MetaMMS:
- if _, has := dc[utils.TotalMMSUsage]; !has {
- dc[utils.TotalMMSUsage] = time.Duration(0)
+ if _, has := dc.MapStorage[utils.TotalMMSUsage]; !has {
+ dc.MapStorage[utils.TotalMMSUsage] = time.Duration(0)
}
- dc[utils.TotalMMSUsage] = dc[utils.TotalMMSUsage].(time.Duration) + usage
+ dc.MapStorage[utils.TotalMMSUsage] = dc.MapStorage[utils.TotalMMSUsage].(time.Duration) + usage
case utils.MetaGeneric:
- if _, has := dc[utils.TotalGenericUsage]; !has {
- dc[utils.TotalGenericUsage] = time.Duration(0)
+ if _, has := dc.MapStorage[utils.TotalGenericUsage]; !has {
+ dc.MapStorage[utils.TotalGenericUsage] = time.Duration(0)
}
- dc[utils.TotalGenericUsage] = dc[utils.TotalGenericUsage].(time.Duration) + usage
+ dc.MapStorage[utils.TotalGenericUsage] = dc.MapStorage[utils.TotalGenericUsage].(time.Duration) + usage
case utils.MetaData:
- if _, has := dc[utils.TotalDataUsage]; !has {
- dc[utils.TotalDataUsage] = time.Duration(0)
+ if _, has := dc.MapStorage[utils.TotalDataUsage]; !has {
+ dc.MapStorage[utils.TotalDataUsage] = time.Duration(0)
}
- dc[utils.TotalDataUsage] = dc[utils.TotalDataUsage].(time.Duration) + usage
+ dc.MapStorage[utils.TotalDataUsage] = dc.MapStorage[utils.TotalDataUsage].(time.Duration) + usage
}
}
}
diff --git a/ees/ees_test.go b/ees/ees_test.go
index 4cc7f1895..2cd7ffd82 100644
--- a/ees/ees_test.go
+++ b/ees/ees_test.go
@@ -307,12 +307,12 @@ func (mockEventExporter) ExportEvent(cgrEv *utils.CGREvent) error { return nil }
func (mockEventExporter) OnEvicted(itdmID string, value interface{}) {
utils.Logger.Warning("NOT IMPLEMENTED")
}
-func (mockEventExporter) GetMetrics() utils.MapStorage {
- return utils.MapStorage{
+func (mockEventExporter) GetMetrics() *utils.SafeMapStorage {
+ return &utils.SafeMapStorage{MapStorage: utils.MapStorage{
utils.NumberOfEvents: int64(0),
utils.PositiveExports: utils.StringSet{},
utils.NegativeExports: 5,
- }
+ }}
}
func TestV1ProcessEventMockMetrics(t *testing.T) {
@@ -449,14 +449,15 @@ func TestUpdateEEMetrics(t *testing.T) {
utils.Usage: time.Second,
}
exp, _ := newEEMetrics(utils.EmptyString)
- exp[utils.FirstEventATime] = tnow
- exp[utils.LastEventATime] = tnow
- exp[utils.FirstExpOrderID] = int64(1)
- exp[utils.LastExpOrderID] = int64(1)
- exp[utils.TotalCost] = float64(5.5)
- exp[utils.TotalDuration] = time.Second
- exp[utils.TimeNow] = dc[utils.TimeNow]
- if updateEEMetrics(dc, ev, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
+ exp.MapStorage[utils.FirstEventATime] = tnow
+ exp.MapStorage[utils.LastEventATime] = tnow
+ exp.MapStorage[utils.FirstExpOrderID] = int64(1)
+ exp.MapStorage[utils.LastExpOrderID] = int64(1)
+ exp.MapStorage[utils.TotalCost] = float64(5.5)
+ exp.MapStorage[utils.TotalDuration] = time.Second
+ exp.MapStorage[utils.TimeNow] = dc.MapStorage[utils.TimeNow]
+ exp.MapStorage[utils.PositiveExports] = utils.StringSet{"": {}}
+ if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
}
@@ -468,11 +469,11 @@ func TestUpdateEEMetrics(t *testing.T) {
utils.ToR: utils.MetaSMS,
utils.Usage: time.Second,
}
- exp[utils.LastEventATime] = tnow
- exp[utils.LastExpOrderID] = int64(2)
- exp[utils.TotalCost] = float64(11)
- exp[utils.TotalSMSUsage] = time.Second
- if updateEEMetrics(dc, ev, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
+ exp.MapStorage[utils.LastEventATime] = tnow
+ exp.MapStorage[utils.LastExpOrderID] = int64(2)
+ exp.MapStorage[utils.TotalCost] = float64(11)
+ exp.MapStorage[utils.TotalSMSUsage] = time.Second
+ if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
}
@@ -484,11 +485,11 @@ func TestUpdateEEMetrics(t *testing.T) {
utils.ToR: utils.MetaMMS,
utils.Usage: time.Second,
}
- exp[utils.LastEventATime] = tnow
- exp[utils.LastExpOrderID] = int64(3)
- exp[utils.TotalCost] = float64(16.5)
- exp[utils.TotalMMSUsage] = time.Second
- if updateEEMetrics(dc, ev, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
+ exp.MapStorage[utils.LastEventATime] = tnow
+ exp.MapStorage[utils.LastExpOrderID] = int64(3)
+ exp.MapStorage[utils.TotalCost] = float64(16.5)
+ exp.MapStorage[utils.TotalMMSUsage] = time.Second
+ if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
}
@@ -500,11 +501,11 @@ func TestUpdateEEMetrics(t *testing.T) {
utils.ToR: utils.MetaGeneric,
utils.Usage: time.Second,
}
- exp[utils.LastEventATime] = tnow
- exp[utils.LastExpOrderID] = int64(4)
- exp[utils.TotalCost] = float64(22)
- exp[utils.TotalGenericUsage] = time.Second
- if updateEEMetrics(dc, ev, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
+ exp.MapStorage[utils.LastEventATime] = tnow
+ exp.MapStorage[utils.LastExpOrderID] = int64(4)
+ exp.MapStorage[utils.TotalCost] = float64(22)
+ exp.MapStorage[utils.TotalGenericUsage] = time.Second
+ if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
}
@@ -516,11 +517,11 @@ func TestUpdateEEMetrics(t *testing.T) {
utils.ToR: utils.MetaData,
utils.Usage: time.Second,
}
- exp[utils.LastEventATime] = tnow
- exp[utils.LastExpOrderID] = int64(5)
- exp[utils.TotalCost] = float64(27.5)
- exp[utils.TotalDataUsage] = time.Second
- if updateEEMetrics(dc, ev, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
+ exp.MapStorage[utils.LastEventATime] = tnow
+ exp.MapStorage[utils.LastExpOrderID] = int64(5)
+ exp.MapStorage[utils.TotalCost] = float64(27.5)
+ exp.MapStorage[utils.TotalDataUsage] = time.Second
+ if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
}
}
diff --git a/ees/elastic.go b/ees/elastic.go
index 93b66ca88..80521105e 100644
--- a/ees/elastic.go
+++ b/ees/elastic.go
@@ -23,7 +23,6 @@ import (
"encoding/json"
"fmt"
"strings"
- "sync"
"github.com/elastic/go-elasticsearch/esapi"
@@ -34,7 +33,7 @@ import (
)
func NewElasticExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
- dc utils.MapStorage) (eEe *ElasticEe, err error) {
+ dc *utils.SafeMapStorage) (eEe *ElasticEe, err error) {
eEe = &ElasticEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
err = eEe.init()
@@ -48,9 +47,8 @@ type ElasticEe struct {
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
- sync.RWMutex
- dc utils.MapStorage
- opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap
+ dc *utils.SafeMapStorage
+ opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap
}
// init will create all the necessary dependencies, including opening the file
@@ -124,17 +122,13 @@ func (eEe *ElasticEe) OnEvicted(_ string, _ interface{}) {
// ExportEvent implements EventExporter
func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
- eEe.Lock()
defer func() {
- if err != nil {
- eEe.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
- } else {
- eEe.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
- }
- eEe.Unlock()
+ updateEEMetrics(eEe.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Timezone,
+ eEe.cgrCfg.GeneralCfg().DefaultTimezone))
}()
- eEe.dc[utils.NumberOfEvents] = eEe.dc[utils.NumberOfEvents].(int64) + 1
-
+ eEe.dc.Lock()
+ eEe.dc.MapStorage[utils.NumberOfEvents] = eEe.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
+ eEe.dc.Unlock()
valMp := make(map[string]interface{})
if len(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].ContentFields()) == 0 {
valMp = cgrEv.Event
@@ -142,10 +136,10 @@ func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
- eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
- utils.MetaReq: cgrEv.Event,
+ eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
+ utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: eEe.dc,
- utils.MetaOpts: cgrEv.APIOpts,
+ utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: eEe.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, eEe.cgrCfg.GeneralCfg().DefaultTenant),
eEe.filterS, oNm)
@@ -159,8 +153,7 @@ func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
valMp[strings.Join(path, utils.NestingSep)] = nmIt.String()
}
}
- updateEEMetrics(eEe.dc, cgrEv.Event, utils.FirstNonEmpty(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Timezone,
- eEe.cgrCfg.GeneralCfg().DefaultTimezone))
+
// Set up the request object
cgrID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.CGRID), utils.GenUUID())
runID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RunID), utils.MetaDefault)
@@ -198,6 +191,6 @@ func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
return
}
-func (eEe *ElasticEe) GetMetrics() utils.MapStorage {
+func (eEe *ElasticEe) GetMetrics() *utils.SafeMapStorage {
return eEe.dc.Clone()
}
diff --git a/ees/filecsv.go b/ees/filecsv.go
index 173f3d05f..0a692c558 100644
--- a/ees/filecsv.go
+++ b/ees/filecsv.go
@@ -24,7 +24,6 @@ import (
"io"
"os"
"path"
- "sync"
"github.com/cgrates/cgrates/engine"
@@ -33,7 +32,7 @@ import (
)
func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
- dc utils.MapStorage) (fCsv *FileCSVee, err error) {
+ dc *utils.SafeMapStorage) (fCsv *FileCSVee, err error) {
fCsv = &FileCSVee{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
err = fCsv.init()
@@ -48,8 +47,7 @@ type FileCSVee struct {
filterS *engine.FilterS
file io.WriteCloser
csvWriter *csv.Writer
- sync.RWMutex
- dc utils.MapStorage
+ dc *utils.SafeMapStorage
}
// init will create all the necessary dependencies, including opening the file
@@ -57,9 +55,9 @@ func (fCsv *FileCSVee) init() (err error) {
// create the file
filePath := path.Join(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ExportPath,
fCsv.id+utils.Underline+utils.UUIDSha1Prefix()+utils.CSVSuffix)
- fCsv.Lock()
- fCsv.dc[utils.ExportPath] = filePath
- fCsv.Unlock()
+ fCsv.dc.Lock()
+ fCsv.dc.MapStorage[utils.ExportPath] = filePath
+ fCsv.dc.Unlock()
if fCsv.file, err = os.Create(filePath); err != nil {
return
}
@@ -92,16 +90,13 @@ func (fCsv *FileCSVee) OnEvicted(_ string, _ interface{}) {
// ExportEvent implements EventExporter
func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
- fCsv.Lock()
defer func() {
- if err != nil {
- fCsv.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
- } else {
- fCsv.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
- }
- fCsv.Unlock()
+ updateEEMetrics(fCsv.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Timezone,
+ fCsv.cgrCfg.GeneralCfg().DefaultTimezone))
}()
- fCsv.dc[utils.NumberOfEvents] = fCsv.dc[utils.NumberOfEvents].(int64) + 1
+ fCsv.dc.Lock()
+ fCsv.dc.MapStorage[utils.NumberOfEvents] = fCsv.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
+ fCsv.dc.Unlock()
var csvRecord []string
if len(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ContentFields()) == 0 {
@@ -113,10 +108,10 @@ func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
- eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
- utils.MetaReq: cgrEv.Event,
+ eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
+ utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: fCsv.dc,
- utils.MetaOpts: cgrEv.APIOpts,
+ utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: fCsv.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, fCsv.cgrCfg.GeneralCfg().DefaultTenant),
fCsv.filterS, oNm)
@@ -127,8 +122,6 @@ func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
csvRecord = eeReq.ExpData[utils.MetaExp].OrderedFieldsAsStrings()
}
- updateEEMetrics(fCsv.dc, cgrEv.Event, utils.FirstNonEmpty(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Timezone,
- fCsv.cgrCfg.GeneralCfg().DefaultTimezone))
return fCsv.csvWriter.Write(csvRecord)
}
@@ -140,7 +133,7 @@ func (fCsv *FileCSVee) composeHeader() (err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaHdr: utils.NewOrderedNavigableMap(),
}
- eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
+ eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaDC: fCsv.dc,
utils.MetaCfg: fCsv.cgrCfg.GetDataProvider(),
}, fCsv.cgrCfg.GeneralCfg().DefaultTenant,
@@ -159,7 +152,7 @@ func (fCsv *FileCSVee) composeTrailer() (err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaTrl: utils.NewOrderedNavigableMap(),
}
- eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
+ eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaDC: fCsv.dc,
utils.MetaCfg: fCsv.cgrCfg.GetDataProvider(),
}, fCsv.cgrCfg.GeneralCfg().DefaultTenant,
@@ -171,6 +164,6 @@ func (fCsv *FileCSVee) composeTrailer() (err error) {
return fCsv.csvWriter.Write(eeReq.ExpData[utils.MetaTrl].OrderedFieldsAsStrings())
}
-func (fCsv *FileCSVee) GetMetrics() utils.MapStorage {
+func (fCsv *FileCSVee) GetMetrics() *utils.SafeMapStorage {
return fCsv.dc.Clone()
}
diff --git a/ees/filecsv_test.go b/ees/filecsv_test.go
index 1aa06a45e..0cb9b3655 100644
--- a/ees/filecsv_test.go
+++ b/ees/filecsv_test.go
@@ -76,7 +76,7 @@ func TestFileCsvComposeHeader(t *testing.T) {
filterS: filterS,
file: nopCloser{byteBuff},
csvWriter: csvNW,
- dc: utils.MapStorage{},
+ dc: &utils.SafeMapStorage{},
}
cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Fields = []*config.FCTemplate{
{
@@ -140,7 +140,7 @@ func TestFileCsvComposeTrailer(t *testing.T) {
filterS: filterS,
file: nopCloser{byteBuff},
csvWriter: csvNW,
- dc: utils.MapStorage{},
+ dc: &utils.SafeMapStorage{},
}
cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Fields = []*config.FCTemplate{
{
@@ -286,7 +286,7 @@ func TestFileCsvOnEvictedTrailer(t *testing.T) {
filterS: filterS,
file: nopCloserWrite{byteBuff},
csvWriter: csvNW,
- dc: utils.MapStorage{},
+ dc: &utils.SafeMapStorage{},
}
cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Fields = []*config.FCTemplate{
{
@@ -321,7 +321,7 @@ func TestFileCsvOnEvictedClose(t *testing.T) {
filterS: filterS,
file: nopCloserError{byteBuff},
csvWriter: csvNW,
- dc: utils.MapStorage{},
+ dc: &utils.SafeMapStorage{},
}
cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Fields = []*config.FCTemplate{
{
diff --git a/ees/filefwv.go b/ees/filefwv.go
index 788891fed..8b36192b5 100644
--- a/ees/filefwv.go
+++ b/ees/filefwv.go
@@ -23,14 +23,13 @@ import (
"io"
"os"
"path"
- "sync"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
-func NewFileFWVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, dc utils.MapStorage) (fFwv *FileFWVee, err error) {
+func NewFileFWVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, dc *utils.SafeMapStorage) (fFwv *FileFWVee, err error) {
fFwv = &FileFWVee{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
err = fFwv.init()
@@ -44,17 +43,16 @@ type FileFWVee struct {
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
file io.WriteCloser
- dc utils.MapStorage
- sync.RWMutex
+ dc *utils.SafeMapStorage
}
// init will create all the necessary dependencies, including opening the file
func (fFwv *FileFWVee) init() (err error) {
filePath := path.Join(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].ExportPath,
fFwv.id+utils.Underline+utils.UUIDSha1Prefix()+utils.FWVSuffix)
- fFwv.Lock()
- fFwv.dc[utils.ExportPath] = filePath
- fFwv.Unlock()
+ fFwv.dc.Lock()
+ fFwv.dc.MapStorage[utils.ExportPath] = filePath
+ fFwv.dc.Unlock()
// create the file
if fFwv.file, err = os.Create(filePath); err != nil {
return
@@ -82,16 +80,13 @@ func (fFwv *FileFWVee) OnEvicted(_ string, _ interface{}) {
// ExportEvent implements EventExporter
func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
- fFwv.Lock()
defer func() {
- if err != nil {
- fFwv.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
- } else {
- fFwv.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
- }
- fFwv.Unlock()
+ updateEEMetrics(fFwv.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Timezone,
+ fFwv.cgrCfg.GeneralCfg().DefaultTimezone))
}()
- fFwv.dc[utils.NumberOfEvents] = fFwv.dc[utils.NumberOfEvents].(int64) + 1
+ fFwv.dc.Lock()
+ fFwv.dc.MapStorage[utils.NumberOfEvents] = fFwv.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
+ fFwv.dc.Unlock()
var records []string
if len(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].ContentFields()) == 0 {
records = make([]string, 0, len(cgrEv.Event))
@@ -102,10 +97,10 @@ func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
- eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
- utils.MetaReq: cgrEv.Event,
+ eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
+ utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: fFwv.dc,
- utils.MetaOpts: cgrEv.APIOpts,
+ utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: fFwv.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, fFwv.cgrCfg.GeneralCfg().DefaultTenant),
fFwv.filterS, oNm)
@@ -116,8 +111,6 @@ func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
records = eeReq.ExpData[utils.MetaExp].OrderedFieldsAsStrings()
}
- updateEEMetrics(fFwv.dc, cgrEv.Event, utils.FirstNonEmpty(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Timezone,
- fFwv.cgrCfg.GeneralCfg().DefaultTimezone))
for _, record := range records {
if _, err = io.WriteString(fFwv.file, record); err != nil {
return
@@ -135,7 +128,7 @@ func (fFwv *FileFWVee) composeHeader() (err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaHdr: utils.NewOrderedNavigableMap(),
}
- eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
+ eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaDC: fFwv.dc,
utils.MetaCfg: fFwv.cgrCfg.GetDataProvider(),
}, fFwv.cgrCfg.GeneralCfg().DefaultTenant,
@@ -160,7 +153,7 @@ func (fFwv *FileFWVee) composeTrailer() (err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaTrl: utils.NewOrderedNavigableMap(),
}
- eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
+ eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaDC: fFwv.dc,
utils.MetaCfg: fFwv.cgrCfg.GetDataProvider(),
}, fFwv.cgrCfg.GeneralCfg().DefaultTenant,
@@ -177,6 +170,6 @@ func (fFwv *FileFWVee) composeTrailer() (err error) {
return
}
-func (fFwv *FileFWVee) GetMetrics() utils.MapStorage {
+func (fFwv *FileFWVee) GetMetrics() *utils.SafeMapStorage {
return fFwv.dc.Clone()
}
diff --git a/ees/filefwv_test.go b/ees/filefwv_test.go
index 343d8a510..adcf1dc66 100644
--- a/ees/filefwv_test.go
+++ b/ees/filefwv_test.go
@@ -78,7 +78,7 @@ func TestFileFwvComposeHeader(t *testing.T) {
cfgIdx: 0,
filterS: filterS,
file: nopCloser{byteBuff},
- dc: utils.MapStorage{},
+ dc: &utils.SafeMapStorage{},
}
cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Fields = []*config.FCTemplate{
{
@@ -141,7 +141,7 @@ func TestFileFwvComposeTrailer(t *testing.T) {
cfgIdx: 0,
filterS: filterS,
file: nopCloser{byteBuff},
- dc: utils.MapStorage{},
+ dc: &utils.SafeMapStorage{},
}
cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Fields = []*config.FCTemplate{
{
@@ -327,7 +327,7 @@ func TestFileFwvComposeHeaderWriteError(t *testing.T) {
cfgIdx: 0,
filterS: filterS,
file: nopCloserWrite{byteBuff},
- dc: utils.MapStorage{},
+ dc: &utils.SafeMapStorage{},
}
cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Fields = []*config.FCTemplate{
{
@@ -360,7 +360,7 @@ func TestFileFwvComposeTrailerWriteError(t *testing.T) {
cfgIdx: 0,
filterS: filterS,
file: nopCloserWrite{byteBuff},
- dc: utils.MapStorage{},
+ dc: &utils.SafeMapStorage{},
}
cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Fields = []*config.FCTemplate{
{
@@ -392,7 +392,7 @@ func TestFileFwvOnEvictedTrailer(t *testing.T) {
cfgIdx: 0,
filterS: filterS,
file: nopCloserWrite{byteBuff},
- dc: utils.MapStorage{},
+ dc: &utils.SafeMapStorage{},
}
cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Fields = []*config.FCTemplate{
{
@@ -431,7 +431,7 @@ func TestFileFwvOnEvictedClose(t *testing.T) {
cfgIdx: 0,
filterS: filterS,
file: nopCloserError{byteBuff},
- dc: utils.MapStorage{},
+ dc: &utils.SafeMapStorage{},
}
cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Fields = []*config.FCTemplate{
{
diff --git a/ees/httpjsonmap.go b/ees/httpjsonmap.go
index b02001233..7a56cb5c4 100644
--- a/ees/httpjsonmap.go
+++ b/ees/httpjsonmap.go
@@ -22,7 +22,6 @@ import (
"encoding/json"
"net/http"
"strings"
- "sync"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -30,7 +29,7 @@ import (
)
func NewHTTPjsonMapEE(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
- dc utils.MapStorage) (pstrJSON *HTTPjsonMapEE, err error) {
+ dc *utils.SafeMapStorage) (pstrJSON *HTTPjsonMapEE, err error) {
pstrJSON = &HTTPjsonMapEE{
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg,
@@ -53,8 +52,7 @@ type HTTPjsonMapEE struct {
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
pstr *engine.HTTPPoster
- dc utils.MapStorage
- sync.RWMutex
+ dc *utils.SafeMapStorage
}
// ID returns the identificator of this exporter
@@ -68,17 +66,13 @@ func (httpEE *HTTPjsonMapEE) OnEvicted(string, interface{}) {
// ExportEvent implements EventExporter
func (httpEE *HTTPjsonMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
- httpEE.Lock()
defer func() {
- if err != nil {
- httpEE.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
- } else {
- httpEE.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
- }
- httpEE.Unlock()
+ updateEEMetrics(httpEE.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(httpEE.cgrCfg.EEsCfg().Exporters[httpEE.cfgIdx].Timezone,
+ httpEE.cgrCfg.GeneralCfg().DefaultTimezone))
}()
-
- httpEE.dc[utils.NumberOfEvents] = httpEE.dc[utils.NumberOfEvents].(int64) + 1
+ httpEE.dc.Lock()
+ httpEE.dc.MapStorage[utils.NumberOfEvents] = httpEE.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
+ httpEE.dc.Unlock()
valMp := make(map[string]interface{})
hdr := http.Header{}
@@ -88,10 +82,10 @@ func (httpEE *HTTPjsonMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
- eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
- utils.MetaReq: cgrEv.Event,
+ eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
+ utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: httpEE.dc,
- utils.MetaOpts: cgrEv.APIOpts,
+ utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: httpEE.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, httpEE.cgrCfg.GeneralCfg().DefaultTenant),
httpEE.filterS, oNm)
@@ -109,8 +103,7 @@ func (httpEE *HTTPjsonMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
return
}
}
- updateEEMetrics(httpEE.dc, cgrEv.Event, utils.FirstNonEmpty(httpEE.cgrCfg.EEsCfg().Exporters[httpEE.cfgIdx].Timezone,
- httpEE.cgrCfg.GeneralCfg().DefaultTimezone))
+
var body []byte
if body, err = json.Marshal(valMp); err != nil {
return
@@ -125,7 +118,7 @@ func (httpEE *HTTPjsonMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
return
}
-func (httpEE *HTTPjsonMapEE) GetMetrics() utils.MapStorage {
+func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.SafeMapStorage {
return httpEE.dc.Clone()
}
@@ -138,7 +131,7 @@ func (httpEE *HTTPjsonMapEE) composeHeader() (hdr http.Header, err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaHdr: utils.NewOrderedNavigableMap(),
}
- eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
+ eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaDC: httpEE.dc,
utils.MetaCfg: httpEE.cgrCfg.GetDataProvider(),
}, httpEE.cgrCfg.GeneralCfg().DefaultTenant,
diff --git a/ees/httpjsonmap_test.go b/ees/httpjsonmap_test.go
index 199e247fa..e4b4973e3 100644
--- a/ees/httpjsonmap_test.go
+++ b/ees/httpjsonmap_test.go
@@ -83,8 +83,8 @@ func TestHttpJsonMapExportEvent1(t *testing.T) {
t.Errorf("Expected %q but received %q", errExpect, err)
}
dcExpect := int64(1)
- if !reflect.DeepEqual(dcExpect, httpEE.dc[utils.NumberOfEvents]) {
- t.Errorf("Expected %q but received %q", dcExpect, httpEE.dc[utils.NumberOfEvents])
+ if !reflect.DeepEqual(dcExpect, httpEE.dc.MapStorage[utils.NumberOfEvents]) {
+ t.Errorf("Expected %q but received %q", dcExpect, httpEE.dc.MapStorage[utils.NumberOfEvents])
}
}
@@ -143,8 +143,8 @@ func TestHttpJsonMapExportEvent2(t *testing.T) {
t.Error(err)
}
dcExpect := int64(1)
- if !reflect.DeepEqual(dcExpect, httpEE.dc[utils.NumberOfEvents]) {
- t.Errorf("Expected %q but received %q", dcExpect, httpEE.dc[utils.NumberOfEvents])
+ if !reflect.DeepEqual(dcExpect, httpEE.dc.MapStorage[utils.NumberOfEvents]) {
+ t.Errorf("Expected %q but received %q", dcExpect, httpEE.dc.MapStorage[utils.NumberOfEvents])
}
}
@@ -190,8 +190,8 @@ func TestHttpJsonMapExportEvent3(t *testing.T) {
t.Errorf("Expected %q but received %q", errExpect, err)
}
dcExpect := int64(1)
- if !reflect.DeepEqual(dcExpect, httpEE.dc[utils.NumberOfEvents]) {
- t.Errorf("Expected %q but received %q", dcExpect, httpEE.dc[utils.NumberOfEvents])
+ if !reflect.DeepEqual(dcExpect, httpEE.dc.MapStorage[utils.NumberOfEvents]) {
+ t.Errorf("Expected %q but received %q", dcExpect, httpEE.dc.MapStorage[utils.NumberOfEvents])
}
}
func TestHttpJsonMapExportEvent4(t *testing.T) {
@@ -239,8 +239,8 @@ func TestHttpJsonMapExportEvent4(t *testing.T) {
t.Errorf("Expected %q but received %q", errExpect, err)
}
dcExpect := int64(1)
- if !reflect.DeepEqual(dcExpect, httpEE.dc[utils.NumberOfEvents]) {
- t.Errorf("Expected %q but received %q", dcExpect, httpEE.dc[utils.NumberOfEvents])
+ if !reflect.DeepEqual(dcExpect, httpEE.dc.MapStorage[utils.NumberOfEvents]) {
+ t.Errorf("Expected %q but received %q", dcExpect, httpEE.dc.MapStorage[utils.NumberOfEvents])
}
}
@@ -292,8 +292,8 @@ func TestHttpJsonMapExportEvent5(t *testing.T) {
t.Errorf("Expected %q but received %q", errExpect, err)
}
dcExpect := int64(1)
- if !reflect.DeepEqual(dcExpect, httpEE.dc[utils.NumberOfEvents]) {
- t.Errorf("Expected %q but received %q", dcExpect, httpEE.dc[utils.NumberOfEvents])
+ if !reflect.DeepEqual(dcExpect, httpEE.dc.MapStorage[utils.NumberOfEvents]) {
+ t.Errorf("Expected %q but received %q", dcExpect, httpEE.dc.MapStorage[utils.NumberOfEvents])
}
httpEE.OnEvicted("test", "test")
}
diff --git a/ees/httppost.go b/ees/httppost.go
index bf6cc30ce..0580b561e 100644
--- a/ees/httppost.go
+++ b/ees/httppost.go
@@ -22,7 +22,6 @@ import (
"net/http"
"net/url"
"strings"
- "sync"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -30,7 +29,7 @@ import (
)
func NewHTTPPostEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
- dc utils.MapStorage) (httpPost *HTTPPost, err error) {
+ dc *utils.SafeMapStorage) (httpPost *HTTPPost, err error) {
httpPost = &HTTPPost{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
httpPost.httpPoster = engine.NewHTTPPoster(cgrCfg.GeneralCfg().ReplyTimeout,
@@ -47,8 +46,7 @@ type HTTPPost struct {
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
httpPoster *engine.HTTPPoster
- sync.RWMutex
- dc utils.MapStorage
+ dc *utils.SafeMapStorage
}
// ID returns the identificator of this exporter
@@ -62,16 +60,13 @@ func (httpPost *HTTPPost) OnEvicted(_ string, _ interface{}) {
// ExportEvent implements EventExporter
func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
- httpPost.Lock()
defer func() {
- if err != nil {
- httpPost.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
- } else {
- httpPost.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
- }
- httpPost.Unlock()
+ updateEEMetrics(httpPost.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Timezone,
+ httpPost.cgrCfg.GeneralCfg().DefaultTimezone))
}()
- httpPost.dc[utils.NumberOfEvents] = httpPost.dc[utils.NumberOfEvents].(int64) + 1
+ httpPost.dc.Lock()
+ httpPost.dc.MapStorage[utils.NumberOfEvents] = httpPost.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
+ httpPost.dc.Unlock()
urlVals := url.Values{}
hdr := http.Header{}
@@ -83,10 +78,10 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
- eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
- utils.MetaReq: cgrEv.Event,
+ eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
+ utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: httpPost.dc,
- utils.MetaOpts: cgrEv.APIOpts,
+ utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: httpPost.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, httpPost.cgrCfg.GeneralCfg().DefaultTenant),
httpPost.filterS, oNm)
@@ -103,8 +98,7 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
return
}
}
- updateEEMetrics(httpPost.dc, cgrEv.Event, utils.FirstNonEmpty(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Timezone,
- httpPost.cgrCfg.GeneralCfg().DefaultTimezone))
+
if err = httpPost.httpPoster.PostValues(urlVals, hdr); err != nil &&
httpPost.cgrCfg.GeneralCfg().FailedPostsDir != utils.MetaNone {
engine.AddFailedPost(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ExportPath,
@@ -117,7 +111,7 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
return
}
-func (httpPost *HTTPPost) GetMetrics() utils.MapStorage {
+func (httpPost *HTTPPost) GetMetrics() *utils.SafeMapStorage {
return httpPost.dc.Clone()
}
@@ -130,7 +124,7 @@ func (httpPost *HTTPPost) composeHeader() (hdr http.Header, err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaHdr: utils.NewOrderedNavigableMap(),
}
- eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
+ eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaDC: httpPost.dc,
utils.MetaCfg: httpPost.cgrCfg.GetDataProvider(),
}, httpPost.cgrCfg.GeneralCfg().DefaultTenant,
diff --git a/ees/httppost_test.go b/ees/httppost_test.go
index d43031d9a..e6618a5c4 100644
--- a/ees/httppost_test.go
+++ b/ees/httppost_test.go
@@ -82,8 +82,8 @@ func TestHttpPostExportEvent(t *testing.T) {
t.Errorf("Expected %q but received %q", errExpect, err)
}
dcExpect := int64(1)
- if !reflect.DeepEqual(dcExpect, httpPost.dc[utils.NumberOfEvents]) {
- t.Errorf("Expected %q but received %q", dcExpect, httpPost.dc[utils.NumberOfEvents])
+ if !reflect.DeepEqual(dcExpect, httpPost.dc.MapStorage[utils.NumberOfEvents]) {
+ t.Errorf("Expected %q but received %q", dcExpect, httpPost.dc.MapStorage[utils.NumberOfEvents])
}
}
@@ -140,8 +140,8 @@ func TestHttpPostExportEvent2(t *testing.T) {
t.Error(err)
}
dcExpect := int64(1)
- if !reflect.DeepEqual(dcExpect, httpPost.dc[utils.NumberOfEvents]) {
- t.Errorf("Expected %q but received %q", dcExpect, httpPost.dc[utils.NumberOfEvents])
+ if !reflect.DeepEqual(dcExpect, httpPost.dc.MapStorage[utils.NumberOfEvents]) {
+ t.Errorf("Expected %q but received %q", dcExpect, httpPost.dc.MapStorage[utils.NumberOfEvents])
}
}
@@ -187,8 +187,8 @@ func TestHttpPostExportEvent3(t *testing.T) {
t.Errorf("Expected %q but received %q", errExpect, err)
}
dcExpect := int64(1)
- if !reflect.DeepEqual(dcExpect, httpPost.dc[utils.NumberOfEvents]) {
- t.Errorf("Expected %q but received %q", dcExpect, httpPost.dc[utils.NumberOfEvents])
+ if !reflect.DeepEqual(dcExpect, httpPost.dc.MapStorage[utils.NumberOfEvents]) {
+ t.Errorf("Expected %q but received %q", dcExpect, httpPost.dc.MapStorage[utils.NumberOfEvents])
}
}
@@ -237,8 +237,8 @@ func TestHttpPostExportEvent4(t *testing.T) {
t.Errorf("Expected %q but received %q", errExpect, err)
}
dcExpect := int64(1)
- if !reflect.DeepEqual(dcExpect, httpPost.dc[utils.NumberOfEvents]) {
- t.Errorf("Expected %q but received %q", dcExpect, httpPost.dc[utils.NumberOfEvents])
+ if !reflect.DeepEqual(dcExpect, httpPost.dc.MapStorage[utils.NumberOfEvents]) {
+ t.Errorf("Expected %q but received %q", dcExpect, httpPost.dc.MapStorage[utils.NumberOfEvents])
}
httpPost.OnEvicted("test", "test")
}
diff --git a/ees/posterjsonmap.go b/ees/posterjsonmap.go
index 8f237199b..9a13329b2 100644
--- a/ees/posterjsonmap.go
+++ b/ees/posterjsonmap.go
@@ -21,7 +21,6 @@ package ees
import (
"encoding/json"
"strings"
- "sync"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -29,7 +28,7 @@ import (
)
func NewPosterJSONMapEE(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
- dc utils.MapStorage) (pstrJSON *PosterJSONMapEE, err error) {
+ dc *utils.SafeMapStorage) (pstrJSON *PosterJSONMapEE, err error) {
pstrJSON = &PosterJSONMapEE{
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg,
@@ -68,8 +67,7 @@ type PosterJSONMapEE struct {
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
poster engine.Poster
- dc utils.MapStorage
- sync.RWMutex
+ dc *utils.SafeMapStorage
}
// ID returns the identificator of this exporter
@@ -84,17 +82,13 @@ func (pstrEE *PosterJSONMapEE) OnEvicted(string, interface{}) {
// ExportEvent implements EventExporter
func (pstrEE *PosterJSONMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
- pstrEE.Lock()
defer func() {
- if err != nil {
- pstrEE.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
- } else {
- pstrEE.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
- }
- pstrEE.Unlock()
+ updateEEMetrics(pstrEE.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Timezone,
+ pstrEE.cgrCfg.GeneralCfg().DefaultTimezone))
}()
-
- pstrEE.dc[utils.NumberOfEvents] = pstrEE.dc[utils.NumberOfEvents].(int64) + 1
+ pstrEE.dc.Lock()
+ pstrEE.dc.MapStorage[utils.NumberOfEvents] = pstrEE.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
+ pstrEE.dc.Unlock()
valMp := make(map[string]interface{})
if len(pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].ContentFields()) == 0 {
@@ -103,10 +97,10 @@ func (pstrEE *PosterJSONMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
- eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
- utils.MetaReq: cgrEv.Event,
+ eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
+ utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: pstrEE.dc,
- utils.MetaOpts: cgrEv.APIOpts,
+ utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: pstrEE.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, pstrEE.cgrCfg.GeneralCfg().DefaultTenant),
pstrEE.filterS, oNm)
@@ -121,8 +115,7 @@ func (pstrEE *PosterJSONMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
valMp[strings.Join(path, utils.NestingSep)] = nmIt.String()
}
}
- updateEEMetrics(pstrEE.dc, cgrEv.Event, utils.FirstNonEmpty(pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Timezone,
- pstrEE.cgrCfg.GeneralCfg().DefaultTimezone))
+
cgrID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.CGRID), utils.GenUUID())
runID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RunID), utils.MetaDefault)
var body []byte
@@ -138,6 +131,6 @@ func (pstrEE *PosterJSONMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
return
}
-func (pstrEE *PosterJSONMapEE) GetMetrics() utils.MapStorage {
+func (pstrEE *PosterJSONMapEE) GetMetrics() *utils.SafeMapStorage {
return pstrEE.dc.Clone()
}
diff --git a/ees/posterjsonmap_test.go b/ees/posterjsonmap_test.go
index 430d8a60c..eecb22529 100644
--- a/ees/posterjsonmap_test.go
+++ b/ees/posterjsonmap_test.go
@@ -183,16 +183,16 @@ func TestPosterJsonMapExportEvent(t *testing.T) {
t.Errorf("Expected %q but received %q", errExpect, err)
}
dcExpect := int64(1)
- if !reflect.DeepEqual(dcExpect, pstrEE.dc[utils.NumberOfEvents]) {
- t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc[utils.NumberOfEvents])
+ if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) {
+ t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents])
}
cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].ComputeFields()
if err := pstrEE.ExportEvent(cgrEv); err == nil || err.Error() != errExpect {
t.Errorf("Expected %q but received %q", errExpect, err)
}
dcExpect = int64(2)
- if !reflect.DeepEqual(dcExpect, pstrEE.dc[utils.NumberOfEvents]) {
- t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc[utils.NumberOfEvents])
+ if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) {
+ t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents])
}
}
@@ -252,8 +252,8 @@ func TestPosterJsonMapExportEvent1(t *testing.T) {
t.Error(err)
}
dcExpect := int64(1)
- if !reflect.DeepEqual(dcExpect, pstrEE.dc[utils.NumberOfEvents]) {
- t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc[utils.NumberOfEvents])
+ if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) {
+ t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents])
}
bodyExpect := map[string]interface{}{
"2": "*req.field2",
@@ -310,8 +310,8 @@ func TestPosterJsonMapExportEvent2(t *testing.T) {
t.Errorf("Expected %q but received %q", errExpect, err)
}
dcExpect := int64(1)
- if !reflect.DeepEqual(dcExpect, pstrEE.dc[utils.NumberOfEvents]) {
- t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc[utils.NumberOfEvents])
+ if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) {
+ t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents])
}
}
@@ -350,8 +350,8 @@ func TestPosterJsonMapExportEvent3(t *testing.T) {
t.Errorf("Expected %q but received %q", errExpect, err)
}
dcExpect := int64(1)
- if !reflect.DeepEqual(dcExpect, pstrEE.dc[utils.NumberOfEvents]) {
- t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc[utils.NumberOfEvents])
+ if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) {
+ t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents])
}
pstrEE.OnEvicted("test", "test")
}
diff --git a/ees/sql.go b/ees/sql.go
index 757245329..71165ee25 100644
--- a/ees/sql.go
+++ b/ees/sql.go
@@ -23,7 +23,6 @@ import (
"fmt"
"net/url"
"strings"
- "sync"
"gorm.io/driver/mysql"
"gorm.io/driver/postgres"
@@ -35,7 +34,7 @@ import (
)
func NewSQLEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
- dc utils.MapStorage) (sqlEe *SQLEe, err error) {
+ dc *utils.SafeMapStorage) (sqlEe *SQLEe, err error) {
sqlEe = &SQLEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
@@ -58,8 +57,7 @@ type SQLEe struct {
tableName string
- sync.RWMutex
- dc utils.MapStorage
+ dc *utils.SafeMapStorage
}
func (sqlEe *SQLEe) NewSQLEeURL(cgrCfg *config.CGRConfig) (dialect gorm.Dialector, err error) {
@@ -144,26 +142,23 @@ func (sqlEe *SQLEe) OnEvicted(_ string, _ interface{}) {
// ExportEvent implements EventExporter
func (sqlEe *SQLEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
- sqlEe.Lock()
defer func() {
- if err != nil {
- sqlEe.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
- } else {
- sqlEe.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
- }
- sqlEe.Unlock()
+ updateEEMetrics(sqlEe.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Timezone,
+ sqlEe.cgrCfg.GeneralCfg().DefaultTimezone))
}()
- sqlEe.dc[utils.NumberOfEvents] = sqlEe.dc[utils.NumberOfEvents].(int64) + 1
+ sqlEe.dc.Lock()
+ sqlEe.dc.MapStorage[utils.NumberOfEvents] = sqlEe.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
+ sqlEe.dc.Unlock()
var vals []interface{}
var colNames []string
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
- eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
- utils.MetaReq: cgrEv.Event,
+ eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
+ utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: sqlEe.dc,
- utils.MetaOpts: cgrEv.APIOpts,
+ utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: sqlEe.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, sqlEe.cgrCfg.GeneralCfg().DefaultTenant),
sqlEe.filterS, oNm)
@@ -194,12 +189,10 @@ func (sqlEe *SQLEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
}
sqlEe.db.Table(sqlEe.tableName).Exec(sqlQuery, vals...)
- updateEEMetrics(sqlEe.dc, cgrEv.Event, utils.FirstNonEmpty(sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Timezone,
- sqlEe.cgrCfg.GeneralCfg().DefaultTimezone))
return
}
-func (sqlEe *SQLEe) GetMetrics() utils.MapStorage {
+func (sqlEe *SQLEe) GetMetrics() *utils.SafeMapStorage {
return sqlEe.dc.Clone()
}
diff --git a/ees/virtualee.go b/ees/virtualee.go
index 796af98fe..9a8e3a013 100644
--- a/ees/virtualee.go
+++ b/ees/virtualee.go
@@ -19,15 +19,13 @@ along with this program. If not, see
package ees
import (
- "sync"
-
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func NewVirtualExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
- dc utils.MapStorage) (vEe *VirtualEe, err error) {
+ dc *utils.SafeMapStorage) (vEe *VirtualEe, err error) {
vEe = &VirtualEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
err = vEe.init()
@@ -40,8 +38,7 @@ type VirtualEe struct {
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
- sync.RWMutex
- dc utils.MapStorage
+ dc *utils.SafeMapStorage
}
// init will create all the necessary dependencies, including opening the file
@@ -60,35 +57,31 @@ func (vEe *VirtualEe) OnEvicted(_ string, _ interface{}) {
// ExportEvent implements EventExporter
func (vEe *VirtualEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
- vEe.Lock()
defer func() {
- if err != nil {
- vEe.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
- } else {
- vEe.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
- }
- vEe.Unlock()
+ updateEEMetrics(vEe.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(vEe.cgrCfg.EEsCfg().Exporters[vEe.cfgIdx].Timezone,
+ vEe.cgrCfg.GeneralCfg().DefaultTimezone))
}()
- vEe.dc[utils.NumberOfEvents] = vEe.dc[utils.NumberOfEvents].(int64) + 1
+ vEe.dc.Lock()
+ vEe.dc.MapStorage[utils.NumberOfEvents] = vEe.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
+ vEe.dc.Unlock()
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
- eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
- utils.MetaReq: cgrEv.Event,
+ eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
+ utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: vEe.dc,
- utils.MetaOpts: cgrEv.APIOpts,
+ utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: vEe.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, vEe.cgrCfg.GeneralCfg().DefaultTenant),
vEe.filterS, oNm)
if err = eeReq.SetFields(vEe.cgrCfg.EEsCfg().Exporters[vEe.cfgIdx].ContentFields()); err != nil {
return
}
- updateEEMetrics(vEe.dc, cgrEv.Event, utils.FirstNonEmpty(vEe.cgrCfg.EEsCfg().Exporters[vEe.cfgIdx].Timezone,
- vEe.cgrCfg.GeneralCfg().DefaultTimezone))
+
return
}
-func (vEe *VirtualEe) GetMetrics() utils.MapStorage {
+func (vEe *VirtualEe) GetMetrics() *utils.SafeMapStorage {
return vEe.dc.Clone()
}
diff --git a/engine/exportrequest.go b/engine/exportrequest.go
index 76d058b97..a356564df 100644
--- a/engine/exportrequest.go
+++ b/engine/exportrequest.go
@@ -28,7 +28,7 @@ import (
)
// NewExportRequest returns a new EventRequest
-func NewExportRequest(inData map[string]utils.MapStorage,
+func NewExportRequest(inData map[string]utils.DataStorage,
tnt string,
filterS *FilterS, oNM map[string]*utils.OrderedNavigableMap) (eeR *ExportRequest) {
eeR = &ExportRequest{
@@ -43,7 +43,7 @@ func NewExportRequest(inData map[string]utils.MapStorage,
// ExportRequest represents data related to one request towards agent
// implements utils.DataProvider so we can pass it to filters
type ExportRequest struct {
- inData map[string]utils.MapStorage // request
+ inData map[string]utils.DataStorage // request
ExpData map[string]*utils.OrderedNavigableMap // *exp:OrderNavMp *trl:OrderNavMp *cdr:OrderNavMp
tnt string
filterS *FilterS
diff --git a/engine/exportrequest_test.go b/engine/exportrequest_test.go
index b192bc467..e6eddec96 100644
--- a/engine/exportrequest_test.go
+++ b/engine/exportrequest_test.go
@@ -28,8 +28,8 @@ import (
"github.com/cgrates/cgrates/utils"
)
-func TestEventRequestParseFieldDateTimeDaily(t *testing.T) {
- EventReq := NewExportRequest(map[string]utils.MapStorage{}, "", nil, nil)
+func TestExportRequestParseFieldDateTimeDaily(t *testing.T) {
+ EventReq := NewExportRequest(map[string]utils.DataStorage{}, "", nil, nil)
fctTemp := &config.FCTemplate{
Type: utils.MetaDateTime,
Value: config.NewRSRParsersMustCompile("*daily", utils.InfieldSep),
@@ -56,8 +56,8 @@ func TestEventRequestParseFieldDateTimeDaily(t *testing.T) {
}
}
-func TestEventReqParseFieldDateTimeTimeZone(t *testing.T) {
- EventReq := NewExportRequest(map[string]utils.MapStorage{}, "", nil, nil)
+func TestExportReqParseFieldDateTimeTimeZone(t *testing.T) {
+ EventReq := NewExportRequest(map[string]utils.DataStorage{}, "", nil, nil)
fctTemp := &config.FCTemplate{
Type: utils.MetaDateTime,
Value: config.NewRSRParsersMustCompile("*daily", utils.InfieldSep),
@@ -84,8 +84,8 @@ func TestEventReqParseFieldDateTimeTimeZone(t *testing.T) {
}
}
-func TestEventReqParseFieldDateTimeMonthly(t *testing.T) {
- EventReq := NewExportRequest(map[string]utils.MapStorage{}, "", nil, nil)
+func TestExportReqParseFieldDateTimeMonthly(t *testing.T) {
+ EventReq := NewExportRequest(map[string]utils.DataStorage{}, "", nil, nil)
fctTemp := &config.FCTemplate{
Type: utils.MetaDateTime,
Value: config.NewRSRParsersMustCompile("*monthly", utils.InfieldSep),
@@ -111,8 +111,8 @@ func TestEventReqParseFieldDateTimeMonthly(t *testing.T) {
}
}
-func TestEventReqParseFieldDateTimeMonthlyEstimated(t *testing.T) {
- EventReq := NewExportRequest(map[string]utils.MapStorage{}, "", nil, nil)
+func TestExportReqParseFieldDateTimeMonthlyEstimated(t *testing.T) {
+ EventReq := NewExportRequest(map[string]utils.DataStorage{}, "", nil, nil)
fctTemp := &config.FCTemplate{
Type: utils.MetaDateTime,
Value: config.NewRSRParsersMustCompile("*monthly_estimated", utils.InfieldSep),
@@ -138,8 +138,8 @@ func TestEventReqParseFieldDateTimeMonthlyEstimated(t *testing.T) {
}
}
-func TestEventReqParseFieldDateTimeYearly(t *testing.T) {
- EventReq := NewExportRequest(map[string]utils.MapStorage{}, "", nil, nil)
+func TestExportReqParseFieldDateTimeYearly(t *testing.T) {
+ EventReq := NewExportRequest(map[string]utils.DataStorage{}, "", nil, nil)
fctTemp := &config.FCTemplate{
Type: utils.MetaDateTime,
Value: config.NewRSRParsersMustCompile("*yearly", utils.InfieldSep),
@@ -165,8 +165,8 @@ func TestEventReqParseFieldDateTimeYearly(t *testing.T) {
}
}
-func TestEventReqParseFieldDateTimeMetaUnlimited(t *testing.T) {
- EventReq := NewExportRequest(map[string]utils.MapStorage{}, "", nil, nil)
+func TestExportReqParseFieldDateTimeMetaUnlimited(t *testing.T) {
+ EventReq := NewExportRequest(map[string]utils.DataStorage{}, "", nil, nil)
fctTemp := &config.FCTemplate{
Type: utils.MetaDateTime,
Value: config.NewRSRParsersMustCompile(utils.MetaUnlimited, utils.InfieldSep),
@@ -192,8 +192,8 @@ func TestEventReqParseFieldDateTimeMetaUnlimited(t *testing.T) {
}
}
-func TestEventReqParseFieldDateTimeEmpty(t *testing.T) {
- EventReq := NewExportRequest(map[string]utils.MapStorage{}, "", nil, nil)
+func TestExportReqParseFieldDateTimeEmpty(t *testing.T) {
+ EventReq := NewExportRequest(map[string]utils.DataStorage{}, "", nil, nil)
fctTemp := &config.FCTemplate{
Type: utils.MetaDateTime,
Value: config.NewRSRParsersMustCompile("", utils.InfieldSep),
@@ -219,8 +219,8 @@ func TestEventReqParseFieldDateTimeEmpty(t *testing.T) {
}
}
-func TestEventReqParseFieldDateTimeMonthEnd(t *testing.T) {
- EventReq := NewExportRequest(map[string]utils.MapStorage{}, "", nil, nil)
+func TestExportReqParseFieldDateTimeMonthEnd(t *testing.T) {
+ EventReq := NewExportRequest(map[string]utils.DataStorage{}, "", nil, nil)
fctTemp := &config.FCTemplate{
Type: utils.MetaDateTime,
Value: config.NewRSRParsersMustCompile("*month_endTest", utils.InfieldSep),
@@ -246,8 +246,8 @@ func TestEventReqParseFieldDateTimeMonthEnd(t *testing.T) {
}
}
-func TestAgentRequestParseFieldDateTimeError(t *testing.T) {
- EventReq := NewExportRequest(map[string]utils.MapStorage{}, "", nil, nil)
+func TestExportReqParseFieldDateTimeError(t *testing.T) {
+ EventReq := NewExportRequest(map[string]utils.DataStorage{}, "", nil, nil)
fctTemp := &config.FCTemplate{
Type: utils.MetaDateTime,
Value: config.NewRSRParsersMustCompile("*month_endTest", utils.InfieldSep),
@@ -261,13 +261,13 @@ func TestAgentRequestParseFieldDateTimeError(t *testing.T) {
}
}
-func TestEventReqParseFieldDateTimeError2(t *testing.T) {
+func TestExportReqParseFieldDateTimeError2(t *testing.T) {
prsr, err := config.NewRSRParsersFromSlice([]string{"2.", "~*req.CGRID<~*opts.Converter>"})
if err != nil {
t.Fatal(err)
}
- mS := map[string]utils.MapStorage{
- utils.MetaOpts: {
+ mS := map[string]utils.DataStorage{
+ utils.MetaOpts: utils.MapStorage{
utils.AccountField: "1002",
utils.Usage: "20m",
},
@@ -285,9 +285,9 @@ func TestEventReqParseFieldDateTimeError2(t *testing.T) {
}
}
-func TestEventReqFieldAsInterface(t *testing.T) {
- inData := map[string]utils.MapStorage{
- utils.MetaReq: {
+func TestExportReqFieldAsInterface(t *testing.T) {
+ inData := map[string]utils.DataStorage{
+ utils.MetaReq: utils.MapStorage{
"Account": "1001",
"Usage": "10m",
},
@@ -310,9 +310,9 @@ func TestEventReqFieldAsInterface(t *testing.T) {
}
}
-func TestEventReqNewEventExporter(t *testing.T) {
- inData := map[string]utils.MapStorage{
- utils.MetaReq: {
+func TestExportReqNewEventExporter(t *testing.T) {
+ inData := map[string]utils.DataStorage{
+ utils.MetaReq: utils.MapStorage{
"Account": "1001",
"Usage": "10m",
},
diff --git a/ers/ers.go b/ers/ers.go
index 18a989bc5..252519302 100644
--- a/ers/ers.go
+++ b/ers/ers.go
@@ -416,9 +416,9 @@ func (erS *ERService) onEvicted(id string, value interface{}) {
return
}
// convert the event to record
- eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
- utils.MetaReq: cgrEv.Event,
- utils.MetaOpts: cgrEv.APIOpts,
+ eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
+ utils.MetaReq: utils.MapStorage(cgrEv.Event),
+ utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: erS.cfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, erS.cfg.GeneralCfg().DefaultTenant),
erS.filterS, map[string]*utils.OrderedNavigableMap{
diff --git a/utils/mapstorage.go b/utils/mapstorage.go
index f4d1c6c3f..0f66c25fa 100644
--- a/utils/mapstorage.go
+++ b/utils/mapstorage.go
@@ -27,8 +27,8 @@ import (
"time"
)
-// dataStorage is the DataProvider that can be updated
-type dataStorage interface {
+// DataStorage is the DataProvider that can be updated
+type DataStorage interface {
DataProvider
Set(fldPath []string, val interface{}) error
@@ -194,7 +194,7 @@ func (ms MapStorage) Set(fldPath []string, val interface{}) (err error) {
return nMap.Set(fldPath[1:], val)
}
switch dp := ms[fldPath[0]].(type) {
- case dataStorage:
+ case DataStorage:
return dp.Set(fldPath[1:], val)
case map[string]interface{}:
return MapStorage(dp).Set(fldPath[1:], val)
@@ -232,7 +232,7 @@ func (ms MapStorage) GetKeys(nested bool, nestedLimit int, prefix string) (keys
for k, v := range ms { // in case of nested on false we take in consideraton the nestedLimit
//keys = append(keys, prefix+k)
switch rv := v.(type) { // and for performance we only take in consideration a limited set of types for nested false
- case dataStorage:
+ case DataStorage:
keys = append(keys, rv.GetKeys(nested, nestedLimit-1, prefix+k)...)
case map[string]interface{}:
keys = append(keys, MapStorage(rv).GetKeys(nested, nestedLimit-1, prefix+k)...)
@@ -247,7 +247,7 @@ func (ms MapStorage) GetKeys(nested bool, nestedLimit int, prefix string) (keys
for k, v := range ms {
//keys = append(keys, prefix+k)
switch rv := v.(type) {
- case dataStorage:
+ case DataStorage:
keys = append(keys, rv.GetKeys(nested, nestedLimit, prefix+k)...)
case map[string]interface{}:
keys = append(keys, MapStorage(rv).GetKeys(nested, nestedLimit, prefix+k)...)
@@ -257,7 +257,7 @@ func (ms MapStorage) GetKeys(nested bool, nestedLimit int, prefix string) (keys
// keys = append(keys, pref)
keys = append(keys, dp.GetKeys(nested, nestedLimit, pref)...)
}
- case []dataStorage:
+ case []DataStorage:
for i, dp := range rv {
pref := prefix + k + fmt.Sprintf("[%v]", i)
// keys = append(keys, pref)
@@ -304,7 +304,7 @@ func (ms MapStorage) Remove(fldPath []string) (err error) {
return
}
switch dp := val.(type) {
- case dataStorage:
+ case DataStorage:
return dp.Remove(fldPath[1:])
case map[string]interface{}:
return MapStorage(dp).Remove(fldPath[1:])
diff --git a/utils/mapstorage_test.go b/utils/mapstorage_test.go
index 14e26e3d8..5a43eeae5 100644
--- a/utils/mapstorage_test.go
+++ b/utils/mapstorage_test.go
@@ -631,7 +631,7 @@ func TestNavMapFieldAsInterface3(t *testing.T) {
func TestNavMapGetKeys2(t *testing.T) {
navMp := MapStorage{
- "FirstLevel": dataStorage(MapStorage{
+ "FirstLevel": DataStorage(MapStorage{
"SecondLevel": map[string]interface{}{
"ThirdLevel": MapStorage{
"Fld1": 123.123,
@@ -654,7 +654,7 @@ func TestNavMapGetKeys2(t *testing.T) {
},
"Field6": []string{"1", "2"},
"Field7": []interface{}{"1", "2"},
- "Field8": []dataStorage{MapStorage{"A": 1}},
+ "Field8": []DataStorage{MapStorage{"A": 1}},
"Field9": []MapStorage{{"A": 1}},
"Field10": []map[string]interface{}{{"A": 1}},
}
diff --git a/utils/safemapstorage.go b/utils/safemapstorage.go
new file mode 100644
index 000000000..07ab7449c
--- /dev/null
+++ b/utils/safemapstorage.go
@@ -0,0 +1,77 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package utils
+
+import (
+ "sync"
+)
+
+// SafeMapStorage is a Mapstorage with mutex
+type SafeMapStorage struct {
+ MapStorage
+ sync.RWMutex
+}
+
+// String returns the map as json string
+func (ms *SafeMapStorage) String() string {
+ ms.RLock()
+ defer ms.RUnlock()
+ return ms.MapStorage.String()
+}
+
+// FieldAsInterface returns the value from the path
+func (ms *SafeMapStorage) FieldAsInterface(fldPath []string) (val interface{}, err error) {
+ ms.RLock()
+ defer ms.RUnlock()
+ return ms.MapStorage.FieldAsInterface(fldPath)
+}
+
+// FieldAsString returns the value from path as string
+func (ms *SafeMapStorage) FieldAsString(fldPath []string) (str string, err error) {
+ ms.RLock()
+ defer ms.RUnlock()
+ return ms.MapStorage.FieldAsString(fldPath)
+}
+
+// Set sets the value at the given path
+func (ms *SafeMapStorage) Set(fldPath []string, val interface{}) (err error) {
+ ms.Lock()
+ defer ms.Unlock()
+ return ms.MapStorage.Set(fldPath, val)
+}
+
+// GetKeys returns all the keys from map
+func (ms *SafeMapStorage) GetKeys(nested bool, nestedLimit int, prefix string) (keys []string) {
+ ms.RLock()
+ defer ms.RUnlock()
+ return ms.MapStorage.GetKeys(nested, nestedLimit, prefix)
+}
+
+// Remove removes the item at path
+func (ms *SafeMapStorage) Remove(fldPath []string) (err error) {
+ ms.Lock()
+ defer ms.Unlock()
+ return ms.MapStorage.Remove(fldPath)
+}
+
+func (ms *SafeMapStorage) Clone() (msClone *SafeMapStorage) {
+ ms.RLock()
+ defer ms.RUnlock()
+ return &SafeMapStorage{MapStorage: ms.MapStorage.Clone()}
+}