Update EventExporter ProcessEvent to include metrics informations

This commit is contained in:
TeoV
2020-09-02 17:08:18 +03:00
committed by Dan Christian Bogos
parent 196f8da1cc
commit 8340e65ebd
14 changed files with 71 additions and 40 deletions

View File

@@ -1841,7 +1841,7 @@ func (apierSv1 *APIerSv1) ExportToFolder(arg *utils.ArgExportToFolder, reply *st
return nil
}
func (apierSv1 *APIerSv1) ExportCDRs(args *utils.ArgExportCDRs, reply *map[string]utils.MapStorage) error {
func (apierSv1 *APIerSv1) ExportCDRs(args *utils.ArgExportCDRs, reply *map[string]interface{}) error {
if len(apierSv1.Config.ApierCfg().EEsConns) == 0 {
return utils.NewErrNotConnected(utils.EEs)
}
@@ -1856,12 +1856,20 @@ func (apierSv1 *APIerSv1) ExportCDRs(args *utils.ArgExportCDRs, reply *map[strin
return utils.ErrNotFound
}
withErros := false
var rplyCdr map[string]map[string]interface{}
for _, cdr := range cdrs {
argCdr := &utils.CGREventWithIDs{
IDs: args.ExporterIDs,
CGREventWithOpts: &utils.CGREventWithOpts{
CGREvent: cdr.AsCGREvent(),
Opts: make(map[string]interface{}),
},
}
if args.Verbose {
argCdr.CGREventWithOpts.Opts[utils.EEsVerbose] = struct{}{}
}
if err := apierSv1.ConnMgr.Call(apierSv1.Config.ApierCfg().EEsConns, nil, utils.EventExporterSv1ProcessEvent,
&utils.CGREventWithIDs{
IDs: args.ExporterIDs,
CGREventWithOpts: &utils.CGREventWithOpts{CGREvent: cdr.AsCGREvent()},
}, reply); err != nil {
argCdr, &rplyCdr); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> error: <%s> processing event: <%s> with <%s>",
utils.ApierS, err.Error(), utils.ToJSON(cdr.AsCGREvent()), utils.EventExporterS))
withErros = true
@@ -1870,5 +1878,18 @@ func (apierSv1 *APIerSv1) ExportCDRs(args *utils.ArgExportCDRs, reply *map[strin
if withErros {
return utils.ErrPartiallyExecuted
}
// we consider only the last reply because it should have the metrics updated
if !args.Verbose {
(*reply)[utils.ExporterIDs] = make([]string, 0, len(rplyCdr))
}
for exporterID, metrics := range rplyCdr {
if !args.Verbose {
(*reply)[utils.ExporterIDs] = append((*reply)[utils.ExporterIDs].([]string), exporterID)
} else {
for k, v := range metrics {
(*reply)[k] = v
}
}
}
return nil
}

View File

@@ -38,6 +38,6 @@ func (eSv1 *EventExporterSv1) Ping(ign *utils.CGREventWithOpts, reply *string) e
// ProcessEvent triggers exports on EEs side
func (eSv1 *EventExporterSv1) ProcessEvent(args *utils.CGREventWithIDs,
reply *map[string]utils.MapStorage) error {
reply *map[string]map[string]interface{}) error {
return eSv1.eeS.V1ProcessEvent(args, reply)
}

View File

@@ -184,20 +184,21 @@ func testEEsAddCDRs(t *testing.T) {
func testEEsExportCDRs(t *testing.T) {
attr := &utils.ArgExportCDRs{
ExporterIDs: []string{"CSVExporter"},
Verbose: true,
}
var rply map[string]utils.MapStorage
var rply map[string]interface{}
if err := eeSRPC.Call(utils.APIerSv1ExportCDRs, &attr, &rply); err != nil {
t.Error("Unexpected error: ", err.Error())
}
time.Sleep(time.Second)
if rply["CSVExporter"]["FirstExpOrderID"] != 1.0 {
t.Errorf("Expected %+v, received: %+v", 1.0, rply["CSVExporter"]["FirstExpOrderID"])
} else if rply["CSVExporter"]["LastExpOrderID"] != 4.0 {
t.Errorf("Expected %+v, received: %+v", 4.0, rply["CSVExporter"]["LastExpOrderID"])
} else if rply["CSVExporter"]["NumberOfEvents"] != 4.0 {
t.Errorf("Expected %+v, received: %+v", 4.0, rply["CSVExporter"]["NumberOfEvents"])
} else if rply["CSVExporter"]["TotalCost"] != 4.04 {
t.Errorf("Expected %+v, received: %+v", 4.04, rply["CSVExporter"]["TotalCost"])
if rply["FirstExpOrderID"] != 1.0 {
t.Errorf("Expected %+v, received: %+v", 1.0, rply["FirstExpOrderID"])
} else if rply["LastExpOrderID"] != 4.0 {
t.Errorf("Expected %+v, received: %+v", 4.0, rply["LastExpOrderID"])
} else if rply["NumberOfEvents"] != 4.0 {
t.Errorf("Expected %+v, received: %+v", 4.0, rply["NumberOfEvents"])
} else if rply["TotalCost"] != 4.04 {
t.Errorf("Expected %+v, received: %+v", 4.04, rply["TotalCost"])
}
}

View File

@@ -57,7 +57,7 @@
"enabled": true,
"attributes_conns":["*internal"],
"cache": {
"*file_csv": {"limit": -1, "ttl": "500ms", "static_ttl": false},
"*file_csv": {"limit": -1, "ttl": "1s", "static_ttl": false},
},
"exporters": [
{

View File

@@ -61,7 +61,7 @@
"enabled": true,
"attributes_conns":["*internal"],
"cache": {
"*file_csv": {"limit": -1, "ttl": "500ms", "static_ttl": false},
"*file_csv": {"limit": -1, "ttl": "1s", "static_ttl": false},
},
"exporters": [
{

View File

@@ -59,7 +59,7 @@
"enabled": true,
"attributes_conns":["*internal"],
"cache": {
"*file_csv": {"limit": -1, "ttl": "500ms", "static_ttl": false},
"*file_csv": {"limit": -1, "ttl": "1s", "static_ttl": false},
},
"exporters": [
{

View File

@@ -141,7 +141,8 @@ func (eeS *EventExporterS) attrSProcessEvent(cgrEv *utils.CGREventWithOpts, attr
}
// V1ProcessEvent will be called each time a new event is received from readers
func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *map[string]utils.MapStorage) (err error) {
// rply -> map[string]map[string]interface{}
func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *map[string]map[string]interface{}) (err error) {
eeS.cfg.RLocks(config.EEsJson)
defer eeS.cfg.RUnlocks(config.EEsJson)
@@ -153,7 +154,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma
var withErr bool
var metricMapLock sync.RWMutex
metricsMap := make(map[string]utils.MapStorage)
isVerbose := cgrEv.HasField(utils.EEsVerbose)
_, hasVerbose := cgrEv.Opts[utils.OptsEEsVerbose]
for cfgIdx, eeCfg := range eeS.cfg.EEsNoLksCfg().Exporters {
if eeCfg.Type == utils.META_NONE || // ignore *none type exporter
(lenExpIDs != 0 && !expIDs.Has(eeCfg.ID)) {
@@ -207,7 +208,16 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma
if eeCfg.Synchronous {
wg.Add(1) // wait for synchronous or file ones since these need to be done before continuing
}
go func(evict, sync bool, ee EventExporter, eeCfg *config.EventExporterCfg) {
metricMapLock.Lock()
metricsMap[ee.ID()] = utils.MapStorage{}
metricMapLock.Unlock()
// log the message before starting the gorutine, but still execute the exporter
if hasVerbose && !eeCfg.Synchronous {
utils.Logger.Warning(
fmt.Sprintf("<%s> with id <%s>, running verbosed exporter with syncronous false",
utils.EventExporterS, ee.ID()))
}
go func(evict, sync bool, ee EventExporter) {
if err := ee.ExportEvent(cgrEv.CGREvent); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> with id <%s>, error: <%s>",
@@ -217,19 +227,15 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma
if evict {
ee.OnEvicted("", nil) // so we can close ie the file
}
metricMapLock.Lock()
metricsMap[ee.ID()] = ee.GetMetrics()
metricMapLock.Unlock()
if isVerbose && !eeCfg.Synchronous {
utils.Logger.Warning(
fmt.Sprintf("<%s> with id <%s>, running verbosed export with syncronous false",
utils.EventExporterS, ee.ID()))
withErr = true
if hasVerbose && eeCfg.Synchronous {
metricMapLock.Lock()
metricsMap[ee.ID()] = ee.GetMetrics()
metricMapLock.Unlock()
}
if sync {
wg.Done()
}
}(!hasCache, eeCfg.Synchronous, ee, eeCfg)
}(!hasCache, eeCfg.Synchronous, ee)
}
wg.Wait()
if withErr {
@@ -237,7 +243,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma
return
}
*rply = make(map[string]utils.MapStorage)
*rply = make(map[string]map[string]interface{})
metricMapLock.Lock()
for k, v := range metricsMap {
(*rply)[k] = v

View File

@@ -33,7 +33,7 @@ import (
func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc utils.MapStorage) (fCsv *FileCSVee, err error) {
dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID
dc[utils.ExporterID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID
fCsv = &FileCSVee{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
err = fCsv.init()

View File

@@ -31,7 +31,7 @@ import (
)
func NewFileFWVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, dc utils.MapStorage) (fFwv *FileFWVee, err error) {
dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID
dc[utils.ExporterID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID
fFwv = &FileFWVee{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
err = fFwv.init()

View File

@@ -31,7 +31,7 @@ import (
func NewHTTPJsonMapEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc utils.MapStorage) (httpJSON *HTTPJsonMapEe, err error) {
dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID
dc[utils.ExporterID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID
httpJSON = &HTTPJsonMapEe{
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg,

View File

@@ -31,7 +31,7 @@ import (
func NewHTTPPostEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc utils.MapStorage) (httpPost *HTTPPost, err error) {
dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID
dc[utils.ExporterID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID
httpPost = &HTTPPost{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
httpPost.httpPoster, err = engine.NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify,

View File

@@ -28,7 +28,7 @@ import (
func NewVirtualExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc utils.MapStorage) (vEe *VirtualEe, err error) {
dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID
dc[utils.ExporterID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID
vEe = &VirtualEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
err = vEe.init()

View File

@@ -150,7 +150,8 @@ func (cdrS *CDRServer) rateCDR(cdr *CDRWithOpts) ([]*CDR, error) {
var cdrsRated []*CDR
_, hasLastUsed := cdr.ExtraFields[utils.LastUsed]
if utils.SliceHasMember([]string{utils.META_PREPAID, utils.PREPAID}, cdr.RequestType) &&
(cdr.Usage != 0 || hasLastUsed) && cdr.CostDetails == nil { // ToDo: Get rid of PREPAID as soon as we don't want to support it backwards
(cdr.Usage != 0 || hasLastUsed) && cdr.CostDetails == nil {
// ToDo: Get rid of PREPAID as soon as we don't want to support it backwards
// Should be previously calculated and stored in DB
fib := utils.Fib()
var smCosts []*SMCost
@@ -418,7 +419,7 @@ func (cdrS *CDRServer) statSProcessEvent(cgrEv *utils.CGREventWithOpts) (err err
// eeSProcessEvent will process the event with the EEs component
func (cdrS *CDRServer) eeSProcessEvent(cgrEv *utils.CGREventWithIDs) (err error) {
var reply string
var reply map[string]map[string]interface{}
if err = cdrS.connMgr.Call(cdrS.cgrCfg.CdrsCfg().EEsConns, nil,
utils.EventExporterSv1ProcessEvent,
cgrEv, &reply); err != nil &&

View File

@@ -806,7 +806,8 @@ const (
Attempts = "Attempts"
FieldSeparator = "FieldSeparator"
ExportPath = "ExportPath"
ExportID = "ExportID"
ExporterID = "ExporterID"
ExporterIDs = "ExporterIDs"
TimeNow = "TimeNow"
ExportFileName = "ExportFileName"
GroupID = "GroupID"
@@ -856,7 +857,6 @@ const (
RouteID = "RouteID"
MetaMonthlyEstimated = "*monthly_estimated"
ProcessRuns = "ProcessRuns"
EEsVerbose = "*eesVerbose"
)
// Migrator Action
@@ -2365,6 +2365,8 @@ const (
// DispatcherS
OptsAPIKey = "*apiKey"
OptsRouteID = "*routeID"
// EEs
OptsEEsVerbose = "*eesVerbose"
// Others
OptsContext = "*context"
Subsys = "*subsys"