Added new api for exporter -ArchiveEventsAsReply + test

This commit is contained in:
porosnicuadrian
2021-11-26 17:46:40 +02:00
committed by Dan Christian Bogos
parent 9f63cf2b01
commit 2e6e553f3f
15 changed files with 319 additions and 49 deletions

View File

@@ -36,3 +36,7 @@ type EeSv1 struct {
func (cS *EeSv1) ProcessEvent(ctx *context.Context, cgrEv *utils.CGREventWithEeIDs, rply *map[string]map[string]interface{}) error {
return cS.ees.V1ProcessEvent(ctx, cgrEv, rply)
}
func (cS *EeSv1) ArchiveEventsAsReply(ctx *context.Context, args *ees.ArchiveEventsArgs, reply *[]byte) error {
return cS.ees.V1ArchiveEventsAsReply(ctx, args, reply)
}

View File

@@ -767,6 +767,9 @@ func (cfg *CGRConfig) checkConfigSanity() error {
switch exp.Type {
case utils.MetaFileCSV:
for _, dir := range []string{exp.ExportPath} {
if dir == utils.MetaBuffer {
break
}
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
return fmt.Errorf("<%s> nonexistent folder: %s for exporter with ID: %s", utils.EEs, dir, exp.ID)
}
@@ -776,6 +779,9 @@ func (cfg *CGRConfig) checkConfigSanity() error {
}
case utils.MetaFileFWV:
for _, dir := range []string{exp.ExportPath} {
if dir == utils.MetaBuffer {
break
}
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
return fmt.Errorf("<%s> nonexistent folder: %s for exporter with ID: %s", utils.EEs, dir, exp.ID)
}

View File

@@ -70,7 +70,7 @@
"attribute_context": "customContext",
"attempts": 1,
"field_separator": ",",
},
},
{
"id": "CSVExporterComposed",
"type": "*fileCSV",
@@ -119,6 +119,53 @@
{"tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*dc.TotalCost{*round:4}"},
],
},
{
"id": "CSVExporterBuffered",
"type": "*fileCSV",
"export_path": "*buffer",
"attempts": 1,
"field_separator": ",",
"synchronous": true,
"fields":[
{"tag": "Number", "path": "*hdr.Number", "type": "*constant", "value": "NumberOfEvent"},
{"tag": "CGRID", "path": "*hdr.CGRID", "type": "*constant", "value": "CGRID"},
{"tag": "RunID", "path": "*hdr.RunID", "type": "*constant", "value": "RunID"},
{"tag": "ToR", "path": "*hdr.ToR", "type": "*constant", "value": "ToR"},
{"tag": "OriginID", "path": "*hdr.OriginID", "type": "*constant", "value": "OriginID"},
{"tag": "RequestType", "path": "*hdr.RequestType", "type": "*constant", "value": "RequestType"},
{"tag": "Tenant", "path": "*hdr.Tenant", "type": "*constant", "value": "Tenant"},
{"tag": "Category", "path": "*hdr.Category", "type": "*constant", "value": "Category"},
{"tag": "Account", "path": "*hdr.Account", "type": "*constant", "value": "Account"},
{"tag": "Subject", "path": "*hdr.Subject", "type": "*constant", "value": "Subject"},
{"tag": "Destination", "path": "*hdr.Destination", "type": "*constant", "value": "Destination"},
{"tag": "SetupTime", "path": "*hdr.SetupTime", "type": "*constant", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag": "AnswerTime", "path": "*hdr.AnswerTime", "type": "*constant", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag": "Usage", "path": "*hdr.Usage", "type": "*constant", "value": "Usage"},
{"tag": "Cost", "path": "*hdr.Cost", "type": "*constant", "value": "Cost"},
{"tag": "Number", "path": "*exp.Number", "type": "*variable", "value": "~*dc.NumberOfEvents"},
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"},
{"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"},
{"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"},
{"tag": "OriginID1", "path": "*exp.OriginID", "type": "*composed", "value": "~*req.ComposedOriginID1"},
{"tag": "OriginID2", "path": "*exp.OriginID", "type": "*composed", "value": "~*req.ComposedOriginID2"},
{"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"},
{"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"},
{"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"},
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
{"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"},
{"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"},
{"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"},
{"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"},
{"tag": "NumberOfEvents", "path": "*trl.NumberOfEvents", "type": "*variable", "value": "~*dc.NumberOfEvents"},
{"tag": "TotalDuration", "path": "*trl.TotalDuration", "type": "*variable", "value": "~*dc.TotalDuration"},
{"tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*dc.TotalSMSUsage"},
{"tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*dc.TotalCost{*round:4}"},
],
},
{
"id": "FwvExporter",
"type": "*fileFWV",

View File

@@ -38,7 +38,6 @@
"accounts_conns": ["*localhost"]
},
"chargers": {
"enabled": true,
"attributes_conns": ["*internal"],

View File

@@ -51,9 +51,9 @@ func NewEventExporter(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig,
}
switch cfg.Type {
case utils.MetaFileCSV:
return NewFileCSVee(cfg, cgrCfg, filterS, dc)
return NewFileCSVee(cfg, cgrCfg, filterS, dc, nil)
case utils.MetaFileFWV:
return NewFileFWVee(cfg, cgrCfg, filterS, dc)
return NewFileFWVee(cfg, cgrCfg, filterS, dc, nil)
case utils.MetaHTTPPost:
return NewHTTPPostEE(cfg, cgrCfg, filterS, dc)
case utils.MetaHTTPjsonMap:
@@ -241,6 +241,7 @@ func (slicePreparing) PrepareMap(mp *utils.CGREvent) (interface{}, error) {
}
return csvRecord, nil
}
func (slicePreparing) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{}, error) {
return mp.OrderedFieldsAsStrings(), nil
}

View File

@@ -19,6 +19,7 @@ package ees
import (
"fmt"
"io"
"reflect"
"strings"
"testing"
@@ -45,11 +46,11 @@ func TestNewEventExporter(t *testing.T) {
if err != nil {
t.Error(err)
}
eeExpect, err := NewFileCSVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc)
eeExpect, err := NewFileCSVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc, nil)
if strings.Contains(errExpect, err.Error()) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
}
err = eeExpect.init()
err = eeExpect.init(nil)
if err == nil {
t.Error("\nExpected an error")
}
@@ -82,11 +83,11 @@ func TestNewEventExporterCase2(t *testing.T) {
if err != nil {
t.Error(err)
}
eeExpect, err := NewFileFWVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc)
eeExpect, err := NewFileFWVee(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, dc, io.Discard)
if strings.Contains(errExpect, err.Error()) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
}
err = eeExpect.init()
err = eeExpect.init(io.Discard)
if err == nil {
t.Error("\nExpected an error")
}

View File

@@ -19,7 +19,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ees
import (
"archive/zip"
"bytes"
"fmt"
"io"
"sync"
"time"
@@ -319,3 +322,79 @@ func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{}
}
return
}
type ArchiveEventsArgs struct {
Tenant string
APIOpts map[string]interface{}
Events []*utils.CGREvent
}
// V1ArchiveEventsAsReply should archive the events sent with existing exporters. The zipped content should be returned back as a reply.
func (eeS *EeS) V1ArchiveEventsAsReply(ctx *context.Context, args *ArchiveEventsArgs, reply *[]byte) (err error) {
if args.Tenant == utils.EmptyString {
args.Tenant = eeS.cfg.GeneralCfg().DefaultTenant
}
expID, has := args.APIOpts[utils.MetaExporterID]
if !has {
return fmt.Errorf("ExporterID is missing from argument's options: <%v>", utils.ToJSON(args))
}
//var validExporter bool
var eesCfg *config.EventExporterCfg
for _, exporter := range eeS.cfg.EEsCfg().Exporters {
if exporter.ID == expID {
eesCfg = exporter
break
}
}
if eesCfg == nil {
return fmt.Errorf("exporter config with ID: <%v> is missing", expID)
}
if !eesCfg.Synchronous {
return fmt.Errorf("exporter with ID: <%v> is not synchronous", expID)
}
if eesCfg.ExportPath != utils.MetaBuffer {
return fmt.Errorf("exporter with ID: <%v> has an invalid ExportPath for archiving", expID)
}
var dc *utils.SafeMapStorage
if dc, err = newEEMetrics(utils.FirstNonEmpty(
eesCfg.Timezone,
eeS.cfg.GeneralCfg().DefaultTimezone)); err != nil {
return
}
var ee EventExporter
buff := new(bytes.Buffer)
zBuff := zip.NewWriter(buff)
//
var wrtr io.Writer
if wrtr, err = zBuff.Create("events.csv"); err != nil {
return err
}
switch eesCfg.Type {
case utils.MetaFileCSV:
ee, err = NewFileCSVee(eesCfg, eeS.cfg, eeS.fltrS, dc, &buffer{buff})
case utils.MetaFileFWV:
ee, err = NewFileFWVee(eesCfg, eeS.cfg, eeS.fltrS, dc, wrtr)
default:
err = fmt.Errorf("unsupported exporter type: <%s>", eesCfg.Type)
}
if err != nil {
return err
}
for _, event := range args.Events {
if err := exportEventWithExporter(ctx, ee, event, false, eeS.cfg, eeS.fltrS); err != nil {
return err
}
}
if err = ee.Close(); err != nil {
return err
}
*reply = buff.Bytes()
if err = zBuff.Close(); err != nil {
return err
}
buff.Reset()
return
}

View File

@@ -35,15 +35,15 @@ import (
func NewFileCSVee(cfg *config.EventExporterCfg,
cgrCfg *config.CGRConfig, filterS *engine.FilterS,
dc *utils.SafeMapStorage) (fCsv *FileCSVee, err error) {
dc *utils.SafeMapStorage, wrtr io.WriteCloser) (fCsv *FileCSVee, err error) {
fCsv = &FileCSVee{
cfg: cfg,
dc: dc,
//wrtr: wrtr,
cgrCfg: cgrCfg,
filterS: filterS,
}
err = fCsv.init()
err = fCsv.init(wrtr)
return
}
@@ -51,7 +51,7 @@ func NewFileCSVee(cfg *config.EventExporterCfg,
type FileCSVee struct {
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
file io.WriteCloser
wrtr io.WriteCloser // writer for the csv
csvWriter *csv.Writer
sync.Mutex
slicePreparing
@@ -60,7 +60,7 @@ type FileCSVee struct {
filterS *engine.FilterS
}
func (fCsv *FileCSVee) init() (err error) {
func (fCsv *FileCSVee) init(wrtr io.WriteCloser) (err error) {
fCsv.Lock()
defer fCsv.Unlock()
// create the file
@@ -69,10 +69,12 @@ func (fCsv *FileCSVee) init() (err error) {
fCsv.dc.Lock()
fCsv.dc.MapStorage[utils.ExportPath] = filePath
fCsv.dc.Unlock()
if fCsv.file, err = os.Create(filePath); err != nil {
if fCsv.cfg.ExportPath == utils.MetaBuffer {
fCsv.wrtr = wrtr
} else if fCsv.wrtr, err = os.Create(filePath); err != nil {
return
}
fCsv.csvWriter = csv.NewWriter(fCsv.file)
fCsv.csvWriter = csv.NewWriter(fCsv.wrtr)
fCsv.csvWriter.Comma = utils.CSVSep
if fCsv.Cfg().Opts.CSVFieldSeparator != nil {
fCsv.csvWriter.Comma = rune((*fCsv.Cfg().Opts.CSVFieldSeparator)[0])
@@ -123,7 +125,7 @@ func (fCsv *FileCSVee) Close() (err error) {
utils.EEs, fCsv.Cfg().ID, err.Error()))
}
fCsv.csvWriter.Flush()
if err = fCsv.file.Close(); err != nil {
if err = fCsv.wrtr.Close(); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when closing the file",
utils.EEs, fCsv.Cfg().ID, err.Error()))
}
@@ -131,3 +133,12 @@ func (fCsv *FileCSVee) Close() (err error) {
}
func (fCsv *FileCSVee) GetMetrics() *utils.SafeMapStorage { return fCsv.dc }
// Buffers cannot be closed, they just Reset. We implement our struct and used it for writer field in FileCSVee to be available for WriterCloser interface
type buffer struct {
io.Writer
}
func (buf *buffer) Close() (err error) {
return
}

View File

@@ -55,6 +55,7 @@ var (
testCsvVerifyExports,
testCsvExportComposedEvent,
testCsvVerifyComposedExports,
testCsvExportBufferedEvent,
testCsvExportEventWithInflateTemplate,
testCsvVerifyExportsWithInflateTemplate,
testCsvExportNotFoundExporter,
@@ -311,6 +312,121 @@ func testCsvVerifyComposedExports(t *testing.T) {
}
}
func testCsvExportBufferedEvent(t *testing.T) {
eventVoice := &ArchiveEventsArgs{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{
utils.MetaExporterID: "CSVExporterBuffered",
},
Events: []*utils.CGREvent{
{
Tenant: "cgrates.org",
ID: "voiceEvent",
Event: map[string]interface{}{
utils.CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()),
utils.ToR: utils.MetaVoice,
"ComposedOriginID1": "dsaf",
"ComposedOriginID2": "dsaf",
utils.OriginHost: "192.168.1.1",
utils.RequestType: utils.MetaRated,
utils.Tenant: "cgrates.org",
utils.Category: "call",
utils.AccountField: "1001",
utils.Subject: "1001",
utils.Destination: "1002",
utils.SetupTime: time.Unix(1383813745, 0).UTC(),
utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
utils.Usage: 10 * time.Second,
utils.RunID: utils.MetaDefault,
utils.Cost: 1.016374,
"ExtraFields": map[string]string{"extra1": "val_extra1",
"extra2": "val_extra2", "extra3": "val_extra3"},
},
},
{
Tenant: "cgrates.org",
ID: "dataEvent",
Event: map[string]interface{}{
utils.CGRID: utils.Sha1("abcdef", time.Unix(1383813745, 0).UTC().String()),
utils.ToR: utils.MetaData,
utils.OriginID: "abcdef",
utils.OriginHost: "192.168.1.1",
utils.RequestType: utils.MetaRated,
utils.Tenant: "AnotherTenant",
utils.Category: "call", //for data CDR use different Tenant
utils.AccountField: "1001",
utils.Subject: "1001",
utils.Destination: "1002",
utils.SetupTime: time.Unix(1383813745, 0).UTC(),
utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
utils.Usage: 10 * time.Nanosecond,
utils.RunID: utils.MetaDefault,
utils.Cost: 0.012,
"ExtraFields": map[string]string{"extra1": "val_extra1",
"extra2": "val_extra2", "extra3": "val_extra3"},
},
},
{
Tenant: "cgrates.org",
ID: "smsEvent",
Event: map[string]interface{}{
utils.CGRID: utils.Sha1("nlllo", time.Unix(1383813745, 0).UTC().String()),
utils.ToR: utils.MetaData,
utils.OriginID: "abcdef",
utils.RequestType: utils.MetaNone,
utils.Tenant: "phone.org",
utils.Category: "sms", //for data CDR use different Tenant
utils.AccountField: "User2001",
utils.Subject: "User2001",
utils.Destination: "User2002",
utils.SetupTime: time.Unix(1383813745, 0).UTC(),
utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
utils.Usage: 10 * time.Nanosecond,
utils.RunID: "raw",
utils.Cost: 44.5,
"ExtraFields": map[string]string{"extra1": "val_extra1",
"extra2": "val_extra2", "extra3": "val_extra3"},
},
},
{
Tenant: "cgrates.org",
ID: "photoEvent",
Event: map[string]interface{}{
utils.CGRID: utils.Sha1("qwert", time.Unix(1383813745, 0).UTC().String()),
utils.OriginID: "abcdef",
utils.OriginHost: "127.0.0.1",
utils.RequestType: utils.MetaPrepaid,
utils.Tenant: "dispatchers.org",
utils.Category: "photo", //for data CDR use different Tenant
utils.AccountField: "1005",
utils.Subject: "1005",
utils.Destination: "1000",
utils.SetupTime: time.Unix(22383813745, 0).UTC(),
utils.AnswerTime: time.Unix(22383813760, 0).UTC(),
utils.Usage: 10 * time.Nanosecond,
utils.RunID: "Default_charging_id",
utils.Cost: 1.442234,
},
},
},
}
expected := `NumberOfEvent,CGRID,RunID,ToR,OriginID,RequestType,Tenant,Category,Account,Subject,Destination,SetupTime,AnswerTime,Usage,Cost` + "\n" +
`1,dbafe9c8614c785a65aabd116dd3959c3c56f7f6,*default,*voice,dsafdsaf,*rated,cgrates.org,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10000000000,1.0164` + "\n" +
`2,ea1f1968cc207859672c332364fc7614c86b04c5,*default,*data,*rated,AnotherTenant,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10,0.012` + "\n" +
`3,9e0b2a4b23e0843efe522e8a611b092a16ecfba1,raw,*data,*none,phone.org,sms,User2001,User2001,User2002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10,44.5` + "\n" +
`4,cd8112998c2abb0e4a7cd3a94c74817cd5fe67d3,Default_charging_id,*prepaid,dispatchers.org,photo,1005,1005,1000,2679-04-25T22:02:25Z,2679-04-25T22:02:40Z,10,1.4422` + "\n" +
`4,10s,46.9706` + "\n"
var reply []byte
if err := csvRpc.Call(utils.EeSv1ArchiveEventsAsReply,
eventVoice, &reply); err != nil {
t.Error(err)
} else if string(reply) != expected {
t.Errorf("Expected %q \n received %q", expected, string(reply))
}
time.Sleep(time.Second)
}
func testCsvExportEventWithInflateTemplate(t *testing.T) {
eventVoice := &utils.CGREventWithEeIDs{
EeIDs: []string{"CSVExporterWIthTemplate"},
@@ -486,7 +602,7 @@ func TestCsvInitFileCSV(t *testing.T) {
cfg: cgrCfg.EEsCfg().Exporters[0],
dc: dc,
}
if err := fCsv.init(); err != nil {
if err := fCsv.init(nil); err != nil {
t.Error(err)
}
if err := os.RemoveAll("/tmp/TestInitFileCSV"); err != nil {

View File

@@ -63,7 +63,7 @@ func TestFileCsvComposeHeader(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
file: nopCloser{byteBuff},
wrtr: nopCloser{byteBuff},
csvWriter: csvNW,
dc: &utils.SafeMapStorage{},
}
@@ -126,7 +126,7 @@ func TestFileCsvComposeTrailer(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
file: nopCloser{byteBuff},
wrtr: nopCloser{byteBuff},
csvWriter: csvNW,
dc: &utils.SafeMapStorage{},
}
@@ -196,7 +196,7 @@ func TestFileCsvExportEvent(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
file: nopCloser{byteBuff},
wrtr: nopCloser{byteBuff},
csvWriter: csvNW,
dc: dc,
}
@@ -222,7 +222,7 @@ func TestFileCsvOnEvictedTrailer(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
file: nopCloserWrite{byteBuff},
wrtr: nopCloserWrite{byteBuff},
csvWriter: csvNW,
dc: &utils.SafeMapStorage{},
}
@@ -256,7 +256,7 @@ func TestFileCsvOnEvictedClose(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
file: nopCloserError{byteBuff},
wrtr: nopCloserError{byteBuff},
csvWriter: csvNW,
dc: &utils.SafeMapStorage{},
}

View File

@@ -31,7 +31,7 @@ import (
"github.com/cgrates/cgrates/utils"
)
func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, dc *utils.SafeMapStorage) (fFwv *FileFWVee, err error) {
func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, dc *utils.SafeMapStorage, writer io.Writer) (fFwv *FileFWVee, err error) {
fFwv = &FileFWVee{
cfg: cfg,
dc: dc,
@@ -39,15 +39,15 @@ func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filter
cgrCfg: cgrCfg,
filterS: filterS,
}
err = fFwv.init()
err = fFwv.init(writer)
return
}
// FileFWVee implements EventExporter interface for .fwv files
type FileFWVee struct {
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
file io.WriteCloser
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
writer io.WriteCloser
sync.Mutex
slicePreparing
@@ -57,14 +57,16 @@ type FileFWVee struct {
}
// init will create all the necessary dependencies, including opening the file
func (fFwv *FileFWVee) init() (err error) {
func (fFwv *FileFWVee) init(writer io.Writer) (err error) {
filePath := path.Join(fFwv.Cfg().ExportPath,
fFwv.Cfg().ID+utils.Underline+utils.UUIDSha1Prefix()+utils.FWVSuffix)
fFwv.dc.Lock()
fFwv.dc.MapStorage[utils.ExportPath] = filePath
fFwv.dc.Unlock()
// create the file
if fFwv.file, err = os.Create(filePath); err != nil {
if fFwv.cfg.ExportPath == utils.MetaBuffer {
fFwv.writer = &buffer{writer}
} else if fFwv.writer, err = os.Create(filePath); err != nil {
return
}
return fFwv.composeHeader()
@@ -80,11 +82,11 @@ func (fFwv *FileFWVee) composeHeader() (err error) {
return
}
for _, record := range exp.OrderedFieldsAsStrings() {
if _, err = io.WriteString(fFwv.file, record); err != nil {
if _, err = io.WriteString(fFwv.writer, record); err != nil {
return
}
}
_, err = io.WriteString(fFwv.file, "\n")
_, err = io.WriteString(fFwv.writer, "\n")
return
}
@@ -98,11 +100,11 @@ func (fFwv *FileFWVee) composeTrailer() (err error) {
return
}
for _, record := range exp.OrderedFieldsAsStrings() {
if _, err = io.WriteString(fFwv.file, record); err != nil {
if _, err = io.WriteString(fFwv.writer, record); err != nil {
return
}
}
_, err = io.WriteString(fFwv.file, "\n")
_, err = io.WriteString(fFwv.writer, "\n")
return
}
@@ -114,11 +116,11 @@ func (fFwv *FileFWVee) ExportEvent(_ *context.Context, records interface{}, _ st
fFwv.Lock() // make sure that only one event is writen in file at once
defer fFwv.Unlock()
for _, record := range records.([]string) {
if _, err = io.WriteString(fFwv.file, record); err != nil {
if _, err = io.WriteString(fFwv.writer, record); err != nil {
return
}
}
_, err = io.WriteString(fFwv.file, "\n")
_, err = io.WriteString(fFwv.writer, "\n")
return
}
@@ -130,7 +132,7 @@ func (fFwv *FileFWVee) Close() (err error) {
utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when composed trailer",
utils.EEs, fFwv.Cfg().ID, err.Error()))
}
if err = fFwv.file.Close(); err != nil {
if err = fFwv.writer.Close(); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when closing the file",
utils.EEs, fFwv.Cfg().ID, err.Error()))
}

View File

@@ -22,6 +22,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ees
import (
"io"
"net/rpc"
"os"
"path"
@@ -177,7 +178,7 @@ func TestFileFwvInit(t *testing.T) {
cfg: cgrCfg.EEsCfg().Exporters[0],
dc: dc,
}
if err := fFwv.init(); err != nil {
if err := fFwv.init(io.Discard); err != nil {
t.Error(err)
}
if err := os.RemoveAll("/tmp/TestInitFileCSV"); err != nil {

View File

@@ -54,7 +54,7 @@ func TestFileFwvComposeHeader(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
file: nopCloser{byteBuff},
writer: nopCloser{byteBuff},
dc: &utils.SafeMapStorage{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
@@ -116,7 +116,7 @@ func TestFileFwvComposeTrailer(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
file: nopCloser{byteBuff},
writer: nopCloser{byteBuff},
dc: &utils.SafeMapStorage{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
@@ -182,7 +182,7 @@ func TestFileFwvExportEvent(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
file: nopCloser{byteBuff},
writer: nopCloser{byteBuff},
dc: dc,
}
if err := fFwv.ExportEvent(context.Background(), []string{"value", "3"}, ""); err != nil {
@@ -218,7 +218,7 @@ func TestFileFwvExportEventWriteError(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
file: nopCloserWrite{byteBuff},
writer: nopCloserWrite{byteBuff},
dc: dc,
}
if err := fFwv.ExportEvent(context.Background(), []string{""}, ""); err == nil || err != utils.ErrNotImplemented {
@@ -236,7 +236,7 @@ func TestFileFwvComposeHeaderWriteError(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
file: nopCloserWrite{byteBuff},
writer: nopCloserWrite{byteBuff},
dc: &utils.SafeMapStorage{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
@@ -268,7 +268,7 @@ func TestFileFwvComposeTrailerWriteError(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
file: nopCloserWrite{byteBuff},
writer: nopCloserWrite{byteBuff},
dc: &utils.SafeMapStorage{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
@@ -299,7 +299,7 @@ func TestFileFwvOnEvictedTrailer(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
file: nopCloserWrite{byteBuff},
writer: nopCloserWrite{byteBuff},
dc: &utils.SafeMapStorage{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{
@@ -337,7 +337,7 @@ func TestFileFwvOnEvictedClose(t *testing.T) {
cfg: cfg.EEsCfg().Exporters[0],
cgrCfg: cfg,
filterS: filterS,
file: nopCloserError{byteBuff},
writer: nopCloserError{byteBuff},
dc: &utils.SafeMapStorage{},
}
fFwv.Cfg().Fields = []*config.FCTemplate{

View File

@@ -609,6 +609,7 @@ func TestNewAttrReloadCacheWithOptsFromMap(t *testing.T) {
mp[k] = []string{MetaAny}
}
}
exp := NewAttrReloadCacheWithOpts()
rply := NewAttrReloadCacheWithOptsFromMap(mp, "", nil)
if !reflect.DeepEqual(exp, rply) {
@@ -618,5 +619,4 @@ func TestNewAttrReloadCacheWithOptsFromMap(t *testing.T) {
if !reflect.DeepEqual(mp, rplyM) {
t.Errorf("Expected %+v \n, received %+v", ToJSON(mp), ToJSON(rplyM))
}
}

View File

@@ -565,6 +565,7 @@ const (
MetaEventCost = "*event_cost"
MetaPositiveExports = "*positive_exports"
MetaNegativeExports = "*negative_exports"
MetaBuffer = "*buffer"
MetaRoutesEventCost = "*routesEventCost"
Freeswitch = "freeswitch"
Kamailio = "kamailio"
@@ -772,6 +773,7 @@ const (
MetaCdrLog = "*cdrLog"
MetaCDR = "*cdr"
MetaExporterIDs = "*exporterIDs"
MetaExporterID = "*exporterID"
MetaAsync = "*async"
MetaUsage = "*usage"
MetaStartTime = "*startTime"
@@ -1498,9 +1500,10 @@ const (
// EEs
const (
EeSv1 = "EeSv1"
EeSv1Ping = "EeSv1.Ping"
EeSv1ProcessEvent = "EeSv1.ProcessEvent"
EeSv1 = "EeSv1"
EeSv1Ping = "EeSv1.Ping"
EeSv1ProcessEvent = "EeSv1.ProcessEvent"
EeSv1ArchiveEventsAsReply = "EeSv1.ArchiveEventsAsReply"
)
// ActionProfile APIs