Add integration test for ExportCDRs with multiple exporters

This commit is contained in:
TeoV
2020-09-03 14:57:11 +03:00
committed by Dan Christian Bogos
parent 756e14d3d9
commit 8cc0af4981
11 changed files with 330 additions and 29 deletions

View File

@@ -227,7 +227,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma
if evict {
ee.OnEvicted("", nil) // so we can close ie the file
}
if hasVerbose && eeCfg.Synchronous {
if hasVerbose && sync {
metricMapLock.Lock()
metricsMap[ee.ID()] = ee.GetMetrics()
metricMapLock.Unlock()
@@ -254,7 +254,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma
func newEEMetrics() utils.MapStorage {
return utils.MapStorage{
utils.NumberOfEvents: 0,
utils.NumberOfEvents: int64(0),
utils.PositiveExports: utils.StringSet{},
utils.NegativeExports: utils.StringSet{},
utils.TimeNow: time.Now(),
@@ -330,3 +330,107 @@ func updateEEMetrics(dc utils.MapStorage, ev engine.MapEvent, timezone string) {
}
}
}
func MergeEEMetrics(dc map[string]map[string]interface{}) (reply map[string]interface{}, err error) {
positiveExp := make(utils.StringSet)
negativeExp := make(utils.StringSet)
reply = map[string]interface{}{
utils.PositiveExports: positiveExp,
utils.NegativeExports: negativeExp,
}
for _, metrics := range dc {
for k, v := range metrics {
switch k {
case utils.NumberOfEvents:
val, err := utils.IfaceAsTInt64(v)
if err != nil {
return nil, err
}
if _, has := reply[k]; !has {
reply[k] = val
} else {
reply[k] = reply[k].(int64) + val
}
case utils.PositiveExports:
val, canCast := v.(map[string]interface{})
if !canCast {
return nil, fmt.Errorf("cannot cast to map[string]interface{} %+v for positive exports", v)
}
for pos := range val {
positiveExp.Add(pos)
}
case utils.NegativeExports:
val, canCast := v.(map[string]interface{})
if !canCast {
return nil, fmt.Errorf("cannot cast to map[string]interface{} %+v for negative exports", v)
}
for neg := range val {
negativeExp.Add(neg)
}
case utils.FirstEventATime:
val, err := utils.IfaceAsTime(v, config.CgrConfig().GeneralCfg().DefaultTimezone)
if err != nil {
return nil, err
}
if _, has := reply[k]; !has {
reply[k] = val
} else if val.Before(reply[k].(time.Time)) {
reply[k] = val
}
case utils.LastEventATime:
val, err := utils.IfaceAsTime(v, config.CgrConfig().GeneralCfg().DefaultTimezone)
if err != nil {
return nil, err
}
if _, has := reply[k]; !has {
reply[k] = val
} else if val.After(reply[k].(time.Time)) {
reply[k] = val
}
case utils.FirstExpOrderID:
val, err := utils.IfaceAsTInt64(v)
if err != nil {
return nil, err
}
if _, has := reply[k]; !has {
reply[k] = val
} else if reply[k].(int64) > val {
reply[k] = val
}
case utils.LastExpOrderID:
val, err := utils.IfaceAsTInt64(v)
if err != nil {
return nil, err
}
if _, has := reply[k]; !has {
reply[k] = val
} else if reply[k].(int64) < val {
reply[k] = val
}
case utils.TotalCost:
val, err := utils.IfaceAsFloat64(v)
if err != nil {
return nil, err
}
if _, has := reply[k]; !has {
reply[k] = val
} else {
reply[k] = reply[k].(float64) + val
}
case utils.TotalDuration, utils.TotalSMSUsage,
utils.TotalMMSUsage, utils.TotalGenericUsage,
utils.TotalDataUsage:
val, err := utils.IfaceAsDuration(v)
if err != nil {
return nil, err
}
if _, has := reply[k]; !has {
reply[k] = val
} else {
reply[k] = reply[k].(time.Duration) + val
}
}
}
}
return
}

View File

@@ -98,7 +98,7 @@ func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
}
fCsv.Unlock()
}()
fCsv.dc[utils.NumberOfEvents] = fCsv.dc[utils.NumberOfEvents].(int) + 1
fCsv.dc[utils.NumberOfEvents] = fCsv.dc[utils.NumberOfEvents].(int64) + 1
var csvRecord []string
req := utils.MapStorage{}

View File

@@ -89,7 +89,7 @@ func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
}
fFwv.Unlock()
}()
fFwv.dc[utils.NumberOfEvents] = fFwv.dc[utils.NumberOfEvents].(int) + 1
fFwv.dc[utils.NumberOfEvents] = fFwv.dc[utils.NumberOfEvents].(int64) + 1
var records []string
req := utils.MapStorage{}
for k, v := range cgrEv.Event {

View File

@@ -80,7 +80,7 @@ func (httpJson *HTTPJsonMapEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
httpJson.Unlock()
}()
httpJson.dc[utils.NumberOfEvents] = httpJson.dc[utils.NumberOfEvents].(int) + 1
httpJson.dc[utils.NumberOfEvents] = httpJson.dc[utils.NumberOfEvents].(int64) + 1
valMp := make(map[string]string)
eeReq := NewEventExporterRequest(utils.MapStorage(cgrEv.Event), httpJson.dc,

View File

@@ -72,7 +72,7 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
}
httpPost.Unlock()
}()
httpPost.dc[utils.NumberOfEvents] = httpPost.dc[utils.NumberOfEvents].(int) + 1
httpPost.dc[utils.NumberOfEvents] = httpPost.dc[utils.NumberOfEvents].(int64) + 1
var body interface{}
urlVals := url.Values{}

View File

@@ -71,7 +71,7 @@ func (vEe *VirtualEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
}
vEe.Unlock()
}()
vEe.dc[utils.NumberOfEvents] = vEe.dc[utils.NumberOfEvents].(int) + 1
vEe.dc[utils.NumberOfEvents] = vEe.dc[utils.NumberOfEvents].(int64) + 1
req := utils.MapStorage{}
for k, v := range cgrEv.Event {