diff --git a/ees/amqp.go b/ees/amqp.go index 22c974406..d03e2b071 100644 --- a/ees/amqp.go +++ b/ees/amqp.go @@ -139,6 +139,11 @@ func (pstr *AMQPee) Connect() (err error) { func (pstr *AMQPee) ExportEvent(content interface{}, _ string) (err error) { pstr.reqs.get() pstr.RLock() + if pstr.postChan == nil { + pstr.RUnlock() + pstr.reqs.done() + return utils.ErrDisconnected + } err = pstr.postChan.Publish( pstr.exchange, // exchange pstr.routingKey, // routing key diff --git a/ees/amqpv1.go b/ees/amqpv1.go index 150b2cbdf..bda9aca6e 100644 --- a/ees/amqpv1.go +++ b/ees/amqpv1.go @@ -85,6 +85,9 @@ func (pstr *AMQPv1EE) ExportEvent(content interface{}, _ string) (err error) { pstr.RUnlock() pstr.reqs.done() }() + if pstr.session == nil { + return utils.ErrDisconnected + } sender, err := pstr.session.NewSender( amqpv1.LinkTargetAddress(pstr.queueID), ) diff --git a/ees/ees.go b/ees/ees.go index bb3b64bb6..e30f4d753 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -260,9 +260,13 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply * } func exportEventWithExporter(exp EventExporter, ev *utils.CGREvent, oneTime bool, cfg *config.CGRConfig, filterS *engine.FilterS) (err error) { - if oneTime { - defer exp.Close() - } + defer func() { + updateEEMetrics(exp.GetMetrics(), ev.ID, ev.Event, err != nil, utils.FirstNonEmpty(exp.Cfg().Timezone, + cfg.GeneralCfg().DefaultTimezone)) + if oneTime { + exp.Close() + } + }() var eEv interface{} exp.GetMetrics().Lock() @@ -319,7 +323,8 @@ func ExportWithAttempts(exp EventExporter, eEv interface{}, key string) (err err return } for i := 0; i < exp.Cfg().Attempts; i++ { - if err = exp.ExportEvent(eEv, key); err == nil { + if err = exp.ExportEvent(eEv, key); err == nil || + err == utils.ErrDisconnected { // special error in case the exporter was closed break } if i+1 < exp.Cfg().Attempts { diff --git a/ees/elastic.go b/ees/elastic.go index 38380d7f7..47ba58221 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -24,6 +24,7 @@ import ( "encoding/json" "fmt" "strings" + "sync" "github.com/elastic/go-elasticsearch/esapi" @@ -49,6 +50,7 @@ type ElasticEE struct { dc *utils.SafeMapStorage opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap reqs *concReq + sync.RWMutex bytePreparing } @@ -106,19 +108,28 @@ func (eEe *ElasticEE) prepareOpts() (err error) { func (eEe *ElasticEE) Cfg() *config.EventExporterCfg { return eEe.cfg } func (eEe *ElasticEE) Connect() (err error) { + eEe.Lock() // create the client if eEe.eClnt == nil { eEe.eClnt, err = elasticsearch.NewClient( elasticsearch.Config{Addresses: strings.Split(eEe.Cfg().ExportPath, utils.InfieldSep)}, ) } + eEe.Unlock() return } // ExportEvent implements EventExporter func (eEe *ElasticEE) ExportEvent(ev interface{}, key string) (err error) { eEe.reqs.get() - defer eEe.reqs.done() + eEe.RLock() + defer func() { + eEe.RUnlock() + eEe.reqs.done() + }() + if eEe.eClnt == nil { + return utils.ErrDisconnected + } eReq := esapi.IndexRequest{ Index: eEe.opts.Index, DocumentID: key, @@ -153,7 +164,9 @@ func (eEe *ElasticEE) ExportEvent(ev interface{}, key string) (err error) { } func (eEe *ElasticEE) Close() (_ error) { + eEe.Lock() eEe.eClnt = nil + eEe.Unlock() return } diff --git a/ees/kafka.go b/ees/kafka.go index 523e0fb0a..47dd98f9a 100644 --- a/ees/kafka.go +++ b/ees/kafka.go @@ -32,6 +32,7 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *KafkaEE cfg: cfg, dc: dc, topic: utils.DefaultQueueID, + reqs: newConcReq(cfg.ConcurrentRequests), } if vals, has := cfg.Opts[utils.KafkaTopic]; has { kfkPstr.topic = utils.IfaceAsString(vals) @@ -56,11 +57,16 @@ func (pstr *KafkaEE) Cfg() *config.EventExporterCfg { return pstr.cfg } func (pstr *KafkaEE) Connect() (_ error) { pstr.Lock() if pstr.writer == nil { - pstr.writer = kafka.NewWriter(kafka.WriterConfig{ - Brokers: []string{pstr.Cfg().ExportPath}, - MaxAttempts: pstr.Cfg().Attempts, + pstr.writer = &kafka.Writer{ + Addr: kafka.TCP(pstr.Cfg().ExportPath), Topic: pstr.topic, - }) + MaxAttempts: pstr.Cfg().Attempts, + } + // pstr.writer = kafka.NewWriter(kafka.WriterConfig{ + // Brokers: []string{pstr.Cfg().ExportPath}, + // MaxAttempts: pstr.Cfg().Attempts, + // Topic: pstr.topic, + // }) } pstr.Unlock() return @@ -69,6 +75,11 @@ func (pstr *KafkaEE) Connect() (_ error) { func (pstr *KafkaEE) ExportEvent(content interface{}, key string) (err error) { pstr.reqs.get() pstr.RLock() + if pstr.writer == nil { + pstr.RUnlock() + pstr.reqs.done() + return utils.ErrDisconnected + } err = pstr.writer.WriteMessages(context.Background(), kafka.Message{ Key: []byte(key), Value: content.([]byte), diff --git a/ees/libactions.go b/ees/libactions.go index 8a3cfb348..051f3f2b2 100644 --- a/ees/libactions.go +++ b/ees/libactions.go @@ -61,7 +61,11 @@ func callURL(ub *engine.Account, a *engine.Action, _ engine.Actions, extraData i if err != nil { return err } - return ExportWithAttempts(pstr, &HTTPPosterRequest{Body: body, Header: make(http.Header)}, "") + err = ExportWithAttempts(pstr, &HTTPPosterRequest{Body: body, Header: make(http.Header)}, "") + if config.CgrConfig().GeneralCfg().FailedPostsDir != utils.MetaNone { + err = nil + } + return err } // Does not block for posts, no error reports @@ -97,5 +101,9 @@ func postEvent(_ *engine.Account, a *engine.Action, _ engine.Actions, extraData if err != nil { return err } - return ExportWithAttempts(pstr, &HTTPPosterRequest{Body: body, Header: make(http.Header)}, "") + err = ExportWithAttempts(pstr, &HTTPPosterRequest{Body: body, Header: make(http.Header)}, "") + if config.CgrConfig().GeneralCfg().FailedPostsDir != utils.MetaNone { + err = nil + } + return err } diff --git a/ees/libcdre_it_test.go b/ees/libcdre_it_test.go index 4f7745353..16cefc67e 100644 --- a/ees/libcdre_it_test.go +++ b/ees/libcdre_it_test.go @@ -37,7 +37,8 @@ func TestWriteFldPosts(t *testing.T) { // can convert & write dir := "/tmp/engine/libcdre_test/" exportEvent := &ExportEvents{ - module: "module", + failedPostsDir: dir, + module: "module", } if err := os.RemoveAll(dir); err != nil { t.Fatal("Error removing folder: ", dir, err) diff --git a/ees/libcdre_test.go b/ees/libcdre_test.go index dd9fa4d33..2e97dbef3 100644 --- a/ees/libcdre_test.go +++ b/ees/libcdre_test.go @@ -104,6 +104,9 @@ func TestAddFldPost(t *testing.T) { if !reflect.DeepEqual(eOut, failedPost) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost)) } + for _, id := range failedPostCache.GetItemIDs("") { + failedPostCache.Set(id, nil, nil) + } } func TestFilePath(t *testing.T) { diff --git a/ees/nats.go b/ees/nats.go index fca1097da..fc9929864 100644 --- a/ees/nats.go +++ b/ees/nats.go @@ -102,6 +102,11 @@ func (pstr *NatsEE) Connect() (err error) { func (pstr *NatsEE) ExportEvent(content interface{}, _ string) (err error) { pstr.reqs.get() pstr.RLock() + if pstr.poster == nil { + pstr.RUnlock() + pstr.reqs.done() + return utils.ErrDisconnected + } if pstr.jetStream { _, err = pstr.posterJS.Publish(pstr.subject, content.([]byte)) } else { diff --git a/ees/poster_test.go b/ees/poster_test.go index b85c1a3d4..f3c9364b7 100644 --- a/ees/poster_test.go +++ b/ees/poster_test.go @@ -58,6 +58,7 @@ func TestKafkaParseURL(t *testing.T) { exp := &KafkaEE{ cfg: cfg, topic: "cdr_billing", + reqs: newConcReq(0), } if kfk := NewKafkaEE(cfg, nil); !reflect.DeepEqual(exp, kfk) { t.Errorf("Expected: %s ,received: %s", utils.ToJSON(exp), utils.ToJSON(kfk)) diff --git a/ees/posterjsonmap_test.go b/ees/posterjsonmap_test.go deleted file mode 100644 index a4577f811..000000000 --- a/ees/posterjsonmap_test.go +++ /dev/null @@ -1,479 +0,0 @@ -/* -Real-time Online/Offline Charging System (OerS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package ees - -/* -func TestPosterJsonMapID(t *testing.T) { - pstrEE := &PosterJSONMapEE{ - id: "3", - } - if rcv := pstrEE.ID(); !reflect.DeepEqual(rcv, "3") { - t.Errorf("Expected %+v but got %+v", "3", rcv) - } -} - -func TestPosterJsonMapGetMetrics(t *testing.T) { - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } - pstrEE := &PosterJSONMapEE{ - dc: dc, - } - - if rcv := pstrEE.GetMetrics(); !reflect.DeepEqual(rcv, pstrEE.dc) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(rcv), utils.ToJSON(pstrEE.dc)) - } -} - -func TestPosterJsonMapNewPosterJSONMapEECase2(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaAMQPV1jsonMap - cgrCfg.EEsCfg().Exporters[0].ExportPath = utils.EmptyString - filterS := engine.NewFilterS(cgrCfg, nil, nil) - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } - pstrJSON, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc) - if err != nil { - t.Error(err) - } - pstrJSONExpect := engine.NewAMQPv1Poster(cgrCfg.EEsCfg().Exporters[0].ExportPath, - cgrCfg.EEsCfg().Exporters[0].Attempts, cgrCfg.EEsCfg().Exporters[0].Opts) - if !reflect.DeepEqual(pstrJSON.poster, pstrJSONExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(pstrJSONExpect), utils.ToJSON(pstrJSON.poster)) - } -} - -func TestPosterJsonMapNewPosterJSONMapEECase3(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaSQSjsonMap - cgrCfg.EEsCfg().Exporters[0].ExportPath = utils.EmptyString - filterS := engine.NewFilterS(cgrCfg, nil, nil) - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } - pstrJSON, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc) - if err != nil { - t.Error(err) - } - - if _, canCast := pstrJSON.poster.(*engine.SQSPoster); !canCast { - t.Error("Can't cast") - } -} - -func TestPosterJsonMapNewPosterJSONMapEECase4(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaKafkajsonMap - filterS := engine.NewFilterS(cgrCfg, nil, nil) - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } - pstrJSON, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc) - if err != nil { - t.Error(err) - } - pstrJSONExpect := engine.NewKafkaPoster(cgrCfg.EEsCfg().Exporters[0].ExportPath, - cgrCfg.EEsCfg().Exporters[0].Attempts, cgrCfg.EEsCfg().Exporters[0].Opts) - if !reflect.DeepEqual(pstrJSON.poster, pstrJSONExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(pstrJSONExpect), utils.ToJSON(pstrJSON.poster)) - } -} - -func TestPosterJsonMapNewPosterJSONMapEECase5(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaS3jsonMap - filterS := engine.NewFilterS(cgrCfg, nil, nil) - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } - pstrJSON, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc) - if err != nil { - t.Error(err) - } - pstrJSONExpect := engine.NewS3Poster(cgrCfg.EEsCfg().Exporters[0].ExportPath, - cgrCfg.EEsCfg().Exporters[0].Attempts, cgrCfg.EEsCfg().Exporters[0].Opts) - if !reflect.DeepEqual(pstrJSON.poster, pstrJSONExpect) { - t.Errorf("Expected %+v \n but got %+v", utils.ToJSON(pstrJSONExpect), utils.ToJSON(pstrJSON.poster)) - } -} - -func TestPosterJsonMapExportEvent(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaSQSjsonMap - cgrEv := new(utils.CGREvent) - newIDb := engine.NewInternalDB(nil, nil, true) - newDM := engine.NewDataManager(newIDb, cgrCfg.CacheCfg(), nil) - filterS := engine.NewFilterS(cgrCfg, nil, newDM) - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } - - pstrEE, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc) - if err != nil { - t.Error(err) - } - cgrEv.Event = map[string]interface{}{ - "test": "string", - } - cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Fields = []*config.FCTemplate{ - { - Path: "*exp.1", Type: utils.MetaVariable, - Value: config.NewRSRParsersMustCompile("~*req.field1", utils.InfieldSep), - }, - { - Path: "*exp.2", Type: utils.MetaVariable, - Value: config.NewRSRParsersMustCompile("*req.field2", utils.InfieldSep), - }, - } - for _, field := range cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Fields { - field.ComputePath() - } - errExpect := "MissingRegion: could not find region configuration" - if err := pstrEE.ExportEvent(cgrEv); err == nil || err.Error() != errExpect { - t.Errorf("Expected %q but received %q", errExpect, err) - } - dcExpect := int64(1) - if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) { - t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) - } - cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].ComputeFields() - if err := pstrEE.ExportEvent(cgrEv); err == nil || err.Error() != errExpect { - t.Errorf("Expected %q but received %q", errExpect, err) - } - dcExpect = int64(2) - if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) { - t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) - } -} - -type testPoster struct { - body []byte -} - -func (pstr *testPoster) Close() {} -func (pstr *testPoster) Post(body []byte, key string) error { - pstr.body = body - return nil -} -func TestPosterJsonMapExportEvent1(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaAMQPjsonMap - cgrEv := new(utils.CGREvent) - newIDb := engine.NewInternalDB(nil, nil, true) - newDM := engine.NewDataManager(newIDb, cgrCfg.CacheCfg(), nil) - filterS := engine.NewFilterS(cgrCfg, nil, newDM) - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } - //// - //// - tstPstr := &testPoster{} - pstrEE := &PosterJSONMapEE{ - id: cgrCfg.EEsCfg().Exporters[0].ID, - cgrCfg: cgrCfg, - cfgIdx: 0, - filterS: filterS, - dc: dc, - poster: tstPstr, - reqs: newConcReq(0), - } - // pstrEE.poster = tstPstr - cgrEv.Event = map[string]interface{}{ - "test": "string", - } - cgrCfg.EEsCfg().Exporters[0].Fields = []*config.FCTemplate{ - { - Path: "*exp.1", Type: utils.MetaVariable, - Value: config.NewRSRParsersMustCompile("~*req.field1", utils.InfieldSep), - }, - { - Path: "*exp.2", Type: utils.MetaVariable, - Value: config.NewRSRParsersMustCompile("*req.field2", utils.InfieldSep), - }, - } - for _, field := range cgrCfg.EEsCfg().Exporters[0].Fields { - field.ComputePath() - } - cgrCfg.EEsCfg().Exporters[0].ComputeFields() - if err := pstrEE.ExportEvent(cgrEv); err != nil { - t.Error(err) - } - dcExpect := int64(1) - if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) { - t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) - } - bodyExpect := map[string]interface{}{ - "2": "*req.field2", - } - var rcv map[string]interface{} - if err := json.Unmarshal(tstPstr.body, &rcv); err != nil { - t.Fatal(err) - } - if !reflect.DeepEqual(rcv, bodyExpect) { - t.Errorf("Expected %s but received %s", utils.ToJSON(bodyExpect), utils.ToJSON(rcv)) - } -} - -func TestPosterJsonMapExportEvent2(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaSQSjsonMap - cgrEv := new(utils.CGREvent) - newIDb := engine.NewInternalDB(nil, nil, true) - newDM := engine.NewDataManager(newIDb, cgrCfg.CacheCfg(), nil) - filterS := engine.NewFilterS(cgrCfg, nil, newDM) - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } - - pstrEE, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc) - if err != nil { - t.Error(err) - } - cgrEv.Event = map[string]interface{}{ - "test": "string", - } - cgrCfg.EEsCfg().Exporters[0].Fields = []*config.FCTemplate{ - { - Path: "*exp.1", Type: utils.MetaVariable, - Value: config.NewRSRParsersMustCompile("~*req.field1", utils.InfieldSep), - Filters: []string{"*wrong-type"}, - }, - { - Path: "*exp.1", Type: utils.MetaVariable, - Value: config.NewRSRParsersMustCompile("~*req.field1", utils.InfieldSep), - Filters: []string{"*wrong-type"}, - }, - } - for _, field := range cgrCfg.EEsCfg().Exporters[0].Fields { - field.ComputePath() - } - cgrCfg.EEsCfg().Exporters[0].ComputeFields() - errExpect := "inline parse error for string: <*wrong-type>" - if err := pstrEE.ExportEvent(cgrEv); err == nil || err.Error() != errExpect { - t.Errorf("Expected %q but received %q", errExpect, err) - } - dcExpect := int64(1) - if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) { - t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) - } -} - -func TestPosterJsonMapExportEvent3(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaSQSjsonMap - cgrEv := new(utils.CGREvent) - newIDb := engine.NewInternalDB(nil, nil, true) - newDM := engine.NewDataManager(newIDb, cgrCfg.CacheCfg(), nil) - filterS := engine.NewFilterS(cgrCfg, nil, newDM) - dc, err := newEEMetrics(utils.FirstNonEmpty( - "Local", - utils.EmptyString, - )) - if err != nil { - t.Error(err) - } - - pstrEE, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc) - if err != nil { - t.Error(err) - } - cgrEv.Event = map[string]interface{}{ - "test": "string", - } - cgrEv.Event = map[string]interface{}{ - "test": make(chan int), - } - cgrCfg.EEsCfg().Exporters[0].Fields = []*config.FCTemplate{{}} - for _, field := range cgrCfg.EEsCfg().Exporters[0].Fields { - field.ComputePath() - } - cgrCfg.EEsCfg().Exporters[0].ComputeFields() - errExpect := "json: unsupported type: chan int" - if err := pstrEE.ExportEvent(cgrEv); err == nil || err.Error() != errExpect { - t.Errorf("Expected %q but received %q", errExpect, err) - } - dcExpect := int64(1) - if !reflect.DeepEqual(dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) { - t.Errorf("Expected %q but received %q", dcExpect, pstrEE.dc.MapStorage[utils.NumberOfEvents]) - } - pstrEE.OnEvicted("test", "test") -} - -type mockPoster struct { - wg *sync.WaitGroup -} - -func (mp mockPoster) Post(body []byte, key string) error { - time.Sleep(3 * time.Second) - mp.wg.Done() - return nil -} - -func (mockPoster) Close() { - return -} - -func TestPosterJsonMapSync(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - var cfgIdx int - cfgIdx = 0 - - cgrCfg.EEsCfg().Exporters[cfgIdx].Type = utils.MetaHTTPjsonMap - dc, err := newEEMetrics(utils.FirstNonEmpty( - cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone, - cgrCfg.GeneralCfg().DefaultTimezone)) - if err != nil { - t.Error(err) - } - - //Create an event - cgrEvent := &utils.CGREvent{ - Tenant: "cgrates.org", - Event: map[string]interface{}{ - "Account": "1001", - "Destination": "1002", - }, - } - - var wg1 = &sync.WaitGroup{} - - wg1.Add(3) - - test := make(chan struct{}) - go func() { - wg1.Wait() - close(test) - }() - - mckPoster := mockPoster{ - wg: wg1, - } - exp := &PosterJSONMapEE{ - id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, - cgrCfg: cgrCfg, - cfgIdx: cfgIdx, - filterS: new(engine.FilterS), - poster: mckPoster, - dc: dc, - reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests), - } - - for i := 0; i < 3; i++ { - go exp.ExportEvent(cgrEvent) - } - - select { - case <-test: - return - case <-time.After(4 * time.Second): - t.Error("Can't asynchronously export events") - } -} - -func TestPosterJsonMapSyncLimit(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - var cfgIdx int - cfgIdx = 0 - - cgrCfg.EEsCfg().Exporters[cfgIdx].Type = utils.MetaHTTPjsonMap - cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests = 1 - dc, err := newEEMetrics(utils.FirstNonEmpty( - cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone, - cgrCfg.GeneralCfg().DefaultTimezone)) - if err != nil { - t.Error(err) - } - - //Create an event - cgrEvent := &utils.CGREvent{ - Tenant: "cgrates.org", - Event: map[string]interface{}{ - "Account": "1001", - "Destination": "1002", - }, - } - - var wg1 = &sync.WaitGroup{} - - wg1.Add(3) - - test := make(chan struct{}) - go func() { - wg1.Wait() - close(test) - }() - - mckPoster := mockPoster{ - wg: wg1, - } - exp := &PosterJSONMapEE{ - id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, - cgrCfg: cgrCfg, - cfgIdx: cfgIdx, - filterS: new(engine.FilterS), - poster: mckPoster, - dc: dc, - reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests), - } - - for i := 0; i < 3; i++ { - go exp.ExportEvent(cgrEvent) - } - - select { - case <-test: - t.Error("Should not have been possible to asynchronously export events") - case <-time.After(4 * time.Second): - return - } -} -*/ diff --git a/ees/sql.go b/ees/sql.go index ccff3fdcc..bf8b011ec 100644 --- a/ees/sql.go +++ b/ees/sql.go @@ -23,6 +23,7 @@ import ( "fmt" "net/url" "strings" + "sync" "gorm.io/driver/mysql" "gorm.io/driver/postgres" @@ -53,6 +54,7 @@ type SQLEe struct { dialect gorm.Dialector tableName string + sync.RWMutex } type sqlPosterRequest struct { @@ -132,20 +134,36 @@ func openDB(dialect gorm.Dialector, opts map[string]interface{}) (db *gorm.DB, s func (sqlEe *SQLEe) Cfg() *config.EventExporterCfg { return sqlEe.cfg } func (sqlEe *SQLEe) Connect() (err error) { + sqlEe.Lock() if sqlEe.db == nil || sqlEe.sqldb == nil { sqlEe.db, sqlEe.sqldb, err = openDB(sqlEe.dialect, sqlEe.Cfg().Opts) } + sqlEe.Unlock() return } func (sqlEe *SQLEe) ExportEvent(req interface{}, _ string) error { sqlEe.reqs.get() - defer sqlEe.reqs.done() + sqlEe.RLock() + defer func() { + sqlEe.RUnlock() + sqlEe.reqs.done() + }() + if sqlEe.db == nil { + return utils.ErrDisconnected + } sReq := req.(*sqlPosterRequest) return sqlEe.db.Table(sqlEe.tableName).Exec(sReq.Querry, sReq.Values...).Error } -func (sqlEe *SQLEe) Close() error { return sqlEe.sqldb.Close() } +func (sqlEe *SQLEe) Close() (err error) { + sqlEe.Lock() + err = sqlEe.sqldb.Close() + sqlEe.db = nil + sqlEe.sqldb = nil + sqlEe.Unlock() + return +} func (sqlEe *SQLEe) GetMetrics() *utils.SafeMapStorage { return sqlEe.dc } diff --git a/ees/sql_it_test.go b/ees/sql_it_test.go index d7d7d3831..ff4c908c5 100644 --- a/ees/sql_it_test.go +++ b/ees/sql_it_test.go @@ -298,7 +298,7 @@ func TestSQLExportEvent1(t *testing.T) { if err := sqlEe.Connect(); err != nil { t.Fatal(err) } - if err := sqlEe.ExportEvent(&sqlPosterRequest{Querry: "INSERT INTO expTable VALUES (); ", Values: []interface{}{}}, ""); err != nil { + if err := sqlEe.ExportEvent(&sqlPosterRequest{Querry: "INSERT INTO cdrs VALUES (); ", Values: []interface{}{}}, ""); err != nil { t.Error(err) } sqlEe.Close() diff --git a/ers/kafka_test.go b/ers/kafka_test.go index ef0e6ebee..1b74191cc 100644 --- a/ers/kafka_test.go +++ b/ers/kafka_test.go @@ -118,6 +118,7 @@ func TestKafkaERServe(t *testing.T) { rdr.Config().Opts = map[string]interface{}{} rdr.Config().ProcessedPath = "" rdr.(*KafkaER).createPoster() + close(rdrExit) } func TestKafkaERServe2(t *testing.T) { diff --git a/general_tests/all_cfg_sect_rld_it_test.go b/general_tests/all_cfg_sect_rld_it_test.go index 108f273e6..792475ec8 100644 --- a/general_tests/all_cfg_sect_rld_it_test.go +++ b/general_tests/all_cfg_sect_rld_it_test.go @@ -520,7 +520,7 @@ func testSectConfigSReloadEES(t *testing.T) { } else if reply != utils.OK { t.Errorf("Expected OK received: %+v", reply) } - cfgStr := "{\"ees\":{\"attributes_conns\":[],\"cache\":{\"*file_csv\":{\"limit\":-1,\"precache\":false,\"replicate\":false,\"static_ttl\":false,\"ttl\":\"5s\"}},\"enabled\":true,\"exporters\":[{\"attempts\":1,\"attribute_context\":\"\",\"attribute_ids\":[],\"concurrent_requests\":0,\"export_path\":\"/var/spool/cgrates/ees\",\"fields\":[],\"filters\":[],\"flags\":[],\"id\":\"*default\",\"opts\":{},\"synchronous\":false,\"timezone\":\"\",\"type\":\"*none\"}]}}" + cfgStr := "{\"ees\":{\"attributes_conns\":[],\"cache\":{\"*file_csv\":{\"limit\":-1,\"precache\":false,\"replicate\":false,\"static_ttl\":false,\"ttl\":\"5s\"}},\"enabled\":true,\"exporters\":[{\"attempts\":1,\"attribute_context\":\"\",\"attribute_ids\":[],\"concurrent_requests\":0,\"export_path\":\"/var/spool/cgrates/ees\",\"failed_posts_dir\":\"/var/spool/cgrates/failed_posts\",\"fields\":[],\"filters\":[],\"flags\":[],\"id\":\"*default\",\"opts\":{},\"synchronous\":false,\"timezone\":\"\",\"type\":\"*none\"}]}}" var rpl string if err := testSectRPC.Call(utils.ConfigSv1GetConfigAsJSON, &config.SectionWithAPIOpts{ Tenant: "cgrates.org",