Updated ees metrics

This commit is contained in:
Trial97
2021-07-22 08:24:59 +03:00
committed by Dan Christian Bogos
parent 9de6f9e922
commit 7609c6a925
24 changed files with 363 additions and 333 deletions

View File

@@ -79,7 +79,7 @@ func (aL *actCDRLog) execute(ctx *context.Context, data utils.MapStorage, _ stri
utils.MetaCDR: utils.NewOrderedNavigableMap(),
}
// construct an AgentRequest so we can build the reply and send it to CDRServer
cdrLogReq := engine.NewExportRequest(map[string]utils.MapStorage{
cdrLogReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: reqNm,
utils.MetaOpts: optsMS,
utils.MetaCfg: aL.config.GetDataProvider(),

View File

@@ -30,12 +30,12 @@ type EventExporter interface {
ID() string // return the exporter identificator
ExportEvent(cgrEv *utils.CGREvent) (err error) // called on each event to be exported
OnEvicted(itmID string, value interface{}) // called when the exporter needs to terminate
GetMetrics() utils.MapStorage // called to get metrics
GetMetrics() *utils.SafeMapStorage // called to get metrics
}
// NewEventExporter produces exporters
func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS) (ee EventExporter, err error) {
var dc utils.MapStorage
var dc *utils.SafeMapStorage
if dc, err = newEEMetrics(utils.FirstNonEmpty(
cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone,
cgrCfg.GeneralCfg().DefaultTimezone)); err != nil {

View File

@@ -53,11 +53,11 @@ func TestNewEventExporter(t *testing.T) {
t.Error("\nExpected an error")
}
newEE := ee.(*FileCSVee)
newEE.dc[utils.TimeNow] = nil
newEE.dc[utils.ExportPath] = nil
newEE.dc.MapStorage[utils.TimeNow] = nil
newEE.dc.MapStorage[utils.ExportPath] = nil
eeExpect.csvWriter = nil
eeExpect.dc[utils.TimeNow] = nil
eeExpect.dc[utils.ExportPath] = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.ExportPath] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE))
}
@@ -89,10 +89,10 @@ func TestNewEventExporterCase2(t *testing.T) {
t.Error("\nExpected an error")
}
newEE := ee.(*FileFWVee)
newEE.dc[utils.TimeNow] = nil
newEE.dc[utils.ExportPath] = nil
eeExpect.dc[utils.TimeNow] = nil
eeExpect.dc[utils.ExportPath] = nil
newEE.dc.MapStorage[utils.TimeNow] = nil
newEE.dc.MapStorage[utils.ExportPath] = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.ExportPath] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE))
}
@@ -118,8 +118,8 @@ func TestNewEventExporterCase3(t *testing.T) {
t.Error(err)
}
newEE := ee.(*HTTPPost)
newEE.dc[utils.TimeNow] = nil
eeExpect.dc[utils.TimeNow] = nil
newEE.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE))
}
@@ -145,8 +145,8 @@ func TestNewEventExporterCase4(t *testing.T) {
t.Error(err)
}
newEE := ee.(*HTTPjsonMapEE)
newEE.dc[utils.TimeNow] = nil
eeExpect.dc[utils.TimeNow] = nil
newEE.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE))
}
@@ -172,8 +172,8 @@ func TestNewEventExporterCase5(t *testing.T) {
t.Error(err)
}
newEE := ee.(*PosterJSONMapEE)
newEE.dc[utils.TimeNow] = nil
eeExpect.dc[utils.TimeNow] = nil
newEE.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE))
}
@@ -199,8 +199,8 @@ func TestNewEventExporterCase6(t *testing.T) {
t.Error(err)
}
newEE := ee.(*VirtualEe)
newEE.dc[utils.TimeNow] = nil
eeExpect.dc[utils.TimeNow] = nil
newEE.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(eeExpect), utils.ToJSON(newEE))
}
@@ -239,8 +239,8 @@ func TestNewEventExporterCase7(t *testing.T) {
t.Error(err)
}
newEE := ee.(*ElasticEe)
newEE.dc[utils.TimeNow] = nil
eeExpect.dc[utils.TimeNow] = nil
newEE.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
eeExpect.eClnt = newEE.eClnt
if !reflect.DeepEqual(eeExpect, newEE) {
t.Errorf("Expected %+v \n but got %+v", eeExpect, newEE)

View File

@@ -216,7 +216,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply *
if sync {
if hasVerbose {
metricMapLock.Lock()
metricsMap[ee.ID()] = ee.GetMetrics()
metricsMap[ee.ID()] = ee.GetMetrics().MapStorage
metricMapLock.Unlock()
}
wg.Done()
@@ -254,86 +254,93 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply *
return
}
func newEEMetrics(location string) (utils.MapStorage, error) {
func newEEMetrics(location string) (*utils.SafeMapStorage, error) {
tNow := time.Now()
loc, err := time.LoadLocation(location)
if err != nil {
return nil, err
}
return utils.MapStorage{
return &utils.SafeMapStorage{MapStorage: utils.MapStorage{
utils.NumberOfEvents: int64(0),
utils.PositiveExports: utils.StringSet{},
utils.NegativeExports: utils.StringSet{},
utils.TimeNow: time.Date(tNow.Year(), tNow.Month(), tNow.Day(),
tNow.Hour(), tNow.Minute(), tNow.Second(), tNow.Nanosecond(), loc),
}, nil
}}, nil
}
func updateEEMetrics(dc utils.MapStorage, ev engine.MapEvent, timezone string) {
func updateEEMetrics(dc *utils.SafeMapStorage, cgrID string, ev engine.MapEvent, hasError bool, timezone string) {
dc.Lock()
defer dc.Unlock()
if hasError {
dc.MapStorage[utils.NegativeExports].(utils.StringSet).Add(cgrID)
} else {
dc.MapStorage[utils.PositiveExports].(utils.StringSet).Add(cgrID)
}
if aTime, err := ev.GetTime(utils.AnswerTime, timezone); err == nil {
if _, has := dc[utils.FirstEventATime]; !has {
dc[utils.FirstEventATime] = time.Time{}
if _, has := dc.MapStorage[utils.FirstEventATime]; !has {
dc.MapStorage[utils.FirstEventATime] = time.Time{}
}
if _, has := dc[utils.LastEventATime]; !has {
dc[utils.LastEventATime] = time.Time{}
if _, has := dc.MapStorage[utils.LastEventATime]; !has {
dc.MapStorage[utils.LastEventATime] = time.Time{}
}
if dc[utils.FirstEventATime].(time.Time).IsZero() ||
aTime.Before(dc[utils.FirstEventATime].(time.Time)) {
dc[utils.FirstEventATime] = aTime
if dc.MapStorage[utils.FirstEventATime].(time.Time).IsZero() ||
aTime.Before(dc.MapStorage[utils.FirstEventATime].(time.Time)) {
dc.MapStorage[utils.FirstEventATime] = aTime
}
if aTime.After(dc[utils.LastEventATime].(time.Time)) {
dc[utils.LastEventATime] = aTime
if aTime.After(dc.MapStorage[utils.LastEventATime].(time.Time)) {
dc.MapStorage[utils.LastEventATime] = aTime
}
}
if oID, err := ev.GetTInt64(utils.OrderID); err == nil {
if _, has := dc[utils.FirstExpOrderID]; !has {
dc[utils.FirstExpOrderID] = int64(0)
if _, has := dc.MapStorage[utils.FirstExpOrderID]; !has {
dc.MapStorage[utils.FirstExpOrderID] = int64(0)
}
if _, has := dc[utils.LastExpOrderID]; !has {
dc[utils.LastExpOrderID] = int64(0)
if _, has := dc.MapStorage[utils.LastExpOrderID]; !has {
dc.MapStorage[utils.LastExpOrderID] = int64(0)
}
if dc[utils.FirstExpOrderID].(int64) == 0 ||
dc[utils.FirstExpOrderID].(int64) > oID {
dc[utils.FirstExpOrderID] = oID
if dc.MapStorage[utils.FirstExpOrderID].(int64) == 0 ||
dc.MapStorage[utils.FirstExpOrderID].(int64) > oID {
dc.MapStorage[utils.FirstExpOrderID] = oID
}
if dc[utils.LastExpOrderID].(int64) < oID {
dc[utils.LastExpOrderID] = oID
if dc.MapStorage[utils.LastExpOrderID].(int64) < oID {
dc.MapStorage[utils.LastExpOrderID] = oID
}
}
if cost, err := ev.GetFloat64(utils.Cost); err == nil {
if _, has := dc[utils.TotalCost]; !has {
dc[utils.TotalCost] = float64(0.0)
if _, has := dc.MapStorage[utils.TotalCost]; !has {
dc.MapStorage[utils.TotalCost] = float64(0.0)
}
dc[utils.TotalCost] = dc[utils.TotalCost].(float64) + cost
dc.MapStorage[utils.TotalCost] = dc.MapStorage[utils.TotalCost].(float64) + cost
}
if tor, err := ev.GetString(utils.ToR); err == nil {
if usage, err := ev.GetDuration(utils.Usage); err == nil {
switch tor {
case utils.MetaVoice:
if _, has := dc[utils.TotalDuration]; !has {
dc[utils.TotalDuration] = time.Duration(0)
if _, has := dc.MapStorage[utils.TotalDuration]; !has {
dc.MapStorage[utils.TotalDuration] = time.Duration(0)
}
dc[utils.TotalDuration] = dc[utils.TotalDuration].(time.Duration) + usage
dc.MapStorage[utils.TotalDuration] = dc.MapStorage[utils.TotalDuration].(time.Duration) + usage
case utils.MetaSMS:
if _, has := dc[utils.TotalSMSUsage]; !has {
dc[utils.TotalSMSUsage] = time.Duration(0)
if _, has := dc.MapStorage[utils.TotalSMSUsage]; !has {
dc.MapStorage[utils.TotalSMSUsage] = time.Duration(0)
}
dc[utils.TotalSMSUsage] = dc[utils.TotalSMSUsage].(time.Duration) + usage
dc.MapStorage[utils.TotalSMSUsage] = dc.MapStorage[utils.TotalSMSUsage].(time.Duration) + usage
case utils.MetaMMS:
if _, has := dc[utils.TotalMMSUsage]; !has {
dc[utils.TotalMMSUsage] = time.Duration(0)
if _, has := dc.MapStorage[utils.TotalMMSUsage]; !has {
dc.MapStorage[utils.TotalMMSUsage] = time.Duration(0)
}
dc[utils.TotalMMSUsage] = dc[utils.TotalMMSUsage].(time.Duration) + usage
dc.MapStorage[utils.TotalMMSUsage] = dc.MapStorage[utils.TotalMMSUsage].(time.Duration) + usage
case utils.MetaGeneric:
if _, has := dc[utils.TotalGenericUsage]; !has {
dc[utils.TotalGenericUsage] = time.Duration(0)
if _, has := dc.MapStorage[utils.TotalGenericUsage]; !has {
dc.MapStorage[utils.TotalGenericUsage] = time.Duration(0)
}
dc[utils.TotalGenericUsage] = dc[utils.TotalGenericUsage].(time.Duration) + usage
dc.MapStorage[utils.TotalGenericUsage] = dc.MapStorage[utils.TotalGenericUsage].(time.Duration) + usage
case utils.MetaData:
if _, has := dc[utils.TotalDataUsage]; !has {
dc[utils.TotalDataUsage] = time.Duration(0)
if _, has := dc.MapStorage[utils.TotalDataUsage]; !has {
dc.MapStorage[utils.TotalDataUsage] = time.Duration(0)
}
dc[utils.TotalDataUsage] = dc[utils.TotalDataUsage].(time.Duration) + usage
dc.MapStorage[utils.TotalDataUsage] = dc.MapStorage[utils.TotalDataUsage].(time.Duration) + usage
}
}
}

View File

@@ -307,12 +307,12 @@ func (mockEventExporter) ExportEvent(cgrEv *utils.CGREvent) error { return nil }
func (mockEventExporter) OnEvicted(itdmID string, value interface{}) {
utils.Logger.Warning("NOT IMPLEMENTED")
}
func (mockEventExporter) GetMetrics() utils.MapStorage {
return utils.MapStorage{
func (mockEventExporter) GetMetrics() *utils.SafeMapStorage {
return &utils.SafeMapStorage{MapStorage: utils.MapStorage{
utils.NumberOfEvents: int64(0),
utils.PositiveExports: utils.StringSet{},
utils.NegativeExports: 5,
}
}}
}
func TestV1ProcessEventMockMetrics(t *testing.T) {
@@ -449,14 +449,15 @@ func TestUpdateEEMetrics(t *testing.T) {
utils.Usage: time.Second,
}
exp, _ := newEEMetrics(utils.EmptyString)
exp[utils.FirstEventATime] = tnow
exp[utils.LastEventATime] = tnow
exp[utils.FirstExpOrderID] = int64(1)
exp[utils.LastExpOrderID] = int64(1)
exp[utils.TotalCost] = float64(5.5)
exp[utils.TotalDuration] = time.Second
exp[utils.TimeNow] = dc[utils.TimeNow]
if updateEEMetrics(dc, ev, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
exp.MapStorage[utils.FirstEventATime] = tnow
exp.MapStorage[utils.LastEventATime] = tnow
exp.MapStorage[utils.FirstExpOrderID] = int64(1)
exp.MapStorage[utils.LastExpOrderID] = int64(1)
exp.MapStorage[utils.TotalCost] = float64(5.5)
exp.MapStorage[utils.TotalDuration] = time.Second
exp.MapStorage[utils.TimeNow] = dc.MapStorage[utils.TimeNow]
exp.MapStorage[utils.PositiveExports] = utils.StringSet{"": {}}
if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
}
@@ -468,11 +469,11 @@ func TestUpdateEEMetrics(t *testing.T) {
utils.ToR: utils.MetaSMS,
utils.Usage: time.Second,
}
exp[utils.LastEventATime] = tnow
exp[utils.LastExpOrderID] = int64(2)
exp[utils.TotalCost] = float64(11)
exp[utils.TotalSMSUsage] = time.Second
if updateEEMetrics(dc, ev, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
exp.MapStorage[utils.LastEventATime] = tnow
exp.MapStorage[utils.LastExpOrderID] = int64(2)
exp.MapStorage[utils.TotalCost] = float64(11)
exp.MapStorage[utils.TotalSMSUsage] = time.Second
if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
}
@@ -484,11 +485,11 @@ func TestUpdateEEMetrics(t *testing.T) {
utils.ToR: utils.MetaMMS,
utils.Usage: time.Second,
}
exp[utils.LastEventATime] = tnow
exp[utils.LastExpOrderID] = int64(3)
exp[utils.TotalCost] = float64(16.5)
exp[utils.TotalMMSUsage] = time.Second
if updateEEMetrics(dc, ev, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
exp.MapStorage[utils.LastEventATime] = tnow
exp.MapStorage[utils.LastExpOrderID] = int64(3)
exp.MapStorage[utils.TotalCost] = float64(16.5)
exp.MapStorage[utils.TotalMMSUsage] = time.Second
if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
}
@@ -500,11 +501,11 @@ func TestUpdateEEMetrics(t *testing.T) {
utils.ToR: utils.MetaGeneric,
utils.Usage: time.Second,
}
exp[utils.LastEventATime] = tnow
exp[utils.LastExpOrderID] = int64(4)
exp[utils.TotalCost] = float64(22)
exp[utils.TotalGenericUsage] = time.Second
if updateEEMetrics(dc, ev, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
exp.MapStorage[utils.LastEventATime] = tnow
exp.MapStorage[utils.LastExpOrderID] = int64(4)
exp.MapStorage[utils.TotalCost] = float64(22)
exp.MapStorage[utils.TotalGenericUsage] = time.Second
if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
}
@@ -516,11 +517,11 @@ func TestUpdateEEMetrics(t *testing.T) {
utils.ToR: utils.MetaData,
utils.Usage: time.Second,
}
exp[utils.LastEventATime] = tnow
exp[utils.LastExpOrderID] = int64(5)
exp[utils.TotalCost] = float64(27.5)
exp[utils.TotalDataUsage] = time.Second
if updateEEMetrics(dc, ev, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
exp.MapStorage[utils.LastEventATime] = tnow
exp.MapStorage[utils.LastExpOrderID] = int64(5)
exp.MapStorage[utils.TotalCost] = float64(27.5)
exp.MapStorage[utils.TotalDataUsage] = time.Second
if updateEEMetrics(dc, "", ev, false, utils.EmptyString); !reflect.DeepEqual(dc, exp) {
t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(dc))
}
}

View File

@@ -23,7 +23,6 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"
"github.com/elastic/go-elasticsearch/esapi"
@@ -34,7 +33,7 @@ import (
)
func NewElasticExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc utils.MapStorage) (eEe *ElasticEe, err error) {
dc *utils.SafeMapStorage) (eEe *ElasticEe, err error) {
eEe = &ElasticEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
err = eEe.init()
@@ -48,9 +47,8 @@ type ElasticEe struct {
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
sync.RWMutex
dc utils.MapStorage
opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap
dc *utils.SafeMapStorage
opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap
}
// init will create all the necessary dependencies, including opening the file
@@ -124,17 +122,13 @@ func (eEe *ElasticEe) OnEvicted(_ string, _ interface{}) {
// ExportEvent implements EventExporter
func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
eEe.Lock()
defer func() {
if err != nil {
eEe.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
} else {
eEe.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
}
eEe.Unlock()
updateEEMetrics(eEe.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Timezone,
eEe.cgrCfg.GeneralCfg().DefaultTimezone))
}()
eEe.dc[utils.NumberOfEvents] = eEe.dc[utils.NumberOfEvents].(int64) + 1
eEe.dc.Lock()
eEe.dc.MapStorage[utils.NumberOfEvents] = eEe.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
eEe.dc.Unlock()
valMp := make(map[string]interface{})
if len(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].ContentFields()) == 0 {
valMp = cgrEv.Event
@@ -142,10 +136,10 @@ func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
utils.MetaReq: cgrEv.Event,
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: eEe.dc,
utils.MetaOpts: cgrEv.APIOpts,
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: eEe.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, eEe.cgrCfg.GeneralCfg().DefaultTenant),
eEe.filterS, oNm)
@@ -159,8 +153,7 @@ func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
valMp[strings.Join(path, utils.NestingSep)] = nmIt.String()
}
}
updateEEMetrics(eEe.dc, cgrEv.Event, utils.FirstNonEmpty(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Timezone,
eEe.cgrCfg.GeneralCfg().DefaultTimezone))
// Set up the request object
cgrID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.CGRID), utils.GenUUID())
runID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RunID), utils.MetaDefault)
@@ -198,6 +191,6 @@ func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
return
}
func (eEe *ElasticEe) GetMetrics() utils.MapStorage {
func (eEe *ElasticEe) GetMetrics() *utils.SafeMapStorage {
return eEe.dc.Clone()
}

View File

@@ -24,7 +24,6 @@ import (
"io"
"os"
"path"
"sync"
"github.com/cgrates/cgrates/engine"
@@ -33,7 +32,7 @@ import (
)
func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc utils.MapStorage) (fCsv *FileCSVee, err error) {
dc *utils.SafeMapStorage) (fCsv *FileCSVee, err error) {
fCsv = &FileCSVee{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
err = fCsv.init()
@@ -48,8 +47,7 @@ type FileCSVee struct {
filterS *engine.FilterS
file io.WriteCloser
csvWriter *csv.Writer
sync.RWMutex
dc utils.MapStorage
dc *utils.SafeMapStorage
}
// init will create all the necessary dependencies, including opening the file
@@ -57,9 +55,9 @@ func (fCsv *FileCSVee) init() (err error) {
// create the file
filePath := path.Join(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ExportPath,
fCsv.id+utils.Underline+utils.UUIDSha1Prefix()+utils.CSVSuffix)
fCsv.Lock()
fCsv.dc[utils.ExportPath] = filePath
fCsv.Unlock()
fCsv.dc.Lock()
fCsv.dc.MapStorage[utils.ExportPath] = filePath
fCsv.dc.Unlock()
if fCsv.file, err = os.Create(filePath); err != nil {
return
}
@@ -92,16 +90,13 @@ func (fCsv *FileCSVee) OnEvicted(_ string, _ interface{}) {
// ExportEvent implements EventExporter
func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
fCsv.Lock()
defer func() {
if err != nil {
fCsv.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
} else {
fCsv.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
}
fCsv.Unlock()
updateEEMetrics(fCsv.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Timezone,
fCsv.cgrCfg.GeneralCfg().DefaultTimezone))
}()
fCsv.dc[utils.NumberOfEvents] = fCsv.dc[utils.NumberOfEvents].(int64) + 1
fCsv.dc.Lock()
fCsv.dc.MapStorage[utils.NumberOfEvents] = fCsv.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
fCsv.dc.Unlock()
var csvRecord []string
if len(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ContentFields()) == 0 {
@@ -113,10 +108,10 @@ func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
utils.MetaReq: cgrEv.Event,
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: fCsv.dc,
utils.MetaOpts: cgrEv.APIOpts,
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: fCsv.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, fCsv.cgrCfg.GeneralCfg().DefaultTenant),
fCsv.filterS, oNm)
@@ -127,8 +122,6 @@ func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
csvRecord = eeReq.ExpData[utils.MetaExp].OrderedFieldsAsStrings()
}
updateEEMetrics(fCsv.dc, cgrEv.Event, utils.FirstNonEmpty(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Timezone,
fCsv.cgrCfg.GeneralCfg().DefaultTimezone))
return fCsv.csvWriter.Write(csvRecord)
}
@@ -140,7 +133,7 @@ func (fCsv *FileCSVee) composeHeader() (err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaHdr: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaDC: fCsv.dc,
utils.MetaCfg: fCsv.cgrCfg.GetDataProvider(),
}, fCsv.cgrCfg.GeneralCfg().DefaultTenant,
@@ -159,7 +152,7 @@ func (fCsv *FileCSVee) composeTrailer() (err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaTrl: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaDC: fCsv.dc,
utils.MetaCfg: fCsv.cgrCfg.GetDataProvider(),
}, fCsv.cgrCfg.GeneralCfg().DefaultTenant,
@@ -171,6 +164,6 @@ func (fCsv *FileCSVee) composeTrailer() (err error) {
return fCsv.csvWriter.Write(eeReq.ExpData[utils.MetaTrl].OrderedFieldsAsStrings())
}
func (fCsv *FileCSVee) GetMetrics() utils.MapStorage {
func (fCsv *FileCSVee) GetMetrics() *utils.SafeMapStorage {
return fCsv.dc.Clone()
}

View File

@@ -76,7 +76,7 @@ func TestFileCsvComposeHeader(t *testing.T) {
filterS: filterS,
file: nopCloser{byteBuff},
csvWriter: csvNW,
dc: utils.MapStorage{},
dc: &utils.SafeMapStorage{},
}
cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Fields = []*config.FCTemplate{
{
@@ -140,7 +140,7 @@ func TestFileCsvComposeTrailer(t *testing.T) {
filterS: filterS,
file: nopCloser{byteBuff},
csvWriter: csvNW,
dc: utils.MapStorage{},
dc: &utils.SafeMapStorage{},
}
cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Fields = []*config.FCTemplate{
{
@@ -286,7 +286,7 @@ func TestFileCsvOnEvictedTrailer(t *testing.T) {
filterS: filterS,
file: nopCloserWrite{byteBuff},
csvWriter: csvNW,
dc: utils.MapStorage{},
dc: &utils.SafeMapStorage{},
}
cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Fields = []*config.FCTemplate{
{
@@ -321,7 +321,7 @@ func TestFileCsvOnEvictedClose(t *testing.T) {
filterS: filterS,
file: nopCloserError{byteBuff},
csvWriter: csvNW,
dc: utils.MapStorage{},
dc: &utils.SafeMapStorage{},
}
cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Fields = []*config.FCTemplate{
{

View File

@@ -23,14 +23,13 @@ import (
"io"
"os"
"path"
"sync"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func NewFileFWVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, dc utils.MapStorage) (fFwv *FileFWVee, err error) {
func NewFileFWVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, dc *utils.SafeMapStorage) (fFwv *FileFWVee, err error) {
fFwv = &FileFWVee{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
err = fFwv.init()
@@ -44,17 +43,16 @@ type FileFWVee struct {
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
file io.WriteCloser
dc utils.MapStorage
sync.RWMutex
dc *utils.SafeMapStorage
}
// init will create all the necessary dependencies, including opening the file
func (fFwv *FileFWVee) init() (err error) {
filePath := path.Join(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].ExportPath,
fFwv.id+utils.Underline+utils.UUIDSha1Prefix()+utils.FWVSuffix)
fFwv.Lock()
fFwv.dc[utils.ExportPath] = filePath
fFwv.Unlock()
fFwv.dc.Lock()
fFwv.dc.MapStorage[utils.ExportPath] = filePath
fFwv.dc.Unlock()
// create the file
if fFwv.file, err = os.Create(filePath); err != nil {
return
@@ -82,16 +80,13 @@ func (fFwv *FileFWVee) OnEvicted(_ string, _ interface{}) {
// ExportEvent implements EventExporter
func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
fFwv.Lock()
defer func() {
if err != nil {
fFwv.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
} else {
fFwv.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
}
fFwv.Unlock()
updateEEMetrics(fFwv.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Timezone,
fFwv.cgrCfg.GeneralCfg().DefaultTimezone))
}()
fFwv.dc[utils.NumberOfEvents] = fFwv.dc[utils.NumberOfEvents].(int64) + 1
fFwv.dc.Lock()
fFwv.dc.MapStorage[utils.NumberOfEvents] = fFwv.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
fFwv.dc.Unlock()
var records []string
if len(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].ContentFields()) == 0 {
records = make([]string, 0, len(cgrEv.Event))
@@ -102,10 +97,10 @@ func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
utils.MetaReq: cgrEv.Event,
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: fFwv.dc,
utils.MetaOpts: cgrEv.APIOpts,
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: fFwv.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, fFwv.cgrCfg.GeneralCfg().DefaultTenant),
fFwv.filterS, oNm)
@@ -116,8 +111,6 @@ func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
records = eeReq.ExpData[utils.MetaExp].OrderedFieldsAsStrings()
}
updateEEMetrics(fFwv.dc, cgrEv.Event, utils.FirstNonEmpty(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Timezone,
fFwv.cgrCfg.GeneralCfg().DefaultTimezone))
for _, record := range records {
if _, err = io.WriteString(fFwv.file, record); err != nil {
return
@@ -135,7 +128,7 @@ func (fFwv *FileFWVee) composeHeader() (err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaHdr: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaDC: fFwv.dc,
utils.MetaCfg: fFwv.cgrCfg.GetDataProvider(),
}, fFwv.cgrCfg.GeneralCfg().DefaultTenant,
@@ -160,7 +153,7 @@ func (fFwv *FileFWVee) composeTrailer() (err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaTrl: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaDC: fFwv.dc,
utils.MetaCfg: fFwv.cgrCfg.GetDataProvider(),
}, fFwv.cgrCfg.GeneralCfg().DefaultTenant,
@@ -177,6 +170,6 @@ func (fFwv *FileFWVee) composeTrailer() (err error) {
return
}
func (fFwv *FileFWVee) GetMetrics() utils.MapStorage {
func (fFwv *FileFWVee) GetMetrics() *utils.SafeMapStorage {
return fFwv.dc.Clone()
}

View File

@@ -78,7 +78,7 @@ func TestFileFwvComposeHeader(t *testing.T) {
cfgIdx: 0,
filterS: filterS,
file: nopCloser{byteBuff},
dc: utils.MapStorage{},
dc: &utils.SafeMapStorage{},
}
cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Fields = []*config.FCTemplate{
{
@@ -141,7 +141,7 @@ func TestFileFwvComposeTrailer(t *testing.T) {
cfgIdx: 0,
filterS: filterS,
file: nopCloser{byteBuff},
dc: utils.MapStorage{},
dc: &utils.SafeMapStorage{},
}
cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Fields = []*config.FCTemplate{
{
@@ -327,7 +327,7 @@ func TestFileFwvComposeHeaderWriteError(t *testing.T) {
cfgIdx: 0,
filterS: filterS,
file: nopCloserWrite{byteBuff},
dc: utils.MapStorage{},
dc: &utils.SafeMapStorage{},
}
cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Fields = []*config.FCTemplate{
{
@@ -360,7 +360,7 @@ func TestFileFwvComposeTrailerWriteError(t *testing.T) {
cfgIdx: 0,
filterS: filterS,
file: nopCloserWrite{byteBuff},
dc: utils.MapStorage{},
dc: &utils.SafeMapStorage{},
}
cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Fields = []*config.FCTemplate{
{
@@ -392,7 +392,7 @@ func TestFileFwvOnEvictedTrailer(t *testing.T) {
cfgIdx: 0,
filterS: filterS,
file: nopCloserWrite{byteBuff},
dc: utils.MapStorage{},
dc: &utils.SafeMapStorage{},
}
cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Fields = []*config.FCTemplate{
{
@@ -431,7 +431,7 @@ func TestFileFwvOnEvictedClose(t *testing.T) {
cfgIdx: 0,
filterS: filterS,
file: nopCloserError{byteBuff},
dc: utils.MapStorage{},
dc: &utils.SafeMapStorage{},
}
cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Fields = []*config.FCTemplate{
{

View File

@@ -22,7 +22,6 @@ import (
"encoding/json"
"net/http"
"strings"
"sync"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -30,7 +29,7 @@ import (
)
func NewHTTPjsonMapEE(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc utils.MapStorage) (pstrJSON *HTTPjsonMapEE, err error) {
dc *utils.SafeMapStorage) (pstrJSON *HTTPjsonMapEE, err error) {
pstrJSON = &HTTPjsonMapEE{
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg,
@@ -53,8 +52,7 @@ type HTTPjsonMapEE struct {
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
pstr *engine.HTTPPoster
dc utils.MapStorage
sync.RWMutex
dc *utils.SafeMapStorage
}
// ID returns the identificator of this exporter
@@ -68,17 +66,13 @@ func (httpEE *HTTPjsonMapEE) OnEvicted(string, interface{}) {
// ExportEvent implements EventExporter
func (httpEE *HTTPjsonMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
httpEE.Lock()
defer func() {
if err != nil {
httpEE.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
} else {
httpEE.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
}
httpEE.Unlock()
updateEEMetrics(httpEE.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(httpEE.cgrCfg.EEsCfg().Exporters[httpEE.cfgIdx].Timezone,
httpEE.cgrCfg.GeneralCfg().DefaultTimezone))
}()
httpEE.dc[utils.NumberOfEvents] = httpEE.dc[utils.NumberOfEvents].(int64) + 1
httpEE.dc.Lock()
httpEE.dc.MapStorage[utils.NumberOfEvents] = httpEE.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
httpEE.dc.Unlock()
valMp := make(map[string]interface{})
hdr := http.Header{}
@@ -88,10 +82,10 @@ func (httpEE *HTTPjsonMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
utils.MetaReq: cgrEv.Event,
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: httpEE.dc,
utils.MetaOpts: cgrEv.APIOpts,
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: httpEE.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, httpEE.cgrCfg.GeneralCfg().DefaultTenant),
httpEE.filterS, oNm)
@@ -109,8 +103,7 @@ func (httpEE *HTTPjsonMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
return
}
}
updateEEMetrics(httpEE.dc, cgrEv.Event, utils.FirstNonEmpty(httpEE.cgrCfg.EEsCfg().Exporters[httpEE.cfgIdx].Timezone,
httpEE.cgrCfg.GeneralCfg().DefaultTimezone))
var body []byte
if body, err = json.Marshal(valMp); err != nil {
return
@@ -125,7 +118,7 @@ func (httpEE *HTTPjsonMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
return
}
func (httpEE *HTTPjsonMapEE) GetMetrics() utils.MapStorage {
func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.SafeMapStorage {
return httpEE.dc.Clone()
}
@@ -138,7 +131,7 @@ func (httpEE *HTTPjsonMapEE) composeHeader() (hdr http.Header, err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaHdr: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaDC: httpEE.dc,
utils.MetaCfg: httpEE.cgrCfg.GetDataProvider(),
}, httpEE.cgrCfg.GeneralCfg().DefaultTenant,

View File

@@ -83,8 +83,8 @@ func TestHttpJsonMapExportEvent1(t *testing.T) {
t.Errorf("Expected %q but received %q", errExpect, err)
}
dcExpect := int64(1)
if !reflect.DeepEqual(dcExpect, httpEE.dc[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, httpEE.dc[utils.NumberOfEvents])
if !reflect.DeepEqual(dcExpect, httpEE.dc.MapStorage[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, httpEE.dc.MapStorage[utils.NumberOfEvents])
}
}
@@ -143,8 +143,8 @@ func TestHttpJsonMapExportEvent2(t *testing.T) {
t.Error(err)
}
dcExpect := int64(1)
if !reflect.DeepEqual(dcExpect, httpEE.dc[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, httpEE.dc[utils.NumberOfEvents])
if !reflect.DeepEqual(dcExpect, httpEE.dc.MapStorage[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, httpEE.dc.MapStorage[utils.NumberOfEvents])
}
}
@@ -190,8 +190,8 @@ func TestHttpJsonMapExportEvent3(t *testing.T) {
t.Errorf("Expected %q but received %q", errExpect, err)
}
dcExpect := int64(1)
if !reflect.DeepEqual(dcExpect, httpEE.dc[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, httpEE.dc[utils.NumberOfEvents])
if !reflect.DeepEqual(dcExpect, httpEE.dc.MapStorage[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, httpEE.dc.MapStorage[utils.NumberOfEvents])
}
}
func TestHttpJsonMapExportEvent4(t *testing.T) {
@@ -239,8 +239,8 @@ func TestHttpJsonMapExportEvent4(t *testing.T) {
t.Errorf("Expected %q but received %q", errExpect, err)
}
dcExpect := int64(1)
if !reflect.DeepEqual(dcExpect, httpEE.dc[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, httpEE.dc[utils.NumberOfEvents])
if !reflect.DeepEqual(dcExpect, httpEE.dc.MapStorage[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, httpEE.dc.MapStorage[utils.NumberOfEvents])
}
}
@@ -292,8 +292,8 @@ func TestHttpJsonMapExportEvent5(t *testing.T) {
t.Errorf("Expected %q but received %q", errExpect, err)
}
dcExpect := int64(1)
if !reflect.DeepEqual(dcExpect, httpEE.dc[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, httpEE.dc[utils.NumberOfEvents])
if !reflect.DeepEqual(dcExpect, httpEE.dc.MapStorage[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, httpEE.dc.MapStorage[utils.NumberOfEvents])
}
httpEE.OnEvicted("test", "test")
}

View File

@@ -22,7 +22,6 @@ import (
"net/http"
"net/url"
"strings"
"sync"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -30,7 +29,7 @@ import (
)
func NewHTTPPostEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc utils.MapStorage) (httpPost *HTTPPost, err error) {
dc *utils.SafeMapStorage) (httpPost *HTTPPost, err error) {
httpPost = &HTTPPost{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
httpPost.httpPoster = engine.NewHTTPPoster(cgrCfg.GeneralCfg().ReplyTimeout,
@@ -47,8 +46,7 @@ type HTTPPost struct {
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
httpPoster *engine.HTTPPoster
sync.RWMutex
dc utils.MapStorage
dc *utils.SafeMapStorage
}
// ID returns the identificator of this exporter
@@ -62,16 +60,13 @@ func (httpPost *HTTPPost) OnEvicted(_ string, _ interface{}) {
// ExportEvent implements EventExporter
func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
httpPost.Lock()
defer func() {
if err != nil {
httpPost.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
} else {
httpPost.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
}
httpPost.Unlock()
updateEEMetrics(httpPost.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Timezone,
httpPost.cgrCfg.GeneralCfg().DefaultTimezone))
}()
httpPost.dc[utils.NumberOfEvents] = httpPost.dc[utils.NumberOfEvents].(int64) + 1
httpPost.dc.Lock()
httpPost.dc.MapStorage[utils.NumberOfEvents] = httpPost.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
httpPost.dc.Unlock()
urlVals := url.Values{}
hdr := http.Header{}
@@ -83,10 +78,10 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
utils.MetaReq: cgrEv.Event,
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: httpPost.dc,
utils.MetaOpts: cgrEv.APIOpts,
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: httpPost.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, httpPost.cgrCfg.GeneralCfg().DefaultTenant),
httpPost.filterS, oNm)
@@ -103,8 +98,7 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
return
}
}
updateEEMetrics(httpPost.dc, cgrEv.Event, utils.FirstNonEmpty(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Timezone,
httpPost.cgrCfg.GeneralCfg().DefaultTimezone))
if err = httpPost.httpPoster.PostValues(urlVals, hdr); err != nil &&
httpPost.cgrCfg.GeneralCfg().FailedPostsDir != utils.MetaNone {
engine.AddFailedPost(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ExportPath,
@@ -117,7 +111,7 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
return
}
func (httpPost *HTTPPost) GetMetrics() utils.MapStorage {
func (httpPost *HTTPPost) GetMetrics() *utils.SafeMapStorage {
return httpPost.dc.Clone()
}
@@ -130,7 +124,7 @@ func (httpPost *HTTPPost) composeHeader() (hdr http.Header, err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaHdr: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaDC: httpPost.dc,
utils.MetaCfg: httpPost.cgrCfg.GetDataProvider(),
}, httpPost.cgrCfg.GeneralCfg().DefaultTenant,

View File

@@ -82,8 +82,8 @@ func TestHttpPostExportEvent(t *testing.T) {
t.Errorf("Expected %q but received %q", errExpect, err)
}
dcExpect := int64(1)
if !reflect.DeepEqual(dcExpect, httpPost.dc[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, httpPost.dc[utils.NumberOfEvents])
if !reflect.DeepEqual(dcExpect, httpPost.dc.MapStorage[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, httpPost.dc.MapStorage[utils.NumberOfEvents])
}
}
@@ -140,8 +140,8 @@ func TestHttpPostExportEvent2(t *testing.T) {
t.Error(err)
}
dcExpect := int64(1)
if !reflect.DeepEqual(dcExpect, httpPost.dc[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, httpPost.dc[utils.NumberOfEvents])
if !reflect.DeepEqual(dcExpect, httpPost.dc.MapStorage[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, httpPost.dc.MapStorage[utils.NumberOfEvents])
}
}
@@ -187,8 +187,8 @@ func TestHttpPostExportEvent3(t *testing.T) {
t.Errorf("Expected %q but received %q", errExpect, err)
}
dcExpect := int64(1)
if !reflect.DeepEqual(dcExpect, httpPost.dc[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, httpPost.dc[utils.NumberOfEvents])
if !reflect.DeepEqual(dcExpect, httpPost.dc.MapStorage[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, httpPost.dc.MapStorage[utils.NumberOfEvents])
}
}
@@ -237,8 +237,8 @@ func TestHttpPostExportEvent4(t *testing.T) {
t.Errorf("Expected %q but received %q", errExpect, err)
}
dcExpect := int64(1)
if !reflect.DeepEqual(dcExpect, httpPost.dc[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, httpPost.dc[utils.NumberOfEvents])
if !reflect.DeepEqual(dcExpect, httpPost.dc.MapStorage[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, httpPost.dc.MapStorage[utils.NumberOfEvents])
}
httpPost.OnEvicted("test", "test")
}

View File

@@ -21,7 +21,6 @@ package ees
import (
"encoding/json"
"strings"
"sync"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -29,7 +28,7 @@ import (
)
func NewPosterJSONMapEE(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc utils.MapStorage) (pstrJSON *PosterJSONMapEE, err error) {
dc *utils.SafeMapStorage) (pstrJSON *PosterJSONMapEE, err error) {
pstrJSON = &PosterJSONMapEE{
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg,
@@ -68,8 +67,7 @@ type PosterJSONMapEE struct {
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
poster engine.Poster
dc utils.MapStorage
sync.RWMutex
dc *utils.SafeMapStorage
}
// ID returns the identificator of this exporter
@@ -84,17 +82,13 @@ func (pstrEE *PosterJSONMapEE) OnEvicted(string, interface{}) {
// ExportEvent implements EventExporter
func (pstrEE *PosterJSONMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
pstrEE.Lock()
defer func() {
if err != nil {
pstrEE.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
} else {
pstrEE.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
}
pstrEE.Unlock()
updateEEMetrics(pstrEE.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Timezone,
pstrEE.cgrCfg.GeneralCfg().DefaultTimezone))
}()
pstrEE.dc[utils.NumberOfEvents] = pstrEE.dc[utils.NumberOfEvents].(int64) + 1
pstrEE.dc.Lock()
pstrEE.dc.MapStorage[utils.NumberOfEvents] = pstrEE.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
pstrEE.dc.Unlock()
valMp := make(map[string]interface{})
if len(pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].ContentFields()) == 0 {
@@ -103,10 +97,10 @@ func (pstrEE *PosterJSONMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
utils.MetaReq: cgrEv.Event,
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: pstrEE.dc,
utils.MetaOpts: cgrEv.APIOpts,
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: pstrEE.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, pstrEE.cgrCfg.GeneralCfg().DefaultTenant),
pstrEE.filterS, oNm)
@@ -121,8 +115,7 @@ func (pstrEE *PosterJSONMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
valMp[strings.Join(path, utils.NestingSep)] = nmIt.String()
}
}
updateEEMetrics(pstrEE.dc, cgrEv.Event, utils.FirstNonEmpty(pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Timezone,
pstrEE.cgrCfg.GeneralCfg().DefaultTimezone))
cgrID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.CGRID), utils.GenUUID())
runID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RunID), utils.MetaDefault)
var body []byte
@@ -138,6 +131,6 @@ func (pstrEE *PosterJSONMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
return
}
func (pstrEE *PosterJSONMapEE) GetMetrics() utils.MapStorage {
func (pstrEE *PosterJSONMapEE) GetMetrics() *utils.SafeMapStorage {
return pstrEE.dc.Clone()
}

View File

@@ -183,16 +183,16 @@ func TestPosterJsonMapExportEvent(t *testing.T) {
t.Errorf("Expected %q but received %q", errExpect, err)
}
dcExpect := int64(1)
if !reflect.DeepEqual(dcExpect, pstrEE.dc[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc[utils.NumberOfEvents])
if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents])
}
cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].ComputeFields()
if err := pstrEE.ExportEvent(cgrEv); err == nil || err.Error() != errExpect {
t.Errorf("Expected %q but received %q", errExpect, err)
}
dcExpect = int64(2)
if !reflect.DeepEqual(dcExpect, pstrEE.dc[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc[utils.NumberOfEvents])
if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents])
}
}
@@ -252,8 +252,8 @@ func TestPosterJsonMapExportEvent1(t *testing.T) {
t.Error(err)
}
dcExpect := int64(1)
if !reflect.DeepEqual(dcExpect, pstrEE.dc[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc[utils.NumberOfEvents])
if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents])
}
bodyExpect := map[string]interface{}{
"2": "*req.field2",
@@ -310,8 +310,8 @@ func TestPosterJsonMapExportEvent2(t *testing.T) {
t.Errorf("Expected %q but received %q", errExpect, err)
}
dcExpect := int64(1)
if !reflect.DeepEqual(dcExpect, pstrEE.dc[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc[utils.NumberOfEvents])
if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents])
}
}
@@ -350,8 +350,8 @@ func TestPosterJsonMapExportEvent3(t *testing.T) {
t.Errorf("Expected %q but received %q", errExpect, err)
}
dcExpect := int64(1)
if !reflect.DeepEqual(dcExpect, pstrEE.dc[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc[utils.NumberOfEvents])
if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) {
t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents])
}
pstrEE.OnEvicted("test", "test")
}

View File

@@ -23,7 +23,6 @@ import (
"fmt"
"net/url"
"strings"
"sync"
"gorm.io/driver/mysql"
"gorm.io/driver/postgres"
@@ -35,7 +34,7 @@ import (
)
func NewSQLEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc utils.MapStorage) (sqlEe *SQLEe, err error) {
dc *utils.SafeMapStorage) (sqlEe *SQLEe, err error) {
sqlEe = &SQLEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
@@ -58,8 +57,7 @@ type SQLEe struct {
tableName string
sync.RWMutex
dc utils.MapStorage
dc *utils.SafeMapStorage
}
func (sqlEe *SQLEe) NewSQLEeURL(cgrCfg *config.CGRConfig) (dialect gorm.Dialector, err error) {
@@ -144,26 +142,23 @@ func (sqlEe *SQLEe) OnEvicted(_ string, _ interface{}) {
// ExportEvent implements EventExporter
func (sqlEe *SQLEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
sqlEe.Lock()
defer func() {
if err != nil {
sqlEe.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
} else {
sqlEe.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
}
sqlEe.Unlock()
updateEEMetrics(sqlEe.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Timezone,
sqlEe.cgrCfg.GeneralCfg().DefaultTimezone))
}()
sqlEe.dc[utils.NumberOfEvents] = sqlEe.dc[utils.NumberOfEvents].(int64) + 1
sqlEe.dc.Lock()
sqlEe.dc.MapStorage[utils.NumberOfEvents] = sqlEe.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
sqlEe.dc.Unlock()
var vals []interface{}
var colNames []string
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
utils.MetaReq: cgrEv.Event,
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: sqlEe.dc,
utils.MetaOpts: cgrEv.APIOpts,
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: sqlEe.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, sqlEe.cgrCfg.GeneralCfg().DefaultTenant),
sqlEe.filterS, oNm)
@@ -194,12 +189,10 @@ func (sqlEe *SQLEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
}
sqlEe.db.Table(sqlEe.tableName).Exec(sqlQuery, vals...)
updateEEMetrics(sqlEe.dc, cgrEv.Event, utils.FirstNonEmpty(sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Timezone,
sqlEe.cgrCfg.GeneralCfg().DefaultTimezone))
return
}
func (sqlEe *SQLEe) GetMetrics() utils.MapStorage {
func (sqlEe *SQLEe) GetMetrics() *utils.SafeMapStorage {
return sqlEe.dc.Clone()
}

View File

@@ -19,15 +19,13 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ees
import (
"sync"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func NewVirtualExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc utils.MapStorage) (vEe *VirtualEe, err error) {
dc *utils.SafeMapStorage) (vEe *VirtualEe, err error) {
vEe = &VirtualEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
err = vEe.init()
@@ -40,8 +38,7 @@ type VirtualEe struct {
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
sync.RWMutex
dc utils.MapStorage
dc *utils.SafeMapStorage
}
// init will create all the necessary dependencies, including opening the file
@@ -60,35 +57,31 @@ func (vEe *VirtualEe) OnEvicted(_ string, _ interface{}) {
// ExportEvent implements EventExporter
func (vEe *VirtualEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
vEe.Lock()
defer func() {
if err != nil {
vEe.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
} else {
vEe.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
}
vEe.Unlock()
updateEEMetrics(vEe.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(vEe.cgrCfg.EEsCfg().Exporters[vEe.cfgIdx].Timezone,
vEe.cgrCfg.GeneralCfg().DefaultTimezone))
}()
vEe.dc[utils.NumberOfEvents] = vEe.dc[utils.NumberOfEvents].(int64) + 1
vEe.dc.Lock()
vEe.dc.MapStorage[utils.NumberOfEvents] = vEe.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
vEe.dc.Unlock()
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
utils.MetaReq: cgrEv.Event,
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: vEe.dc,
utils.MetaOpts: cgrEv.APIOpts,
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: vEe.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, vEe.cgrCfg.GeneralCfg().DefaultTenant),
vEe.filterS, oNm)
if err = eeReq.SetFields(vEe.cgrCfg.EEsCfg().Exporters[vEe.cfgIdx].ContentFields()); err != nil {
return
}
updateEEMetrics(vEe.dc, cgrEv.Event, utils.FirstNonEmpty(vEe.cgrCfg.EEsCfg().Exporters[vEe.cfgIdx].Timezone,
vEe.cgrCfg.GeneralCfg().DefaultTimezone))
return
}
func (vEe *VirtualEe) GetMetrics() utils.MapStorage {
func (vEe *VirtualEe) GetMetrics() *utils.SafeMapStorage {
return vEe.dc.Clone()
}

View File

@@ -28,7 +28,7 @@ import (
)
// NewExportRequest returns a new EventRequest
func NewExportRequest(inData map[string]utils.MapStorage,
func NewExportRequest(inData map[string]utils.DataStorage,
tnt string,
filterS *FilterS, oNM map[string]*utils.OrderedNavigableMap) (eeR *ExportRequest) {
eeR = &ExportRequest{
@@ -43,7 +43,7 @@ func NewExportRequest(inData map[string]utils.MapStorage,
// ExportRequest represents data related to one request towards agent
// implements utils.DataProvider so we can pass it to filters
type ExportRequest struct {
inData map[string]utils.MapStorage // request
inData map[string]utils.DataStorage // request
ExpData map[string]*utils.OrderedNavigableMap // *exp:OrderNavMp *trl:OrderNavMp *cdr:OrderNavMp
tnt string
filterS *FilterS

View File

@@ -28,8 +28,8 @@ import (
"github.com/cgrates/cgrates/utils"
)
func TestEventRequestParseFieldDateTimeDaily(t *testing.T) {
EventReq := NewExportRequest(map[string]utils.MapStorage{}, "", nil, nil)
func TestExportRequestParseFieldDateTimeDaily(t *testing.T) {
EventReq := NewExportRequest(map[string]utils.DataStorage{}, "", nil, nil)
fctTemp := &config.FCTemplate{
Type: utils.MetaDateTime,
Value: config.NewRSRParsersMustCompile("*daily", utils.InfieldSep),
@@ -56,8 +56,8 @@ func TestEventRequestParseFieldDateTimeDaily(t *testing.T) {
}
}
func TestEventReqParseFieldDateTimeTimeZone(t *testing.T) {
EventReq := NewExportRequest(map[string]utils.MapStorage{}, "", nil, nil)
func TestExportReqParseFieldDateTimeTimeZone(t *testing.T) {
EventReq := NewExportRequest(map[string]utils.DataStorage{}, "", nil, nil)
fctTemp := &config.FCTemplate{
Type: utils.MetaDateTime,
Value: config.NewRSRParsersMustCompile("*daily", utils.InfieldSep),
@@ -84,8 +84,8 @@ func TestEventReqParseFieldDateTimeTimeZone(t *testing.T) {
}
}
func TestEventReqParseFieldDateTimeMonthly(t *testing.T) {
EventReq := NewExportRequest(map[string]utils.MapStorage{}, "", nil, nil)
func TestExportReqParseFieldDateTimeMonthly(t *testing.T) {
EventReq := NewExportRequest(map[string]utils.DataStorage{}, "", nil, nil)
fctTemp := &config.FCTemplate{
Type: utils.MetaDateTime,
Value: config.NewRSRParsersMustCompile("*monthly", utils.InfieldSep),
@@ -111,8 +111,8 @@ func TestEventReqParseFieldDateTimeMonthly(t *testing.T) {
}
}
func TestEventReqParseFieldDateTimeMonthlyEstimated(t *testing.T) {
EventReq := NewExportRequest(map[string]utils.MapStorage{}, "", nil, nil)
func TestExportReqParseFieldDateTimeMonthlyEstimated(t *testing.T) {
EventReq := NewExportRequest(map[string]utils.DataStorage{}, "", nil, nil)
fctTemp := &config.FCTemplate{
Type: utils.MetaDateTime,
Value: config.NewRSRParsersMustCompile("*monthly_estimated", utils.InfieldSep),
@@ -138,8 +138,8 @@ func TestEventReqParseFieldDateTimeMonthlyEstimated(t *testing.T) {
}
}
func TestEventReqParseFieldDateTimeYearly(t *testing.T) {
EventReq := NewExportRequest(map[string]utils.MapStorage{}, "", nil, nil)
func TestExportReqParseFieldDateTimeYearly(t *testing.T) {
EventReq := NewExportRequest(map[string]utils.DataStorage{}, "", nil, nil)
fctTemp := &config.FCTemplate{
Type: utils.MetaDateTime,
Value: config.NewRSRParsersMustCompile("*yearly", utils.InfieldSep),
@@ -165,8 +165,8 @@ func TestEventReqParseFieldDateTimeYearly(t *testing.T) {
}
}
func TestEventReqParseFieldDateTimeMetaUnlimited(t *testing.T) {
EventReq := NewExportRequest(map[string]utils.MapStorage{}, "", nil, nil)
func TestExportReqParseFieldDateTimeMetaUnlimited(t *testing.T) {
EventReq := NewExportRequest(map[string]utils.DataStorage{}, "", nil, nil)
fctTemp := &config.FCTemplate{
Type: utils.MetaDateTime,
Value: config.NewRSRParsersMustCompile(utils.MetaUnlimited, utils.InfieldSep),
@@ -192,8 +192,8 @@ func TestEventReqParseFieldDateTimeMetaUnlimited(t *testing.T) {
}
}
func TestEventReqParseFieldDateTimeEmpty(t *testing.T) {
EventReq := NewExportRequest(map[string]utils.MapStorage{}, "", nil, nil)
func TestExportReqParseFieldDateTimeEmpty(t *testing.T) {
EventReq := NewExportRequest(map[string]utils.DataStorage{}, "", nil, nil)
fctTemp := &config.FCTemplate{
Type: utils.MetaDateTime,
Value: config.NewRSRParsersMustCompile("", utils.InfieldSep),
@@ -219,8 +219,8 @@ func TestEventReqParseFieldDateTimeEmpty(t *testing.T) {
}
}
func TestEventReqParseFieldDateTimeMonthEnd(t *testing.T) {
EventReq := NewExportRequest(map[string]utils.MapStorage{}, "", nil, nil)
func TestExportReqParseFieldDateTimeMonthEnd(t *testing.T) {
EventReq := NewExportRequest(map[string]utils.DataStorage{}, "", nil, nil)
fctTemp := &config.FCTemplate{
Type: utils.MetaDateTime,
Value: config.NewRSRParsersMustCompile("*month_endTest", utils.InfieldSep),
@@ -246,8 +246,8 @@ func TestEventReqParseFieldDateTimeMonthEnd(t *testing.T) {
}
}
func TestAgentRequestParseFieldDateTimeError(t *testing.T) {
EventReq := NewExportRequest(map[string]utils.MapStorage{}, "", nil, nil)
func TestExportReqParseFieldDateTimeError(t *testing.T) {
EventReq := NewExportRequest(map[string]utils.DataStorage{}, "", nil, nil)
fctTemp := &config.FCTemplate{
Type: utils.MetaDateTime,
Value: config.NewRSRParsersMustCompile("*month_endTest", utils.InfieldSep),
@@ -261,13 +261,13 @@ func TestAgentRequestParseFieldDateTimeError(t *testing.T) {
}
}
func TestEventReqParseFieldDateTimeError2(t *testing.T) {
func TestExportReqParseFieldDateTimeError2(t *testing.T) {
prsr, err := config.NewRSRParsersFromSlice([]string{"2.", "~*req.CGRID<~*opts.Converter>"})
if err != nil {
t.Fatal(err)
}
mS := map[string]utils.MapStorage{
utils.MetaOpts: {
mS := map[string]utils.DataStorage{
utils.MetaOpts: utils.MapStorage{
utils.AccountField: "1002",
utils.Usage: "20m",
},
@@ -285,9 +285,9 @@ func TestEventReqParseFieldDateTimeError2(t *testing.T) {
}
}
func TestEventReqFieldAsInterface(t *testing.T) {
inData := map[string]utils.MapStorage{
utils.MetaReq: {
func TestExportReqFieldAsInterface(t *testing.T) {
inData := map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage{
"Account": "1001",
"Usage": "10m",
},
@@ -310,9 +310,9 @@ func TestEventReqFieldAsInterface(t *testing.T) {
}
}
func TestEventReqNewEventExporter(t *testing.T) {
inData := map[string]utils.MapStorage{
utils.MetaReq: {
func TestExportReqNewEventExporter(t *testing.T) {
inData := map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage{
"Account": "1001",
"Usage": "10m",
},

View File

@@ -416,9 +416,9 @@ func (erS *ERService) onEvicted(id string, value interface{}) {
return
}
// convert the event to record
eeReq := engine.NewExportRequest(map[string]utils.MapStorage{
utils.MetaReq: cgrEv.Event,
utils.MetaOpts: cgrEv.APIOpts,
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: erS.cfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, erS.cfg.GeneralCfg().DefaultTenant),
erS.filterS, map[string]*utils.OrderedNavigableMap{

View File

@@ -27,8 +27,8 @@ import (
"time"
)
// dataStorage is the DataProvider that can be updated
type dataStorage interface {
// DataStorage is the DataProvider that can be updated
type DataStorage interface {
DataProvider
Set(fldPath []string, val interface{}) error
@@ -194,7 +194,7 @@ func (ms MapStorage) Set(fldPath []string, val interface{}) (err error) {
return nMap.Set(fldPath[1:], val)
}
switch dp := ms[fldPath[0]].(type) {
case dataStorage:
case DataStorage:
return dp.Set(fldPath[1:], val)
case map[string]interface{}:
return MapStorage(dp).Set(fldPath[1:], val)
@@ -232,7 +232,7 @@ func (ms MapStorage) GetKeys(nested bool, nestedLimit int, prefix string) (keys
for k, v := range ms { // in case of nested on false we take in consideraton the nestedLimit
//keys = append(keys, prefix+k)
switch rv := v.(type) { // and for performance we only take in consideration a limited set of types for nested false
case dataStorage:
case DataStorage:
keys = append(keys, rv.GetKeys(nested, nestedLimit-1, prefix+k)...)
case map[string]interface{}:
keys = append(keys, MapStorage(rv).GetKeys(nested, nestedLimit-1, prefix+k)...)
@@ -247,7 +247,7 @@ func (ms MapStorage) GetKeys(nested bool, nestedLimit int, prefix string) (keys
for k, v := range ms {
//keys = append(keys, prefix+k)
switch rv := v.(type) {
case dataStorage:
case DataStorage:
keys = append(keys, rv.GetKeys(nested, nestedLimit, prefix+k)...)
case map[string]interface{}:
keys = append(keys, MapStorage(rv).GetKeys(nested, nestedLimit, prefix+k)...)
@@ -257,7 +257,7 @@ func (ms MapStorage) GetKeys(nested bool, nestedLimit int, prefix string) (keys
// keys = append(keys, pref)
keys = append(keys, dp.GetKeys(nested, nestedLimit, pref)...)
}
case []dataStorage:
case []DataStorage:
for i, dp := range rv {
pref := prefix + k + fmt.Sprintf("[%v]", i)
// keys = append(keys, pref)
@@ -304,7 +304,7 @@ func (ms MapStorage) Remove(fldPath []string) (err error) {
return
}
switch dp := val.(type) {
case dataStorage:
case DataStorage:
return dp.Remove(fldPath[1:])
case map[string]interface{}:
return MapStorage(dp).Remove(fldPath[1:])

View File

@@ -631,7 +631,7 @@ func TestNavMapFieldAsInterface3(t *testing.T) {
func TestNavMapGetKeys2(t *testing.T) {
navMp := MapStorage{
"FirstLevel": dataStorage(MapStorage{
"FirstLevel": DataStorage(MapStorage{
"SecondLevel": map[string]interface{}{
"ThirdLevel": MapStorage{
"Fld1": 123.123,
@@ -654,7 +654,7 @@ func TestNavMapGetKeys2(t *testing.T) {
},
"Field6": []string{"1", "2"},
"Field7": []interface{}{"1", "2"},
"Field8": []dataStorage{MapStorage{"A": 1}},
"Field8": []DataStorage{MapStorage{"A": 1}},
"Field9": []MapStorage{{"A": 1}},
"Field10": []map[string]interface{}{{"A": 1}},
}

77
utils/safemapstorage.go Normal file
View File

@@ -0,0 +1,77 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package utils
import (
"sync"
)
// SafeMapStorage is a Mapstorage with mutex
type SafeMapStorage struct {
MapStorage
sync.RWMutex
}
// String returns the map as json string
func (ms *SafeMapStorage) String() string {
ms.RLock()
defer ms.RUnlock()
return ms.MapStorage.String()
}
// FieldAsInterface returns the value from the path
func (ms *SafeMapStorage) FieldAsInterface(fldPath []string) (val interface{}, err error) {
ms.RLock()
defer ms.RUnlock()
return ms.MapStorage.FieldAsInterface(fldPath)
}
// FieldAsString returns the value from path as string
func (ms *SafeMapStorage) FieldAsString(fldPath []string) (str string, err error) {
ms.RLock()
defer ms.RUnlock()
return ms.MapStorage.FieldAsString(fldPath)
}
// Set sets the value at the given path
func (ms *SafeMapStorage) Set(fldPath []string, val interface{}) (err error) {
ms.Lock()
defer ms.Unlock()
return ms.MapStorage.Set(fldPath, val)
}
// GetKeys returns all the keys from map
func (ms *SafeMapStorage) GetKeys(nested bool, nestedLimit int, prefix string) (keys []string) {
ms.RLock()
defer ms.RUnlock()
return ms.MapStorage.GetKeys(nested, nestedLimit, prefix)
}
// Remove removes the item at path
func (ms *SafeMapStorage) Remove(fldPath []string) (err error) {
ms.Lock()
defer ms.Unlock()
return ms.MapStorage.Remove(fldPath)
}
func (ms *SafeMapStorage) Clone() (msClone *SafeMapStorage) {
ms.RLock()
defer ms.RUnlock()
return &SafeMapStorage{MapStorage: ms.MapStorage.Clone()}
}