From 2e6e553f3fdfd0005443aa9dc58ac3d09e0cae54 Mon Sep 17 00:00:00 2001 From: porosnicuadrian Date: Fri, 26 Nov 2021 17:46:40 +0200 Subject: [PATCH] Added new api for exporter -ArchiveEventsAsReply + test --- apis/ees.go | 4 + config/configsanity.go | 6 ++ data/conf/samples/ees/cgrates.json | 49 +++++++++- data/conf/samples/tutmysql/cgrates.json | 1 - ees/ee.go | 5 +- ees/ee_test.go | 9 +- ees/ees.go | 79 ++++++++++++++++ ees/filecsv.go | 27 ++++-- ees/filecsv_it_test.go | 118 +++++++++++++++++++++++- ees/filecsv_test.go | 10 +- ees/filefwv.go | 30 +++--- ees/filefwv_it_test.go | 3 +- ees/filefwv_test.go | 16 ++-- utils/apitpdata_test.go | 2 +- utils/consts.go | 9 +- 15 files changed, 319 insertions(+), 49 deletions(-) diff --git a/apis/ees.go b/apis/ees.go index 6e0efcd70..740db08e4 100644 --- a/apis/ees.go +++ b/apis/ees.go @@ -36,3 +36,7 @@ type EeSv1 struct { func (cS *EeSv1) ProcessEvent(ctx *context.Context, cgrEv *utils.CGREventWithEeIDs, rply *map[string]map[string]interface{}) error { return cS.ees.V1ProcessEvent(ctx, cgrEv, rply) } + +func (cS *EeSv1) ArchiveEventsAsReply(ctx *context.Context, args *ees.ArchiveEventsArgs, reply *[]byte) error { + return cS.ees.V1ArchiveEventsAsReply(ctx, args, reply) +} diff --git a/config/configsanity.go b/config/configsanity.go index 3387b4689..c39d90895 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -767,6 +767,9 @@ func (cfg *CGRConfig) checkConfigSanity() error { switch exp.Type { case utils.MetaFileCSV: for _, dir := range []string{exp.ExportPath} { + if dir == utils.MetaBuffer { + break + } if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) { return fmt.Errorf("<%s> nonexistent folder: %s for exporter with ID: %s", utils.EEs, dir, exp.ID) } @@ -776,6 +779,9 @@ func (cfg *CGRConfig) checkConfigSanity() error { } case utils.MetaFileFWV: for _, dir := range []string{exp.ExportPath} { + if dir == utils.MetaBuffer { + break + } if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) { return fmt.Errorf("<%s> nonexistent folder: %s for exporter with ID: %s", utils.EEs, dir, exp.ID) } diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json index 09b632324..a33f2ce47 100644 --- a/data/conf/samples/ees/cgrates.json +++ b/data/conf/samples/ees/cgrates.json @@ -70,7 +70,7 @@ "attribute_context": "customContext", "attempts": 1, "field_separator": ",", - }, + }, { "id": "CSVExporterComposed", "type": "*fileCSV", @@ -119,6 +119,53 @@ {"tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*dc.TotalCost{*round:4}"}, ], }, + { + "id": "CSVExporterBuffered", + "type": "*fileCSV", + "export_path": "*buffer", + "attempts": 1, + "field_separator": ",", + "synchronous": true, + "fields":[ + {"tag": "Number", "path": "*hdr.Number", "type": "*constant", "value": "NumberOfEvent"}, + {"tag": "CGRID", "path": "*hdr.CGRID", "type": "*constant", "value": "CGRID"}, + {"tag": "RunID", "path": "*hdr.RunID", "type": "*constant", "value": "RunID"}, + {"tag": "ToR", "path": "*hdr.ToR", "type": "*constant", "value": "ToR"}, + {"tag": "OriginID", "path": "*hdr.OriginID", "type": "*constant", "value": "OriginID"}, + {"tag": "RequestType", "path": "*hdr.RequestType", "type": "*constant", "value": "RequestType"}, + {"tag": "Tenant", "path": "*hdr.Tenant", "type": "*constant", "value": "Tenant"}, + {"tag": "Category", "path": "*hdr.Category", "type": "*constant", "value": "Category"}, + {"tag": "Account", "path": "*hdr.Account", "type": "*constant", "value": "Account"}, + {"tag": "Subject", "path": "*hdr.Subject", "type": "*constant", "value": "Subject"}, + {"tag": "Destination", "path": "*hdr.Destination", "type": "*constant", "value": "Destination"}, + {"tag": "SetupTime", "path": "*hdr.SetupTime", "type": "*constant", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag": "AnswerTime", "path": "*hdr.AnswerTime", "type": "*constant", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag": "Usage", "path": "*hdr.Usage", "type": "*constant", "value": "Usage"}, + {"tag": "Cost", "path": "*hdr.Cost", "type": "*constant", "value": "Cost"}, + + {"tag": "Number", "path": "*exp.Number", "type": "*variable", "value": "~*dc.NumberOfEvents"}, + {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, + {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, + {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, + {"tag": "OriginID1", "path": "*exp.OriginID", "type": "*composed", "value": "~*req.ComposedOriginID1"}, + {"tag": "OriginID2", "path": "*exp.OriginID", "type": "*composed", "value": "~*req.ComposedOriginID2"}, + {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, + {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"}, + {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, + {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, + {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, + {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, + + {"tag": "NumberOfEvents", "path": "*trl.NumberOfEvents", "type": "*variable", "value": "~*dc.NumberOfEvents"}, + {"tag": "TotalDuration", "path": "*trl.TotalDuration", "type": "*variable", "value": "~*dc.TotalDuration"}, + {"tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*dc.TotalSMSUsage"}, + {"tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*dc.TotalCost{*round:4}"}, + ], + }, { "id": "FwvExporter", "type": "*fileFWV", diff --git a/data/conf/samples/tutmysql/cgrates.json b/data/conf/samples/tutmysql/cgrates.json index bff919923..45994b4ec 100644 --- a/data/conf/samples/tutmysql/cgrates.json +++ b/data/conf/samples/tutmysql/cgrates.json @@ -38,7 +38,6 @@ "accounts_conns": ["*localhost"] }, - "chargers": { "enabled": true, "attributes_conns": ["*internal"], diff --git a/ees/ee.go b/ees/ee.go index d561eeb55..8fb08515b 100644 --- a/ees/ee.go +++ b/ees/ee.go @@ -51,9 +51,9 @@ func NewEventExporter(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, } switch cfg.Type { case utils.MetaFileCSV: - return NewFileCSVee(cfg, cgrCfg, filterS, dc) + return NewFileCSVee(cfg, cgrCfg, filterS, dc, nil) case utils.MetaFileFWV: - return NewFileFWVee(cfg, cgrCfg, filterS, dc) + return NewFileFWVee(cfg, cgrCfg, filterS, dc, nil) case utils.MetaHTTPPost: return NewHTTPPostEE(cfg, cgrCfg, filterS, dc) case utils.MetaHTTPjsonMap: @@ -241,6 +241,7 @@ func (slicePreparing) PrepareMap(mp *utils.CGREvent) (interface{}, error) { } return csvRecord, nil } + func (slicePreparing) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{}, error) { return mp.OrderedFieldsAsStrings(), nil } diff --git a/ees/ee_test.go b/ees/ee_test.go index cf881fbbf..a9ff1a321 100644 --- a/ees/ee_test.go +++ b/ees/ee_test.go @@ -19,6 +19,7 @@ package ees import ( "fmt" + "io" "reflect" "strings" "testing" @@ -45,11 +46,11 @@ func TestNewEventExporter(t *testing.T) { if err != nil { t.Error(err) } - eeExpect, err := NewFileCSVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc) + eeExpect, err := NewFileCSVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc, nil) if strings.Contains(errExpect, err.Error()) { t.Errorf("Expected %+v but got %+v", errExpect, err) } - err = eeExpect.init() + err = eeExpect.init(nil) if err == nil { t.Error("\nExpected an error") } @@ -82,11 +83,11 @@ func TestNewEventExporterCase2(t *testing.T) { if err != nil { t.Error(err) } - eeExpect, err := NewFileFWVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc) + eeExpect, err := NewFileFWVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc, io.Discard) if strings.Contains(errExpect, err.Error()) { t.Errorf("Expected %+v but got %+v", errExpect, err) } - err = eeExpect.init() + err = eeExpect.init(io.Discard) if err == nil { t.Error("\nExpected an error") } diff --git a/ees/ees.go b/ees/ees.go index 69a48ae5b..4d3537cff 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -19,7 +19,10 @@ along with this program. If not, see package ees import ( + "archive/zip" + "bytes" "fmt" + "io" "sync" "time" @@ -319,3 +322,79 @@ func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{} } return } + +type ArchiveEventsArgs struct { + Tenant string + APIOpts map[string]interface{} + Events []*utils.CGREvent +} + +// V1ArchiveEventsAsReply should archive the events sent with existing exporters. The zipped content should be returned back as a reply. +func (eeS *EeS) V1ArchiveEventsAsReply(ctx *context.Context, args *ArchiveEventsArgs, reply *[]byte) (err error) { + if args.Tenant == utils.EmptyString { + args.Tenant = eeS.cfg.GeneralCfg().DefaultTenant + } + expID, has := args.APIOpts[utils.MetaExporterID] + if !has { + return fmt.Errorf("ExporterID is missing from argument's options: <%v>", utils.ToJSON(args)) + } + //var validExporter bool + var eesCfg *config.EventExporterCfg + for _, exporter := range eeS.cfg.EEsCfg().Exporters { + if exporter.ID == expID { + eesCfg = exporter + break + } + } + if eesCfg == nil { + return fmt.Errorf("exporter config with ID: <%v> is missing", expID) + } + if !eesCfg.Synchronous { + return fmt.Errorf("exporter with ID: <%v> is not synchronous", expID) + } + if eesCfg.ExportPath != utils.MetaBuffer { + return fmt.Errorf("exporter with ID: <%v> has an invalid ExportPath for archiving", expID) + } + var dc *utils.SafeMapStorage + if dc, err = newEEMetrics(utils.FirstNonEmpty( + eesCfg.Timezone, + eeS.cfg.GeneralCfg().DefaultTimezone)); err != nil { + return + } + var ee EventExporter + + buff := new(bytes.Buffer) + zBuff := zip.NewWriter(buff) + // + var wrtr io.Writer + if wrtr, err = zBuff.Create("events.csv"); err != nil { + return err + } + switch eesCfg.Type { + case utils.MetaFileCSV: + ee, err = NewFileCSVee(eesCfg, eeS.cfg, eeS.fltrS, dc, &buffer{buff}) + case utils.MetaFileFWV: + ee, err = NewFileFWVee(eesCfg, eeS.cfg, eeS.fltrS, dc, wrtr) + default: + err = fmt.Errorf("unsupported exporter type: <%s>", eesCfg.Type) + } + if err != nil { + return err + } + for _, event := range args.Events { + if err := exportEventWithExporter(ctx, ee, event, false, eeS.cfg, eeS.fltrS); err != nil { + return err + } + } + if err = ee.Close(); err != nil { + return err + } + + *reply = buff.Bytes() + if err = zBuff.Close(); err != nil { + return err + } + + buff.Reset() + return +} diff --git a/ees/filecsv.go b/ees/filecsv.go index ed2d76c80..64cdd4987 100644 --- a/ees/filecsv.go +++ b/ees/filecsv.go @@ -35,15 +35,15 @@ import ( func NewFileCSVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, - dc *utils.SafeMapStorage) (fCsv *FileCSVee, err error) { + dc *utils.SafeMapStorage, wrtr io.WriteCloser) (fCsv *FileCSVee, err error) { fCsv = &FileCSVee{ cfg: cfg, dc: dc, - + //wrtr: wrtr, cgrCfg: cgrCfg, filterS: filterS, } - err = fCsv.init() + err = fCsv.init(wrtr) return } @@ -51,7 +51,7 @@ func NewFileCSVee(cfg *config.EventExporterCfg, type FileCSVee struct { cfg *config.EventExporterCfg dc *utils.SafeMapStorage - file io.WriteCloser + wrtr io.WriteCloser // writer for the csv csvWriter *csv.Writer sync.Mutex slicePreparing @@ -60,7 +60,7 @@ type FileCSVee struct { filterS *engine.FilterS } -func (fCsv *FileCSVee) init() (err error) { +func (fCsv *FileCSVee) init(wrtr io.WriteCloser) (err error) { fCsv.Lock() defer fCsv.Unlock() // create the file @@ -69,10 +69,12 @@ func (fCsv *FileCSVee) init() (err error) { fCsv.dc.Lock() fCsv.dc.MapStorage[utils.ExportPath] = filePath fCsv.dc.Unlock() - if fCsv.file, err = os.Create(filePath); err != nil { + if fCsv.cfg.ExportPath == utils.MetaBuffer { + fCsv.wrtr = wrtr + } else if fCsv.wrtr, err = os.Create(filePath); err != nil { return } - fCsv.csvWriter = csv.NewWriter(fCsv.file) + fCsv.csvWriter = csv.NewWriter(fCsv.wrtr) fCsv.csvWriter.Comma = utils.CSVSep if fCsv.Cfg().Opts.CSVFieldSeparator != nil { fCsv.csvWriter.Comma = rune((*fCsv.Cfg().Opts.CSVFieldSeparator)[0]) @@ -123,7 +125,7 @@ func (fCsv *FileCSVee) Close() (err error) { utils.EEs, fCsv.Cfg().ID, err.Error())) } fCsv.csvWriter.Flush() - if err = fCsv.file.Close(); err != nil { + if err = fCsv.wrtr.Close(); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when closing the file", utils.EEs, fCsv.Cfg().ID, err.Error())) } @@ -131,3 +133,12 @@ func (fCsv *FileCSVee) Close() (err error) { } func (fCsv *FileCSVee) GetMetrics() *utils.SafeMapStorage { return fCsv.dc } + +// Buffers cannot be closed, they just Reset. We implement our struct and used it for writer field in FileCSVee to be available for WriterCloser interface +type buffer struct { + io.Writer +} + +func (buf *buffer) Close() (err error) { + return +} diff --git a/ees/filecsv_it_test.go b/ees/filecsv_it_test.go index 67465a50e..b7b15acb1 100644 --- a/ees/filecsv_it_test.go +++ b/ees/filecsv_it_test.go @@ -55,6 +55,7 @@ var ( testCsvVerifyExports, testCsvExportComposedEvent, testCsvVerifyComposedExports, + testCsvExportBufferedEvent, testCsvExportEventWithInflateTemplate, testCsvVerifyExportsWithInflateTemplate, testCsvExportNotFoundExporter, @@ -311,6 +312,121 @@ func testCsvVerifyComposedExports(t *testing.T) { } } +func testCsvExportBufferedEvent(t *testing.T) { + eventVoice := &ArchiveEventsArgs{ + Tenant: "cgrates.org", + APIOpts: map[string]interface{}{ + utils.MetaExporterID: "CSVExporterBuffered", + }, + Events: []*utils.CGREvent{ + { + Tenant: "cgrates.org", + ID: "voiceEvent", + Event: map[string]interface{}{ + utils.CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()), + utils.ToR: utils.MetaVoice, + "ComposedOriginID1": "dsaf", + "ComposedOriginID2": "dsaf", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.MetaRated, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.AccountField: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: 10 * time.Second, + utils.RunID: utils.MetaDefault, + utils.Cost: 1.016374, + "ExtraFields": map[string]string{"extra1": "val_extra1", + "extra2": "val_extra2", "extra3": "val_extra3"}, + }, + }, + { + Tenant: "cgrates.org", + ID: "dataEvent", + Event: map[string]interface{}{ + utils.CGRID: utils.Sha1("abcdef", time.Unix(1383813745, 0).UTC().String()), + utils.ToR: utils.MetaData, + utils.OriginID: "abcdef", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.MetaRated, + utils.Tenant: "AnotherTenant", + utils.Category: "call", //for data CDR use different Tenant + utils.AccountField: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: 10 * time.Nanosecond, + utils.RunID: utils.MetaDefault, + utils.Cost: 0.012, + "ExtraFields": map[string]string{"extra1": "val_extra1", + "extra2": "val_extra2", "extra3": "val_extra3"}, + }, + }, + { + Tenant: "cgrates.org", + ID: "smsEvent", + Event: map[string]interface{}{ + utils.CGRID: utils.Sha1("nlllo", time.Unix(1383813745, 0).UTC().String()), + utils.ToR: utils.MetaData, + utils.OriginID: "abcdef", + utils.RequestType: utils.MetaNone, + utils.Tenant: "phone.org", + utils.Category: "sms", //for data CDR use different Tenant + utils.AccountField: "User2001", + utils.Subject: "User2001", + utils.Destination: "User2002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: 10 * time.Nanosecond, + utils.RunID: "raw", + utils.Cost: 44.5, + "ExtraFields": map[string]string{"extra1": "val_extra1", + "extra2": "val_extra2", "extra3": "val_extra3"}, + }, + }, + { + Tenant: "cgrates.org", + ID: "photoEvent", + Event: map[string]interface{}{ + utils.CGRID: utils.Sha1("qwert", time.Unix(1383813745, 0).UTC().String()), + utils.OriginID: "abcdef", + utils.OriginHost: "127.0.0.1", + utils.RequestType: utils.MetaPrepaid, + utils.Tenant: "dispatchers.org", + utils.Category: "photo", //for data CDR use different Tenant + utils.AccountField: "1005", + utils.Subject: "1005", + utils.Destination: "1000", + utils.SetupTime: time.Unix(22383813745, 0).UTC(), + utils.AnswerTime: time.Unix(22383813760, 0).UTC(), + utils.Usage: 10 * time.Nanosecond, + utils.RunID: "Default_charging_id", + utils.Cost: 1.442234, + }, + }, + }, + } + + expected := `NumberOfEvent,CGRID,RunID,ToR,OriginID,RequestType,Tenant,Category,Account,Subject,Destination,SetupTime,AnswerTime,Usage,Cost` + "\n" + + `1,dbafe9c8614c785a65aabd116dd3959c3c56f7f6,*default,*voice,dsafdsaf,*rated,cgrates.org,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10000000000,1.0164` + "\n" + + `2,ea1f1968cc207859672c332364fc7614c86b04c5,*default,*data,*rated,AnotherTenant,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10,0.012` + "\n" + + `3,9e0b2a4b23e0843efe522e8a611b092a16ecfba1,raw,*data,*none,phone.org,sms,User2001,User2001,User2002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10,44.5` + "\n" + + `4,cd8112998c2abb0e4a7cd3a94c74817cd5fe67d3,Default_charging_id,*prepaid,dispatchers.org,photo,1005,1005,1000,2679-04-25T22:02:25Z,2679-04-25T22:02:40Z,10,1.4422` + "\n" + + `4,10s,46.9706` + "\n" + var reply []byte + if err := csvRpc.Call(utils.EeSv1ArchiveEventsAsReply, + eventVoice, &reply); err != nil { + t.Error(err) + } else if string(reply) != expected { + t.Errorf("Expected %q \n received %q", expected, string(reply)) + } + time.Sleep(time.Second) +} + func testCsvExportEventWithInflateTemplate(t *testing.T) { eventVoice := &utils.CGREventWithEeIDs{ EeIDs: []string{"CSVExporterWIthTemplate"}, @@ -486,7 +602,7 @@ func TestCsvInitFileCSV(t *testing.T) { cfg: cgrCfg.EEsCfg().Exporters[0], dc: dc, } - if err := fCsv.init(); err != nil { + if err := fCsv.init(nil); err != nil { t.Error(err) } if err := os.RemoveAll("/tmp/TestInitFileCSV"); err != nil { diff --git a/ees/filecsv_test.go b/ees/filecsv_test.go index 60c32e0b0..82f7b273c 100644 --- a/ees/filecsv_test.go +++ b/ees/filecsv_test.go @@ -63,7 +63,7 @@ func TestFileCsvComposeHeader(t *testing.T) { cfg: cfg.EEsCfg().Exporters[0], cgrCfg: cfg, filterS: filterS, - file: nopCloser{byteBuff}, + wrtr: nopCloser{byteBuff}, csvWriter: csvNW, dc: &utils.SafeMapStorage{}, } @@ -126,7 +126,7 @@ func TestFileCsvComposeTrailer(t *testing.T) { cfg: cfg.EEsCfg().Exporters[0], cgrCfg: cfg, filterS: filterS, - file: nopCloser{byteBuff}, + wrtr: nopCloser{byteBuff}, csvWriter: csvNW, dc: &utils.SafeMapStorage{}, } @@ -196,7 +196,7 @@ func TestFileCsvExportEvent(t *testing.T) { cfg: cfg.EEsCfg().Exporters[0], cgrCfg: cfg, filterS: filterS, - file: nopCloser{byteBuff}, + wrtr: nopCloser{byteBuff}, csvWriter: csvNW, dc: dc, } @@ -222,7 +222,7 @@ func TestFileCsvOnEvictedTrailer(t *testing.T) { cfg: cfg.EEsCfg().Exporters[0], cgrCfg: cfg, filterS: filterS, - file: nopCloserWrite{byteBuff}, + wrtr: nopCloserWrite{byteBuff}, csvWriter: csvNW, dc: &utils.SafeMapStorage{}, } @@ -256,7 +256,7 @@ func TestFileCsvOnEvictedClose(t *testing.T) { cfg: cfg.EEsCfg().Exporters[0], cgrCfg: cfg, filterS: filterS, - file: nopCloserError{byteBuff}, + wrtr: nopCloserError{byteBuff}, csvWriter: csvNW, dc: &utils.SafeMapStorage{}, } diff --git a/ees/filefwv.go b/ees/filefwv.go index 33aa52583..ce5c60b32 100644 --- a/ees/filefwv.go +++ b/ees/filefwv.go @@ -31,7 +31,7 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, dc *utils.SafeMapStorage) (fFwv *FileFWVee, err error) { +func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, dc *utils.SafeMapStorage, writer io.Writer) (fFwv *FileFWVee, err error) { fFwv = &FileFWVee{ cfg: cfg, dc: dc, @@ -39,15 +39,15 @@ func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filter cgrCfg: cgrCfg, filterS: filterS, } - err = fFwv.init() + err = fFwv.init(writer) return } // FileFWVee implements EventExporter interface for .fwv files type FileFWVee struct { - cfg *config.EventExporterCfg - dc *utils.SafeMapStorage - file io.WriteCloser + cfg *config.EventExporterCfg + dc *utils.SafeMapStorage + writer io.WriteCloser sync.Mutex slicePreparing @@ -57,14 +57,16 @@ type FileFWVee struct { } // init will create all the necessary dependencies, including opening the file -func (fFwv *FileFWVee) init() (err error) { +func (fFwv *FileFWVee) init(writer io.Writer) (err error) { filePath := path.Join(fFwv.Cfg().ExportPath, fFwv.Cfg().ID+utils.Underline+utils.UUIDSha1Prefix()+utils.FWVSuffix) fFwv.dc.Lock() fFwv.dc.MapStorage[utils.ExportPath] = filePath fFwv.dc.Unlock() // create the file - if fFwv.file, err = os.Create(filePath); err != nil { + if fFwv.cfg.ExportPath == utils.MetaBuffer { + fFwv.writer = &buffer{writer} + } else if fFwv.writer, err = os.Create(filePath); err != nil { return } return fFwv.composeHeader() @@ -80,11 +82,11 @@ func (fFwv *FileFWVee) composeHeader() (err error) { return } for _, record := range exp.OrderedFieldsAsStrings() { - if _, err = io.WriteString(fFwv.file, record); err != nil { + if _, err = io.WriteString(fFwv.writer, record); err != nil { return } } - _, err = io.WriteString(fFwv.file, "\n") + _, err = io.WriteString(fFwv.writer, "\n") return } @@ -98,11 +100,11 @@ func (fFwv *FileFWVee) composeTrailer() (err error) { return } for _, record := range exp.OrderedFieldsAsStrings() { - if _, err = io.WriteString(fFwv.file, record); err != nil { + if _, err = io.WriteString(fFwv.writer, record); err != nil { return } } - _, err = io.WriteString(fFwv.file, "\n") + _, err = io.WriteString(fFwv.writer, "\n") return } @@ -114,11 +116,11 @@ func (fFwv *FileFWVee) ExportEvent(_ *context.Context, records interface{}, _ st fFwv.Lock() // make sure that only one event is writen in file at once defer fFwv.Unlock() for _, record := range records.([]string) { - if _, err = io.WriteString(fFwv.file, record); err != nil { + if _, err = io.WriteString(fFwv.writer, record); err != nil { return } } - _, err = io.WriteString(fFwv.file, "\n") + _, err = io.WriteString(fFwv.writer, "\n") return } @@ -130,7 +132,7 @@ func (fFwv *FileFWVee) Close() (err error) { utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when composed trailer", utils.EEs, fFwv.Cfg().ID, err.Error())) } - if err = fFwv.file.Close(); err != nil { + if err = fFwv.writer.Close(); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when closing the file", utils.EEs, fFwv.Cfg().ID, err.Error())) } diff --git a/ees/filefwv_it_test.go b/ees/filefwv_it_test.go index 32f812590..3aadaa2ef 100644 --- a/ees/filefwv_it_test.go +++ b/ees/filefwv_it_test.go @@ -22,6 +22,7 @@ along with this program. If not, see package ees import ( + "io" "net/rpc" "os" "path" @@ -177,7 +178,7 @@ func TestFileFwvInit(t *testing.T) { cfg: cgrCfg.EEsCfg().Exporters[0], dc: dc, } - if err := fFwv.init(); err != nil { + if err := fFwv.init(io.Discard); err != nil { t.Error(err) } if err := os.RemoveAll("/tmp/TestInitFileCSV"); err != nil { diff --git a/ees/filefwv_test.go b/ees/filefwv_test.go index b0d3bc1e2..410e6ebea 100644 --- a/ees/filefwv_test.go +++ b/ees/filefwv_test.go @@ -54,7 +54,7 @@ func TestFileFwvComposeHeader(t *testing.T) { cfg: cfg.EEsCfg().Exporters[0], cgrCfg: cfg, filterS: filterS, - file: nopCloser{byteBuff}, + writer: nopCloser{byteBuff}, dc: &utils.SafeMapStorage{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ @@ -116,7 +116,7 @@ func TestFileFwvComposeTrailer(t *testing.T) { cfg: cfg.EEsCfg().Exporters[0], cgrCfg: cfg, filterS: filterS, - file: nopCloser{byteBuff}, + writer: nopCloser{byteBuff}, dc: &utils.SafeMapStorage{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ @@ -182,7 +182,7 @@ func TestFileFwvExportEvent(t *testing.T) { cfg: cfg.EEsCfg().Exporters[0], cgrCfg: cfg, filterS: filterS, - file: nopCloser{byteBuff}, + writer: nopCloser{byteBuff}, dc: dc, } if err := fFwv.ExportEvent(context.Background(), []string{"value", "3"}, ""); err != nil { @@ -218,7 +218,7 @@ func TestFileFwvExportEventWriteError(t *testing.T) { cfg: cfg.EEsCfg().Exporters[0], cgrCfg: cfg, filterS: filterS, - file: nopCloserWrite{byteBuff}, + writer: nopCloserWrite{byteBuff}, dc: dc, } if err := fFwv.ExportEvent(context.Background(), []string{""}, ""); err == nil || err != utils.ErrNotImplemented { @@ -236,7 +236,7 @@ func TestFileFwvComposeHeaderWriteError(t *testing.T) { cfg: cfg.EEsCfg().Exporters[0], cgrCfg: cfg, filterS: filterS, - file: nopCloserWrite{byteBuff}, + writer: nopCloserWrite{byteBuff}, dc: &utils.SafeMapStorage{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ @@ -268,7 +268,7 @@ func TestFileFwvComposeTrailerWriteError(t *testing.T) { cfg: cfg.EEsCfg().Exporters[0], cgrCfg: cfg, filterS: filterS, - file: nopCloserWrite{byteBuff}, + writer: nopCloserWrite{byteBuff}, dc: &utils.SafeMapStorage{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ @@ -299,7 +299,7 @@ func TestFileFwvOnEvictedTrailer(t *testing.T) { cfg: cfg.EEsCfg().Exporters[0], cgrCfg: cfg, filterS: filterS, - file: nopCloserWrite{byteBuff}, + writer: nopCloserWrite{byteBuff}, dc: &utils.SafeMapStorage{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ @@ -337,7 +337,7 @@ func TestFileFwvOnEvictedClose(t *testing.T) { cfg: cfg.EEsCfg().Exporters[0], cgrCfg: cfg, filterS: filterS, - file: nopCloserError{byteBuff}, + writer: nopCloserError{byteBuff}, dc: &utils.SafeMapStorage{}, } fFwv.Cfg().Fields = []*config.FCTemplate{ diff --git a/utils/apitpdata_test.go b/utils/apitpdata_test.go index cd5da5295..d49f23d3b 100644 --- a/utils/apitpdata_test.go +++ b/utils/apitpdata_test.go @@ -609,6 +609,7 @@ func TestNewAttrReloadCacheWithOptsFromMap(t *testing.T) { mp[k] = []string{MetaAny} } } + exp := NewAttrReloadCacheWithOpts() rply := NewAttrReloadCacheWithOptsFromMap(mp, "", nil) if !reflect.DeepEqual(exp, rply) { @@ -618,5 +619,4 @@ func TestNewAttrReloadCacheWithOptsFromMap(t *testing.T) { if !reflect.DeepEqual(mp, rplyM) { t.Errorf("Expected %+v \n, received %+v", ToJSON(mp), ToJSON(rplyM)) } - } diff --git a/utils/consts.go b/utils/consts.go index f3adbed57..72a3b687f 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -565,6 +565,7 @@ const ( MetaEventCost = "*event_cost" MetaPositiveExports = "*positive_exports" MetaNegativeExports = "*negative_exports" + MetaBuffer = "*buffer" MetaRoutesEventCost = "*routesEventCost" Freeswitch = "freeswitch" Kamailio = "kamailio" @@ -772,6 +773,7 @@ const ( MetaCdrLog = "*cdrLog" MetaCDR = "*cdr" MetaExporterIDs = "*exporterIDs" + MetaExporterID = "*exporterID" MetaAsync = "*async" MetaUsage = "*usage" MetaStartTime = "*startTime" @@ -1498,9 +1500,10 @@ const ( // EEs const ( - EeSv1 = "EeSv1" - EeSv1Ping = "EeSv1.Ping" - EeSv1ProcessEvent = "EeSv1.ProcessEvent" + EeSv1 = "EeSv1" + EeSv1Ping = "EeSv1.Ping" + EeSv1ProcessEvent = "EeSv1.ProcessEvent" + EeSv1ArchiveEventsAsReply = "EeSv1.ArchiveEventsAsReply" ) // ActionProfile APIs