mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Add skel for integration test for CSVFileExporter
This commit is contained in:
committed by
Dan Christian Bogos
parent
6d40df5e13
commit
c0e141b49c
@@ -446,7 +446,7 @@ func (cfg *CGRConfig) checkConfigSanity() error {
|
||||
case utils.MetaFileCSV:
|
||||
for _, dir := range []string{exp.ExportPath} {
|
||||
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
|
||||
return fmt.Errorf("<%s> nonexistent folder: %s for reader with ID: %s", utils.EEs, dir, exp.ID)
|
||||
return fmt.Errorf("<%s> nonexistent folder: %s for exporter with ID: %s", utils.EEs, dir, exp.ID)
|
||||
}
|
||||
}
|
||||
if exp.FieldSep == utils.EmptyString {
|
||||
|
||||
@@ -132,9 +132,10 @@ type EventExporterCfg struct {
|
||||
Synchronous bool
|
||||
Attempts int
|
||||
FieldSep string
|
||||
HeaderFields []*FCTemplate
|
||||
ContentFields []*FCTemplate
|
||||
TrailerFields []*FCTemplate
|
||||
Fields []*FCTemplate
|
||||
headerFields []*FCTemplate
|
||||
contentFields []*FCTemplate
|
||||
trailerFields []*FCTemplate
|
||||
}
|
||||
|
||||
func (eeC *EventExporterCfg) loadFromJsonCfg(jsnEec *EventExporterJsonCfg, separator string) (err error) {
|
||||
@@ -188,20 +189,20 @@ func (eeC *EventExporterCfg) loadFromJsonCfg(jsnEec *EventExporterJsonCfg, separ
|
||||
eeC.FieldSep = *jsnEec.Field_separator
|
||||
}
|
||||
if jsnEec.Fields != nil {
|
||||
eeC.HeaderFields = make([]*FCTemplate, 0)
|
||||
eeC.ContentFields = make([]*FCTemplate, 0)
|
||||
eeC.TrailerFields = make([]*FCTemplate, 0)
|
||||
if fields, err := FCTemplatesFromFCTemplatesJsonCfg(*jsnEec.Fields, separator); err != nil {
|
||||
eeC.headerFields = make([]*FCTemplate, 0)
|
||||
eeC.contentFields = make([]*FCTemplate, 0)
|
||||
eeC.trailerFields = make([]*FCTemplate, 0)
|
||||
if eeC.Fields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnEec.Fields, separator); err != nil {
|
||||
return err
|
||||
} else {
|
||||
for _, field := range fields {
|
||||
for _, field := range eeC.Fields {
|
||||
switch field.GetPathSlice()[0] {
|
||||
case utils.MetaHdr:
|
||||
eeC.HeaderFields = append(eeC.HeaderFields, field)
|
||||
eeC.headerFields = append(eeC.headerFields, field)
|
||||
case utils.MetaExp:
|
||||
eeC.ContentFields = append(eeC.ContentFields, field)
|
||||
eeC.contentFields = append(eeC.contentFields, field)
|
||||
case utils.MetaTrl:
|
||||
eeC.TrailerFields = append(eeC.TrailerFields, field)
|
||||
eeC.trailerFields = append(eeC.trailerFields, field)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -209,6 +210,18 @@ func (eeC *EventExporterCfg) loadFromJsonCfg(jsnEec *EventExporterJsonCfg, separ
|
||||
return
|
||||
}
|
||||
|
||||
func (eeC *EventExporterCfg) HeaderFields() []*FCTemplate {
|
||||
return eeC.headerFields
|
||||
}
|
||||
|
||||
func (eeC *EventExporterCfg) ContentFields() []*FCTemplate {
|
||||
return eeC.contentFields
|
||||
}
|
||||
|
||||
func (eeC *EventExporterCfg) TrailerFields() []*FCTemplate {
|
||||
return eeC.trailerFields
|
||||
}
|
||||
|
||||
func (eeC *EventExporterCfg) Clone() (cln *EventExporterCfg) {
|
||||
cln = new(EventExporterCfg)
|
||||
cln.ID = eeC.ID
|
||||
@@ -239,17 +252,17 @@ func (eeC *EventExporterCfg) Clone() (cln *EventExporterCfg) {
|
||||
cln.Synchronous = eeC.Synchronous
|
||||
cln.Attempts = eeC.Attempts
|
||||
cln.FieldSep = eeC.FieldSep
|
||||
cln.HeaderFields = make([]*FCTemplate, len(eeC.HeaderFields))
|
||||
for idx, fld := range eeC.HeaderFields {
|
||||
cln.HeaderFields[idx] = fld.Clone()
|
||||
cln.headerFields = make([]*FCTemplate, len(eeC.headerFields))
|
||||
for idx, fld := range eeC.headerFields {
|
||||
cln.headerFields[idx] = fld.Clone()
|
||||
}
|
||||
cln.ContentFields = make([]*FCTemplate, len(eeC.ContentFields))
|
||||
for idx, fld := range eeC.ContentFields {
|
||||
cln.ContentFields[idx] = fld.Clone()
|
||||
cln.contentFields = make([]*FCTemplate, len(eeC.contentFields))
|
||||
for idx, fld := range eeC.contentFields {
|
||||
cln.contentFields[idx] = fld.Clone()
|
||||
}
|
||||
cln.TrailerFields = make([]*FCTemplate, len(eeC.TrailerFields))
|
||||
for idx, fld := range eeC.TrailerFields {
|
||||
cln.TrailerFields[idx] = fld.Clone()
|
||||
cln.trailerFields = make([]*FCTemplate, len(eeC.trailerFields))
|
||||
for idx, fld := range eeC.trailerFields {
|
||||
cln.trailerFields[idx] = fld.Clone()
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -272,14 +285,8 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) map[string]interfa
|
||||
}
|
||||
flags[key] = buf
|
||||
}
|
||||
fields := make([]map[string]interface{}, 0, len(eeC.HeaderFields)+len(eeC.ContentFields)+len(eeC.TrailerFields))
|
||||
for _, fld := range eeC.HeaderFields {
|
||||
fields = append(fields, fld.AsMapInterface(separator))
|
||||
}
|
||||
for _, fld := range eeC.ContentFields {
|
||||
fields = append(fields, fld.AsMapInterface(separator))
|
||||
}
|
||||
for _, fld := range eeC.TrailerFields {
|
||||
fields := make([]map[string]interface{}, 0, len(eeC.Fields))
|
||||
for _, fld := range eeC.Fields {
|
||||
fields = append(fields, fld.AsMapInterface(separator))
|
||||
}
|
||||
|
||||
|
||||
@@ -3,7 +3,50 @@
|
||||
//
|
||||
// Copyright (C) ITsysCOM GmbH
|
||||
|
||||
"general": {
|
||||
"log_level": 7,
|
||||
},
|
||||
|
||||
"listen": {
|
||||
"rpc_json": ":2012",
|
||||
"rpc_gob": ":2013",
|
||||
"http": ":2080",
|
||||
},
|
||||
|
||||
"data_db": {
|
||||
"db_type": "redis",
|
||||
"db_port": 6379,
|
||||
"db_name": "10",
|
||||
},
|
||||
|
||||
|
||||
"stor_db": {
|
||||
"db_password": "CGRateS.org",
|
||||
},
|
||||
|
||||
|
||||
"rals": {
|
||||
"enabled": true,
|
||||
},
|
||||
|
||||
|
||||
"schedulers": {
|
||||
"enabled": true,
|
||||
},
|
||||
|
||||
|
||||
"cdrs": {
|
||||
"enabled": true,
|
||||
"chargers_conns": ["*localhost"],
|
||||
"rals_conns": ["*internal"],
|
||||
"session_cost_retries": 0,
|
||||
},
|
||||
|
||||
|
||||
"chargers": {
|
||||
"enabled": true,
|
||||
"attributes_conns": ["*internal"],
|
||||
},
|
||||
|
||||
|
||||
"attributes": {
|
||||
@@ -15,12 +58,43 @@
|
||||
"enabled": true,
|
||||
"attributes_conns":["*internal"],
|
||||
"cache": {
|
||||
"*file_csv": {"limit": -1, "ttl": "5s", "static_ttl": false},
|
||||
"*file_csv": {"limit": -1, "ttl": "1s", "static_ttl": false},
|
||||
},
|
||||
"exporters": [
|
||||
],
|
||||
{
|
||||
"id": "CSVExporter",
|
||||
"type": "*file_csv",
|
||||
"export_path": "/tmp/testExport",
|
||||
"tenant": "cgrates.org",
|
||||
"flags": ["*attributes"],
|
||||
"attribute_context": "customContext",
|
||||
"attempts": 1,
|
||||
"field_separator": ",",
|
||||
"fields":[
|
||||
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"},
|
||||
{"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"},
|
||||
{"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"},
|
||||
{"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"},
|
||||
{"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"},
|
||||
{"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"},
|
||||
{"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"},
|
||||
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
|
||||
{"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"},
|
||||
{"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"},
|
||||
{"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime", "layout": "2006-01-02T15:04:05Z07:00"},
|
||||
{"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
|
||||
{"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"},
|
||||
{"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost", "rounding_decimals": 4},
|
||||
],
|
||||
},
|
||||
]
|
||||
},
|
||||
|
||||
|
||||
"apiers": {
|
||||
"enabled": true,
|
||||
"scheduler_conns": ["*internal"],
|
||||
},
|
||||
|
||||
|
||||
}
|
||||
|
||||
16
ees/ees.go
16
ees/ees.go
@@ -36,13 +36,15 @@ func onCacheEvicted(itmID string, value interface{}) {
|
||||
|
||||
// NewERService instantiates the EventExporterS
|
||||
func NewEventExporterS(cfg *config.CGRConfig, filterS *engine.FilterS,
|
||||
connMgr *engine.ConnManager) *EventExporterS {
|
||||
return &EventExporterS{
|
||||
connMgr *engine.ConnManager) (eeS *EventExporterS) {
|
||||
eeS = &EventExporterS{
|
||||
cfg: cfg,
|
||||
filterS: filterS,
|
||||
connMgr: connMgr,
|
||||
eesChs: make(map[string]*ltcache.Cache),
|
||||
}
|
||||
eeS.setupCache(cfg.EEsNoLksCfg().Cache)
|
||||
return
|
||||
}
|
||||
|
||||
// EventExporterS is managing the EventExporters
|
||||
@@ -133,6 +135,9 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithOpts, rply *s
|
||||
var wg sync.WaitGroup
|
||||
var withErr bool
|
||||
for cfgIdx, eeCfg := range eeS.cfg.EEsNoLksCfg().Exporters {
|
||||
if eeCfg.Type == utils.META_NONE { // ignore *default exporter
|
||||
continue
|
||||
}
|
||||
|
||||
if len(eeCfg.Filters) != 0 {
|
||||
cgrDp := config.NewNavigableMap(map[string]interface{}{
|
||||
@@ -166,7 +171,11 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithOpts, rply *s
|
||||
var isCached bool
|
||||
var ee EventExporter
|
||||
if hasCache {
|
||||
if x, isCached := eeCache.Get(eeCfg.ID); isCached {
|
||||
var x interface{}
|
||||
//fmt.Println("Try to get exporter from cache ")
|
||||
//fmt.Println(eeCfg.ID)
|
||||
if x, isCached = eeCache.Get(eeCfg.ID); isCached {
|
||||
//fmt.Println("Get FROM CACHE")
|
||||
ee = x.(EventExporter)
|
||||
}
|
||||
}
|
||||
@@ -200,6 +209,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithOpts, rply *s
|
||||
if withErr {
|
||||
err = utils.ErrPartiallyExecuted
|
||||
}
|
||||
*rply = utils.OK
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -22,7 +22,9 @@ import (
|
||||
"encoding/csv"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -32,7 +34,8 @@ import (
|
||||
)
|
||||
|
||||
func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS) (fCsv *FileCSVee, err error) {
|
||||
fCsv = &FileCSVee{cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS}
|
||||
fCsv = &FileCSVee{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
|
||||
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS}
|
||||
err = fCsv.init()
|
||||
return
|
||||
}
|
||||
@@ -45,6 +48,7 @@ type FileCSVee struct {
|
||||
filterS *engine.FilterS
|
||||
file *os.File
|
||||
csvWriter *csv.Writer
|
||||
sync.RWMutex
|
||||
|
||||
firstEventATime, lastEventATime time.Time
|
||||
numberOfEvents int
|
||||
@@ -58,7 +62,9 @@ type FileCSVee struct {
|
||||
|
||||
// init will create all the necessary dependencies, including opening the file
|
||||
func (fCsv *FileCSVee) init() (err error) {
|
||||
if fCsv.file, err = os.Create(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ExportPath); err != nil {
|
||||
// create the file
|
||||
if fCsv.file, err = os.Create(path.Join(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ExportPath,
|
||||
fCsv.id+utils.Underline+utils.UUIDSha1Prefix()+utils.CSVSuffix)); err != nil {
|
||||
return
|
||||
}
|
||||
fCsv.csvWriter = csv.NewWriter(fCsv.file)
|
||||
@@ -93,19 +99,23 @@ func (fCsv *FileCSVee) OnEvicted(_ string, _ interface{}) {
|
||||
|
||||
// ExportEvent implements EventExporter
|
||||
func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
|
||||
// convert cgrEvent in export record
|
||||
fCsv.Lock()
|
||||
defer fCsv.Unlock()
|
||||
fCsv.numberOfEvents++
|
||||
var csvRecord []string
|
||||
navMp := config.NewNavigableMap(map[string]interface{}{
|
||||
utils.MetaReq: cgrEv.Event,
|
||||
})
|
||||
for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ContentFields {
|
||||
for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ContentFields() {
|
||||
if pass, err := fCsv.filterS.Pass(cgrEv.Tenant, cfgFld.Filters,
|
||||
navMp); err != nil || !pass {
|
||||
continue
|
||||
}
|
||||
val, err := cfgFld.Value.ParseDataProvider(navMp, fCsv.cgrCfg.GeneralCfg().RSRSep)
|
||||
val, err := cfgFld.Value.ParseDataProvider(navMp, utils.NestingSep)
|
||||
if err != nil {
|
||||
if err == utils.ErrNotFound {
|
||||
err = utils.ErrPrefix(err, cfgFld.Value.GetRule())
|
||||
}
|
||||
fCsv.negativeExports.Add(cgrEv.ID)
|
||||
return err
|
||||
}
|
||||
@@ -153,13 +163,16 @@ func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
|
||||
|
||||
// Compose and cache the header
|
||||
func (fCsv *FileCSVee) composeHeader() (err error) {
|
||||
if len(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].HeaderFields) == 0 {
|
||||
if len(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].HeaderFields()) == 0 {
|
||||
return
|
||||
}
|
||||
var csvRecord []string
|
||||
for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].HeaderFields {
|
||||
for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].HeaderFields() {
|
||||
val, err := cfgFld.Value.ParseValue(utils.EmptyString)
|
||||
if err != nil {
|
||||
if err == utils.ErrNotFound {
|
||||
err = utils.ErrPrefix(err, cfgFld.Value.GetRule())
|
||||
}
|
||||
return err
|
||||
}
|
||||
csvRecord = append(csvRecord, val)
|
||||
@@ -169,11 +182,11 @@ func (fCsv *FileCSVee) composeHeader() (err error) {
|
||||
|
||||
// Compose and cache the trailer
|
||||
func (fCsv *FileCSVee) composeTrailer() (err error) {
|
||||
if len(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].TrailerFields) == 0 {
|
||||
if len(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].TrailerFields()) == 0 {
|
||||
return
|
||||
}
|
||||
var csvRecord []string
|
||||
for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].TrailerFields {
|
||||
for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].TrailerFields() {
|
||||
switch cfgFld.Type {
|
||||
case utils.MetaExportID:
|
||||
csvRecord = append(csvRecord, fCsv.id)
|
||||
|
||||
226
ees/filecsv_it_test.go
Normal file
226
ees/filecsv_it_test.go
Normal file
@@ -0,0 +1,226 @@
|
||||
// +build integration
|
||||
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package ees
|
||||
|
||||
import (
|
||||
"net/rpc"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
)
|
||||
|
||||
var (
|
||||
csvConfigDir string
|
||||
csvCfgPath string
|
||||
csvCfg *config.CGRConfig
|
||||
csvRpc *rpc.Client
|
||||
|
||||
sTestsCsv = []func(t *testing.T){
|
||||
testCsvCreateDirectory,
|
||||
testCsvLoadConfig,
|
||||
testCsvResetDataDB,
|
||||
testCsvResetStorDb,
|
||||
testCsvStartEngine,
|
||||
testCsvRPCConn,
|
||||
testCsvExportEvent,
|
||||
testCsvStopCgrEngine,
|
||||
//testCsvCleanDirectory,
|
||||
}
|
||||
)
|
||||
|
||||
func TestCsvExport(t *testing.T) {
|
||||
csvConfigDir = "ees"
|
||||
for _, stest := range sTestsCsv {
|
||||
t.Run(csvConfigDir, stest)
|
||||
}
|
||||
}
|
||||
|
||||
func testCsvCreateDirectory(t *testing.T) {
|
||||
for _, dir := range []string{"/tmp/testExport"} {
|
||||
if err := os.RemoveAll(dir); err != nil {
|
||||
t.Fatal("Error removing folder: ", dir, err)
|
||||
}
|
||||
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
|
||||
t.Fatal("Error creating folder: ", dir, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testCsvLoadConfig(t *testing.T) {
|
||||
var err error
|
||||
csvCfgPath = path.Join(*dataDir, "conf", "samples", csvConfigDir)
|
||||
if csvCfg, err = config.NewCGRConfigFromPath(csvCfgPath); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testCsvResetDataDB(t *testing.T) {
|
||||
if err := engine.InitDataDb(csvCfg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testCsvResetStorDb(t *testing.T) {
|
||||
if err := engine.InitStorDb(csvCfg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testCsvStartEngine(t *testing.T) {
|
||||
if _, err := engine.StopStartEngine(csvCfgPath, *waitRater); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testCsvRPCConn(t *testing.T) {
|
||||
var err error
|
||||
csvRpc, err = newRPCClient(csvCfg.ListenCfg())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testCsvExportEvent(t *testing.T) {
|
||||
eventVoice := &utils.CGREventWithOpts{
|
||||
CGREventWithArgDispatcher: &utils.CGREventWithArgDispatcher{
|
||||
CGREvent: &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "voiceEvent",
|
||||
Time: utils.TimePointer(time.Now()),
|
||||
Event: map[string]interface{}{
|
||||
utils.CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()),
|
||||
utils.ToR: utils.VOICE,
|
||||
utils.OriginID: "dsafdsaf",
|
||||
utils.OriginHost: "192.168.1.1",
|
||||
utils.RequestType: utils.META_RATED,
|
||||
utils.Tenant: "cgrates.org",
|
||||
utils.Category: "call",
|
||||
utils.Account: "1001",
|
||||
utils.Subject: "1001",
|
||||
utils.Destination: "1002",
|
||||
utils.SetupTime: time.Unix(1383813745, 0).UTC(),
|
||||
utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
|
||||
utils.Usage: time.Duration(10) * time.Second,
|
||||
utils.RunID: utils.MetaDefault,
|
||||
utils.Cost: 1.01,
|
||||
"ExtraFields": map[string]string{"extra1": "val_extra1",
|
||||
"extra2": "val_extra2", "extra3": "val_extra3"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
eventData := &utils.CGREventWithOpts{
|
||||
CGREventWithArgDispatcher: &utils.CGREventWithArgDispatcher{
|
||||
CGREvent: &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "dataEvent",
|
||||
Time: utils.TimePointer(time.Now()),
|
||||
Event: map[string]interface{}{
|
||||
utils.CGRID: utils.Sha1("abcdef", time.Unix(1383813745, 0).UTC().String()),
|
||||
utils.ToR: utils.DATA,
|
||||
utils.OriginID: "abcdef",
|
||||
utils.OriginHost: "192.168.1.1",
|
||||
utils.RequestType: utils.META_RATED,
|
||||
utils.Tenant: "AnotherTenant",
|
||||
utils.Category: "call", //for data CDR use different Tenant
|
||||
utils.Account: "1001",
|
||||
utils.Subject: "1001",
|
||||
utils.Destination: "1002",
|
||||
utils.SetupTime: time.Unix(1383813745, 0).UTC(),
|
||||
utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
|
||||
utils.Usage: time.Duration(10) * time.Nanosecond,
|
||||
utils.RunID: utils.MetaDefault,
|
||||
utils.Cost: 0.012,
|
||||
"ExtraFields": map[string]string{"extra1": "val_extra1",
|
||||
"extra2": "val_extra2", "extra3": "val_extra3"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
eventSMS := &utils.CGREventWithOpts{
|
||||
CGREventWithArgDispatcher: &utils.CGREventWithArgDispatcher{
|
||||
CGREvent: &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "SMSEvent",
|
||||
Time: utils.TimePointer(time.Now()),
|
||||
Event: map[string]interface{}{
|
||||
utils.CGRID: utils.Sha1("sdfwer", time.Unix(1383813745, 0).UTC().String()),
|
||||
utils.ToR: utils.SMS,
|
||||
utils.OriginID: "sdfwer",
|
||||
utils.OriginHost: "192.168.1.1",
|
||||
utils.RequestType: utils.META_RATED,
|
||||
utils.Tenant: "cgrates.org",
|
||||
utils.Category: "call",
|
||||
utils.Account: "1001",
|
||||
utils.Subject: "1001",
|
||||
utils.Destination: "1002",
|
||||
utils.SetupTime: time.Unix(1383813745, 0).UTC(),
|
||||
utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
|
||||
utils.Usage: time.Duration(1),
|
||||
utils.RunID: utils.MetaDefault,
|
||||
utils.Cost: 0.15,
|
||||
"ExtraFields": map[string]string{"extra1": "val_extra1",
|
||||
"extra2": "val_extra2", "extra3": "val_extra3"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
var reply string
|
||||
if err := csvRpc.Call(utils.EventExporterSv1ProcessEvent, eventVoice, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("Expected %+v, received: %+v", utils.OK, reply)
|
||||
}
|
||||
if err := csvRpc.Call(utils.EventExporterSv1ProcessEvent, eventData, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("Expected %+v, received: %+v", utils.OK, reply)
|
||||
}
|
||||
if err := csvRpc.Call(utils.EventExporterSv1ProcessEvent, eventSMS, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("Expected %+v, received: %+v", utils.OK, reply)
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
func testCsvStopCgrEngine(t *testing.T) {
|
||||
if err := engine.KillEngine(100); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testCsvCleanDirectory(t *testing.T) {
|
||||
for _, dir := range []string{"/tmp/testExport"} {
|
||||
if err := os.RemoveAll(dir); err != nil {
|
||||
t.Fatal("Error removing folder: ", dir, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
46
ees/lib_test.go
Normal file
46
ees/lib_test.go
Normal file
@@ -0,0 +1,46 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OerS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package ees
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"flag"
|
||||
"net/rpc"
|
||||
"net/rpc/jsonrpc"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
var (
|
||||
dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
|
||||
waitRater = flag.Int("wait_rater", 100, "Number of milliseconds to wait for rater to start and cache")
|
||||
encoding = flag.String("rpc", utils.MetaJSON, "what encoding would be used for rpc communication")
|
||||
)
|
||||
|
||||
func newRPCClient(cfg *config.ListenCfg) (c *rpc.Client, err error) {
|
||||
switch *encoding {
|
||||
case utils.MetaJSON:
|
||||
return jsonrpc.Dial(utils.TCP, cfg.RPCJSONListen)
|
||||
case utils.MetaGOB:
|
||||
return rpc.Dial(utils.TCP, cfg.RPCGOBListen)
|
||||
default:
|
||||
return nil, errors.New("UNSUPPORTED_RPC")
|
||||
}
|
||||
}
|
||||
@@ -68,6 +68,9 @@ type ERService struct {
|
||||
// ListenAndServe keeps the service alive
|
||||
func (erS *ERService) ListenAndServe(cfgRldChan chan struct{}) (err error) {
|
||||
for cfgIdx, rdrCfg := range erS.cfg.ERsCfg().Readers {
|
||||
if rdrCfg.Type == utils.META_NONE { // ignore *default reader
|
||||
continue
|
||||
}
|
||||
if err = erS.addReader(rdrCfg.ID, cfgIdx); err != nil {
|
||||
utils.Logger.Crit(
|
||||
fmt.Sprintf("<%s> adding reader <%s> got error: <%s>",
|
||||
|
||||
@@ -54,30 +54,6 @@ func NewEventReader(cfg *config.CGRConfig, cfgIdx int,
|
||||
return NewFlatstoreER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit)
|
||||
case utils.MetaJSON:
|
||||
return NewJSONFileER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit)
|
||||
case utils.META_NONE:
|
||||
return NewNoneER(cfg, cfgIdx), nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// NewNoneER return a disabled EventReader
|
||||
func NewNoneER(cfg *config.CGRConfig, cfgIdx int) EventReader {
|
||||
return &noneEventReader{
|
||||
cfg: cfg,
|
||||
cfgIdx: cfgIdx,
|
||||
}
|
||||
}
|
||||
|
||||
// noneEventReader a reader that does nothing
|
||||
type noneEventReader struct {
|
||||
cfg *config.CGRConfig
|
||||
cfgIdx int
|
||||
}
|
||||
|
||||
// Config returns the reader config
|
||||
func (rdr *noneEventReader) Config() *config.EventReaderCfg {
|
||||
return rdr.cfg.ERsCfg().Readers[rdr.cfgIdx]
|
||||
}
|
||||
|
||||
// Serve used to implement EventReader interface
|
||||
func (noneEventReader) Serve() error { return nil }
|
||||
|
||||
@@ -29,8 +29,8 @@ import (
|
||||
|
||||
var (
|
||||
dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
|
||||
waitRater = flag.Int("wait_rater", 100, "Number of miliseconds to wait for rater to start and cache")
|
||||
encoding = flag.String("rpc", utils.MetaJSON, "what encoding whould be uused for rpc comunication")
|
||||
waitRater = flag.Int("wait_rater", 100, "Number of milliseconds to wait for rater to start and cache")
|
||||
encoding = flag.String("rpc", utils.MetaJSON, "what encoding would be used for rpc communication")
|
||||
dbType = flag.String("dbtype", utils.MetaInternal, "The type of DataBase (Internal/Mongo/mySql)")
|
||||
err error
|
||||
)
|
||||
|
||||
@@ -740,6 +740,7 @@ const (
|
||||
MetaDaily = "*daily"
|
||||
MetaWeekly = "*weekly"
|
||||
RateS = "RateS"
|
||||
Underline = "_"
|
||||
)
|
||||
|
||||
// Migrator Action
|
||||
@@ -1490,8 +1491,9 @@ const (
|
||||
|
||||
// EEs
|
||||
const (
|
||||
EventExporterSv1 = "EventExporterSv1"
|
||||
EventExporterSv1Ping = "EventExporterSv1.Ping"
|
||||
EventExporterSv1 = "EventExporterSv1"
|
||||
EventExporterSv1Ping = "EventExporterSv1.Ping"
|
||||
EventExporterSv1ProcessEvent = "EventExporterSv1.ProcessEvent"
|
||||
)
|
||||
|
||||
//cgr_ variables
|
||||
|
||||
Reference in New Issue
Block a user