mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 10:06:24 +05:00
Added support for empty fields in ees when exporting the event in json format
This commit is contained in:
committed by
Dan Christian Bogos
parent
a24f312727
commit
45204cec17
@@ -375,6 +375,17 @@
|
||||
{"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"},
|
||||
{"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"},
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "HTTPJsonMapExporterWithNoFields",
|
||||
"type": "*http_json_map",
|
||||
"export_path": "http://127.0.0.1:12081/event_json_map_http",
|
||||
"tenant": "cgrates.org",
|
||||
"flags": ["*attributes"],
|
||||
"attribute_context": "customContext",
|
||||
"attempts": 1,
|
||||
"filters": ["*string:~*req.ExporterUsed:HTTPJsonMapExporterWithNoFields"],
|
||||
"fields":[]
|
||||
}
|
||||
]
|
||||
},
|
||||
|
||||
@@ -135,34 +135,35 @@ func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
|
||||
}()
|
||||
eEe.dc[utils.NumberOfEvents] = eEe.dc[utils.NumberOfEvents].(int64) + 1
|
||||
|
||||
req := utils.MapStorage{}
|
||||
for k, v := range cgrEv.Event {
|
||||
req[k] = v
|
||||
}
|
||||
valMp := make(map[string]string)
|
||||
eeReq := NewEventExporterRequest(req, eEe.dc,
|
||||
eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Tenant,
|
||||
eEe.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
utils.FirstNonEmpty(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Timezone,
|
||||
eEe.cgrCfg.GeneralCfg().DefaultTimezone),
|
||||
eEe.filterS)
|
||||
if err = eeReq.SetFields(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].ContentFields()); err != nil {
|
||||
return
|
||||
}
|
||||
for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() {
|
||||
var nmIt utils.NMInterface
|
||||
if nmIt, err = eeReq.cnt.Field(el.Value); err != nil {
|
||||
valMp := make(map[string]interface{})
|
||||
if len(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].ContentFields()) == 0 {
|
||||
valMp = cgrEv.Event
|
||||
} else {
|
||||
req := utils.MapStorage(cgrEv.Event)
|
||||
eeReq := NewEventExporterRequest(req, eEe.dc,
|
||||
eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Tenant,
|
||||
eEe.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
utils.FirstNonEmpty(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Timezone,
|
||||
eEe.cgrCfg.GeneralCfg().DefaultTimezone),
|
||||
eEe.filterS)
|
||||
if err = eeReq.SetFields(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].ContentFields()); err != nil {
|
||||
return
|
||||
}
|
||||
itm, isNMItem := nmIt.(*config.NMItem)
|
||||
if !isNMItem {
|
||||
err = fmt.Errorf("cannot encode reply value: %s, err: not NMItems", utils.ToJSON(el.Value))
|
||||
return
|
||||
for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() {
|
||||
var nmIt utils.NMInterface
|
||||
if nmIt, err = eeReq.cnt.Field(el.Value); err != nil {
|
||||
return
|
||||
}
|
||||
itm, isNMItem := nmIt.(*config.NMItem)
|
||||
if !isNMItem {
|
||||
err = fmt.Errorf("cannot encode reply value: %s, err: not NMItems", utils.ToJSON(el.Value))
|
||||
return
|
||||
}
|
||||
if itm == nil {
|
||||
continue // all attributes, not writable to diameter packet
|
||||
}
|
||||
valMp[strings.Join(itm.Path, utils.NestingSep)] = utils.IfaceAsString(itm.Data)
|
||||
}
|
||||
if itm == nil {
|
||||
continue // all attributes, not writable to diameter packet
|
||||
}
|
||||
valMp[strings.Join(itm.Path, utils.NestingSep)] = utils.IfaceAsString(itm.Data)
|
||||
}
|
||||
updateEEMetrics(eEe.dc, cgrEv.Event, utils.FirstNonEmpty(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Timezone,
|
||||
eEe.cgrCfg.GeneralCfg().DefaultTimezone))
|
||||
|
||||
@@ -104,10 +104,7 @@ func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
|
||||
fCsv.dc[utils.NumberOfEvents] = fCsv.dc[utils.NumberOfEvents].(int64) + 1
|
||||
|
||||
var csvRecord []string
|
||||
req := utils.MapStorage{}
|
||||
for k, v := range cgrEv.Event {
|
||||
req[k] = v
|
||||
}
|
||||
req := utils.MapStorage(cgrEv.Event)
|
||||
eeReq := NewEventExporterRequest(req, fCsv.dc, fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Tenant,
|
||||
fCsv.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
utils.FirstNonEmpty(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Timezone,
|
||||
|
||||
@@ -94,10 +94,7 @@ func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
|
||||
}()
|
||||
fFwv.dc[utils.NumberOfEvents] = fFwv.dc[utils.NumberOfEvents].(int64) + 1
|
||||
var records []string
|
||||
req := utils.MapStorage{}
|
||||
for k, v := range cgrEv.Event {
|
||||
req[k] = v
|
||||
}
|
||||
req := utils.MapStorage(cgrEv.Event)
|
||||
eeReq := NewEventExporterRequest(req, fFwv.dc,
|
||||
fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Tenant,
|
||||
fFwv.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
|
||||
@@ -98,30 +98,34 @@ func (pstrEE *PosterJSONMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
|
||||
|
||||
pstrEE.dc[utils.NumberOfEvents] = pstrEE.dc[utils.NumberOfEvents].(int64) + 1
|
||||
|
||||
valMp := make(map[string]string)
|
||||
eeReq := NewEventExporterRequest(utils.MapStorage(cgrEv.Event), pstrEE.dc,
|
||||
pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Tenant,
|
||||
pstrEE.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
utils.FirstNonEmpty(pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Timezone,
|
||||
pstrEE.cgrCfg.GeneralCfg().DefaultTimezone), pstrEE.filterS)
|
||||
valMp := make(map[string]interface{})
|
||||
if len(pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].ContentFields()) == 0 {
|
||||
valMp = cgrEv.Event
|
||||
} else {
|
||||
eeReq := NewEventExporterRequest(utils.MapStorage(cgrEv.Event), pstrEE.dc,
|
||||
pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Tenant,
|
||||
pstrEE.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
utils.FirstNonEmpty(pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Timezone,
|
||||
pstrEE.cgrCfg.GeneralCfg().DefaultTimezone), pstrEE.filterS)
|
||||
|
||||
if err = eeReq.SetFields(pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].ContentFields()); err != nil {
|
||||
return
|
||||
}
|
||||
for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() {
|
||||
var nmIt utils.NMInterface
|
||||
if nmIt, err = eeReq.cnt.Field(el.Value); err != nil {
|
||||
if err = eeReq.SetFields(pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].ContentFields()); err != nil {
|
||||
return
|
||||
}
|
||||
itm, isNMItem := nmIt.(*config.NMItem)
|
||||
if !isNMItem {
|
||||
err = fmt.Errorf("cannot encode reply value: %s, err: not NMItems", utils.ToJSON(el.Value))
|
||||
return
|
||||
for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() {
|
||||
var nmIt utils.NMInterface
|
||||
if nmIt, err = eeReq.cnt.Field(el.Value); err != nil {
|
||||
return
|
||||
}
|
||||
itm, isNMItem := nmIt.(*config.NMItem)
|
||||
if !isNMItem {
|
||||
err = fmt.Errorf("cannot encode reply value: %s, err: not NMItems", utils.ToJSON(el.Value))
|
||||
return
|
||||
}
|
||||
if itm == nil {
|
||||
continue // all attributes, not writable to diameter packet
|
||||
}
|
||||
valMp[strings.Join(itm.Path, utils.NestingSep)] = utils.IfaceAsString(itm.Data)
|
||||
}
|
||||
if itm == nil {
|
||||
continue // all attributes, not writable to diameter packet
|
||||
}
|
||||
valMp[strings.Join(itm.Path, utils.NestingSep)] = utils.IfaceAsString(itm.Data)
|
||||
}
|
||||
updateEEMetrics(pstrEE.dc, cgrEv.Event, pstrEE.cgrCfg.GeneralCfg().DefaultTimezone)
|
||||
cgrID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.CGRID), utils.GenUUID())
|
||||
|
||||
@@ -203,6 +203,26 @@ func testHTTPJsonMapExportEvent(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
eventSMSNoFields := &utils.CGREventWithIDs{
|
||||
CGREventWithOpts: &utils.CGREventWithOpts{
|
||||
CGREvent: &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "SMSEvent",
|
||||
Time: utils.TimePointer(time.Now()),
|
||||
Event: map[string]interface{}{
|
||||
utils.CGRID: utils.Sha1("sms2", time.Unix(1383813745, 0).UTC().String()),
|
||||
utils.ToR: utils.SMS,
|
||||
utils.Tenant: "cgrates.org",
|
||||
utils.Category: "call",
|
||||
utils.Account: "1001",
|
||||
utils.Destination: "1002",
|
||||
utils.RunID: utils.MetaDefault,
|
||||
"ExporterUsed": "HTTPJsonMapExporterWithNoFields",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
var reply map[string]utils.MapStorage
|
||||
if err := httpJSONMapRpc.Call(utils.EventExporterSv1ProcessEvent, eventVoice, &reply); err != nil {
|
||||
t.Error(err)
|
||||
@@ -258,4 +278,15 @@ func testHTTPJsonMapExportEvent(t *testing.T) {
|
||||
t.Errorf("Expected %+v, received: %+v", strVal, rcv)
|
||||
}
|
||||
}
|
||||
|
||||
if err := httpJSONMapRpc.Call(utils.EventExporterSv1ProcessEvent, eventSMSNoFields, &reply); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
// verify HTTPJsonMap for eventSMS
|
||||
for key, strVal := range eventSMSNoFields.Event {
|
||||
if rcv := httpJsonMap[key]; rcv != strVal {
|
||||
t.Errorf("Expected %q, received: %q", strVal, rcv)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -74,10 +74,7 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
|
||||
httpPost.dc[utils.NumberOfEvents] = httpPost.dc[utils.NumberOfEvents].(int64) + 1
|
||||
|
||||
urlVals := url.Values{}
|
||||
req := utils.MapStorage{}
|
||||
for k, v := range cgrEv.Event {
|
||||
req[k] = v
|
||||
}
|
||||
req := utils.MapStorage(cgrEv.Event)
|
||||
eeReq := NewEventExporterRequest(req, httpPost.dc,
|
||||
httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Tenant,
|
||||
httpPost.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
|
||||
@@ -72,10 +72,7 @@ func (vEe *VirtualEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
|
||||
}()
|
||||
vEe.dc[utils.NumberOfEvents] = vEe.dc[utils.NumberOfEvents].(int64) + 1
|
||||
|
||||
req := utils.MapStorage{}
|
||||
for k, v := range cgrEv.Event {
|
||||
req[k] = v
|
||||
}
|
||||
req := utils.MapStorage(cgrEv.Event)
|
||||
eeReq := NewEventExporterRequest(req, vEe.dc,
|
||||
vEe.cgrCfg.EEsCfg().Exporters[vEe.cfgIdx].Tenant,
|
||||
vEe.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
|
||||
@@ -195,6 +195,9 @@ results+=($?)
|
||||
|
||||
fi
|
||||
|
||||
echo "go test github.com/cgrates/cgrates/ees -tags=integration"
|
||||
go test github.com/cgrates/cgrates/ees -tags=integration
|
||||
results+=($?)
|
||||
echo 'go test github.com/cgrates/cgrates/config -tags=integration'
|
||||
go test github.com/cgrates/cgrates/config -tags=integration
|
||||
results+=($?)
|
||||
|
||||
@@ -105,7 +105,8 @@ cgrates (0.11.0~dev) UNRELEASED; urgency=medium
|
||||
* [FilterS] Added *apiban filter
|
||||
* [EEs] Add support for *elastic exporter
|
||||
* [AttributeS] Add support for adding fields from other places that event (e.g. Resource.TotalUsage, Stat.MetricName, Account.Balance)
|
||||
|
||||
* [EEs] Empty fields in exporter config will export the full event for the exporters that use json format
|
||||
|
||||
-- DanB <danb@cgrates.org> Wed, 19 Feb 2020 13:25:52 +0200
|
||||
|
||||
cgrates (0.10.0) UNRELEASED; urgency=medium
|
||||
|
||||
Reference in New Issue
Block a user