mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Updated integration tests
This commit is contained in:
committed by
Dan Christian Bogos
parent
cd0964dfea
commit
d80a5668ac
@@ -139,6 +139,11 @@ func (pstr *AMQPee) Connect() (err error) {
|
||||
func (pstr *AMQPee) ExportEvent(content interface{}, _ string) (err error) {
|
||||
pstr.reqs.get()
|
||||
pstr.RLock()
|
||||
if pstr.postChan == nil {
|
||||
pstr.RUnlock()
|
||||
pstr.reqs.done()
|
||||
return utils.ErrDisconnected
|
||||
}
|
||||
err = pstr.postChan.Publish(
|
||||
pstr.exchange, // exchange
|
||||
pstr.routingKey, // routing key
|
||||
|
||||
@@ -85,6 +85,9 @@ func (pstr *AMQPv1EE) ExportEvent(content interface{}, _ string) (err error) {
|
||||
pstr.RUnlock()
|
||||
pstr.reqs.done()
|
||||
}()
|
||||
if pstr.session == nil {
|
||||
return utils.ErrDisconnected
|
||||
}
|
||||
sender, err := pstr.session.NewSender(
|
||||
amqpv1.LinkTargetAddress(pstr.queueID),
|
||||
)
|
||||
|
||||
13
ees/ees.go
13
ees/ees.go
@@ -260,9 +260,13 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply *
|
||||
}
|
||||
|
||||
func exportEventWithExporter(exp EventExporter, ev *utils.CGREvent, oneTime bool, cfg *config.CGRConfig, filterS *engine.FilterS) (err error) {
|
||||
if oneTime {
|
||||
defer exp.Close()
|
||||
}
|
||||
defer func() {
|
||||
updateEEMetrics(exp.GetMetrics(), ev.ID, ev.Event, err != nil, utils.FirstNonEmpty(exp.Cfg().Timezone,
|
||||
cfg.GeneralCfg().DefaultTimezone))
|
||||
if oneTime {
|
||||
exp.Close()
|
||||
}
|
||||
}()
|
||||
var eEv interface{}
|
||||
|
||||
exp.GetMetrics().Lock()
|
||||
@@ -319,7 +323,8 @@ func ExportWithAttempts(exp EventExporter, eEv interface{}, key string) (err err
|
||||
return
|
||||
}
|
||||
for i := 0; i < exp.Cfg().Attempts; i++ {
|
||||
if err = exp.ExportEvent(eEv, key); err == nil {
|
||||
if err = exp.ExportEvent(eEv, key); err == nil ||
|
||||
err == utils.ErrDisconnected { // special error in case the exporter was closed
|
||||
break
|
||||
}
|
||||
if i+1 < exp.Cfg().Attempts {
|
||||
|
||||
@@ -24,6 +24,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/elastic/go-elasticsearch/esapi"
|
||||
|
||||
@@ -49,6 +50,7 @@ type ElasticEE struct {
|
||||
dc *utils.SafeMapStorage
|
||||
opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap
|
||||
reqs *concReq
|
||||
sync.RWMutex
|
||||
bytePreparing
|
||||
}
|
||||
|
||||
@@ -106,19 +108,28 @@ func (eEe *ElasticEE) prepareOpts() (err error) {
|
||||
func (eEe *ElasticEE) Cfg() *config.EventExporterCfg { return eEe.cfg }
|
||||
|
||||
func (eEe *ElasticEE) Connect() (err error) {
|
||||
eEe.Lock()
|
||||
// create the client
|
||||
if eEe.eClnt == nil {
|
||||
eEe.eClnt, err = elasticsearch.NewClient(
|
||||
elasticsearch.Config{Addresses: strings.Split(eEe.Cfg().ExportPath, utils.InfieldSep)},
|
||||
)
|
||||
}
|
||||
eEe.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// ExportEvent implements EventExporter
|
||||
func (eEe *ElasticEE) ExportEvent(ev interface{}, key string) (err error) {
|
||||
eEe.reqs.get()
|
||||
defer eEe.reqs.done()
|
||||
eEe.RLock()
|
||||
defer func() {
|
||||
eEe.RUnlock()
|
||||
eEe.reqs.done()
|
||||
}()
|
||||
if eEe.eClnt == nil {
|
||||
return utils.ErrDisconnected
|
||||
}
|
||||
eReq := esapi.IndexRequest{
|
||||
Index: eEe.opts.Index,
|
||||
DocumentID: key,
|
||||
@@ -153,7 +164,9 @@ func (eEe *ElasticEE) ExportEvent(ev interface{}, key string) (err error) {
|
||||
}
|
||||
|
||||
func (eEe *ElasticEE) Close() (_ error) {
|
||||
eEe.Lock()
|
||||
eEe.eClnt = nil
|
||||
eEe.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
19
ees/kafka.go
19
ees/kafka.go
@@ -32,6 +32,7 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *KafkaEE
|
||||
cfg: cfg,
|
||||
dc: dc,
|
||||
topic: utils.DefaultQueueID,
|
||||
reqs: newConcReq(cfg.ConcurrentRequests),
|
||||
}
|
||||
if vals, has := cfg.Opts[utils.KafkaTopic]; has {
|
||||
kfkPstr.topic = utils.IfaceAsString(vals)
|
||||
@@ -56,11 +57,16 @@ func (pstr *KafkaEE) Cfg() *config.EventExporterCfg { return pstr.cfg }
|
||||
func (pstr *KafkaEE) Connect() (_ error) {
|
||||
pstr.Lock()
|
||||
if pstr.writer == nil {
|
||||
pstr.writer = kafka.NewWriter(kafka.WriterConfig{
|
||||
Brokers: []string{pstr.Cfg().ExportPath},
|
||||
MaxAttempts: pstr.Cfg().Attempts,
|
||||
pstr.writer = &kafka.Writer{
|
||||
Addr: kafka.TCP(pstr.Cfg().ExportPath),
|
||||
Topic: pstr.topic,
|
||||
})
|
||||
MaxAttempts: pstr.Cfg().Attempts,
|
||||
}
|
||||
// pstr.writer = kafka.NewWriter(kafka.WriterConfig{
|
||||
// Brokers: []string{pstr.Cfg().ExportPath},
|
||||
// MaxAttempts: pstr.Cfg().Attempts,
|
||||
// Topic: pstr.topic,
|
||||
// })
|
||||
}
|
||||
pstr.Unlock()
|
||||
return
|
||||
@@ -69,6 +75,11 @@ func (pstr *KafkaEE) Connect() (_ error) {
|
||||
func (pstr *KafkaEE) ExportEvent(content interface{}, key string) (err error) {
|
||||
pstr.reqs.get()
|
||||
pstr.RLock()
|
||||
if pstr.writer == nil {
|
||||
pstr.RUnlock()
|
||||
pstr.reqs.done()
|
||||
return utils.ErrDisconnected
|
||||
}
|
||||
err = pstr.writer.WriteMessages(context.Background(), kafka.Message{
|
||||
Key: []byte(key),
|
||||
Value: content.([]byte),
|
||||
|
||||
@@ -61,7 +61,11 @@ func callURL(ub *engine.Account, a *engine.Action, _ engine.Actions, extraData i
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return ExportWithAttempts(pstr, &HTTPPosterRequest{Body: body, Header: make(http.Header)}, "")
|
||||
err = ExportWithAttempts(pstr, &HTTPPosterRequest{Body: body, Header: make(http.Header)}, "")
|
||||
if config.CgrConfig().GeneralCfg().FailedPostsDir != utils.MetaNone {
|
||||
err = nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Does not block for posts, no error reports
|
||||
@@ -97,5 +101,9 @@ func postEvent(_ *engine.Account, a *engine.Action, _ engine.Actions, extraData
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return ExportWithAttempts(pstr, &HTTPPosterRequest{Body: body, Header: make(http.Header)}, "")
|
||||
err = ExportWithAttempts(pstr, &HTTPPosterRequest{Body: body, Header: make(http.Header)}, "")
|
||||
if config.CgrConfig().GeneralCfg().FailedPostsDir != utils.MetaNone {
|
||||
err = nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -37,7 +37,8 @@ func TestWriteFldPosts(t *testing.T) {
|
||||
// can convert & write
|
||||
dir := "/tmp/engine/libcdre_test/"
|
||||
exportEvent := &ExportEvents{
|
||||
module: "module",
|
||||
failedPostsDir: dir,
|
||||
module: "module",
|
||||
}
|
||||
if err := os.RemoveAll(dir); err != nil {
|
||||
t.Fatal("Error removing folder: ", dir, err)
|
||||
|
||||
@@ -104,6 +104,9 @@ func TestAddFldPost(t *testing.T) {
|
||||
if !reflect.DeepEqual(eOut, failedPost) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost))
|
||||
}
|
||||
for _, id := range failedPostCache.GetItemIDs("") {
|
||||
failedPostCache.Set(id, nil, nil)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilePath(t *testing.T) {
|
||||
|
||||
@@ -102,6 +102,11 @@ func (pstr *NatsEE) Connect() (err error) {
|
||||
func (pstr *NatsEE) ExportEvent(content interface{}, _ string) (err error) {
|
||||
pstr.reqs.get()
|
||||
pstr.RLock()
|
||||
if pstr.poster == nil {
|
||||
pstr.RUnlock()
|
||||
pstr.reqs.done()
|
||||
return utils.ErrDisconnected
|
||||
}
|
||||
if pstr.jetStream {
|
||||
_, err = pstr.posterJS.Publish(pstr.subject, content.([]byte))
|
||||
} else {
|
||||
|
||||
@@ -58,6 +58,7 @@ func TestKafkaParseURL(t *testing.T) {
|
||||
exp := &KafkaEE{
|
||||
cfg: cfg,
|
||||
topic: "cdr_billing",
|
||||
reqs: newConcReq(0),
|
||||
}
|
||||
if kfk := NewKafkaEE(cfg, nil); !reflect.DeepEqual(exp, kfk) {
|
||||
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(exp), utils.ToJSON(kfk))
|
||||
|
||||
@@ -1,479 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OerS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package ees
|
||||
|
||||
/*
|
||||
func TestPosterJsonMapID(t *testing.T) {
|
||||
pstrEE := &PosterJSONMapEE{
|
||||
id: "3",
|
||||
}
|
||||
if rcv := pstrEE.ID(); !reflect.DeepEqual(rcv, "3") {
|
||||
t.Errorf("Expected %+v but got %+v", "3", rcv)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosterJsonMapGetMetrics(t *testing.T) {
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
pstrEE := &PosterJSONMapEE{
|
||||
dc: dc,
|
||||
}
|
||||
|
||||
if rcv := pstrEE.GetMetrics(); !reflect.DeepEqual(rcv, pstrEE.dc) {
|
||||
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(pstrEE.dc))
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosterJsonMapNewPosterJSONMapEECase2(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaAMQPV1jsonMap
|
||||
cgrCfg.EEsCfg().Exporters[0].ExportPath = utils.EmptyString
|
||||
filterS := engine.NewFilterS(cgrCfg, nil, nil)
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
pstrJSON, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
pstrJSONExpect := engine.NewAMQPv1Poster(cgrCfg.EEsCfg().Exporters[0].ExportPath,
|
||||
cgrCfg.EEsCfg().Exporters[0].Attempts, cgrCfg.EEsCfg().Exporters[0].Opts)
|
||||
if !reflect.DeepEqual(pstrJSON.poster, pstrJSONExpect) {
|
||||
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(pstrJSONExpect), utils.ToJSON(pstrJSON.poster))
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosterJsonMapNewPosterJSONMapEECase3(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaSQSjsonMap
|
||||
cgrCfg.EEsCfg().Exporters[0].ExportPath = utils.EmptyString
|
||||
filterS := engine.NewFilterS(cgrCfg, nil, nil)
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
pstrJSON, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if _, canCast := pstrJSON.poster.(*engine.SQSPoster); !canCast {
|
||||
t.Error("Can't cast")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosterJsonMapNewPosterJSONMapEECase4(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaKafkajsonMap
|
||||
filterS := engine.NewFilterS(cgrCfg, nil, nil)
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
pstrJSON, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
pstrJSONExpect := engine.NewKafkaPoster(cgrCfg.EEsCfg().Exporters[0].ExportPath,
|
||||
cgrCfg.EEsCfg().Exporters[0].Attempts, cgrCfg.EEsCfg().Exporters[0].Opts)
|
||||
if !reflect.DeepEqual(pstrJSON.poster, pstrJSONExpect) {
|
||||
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(pstrJSONExpect), utils.ToJSON(pstrJSON.poster))
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosterJsonMapNewPosterJSONMapEECase5(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaS3jsonMap
|
||||
filterS := engine.NewFilterS(cgrCfg, nil, nil)
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
pstrJSON, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
pstrJSONExpect := engine.NewS3Poster(cgrCfg.EEsCfg().Exporters[0].ExportPath,
|
||||
cgrCfg.EEsCfg().Exporters[0].Attempts, cgrCfg.EEsCfg().Exporters[0].Opts)
|
||||
if !reflect.DeepEqual(pstrJSON.poster, pstrJSONExpect) {
|
||||
t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(pstrJSONExpect), utils.ToJSON(pstrJSON.poster))
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosterJsonMapExportEvent(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaSQSjsonMap
|
||||
cgrEv := new(utils.CGREvent)
|
||||
newIDb := engine.NewInternalDB(nil, nil, true)
|
||||
newDM := engine.NewDataManager(newIDb, cgrCfg.CacheCfg(), nil)
|
||||
filterS := engine.NewFilterS(cgrCfg, nil, newDM)
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
pstrEE, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
cgrEv.Event = map[string]interface{}{
|
||||
"test": "string",
|
||||
}
|
||||
cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Fields = []*config.FCTemplate{
|
||||
{
|
||||
Path: "*exp.1", Type: utils.MetaVariable,
|
||||
Value: config.NewRSRParsersMustCompile("~*req.field1", utils.InfieldSep),
|
||||
},
|
||||
{
|
||||
Path: "*exp.2", Type: utils.MetaVariable,
|
||||
Value: config.NewRSRParsersMustCompile("*req.field2", utils.InfieldSep),
|
||||
},
|
||||
}
|
||||
for _, field := range cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Fields {
|
||||
field.ComputePath()
|
||||
}
|
||||
errExpect := "MissingRegion: could not find region configuration"
|
||||
if err := pstrEE.ExportEvent(cgrEv); err == nil || err.Error() != errExpect {
|
||||
t.Errorf("Expected %q but received %q", errExpect, err)
|
||||
}
|
||||
dcExpect := int64(1)
|
||||
if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) {
|
||||
t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents])
|
||||
}
|
||||
cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].ComputeFields()
|
||||
if err := pstrEE.ExportEvent(cgrEv); err == nil || err.Error() != errExpect {
|
||||
t.Errorf("Expected %q but received %q", errExpect, err)
|
||||
}
|
||||
dcExpect = int64(2)
|
||||
if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) {
|
||||
t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents])
|
||||
}
|
||||
}
|
||||
|
||||
type testPoster struct {
|
||||
body []byte
|
||||
}
|
||||
|
||||
func (pstr *testPoster) Close() {}
|
||||
func (pstr *testPoster) Post(body []byte, key string) error {
|
||||
pstr.body = body
|
||||
return nil
|
||||
}
|
||||
func TestPosterJsonMapExportEvent1(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaAMQPjsonMap
|
||||
cgrEv := new(utils.CGREvent)
|
||||
newIDb := engine.NewInternalDB(nil, nil, true)
|
||||
newDM := engine.NewDataManager(newIDb, cgrCfg.CacheCfg(), nil)
|
||||
filterS := engine.NewFilterS(cgrCfg, nil, newDM)
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
////
|
||||
////
|
||||
tstPstr := &testPoster{}
|
||||
pstrEE := &PosterJSONMapEE{
|
||||
id: cgrCfg.EEsCfg().Exporters[0].ID,
|
||||
cgrCfg: cgrCfg,
|
||||
cfgIdx: 0,
|
||||
filterS: filterS,
|
||||
dc: dc,
|
||||
poster: tstPstr,
|
||||
reqs: newConcReq(0),
|
||||
}
|
||||
// pstrEE.poster = tstPstr
|
||||
cgrEv.Event = map[string]interface{}{
|
||||
"test": "string",
|
||||
}
|
||||
cgrCfg.EEsCfg().Exporters[0].Fields = []*config.FCTemplate{
|
||||
{
|
||||
Path: "*exp.1", Type: utils.MetaVariable,
|
||||
Value: config.NewRSRParsersMustCompile("~*req.field1", utils.InfieldSep),
|
||||
},
|
||||
{
|
||||
Path: "*exp.2", Type: utils.MetaVariable,
|
||||
Value: config.NewRSRParsersMustCompile("*req.field2", utils.InfieldSep),
|
||||
},
|
||||
}
|
||||
for _, field := range cgrCfg.EEsCfg().Exporters[0].Fields {
|
||||
field.ComputePath()
|
||||
}
|
||||
cgrCfg.EEsCfg().Exporters[0].ComputeFields()
|
||||
if err := pstrEE.ExportEvent(cgrEv); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
dcExpect := int64(1)
|
||||
if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) {
|
||||
t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents])
|
||||
}
|
||||
bodyExpect := map[string]interface{}{
|
||||
"2": "*req.field2",
|
||||
}
|
||||
var rcv map[string]interface{}
|
||||
if err := json.Unmarshal(tstPstr.body, &rcv); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !reflect.DeepEqual(rcv, bodyExpect) {
|
||||
t.Errorf("Expected %s but received %s", utils.ToJSON(bodyExpect), utils.ToJSON(rcv))
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosterJsonMapExportEvent2(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaSQSjsonMap
|
||||
cgrEv := new(utils.CGREvent)
|
||||
newIDb := engine.NewInternalDB(nil, nil, true)
|
||||
newDM := engine.NewDataManager(newIDb, cgrCfg.CacheCfg(), nil)
|
||||
filterS := engine.NewFilterS(cgrCfg, nil, newDM)
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
pstrEE, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
cgrEv.Event = map[string]interface{}{
|
||||
"test": "string",
|
||||
}
|
||||
cgrCfg.EEsCfg().Exporters[0].Fields = []*config.FCTemplate{
|
||||
{
|
||||
Path: "*exp.1", Type: utils.MetaVariable,
|
||||
Value: config.NewRSRParsersMustCompile("~*req.field1", utils.InfieldSep),
|
||||
Filters: []string{"*wrong-type"},
|
||||
},
|
||||
{
|
||||
Path: "*exp.1", Type: utils.MetaVariable,
|
||||
Value: config.NewRSRParsersMustCompile("~*req.field1", utils.InfieldSep),
|
||||
Filters: []string{"*wrong-type"},
|
||||
},
|
||||
}
|
||||
for _, field := range cgrCfg.EEsCfg().Exporters[0].Fields {
|
||||
field.ComputePath()
|
||||
}
|
||||
cgrCfg.EEsCfg().Exporters[0].ComputeFields()
|
||||
errExpect := "inline parse error for string: <*wrong-type>"
|
||||
if err := pstrEE.ExportEvent(cgrEv); err == nil || err.Error() != errExpect {
|
||||
t.Errorf("Expected %q but received %q", errExpect, err)
|
||||
}
|
||||
dcExpect := int64(1)
|
||||
if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) {
|
||||
t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents])
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosterJsonMapExportEvent3(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaSQSjsonMap
|
||||
cgrEv := new(utils.CGREvent)
|
||||
newIDb := engine.NewInternalDB(nil, nil, true)
|
||||
newDM := engine.NewDataManager(newIDb, cgrCfg.CacheCfg(), nil)
|
||||
filterS := engine.NewFilterS(cgrCfg, nil, newDM)
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
"Local",
|
||||
utils.EmptyString,
|
||||
))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
pstrEE, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
cgrEv.Event = map[string]interface{}{
|
||||
"test": "string",
|
||||
}
|
||||
cgrEv.Event = map[string]interface{}{
|
||||
"test": make(chan int),
|
||||
}
|
||||
cgrCfg.EEsCfg().Exporters[0].Fields = []*config.FCTemplate{{}}
|
||||
for _, field := range cgrCfg.EEsCfg().Exporters[0].Fields {
|
||||
field.ComputePath()
|
||||
}
|
||||
cgrCfg.EEsCfg().Exporters[0].ComputeFields()
|
||||
errExpect := "json: unsupported type: chan int"
|
||||
if err := pstrEE.ExportEvent(cgrEv); err == nil || err.Error() != errExpect {
|
||||
t.Errorf("Expected %q but received %q", errExpect, err)
|
||||
}
|
||||
dcExpect := int64(1)
|
||||
if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) {
|
||||
t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents])
|
||||
}
|
||||
pstrEE.OnEvicted("test", "test")
|
||||
}
|
||||
|
||||
type mockPoster struct {
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
func (mp mockPoster) Post(body []byte, key string) error {
|
||||
time.Sleep(3 * time.Second)
|
||||
mp.wg.Done()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mockPoster) Close() {
|
||||
return
|
||||
}
|
||||
|
||||
func TestPosterJsonMapSync(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
var cfgIdx int
|
||||
cfgIdx = 0
|
||||
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Type = utils.MetaHTTPjsonMap
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone,
|
||||
cgrCfg.GeneralCfg().DefaultTimezone))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
//Create an event
|
||||
cgrEvent := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
Event: map[string]interface{}{
|
||||
"Account": "1001",
|
||||
"Destination": "1002",
|
||||
},
|
||||
}
|
||||
|
||||
var wg1 = &sync.WaitGroup{}
|
||||
|
||||
wg1.Add(3)
|
||||
|
||||
test := make(chan struct{})
|
||||
go func() {
|
||||
wg1.Wait()
|
||||
close(test)
|
||||
}()
|
||||
|
||||
mckPoster := mockPoster{
|
||||
wg: wg1,
|
||||
}
|
||||
exp := &PosterJSONMapEE{
|
||||
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
|
||||
cgrCfg: cgrCfg,
|
||||
cfgIdx: cfgIdx,
|
||||
filterS: new(engine.FilterS),
|
||||
poster: mckPoster,
|
||||
dc: dc,
|
||||
reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests),
|
||||
}
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
go exp.ExportEvent(cgrEvent)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-test:
|
||||
return
|
||||
case <-time.After(4 * time.Second):
|
||||
t.Error("Can't asynchronously export events")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosterJsonMapSyncLimit(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
var cfgIdx int
|
||||
cfgIdx = 0
|
||||
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Type = utils.MetaHTTPjsonMap
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests = 1
|
||||
dc, err := newEEMetrics(utils.FirstNonEmpty(
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone,
|
||||
cgrCfg.GeneralCfg().DefaultTimezone))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
//Create an event
|
||||
cgrEvent := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
Event: map[string]interface{}{
|
||||
"Account": "1001",
|
||||
"Destination": "1002",
|
||||
},
|
||||
}
|
||||
|
||||
var wg1 = &sync.WaitGroup{}
|
||||
|
||||
wg1.Add(3)
|
||||
|
||||
test := make(chan struct{})
|
||||
go func() {
|
||||
wg1.Wait()
|
||||
close(test)
|
||||
}()
|
||||
|
||||
mckPoster := mockPoster{
|
||||
wg: wg1,
|
||||
}
|
||||
exp := &PosterJSONMapEE{
|
||||
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
|
||||
cgrCfg: cgrCfg,
|
||||
cfgIdx: cfgIdx,
|
||||
filterS: new(engine.FilterS),
|
||||
poster: mckPoster,
|
||||
dc: dc,
|
||||
reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests),
|
||||
}
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
go exp.ExportEvent(cgrEvent)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-test:
|
||||
t.Error("Should not have been possible to asynchronously export events")
|
||||
case <-time.After(4 * time.Second):
|
||||
return
|
||||
}
|
||||
}
|
||||
*/
|
||||
22
ees/sql.go
22
ees/sql.go
@@ -23,6 +23,7 @@ import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"gorm.io/driver/mysql"
|
||||
"gorm.io/driver/postgres"
|
||||
@@ -53,6 +54,7 @@ type SQLEe struct {
|
||||
|
||||
dialect gorm.Dialector
|
||||
tableName string
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
type sqlPosterRequest struct {
|
||||
@@ -132,20 +134,36 @@ func openDB(dialect gorm.Dialector, opts map[string]interface{}) (db *gorm.DB, s
|
||||
func (sqlEe *SQLEe) Cfg() *config.EventExporterCfg { return sqlEe.cfg }
|
||||
|
||||
func (sqlEe *SQLEe) Connect() (err error) {
|
||||
sqlEe.Lock()
|
||||
if sqlEe.db == nil || sqlEe.sqldb == nil {
|
||||
sqlEe.db, sqlEe.sqldb, err = openDB(sqlEe.dialect, sqlEe.Cfg().Opts)
|
||||
}
|
||||
sqlEe.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
func (sqlEe *SQLEe) ExportEvent(req interface{}, _ string) error {
|
||||
sqlEe.reqs.get()
|
||||
defer sqlEe.reqs.done()
|
||||
sqlEe.RLock()
|
||||
defer func() {
|
||||
sqlEe.RUnlock()
|
||||
sqlEe.reqs.done()
|
||||
}()
|
||||
if sqlEe.db == nil {
|
||||
return utils.ErrDisconnected
|
||||
}
|
||||
sReq := req.(*sqlPosterRequest)
|
||||
return sqlEe.db.Table(sqlEe.tableName).Exec(sReq.Querry, sReq.Values...).Error
|
||||
}
|
||||
|
||||
func (sqlEe *SQLEe) Close() error { return sqlEe.sqldb.Close() }
|
||||
func (sqlEe *SQLEe) Close() (err error) {
|
||||
sqlEe.Lock()
|
||||
err = sqlEe.sqldb.Close()
|
||||
sqlEe.db = nil
|
||||
sqlEe.sqldb = nil
|
||||
sqlEe.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
func (sqlEe *SQLEe) GetMetrics() *utils.SafeMapStorage { return sqlEe.dc }
|
||||
|
||||
|
||||
@@ -298,7 +298,7 @@ func TestSQLExportEvent1(t *testing.T) {
|
||||
if err := sqlEe.Connect(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := sqlEe.ExportEvent(&sqlPosterRequest{Querry: "INSERT INTO expTable VALUES (); ", Values: []interface{}{}}, ""); err != nil {
|
||||
if err := sqlEe.ExportEvent(&sqlPosterRequest{Querry: "INSERT INTO cdrs VALUES (); ", Values: []interface{}{}}, ""); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
sqlEe.Close()
|
||||
|
||||
Reference in New Issue
Block a user