mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-15 21:29:52 +05:00
Updated integration tests
This commit is contained in:
committed by
Dan Christian Bogos
parent
9aa153c0b0
commit
a4eb09a008
@@ -38,9 +38,10 @@ func newActHTTPPost(cfg *config.CGRConfig, aCfg *engine.APAction) (aL *actHTTPPo
|
||||
}
|
||||
for i, actD := range aL.cfg().Diktats {
|
||||
aL.pstrs[i], _ = ees.NewHTTPjsonMapEE(&config.EventExporterCfg{
|
||||
ID: aL.id(),
|
||||
ExportPath: actD.Path,
|
||||
Attempts: aL.config.GeneralCfg().PosterAttempts,
|
||||
ID: aL.id(),
|
||||
ExportPath: actD.Path,
|
||||
Attempts: aL.config.GeneralCfg().PosterAttempts,
|
||||
FailedPostsDir: cfg.GeneralCfg().FailedPostsDir,
|
||||
}, cfg, nil, nil)
|
||||
}
|
||||
return
|
||||
@@ -72,7 +73,11 @@ func (aL *actHTTPPost) execute(_ *context.Context, data utils.MapStorage, _ stri
|
||||
if async, has := aL.cfg().Opts[utils.MetaAsync]; has && utils.IfaceAsString(async) == utils.TrueStr {
|
||||
go ees.ExportWithAttempts(pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString)
|
||||
} else if err = ees.ExportWithAttempts(pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString); err != nil {
|
||||
partExec = true
|
||||
if pstr.Cfg().FailedPostsDir != utils.MetaNone {
|
||||
err = nil
|
||||
} else {
|
||||
partExec = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if partExec {
|
||||
|
||||
@@ -43,10 +43,7 @@ func TestACHTTPPostExecute(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
http := &actHTTPPost{
|
||||
config: cfg,
|
||||
aCfg: apAction,
|
||||
}
|
||||
http := newActHTTPPost(cfg, apAction)
|
||||
|
||||
dataStorage := utils.MapStorage{
|
||||
utils.MetaReq: map[string]interface{}{
|
||||
@@ -67,7 +64,7 @@ func TestACHTTPPostExecute(t *testing.T) {
|
||||
buff := new(bytes.Buffer)
|
||||
log.SetOutput(buff)
|
||||
|
||||
expected := "[WARNING] <HTTPPoster> Posting to : <~*balance.TestBalance.Value>, error: <Post \"~*balance.TestBalance.Value\": unsupported protocol scheme \"\">"
|
||||
expected := `<EEs> Exporter <TEST_ACTION_HTTPPOST> could not export because err: <Post "~*balance.TestBalance.Value": unsupported protocol scheme "">`
|
||||
if err := http.execute(nil, dataStorage, utils.EmptyString); err != nil {
|
||||
t.Error(err)
|
||||
} else if rcv := buff.String(); !strings.Contains(rcv, expected) {
|
||||
@@ -110,10 +107,7 @@ func TestACHTTPPostValues(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
http := &actHTTPPost{
|
||||
config: cfg,
|
||||
aCfg: apAction,
|
||||
}
|
||||
http := newActHTTPPost(cfg, apAction)
|
||||
dataStorage := utils.MapStorage{
|
||||
utils.MetaReq: map[string]interface{}{
|
||||
utils.AccountField: 1003,
|
||||
|
||||
@@ -452,15 +452,15 @@ func testAccMaxAbstracts(t *testing.T) {
|
||||
Type: utils.MetaConcrete,
|
||||
Units: 213,
|
||||
/*
|
||||
CostIncrements: []*utils.APICostIncrement{
|
||||
{
|
||||
Increment: utils.Float64Pointer(float64(time.Second)),
|
||||
FixedFee: utils.Float64Pointer(0),
|
||||
RecurrentFee: utils.Float64Pointer(0),
|
||||
CostIncrements: []*utils.APICostIncrement{
|
||||
{
|
||||
Increment: utils.Float64Pointer(float64(time.Second)),
|
||||
FixedFee: utils.Float64Pointer(0),
|
||||
RecurrentFee: utils.Float64Pointer(0),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
*/
|
||||
*/
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -550,7 +550,7 @@ func testAccMaxAbstracts(t *testing.T) {
|
||||
FilterIDs: []string{"*string:~*req.Account:1004"},
|
||||
Weights: utils.DynamicWeights{
|
||||
{
|
||||
Weight: 0,
|
||||
Weight: 0,
|
||||
},
|
||||
},
|
||||
Balances: map[string]*utils.ExtBalance{
|
||||
@@ -558,7 +558,7 @@ func testAccMaxAbstracts(t *testing.T) {
|
||||
ID: "AbstractBalance1",
|
||||
Weights: utils.DynamicWeights{
|
||||
{
|
||||
Weight: 25,
|
||||
Weight: 25,
|
||||
},
|
||||
},
|
||||
Type: "*abstract",
|
||||
@@ -576,11 +576,11 @@ func testAccMaxAbstracts(t *testing.T) {
|
||||
FilterIDs: nil,
|
||||
Weights: utils.DynamicWeights{
|
||||
{
|
||||
Weight: 20,
|
||||
Weight: 20,
|
||||
},
|
||||
},
|
||||
Type: "*concrete",
|
||||
Units: utils.Float64Pointer(213),
|
||||
Type: "*concrete",
|
||||
Units: utils.Float64Pointer(213),
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -707,7 +707,7 @@ func testAccDebitAbstracts(t *testing.T) {
|
||||
FilterIDs: []string{"*string:~*req.Account:1004"},
|
||||
Weights: utils.DynamicWeights{
|
||||
{
|
||||
Weight: 0,
|
||||
Weight: 0,
|
||||
},
|
||||
},
|
||||
Balances: map[string]*utils.ExtBalance{
|
||||
@@ -715,7 +715,7 @@ func testAccDebitAbstracts(t *testing.T) {
|
||||
ID: "AbstractBalance1",
|
||||
Weights: utils.DynamicWeights{
|
||||
{
|
||||
Weight: 25,
|
||||
Weight: 25,
|
||||
},
|
||||
},
|
||||
Type: "*abstract",
|
||||
@@ -733,11 +733,11 @@ func testAccDebitAbstracts(t *testing.T) {
|
||||
FilterIDs: nil,
|
||||
Weights: utils.DynamicWeights{
|
||||
{
|
||||
Weight: 20,
|
||||
Weight: 20,
|
||||
},
|
||||
},
|
||||
Type: "*concrete",
|
||||
Units: utils.Float64Pointer(213),
|
||||
Type: "*concrete",
|
||||
Units: utils.Float64Pointer(213),
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -843,12 +843,12 @@ func testAccMaxConcretes(t *testing.T) {
|
||||
},
|
||||
Accounting: map[string]*utils.ExtAccountCharge{
|
||||
accKEy: &utils.ExtAccountCharge{
|
||||
AccountID: "TEST_ACC_IT_TEST6",
|
||||
BalanceID: "ConcreteBalance2",
|
||||
Units: utils.Float64Pointer(213),
|
||||
BalanceLimit: utils.Float64Pointer(0),
|
||||
UnitFactorID: "",
|
||||
RatingID: rtID,
|
||||
AccountID: "TEST_ACC_IT_TEST6",
|
||||
BalanceID: "ConcreteBalance2",
|
||||
Units: utils.Float64Pointer(213),
|
||||
BalanceLimit: utils.Float64Pointer(0),
|
||||
UnitFactorID: "",
|
||||
RatingID: rtID,
|
||||
},
|
||||
},
|
||||
UnitFactors: map[string]*utils.ExtUnitFactor{},
|
||||
@@ -861,7 +861,7 @@ func testAccMaxConcretes(t *testing.T) {
|
||||
FilterIDs: []string{"*string:~*req.Account:1004"},
|
||||
Weights: utils.DynamicWeights{
|
||||
{
|
||||
Weight: 0,
|
||||
Weight: 0,
|
||||
},
|
||||
},
|
||||
Balances: map[string]*utils.ExtBalance{
|
||||
@@ -869,7 +869,7 @@ func testAccMaxConcretes(t *testing.T) {
|
||||
ID: "AbstractBalance1",
|
||||
Weights: utils.DynamicWeights{
|
||||
{
|
||||
Weight: 25,
|
||||
Weight: 25,
|
||||
},
|
||||
},
|
||||
Type: "*abstract",
|
||||
@@ -883,14 +883,14 @@ func testAccMaxConcretes(t *testing.T) {
|
||||
Units: utils.Float64Pointer(40000000000),
|
||||
},
|
||||
"ConcreteBalance2": {
|
||||
ID: "ConcreteBalance2",
|
||||
ID: "ConcreteBalance2",
|
||||
Weights: utils.DynamicWeights{
|
||||
{
|
||||
Weight: 20,
|
||||
Weight: 20,
|
||||
},
|
||||
},
|
||||
Type: "*concrete",
|
||||
Units: utils.Float64Pointer(0),
|
||||
Type: "*concrete",
|
||||
Units: utils.Float64Pointer(0),
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -996,12 +996,12 @@ func testAccDebitConcretes(t *testing.T) {
|
||||
},
|
||||
Accounting: map[string]*utils.ExtAccountCharge{
|
||||
accKEy: &utils.ExtAccountCharge{
|
||||
AccountID: "TEST_ACC_IT_TEST7",
|
||||
BalanceID: "ConcreteBalance2",
|
||||
Units: utils.Float64Pointer(213),
|
||||
BalanceLimit: utils.Float64Pointer(0),
|
||||
UnitFactorID: "",
|
||||
RatingID: rtID,
|
||||
AccountID: "TEST_ACC_IT_TEST7",
|
||||
BalanceID: "ConcreteBalance2",
|
||||
Units: utils.Float64Pointer(213),
|
||||
BalanceLimit: utils.Float64Pointer(0),
|
||||
UnitFactorID: "",
|
||||
RatingID: rtID,
|
||||
},
|
||||
},
|
||||
UnitFactors: map[string]*utils.ExtUnitFactor{},
|
||||
@@ -1014,7 +1014,7 @@ func testAccDebitConcretes(t *testing.T) {
|
||||
FilterIDs: []string{"*string:~*req.Account:1004"},
|
||||
Weights: utils.DynamicWeights{
|
||||
{
|
||||
Weight: 0,
|
||||
Weight: 0,
|
||||
},
|
||||
},
|
||||
Balances: map[string]*utils.ExtBalance{
|
||||
@@ -1022,7 +1022,7 @@ func testAccDebitConcretes(t *testing.T) {
|
||||
ID: "AbstractBalance1",
|
||||
Weights: utils.DynamicWeights{
|
||||
{
|
||||
Weight: 25,
|
||||
Weight: 25,
|
||||
},
|
||||
},
|
||||
Type: "*abstract",
|
||||
@@ -1036,14 +1036,14 @@ func testAccDebitConcretes(t *testing.T) {
|
||||
Units: utils.Float64Pointer(40000000000),
|
||||
},
|
||||
"ConcreteBalance2": {
|
||||
ID: "ConcreteBalance2",
|
||||
ID: "ConcreteBalance2",
|
||||
Weights: utils.DynamicWeights{
|
||||
{
|
||||
Weight: 20,
|
||||
Weight: 20,
|
||||
},
|
||||
},
|
||||
Type: "*concrete",
|
||||
Units: utils.Float64Pointer(0),
|
||||
Type: "*concrete",
|
||||
Units: utils.Float64Pointer(0),
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -1140,7 +1140,7 @@ func testAccActionSetRmvBalance(t *testing.T) {
|
||||
FilterIDs: []string{"*string:~*req.Account:1001"},
|
||||
Weights: utils.DynamicWeights{
|
||||
{
|
||||
Weight: 12,
|
||||
Weight: 12,
|
||||
},
|
||||
},
|
||||
Type: "*abstract",
|
||||
@@ -1152,7 +1152,7 @@ func testAccActionSetRmvBalance(t *testing.T) {
|
||||
},
|
||||
Weights: utils.DynamicWeights{
|
||||
{
|
||||
Weight: 10,
|
||||
Weight: 10,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -139,6 +139,11 @@ func (pstr *AMQPee) Connect() (err error) {
|
||||
func (pstr *AMQPee) ExportEvent(content interface{}, _ string) (err error) {
|
||||
pstr.reqs.get()
|
||||
pstr.RLock()
|
||||
if pstr.postChan == nil {
|
||||
pstr.RUnlock()
|
||||
pstr.reqs.done()
|
||||
return utils.ErrDisconnected
|
||||
}
|
||||
err = pstr.postChan.Publish(
|
||||
pstr.exchange, // exchange
|
||||
pstr.routingKey, // routing key
|
||||
|
||||
@@ -85,6 +85,9 @@ func (pstr *AMQPv1EE) ExportEvent(content interface{}, _ string) (err error) {
|
||||
pstr.RUnlock()
|
||||
pstr.reqs.done()
|
||||
}()
|
||||
if pstr.session == nil {
|
||||
return utils.ErrDisconnected
|
||||
}
|
||||
sender, err := pstr.session.NewSender(
|
||||
amqpv1.LinkTargetAddress(pstr.queueID),
|
||||
)
|
||||
|
||||
13
ees/ees.go
13
ees/ees.go
@@ -251,9 +251,13 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply *
|
||||
}
|
||||
|
||||
func exportEventWithExporter(exp EventExporter, ev *utils.CGREvent, oneTime bool, cfg *config.CGRConfig, filterS *engine.FilterS) (err error) {
|
||||
if oneTime {
|
||||
defer exp.Close()
|
||||
}
|
||||
defer func() {
|
||||
updateEEMetrics(exp.GetMetrics(), ev.ID, ev.Event, err != nil, utils.FirstNonEmpty(exp.Cfg().Timezone,
|
||||
cfg.GeneralCfg().DefaultTimezone))
|
||||
if oneTime {
|
||||
exp.Close()
|
||||
}
|
||||
}()
|
||||
var eEv interface{}
|
||||
|
||||
exp.GetMetrics().Lock()
|
||||
@@ -310,7 +314,8 @@ func ExportWithAttempts(exp EventExporter, eEv interface{}, key string) (err err
|
||||
return
|
||||
}
|
||||
for i := 0; i < exp.Cfg().Attempts; i++ {
|
||||
if err = exp.ExportEvent(eEv, key); err == nil {
|
||||
if err = exp.ExportEvent(eEv, key); err == nil ||
|
||||
err == utils.ErrDisconnected { // special error in case the exporter was closed
|
||||
break
|
||||
}
|
||||
if i+1 < exp.Cfg().Attempts {
|
||||
|
||||
@@ -24,6 +24,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/elastic/go-elasticsearch/esapi"
|
||||
|
||||
@@ -49,6 +50,7 @@ type ElasticEE struct {
|
||||
dc *utils.SafeMapStorage
|
||||
opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap
|
||||
reqs *concReq
|
||||
sync.RWMutex
|
||||
bytePreparing
|
||||
}
|
||||
|
||||
@@ -106,19 +108,28 @@ func (eEe *ElasticEE) prepareOpts() (err error) {
|
||||
func (eEe *ElasticEE) Cfg() *config.EventExporterCfg { return eEe.cfg }
|
||||
|
||||
func (eEe *ElasticEE) Connect() (err error) {
|
||||
eEe.Lock()
|
||||
// create the client
|
||||
if eEe.eClnt == nil {
|
||||
eEe.eClnt, err = elasticsearch.NewClient(
|
||||
elasticsearch.Config{Addresses: strings.Split(eEe.Cfg().ExportPath, utils.InfieldSep)},
|
||||
)
|
||||
}
|
||||
eEe.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// ExportEvent implements EventExporter
|
||||
func (eEe *ElasticEE) ExportEvent(ev interface{}, key string) (err error) {
|
||||
eEe.reqs.get()
|
||||
defer eEe.reqs.done()
|
||||
eEe.RLock()
|
||||
defer func() {
|
||||
eEe.RUnlock()
|
||||
eEe.reqs.done()
|
||||
}()
|
||||
if eEe.eClnt == nil {
|
||||
return utils.ErrDisconnected
|
||||
}
|
||||
eReq := esapi.IndexRequest{
|
||||
Index: eEe.opts.Index,
|
||||
DocumentID: key,
|
||||
@@ -153,7 +164,9 @@ func (eEe *ElasticEE) ExportEvent(ev interface{}, key string) (err error) {
|
||||
}
|
||||
|
||||
func (eEe *ElasticEE) Close() (_ error) {
|
||||
eEe.Lock()
|
||||
eEe.eClnt = nil
|
||||
eEe.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
19
ees/kafka.go
19
ees/kafka.go
@@ -32,6 +32,7 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *KafkaEE
|
||||
cfg: cfg,
|
||||
dc: dc,
|
||||
topic: utils.DefaultQueueID,
|
||||
reqs: newConcReq(cfg.ConcurrentRequests),
|
||||
}
|
||||
if vals, has := cfg.Opts[utils.KafkaTopic]; has {
|
||||
kfkPstr.topic = utils.IfaceAsString(vals)
|
||||
@@ -56,11 +57,16 @@ func (pstr *KafkaEE) Cfg() *config.EventExporterCfg { return pstr.cfg }
|
||||
func (pstr *KafkaEE) Connect() (_ error) {
|
||||
pstr.Lock()
|
||||
if pstr.writer == nil {
|
||||
pstr.writer = kafka.NewWriter(kafka.WriterConfig{
|
||||
Brokers: []string{pstr.Cfg().ExportPath},
|
||||
MaxAttempts: pstr.Cfg().Attempts,
|
||||
pstr.writer = &kafka.Writer{
|
||||
Addr: kafka.TCP(pstr.Cfg().ExportPath),
|
||||
Topic: pstr.topic,
|
||||
})
|
||||
MaxAttempts: pstr.Cfg().Attempts,
|
||||
}
|
||||
// pstr.writer = kafka.NewWriter(kafka.WriterConfig{
|
||||
// Brokers: []string{pstr.Cfg().ExportPath},
|
||||
// MaxAttempts: pstr.Cfg().Attempts,
|
||||
// Topic: pstr.topic,
|
||||
// })
|
||||
}
|
||||
pstr.Unlock()
|
||||
return
|
||||
@@ -69,6 +75,11 @@ func (pstr *KafkaEE) Connect() (_ error) {
|
||||
func (pstr *KafkaEE) ExportEvent(content interface{}, key string) (err error) {
|
||||
pstr.reqs.get()
|
||||
pstr.RLock()
|
||||
if pstr.writer == nil {
|
||||
pstr.RUnlock()
|
||||
pstr.reqs.done()
|
||||
return utils.ErrDisconnected
|
||||
}
|
||||
err = pstr.writer.WriteMessages(context.Background(), kafka.Message{
|
||||
Key: []byte(key),
|
||||
Value: content.([]byte),
|
||||
|
||||
@@ -37,7 +37,8 @@ func TestWriteFldPosts(t *testing.T) {
|
||||
// can convert & write
|
||||
dir := "/tmp/engine/libcdre_test/"
|
||||
exportEvent := &ExportEvents{
|
||||
module: "module",
|
||||
failedPostsDir: dir,
|
||||
module: "module",
|
||||
}
|
||||
if err := os.RemoveAll(dir); err != nil {
|
||||
t.Fatal("Error removing folder: ", dir, err)
|
||||
|
||||
@@ -104,6 +104,9 @@ func TestAddFldPost(t *testing.T) {
|
||||
if !reflect.DeepEqual(eOut, failedPost) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost))
|
||||
}
|
||||
for _, id := range failedPostCache.GetItemIDs("") {
|
||||
failedPostCache.Set(id, nil, nil)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilePath(t *testing.T) {
|
||||
|
||||
@@ -102,6 +102,11 @@ func (pstr *NatsEE) Connect() (err error) {
|
||||
func (pstr *NatsEE) ExportEvent(content interface{}, _ string) (err error) {
|
||||
pstr.reqs.get()
|
||||
pstr.RLock()
|
||||
if pstr.poster == nil {
|
||||
pstr.RUnlock()
|
||||
pstr.reqs.done()
|
||||
return utils.ErrDisconnected
|
||||
}
|
||||
if pstr.jetStream {
|
||||
_, err = pstr.posterJS.Publish(pstr.subject, content.([]byte))
|
||||
} else {
|
||||
|
||||
@@ -58,6 +58,7 @@ func TestKafkaParseURL(t *testing.T) {
|
||||
exp := &KafkaEE{
|
||||
cfg: cfg,
|
||||
topic: "cdr_billing",
|
||||
reqs: newConcReq(0),
|
||||
}
|
||||
if kfk := NewKafkaEE(cfg, nil); !reflect.DeepEqual(exp, kfk) {
|
||||
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(exp), utils.ToJSON(kfk))
|
||||
|
||||
@@ -1,484 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OerS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package ees
|
||||
|
||||
/*
|
||||
func TestPosterJsonMapID(t *testing.T) {
|
||||
pstrEE := &PosterJSONMapEE{
|
||||
id: "3",
|
||||
}
|
||||
if rcv := pstrEE.ID(); !reflect.DeepEqual(rcv, "3") {
|
||||
t.Errorf("Expected %+v but got %+v", "3", rcv)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosterJsonMapGetMetrics(t *testing.T) {
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
pstrEE := &PosterJSONMapEE{
|
||||
dc: dc,
|
||||
}
|
||||
|
||||
if rcv := pstrEE.GetMetrics(); !reflect.DeepEqual(rcv, pstrEE.dc) {
|
||||
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(pstrEE.dc))
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosterJsonMapNewPosterJSONMapEECase2(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaAMQPV1jsonMap
|
||||
cgrCfg.EEsCfg().Exporters[0].ExportPath = utils.EmptyString
|
||||
filterS := engine.NewFilterS(cgrCfg, nil, nil)
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
pstrJSON, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
pstrJSONExpect := engine.NewAMQPv1Poster(cgrCfg.EEsCfg().Exporters[0].ExportPath,
|
||||
cgrCfg.EEsCfg().Exporters[0].Attempts, cgrCfg.EEsCfg().Exporters[0].Opts)
|
||||
if !reflect.DeepEqual(pstrJSON.poster, pstrJSONExpect) {
|
||||
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(pstrJSONExpect), utils.ToJSON(pstrJSON.poster))
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosterJsonMapNewPosterJSONMapEECase3(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaSQSjsonMap
|
||||
cgrCfg.EEsCfg().Exporters[0].ExportPath = utils.EmptyString
|
||||
filterS := engine.NewFilterS(cgrCfg, nil, nil)
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
pstrJSON, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if _, canCast := pstrJSON.poster.(*engine.SQSPoster); !canCast {
|
||||
t.Error("Can't cast")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosterJsonMapNewPosterJSONMapEECase4(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaKafkajsonMap
|
||||
filterS := engine.NewFilterS(cgrCfg, nil, nil)
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
pstrJSON, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
pstrJSONExpect := engine.NewKafkaPoster(cgrCfg.EEsCfg().Exporters[0].ExportPath,
|
||||
cgrCfg.EEsCfg().Exporters[0].Attempts, cgrCfg.EEsCfg().Exporters[0].Opts)
|
||||
if !reflect.DeepEqual(pstrJSON.poster, pstrJSONExpect) {
|
||||
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(pstrJSONExpect), utils.ToJSON(pstrJSON.poster))
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosterJsonMapNewPosterJSONMapEECase5(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaS3jsonMap
|
||||
filterS := engine.NewFilterS(cgrCfg, nil, nil)
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
pstrJSON, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
pstrJSONExpect := engine.NewS3Poster(cgrCfg.EEsCfg().Exporters[0].ExportPath,
|
||||
cgrCfg.EEsCfg().Exporters[0].Attempts, cgrCfg.EEsCfg().Exporters[0].Opts)
|
||||
if !reflect.DeepEqual(pstrJSON.poster, pstrJSONExpect) {
|
||||
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(pstrJSONExpect), utils.ToJSON(pstrJSON.poster))
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosterJsonMapExportEvent(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaSQSjsonMap
|
||||
cgrEv := new(utils.CGREvent)
|
||||
newIDb := engine.NewInternalDB(nil, nil, true)
|
||||
newDM := engine.NewDataManager(newIDb, cgrCfg.CacheCfg(), nil)
|
||||
filterS := engine.NewFilterS(cgrCfg, nil, newDM)
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
pstrEE, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
cgrEv.Event = map[string]interface{}{
|
||||
"test": "string",
|
||||
}
|
||||
cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Fields = []*config.FCTemplate{
|
||||
{
|
||||
Path: "*exp.1", Type: utils.MetaVariable,
|
||||
Value: config.NewRSRParsersMustCompile("~*req.field1", utils.InfieldSep),
|
||||
},
|
||||
{
|
||||
Path: "*exp.2", Type: utils.MetaVariable,
|
||||
Value: config.NewRSRParsersMustCompile("*req.field2", utils.InfieldSep),
|
||||
},
|
||||
}
|
||||
for _, field := range cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Fields {
|
||||
field.ComputePath()
|
||||
}
|
||||
errExpect := "MissingRegion: could not find region configuration"
|
||||
if err := pstrEE.ExportEvent(cgrEv); err == nil || err.Error() != errExpect {
|
||||
t.Errorf("Expected %q but received %q", errExpect, err)
|
||||
}
|
||||
dcExpect := int64(1)
|
||||
if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) {
|
||||
t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents])
|
||||
}
|
||||
cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].ComputeFields()
|
||||
if err := pstrEE.ExportEvent(cgrEv); err == nil || err.Error() != errExpect {
|
||||
t.Errorf("Expected %q but received %q", errExpect, err)
|
||||
}
|
||||
dcExpect = int64(2)
|
||||
if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) {
|
||||
t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents])
|
||||
}
|
||||
}
|
||||
|
||||
type testPoster struct {
|
||||
body []byte
|
||||
}
|
||||
|
||||
func (pstr *testPoster) Close() {}
|
||||
func (pstr *testPoster) Post(body []byte, key string) error {
|
||||
pstr.body = body
|
||||
return nil
|
||||
}
|
||||
func TestPosterJsonMapExportEvent1(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaAMQPjsonMap
|
||||
cgrEv := new(utils.CGREvent)
|
||||
newIDb := engine.NewInternalDB(nil, nil, true)
|
||||
newDM := engine.NewDataManager(newIDb, cgrCfg.CacheCfg(), nil)
|
||||
filterS := engine.NewFilterS(cgrCfg, nil, newDM)
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
////
|
||||
////
|
||||
tstPstr := &testPoster{}
|
||||
pstrEE := &PosterJSONMapEE{
|
||||
id: cgrCfg.EEsCfg().Exporters[0].ID,
|
||||
cgrCfg: cgrCfg,
|
||||
cfgIdx: 0,
|
||||
filterS: filterS,
|
||||
dc: dc,
|
||||
poster: tstPstr,
|
||||
reqs: newConcReq(0),
|
||||
}
|
||||
// pstrEE.poster = tstPstr
|
||||
cgrEv.Event = map[string]interface{}{
|
||||
"test": "string",
|
||||
}
|
||||
cgrCfg.EEsCfg().Exporters[0].Fields = []*config.FCTemplate{
|
||||
{
|
||||
Path: "*exp.1", Type: utils.MetaVariable,
|
||||
Value: config.NewRSRParsersMustCompile("~*req.field1", utils.InfieldSep),
|
||||
},
|
||||
{
|
||||
Path: "*exp.2", Type: utils.MetaVariable,
|
||||
Value: config.NewRSRParsersMustCompile("*req.field2", utils.InfieldSep),
|
||||
},
|
||||
}
|
||||
for _, field := range cgrCfg.EEsCfg().Exporters[0].Fields {
|
||||
field.ComputePath()
|
||||
}
|
||||
cgrCfg.EEsCfg().Exporters[0].ComputeFields()
|
||||
if err := pstrEE.ExportEvent(cgrEv); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dcExpect := int64(1)
|
||||
if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) {
|
||||
t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents])
|
||||
}
|
||||
bodyExpect := map[string]interface{}{
|
||||
"2": "*req.field2",
|
||||
}
|
||||
var rcv map[string]interface{}
|
||||
if err := json.Unmarshal(tstPstr.body, &rcv); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !reflect.DeepEqual(rcv, bodyExpect) {
|
||||
t.Errorf("Expected %s but received %s", utils.ToJSON(bodyExpect), utils.ToJSON(rcv))
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosterJsonMapExportEvent2(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaSQSjsonMap
|
||||
cgrEv := new(utils.CGREvent)
|
||||
newIDb := engine.NewInternalDB(nil, nil, true)
|
||||
newDM := engine.NewDataManager(newIDb, cgrCfg.CacheCfg(), nil)
|
||||
filterS := engine.NewFilterS(cgrCfg, nil, newDM)
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
pstrEE, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
cgrEv.Event = map[string]interface{}{
|
||||
"test": "string",
|
||||
}
|
||||
cgrCfg.EEsCfg().Exporters[0].Fields = []*config.FCTemplate{
|
||||
{
|
||||
Path: "*exp.1", Type: utils.MetaVariable,
|
||||
Value: config.NewRSRParsersMustCompile("~*req.field1", utils.InfieldSep),
|
||||
Filters: []string{"*wrong-type"},
|
||||
},
|
||||
{
|
||||
Path: "*exp.1", Type: utils.MetaVariable,
|
||||
Value: config.NewRSRParsersMustCompile("~*req.field1", utils.InfieldSep),
|
||||
Filters: []string{"*wrong-type"},
|
||||
},
|
||||
}
|
||||
for _, field := range cgrCfg.EEsCfg().Exporters[0].Fields {
|
||||
field.ComputePath()
|
||||
}
|
||||
cgrCfg.EEsCfg().Exporters[0].ComputeFields()
|
||||
errExpect := "inline parse error for string: <*wrong-type>"
|
||||
if err := pstrEE.ExportEvent(cgrEv); err == nil || err.Error() != errExpect {
|
||||
t.Errorf("Expected %q but received %q", errExpect, err)
|
||||
}
|
||||
dcExpect := int64(1)
|
||||
if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) {
|
||||
t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents])
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosterJsonMapExportEvent3(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaSQSjsonMap
|
||||
cgrEv := new(utils.CGREvent)
|
||||
newIDb := engine.NewInternalDB(nil, nil, true)
|
||||
newDM := engine.NewDataManager(newIDb, cgrCfg.CacheCfg(), nil)
|
||||
filterS := engine.NewFilterS(cgrCfg, nil, newDM)
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
pstrEE, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
cgrEv.Event = map[string]interface{}{
|
||||
"test": "string",
|
||||
}
|
||||
cgrEv.Event = map[string]interface{}{
|
||||
"test": make(chan int),
|
||||
}
|
||||
cgrCfg.EEsCfg().Exporters[0].Fields = []*config.FCTemplate{{}}
|
||||
for _, field := range cgrCfg.EEsCfg().Exporters[0].Fields {
|
||||
field.ComputePath()
|
||||
}
|
||||
cgrCfg.EEsCfg().Exporters[0].ComputeFields()
|
||||
errExpect := "json: unsupported type: chan int"
|
||||
if err := pstrEE.ExportEvent(cgrEv); err == nil || err.Error() != errExpect {
|
||||
t.Errorf("Expected %q but received %q", errExpect, err)
|
||||
}
|
||||
dcExpect := int64(1)
|
||||
if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) {
|
||||
t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents])
|
||||
}
|
||||
pstrEE.OnEvicted("test", "test")
|
||||
}
|
||||
|
||||
type mockPoster struct {
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
func (mp mockPoster) Post(body []byte, key string) error {
|
||||
// resp, err := http.Get(mp.url)
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
// defer resp.Body.Close()
|
||||
time.Sleep(3 * time.Second)
|
||||
mp.wg.Done()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mockPoster) Close() {
|
||||
return
|
||||
}
|
||||
|
||||
func TestPosterJsonMapSync(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
var cfgIdx int
|
||||
cfgIdx = 0
|
||||
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Type = "*http_json_map"
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone,
|
||||
cgrCfg.GeneralCfg().DefaultTimezone))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
//Create an event
|
||||
cgrEvent := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
Event: map[string]interface{}{
|
||||
"Account": "1001",
|
||||
"Destination": "1002",
|
||||
},
|
||||
}
|
||||
|
||||
var wg1 = &sync.WaitGroup{}
|
||||
|
||||
wg1.Add(3)
|
||||
|
||||
test := make(chan struct{})
|
||||
go func() {
|
||||
wg1.Wait()
|
||||
close(test)
|
||||
}()
|
||||
|
||||
mckPoster := mockPoster{
|
||||
wg: wg1,
|
||||
}
|
||||
exp := &PosterJSONMapEE{
|
||||
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
|
||||
cgrCfg: cgrCfg,
|
||||
cfgIdx: cfgIdx,
|
||||
filterS: new(engine.FilterS),
|
||||
poster: mckPoster,
|
||||
dc: dc,
|
||||
reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests),
|
||||
}
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
go exp.ExportEvent(cgrEvent)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-test:
|
||||
return
|
||||
case <-time.After(4 * time.Second):
|
||||
t.Error("Can't asynchronously export events")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosterJsonMapSyncLimit(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
var cfgIdx int
|
||||
cfgIdx = 0
|
||||
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Type = "*http_json_map"
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests = 1
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone,
|
||||
cgrCfg.GeneralCfg().DefaultTimezone))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
//Create an event
|
||||
cgrEvent := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
Event: map[string]interface{}{
|
||||
"Account": "1001",
|
||||
"Destination": "1002",
|
||||
},
|
||||
}
|
||||
|
||||
var wg1 = &sync.WaitGroup{}
|
||||
|
||||
wg1.Add(3)
|
||||
|
||||
test := make(chan struct{})
|
||||
go func() {
|
||||
wg1.Wait()
|
||||
close(test)
|
||||
}()
|
||||
|
||||
mckPoster := mockPoster{
|
||||
wg: wg1,
|
||||
}
|
||||
exp := &PosterJSONMapEE{
|
||||
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
|
||||
cgrCfg: cgrCfg,
|
||||
cfgIdx: cfgIdx,
|
||||
filterS: new(engine.FilterS),
|
||||
poster: mckPoster,
|
||||
dc: dc,
|
||||
reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests),
|
||||
}
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
go exp.ExportEvent(cgrEvent)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-test:
|
||||
t.Error("Should not have been possible to asynchronously export events")
|
||||
case <-time.After(4 * time.Second):
|
||||
return
|
||||
}
|
||||
}
|
||||
*/
|
||||
22
ees/sql.go
22
ees/sql.go
@@ -23,6 +23,7 @@ import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"gorm.io/driver/mysql"
|
||||
"gorm.io/driver/postgres"
|
||||
@@ -53,6 +54,7 @@ type SQLEe struct {
|
||||
|
||||
dialect gorm.Dialector
|
||||
tableName string
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
type sqlPosterRequest struct {
|
||||
@@ -132,20 +134,36 @@ func openDB(dialect gorm.Dialector, opts map[string]interface{}) (db *gorm.DB, s
|
||||
func (sqlEe *SQLEe) Cfg() *config.EventExporterCfg { return sqlEe.cfg }
|
||||
|
||||
func (sqlEe *SQLEe) Connect() (err error) {
|
||||
sqlEe.Lock()
|
||||
if sqlEe.db == nil || sqlEe.sqldb == nil {
|
||||
sqlEe.db, sqlEe.sqldb, err = openDB(sqlEe.dialect, sqlEe.Cfg().Opts)
|
||||
}
|
||||
sqlEe.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
func (sqlEe *SQLEe) ExportEvent(req interface{}, _ string) error {
|
||||
sqlEe.reqs.get()
|
||||
defer sqlEe.reqs.done()
|
||||
sqlEe.RLock()
|
||||
defer func() {
|
||||
sqlEe.RUnlock()
|
||||
sqlEe.reqs.done()
|
||||
}()
|
||||
if sqlEe.db == nil {
|
||||
return utils.ErrDisconnected
|
||||
}
|
||||
sReq := req.(*sqlPosterRequest)
|
||||
return sqlEe.db.Table(sqlEe.tableName).Exec(sReq.Querry, sReq.Values...).Error
|
||||
}
|
||||
|
||||
func (sqlEe *SQLEe) Close() error { return sqlEe.sqldb.Close() }
|
||||
func (sqlEe *SQLEe) Close() (err error) {
|
||||
sqlEe.Lock()
|
||||
err = sqlEe.sqldb.Close()
|
||||
sqlEe.db = nil
|
||||
sqlEe.sqldb = nil
|
||||
sqlEe.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
func (sqlEe *SQLEe) GetMetrics() *utils.SafeMapStorage { return sqlEe.dc }
|
||||
|
||||
|
||||
@@ -296,7 +296,7 @@ func TestSQLExportEvent1(t *testing.T) {
|
||||
if err := sqlEe.Connect(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := sqlEe.ExportEvent(&sqlPosterRequest{Querry: "INSERT INTO expTable VALUES (); ", Values: []interface{}{}}, ""); err != nil {
|
||||
if err := sqlEe.ExportEvent(&sqlPosterRequest{Querry: "INSERT INTO cdrs VALUES (); ", Values: []interface{}{}}, ""); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
sqlEe.Close()
|
||||
|
||||
@@ -118,6 +118,7 @@ func TestKafkaERServe(t *testing.T) {
|
||||
rdr.Config().Opts = map[string]interface{}{}
|
||||
rdr.Config().ProcessedPath = ""
|
||||
rdr.(*KafkaER).createPoster()
|
||||
close(rdrExit)
|
||||
}
|
||||
|
||||
func TestKafkaERServe2(t *testing.T) {
|
||||
|
||||
@@ -465,7 +465,7 @@ func testSectConfigSReloadEES(t *testing.T) {
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("Expected OK received: %+v", reply)
|
||||
}
|
||||
cfgStr := "{\"ees\":{\"attributes_conns\":[],\"cache\":{\"*file_csv\":{\"limit\":-1,\"precache\":false,\"replicate\":false,\"static_ttl\":false,\"ttl\":\"5s\"}},\"enabled\":true,\"exporters\":[{\"attempts\":1,\"attribute_context\":\"\",\"attribute_ids\":[],\"concurrent_requests\":0,\"export_path\":\"/var/spool/cgrates/ees\",\"fields\":[],\"filters\":[],\"flags\":[],\"id\":\"*default\",\"opts\":{},\"synchronous\":false,\"timezone\":\"\",\"type\":\"*none\"}]}}"
|
||||
cfgStr := "{\"ees\":{\"attributes_conns\":[],\"cache\":{\"*file_csv\":{\"limit\":-1,\"precache\":false,\"replicate\":false,\"static_ttl\":false,\"ttl\":\"5s\"}},\"enabled\":true,\"exporters\":[{\"attempts\":1,\"attribute_context\":\"\",\"attribute_ids\":[],\"concurrent_requests\":0,\"export_path\":\"/var/spool/cgrates/ees\",\"failed_posts_dir\":\"/var/spool/cgrates/failed_posts\",\"fields\":[],\"filters\":[],\"flags\":[],\"id\":\"*default\",\"opts\":{},\"synchronous\":false,\"timezone\":\"\",\"type\":\"*none\"}]}}"
|
||||
var rpl string
|
||||
if err := testSectRPC.Call(context.Background(), utils.ConfigSv1GetConfigAsJSON, &config.SectionWithAPIOpts{
|
||||
Tenant: "cgrates.org",
|
||||
|
||||
Reference in New Issue
Block a user