diff --git a/config/configsanity.go b/config/configsanity.go
index 2a99a2f50..32dbe5f69 100644
--- a/config/configsanity.go
+++ b/config/configsanity.go
@@ -446,7 +446,7 @@ func (cfg *CGRConfig) checkConfigSanity() error {
case utils.MetaFileCSV:
for _, dir := range []string{exp.ExportPath} {
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
- return fmt.Errorf("<%s> nonexistent folder: %s for reader with ID: %s", utils.EEs, dir, exp.ID)
+ return fmt.Errorf("<%s> nonexistent folder: %s for exporter with ID: %s", utils.EEs, dir, exp.ID)
}
}
if exp.FieldSep == utils.EmptyString {
diff --git a/config/eescfg.go b/config/eescfg.go
index d13285768..72f4fc84c 100644
--- a/config/eescfg.go
+++ b/config/eescfg.go
@@ -132,9 +132,10 @@ type EventExporterCfg struct {
Synchronous bool
Attempts int
FieldSep string
- HeaderFields []*FCTemplate
- ContentFields []*FCTemplate
- TrailerFields []*FCTemplate
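+ // Fields holds every parsed template; the unexported slices below split them into header/content/trailer views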
+ Fields []*FCTemplate
+ headerFields []*FCTemplate
+ contentFields []*FCTemplate
+ trailerFields []*FCTemplate
}
func (eeC *EventExporterCfg) loadFromJsonCfg(jsnEec *EventExporterJsonCfg, separator string) (err error) {
@@ -188,20 +189,20 @@ func (eeC *EventExporterCfg) loadFromJsonCfg(jsnEec *EventExporterJsonCfg, separ
eeC.FieldSep = *jsnEec.Field_separator
}
if jsnEec.Fields != nil {
- eeC.HeaderFields = make([]*FCTemplate, 0)
- eeC.ContentFields = make([]*FCTemplate, 0)
- eeC.TrailerFields = make([]*FCTemplate, 0)
- if fields, err := FCTemplatesFromFCTemplatesJsonCfg(*jsnEec.Fields, separator); err != nil {
+ eeC.headerFields = make([]*FCTemplate, 0)
+ eeC.contentFields = make([]*FCTemplate, 0)
+ eeC.trailerFields = make([]*FCTemplate, 0)
+ if eeC.Fields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnEec.Fields, separator); err != nil {
return err
} else {
- for _, field := range fields {
+ for _, field := range eeC.Fields {
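+ // dispatch each template into the header/content/trailer subset based on its path prefix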
switch field.GetPathSlice()[0] {
case utils.MetaHdr:
- eeC.HeaderFields = append(eeC.HeaderFields, field)
+ eeC.headerFields = append(eeC.headerFields, field)
case utils.MetaExp:
- eeC.ContentFields = append(eeC.ContentFields, field)
+ eeC.contentFields = append(eeC.contentFields, field)
case utils.MetaTrl:
- eeC.TrailerFields = append(eeC.TrailerFields, field)
+ eeC.trailerFields = append(eeC.trailerFields, field)
}
}
}
@@ -209,6 +210,18 @@ func (eeC *EventExporterCfg) loadFromJsonCfg(jsnEec *EventExporterJsonCfg, separ
return
}
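+// HeaderFields returns the templates whose path starts with *hdr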
+func (eeC *EventExporterCfg) HeaderFields() []*FCTemplate {
+ return eeC.headerFields
+}
+
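+// ContentFields returns the templates whose path starts with *exp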
+func (eeC *EventExporterCfg) ContentFields() []*FCTemplate {
+ return eeC.contentFields
+}
+
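+// TrailerFields returns the templates whose path starts with *trl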
+func (eeC *EventExporterCfg) TrailerFields() []*FCTemplate {
+ return eeC.trailerFields
+}
+
func (eeC *EventExporterCfg) Clone() (cln *EventExporterCfg) {
cln = new(EventExporterCfg)
cln.ID = eeC.ID
@@ -239,17 +252,17 @@ func (eeC *EventExporterCfg) Clone() (cln *EventExporterCfg) {
cln.Synchronous = eeC.Synchronous
cln.Attempts = eeC.Attempts
cln.FieldSep = eeC.FieldSep
- cln.HeaderFields = make([]*FCTemplate, len(eeC.HeaderFields))
- for idx, fld := range eeC.HeaderFields {
- cln.HeaderFields[idx] = fld.Clone()
+ cln.headerFields = make([]*FCTemplate, len(eeC.headerFields))
+ for idx, fld := range eeC.headerFields {
+ cln.headerFields[idx] = fld.Clone()
}
- cln.ContentFields = make([]*FCTemplate, len(eeC.ContentFields))
- for idx, fld := range eeC.ContentFields {
- cln.ContentFields[idx] = fld.Clone()
+ cln.contentFields = make([]*FCTemplate, len(eeC.contentFields))
+ for idx, fld := range eeC.contentFields {
+ cln.contentFields[idx] = fld.Clone()
}
- cln.TrailerFields = make([]*FCTemplate, len(eeC.TrailerFields))
- for idx, fld := range eeC.TrailerFields {
- cln.TrailerFields[idx] = fld.Clone()
+ cln.trailerFields = make([]*FCTemplate, len(eeC.trailerFields))
+ for idx, fld := range eeC.trailerFields {
+ cln.trailerFields[idx] = fld.Clone()
}
return
}
@@ -272,14 +285,8 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) map[string]interfa
}
flags[key] = buf
}
- fields := make([]map[string]interface{}, 0, len(eeC.HeaderFields)+len(eeC.ContentFields)+len(eeC.TrailerFields))
- for _, fld := range eeC.HeaderFields {
- fields = append(fields, fld.AsMapInterface(separator))
- }
- for _, fld := range eeC.ContentFields {
- fields = append(fields, fld.AsMapInterface(separator))
- }
- for _, fld := range eeC.TrailerFields {
+ fields := make([]map[string]interface{}, 0, len(eeC.Fields))
+ for _, fld := range eeC.Fields {
fields = append(fields, fld.AsMapInterface(separator))
}
diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json
index 63676ed2c..436296e3f 100644
--- a/data/conf/samples/ees/cgrates.json
+++ b/data/conf/samples/ees/cgrates.json
@@ -3,7 +3,50 @@
//
// Copyright (C) ITsysCOM GmbH
+"general": {
+ "log_level": 7,
+},
+"listen": {
+ "rpc_json": ":2012",
+ "rpc_gob": ":2013",
+ "http": ":2080",
+},
+
+"data_db": {
+ "db_type": "redis",
+ "db_port": 6379,
+ "db_name": "10",
+},
+
+
+"stor_db": {
+ "db_password": "CGRateS.org",
+},
+
+
+"rals": {
+ "enabled": true,
+},
+
+
+"schedulers": {
+ "enabled": true,
+},
+
+
+"cdrs": {
+ "enabled": true,
+ "chargers_conns": ["*localhost"],
+ "rals_conns": ["*internal"],
+ "session_cost_retries": 0,
+},
+
+
+"chargers": {
+ "enabled": true,
+ "attributes_conns": ["*internal"],
+},
"attributes": {
@@ -15,12 +58,43 @@
"enabled": true,
"attributes_conns":["*internal"],
"cache": {
- "*file_csv": {"limit": -1, "ttl": "5s", "static_ttl": false},
+ "*file_csv": {"limit": -1, "ttl": "1s", "static_ttl": false},
},
"exporters": [
- ],
+ {
+ "id": "CSVExporter",
+ "type": "*file_csv",
+ "export_path": "/tmp/testExport",
+ "tenant": "cgrates.org",
+ "flags": ["*attributes"],
+ "attribute_context": "customContext",
+ "attempts": 1,
+ "field_separator": ",",
+ "fields":[
+ {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"},
+ {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"},
+ {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"},
+ {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"},
+ {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"},
+ {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"},
+ {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"},
+ {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
+ {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"},
+ {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"},
+ {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime", "layout": "2006-01-02T15:04:05Z07:00"},
+ {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
+ {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"},
+ {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost", "rounding_decimals": 4},
+ ],
+ },
+ ]
},
+"apiers": {
+ "enabled": true,
+ "scheduler_conns": ["*internal"],
+},
+
}
diff --git a/ees/ees.go b/ees/ees.go
index e4bbb18ed..1ebcde7db 100644
--- a/ees/ees.go
+++ b/ees/ees.go
@@ -36,13 +36,15 @@ func onCacheEvicted(itmID string, value interface{}) {
// NewERService instantiates the EventExporterS
func NewEventExporterS(cfg *config.CGRConfig, filterS *engine.FilterS,
- connMgr *engine.ConnManager) *EventExporterS {
- return &EventExporterS{
+ connMgr *engine.ConnManager) (eeS *EventExporterS) {
+ eeS = &EventExporterS{
cfg: cfg,
filterS: filterS,
connMgr: connMgr,
eesChs: make(map[string]*ltcache.Cache),
}
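+ // build the per-exporter caches out of the EEs cache configuration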
+ eeS.setupCache(cfg.EEsNoLksCfg().Cache)
+ return
}
// EventExporterS is managing the EventExporters
@@ -133,6 +135,9 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithOpts, rply *s
var wg sync.WaitGroup
var withErr bool
for cfgIdx, eeCfg := range eeS.cfg.EEsNoLksCfg().Exporters {
+ if eeCfg.Type == utils.META_NONE { // ignore *default exporter
+ continue
+ }
if len(eeCfg.Filters) != 0 {
cgrDp := config.NewNavigableMap(map[string]interface{}{
@@ -166,7 +171,11 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithOpts, rply *s
var isCached bool
var ee EventExporter
if hasCache {
- if x, isCached := eeCache.Get(eeCfg.ID); isCached {
+ var x interface{}
+ //fmt.Println("Try to get exporter from cache ")
+ //fmt.Println(eeCfg.ID)
+ if x, isCached = eeCache.Get(eeCfg.ID); isCached {
+ //fmt.Println("Get FROM CACHE")
ee = x.(EventExporter)
}
}
@@ -200,6 +209,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithOpts, rply *s
if withErr {
err = utils.ErrPartiallyExecuted
}
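+ // acknowledge the call; partial failures are still reported through err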
+ *rply = utils.OK
return
}
diff --git a/ees/filecsv.go b/ees/filecsv.go
index fdc2e7a50..39d9495f4 100644
--- a/ees/filecsv.go
+++ b/ees/filecsv.go
@@ -22,7 +22,9 @@ import (
"encoding/csv"
"fmt"
"os"
+ "path"
"strconv"
+ "sync"
"time"
"github.com/cgrates/cgrates/engine"
@@ -32,7 +34,8 @@ import (
)
func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS) (fCsv *FileCSVee, err error) {
- fCsv = &FileCSVee{cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS}
+ fCsv = &FileCSVee{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
+ cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS}
err = fCsv.init()
return
}
@@ -45,6 +48,7 @@ type FileCSVee struct {
filterS *engine.FilterS
file *os.File
csvWriter *csv.Writer
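+ // guards the file, writer and counters against concurrent ExportEvent calls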
+ sync.RWMutex
firstEventATime, lastEventATime time.Time
numberOfEvents int
@@ -58,7 +62,9 @@ type FileCSVee struct {
// init will create all the necessary dependencies, including opening the file
func (fCsv *FileCSVee) init() (err error) {
- if fCsv.file, err = os.Create(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ExportPath); err != nil {
+ // create the export file inside ExportPath, named &lt;exporterID&gt;_&lt;UUID prefix&gt;.csv
+ if fCsv.file, err = os.Create(path.Join(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ExportPath,
+ fCsv.id+utils.Underline+utils.UUIDSha1Prefix()+utils.CSVSuffix)); err != nil {
return
}
fCsv.csvWriter = csv.NewWriter(fCsv.file)
@@ -93,19 +99,23 @@ func (fCsv *FileCSVee) OnEvicted(_ string, _ interface{}) {
// ExportEvent implements EventExporter
func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
- // convert cgrEvent in export record
+ fCsv.Lock()
+ defer fCsv.Unlock()
fCsv.numberOfEvents++
var csvRecord []string
navMp := config.NewNavigableMap(map[string]interface{}{
utils.MetaReq: cgrEv.Event,
})
- for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ContentFields {
+ for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ContentFields() {
if pass, err := fCsv.filterS.Pass(cgrEv.Tenant, cfgFld.Filters,
navMp); err != nil || !pass {
continue
}
- val, err := cfgFld.Value.ParseDataProvider(navMp, fCsv.cgrCfg.GeneralCfg().RSRSep)
+ val, err := cfgFld.Value.ParseDataProvider(navMp, utils.NestingSep)
if err != nil {
+ if err == utils.ErrNotFound {
+ err = utils.ErrPrefix(err, cfgFld.Value.GetRule())
+ }
fCsv.negativeExports.Add(cgrEv.ID)
return err
}
@@ -153,13 +163,16 @@ func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
// Compose and cache the header
func (fCsv *FileCSVee) composeHeader() (err error) {
- if len(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].HeaderFields) == 0 {
+ if len(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].HeaderFields()) == 0 {
return
}
var csvRecord []string
- for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].HeaderFields {
+ for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].HeaderFields() {
val, err := cfgFld.Value.ParseValue(utils.EmptyString)
if err != nil {
+ if err == utils.ErrNotFound {
+ err = utils.ErrPrefix(err, cfgFld.Value.GetRule())
+ }
return err
}
csvRecord = append(csvRecord, val)
@@ -169,11 +182,11 @@ func (fCsv *FileCSVee) composeHeader() (err error) {
// Compose and cache the trailer
func (fCsv *FileCSVee) composeTrailer() (err error) {
- if len(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].TrailerFields) == 0 {
+ if len(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].TrailerFields()) == 0 {
return
}
var csvRecord []string
- for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].TrailerFields {
+ for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].TrailerFields() {
switch cfgFld.Type {
case utils.MetaExportID:
csvRecord = append(csvRecord, fCsv.id)
diff --git a/ees/filecsv_it_test.go b/ees/filecsv_it_test.go
new file mode 100644
index 000000000..c30edb963
--- /dev/null
+++ b/ees/filecsv_it_test.go
@@ -0,0 +1,226 @@
+// +build integration
+
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package ees
+
+import (
+ "net/rpc"
+ "os"
+ "path"
+ "testing"
+ "time"
+
+ "github.com/cgrates/cgrates/utils"
+
+ "github.com/cgrates/cgrates/engine"
+
+ "github.com/cgrates/cgrates/config"
+)
+
+var (
+ csvConfigDir string
+ csvCfgPath string
+ csvCfg *config.CGRConfig
+ csvRpc *rpc.Client
+
+ sTestsCsv = []func(t *testing.T){
+ testCsvCreateDirectory,
+ testCsvLoadConfig,
+ testCsvResetDataDB,
+ testCsvResetStorDb,
+ testCsvStartEngine,
+ testCsvRPCConn,
+ testCsvExportEvent,
+ testCsvStopCgrEngine,
+ //testCsvCleanDirectory,
+ }
+)
+
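+// TestCsvExport runs the *file_csv exporter integration subtests in sequence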
+func TestCsvExport(t *testing.T) {
+ csvConfigDir = "ees"
+ for _, stest := range sTestsCsv {
+ t.Run(csvConfigDir, stest)
+ }
+}
+
+func testCsvCreateDirectory(t *testing.T) {
+ for _, dir := range []string{"/tmp/testExport"} {
+ if err := os.RemoveAll(dir); err != nil {
+ t.Fatal("Error removing folder: ", dir, err)
+ }
+ if err := os.MkdirAll(dir, os.ModePerm); err != nil {
+ t.Fatal("Error creating folder: ", dir, err)
+ }
+ }
+}
+
+func testCsvLoadConfig(t *testing.T) {
+ var err error
+ csvCfgPath = path.Join(*dataDir, "conf", "samples", csvConfigDir)
+ if csvCfg, err = config.NewCGRConfigFromPath(csvCfgPath); err != nil {
+ t.Error(err)
+ }
+}
+
+func testCsvResetDataDB(t *testing.T) {
+ if err := engine.InitDataDb(csvCfg); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func testCsvResetStorDb(t *testing.T) {
+ if err := engine.InitStorDb(csvCfg); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func testCsvStartEngine(t *testing.T) {
+ if _, err := engine.StopStartEngine(csvCfgPath, *waitRater); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func testCsvRPCConn(t *testing.T) {
+ var err error
+ csvRpc, err = newRPCClient(csvCfg.ListenCfg())
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func testCsvExportEvent(t *testing.T) {
+ eventVoice := &utils.CGREventWithOpts{
+ CGREventWithArgDispatcher: &utils.CGREventWithArgDispatcher{
+ CGREvent: &utils.CGREvent{
+ Tenant: "cgrates.org",
+ ID: "voiceEvent",
+ Time: utils.TimePointer(time.Now()),
+ Event: map[string]interface{}{
+ utils.CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()),
+ utils.ToR: utils.VOICE,
+ utils.OriginID: "dsafdsaf",
+ utils.OriginHost: "192.168.1.1",
+ utils.RequestType: utils.META_RATED,
+ utils.Tenant: "cgrates.org",
+ utils.Category: "call",
+ utils.Account: "1001",
+ utils.Subject: "1001",
+ utils.Destination: "1002",
+ utils.SetupTime: time.Unix(1383813745, 0).UTC(),
+ utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
+ utils.Usage: time.Duration(10) * time.Second,
+ utils.RunID: utils.MetaDefault,
+ utils.Cost: 1.01,
+ "ExtraFields": map[string]string{"extra1": "val_extra1",
+ "extra2": "val_extra2", "extra3": "val_extra3"},
+ },
+ },
+ },
+ }
+
+ eventData := &utils.CGREventWithOpts{
+ CGREventWithArgDispatcher: &utils.CGREventWithArgDispatcher{
+ CGREvent: &utils.CGREvent{
+ Tenant: "cgrates.org",
+ ID: "dataEvent",
+ Time: utils.TimePointer(time.Now()),
+ Event: map[string]interface{}{
+ utils.CGRID: utils.Sha1("abcdef", time.Unix(1383813745, 0).UTC().String()),
+ utils.ToR: utils.DATA,
+ utils.OriginID: "abcdef",
+ utils.OriginHost: "192.168.1.1",
+ utils.RequestType: utils.META_RATED,
+ utils.Tenant: "AnotherTenant", // for the data CDR use a different Tenant
+ utils.Category: "call",
+ utils.Account: "1001",
+ utils.Subject: "1001",
+ utils.Destination: "1002",
+ utils.SetupTime: time.Unix(1383813745, 0).UTC(),
+ utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
+ utils.Usage: time.Duration(10) * time.Nanosecond,
+ utils.RunID: utils.MetaDefault,
+ utils.Cost: 0.012,
+ "ExtraFields": map[string]string{"extra1": "val_extra1",
+ "extra2": "val_extra2", "extra3": "val_extra3"},
+ },
+ },
+ },
+ }
+
+ eventSMS := &utils.CGREventWithOpts{
+ CGREventWithArgDispatcher: &utils.CGREventWithArgDispatcher{
+ CGREvent: &utils.CGREvent{
+ Tenant: "cgrates.org",
+ ID: "SMSEvent",
+ Time: utils.TimePointer(time.Now()),
+ Event: map[string]interface{}{
+ utils.CGRID: utils.Sha1("sdfwer", time.Unix(1383813745, 0).UTC().String()),
+ utils.ToR: utils.SMS,
+ utils.OriginID: "sdfwer",
+ utils.OriginHost: "192.168.1.1",
+ utils.RequestType: utils.META_RATED,
+ utils.Tenant: "cgrates.org",
+ utils.Category: "call",
+ utils.Account: "1001",
+ utils.Subject: "1001",
+ utils.Destination: "1002",
+ utils.SetupTime: time.Unix(1383813745, 0).UTC(),
+ utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
+ utils.Usage: time.Duration(1),
+ utils.RunID: utils.MetaDefault,
+ utils.Cost: 0.15,
+ "ExtraFields": map[string]string{"extra1": "val_extra1",
+ "extra2": "val_extra2", "extra3": "val_extra3"},
+ },
+ },
+ },
+ }
+ var reply string
+ if err := csvRpc.Call(utils.EventExporterSv1ProcessEvent, eventVoice, &reply); err != nil {
+ t.Error(err)
+ } else if reply != utils.OK {
+ t.Errorf("Expected %+v, received: %+v", utils.OK, reply)
+ }
+ if err := csvRpc.Call(utils.EventExporterSv1ProcessEvent, eventData, &reply); err != nil {
+ t.Error(err)
+ } else if reply != utils.OK {
+ t.Errorf("Expected %+v, received: %+v", utils.OK, reply)
+ }
+ if err := csvRpc.Call(utils.EventExporterSv1ProcessEvent, eventSMS, &reply); err != nil {
+ t.Error(err)
+ } else if reply != utils.OK {
+ t.Errorf("Expected %+v, received: %+v", utils.OK, reply)
+ }
+ time.Sleep(1 * time.Second)
+}
+
+func testCsvStopCgrEngine(t *testing.T) {
+ if err := engine.KillEngine(100); err != nil {
+ t.Error(err)
+ }
+}
+
+func testCsvCleanDirectory(t *testing.T) {
+ for _, dir := range []string{"/tmp/testExport"} {
+ if err := os.RemoveAll(dir); err != nil {
+ t.Fatal("Error removing folder: ", dir, err)
+ }
+ }
+}
diff --git a/ees/lib_test.go b/ees/lib_test.go
new file mode 100644
index 000000000..fe8d2730e
--- /dev/null
+++ b/ees/lib_test.go
@@ -0,0 +1,46 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package ees
+
+import (
+ "errors"
+ "flag"
+ "net/rpc"
+ "net/rpc/jsonrpc"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/utils"
+)
+
+var (
+ dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
+ waitRater = flag.Int("wait_rater", 100, "Number of milliseconds to wait for rater to start and cache")
+ encoding = flag.String("rpc", utils.MetaJSON, "what encoding would be used for rpc communication")
+)
+
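+// newRPCClient dials the engine with the encoding selected through the -rpc flag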
+func newRPCClient(cfg *config.ListenCfg) (c *rpc.Client, err error) {
+ switch *encoding {
+ case utils.MetaJSON:
+ return jsonrpc.Dial(utils.TCP, cfg.RPCJSONListen)
+ case utils.MetaGOB:
+ return rpc.Dial(utils.TCP, cfg.RPCGOBListen)
+ default:
+ return nil, errors.New("UNSUPPORTED_RPC")
+ }
+}
diff --git a/ers/ers.go b/ers/ers.go
index 28f55b0ce..d0397651b 100644
--- a/ers/ers.go
+++ b/ers/ers.go
@@ -68,6 +68,9 @@ type ERService struct {
// ListenAndServe keeps the service alive
func (erS *ERService) ListenAndServe(cfgRldChan chan struct{}) (err error) {
for cfgIdx, rdrCfg := range erS.cfg.ERsCfg().Readers {
+ if rdrCfg.Type == utils.META_NONE { // ignore *default reader
+ continue
+ }
if err = erS.addReader(rdrCfg.ID, cfgIdx); err != nil {
utils.Logger.Crit(
fmt.Sprintf("<%s> adding reader <%s> got error: <%s>",
diff --git a/ers/reader.go b/ers/reader.go
index c65ccc813..b29c97d61 100644
--- a/ers/reader.go
+++ b/ers/reader.go
@@ -54,30 +54,6 @@ func NewEventReader(cfg *config.CGRConfig, cfgIdx int,
return NewFlatstoreER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit)
case utils.MetaJSON:
return NewJSONFileER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit)
- case utils.META_NONE:
- return NewNoneER(cfg, cfgIdx), nil
}
return
}
-
-// NewNoneER return a disabled EventReader
-func NewNoneER(cfg *config.CGRConfig, cfgIdx int) EventReader {
- return &noneEventReader{
- cfg: cfg,
- cfgIdx: cfgIdx,
- }
-}
-
-// noneEventReader a reader that does nothing
-type noneEventReader struct {
- cfg *config.CGRConfig
- cfgIdx int
-}
-
-// Config returns the reader config
-func (rdr *noneEventReader) Config() *config.EventReaderCfg {
- return rdr.cfg.ERsCfg().Readers[rdr.cfgIdx]
-}
-
-// Serve used to implement EventReader interface
-func (noneEventReader) Serve() error { return nil }
diff --git a/general_tests/libtest.go b/general_tests/libtest.go
index ce39d19fe..d8944e709 100644
--- a/general_tests/libtest.go
+++ b/general_tests/libtest.go
@@ -29,8 +29,8 @@ import (
var (
dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
- waitRater = flag.Int("wait_rater", 100, "Number of miliseconds to wait for rater to start and cache")
- encoding = flag.String("rpc", utils.MetaJSON, "what encoding whould be uused for rpc comunication")
+ waitRater = flag.Int("wait_rater", 100, "Number of milliseconds to wait for rater to start and cache")
+ encoding = flag.String("rpc", utils.MetaJSON, "what encoding would be used for rpc communication")
dbType = flag.String("dbtype", utils.MetaInternal, "The type of DataBase (Internal/Mongo/mySql)")
err error
)
diff --git a/utils/consts.go b/utils/consts.go
index 7a24c456a..272b32569 100755
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -740,6 +740,7 @@ const (
MetaDaily = "*daily"
MetaWeekly = "*weekly"
RateS = "RateS"
+ Underline = "_"
)
// Migrator Action
@@ -1490,8 +1491,9 @@ const (
// EEs
const (
- EventExporterSv1 = "EventExporterSv1"
- EventExporterSv1Ping = "EventExporterSv1.Ping"
+ EventExporterSv1 = "EventExporterSv1"
+ EventExporterSv1Ping = "EventExporterSv1.Ping"
+ EventExporterSv1ProcessEvent = "EventExporterSv1.ProcessEvent"
)
//cgr_ variables