Updated poster integration tests

This commit is contained in:
Trial97
2020-07-23 11:47:55 +03:00
committed by Dan Christian Bogos
parent 9256e9065e
commit 8c44ac1809
8 changed files with 45 additions and 49 deletions

View File

@@ -33,7 +33,7 @@
"attributes_conns": ["*internal"],
"exporters": [
{
"id": "aws_test_file",
"id": "amqp_test_file",
"type": "*amqp_json_map",
"export_path": "amqps://guest:guest@localhost:256733/",
"tenant": "cgrates.org",
@@ -75,7 +75,7 @@
"chargers_conns": ["*internal"],
"stats_conns": ["*internal"],
"thresholds_conns": ["*internal"],
"online_cdr_exports": ["aws_test_file"],
"online_cdr_exports": ["amqp_test_file"],
"ees_conns": ["*localhost"]
},

View File

@@ -36,7 +36,7 @@
"attributes_conns": ["*internal"],
"exporters": [
{
"id": "aws_test_file",
"id": "amqp_test_file",
"type": "*amqp_json_map",
"export_path": "amqps://guest:guest@localhost:256733/",
"tenant": "cgrates.org",
@@ -76,7 +76,7 @@
"chargers_conns": ["*internal"],
"stats_conns": ["*internal"],
"thresholds_conns": ["*internal"],
"online_cdr_exports": ["aws_test_file"],
"online_cdr_exports": ["amqp_test_file"],
"ees_conns": ["*localhost"]
},

View File

@@ -33,7 +33,7 @@
"attributes_conns": ["*internal"],
"exporters": [
{
"id": "aws_test_file",
"id": "amqp_test_file",
"type": "*amqp_json_map",
"export_path": "amqps://guest:guest@localhost:256733/",
"tenant": "cgrates.org",
@@ -74,7 +74,7 @@
"chargers_conns": ["*internal"],
"stats_conns": ["*internal"],
"thresholds_conns": ["*internal"],
"online_cdr_exports": ["aws_test_file"],
"online_cdr_exports": ["amqp_test_file"],
"ees_conns": ["*localhost"]
},

View File

@@ -132,21 +132,19 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *st
eeS.cfg.RLocks(config.EEsJson)
defer eeS.cfg.RUnlocks(config.EEsJson)
expIDs := utils.NewStringSet(cgrEv.IDs)
lenExpIDs := expIDs.Size()
cgrDp := utils.MapStorage{utils.MetaReq: cgrEv.Event}
var wg sync.WaitGroup
var withErr bool
for cfgIdx, eeCfg := range eeS.cfg.EEsNoLksCfg().Exporters {
if eeCfg.Type == utils.META_NONE { // ignore *none type exporter
if eeCfg.Type == utils.META_NONE || // ignore *none type exporter
(lenExpIDs != 0 && !expIDs.Has(eeCfg.ID)) {
continue
}
if len(cgrEv.IDs) != 0 {
if !utils.IsSliceMember(cgrEv.IDs, eeCfg.ID) {
continue
}
}
if len(eeCfg.Filters) != 0 {
cgrDp := utils.MapStorage{utils.MetaReq: cgrEv.Event}
tnt := cgrEv.Tenant
if eeTnt, errTnt := eeCfg.Tenant.ParseDataProvider(cgrDp); errTnt == nil && eeTnt != utils.EmptyString {
tnt = eeTnt

View File

@@ -31,12 +31,12 @@ import (
)
func NewHTTPJsonMapEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc utils.MapStorage) (httpJson *HTTPJsonMapEe, err error) {
dc utils.MapStorage) (httpJSON *HTTPJsonMapEe, err error) {
dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID
httpJson = &HTTPJsonMapEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
httpJSON = &HTTPJsonMapEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
if cgrCfg.EEsCfg().Exporters[cfgIdx].Type == utils.MetaHTTPjsonMap {
httpJson.httpPoster, err = engine.NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify,
httpJSON.httpPoster, err = engine.NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify,
cgrCfg.GeneralCfg().ReplyTimeout, cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath,
utils.PosterTransportContentTypes[cgrCfg.EEsCfg().Exporters[cfgIdx].Type], cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts)
}
@@ -44,7 +44,7 @@ func NewHTTPJsonMapEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Filt
return
}
// FileCSVee implements EventExporter interface for .csv files
// HTTPJsonMapEe implements EventExporter interface for .csv files
type HTTPJsonMapEe struct {
id string
cgrCfg *config.CGRConfig
@@ -61,28 +61,29 @@ func (httpJson *HTTPJsonMapEe) ID() string {
}
// OnEvicted implements EventExporter, doing the cleanup before exit
func (httpJson *HTTPJsonMapEe) OnEvicted(_ string, _ interface{}) {
func (httpJson *HTTPJsonMapEe) OnEvicted(string, interface{}) {
return
}
// ExportEvent implements EventExporter
func (httpJson *HTTPJsonMapEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
httpJson.Lock()
defer httpJson.Unlock()
defer func() {
if err != nil {
httpJson.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
} else {
httpJson.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
}
httpJson.Unlock()
}()
httpJson.dc[utils.NumberOfEvents] = httpJson.dc[utils.NumberOfEvents].(int) + 1
var body interface{}
valMp := make(map[string]string)
req := utils.MapStorage{}
for k, v := range cgrEv.Event {
req[k] = v
}
eeReq := NewEventExporterRequest(req, httpJson.dc, cgrEv.Tenant, httpJson.cgrCfg.GeneralCfg().DefaultTimezone,
httpJson.filterS)
eeReq := NewEventExporterRequest(utils.MapStorage(cgrEv.Event), httpJson.dc,
cgrEv.Tenant, httpJson.cgrCfg.GeneralCfg().DefaultTimezone, httpJson.filterS)
if err = eeReq.SetFields(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ContentFields()); err != nil {
httpJson.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID)
return
}
for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() {
@@ -92,7 +93,8 @@ func (httpJson *HTTPJsonMapEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
}
itm, isNMItem := nmIt.(*config.NMItem)
if !isNMItem {
return fmt.Errorf("cannot encode reply value: %s, err: not NMItems", utils.ToJSON(el.Value))
err = fmt.Errorf("cannot encode reply value: %s, err: not NMItems", utils.ToJSON(el.Value))
return
}
if itm == nil {
continue // all attributes, not writable to diameter packet
@@ -134,36 +136,35 @@ func (httpJson *HTTPJsonMapEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
}
}
}
cgrID := utils.GenUUID()
cgrID, err = cgrEv.FieldAsString(utils.CGRID)
var runID string
runID, err = cgrEv.FieldAsString(utils.RunID)
httpJson.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID)
cgrID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.CGRID), utils.GenUUID())
runID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RunID), utils.MetaDefault)
var body []byte
if body, err = json.Marshal(valMp); err != nil {
return
}
return httpJson.post(body, utils.ConcatenatedKey(cgrID, runID))
err = httpJson.post(body, utils.ConcatenatedKey(cgrID, runID))
return
}
func (httpJson *HTTPJsonMapEe) post(body interface{}, key string) (err error) {
func (httpJson *HTTPJsonMapEe) post(body []byte, key string) (err error) {
switch httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Type {
case utils.MetaHTTPjsonMap:
err = httpJson.httpPoster.Post(body, utils.EmptyString)
case utils.MetaAMQPjsonMap:
err = engine.PostersCache.PostAMQP(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath,
httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body.([]byte))
httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body)
case utils.MetaAMQPV1jsonMap:
err = engine.PostersCache.PostAMQPv1(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath,
httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body.([]byte))
httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body)
case utils.MetaSQSjsonMap:
err = engine.PostersCache.PostSQS(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath,
httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body.([]byte))
httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body)
case utils.MetaKafkajsonMap:
err = engine.PostersCache.PostKafka(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath,
httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body.([]byte), key)
httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body, key)
case utils.MetaS3jsonMap:
err = engine.PostersCache.PostS3(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath,
httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body.([]byte), key)
httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body, key)
}
if err != nil && httpJson.cgrCfg.GeneralCfg().FailedPostsDir != utils.META_NONE {
engine.AddFailedPost(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath,

View File

@@ -569,9 +569,7 @@ func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithOpts,
CGREvent: cgrEv.CGREvent,
ArgDispatcher: cgrEv.ArgDispatcher,
},
}
if len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0 {
evWithOpts.IDs = cdrS.cgrCfg.CdrsCfg().OnlineCDRExports
IDs: cdrS.cgrCfg.CdrsCfg().OnlineCDRExports,
}
if err = cdrS.eeSProcessEvent(evWithOpts); err != nil {
utils.Logger.Warning(

View File

@@ -132,7 +132,7 @@ func testCDRsPostFailoverLoadTariffPlanFromFolder(t *testing.T) {
func testCDRsPostFailoverProcessCDR(t *testing.T) {
args := &engine.ArgV1ProcessEvent{
Flags: []string{utils.MetaExport, "*attributes:false", "*rals:false", "*chargers:*false",
Flags: []string{utils.MetaExport, "*attributes:false", "*rals:false", "*chargers:false",
"*store:false", "*thresholds:false", "*stats:false"}, // only export the CDR
CGREvent: utils.CGREvent{
ID: "1",

View File

@@ -564,10 +564,9 @@ func testV1CDRsProcessEventExport(t *testing.T) {
},
},
}
if err := pecdrsRpc.Call(utils.CDRsV1ProcessEvent, args, &reply); err != nil {
if err := pecdrsRpc.Call(utils.CDRsV1ProcessEvent, args, &reply); err == nil ||
err.Error() != utils.ErrPartiallyExecuted.Error() { // the export should fail as we test if the cdr is corectly writen in file
t.Error("Unexpected error: ", err)
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
}
}
func testV1CDRsProcessEventExportCheck(t *testing.T) {
@@ -580,7 +579,7 @@ func testV1CDRsProcessEventExportCheck(t *testing.T) {
var fileName string
for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config
fileName = file.Name()
if strings.HasPrefix(fileName, "cdr|") {
if strings.HasPrefix(fileName, "EventExporterS|") {
foundFile = true
filePath := path.Join(pecdrsCfg.GeneralCfg().FailedPostsDir, fileName)
ev, err := engine.NewExportEventsFromFile(filePath)