diff --git a/actions/export.go b/actions/export.go index 0814c0ac7..eb9383e08 100644 --- a/actions/export.go +++ b/actions/export.go @@ -38,9 +38,10 @@ func newActHTTPPost(cfg *config.CGRConfig, aCfg *engine.APAction) (aL *actHTTPPo } for i, actD := range aL.cfg().Diktats { aL.pstrs[i], _ = ees.NewHTTPjsonMapEE(&config.EventExporterCfg{ - ID: aL.id(), - ExportPath: actD.Path, - Attempts: aL.config.GeneralCfg().PosterAttempts, + ID: aL.id(), + ExportPath: actD.Path, + Attempts: aL.config.GeneralCfg().PosterAttempts, + FailedPostsDir: cfg.GeneralCfg().FailedPostsDir, }, cfg, nil, nil) } return @@ -72,7 +73,11 @@ func (aL *actHTTPPost) execute(_ *context.Context, data utils.MapStorage, _ stri if async, has := aL.cfg().Opts[utils.MetaAsync]; has && utils.IfaceAsString(async) == utils.TrueStr { go ees.ExportWithAttempts(pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString) } else if err = ees.ExportWithAttempts(pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString); err != nil { - partExec = true + if pstr.Cfg().FailedPostsDir != utils.MetaNone { + err = nil + } else { + partExec = true + } } } if partExec { diff --git a/actions/export_test.go b/actions/export_test.go index 6542bcf6a..487c9bbcc 100644 --- a/actions/export_test.go +++ b/actions/export_test.go @@ -43,10 +43,7 @@ func TestACHTTPPostExecute(t *testing.T) { }, }, } - http := &actHTTPPost{ - config: cfg, - aCfg: apAction, - } + http := newActHTTPPost(cfg, apAction) dataStorage := utils.MapStorage{ utils.MetaReq: map[string]interface{}{ @@ -67,7 +64,7 @@ func TestACHTTPPostExecute(t *testing.T) { buff := new(bytes.Buffer) log.SetOutput(buff) - expected := "[WARNING] Posting to : <~*balance.TestBalance.Value>, error: " + expected := ` Exporter could not export because err: ` if err := http.execute(nil, dataStorage, utils.EmptyString); err != nil { t.Error(err) } else if rcv := buff.String(); !strings.Contains(rcv, expected) { @@ -110,10 +107,7 @@ func TestACHTTPPostValues(t *testing.T) { }, }, } - http := &actHTTPPost{ - config: cfg, - aCfg: apAction, - } + http := newActHTTPPost(cfg, apAction) dataStorage := utils.MapStorage{ utils.MetaReq: map[string]interface{}{ utils.AccountField: 1003, diff --git a/apis/account_it_test.go b/apis/account_it_test.go index 38f6673f1..b9c499d4a 100644 --- a/apis/account_it_test.go +++ b/apis/account_it_test.go @@ -452,15 +452,15 @@ func testAccMaxAbstracts(t *testing.T) { Type: utils.MetaConcrete, Units: 213, /* - CostIncrements: []*utils.APICostIncrement{ - { - Increment: utils.Float64Pointer(float64(time.Second)), - FixedFee: utils.Float64Pointer(0), - RecurrentFee: utils.Float64Pointer(0), + CostIncrements: []*utils.APICostIncrement{ + { + Increment: utils.Float64Pointer(float64(time.Second)), + FixedFee: utils.Float64Pointer(0), + RecurrentFee: utils.Float64Pointer(0), + }, }, - }, - */ + */ }, }, }, @@ -550,7 +550,7 @@ func testAccMaxAbstracts(t *testing.T) { FilterIDs: []string{"*string:~*req.Account:1004"}, Weights: utils.DynamicWeights{ { - Weight: 0, + Weight: 0, }, }, Balances: map[string]*utils.ExtBalance{ @@ -558,7 +558,7 @@ func testAccMaxAbstracts(t *testing.T) { ID: "AbstractBalance1", Weights: utils.DynamicWeights{ { - Weight: 25, + Weight: 25, }, }, Type: "*abstract", @@ -576,11 +576,11 @@ func testAccMaxAbstracts(t *testing.T) { FilterIDs: nil, Weights: utils.DynamicWeights{ { - Weight: 20, + Weight: 20, }, }, - Type: "*concrete", - Units: utils.Float64Pointer(213), + Type: "*concrete", + Units: utils.Float64Pointer(213), }, }, }, @@ -707,7 +707,7 @@ func testAccDebitAbstracts(t *testing.T) { FilterIDs: []string{"*string:~*req.Account:1004"}, Weights: utils.DynamicWeights{ { - Weight: 0, + Weight: 0, }, }, Balances: map[string]*utils.ExtBalance{ @@ -715,7 +715,7 @@ func testAccDebitAbstracts(t *testing.T) { ID: "AbstractBalance1", Weights: utils.DynamicWeights{ { - Weight: 25, + Weight: 25, }, }, Type: "*abstract", @@ -733,11 +733,11 @@ func testAccDebitAbstracts(t *testing.T) { FilterIDs: nil, Weights: utils.DynamicWeights{ { - Weight: 20, + Weight: 20, }, }, - Type: "*concrete", - Units: utils.Float64Pointer(213), + Type: "*concrete", + Units: utils.Float64Pointer(213), }, }, }, @@ -843,12 +843,12 @@ func testAccMaxConcretes(t *testing.T) { }, Accounting: map[string]*utils.ExtAccountCharge{ accKEy: &utils.ExtAccountCharge{ - AccountID: "TEST_ACC_IT_TEST6", - BalanceID: "ConcreteBalance2", - Units: utils.Float64Pointer(213), - BalanceLimit: utils.Float64Pointer(0), - UnitFactorID: "", - RatingID: rtID, + AccountID: "TEST_ACC_IT_TEST6", + BalanceID: "ConcreteBalance2", + Units: utils.Float64Pointer(213), + BalanceLimit: utils.Float64Pointer(0), + UnitFactorID: "", + RatingID: rtID, }, }, UnitFactors: map[string]*utils.ExtUnitFactor{}, @@ -861,7 +861,7 @@ func testAccMaxConcretes(t *testing.T) { FilterIDs: []string{"*string:~*req.Account:1004"}, Weights: utils.DynamicWeights{ { - Weight: 0, + Weight: 0, }, }, Balances: map[string]*utils.ExtBalance{ @@ -869,7 +869,7 @@ func testAccMaxConcretes(t *testing.T) { ID: "AbstractBalance1", Weights: utils.DynamicWeights{ { - Weight: 25, + Weight: 25, }, }, Type: "*abstract", @@ -883,14 +883,14 @@ func testAccMaxConcretes(t *testing.T) { Units: utils.Float64Pointer(40000000000), }, "ConcreteBalance2": { - ID: "ConcreteBalance2", + ID: "ConcreteBalance2", Weights: utils.DynamicWeights{ { - Weight: 20, + Weight: 20, }, }, - Type: "*concrete", - Units: utils.Float64Pointer(0), + Type: "*concrete", + Units: utils.Float64Pointer(0), }, }, }, @@ -996,12 +996,12 @@ func testAccDebitConcretes(t *testing.T) { }, Accounting: map[string]*utils.ExtAccountCharge{ accKEy: &utils.ExtAccountCharge{ - AccountID: "TEST_ACC_IT_TEST7", - BalanceID: "ConcreteBalance2", - Units: utils.Float64Pointer(213), - BalanceLimit: utils.Float64Pointer(0), - UnitFactorID: "", - RatingID: rtID, + AccountID: "TEST_ACC_IT_TEST7", + BalanceID: "ConcreteBalance2", + Units: utils.Float64Pointer(213), + BalanceLimit: utils.Float64Pointer(0), + UnitFactorID: "", + RatingID: rtID, }, }, UnitFactors: map[string]*utils.ExtUnitFactor{}, @@ -1014,7 +1014,7 @@ func testAccDebitConcretes(t *testing.T) { FilterIDs: []string{"*string:~*req.Account:1004"}, Weights: utils.DynamicWeights{ { - Weight: 0, + Weight: 0, }, }, Balances: map[string]*utils.ExtBalance{ @@ -1022,7 +1022,7 @@ func testAccDebitConcretes(t *testing.T) { ID: "AbstractBalance1", Weights: utils.DynamicWeights{ { - Weight: 25, + Weight: 25, }, }, Type: "*abstract", @@ -1036,14 +1036,14 @@ func testAccDebitConcretes(t *testing.T) { Units: utils.Float64Pointer(40000000000), }, "ConcreteBalance2": { - ID: "ConcreteBalance2", + ID: "ConcreteBalance2", Weights: utils.DynamicWeights{ { - Weight: 20, + Weight: 20, }, }, - Type: "*concrete", - Units: utils.Float64Pointer(0), + Type: "*concrete", + Units: utils.Float64Pointer(0), }, }, }, @@ -1140,7 +1140,7 @@ func testAccActionSetRmvBalance(t *testing.T) { FilterIDs: []string{"*string:~*req.Account:1001"}, Weights: utils.DynamicWeights{ { - Weight: 12, + Weight: 12, }, }, Type: "*abstract", @@ -1152,7 +1152,7 @@ func testAccActionSetRmvBalance(t *testing.T) { }, Weights: utils.DynamicWeights{ { - Weight: 10, + Weight: 10, }, }, } diff --git a/ees/amqp.go b/ees/amqp.go index 22c974406..d03e2b071 100644 --- a/ees/amqp.go +++ b/ees/amqp.go @@ -139,6 +139,11 @@ func (pstr *AMQPee) Connect() (err error) { func (pstr *AMQPee) ExportEvent(content interface{}, _ string) (err error) { pstr.reqs.get() pstr.RLock() + if pstr.postChan == nil { + pstr.RUnlock() + pstr.reqs.done() + return utils.ErrDisconnected + } err = pstr.postChan.Publish( pstr.exchange, // exchange pstr.routingKey, // routing key diff --git a/ees/amqpv1.go b/ees/amqpv1.go index 150b2cbdf..bda9aca6e 100644 --- a/ees/amqpv1.go +++ b/ees/amqpv1.go @@ -85,6 +85,9 @@ func (pstr *AMQPv1EE) ExportEvent(content interface{}, _ string) (err error) { pstr.RUnlock() pstr.reqs.done() }() + if pstr.session == nil { + return utils.ErrDisconnected + } sender, err := pstr.session.NewSender( amqpv1.LinkTargetAddress(pstr.queueID), ) diff --git a/ees/ees.go b/ees/ees.go index 9382b0ba0..a17f3ca9a 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -251,9 +251,13 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply * } func exportEventWithExporter(exp EventExporter, ev *utils.CGREvent, oneTime bool, cfg *config.CGRConfig, filterS *engine.FilterS) (err error) { - if oneTime { - defer exp.Close() - } + defer func() { + updateEEMetrics(exp.GetMetrics(), ev.ID, ev.Event, err != nil, utils.FirstNonEmpty(exp.Cfg().Timezone, + cfg.GeneralCfg().DefaultTimezone)) + if oneTime { + exp.Close() + } + }() var eEv interface{} exp.GetMetrics().Lock() @@ -310,7 +314,8 @@ func ExportWithAttempts(exp EventExporter, eEv interface{}, key string) (err err return } for i := 0; i < exp.Cfg().Attempts; i++ { - if err = exp.ExportEvent(eEv, key); err == nil { + if err = exp.ExportEvent(eEv, key); err == nil || + err == utils.ErrDisconnected { // special error in case the exporter was closed break } if i+1 < exp.Cfg().Attempts { diff --git a/ees/elastic.go b/ees/elastic.go index 38380d7f7..47ba58221 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -24,6 +24,7 @@ import ( "encoding/json" "fmt" "strings" + "sync" "github.com/elastic/go-elasticsearch/esapi" @@ -49,6 +50,7 @@ type ElasticEE struct { dc *utils.SafeMapStorage opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap reqs *concReq + sync.RWMutex bytePreparing } @@ -106,19 +108,28 @@ func (eEe *ElasticEE) prepareOpts() (err error) { func (eEe *ElasticEE) Cfg() *config.EventExporterCfg { return eEe.cfg } func (eEe *ElasticEE) Connect() (err error) { + eEe.Lock() // create the client if eEe.eClnt == nil { eEe.eClnt, err = elasticsearch.NewClient( elasticsearch.Config{Addresses: strings.Split(eEe.Cfg().ExportPath, utils.InfieldSep)}, ) } + eEe.Unlock() return } // ExportEvent implements EventExporter func (eEe *ElasticEE) ExportEvent(ev interface{}, key string) (err error) { eEe.reqs.get() - defer eEe.reqs.done() + eEe.RLock() + defer func() { + eEe.RUnlock() + eEe.reqs.done() + }() + if eEe.eClnt == nil { + return utils.ErrDisconnected + } eReq := esapi.IndexRequest{ Index: eEe.opts.Index, DocumentID: key, @@ -153,7 +164,9 @@ func (eEe *ElasticEE) ExportEvent(ev interface{}, key string) (err error) { } func (eEe *ElasticEE) Close() (_ error) { + eEe.Lock() eEe.eClnt = nil + eEe.Unlock() return } diff --git a/ees/kafka.go b/ees/kafka.go index 523e0fb0a..47dd98f9a 100644 --- a/ees/kafka.go +++ b/ees/kafka.go @@ -32,6 +32,7 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *KafkaEE cfg: cfg, dc: dc, topic: utils.DefaultQueueID, + reqs: newConcReq(cfg.ConcurrentRequests), } if vals, has := cfg.Opts[utils.KafkaTopic]; has { kfkPstr.topic = utils.IfaceAsString(vals) @@ -56,11 +57,16 @@ func (pstr *KafkaEE) Cfg() *config.EventExporterCfg { return pstr.cfg } func (pstr *KafkaEE) Connect() (_ error) { pstr.Lock() if pstr.writer == nil { - pstr.writer = kafka.NewWriter(kafka.WriterConfig{ - Brokers: []string{pstr.Cfg().ExportPath}, - MaxAttempts: pstr.Cfg().Attempts, + pstr.writer = &kafka.Writer{ + Addr: kafka.TCP(pstr.Cfg().ExportPath), Topic: pstr.topic, - }) + MaxAttempts: pstr.Cfg().Attempts, + } + // pstr.writer = kafka.NewWriter(kafka.WriterConfig{ + // Brokers: []string{pstr.Cfg().ExportPath}, + // MaxAttempts: pstr.Cfg().Attempts, + // Topic: pstr.topic, + // }) } pstr.Unlock() return @@ -69,6 +75,11 @@ func (pstr *KafkaEE) Connect() (_ error) { func (pstr *KafkaEE) ExportEvent(content interface{}, key string) (err error) { pstr.reqs.get() pstr.RLock() + if pstr.writer == nil { + pstr.RUnlock() + pstr.reqs.done() + return utils.ErrDisconnected + } err = pstr.writer.WriteMessages(context.Background(), kafka.Message{ Key: []byte(key), Value: content.([]byte), diff --git a/ees/libcdre_it_test.go b/ees/libcdre_it_test.go index 4f7745353..16cefc67e 100644 --- a/ees/libcdre_it_test.go +++ b/ees/libcdre_it_test.go @@ -37,7 +37,8 @@ func TestWriteFldPosts(t *testing.T) { // can convert & write dir := "/tmp/engine/libcdre_test/" exportEvent := &ExportEvents{ - module: "module", + failedPostsDir: dir, + module: "module", } if err := os.RemoveAll(dir); err != nil { t.Fatal("Error removing folder: ", dir, err) diff --git a/ees/libcdre_test.go b/ees/libcdre_test.go index dd9fa4d33..2e97dbef3 100644 --- a/ees/libcdre_test.go +++ b/ees/libcdre_test.go @@ -104,6 +104,9 @@ func TestAddFldPost(t *testing.T) { if !reflect.DeepEqual(eOut, failedPost) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost)) } + for _, id := range failedPostCache.GetItemIDs("") { + failedPostCache.Set(id, nil, nil) + } } func TestFilePath(t *testing.T) { diff --git a/ees/nats.go b/ees/nats.go index fca1097da..fc9929864 100644 --- a/ees/nats.go +++ b/ees/nats.go @@ -102,6 +102,11 @@ func (pstr *NatsEE) Connect() (err error) { func (pstr *NatsEE) ExportEvent(content interface{}, _ string) (err error) { pstr.reqs.get() pstr.RLock() + if pstr.poster == nil { + pstr.RUnlock() + pstr.reqs.done() + return utils.ErrDisconnected + } if pstr.jetStream { _, err = pstr.posterJS.Publish(pstr.subject, content.([]byte)) } else { diff --git a/ees/poster_test.go b/ees/poster_test.go index b85c1a3d4..f3c9364b7 100644 --- a/ees/poster_test.go +++ b/ees/poster_test.go @@ -58,6 +58,7 @@ func TestKafkaParseURL(t *testing.T) { exp := &KafkaEE{ cfg: cfg, topic: "cdr_billing", + reqs: newConcReq(0), } if kfk := NewKafkaEE(cfg, nil); !reflect.DeepEqual(exp, kfk) { t.Errorf("Expected: %s ,received: %s", utils.ToJSON(exp), utils.ToJSON(kfk)) diff --git a/ees/posterjsonmap_test.go b/ees/posterjsonmap_test.go deleted file mode 100644 index d6afdef64..000000000 --- a/ees/posterjsonmap_test.go +++ /dev/null @@ -1,484 +0,0 @@ -/* -Real-time Online/Offline Charging System (OerS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package ees - -/* -func TestPosterJsonMapID(t *testing.T) { - pstrEE := &PosterJSONMapEE{ - id: "3", - } - if rcv := pstrEE.ID(); !reflect.DeepEqual(rcv, "3") { - t.Errorf("Expected %+v but got %+v", "3", rcv) - } -} - -func TestPosterJsonMapGetMetrics(t *testing.T) { - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } - pstrEE := &PosterJSONMapEE{ - dc: dc, - } - - if rcv := pstrEE.GetMetrics(); !reflect.DeepEqual(rcv, pstrEE.dc) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(pstrEE.dc)) - } -} - -func TestPosterJsonMapNewPosterJSONMapEECase2(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaAMQPV1jsonMap - cgrCfg.EEsCfg().Exporters[0].ExportPath = utils.EmptyString - filterS := engine.NewFilterS(cgrCfg, nil, nil) - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } - pstrJSON, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc) - if err != nil { - t.Error(err) - } - pstrJSONExpect := engine.NewAMQPv1Poster(cgrCfg.EEsCfg().Exporters[0].ExportPath, - cgrCfg.EEsCfg().Exporters[0].Attempts, cgrCfg.EEsCfg().Exporters[0].Opts) - if !reflect.DeepEqual(pstrJSON.poster, pstrJSONExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(pstrJSONExpect), utils.ToJSON(pstrJSON.poster)) - } -} - -func TestPosterJsonMapNewPosterJSONMapEECase3(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaSQSjsonMap - cgrCfg.EEsCfg().Exporters[0].ExportPath = utils.EmptyString - filterS := engine.NewFilterS(cgrCfg, nil, nil) - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } - pstrJSON, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc) - if err != nil { - t.Error(err) - } - - if _, canCast := pstrJSON.poster.(*engine.SQSPoster); !canCast { - t.Error("Can't cast") - } -} - -func TestPosterJsonMapNewPosterJSONMapEECase4(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaKafkajsonMap - filterS := engine.NewFilterS(cgrCfg, nil, nil) - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } - pstrJSON, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc) - if err != nil { - t.Error(err) - } - pstrJSONExpect := engine.NewKafkaPoster(cgrCfg.EEsCfg().Exporters[0].ExportPath, - cgrCfg.EEsCfg().Exporters[0].Attempts, cgrCfg.EEsCfg().Exporters[0].Opts) - if !reflect.DeepEqual(pstrJSON.poster, pstrJSONExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(pstrJSONExpect), utils.ToJSON(pstrJSON.poster)) - } -} - -func TestPosterJsonMapNewPosterJSONMapEECase5(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaS3jsonMap - filterS := engine.NewFilterS(cgrCfg, nil, nil) - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } - pstrJSON, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc) - if err != nil { - t.Error(err) - } - pstrJSONExpect := engine.NewS3Poster(cgrCfg.EEsCfg().Exporters[0].ExportPath, - cgrCfg.EEsCfg().Exporters[0].Attempts, cgrCfg.EEsCfg().Exporters[0].Opts) - if !reflect.DeepEqual(pstrJSON.poster, pstrJSONExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(pstrJSONExpect), utils.ToJSON(pstrJSON.poster)) - } -} - -func TestPosterJsonMapExportEvent(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaSQSjsonMap - cgrEv := new(utils.CGREvent) - newIDb := engine.NewInternalDB(nil, nil, true) - newDM := engine.NewDataManager(newIDb, cgrCfg.CacheCfg(), nil) - filterS := engine.NewFilterS(cgrCfg, nil, newDM) - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } - - pstrEE, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc) - if err != nil { - t.Error(err) - } - cgrEv.Event = map[string]interface{}{ - "test": "string", - } - cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Fields = []*config.FCTemplate{ - { - Path: "*exp.1", Type: utils.MetaVariable, - Value: config.NewRSRParsersMustCompile("~*req.field1", utils.InfieldSep), - }, - { - Path: "*exp.2", Type: utils.MetaVariable, - Value: config.NewRSRParsersMustCompile("*req.field2", utils.InfieldSep), - }, - } - for _, field := range cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Fields { - field.ComputePath() - } - errExpect := "MissingRegion: could not find region configuration" - if err := pstrEE.ExportEvent(cgrEv); err == nil || err.Error() != errExpect { - t.Errorf("Expected %q but received %q", errExpect, err) - } - dcExpect := int64(1) - if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) { - t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) - } - cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].ComputeFields() - if err := pstrEE.ExportEvent(cgrEv); err == nil || err.Error() != errExpect { - t.Errorf("Expected %q but received %q", errExpect, err) - } - dcExpect = int64(2) - if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) { - t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) - } -} - -type testPoster struct { - body []byte -} - -func (pstr *testPoster) Close() {} -func (pstr *testPoster) Post(body []byte, key string) error { - pstr.body = body - return nil -} -func TestPosterJsonMapExportEvent1(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaAMQPjsonMap - cgrEv := new(utils.CGREvent) - newIDb := engine.NewInternalDB(nil, nil, true) - newDM := engine.NewDataManager(newIDb, cgrCfg.CacheCfg(), nil) - filterS := engine.NewFilterS(cgrCfg, nil, newDM) - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } - //// - //// - tstPstr := &testPoster{} - pstrEE := &PosterJSONMapEE{ - id: cgrCfg.EEsCfg().Exporters[0].ID, - cgrCfg: cgrCfg, - cfgIdx: 0, - filterS: filterS, - dc: dc, - poster: tstPstr, - reqs: newConcReq(0), - } - // pstrEE.poster = tstPstr - cgrEv.Event = map[string]interface{}{ - "test": "string", - } - cgrCfg.EEsCfg().Exporters[0].Fields = []*config.FCTemplate{ - { - Path: "*exp.1", Type: utils.MetaVariable, - Value: config.NewRSRParsersMustCompile("~*req.field1", utils.InfieldSep), - }, - { - Path: "*exp.2", Type: utils.MetaVariable, - Value: config.NewRSRParsersMustCompile("*req.field2", utils.InfieldSep), - }, - } - for _, field := range cgrCfg.EEsCfg().Exporters[0].Fields { - field.ComputePath() - } - cgrCfg.EEsCfg().Exporters[0].ComputeFields() - if err := pstrEE.ExportEvent(cgrEv); err != nil { - t.Error(err) - } - dcExpect := int64(1) - if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) { - t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) - } - bodyExpect := map[string]interface{}{ - "2": "*req.field2", - } - var rcv map[string]interface{} - if err := json.Unmarshal(tstPstr.body, &rcv); err != nil { - t.Fatal(err) - } - if !reflect.DeepEqual(rcv, bodyExpect) { - t.Errorf("Expected %s but received %s", utils.ToJSON(bodyExpect), utils.ToJSON(rcv)) - } -} - -func TestPosterJsonMapExportEvent2(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaSQSjsonMap - cgrEv := new(utils.CGREvent) - newIDb := engine.NewInternalDB(nil, nil, true) - newDM := engine.NewDataManager(newIDb, cgrCfg.CacheCfg(), nil) - filterS := engine.NewFilterS(cgrCfg, nil, newDM) - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } - - pstrEE, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc) - if err != nil { - t.Error(err) - } - cgrEv.Event = map[string]interface{}{ - "test": "string", - } - cgrCfg.EEsCfg().Exporters[0].Fields = []*config.FCTemplate{ - { - Path: "*exp.1", Type: utils.MetaVariable, - Value: config.NewRSRParsersMustCompile("~*req.field1", utils.InfieldSep), - Filters: []string{"*wrong-type"}, - }, - { - Path: "*exp.1", Type: utils.MetaVariable, - Value: config.NewRSRParsersMustCompile("~*req.field1", utils.InfieldSep), - Filters: []string{"*wrong-type"}, - }, - } - for _, field := range cgrCfg.EEsCfg().Exporters[0].Fields { - field.ComputePath() - } - cgrCfg.EEsCfg().Exporters[0].ComputeFields() - errExpect := "inline parse error for string: <*wrong-type>" - if err := pstrEE.ExportEvent(cgrEv); err == nil || err.Error() != errExpect { - t.Errorf("Expected %q but received %q", errExpect, err) - } - dcExpect := int64(1) - if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) { - t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) - } -} - -func TestPosterJsonMapExportEvent3(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaSQSjsonMap - cgrEv := new(utils.CGREvent) - newIDb := engine.NewInternalDB(nil, nil, true) - newDM := engine.NewDataManager(newIDb, cgrCfg.CacheCfg(), nil) - filterS := engine.NewFilterS(cgrCfg, nil, newDM) - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } - - pstrEE, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc) - if err != nil { - t.Error(err) - } - cgrEv.Event = map[string]interface{}{ - "test": "string", - } - cgrEv.Event = map[string]interface{}{ - "test": make(chan int), - } - cgrCfg.EEsCfg().Exporters[0].Fields = []*config.FCTemplate{{}} - for _, field := range cgrCfg.EEsCfg().Exporters[0].Fields { - field.ComputePath() - } - cgrCfg.EEsCfg().Exporters[0].ComputeFields() - errExpect := "json: unsupported type: chan int" - if err := pstrEE.ExportEvent(cgrEv); err == nil || err.Error() != errExpect { - t.Errorf("Expected %q but received %q", errExpect, err) - } - dcExpect := int64(1) - if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) { - t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) - } - pstrEE.OnEvicted("test", "test") -} - -type mockPoster struct { - wg *sync.WaitGroup -} - -func (mp mockPoster) Post(body []byte, key string) error { - // resp, err := http.Get(mp.url) - // if err != nil { - // return err - // } - // defer resp.Body.Close() - time.Sleep(3 * time.Second) - mp.wg.Done() - return nil -} - -func (mockPoster) Close() { - return -} - -func TestPosterJsonMapSync(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - var cfgIdx int - cfgIdx = 0 - - cgrCfg.EEsCfg().Exporters[cfgIdx].Type = "*http_json_map" - dc, err := newEEMetrics(utils.FirstNonEmpty( - cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone, - cgrCfg.GeneralCfg().DefaultTimezone)) - if err != nil { - t.Error(err) - } - - //Create an event - cgrEvent := &utils.CGREvent{ - Tenant: "cgrates.org", - Event: map[string]interface{}{ - "Account": "1001", - "Destination": "1002", - }, - } - - var wg1 = &sync.WaitGroup{} - - wg1.Add(3) - - test := make(chan struct{}) - go func() { - wg1.Wait() - close(test) - }() - - mckPoster := mockPoster{ - wg: wg1, - } - exp := &PosterJSONMapEE{ - id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, - cgrCfg: cgrCfg, - cfgIdx: cfgIdx, - filterS: new(engine.FilterS), - poster: mckPoster, - dc: dc, - reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests), - } - - for i := 0; i < 3; i++ { - go exp.ExportEvent(cgrEvent) - } - - select { - case <-test: - return - case <-time.After(4 * time.Second): - t.Error("Can't asynchronously export events") - } -} - -func TestPosterJsonMapSyncLimit(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - var cfgIdx int - cfgIdx = 0 - - cgrCfg.EEsCfg().Exporters[cfgIdx].Type = "*http_json_map" - cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests = 1 - dc, err := newEEMetrics(utils.FirstNonEmpty( - cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone, - cgrCfg.GeneralCfg().DefaultTimezone)) - if err != nil { - t.Error(err) - } - - //Create an event - cgrEvent := &utils.CGREvent{ - Tenant: "cgrates.org", - Event: map[string]interface{}{ - "Account": "1001", - "Destination": "1002", - }, - } - - var wg1 = &sync.WaitGroup{} - - wg1.Add(3) - - test := make(chan struct{}) - go func() { - wg1.Wait() - close(test) - }() - - mckPoster := mockPoster{ - wg: wg1, - } - exp := &PosterJSONMapEE{ - id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, - cgrCfg: cgrCfg, - cfgIdx: cfgIdx, - filterS: new(engine.FilterS), - poster: mckPoster, - dc: dc, - reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests), - } - - for i := 0; i < 3; i++ { - go exp.ExportEvent(cgrEvent) - } - - select { - case <-test: - t.Error("Should not have been possible to asynchronously export events") - case <-time.After(4 * time.Second): - return - } -} -*/ diff --git a/ees/sql.go b/ees/sql.go index ccff3fdcc..bf8b011ec 100644 --- a/ees/sql.go +++ b/ees/sql.go @@ -23,6 +23,7 @@ import ( "fmt" "net/url" "strings" + "sync" "gorm.io/driver/mysql" "gorm.io/driver/postgres" @@ -53,6 +54,7 @@ type SQLEe struct { dialect gorm.Dialector tableName string + sync.RWMutex } type sqlPosterRequest struct { @@ -132,20 +134,36 @@ func openDB(dialect gorm.Dialector, opts map[string]interface{}) (db *gorm.DB, s func (sqlEe *SQLEe) Cfg() *config.EventExporterCfg { return sqlEe.cfg } func (sqlEe *SQLEe) Connect() (err error) { + sqlEe.Lock() if sqlEe.db == nil || sqlEe.sqldb == nil { sqlEe.db, sqlEe.sqldb, err = openDB(sqlEe.dialect, sqlEe.Cfg().Opts) } + sqlEe.Unlock() return } func (sqlEe *SQLEe) ExportEvent(req interface{}, _ string) error { sqlEe.reqs.get() - defer sqlEe.reqs.done() + sqlEe.RLock() + defer func() { + sqlEe.RUnlock() + sqlEe.reqs.done() + }() + if sqlEe.db == nil { + return utils.ErrDisconnected + } sReq := req.(*sqlPosterRequest) return sqlEe.db.Table(sqlEe.tableName).Exec(sReq.Querry, sReq.Values...).Error } -func (sqlEe *SQLEe) Close() error { return sqlEe.sqldb.Close() } +func (sqlEe *SQLEe) Close() (err error) { + sqlEe.Lock() + err = sqlEe.sqldb.Close() + sqlEe.db = nil + sqlEe.sqldb = nil + sqlEe.Unlock() + return +} func (sqlEe *SQLEe) GetMetrics() *utils.SafeMapStorage { return sqlEe.dc } diff --git a/ees/sql_it_test.go b/ees/sql_it_test.go index 4fb1e0e33..e9cf8a5bc 100644 --- a/ees/sql_it_test.go +++ b/ees/sql_it_test.go @@ -296,7 +296,7 @@ func TestSQLExportEvent1(t *testing.T) { if err := sqlEe.Connect(); err != nil { t.Fatal(err) } - if err := sqlEe.ExportEvent(&sqlPosterRequest{Querry: "INSERT INTO expTable VALUES (); ", Values: []interface{}{}}, ""); err != nil { + if err := sqlEe.ExportEvent(&sqlPosterRequest{Querry: "INSERT INTO cdrs VALUES (); ", Values: []interface{}{}}, ""); err != nil { t.Error(err) } sqlEe.Close() diff --git a/ers/kafka_test.go b/ers/kafka_test.go index 4595a5a97..93e9dab45 100644 --- a/ers/kafka_test.go +++ b/ers/kafka_test.go @@ -118,6 +118,7 @@ func TestKafkaERServe(t *testing.T) { rdr.Config().Opts = map[string]interface{}{} rdr.Config().ProcessedPath = "" rdr.(*KafkaER).createPoster() + close(rdrExit) } func TestKafkaERServe2(t *testing.T) { diff --git a/general_tests/all_sections_cfg_rld_it_test.go b/general_tests/all_sections_cfg_rld_it_test.go index fd1471f00..9e0c3adb6 100644 --- a/general_tests/all_sections_cfg_rld_it_test.go +++ b/general_tests/all_sections_cfg_rld_it_test.go @@ -465,7 +465,7 @@ func testSectConfigSReloadEES(t *testing.T) { } else if reply != utils.OK { t.Errorf("Expected OK received: %+v", reply) } - cfgStr := "{\"ees\":{\"attributes_conns\":[],\"cache\":{\"*file_csv\":{\"limit\":-1,\"precache\":false,\"replicate\":false,\"static_ttl\":false,\"ttl\":\"5s\"}},\"enabled\":true,\"exporters\":[{\"attempts\":1,\"attribute_context\":\"\",\"attribute_ids\":[],\"concurrent_requests\":0,\"export_path\":\"/var/spool/cgrates/ees\",\"fields\":[],\"filters\":[],\"flags\":[],\"id\":\"*default\",\"opts\":{},\"synchronous\":false,\"timezone\":\"\",\"type\":\"*none\"}]}}" + cfgStr := "{\"ees\":{\"attributes_conns\":[],\"cache\":{\"*file_csv\":{\"limit\":-1,\"precache\":false,\"replicate\":false,\"static_ttl\":false,\"ttl\":\"5s\"}},\"enabled\":true,\"exporters\":[{\"attempts\":1,\"attribute_context\":\"\",\"attribute_ids\":[],\"concurrent_requests\":0,\"export_path\":\"/var/spool/cgrates/ees\",\"failed_posts_dir\":\"/var/spool/cgrates/failed_posts\",\"fields\":[],\"filters\":[],\"flags\":[],\"id\":\"*default\",\"opts\":{},\"synchronous\":false,\"timezone\":\"\",\"type\":\"*none\"}]}}" var rpl string if err := testSectRPC.Call(context.Background(), utils.ConfigSv1GetConfigAsJSON, &config.SectionWithAPIOpts{ Tenant: "cgrates.org",