From 8c44ac18095ecd192f69b22d6d00b8f3296c3939 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 23 Jul 2020 11:47:55 +0300 Subject: [PATCH] Updated poster integration tests --- .../samples/cdrsv1processevent/cgrates.json | 4 +- .../cdrsv1processeventmongo/cgrates.json | 4 +- .../cdrsv1processeventmysql/cgrates.json | 4 +- ees/ees.go | 14 ++--- ees/httpjsonmap.go | 55 ++++++++++--------- engine/cdrs.go | 4 +- general_tests/cdrs_post_failover_it_test.go | 2 +- general_tests/cdrs_processevent_it_test.go | 7 +-- 8 files changed, 45 insertions(+), 49 deletions(-) diff --git a/data/conf/samples/cdrsv1processevent/cgrates.json b/data/conf/samples/cdrsv1processevent/cgrates.json index 3b1f334e3..9145991b3 100644 --- a/data/conf/samples/cdrsv1processevent/cgrates.json +++ b/data/conf/samples/cdrsv1processevent/cgrates.json @@ -33,7 +33,7 @@ "attributes_conns": ["*internal"], "exporters": [ { - "id": "aws_test_file", + "id": "amqp_test_file", "type": "*amqp_json_map", "export_path": "amqps://guest:guest@localhost:256733/", "tenant": "cgrates.org", @@ -75,7 +75,7 @@ "chargers_conns": ["*internal"], "stats_conns": ["*internal"], "thresholds_conns": ["*internal"], - "online_cdr_exports": ["aws_test_file"], + "online_cdr_exports": ["amqp_test_file"], "ees_conns": ["*localhost"] }, diff --git a/data/conf/samples/cdrsv1processeventmongo/cgrates.json b/data/conf/samples/cdrsv1processeventmongo/cgrates.json index 7a3046f30..b4a30fb86 100644 --- a/data/conf/samples/cdrsv1processeventmongo/cgrates.json +++ b/data/conf/samples/cdrsv1processeventmongo/cgrates.json @@ -36,7 +36,7 @@ "attributes_conns": ["*internal"], "exporters": [ { - "id": "aws_test_file", + "id": "amqp_test_file", "type": "*amqp_json_map", "export_path": "amqps://guest:guest@localhost:256733/", "tenant": "cgrates.org", @@ -76,7 +76,7 @@ "chargers_conns": ["*internal"], "stats_conns": ["*internal"], "thresholds_conns": ["*internal"], - "online_cdr_exports": ["aws_test_file"], + "online_cdr_exports": ["amqp_test_file"], "ees_conns": ["*localhost"] }, diff --git a/data/conf/samples/cdrsv1processeventmysql/cgrates.json b/data/conf/samples/cdrsv1processeventmysql/cgrates.json index d7cc30024..9d1370568 100644 --- a/data/conf/samples/cdrsv1processeventmysql/cgrates.json +++ b/data/conf/samples/cdrsv1processeventmysql/cgrates.json @@ -33,7 +33,7 @@ "attributes_conns": ["*internal"], "exporters": [ { - "id": "aws_test_file", + "id": "amqp_test_file", "type": "*amqp_json_map", "export_path": "amqps://guest:guest@localhost:256733/", "tenant": "cgrates.org", @@ -74,7 +74,7 @@ "chargers_conns": ["*internal"], "stats_conns": ["*internal"], "thresholds_conns": ["*internal"], - "online_cdr_exports": ["aws_test_file"], + "online_cdr_exports": ["amqp_test_file"], "ees_conns": ["*localhost"] }, diff --git a/ees/ees.go b/ees/ees.go index 810a9d0fc..4c32dffdd 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -132,21 +132,19 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithIDs, rply *st eeS.cfg.RLocks(config.EEsJson) defer eeS.cfg.RUnlocks(config.EEsJson) + expIDs := utils.NewStringSet(cgrEv.IDs) + lenExpIDs := expIDs.Size() + cgrDp := utils.MapStorage{utils.MetaReq: cgrEv.Event} + var wg sync.WaitGroup var withErr bool for cfgIdx, eeCfg := range eeS.cfg.EEsNoLksCfg().Exporters { - if eeCfg.Type == utils.META_NONE { // ignore *none type exporter + if eeCfg.Type == utils.META_NONE || // ignore *none type exporter + (lenExpIDs != 0 && !expIDs.Has(eeCfg.ID)) { continue } - if len(cgrEv.IDs) != 0 { - if !utils.IsSliceMember(cgrEv.IDs, eeCfg.ID) { - continue - } - } - if len(eeCfg.Filters) != 0 { - cgrDp := utils.MapStorage{utils.MetaReq: cgrEv.Event} tnt := cgrEv.Tenant if eeTnt, errTnt := eeCfg.Tenant.ParseDataProvider(cgrDp); errTnt == nil && eeTnt != utils.EmptyString { tnt = eeTnt diff --git a/ees/httpjsonmap.go b/ees/httpjsonmap.go index 4abd87bc4..a809689dd 100644 --- a/ees/httpjsonmap.go +++ b/ees/httpjsonmap.go @@ -31,12 +31,12 @@ import ( ) func NewHTTPJsonMapEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, - dc utils.MapStorage) (httpJson *HTTPJsonMapEe, err error) { + dc utils.MapStorage) (httpJSON *HTTPJsonMapEe, err error) { dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID - httpJson = &HTTPJsonMapEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, + httpJSON = &HTTPJsonMapEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc} if cgrCfg.EEsCfg().Exporters[cfgIdx].Type == utils.MetaHTTPjsonMap { - httpJson.httpPoster, err = engine.NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify, + httpJSON.httpPoster, err = engine.NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify, cgrCfg.GeneralCfg().ReplyTimeout, cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, utils.PosterTransportContentTypes[cgrCfg.EEsCfg().Exporters[cfgIdx].Type], cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts) } @@ -44,7 +44,7 @@ func NewHTTPJsonMapEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Filt return } -// FileCSVee implements EventExporter interface for .csv files +// HTTPJsonMapEe implements EventExporter interface for .csv files type HTTPJsonMapEe struct { id string cgrCfg *config.CGRConfig @@ -61,28 +61,29 @@ func (httpJson *HTTPJsonMapEe) ID() string { } // OnEvicted implements EventExporter, doing the cleanup before exit -func (httpJson *HTTPJsonMapEe) OnEvicted(_ string, _ interface{}) { +func (httpJson *HTTPJsonMapEe) OnEvicted(string, interface{}) { return } // ExportEvent implements EventExporter func (httpJson *HTTPJsonMapEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { httpJson.Lock() - defer httpJson.Unlock() + defer func() { + if err != nil { + httpJson.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID) + } else { + httpJson.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID) + } + httpJson.Unlock() + }() httpJson.dc[utils.NumberOfEvents] = httpJson.dc[utils.NumberOfEvents].(int) + 1 - var body interface{} valMp := make(map[string]string) - req := utils.MapStorage{} - for k, v := range cgrEv.Event { - req[k] = v - } - eeReq := NewEventExporterRequest(req, httpJson.dc, cgrEv.Tenant, httpJson.cgrCfg.GeneralCfg().DefaultTimezone, - httpJson.filterS) + eeReq := NewEventExporterRequest(utils.MapStorage(cgrEv.Event), httpJson.dc, + cgrEv.Tenant, httpJson.cgrCfg.GeneralCfg().DefaultTimezone, httpJson.filterS) if err = eeReq.SetFields(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ContentFields()); err != nil { - httpJson.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID) return } for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() { @@ -92,7 +93,8 @@ func (httpJson *HTTPJsonMapEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { } itm, isNMItem := nmIt.(*config.NMItem) if !isNMItem { - return fmt.Errorf("cannot encode reply value: %s, err: not NMItems", utils.ToJSON(el.Value)) + err = fmt.Errorf("cannot encode reply value: %s, err: not NMItems", utils.ToJSON(el.Value)) + return } if itm == nil { continue // all attributes, not writable to diameter packet @@ -134,36 +136,35 @@ func (httpJson *HTTPJsonMapEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { } } } - cgrID := utils.GenUUID() - cgrID, err = cgrEv.FieldAsString(utils.CGRID) - var runID string - runID, err = cgrEv.FieldAsString(utils.RunID) - httpJson.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID) + cgrID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.CGRID), utils.GenUUID()) + runID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RunID), utils.MetaDefault) + var body []byte if body, err = json.Marshal(valMp); err != nil { return } - return httpJson.post(body, utils.ConcatenatedKey(cgrID, runID)) + err = httpJson.post(body, utils.ConcatenatedKey(cgrID, runID)) + return } -func (httpJson *HTTPJsonMapEe) post(body interface{}, key string) (err error) { +func (httpJson *HTTPJsonMapEe) post(body []byte, key string) (err error) { switch httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Type { case utils.MetaHTTPjsonMap: err = httpJson.httpPoster.Post(body, utils.EmptyString) case utils.MetaAMQPjsonMap: err = engine.PostersCache.PostAMQP(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath, - httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body.([]byte)) + httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body) case utils.MetaAMQPV1jsonMap: err = engine.PostersCache.PostAMQPv1(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath, - httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body.([]byte)) + httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body) case utils.MetaSQSjsonMap: err = engine.PostersCache.PostSQS(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath, - httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body.([]byte)) + httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body) case utils.MetaKafkajsonMap: err = engine.PostersCache.PostKafka(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath, - httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body.([]byte), key) + httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body, key) case utils.MetaS3jsonMap: err = engine.PostersCache.PostS3(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath, - httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body.([]byte), key) + httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].Attempts, body, key) } if err != nil && httpJson.cgrCfg.GeneralCfg().FailedPostsDir != utils.META_NONE { engine.AddFailedPost(httpJson.cgrCfg.EEsCfg().Exporters[httpJson.cfgIdx].ExportPath, diff --git a/engine/cdrs.go b/engine/cdrs.go index 9efb0bed1..5146c9d47 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -569,9 +569,7 @@ func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithOpts, CGREvent: cgrEv.CGREvent, ArgDispatcher: cgrEv.ArgDispatcher, }, - } - if len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0 { - evWithOpts.IDs = cdrS.cgrCfg.CdrsCfg().OnlineCDRExports + IDs: cdrS.cgrCfg.CdrsCfg().OnlineCDRExports, } if err = cdrS.eeSProcessEvent(evWithOpts); err != nil { utils.Logger.Warning( diff --git a/general_tests/cdrs_post_failover_it_test.go b/general_tests/cdrs_post_failover_it_test.go index 39928bee7..98bc6ee53 100644 --- a/general_tests/cdrs_post_failover_it_test.go +++ b/general_tests/cdrs_post_failover_it_test.go @@ -132,7 +132,7 @@ func testCDRsPostFailoverLoadTariffPlanFromFolder(t *testing.T) { func testCDRsPostFailoverProcessCDR(t *testing.T) { args := &engine.ArgV1ProcessEvent{ - Flags: []string{utils.MetaExport, "*attributes:false", "*rals:false", "*chargers:*false", + Flags: []string{utils.MetaExport, "*attributes:false", "*rals:false", "*chargers:false", "*store:false", "*thresholds:false", "*stats:false"}, // only export the CDR CGREvent: utils.CGREvent{ ID: "1", diff --git a/general_tests/cdrs_processevent_it_test.go b/general_tests/cdrs_processevent_it_test.go index 10b2b4b7b..d3a651707 100644 --- a/general_tests/cdrs_processevent_it_test.go +++ b/general_tests/cdrs_processevent_it_test.go @@ -564,10 +564,9 @@ func testV1CDRsProcessEventExport(t *testing.T) { }, }, } - if err := pecdrsRpc.Call(utils.CDRsV1ProcessEvent, args, &reply); err != nil { + if err := pecdrsRpc.Call(utils.CDRsV1ProcessEvent, args, &reply); err == nil || + err.Error() != utils.ErrPartiallyExecuted.Error() { // the export should fail as we test if the cdr is corectly writen in file t.Error("Unexpected error: ", err) - } else if reply != utils.OK { - t.Error("Unexpected reply received: ", reply) } } func testV1CDRsProcessEventExportCheck(t *testing.T) { @@ -580,7 +579,7 @@ func testV1CDRsProcessEventExportCheck(t *testing.T) { var fileName string for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config fileName = file.Name() - if strings.HasPrefix(fileName, "cdr|") { + if strings.HasPrefix(fileName, "EventExporterS|") { foundFile = true filePath := path.Join(pecdrsCfg.GeneralCfg().FailedPostsDir, fileName) ev, err := engine.NewExportEventsFromFile(filePath)