Update EventExporter ProcessEvent

This commit is contained in:
TeoV
2020-09-01 16:54:51 +03:00
committed by Dan Christian Bogos
parent 6fd8357d67
commit 833fcfc657
12 changed files with 62 additions and 26 deletions

View File

@@ -1841,7 +1841,7 @@ func (apierSv1 *APIerSv1) ExportToFolder(arg *utils.ArgExportToFolder, reply *st
return nil
}
func (apierSv1 *APIerSv1) ExportCDRs(args *utils.ArgExportCDRs, reply *string) error {
func (apierSv1 *APIerSv1) ExportCDRs(args *utils.ArgExportCDRs, reply *map[string]utils.MapStorage) error {
if len(apierSv1.Config.ApierCfg().EEsConns) == 0 {
return utils.NewErrNotConnected(utils.EEs)
}

View File

@@ -38,6 +38,6 @@ func (eSv1 *EventExporterSv1) Ping(ign *utils.CGREventWithOpts, reply *string) e
// ProcessEvent triggers exports on EEs side
func (eSv1 *EventExporterSv1) ProcessEvent(args *utils.CGREventWithIDs,
reply *string) error {
reply *map[string]utils.MapStorage) error {
return eSv1.eeS.V1ProcessEvent(args, reply)
}

View File

@@ -143,30 +143,30 @@ func testEEsAddCDRs(t *testing.T) {
storedCdrs := []*engine.CDR{
{CGRID: "Cdr1",
OrderID: 1, ToR: utils.VOICE, OriginID: "OriginCDR1", OriginHost: "192.168.1.1", Source: "test",
RequestType: utils.META_RATED, Tenant: "cgrates.org",
Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Now(),
AnswerTime: time.Now(), RunID: utils.MetaDefault, Usage: time.Duration(10) * time.Second,
RequestType: utils.META_NONE, Tenant: "cgrates.org",
Category: "call", Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Date(2018, 10, 4, 15, 3, 10, 0, time.UTC),
AnswerTime: time.Date(2018, 10, 4, 15, 3, 10, 0, time.UTC), RunID: utils.MetaDefault, Usage: time.Duration(10) * time.Second,
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
},
{CGRID: "Cdr2",
OrderID: 2, ToR: utils.VOICE, OriginID: "OriginCDR2", OriginHost: "192.168.1.1", Source: "test2",
RequestType: utils.META_RATED, Tenant: "cgrates.org", Category: "call",
Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Now(),
AnswerTime: time.Now(), RunID: utils.MetaDefault, Usage: time.Duration(5) * time.Second,
RequestType: utils.META_NONE, Tenant: "cgrates.org", Category: "call",
Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Date(2018, 10, 4, 15, 3, 10, 0, time.UTC),
AnswerTime: time.Date(2018, 10, 4, 15, 3, 10, 0, time.UTC), RunID: utils.MetaDefault, Usage: time.Duration(5) * time.Second,
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
},
{CGRID: "Cdr3",
OrderID: 3, ToR: utils.VOICE, OriginID: "OriginCDR3", OriginHost: "192.168.1.1", Source: "test2",
RequestType: utils.META_RATED, Tenant: "cgrates.org", Category: "call",
Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Now(),
AnswerTime: time.Now(), RunID: utils.MetaDefault, Usage: time.Duration(30) * time.Second,
RequestType: utils.META_NONE, Tenant: "cgrates.org", Category: "call",
Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Date(2018, 10, 4, 15, 3, 10, 0, time.UTC),
AnswerTime: time.Date(2018, 10, 4, 15, 3, 10, 0, time.UTC), RunID: utils.MetaDefault, Usage: time.Duration(30) * time.Second,
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
},
{CGRID: "Cdr4",
OrderID: 4, ToR: utils.VOICE, OriginID: "OriginCDR4", OriginHost: "192.168.1.1", Source: "test3",
RequestType: utils.META_RATED, Tenant: "cgrates.org", Category: "call",
Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Now(),
AnswerTime: time.Time{}, RunID: utils.MetaDefault, Usage: time.Duration(0) * time.Second,
RequestType: utils.META_NONE, Tenant: "cgrates.org", Category: "call",
Account: "1001", Subject: "1001", Destination: "+4986517174963", SetupTime: time.Date(2018, 10, 4, 15, 3, 10, 0, time.UTC),
AnswerTime: time.Date(2018, 10, 4, 15, 3, 10, 0, time.UTC), RunID: utils.MetaDefault, Usage: time.Duration(0) * time.Second,
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
},
}
@@ -185,11 +185,20 @@ func testEEsExportCDRs(t *testing.T) {
attr := &utils.ArgExportCDRs{
ExporterIDs: []string{"CSVExporter"},
}
var rply string
var rply map[string]utils.MapStorage
if err := eeSRPC.Call(utils.APIerSv1ExportCDRs, &attr, &rply); err != nil {
t.Error("Unexpected error: ", err.Error())
}
time.Sleep(1 * time.Second)
time.Sleep(time.Second)
if rply["CSVExporter"]["FirstExpOrderID"] != 1.0 {
t.Errorf("Expected %+v, received: %+v", 1.0, rply["CSVExporter"]["FirstExpOrderID"])
} else if rply["CSVExporter"]["LastExpOrderID"] != 4.0 {
t.Errorf("Expected %+v, received: %+v", 4.0, rply["CSVExporter"]["LastExpOrderID"])
} else if rply["CSVExporter"]["NumberOfEvents"] != 4.0 {
t.Errorf("Expected %+v, received: %+v", 4.0, rply["CSVExporter"]["NumberOfEvents"])
} else if rply["CSVExporter"]["TotalCost"] != 4.04 {
t.Errorf("Expected %+v, received: %+v", 4.04, rply["CSVExporter"]["TotalCost"])
}
}
func testEEsVerifyExports(t *testing.T) {
@@ -206,10 +215,10 @@ func testEEsVerifyExports(t *testing.T) {
if len(files) != 1 {
t.Errorf("Expected %+v, received: %+v", 1, len(files))
}
eCnt := "Cdr3,*raw,*voice,OriginCDR3,*rated,cgrates.org,call,1001,1001,+4986517174963,2020-08-30T14:40:32+03:00,2020-08-30T14:40:32+03:00,30s,-1\n" +
"Cdr4,*raw,*voice,OriginCDR4,*rated,cgrates.org,call,1001,1001,+4986517174963,2020-08-30T14:40:32+03:00,0001-01-01T00:00:00Z,0s,0\n" +
"Cdr1,*raw,*voice,OriginCDR1,*rated,cgrates.org,call,1001,1001,+4986517174963,2020-08-30T14:40:32+03:00,2020-08-30T14:40:32+03:00,10s,-1\n" +
"Cdr2,*raw,*voice,OriginCDR2,*rated,cgrates.org,call,1001,1001,+4986517174963,2020-08-30T14:40:32+03:00,2020-08-30T14:40:32+03:00,5s,-1\n"
eCnt := "Cdr1,*raw,*voice,OriginCDR1,*none,cgrates.org,call,1001,1001,+4986517174963,2018-10-04T18:03:10+03:00,2018-10-04T18:03:10+03:00,10000000000,1.01\n" +
"Cdr2,*raw,*voice,OriginCDR2,*none,cgrates.org,call,1001,1001,+4986517174963,2018-10-04T18:03:10+03:00,2018-10-04T18:03:10+03:00,5000000000,1.01\n" +
"Cdr3,*raw,*voice,OriginCDR3,*none,cgrates.org,call,1001,1001,+4986517174963,2018-10-04T18:03:10+03:00,2018-10-04T18:03:10+03:00,30000000000,1.01\n" +
"Cdr4,*raw,*voice,OriginCDR4,*none,cgrates.org,call,1001,1001,+4986517174963,2018-10-04T18:03:10+03:00,2018-10-04T18:03:10+03:00,0,1.01\n"
if outContent1, err := ioutil.ReadFile(files[0]); err != nil {
t.Error(err)
} else if len(eCnt) != len(string(outContent1)) {

View File

@@ -67,6 +67,7 @@
"tenant": "cgrates.org",
"flags": ["*attributes"],
"attribute_context": "customContext",
"synchronous": true,
"attempts": 1,
"field_separator": ",",
"fields":[
@@ -93,7 +94,7 @@
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
"ees_conns": ["*internal"],
"ees_conns": ["*localhost"],
},

View File

@@ -71,6 +71,7 @@
"tenant": "cgrates.org",
"flags": ["*attributes"],
"attribute_context": "customContext",
"synchronous": true,
"attempts": 1,
"field_separator": ",",
"fields":[
@@ -97,7 +98,7 @@
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
"ees_conns": ["*internal"],
"ees_conns": ["*localhost"],
},

View File

@@ -69,6 +69,7 @@
"tenant": "cgrates.org",
"flags": ["*attributes"],
"attribute_context": "customContext",
"synchronous": true,
"attempts": 1,
"field_separator": ",",
"fields":[
@@ -95,7 +96,7 @@
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
"ees_conns": ["*internal"],
"ees_conns": ["*localhost"],
},

View File

@@ -153,6 +153,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma
var withErr bool
var metricMapLock sync.RWMutex
metricsMap := make(map[string]utils.MapStorage)
isVerbose := cgrEv.HasField(utils.EEsVerbose)
for cfgIdx, eeCfg := range eeS.cfg.EEsNoLksCfg().Exporters {
if eeCfg.Type == utils.META_NONE || // ignore *none type exporter
(lenExpIDs != 0 && !expIDs.Has(eeCfg.ID)) {
@@ -206,7 +207,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma
if eeCfg.Synchronous {
wg.Add(1) // wait for synchronous or file ones since these need to be done before continuing
}
go func(evict, sync bool, ee EventExporter) {
go func(evict, sync bool, ee EventExporter, eeCfg *config.EventExporterCfg) {
if err := ee.ExportEvent(cgrEv.CGREvent); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> with id <%s>, error: <%s>",
@@ -219,14 +220,21 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma
metricMapLock.Lock()
metricsMap[ee.ID()] = ee.GetMetrics()
metricMapLock.Unlock()
if isVerbose && !eeCfg.Synchronous {
utils.Logger.Warning(
fmt.Sprintf("<%s> with id <%s>, running verbosed export with syncronous false",
utils.EventExporterS, ee.ID()))
withErr = true
}
if sync {
wg.Done()
}
}(!hasCache, eeCfg.Synchronous, ee)
}(!hasCache, eeCfg.Synchronous, ee, eeCfg)
}
wg.Wait()
if withErr {
err = utils.ErrPartiallyExecuted
return
}
*rply = make(map[string]utils.MapStorage)
@@ -235,7 +243,6 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *ma
(*rply)[k] = v
}
metricMapLock.Unlock()
return
}

View File

@@ -168,3 +168,7 @@ func (fFwv *FileFWVee) composeTrailer() (err error) {
}
return
}
func (fFwv *FileFWVee) GetMetrics() utils.MapStorage {
return fFwv.dc.Clone()
}

View File

@@ -141,3 +141,7 @@ func (httpJson *HTTPJsonMapEe) post(body []byte, key string) (err error) {
}
return
}
func (httpJson *HTTPJsonMapEe) GetMetrics() utils.MapStorage {
return httpJson.dc.Clone()
}

View File

@@ -109,3 +109,7 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
}
return
}
func (httpPost *HTTPPost) GetMetrics() utils.MapStorage {
return httpPost.dc.Clone()
}

View File

@@ -85,3 +85,7 @@ func (vEe *VirtualEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
updateEEMetrics(vEe.dc, cgrEv.Event, vEe.cgrCfg.GeneralCfg().DefaultTimezone)
return
}
func (vEe *VirtualEe) GetMetrics() utils.MapStorage {
return vEe.dc.Clone()
}

View File

@@ -856,6 +856,7 @@ const (
RouteID = "RouteID"
MetaMonthlyEstimated = "*monthly_estimated"
ProcessRuns = "ProcessRuns"
EEsVerbose = "*eesVerbose"
)
// Migrator Action