diff --git a/apis/ees.go b/apis/ees.go
index 6e0efcd70..740db08e4 100644
--- a/apis/ees.go
+++ b/apis/ees.go
@@ -36,3 +36,7 @@ type EeSv1 struct {
func (cS *EeSv1) ProcessEvent(ctx *context.Context, cgrEv *utils.CGREventWithEeIDs, rply *map[string]map[string]interface{}) error {
return cS.ees.V1ProcessEvent(ctx, cgrEv, rply)
}
+
+func (cS *EeSv1) ArchiveEventsAsReply(ctx *context.Context, args *ees.ArchiveEventsArgs, reply *[]byte) error {
+ return cS.ees.V1ArchiveEventsAsReply(ctx, args, reply)
+}
diff --git a/config/configsanity.go b/config/configsanity.go
index 3387b4689..c39d90895 100644
--- a/config/configsanity.go
+++ b/config/configsanity.go
@@ -767,6 +767,9 @@ func (cfg *CGRConfig) checkConfigSanity() error {
switch exp.Type {
case utils.MetaFileCSV:
for _, dir := range []string{exp.ExportPath} {
+ if dir == utils.MetaBuffer {
+ break
+ }
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
return fmt.Errorf("<%s> nonexistent folder: %s for exporter with ID: %s", utils.EEs, dir, exp.ID)
}
@@ -776,6 +779,9 @@ func (cfg *CGRConfig) checkConfigSanity() error {
}
case utils.MetaFileFWV:
for _, dir := range []string{exp.ExportPath} {
+ if dir == utils.MetaBuffer {
+ break
+ }
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
return fmt.Errorf("<%s> nonexistent folder: %s for exporter with ID: %s", utils.EEs, dir, exp.ID)
}
diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json
index 09b632324..a33f2ce47 100644
--- a/data/conf/samples/ees/cgrates.json
+++ b/data/conf/samples/ees/cgrates.json
@@ -70,7 +70,7 @@
"attribute_context": "customContext",
"attempts": 1,
"field_separator": ",",
- },
+ },
{
"id": "CSVExporterComposed",
"type": "*fileCSV",
@@ -119,6 +119,53 @@
{"tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*dc.TotalCost{*round:4}"},
],
},
+ {
+ "id": "CSVExporterBuffered",
+ "type": "*fileCSV",
+ "export_path": "*buffer",
+ "attempts": 1,
+ "field_separator": ",",
+ "synchronous": true,
+ "fields":[
+ {"tag": "Number", "path": "*hdr.Number", "type": "*constant", "value": "NumberOfEvent"},
+ {"tag": "CGRID", "path": "*hdr.CGRID", "type": "*constant", "value": "CGRID"},
+ {"tag": "RunID", "path": "*hdr.RunID", "type": "*constant", "value": "RunID"},
+ {"tag": "ToR", "path": "*hdr.ToR", "type": "*constant", "value": "ToR"},
+ {"tag": "OriginID", "path": "*hdr.OriginID", "type": "*constant", "value": "OriginID"},
+ {"tag": "RequestType", "path": "*hdr.RequestType", "type": "*constant", "value": "RequestType"},
+ {"tag": "Tenant", "path": "*hdr.Tenant", "type": "*constant", "value": "Tenant"},
+ {"tag": "Category", "path": "*hdr.Category", "type": "*constant", "value": "Category"},
+ {"tag": "Account", "path": "*hdr.Account", "type": "*constant", "value": "Account"},
+ {"tag": "Subject", "path": "*hdr.Subject", "type": "*constant", "value": "Subject"},
+ {"tag": "Destination", "path": "*hdr.Destination", "type": "*constant", "value": "Destination"},
+ {"tag": "SetupTime", "path": "*hdr.SetupTime", "type": "*constant", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00"},
+ {"tag": "AnswerTime", "path": "*hdr.AnswerTime", "type": "*constant", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
+ {"tag": "Usage", "path": "*hdr.Usage", "type": "*constant", "value": "Usage"},
+ {"tag": "Cost", "path": "*hdr.Cost", "type": "*constant", "value": "Cost"},
+
+ {"tag": "Number", "path": "*exp.Number", "type": "*variable", "value": "~*dc.NumberOfEvents"},
+ {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"},
+ {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"},
+ {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"},
+ {"tag": "OriginID1", "path": "*exp.OriginID", "type": "*composed", "value": "~*req.ComposedOriginID1"},
+ {"tag": "OriginID2", "path": "*exp.OriginID", "type": "*composed", "value": "~*req.ComposedOriginID2"},
+ {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"},
+ {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"},
+ {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"},
+ {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
+ {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"},
+ {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"},
+ {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime", "layout": "2006-01-02T15:04:05Z07:00"},
+ {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
+ {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"},
+ {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"},
+
+ {"tag": "NumberOfEvents", "path": "*trl.NumberOfEvents", "type": "*variable", "value": "~*dc.NumberOfEvents"},
+ {"tag": "TotalDuration", "path": "*trl.TotalDuration", "type": "*variable", "value": "~*dc.TotalDuration"},
+ {"tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*dc.TotalSMSUsage"},
+ {"tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*dc.TotalCost{*round:4}"},
+ ],
+ },
{
"id": "FwvExporter",
"type": "*fileFWV",
diff --git a/data/conf/samples/tutmysql/cgrates.json b/data/conf/samples/tutmysql/cgrates.json
index bff919923..45994b4ec 100644
--- a/data/conf/samples/tutmysql/cgrates.json
+++ b/data/conf/samples/tutmysql/cgrates.json
@@ -38,7 +38,6 @@
"accounts_conns": ["*localhost"]
},
-
"chargers": {
"enabled": true,
"attributes_conns": ["*internal"],
diff --git a/ees/ee.go b/ees/ee.go
index d561eeb55..8fb08515b 100644
--- a/ees/ee.go
+++ b/ees/ee.go
@@ -51,9 +51,9 @@ func NewEventExporter(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig,
}
switch cfg.Type {
case utils.MetaFileCSV:
- return NewFileCSVee(cfg, cgrCfg, filterS, dc)
+ return NewFileCSVee(cfg, cgrCfg, filterS, dc, nil)
case utils.MetaFileFWV:
- return NewFileFWVee(cfg, cgrCfg, filterS, dc)
+ return NewFileFWVee(cfg, cgrCfg, filterS, dc, nil)
case utils.MetaHTTPPost:
return NewHTTPPostEE(cfg, cgrCfg, filterS, dc)
case utils.MetaHTTPjsonMap:
@@ -241,6 +241,7 @@ func (slicePreparing) PrepareMap(mp *utils.CGREvent) (interface{}, error) {
}
return csvRecord, nil
}
+
func (slicePreparing) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{}, error) {
return mp.OrderedFieldsAsStrings(), nil
}
diff --git a/ees/ee_test.go b/ees/ee_test.go
index cf881fbbf..a9ff1a321 100644
--- a/ees/ee_test.go
+++ b/ees/ee_test.go
@@ -19,6 +19,7 @@ package ees
import (
"fmt"
+ "io"
"reflect"
"strings"
"testing"
@@ -45,11 +46,11 @@ func TestNewEventExporter(t *testing.T) {
if err != nil {
t.Error(err)
}
- eeExpect, err := NewFileCSVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc)
+ eeExpect, err := NewFileCSVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc, nil)
if strings.Contains(errExpect, err.Error()) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
}
- err = eeExpect.init()
+ err = eeExpect.init(nil)
if err == nil {
t.Error("\nExpected an error")
}
@@ -82,11 +83,11 @@ func TestNewEventExporterCase2(t *testing.T) {
if err != nil {
t.Error(err)
}
- eeExpect, err := NewFileFWVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc)
+ eeExpect, err := NewFileFWVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc, io.Discard)
if strings.Contains(errExpect, err.Error()) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
}
- err = eeExpect.init()
+ err = eeExpect.init(io.Discard)
if err == nil {
t.Error("\nExpected an error")
}
diff --git a/ees/ees.go b/ees/ees.go
index 69a48ae5b..4d3537cff 100644
--- a/ees/ees.go
+++ b/ees/ees.go
@@ -19,7 +19,10 @@ along with this program. If not, see
package ees
import (
+ "archive/zip"
+ "bytes"
"fmt"
+ "io"
"sync"
"time"
@@ -319,3 +322,79 @@ func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{}
}
return
}
+
+type ArchiveEventsArgs struct {
+ Tenant string
+ APIOpts map[string]interface{}
+ Events []*utils.CGREvent
+}
+
+// V1ArchiveEventsAsReply should archive the events sent with existing exporters. The zipped content should be returned back as a reply.
+func (eeS *EeS) V1ArchiveEventsAsReply(ctx *context.Context, args *ArchiveEventsArgs, reply *[]byte) (err error) {
+ if args.Tenant == utils.EmptyString {
+ args.Tenant = eeS.cfg.GeneralCfg().DefaultTenant
+ }
+ expID, has := args.APIOpts[utils.MetaExporterID]
+ if !has {
+ return fmt.Errorf("ExporterID is missing from argument's options: <%v>", utils.ToJSON(args))
+ }
+ //var validExporter bool
+ var eesCfg *config.EventExporterCfg
+ for _, exporter := range eeS.cfg.EEsCfg().Exporters {
+ if exporter.ID == expID {
+ eesCfg = exporter
+ break
+ }
+ }
+ if eesCfg == nil {
+ return fmt.Errorf("exporter config with ID: <%v> is missing", expID)
+ }
+ if !eesCfg.Synchronous {
+ return fmt.Errorf("exporter with ID: <%v> is not synchronous", expID)
+ }
+ if eesCfg.ExportPath != utils.MetaBuffer {
+ return fmt.Errorf("exporter with ID: <%v> has an invalid ExportPath for archiving", expID)
+ }
+ var dc *utils.SafeMapStorage
+ if dc, err = newEEMetrics(utils.FirstNonEmpty(
+ eesCfg.Timezone,
+ eeS.cfg.GeneralCfg().DefaultTimezone)); err != nil {
+ return
+ }
+ var ee EventExporter
+
+ buff := new(bytes.Buffer)
+ zBuff := zip.NewWriter(buff)
+ //
+ var wrtr io.Writer
+ if wrtr, err = zBuff.Create("events.csv"); err != nil {
+ return err
+ }
+ switch eesCfg.Type {
+ case utils.MetaFileCSV:
+ ee, err = NewFileCSVee(eesCfg, eeS.cfg, eeS.fltrS, dc, &buffer{buff})
+ case utils.MetaFileFWV:
+ ee, err = NewFileFWVee(eesCfg, eeS.cfg, eeS.fltrS, dc, wrtr)
+ default:
+ err = fmt.Errorf("unsupported exporter type: <%s>", eesCfg.Type)
+ }
+ if err != nil {
+ return err
+ }
+ for _, event := range args.Events {
+ if err := exportEventWithExporter(ctx, ee, event, false, eeS.cfg, eeS.fltrS); err != nil {
+ return err
+ }
+ }
+ if err = ee.Close(); err != nil {
+ return err
+ }
+
+ *reply = buff.Bytes()
+ if err = zBuff.Close(); err != nil {
+ return err
+ }
+
+ buff.Reset()
+ return
+}
diff --git a/ees/filecsv.go b/ees/filecsv.go
index ed2d76c80..64cdd4987 100644
--- a/ees/filecsv.go
+++ b/ees/filecsv.go
@@ -35,15 +35,15 @@ import (
func NewFileCSVee(cfg *config.EventExporterCfg,
cgrCfg *config.CGRConfig, filterS *engine.FilterS,
- dc *utils.SafeMapStorage) (fCsv *FileCSVee, err error) {
+ dc *utils.SafeMapStorage, wrtr io.WriteCloser) (fCsv *FileCSVee, err error) {
fCsv = &FileCSVee{
cfg: cfg,
dc: dc,
-
+ //wrtr: wrtr,
cgrCfg: cgrCfg,
filterS: filterS,
}
- err = fCsv.init()
+ err = fCsv.init(wrtr)
return
}
@@ -51,7 +51,7 @@ func NewFileCSVee(cfg *config.EventExporterCfg,
type FileCSVee struct {
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
- file io.WriteCloser
+ wrtr io.WriteCloser // writer for the csv
csvWriter *csv.Writer
sync.Mutex
slicePreparing
@@ -60,7 +60,7 @@ type FileCSVee struct {
filterS *engine.FilterS
}
-func (fCsv *FileCSVee) init() (err error) {
+func (fCsv *FileCSVee) init(wrtr io.WriteCloser) (err error) {
fCsv.Lock()
defer fCsv.Unlock()
// create the file
@@ -69,10 +69,12 @@ func (fCsv *FileCSVee) init() (err error) {
fCsv.dc.Lock()
fCsv.dc.MapStorage[utils.ExportPath] = filePath
fCsv.dc.Unlock()
- if fCsv.file, err = os.Create(filePath); err != nil {
+ if fCsv.cfg.ExportPath == utils.MetaBuffer {
+ fCsv.wrtr = wrtr
+ } else if fCsv.wrtr, err = os.Create(filePath); err != nil {
return
}
- fCsv.csvWriter = csv.NewWriter(fCsv.file)
+ fCsv.csvWriter = csv.NewWriter(fCsv.wrtr)
fCsv.csvWriter.Comma = utils.CSVSep
if fCsv.Cfg().Opts.CSVFieldSeparator != nil {
fCsv.csvWriter.Comma = rune((*fCsv.Cfg().Opts.CSVFieldSeparator)[0])
@@ -123,7 +125,7 @@ func (fCsv *FileCSVee) Close() (err error) {
utils.EEs, fCsv.Cfg().ID, err.Error()))
}
fCsv.csvWriter.Flush()
- if err = fCsv.file.Close(); err != nil {
+ if err = fCsv.wrtr.Close(); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when closing the file",
utils.EEs, fCsv.Cfg().ID, err.Error()))
}
@@ -131,3 +133,12 @@ func (fCsv *FileCSVee) Close() (err error) {
}
func (fCsv *FileCSVee) GetMetrics() *utils.SafeMapStorage { return fCsv.dc }
+
+// Buffers cannot be closed, they just Reset. We implement our struct and used it for writer field in FileCSVee to be available for WriterCloser interface
+type buffer struct {
+ io.Writer
+}
+
+func (buf *buffer) Close() (err error) {
+ return
+}
diff --git a/ees/filecsv_it_test.go b/ees/filecsv_it_test.go
index 67465a50e..b7b15acb1 100644
--- a/ees/filecsv_it_test.go
+++ b/ees/filecsv_it_test.go
@@ -55,6 +55,7 @@ var (
testCsvVerifyExports,
testCsvExportComposedEvent,
testCsvVerifyComposedExports,
+ testCsvExportBufferedEvent,
testCsvExportEventWithInflateTemplate,
testCsvVerifyExportsWithInflateTemplate,
testCsvExportNotFoundExporter,
@@ -311,6 +312,121 @@ func testCsvVerifyComposedExports(t *testing.T) {
}
}
+func testCsvExportBufferedEvent(t *testing.T) {
+ eventVoice := &ArchiveEventsArgs{
+ Tenant: "cgrates.org",
+ APIOpts: map[string]interface{}{
+ utils.MetaExporterID: "CSVExporterBuffered",
+ },
+ Events: []*utils.CGREvent{
+ {
+ Tenant: "cgrates.org",
+ ID: "voiceEvent",
+ Event: map[string]interface{}{
+ utils.CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()),
+ utils.ToR: utils.MetaVoice,
+ "ComposedOriginID1": "dsaf",
+ "ComposedOriginID2": "dsaf",
+ utils.OriginHost: "192.168.1.1",
+ utils.RequestType: utils.MetaRated,
+ utils.Tenant: "cgrates.org",
+ utils.Category: "call",
+ utils.AccountField: "1001",
+ utils.Subject: "1001",
+ utils.Destination: "1002",
+ utils.SetupTime: time.Unix(1383813745, 0).UTC(),
+ utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
+ utils.Usage: 10 * time.Second,
+ utils.RunID: utils.MetaDefault,
+ utils.Cost: 1.016374,
+ "ExtraFields": map[string]string{"extra1": "val_extra1",
+ "extra2": "val_extra2", "extra3": "val_extra3"},
+ },
+ },
+ {
+ Tenant: "cgrates.org",
+ ID: "dataEvent",
+ Event: map[string]interface{}{
+ utils.CGRID: utils.Sha1("abcdef", time.Unix(1383813745, 0).UTC().String()),
+ utils.ToR: utils.MetaData,
+ utils.OriginID: "abcdef",
+ utils.OriginHost: "192.168.1.1",
+ utils.RequestType: utils.MetaRated,
+ utils.Tenant: "AnotherTenant",
+ utils.Category: "call", //for data CDR use different Tenant
+ utils.AccountField: "1001",
+ utils.Subject: "1001",
+ utils.Destination: "1002",
+ utils.SetupTime: time.Unix(1383813745, 0).UTC(),
+ utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
+ utils.Usage: 10 * time.Nanosecond,
+ utils.RunID: utils.MetaDefault,
+ utils.Cost: 0.012,
+ "ExtraFields": map[string]string{"extra1": "val_extra1",
+ "extra2": "val_extra2", "extra3": "val_extra3"},
+ },
+ },
+ {
+ Tenant: "cgrates.org",
+ ID: "smsEvent",
+ Event: map[string]interface{}{
+ utils.CGRID: utils.Sha1("nlllo", time.Unix(1383813745, 0).UTC().String()),
+ utils.ToR: utils.MetaData,
+ utils.OriginID: "abcdef",
+ utils.RequestType: utils.MetaNone,
+ utils.Tenant: "phone.org",
+ utils.Category: "sms", //for data CDR use different Tenant
+ utils.AccountField: "User2001",
+ utils.Subject: "User2001",
+ utils.Destination: "User2002",
+ utils.SetupTime: time.Unix(1383813745, 0).UTC(),
+ utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
+ utils.Usage: 10 * time.Nanosecond,
+ utils.RunID: "raw",
+ utils.Cost: 44.5,
+ "ExtraFields": map[string]string{"extra1": "val_extra1",
+ "extra2": "val_extra2", "extra3": "val_extra3"},
+ },
+ },
+ {
+ Tenant: "cgrates.org",
+ ID: "photoEvent",
+ Event: map[string]interface{}{
+ utils.CGRID: utils.Sha1("qwert", time.Unix(1383813745, 0).UTC().String()),
+ utils.OriginID: "abcdef",
+ utils.OriginHost: "127.0.0.1",
+ utils.RequestType: utils.MetaPrepaid,
+ utils.Tenant: "dispatchers.org",
+ utils.Category: "photo", //for data CDR use different Tenant
+ utils.AccountField: "1005",
+ utils.Subject: "1005",
+ utils.Destination: "1000",
+ utils.SetupTime: time.Unix(22383813745, 0).UTC(),
+ utils.AnswerTime: time.Unix(22383813760, 0).UTC(),
+ utils.Usage: 10 * time.Nanosecond,
+ utils.RunID: "Default_charging_id",
+ utils.Cost: 1.442234,
+ },
+ },
+ },
+ }
+
+ expected := `NumberOfEvent,CGRID,RunID,ToR,OriginID,RequestType,Tenant,Category,Account,Subject,Destination,SetupTime,AnswerTime,Usage,Cost` + "\n" +
+ `1,dbafe9c8614c785a65aabd116dd3959c3c56f7f6,*default,*voice,dsafdsaf,*rated,cgrates.org,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10000000000,1.0164` + "\n" +
+ `2,ea1f1968cc207859672c332364fc7614c86b04c5,*default,*data,*rated,AnotherTenant,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10,0.012` + "\n" +
+ `3,9e0b2a4b23e0843efe522e8a611b092a16ecfba1,raw,*data,*none,phone.org,sms,User2001,User2001,User2002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10,44.5` + "\n" +
+ `4,cd8112998c2abb0e4a7cd3a94c74817cd5fe67d3,Default_charging_id,*prepaid,dispatchers.org,photo,1005,1005,1000,2679-04-25T22:02:25Z,2679-04-25T22:02:40Z,10,1.4422` + "\n" +
+ `4,10s,46.9706` + "\n"
+ var reply []byte
+ if err := csvRpc.Call(utils.EeSv1ArchiveEventsAsReply,
+ eventVoice, &reply); err != nil {
+ t.Error(err)
+ } else if string(reply) != expected {
+ t.Errorf("Expected %q \n received %q", expected, string(reply))
+ }
+ time.Sleep(time.Second)
+}
+
func testCsvExportEventWithInflateTemplate(t *testing.T) {
eventVoice := &utils.CGREventWithEeIDs{
EeIDs: []string{"CSVExporterWIthTemplate"},
@@ -486,7 +602,7 @@ func TestCsvInitFileCSV(t *testing.T) {
cfg: cgrCfg.EEsCfg().Exporters[0],
dc: dc,
}
- if err := fCsv.init(); err != nil {
+ if err := fCsv.init(nil); err != nil {
t.Error(err)
}
if err := os.RemoveAll("/tmp/TestInitFileCSV"); err != nil {
diff --git a/ees/filecsv_test.go b/ees/filecsv_test.go
index 60c32e0b0..82f7b273c 100644
--- a/ees/filecsv_test.go
+++ b/ees/filecsv_test.go
@@ -63,7 +63,7 @@ func TestFileCsvComposeHeader(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
- file: nopCloser{byteBuff},
+ wrtr: nopCloser{byteBuff},
csvWriter: csvNW,
dc: &utils.SafeMapStorage{},
}
@@ -126,7 +126,7 @@ func TestFileCsvComposeTrailer(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
- file: nopCloser{byteBuff},
+ wrtr: nopCloser{byteBuff},
csvWriter: csvNW,
dc: &utils.SafeMapStorage{},
}
@@ -196,7 +196,7 @@ func TestFileCsvExportEvent(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
- file: nopCloser{byteBuff},
+ wrtr: nopCloser{byteBuff},
csvWriter: csvNW,
dc: dc,
}
@@ -222,7 +222,7 @@ func TestFileCsvOnEvictedTrailer(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
- file: nopCloserWrite{byteBuff},
+ wrtr: nopCloserWrite{byteBuff},
csvWriter: csvNW,
dc: &utils.SafeMapStorage{},
}
@@ -256,7 +256,7 @@ func TestFileCsvOnEvictedClose(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
- file: nopCloserError{byteBuff},
+ wrtr: nopCloserError{byteBuff},
csvWriter: csvNW,
dc: &utils.SafeMapStorage{},
}
diff --git a/ees/filefwv.go b/ees/filefwv.go
index 33aa52583..ce5c60b32 100644
--- a/ees/filefwv.go
+++ b/ees/filefwv.go
@@ -31,7 +31,7 @@ import (
"github.com/cgrates/cgrates/utils"
)
-func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, dc *utils.SafeMapStorage) (fFwv *FileFWVee, err error) {
+func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, dc *utils.SafeMapStorage, writer io.Writer) (fFwv *FileFWVee, err error) {
fFwv = &FileFWVee{
cfg: cfg,
dc: dc,
@@ -39,15 +39,15 @@ func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filter
cgrCfg: cgrCfg,
filterS: filterS,
}
- err = fFwv.init()
+ err = fFwv.init(writer)
return
}
// FileFWVee implements EventExporter interface for .fwv files
type FileFWVee struct {
- cfg *config.EventExporterCfg
- dc *utils.SafeMapStorage
- file io.WriteCloser
+ cfg *config.EventExporterCfg
+ dc *utils.SafeMapStorage
+ writer io.WriteCloser
sync.Mutex
slicePreparing
@@ -57,14 +57,16 @@ type FileFWVee struct {
}
// init will create all the necessary dependencies, including opening the file
-func (fFwv *FileFWVee) init() (err error) {
+func (fFwv *FileFWVee) init(writer io.Writer) (err error) {
filePath := path.Join(fFwv.Cfg().ExportPath,
fFwv.Cfg().ID+utils.Underline+utils.UUIDSha1Prefix()+utils.FWVSuffix)
fFwv.dc.Lock()
fFwv.dc.MapStorage[utils.ExportPath] = filePath
fFwv.dc.Unlock()
// create the file
- if fFwv.file, err = os.Create(filePath); err != nil {
+ if fFwv.cfg.ExportPath == utils.MetaBuffer {
+ fFwv.writer = &buffer{writer}
+ } else if fFwv.writer, err = os.Create(filePath); err != nil {
return
}
return fFwv.composeHeader()
@@ -80,11 +82,11 @@ func (fFwv *FileFWVee) composeHeader() (err error) {
return
}
for _, record := range exp.OrderedFieldsAsStrings() {
- if _, err = io.WriteString(fFwv.file, record); err != nil {
+ if _, err = io.WriteString(fFwv.writer, record); err != nil {
return
}
}
- _, err = io.WriteString(fFwv.file, "\n")
+ _, err = io.WriteString(fFwv.writer, "\n")
return
}
@@ -98,11 +100,11 @@ func (fFwv *FileFWVee) composeTrailer() (err error) {
return
}
for _, record := range exp.OrderedFieldsAsStrings() {
- if _, err = io.WriteString(fFwv.file, record); err != nil {
+ if _, err = io.WriteString(fFwv.writer, record); err != nil {
return
}
}
- _, err = io.WriteString(fFwv.file, "\n")
+ _, err = io.WriteString(fFwv.writer, "\n")
return
}
@@ -114,11 +116,11 @@ func (fFwv *FileFWVee) ExportEvent(_ *context.Context, records interface{}, _ st
fFwv.Lock() // make sure that only one event is writen in file at once
defer fFwv.Unlock()
for _, record := range records.([]string) {
- if _, err = io.WriteString(fFwv.file, record); err != nil {
+ if _, err = io.WriteString(fFwv.writer, record); err != nil {
return
}
}
- _, err = io.WriteString(fFwv.file, "\n")
+ _, err = io.WriteString(fFwv.writer, "\n")
return
}
@@ -130,7 +132,7 @@ func (fFwv *FileFWVee) Close() (err error) {
utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when composed trailer",
utils.EEs, fFwv.Cfg().ID, err.Error()))
}
- if err = fFwv.file.Close(); err != nil {
+ if err = fFwv.writer.Close(); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when closing the file",
utils.EEs, fFwv.Cfg().ID, err.Error()))
}
diff --git a/ees/filefwv_it_test.go b/ees/filefwv_it_test.go
index 32f812590..3aadaa2ef 100644
--- a/ees/filefwv_it_test.go
+++ b/ees/filefwv_it_test.go
@@ -22,6 +22,7 @@ along with this program. If not, see
package ees
import (
+ "io"
"net/rpc"
"os"
"path"
@@ -177,7 +178,7 @@ func TestFileFwvInit(t *testing.T) {
cfg: cgrCfg.EEsCfg().Exporters[0],
dc: dc,
}
- if err := fFwv.init(); err != nil {
+ if err := fFwv.init(io.Discard); err != nil {
t.Error(err)
}
if err := os.RemoveAll("/tmp/TestInitFileCSV"); err != nil {
diff --git a/ees/filefwv_test.go b/ees/filefwv_test.go
index b0d3bc1e2..410e6ebea 100644
--- a/ees/filefwv_test.go
+++ b/ees/filefwv_test.go
@@ -54,7 +54,7 @@ func TestFileFwvComposeHeader(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
- file: nopCloser{byteBuff},
+ writer: nopCloser{byteBuff},
dc: &utils.SafeMapStorage{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
@@ -116,7 +116,7 @@ func TestFileFwvComposeTrailer(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
- file: nopCloser{byteBuff},
+ writer: nopCloser{byteBuff},
dc: &utils.SafeMapStorage{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
@@ -182,7 +182,7 @@ func TestFileFwvExportEvent(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
- file: nopCloser{byteBuff},
+ writer: nopCloser{byteBuff},
dc: dc,
}
if err := fFwv.ExportEvent(context.Background(), []string{"value", "3"}, ""); err != nil {
@@ -218,7 +218,7 @@ func TestFileFwvExportEventWriteError(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
- file: nopCloserWrite{byteBuff},
+ writer: nopCloserWrite{byteBuff},
dc: dc,
}
if err := fFwv.ExportEvent(context.Background(), []string{""}, ""); err == nil || err != utils.ErrNotImplemented {
@@ -236,7 +236,7 @@ func TestFileFwvComposeHeaderWriteError(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
- file: nopCloserWrite{byteBuff},
+ writer: nopCloserWrite{byteBuff},
dc: &utils.SafeMapStorage{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
@@ -268,7 +268,7 @@ func TestFileFwvComposeTrailerWriteError(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
- file: nopCloserWrite{byteBuff},
+ writer: nopCloserWrite{byteBuff},
dc: &utils.SafeMapStorage{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
@@ -299,7 +299,7 @@ func TestFileFwvOnEvictedTrailer(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
- file: nopCloserWrite{byteBuff},
+ writer: nopCloserWrite{byteBuff},
dc: &utils.SafeMapStorage{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
@@ -337,7 +337,7 @@ func TestFileFwvOnEvictedClose(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
- file: nopCloserError{byteBuff},
+ writer: nopCloserError{byteBuff},
dc: &utils.SafeMapStorage{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
diff --git a/utils/apitpdata_test.go b/utils/apitpdata_test.go
index cd5da5295..d49f23d3b 100644
--- a/utils/apitpdata_test.go
+++ b/utils/apitpdata_test.go
@@ -609,6 +609,7 @@ func TestNewAttrReloadCacheWithOptsFromMap(t *testing.T) {
mp[k] = []string{MetaAny}
}
}
+
exp := NewAttrReloadCacheWithOpts()
rply := NewAttrReloadCacheWithOptsFromMap(mp, "", nil)
if !reflect.DeepEqual(exp, rply) {
@@ -618,5 +619,4 @@ func TestNewAttrReloadCacheWithOptsFromMap(t *testing.T) {
if !reflect.DeepEqual(mp, rplyM) {
t.Errorf("Expected %+v \n, received %+v", ToJSON(mp), ToJSON(rplyM))
}
-
}
diff --git a/utils/consts.go b/utils/consts.go
index f3adbed57..72a3b687f 100644
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -565,6 +565,7 @@ const (
MetaEventCost = "*event_cost"
MetaPositiveExports = "*positive_exports"
MetaNegativeExports = "*negative_exports"
+ MetaBuffer = "*buffer"
MetaRoutesEventCost = "*routesEventCost"
Freeswitch = "freeswitch"
Kamailio = "kamailio"
@@ -772,6 +773,7 @@ const (
MetaCdrLog = "*cdrLog"
MetaCDR = "*cdr"
MetaExporterIDs = "*exporterIDs"
+ MetaExporterID = "*exporterID"
MetaAsync = "*async"
MetaUsage = "*usage"
MetaStartTime = "*startTime"
@@ -1498,9 +1500,10 @@ const (
// EEs
const (
- EeSv1 = "EeSv1"
- EeSv1Ping = "EeSv1.Ping"
- EeSv1ProcessEvent = "EeSv1.ProcessEvent"
+ EeSv1 = "EeSv1"
+ EeSv1Ping = "EeSv1.Ping"
+ EeSv1ProcessEvent = "EeSv1.ProcessEvent"
+ EeSv1ArchiveEventsAsReply = "EeSv1.ArchiveEventsAsReply"
)
// ActionProfile APIs