diff --git a/engine/poster.go b/engine/poster.go index 887bd1a77..7ba9bdf94 100644 --- a/engine/poster.go +++ b/engine/poster.go @@ -24,13 +24,14 @@ import ( "sync" ) +// General constants for posters const ( - defaultQueueID = "cgrates_cdrs" - defaultExchangeType = "direct" - queueID = "queue_id" - exchange = "exchange" - exchangeType = "exchange_type" - routingKey = "routing_key" + DefaultQueueID = "cgrates_cdrs" + QueueID = "queue_id" + DefaultExchangeType = "direct" + Exchange = "exchange" + ExchangeType = "exchange_type" + RoutingKey = "routing_key" awsToken = "aws_token" folderPath = "folder_path" @@ -69,8 +70,8 @@ func parseURL(dialURL string) (URL string, qID string, err error) { } qry := u.Query() URL = strings.Split(dialURL, "?")[0] - qID = defaultQueueID - if vals, has := qry[queueID]; has && len(vals) != 0 { + qID = DefaultQueueID + if vals, has := qry[QueueID]; has && len(vals) != 0 { qID = vals[0] } return diff --git a/engine/pstr_amqp.go b/engine/pstr_amqp.go index b95778da1..5eab8167c 100644 --- a/engine/pstr_amqp.go +++ b/engine/pstr_amqp.go @@ -29,7 +29,8 @@ import ( "github.com/streadway/amqp" ) -var amqpQuery = []string{"cacertfile", "certfile", "keyfile", "verify", "server_name_indication", "auth_mechanism", "heartbeat", "connection_timeout", "channel_max"} +// AMQPPosibleQuery the lists of posible AMQP values +var AMQPPosibleQuery = []string{"cacertfile", "certfile", "keyfile", "verify", "server_name_indication", "auth_mechanism", "heartbeat", "connection_timeout", "channel_max"} // NewAMQPPoster creates a new amqp poster // "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs" @@ -62,25 +63,25 @@ func (pstr *AMQPPoster) parseURL(dialURL string) error { } qry := u.Query() q := url.Values{} - for _, key := range amqpQuery { + for _, key := range AMQPPosibleQuery { if vals, has := qry[key]; has && len(vals) != 0 { q.Add(key, vals[0]) } } pstr.dialURL = strings.Split(dialURL, "?")[0] + "?" + q.Encode() - pstr.queueID = defaultQueueID - pstr.routingKey = defaultQueueID - if vals, has := qry[queueID]; has && len(vals) != 0 { + pstr.queueID = DefaultQueueID + pstr.routingKey = DefaultQueueID + if vals, has := qry[QueueID]; has && len(vals) != 0 { pstr.queueID = vals[0] } - if vals, has := qry[routingKey]; has && len(vals) != 0 { + if vals, has := qry[RoutingKey]; has && len(vals) != 0 { pstr.routingKey = vals[0] } - if vals, has := qry[exchange]; has && len(vals) != 0 { + if vals, has := qry[Exchange]; has && len(vals) != 0 { pstr.exchange = vals[0] - pstr.exchangeType = defaultExchangeType + pstr.exchangeType = DefaultExchangeType } - if vals, has := qry[exchangeType]; has && len(vals) != 0 { + if vals, has := qry[ExchangeType]; has && len(vals) != 0 { pstr.exchangeType = vals[0] } return nil diff --git a/engine/pstr_kafka.go b/engine/pstr_kafka.go index a02fe8620..233994311 100644 --- a/engine/pstr_kafka.go +++ b/engine/pstr_kafka.go @@ -48,7 +48,7 @@ type KafkaPoster struct { } func (pstr *KafkaPoster) parseURL(dialURL string) error { - pstr.topic = defaultQueueID + pstr.topic = DefaultQueueID i := strings.IndexByte(dialURL, '?') if i < 0 { pstr.dialURL = dialURL diff --git a/engine/pstr_s3.go b/engine/pstr_s3.go index 80694545c..9703a5f43 100644 --- a/engine/pstr_s3.go +++ b/engine/pstr_s3.go @@ -63,8 +63,8 @@ func (pstr *S3Poster) parseURL(dialURL string) { pstr.dialURL = strings.Split(dialURL, "?")[0] pstr.dialURL = strings.TrimSuffix(pstr.dialURL, "/") // used to remove / to point to correct endpoint - pstr.queueID = defaultQueueID - if val, has := qry[queueID]; has { + pstr.queueID = DefaultQueueID + if val, has := qry[QueueID]; has { pstr.queueID = val } if val, has := qry[folderPath]; has { diff --git a/engine/pstr_sqs.go b/engine/pstr_sqs.go index b6f95cd4e..bf4b175e5 100644 --- a/engine/pstr_sqs.go +++ b/engine/pstr_sqs.go @@ -64,8 +64,8 @@ func (pstr *SQSPoster) parseURL(dialURL string) { pstr.dialURL = strings.Split(dialURL, "?")[0] pstr.dialURL = strings.TrimSuffix(pstr.dialURL, "/") // used to remove / to point to correct endpoint - pstr.queueID = defaultQueueID - if val, has := qry[queueID]; has { + pstr.queueID = DefaultQueueID + if val, has := qry[QueueID]; has { pstr.queueID = val } if val, has := qry[utils.AWSRegion]; has { diff --git a/ers/amqp.go b/ers/amqp.go new file mode 100644 index 000000000..3c74f4936 --- /dev/null +++ b/ers/amqp.go @@ -0,0 +1,267 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ers + +import ( + "encoding/json" + "fmt" + "net/url" + "strings" + "time" + + "github.com/cgrates/cgrates/agents" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "github.com/streadway/amqp" +) + +const ( + defaultConsumerTag = "cgrates" + consumerTag = "consumer_tag" +) + +// NewAMQPER return a new kafka event reader +func NewAMQPER(cfg *config.CGRConfig, cfgIdx int, + rdrEvents chan *erEvent, rdrErr chan error, + fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { + + rdr := &AMQPER{ + cgrCfg: cfg, + cfgIdx: cfgIdx, + fltrS: fltrS, + rdrEvents: rdrEvents, + rdrExit: rdrExit, + rdrErr: rdrErr, + } + if concReq := rdr.Config().ConcurrentReqs; concReq != -1 { + rdr.cap = make(chan struct{}, concReq) + for i := 0; i < concReq; i++ { + rdr.cap <- struct{}{} + } + } + er = rdr + err = rdr.setURL(rdr.Config().SourcePath) + return +} + +// AMQPER implements EventReader interface for kafka message +type AMQPER struct { + // sync.RWMutex + cgrCfg *config.CGRConfig + cfgIdx int // index of config instance within ERsCfg.Readers + fltrS *engine.FilterS + + dialURL string + queueID string + tag string + exchange string + exchangeType string + routingKey string + + rdrEvents chan *erEvent // channel to dispatch the events created to + rdrExit chan struct{} + rdrErr chan error + cap chan struct{} + + conn *amqp.Connection + channel *amqp.Channel +} + +// Config returns the curent configuration +func (rdr *AMQPER) Config() *config.EventReaderCfg { + return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx] +} + +// Serve will start the gorutines needed to watch the kafka topic +func (rdr *AMQPER) Serve() (err error) { + if rdr.conn, err = amqp.Dial(rdr.dialURL); err != nil { + return + } + if rdr.channel, err = rdr.conn.Channel(); err != nil { + rdr.close() + return + } + if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API + return + } + + if rdr.exchange != "" { + if err = rdr.channel.ExchangeDeclare( + rdr.exchange, // name + rdr.exchangeType, // type + true, // durable + false, // audo-delete + false, // internal + false, // no-wait + nil, // args + ); err != nil { + return + } + } + + if _, err = rdr.channel.QueueDeclare( + rdr.queueID, // name + true, // durable + false, // auto-delete + false, // exclusive + false, // no-wait + nil, // args + ); err != nil { + return + } + + if rdr.exchange != "" { + if err = rdr.channel.QueueBind( + rdr.queueID, // queue + rdr.routingKey, // key + rdr.exchange, // exchange + false, // no-wait + nil, // args + ); err != nil { + return + } + } + + var msgChan <-chan amqp.Delivery + if msgChan, err = rdr.channel.Consume(rdr.queueID, rdr.tag, + false, false, false, true, nil); err != nil { + return + } + go rdr.readLoop(msgChan) // read until the connection is closed + return +} + +func (rdr *AMQPER) readLoop(msgChan <-chan amqp.Delivery) { + for { + if rdr.Config().ConcurrentReqs != -1 { + <-rdr.cap // do not try to read if the limit is reached + } + select { + case <-rdr.rdrExit: + utils.Logger.Info( + fmt.Sprintf("<%s> stop monitoring kafka path <%s>", + utils.ERs, rdr.dialURL)) + rdr.close() + return + case msg := <-msgChan: + if len(msg.Body) == 0 { + continue + } + go func(msg amqp.Delivery) { + if err := rdr.processMessage(msg.Body); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> processing message %s error: %s", + utils.ERs, msg.MessageId, err.Error())) + } + if rdr.Config().ProcessedPath != utils.EmptyString { // post it + if err := engine.PostersCache.PostAMQP(rdr.Config().ProcessedPath, + rdr.cgrCfg.GeneralCfg().PosterAttempts, msg.Body); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> writing message %s error: %s", + utils.ERs, msg.MessageId, err.Error())) + } + } + if rdr.Config().ConcurrentReqs != -1 { + rdr.cap <- struct{}{} + } + }(msg) + } + } +} + +func (rdr *AMQPER) processMessage(msg []byte) (err error) { + var decodedMessage map[string]interface{} + if err = json.Unmarshal(msg, &decodedMessage); err != nil { + return + } + agReq := agents.NewAgentRequest( + utils.MapStorage(decodedMessage), nil, + nil, nil, nil, rdr.Config().Tenant, + rdr.cgrCfg.GeneralCfg().DefaultTenant, + utils.FirstNonEmpty(rdr.Config().Timezone, + rdr.cgrCfg.GeneralCfg().DefaultTimezone), + rdr.fltrS, nil, nil) // create an AgentRequest + var pass bool + if pass, err = rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, + agReq); err != nil || !pass { + return + } + if err = agReq.SetFields(rdr.Config().Fields); err != nil { + return + } + rdr.rdrEvents <- &erEvent{ + cgrEvent: config.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep), + rdrCfg: rdr.Config(), + opts: config.NMAsMapInterface(agReq.Opts, utils.NestingSep), + } + return +} + +func (rdr *AMQPER) setURL(dialURL string) (err error) { + var u *url.URL + if u, err = url.Parse(dialURL); err != nil { + return + } + qry := u.Query() + q := url.Values{} + for _, key := range engine.AMQPPosibleQuery { + if vals, has := qry[key]; has && len(vals) != 0 { + q.Add(key, vals[0]) + } + } + rdr.dialURL = strings.Split(dialURL, "?")[0] + if params := q.Encode(); params != utils.EmptyString { + rdr.dialURL += "?" + params + + } + rdr.queueID = engine.DefaultQueueID + if vals, has := qry[engine.QueueID]; has && len(vals) != 0 { + rdr.queueID = vals[0] + } + rdr.tag = defaultConsumerTag + if vals, has := qry[consumerTag]; has && len(vals) != 0 { + rdr.tag = vals[0] + } + + if vals, has := qry[engine.RoutingKey]; has && len(vals) != 0 { + rdr.routingKey = vals[0] + } + if vals, has := qry[engine.Exchange]; has && len(vals) != 0 { + rdr.exchange = vals[0] + rdr.exchangeType = engine.DefaultExchangeType + } + if vals, has := qry[engine.ExchangeType]; has && len(vals) != 0 { + rdr.exchangeType = vals[0] + } + + return nil +} + +func (rdr *AMQPER) close() (err error) { + if rdr.channel != nil { + if err = rdr.channel.Cancel(rdr.tag, true); err != nil { + return + } + if err = rdr.channel.Close(); err != nil { + return + } + } + return rdr.conn.Close() +} diff --git a/ers/amqp_it_test.go b/ers/amqp_it_test.go new file mode 100644 index 000000000..a23eb37b6 --- /dev/null +++ b/ers/amqp_it_test.go @@ -0,0 +1,123 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ers + +import ( + "fmt" + "reflect" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "github.com/streadway/amqp" +) + +func TestAMQPER(t *testing.T) { + cfg, err := config.NewCGRConfigFromJsonStringWithDefaults(`{ +"ers": { // EventReaderService + "enabled": true, // starts the EventReader service: + "readers": [ + { + "id": "amqp", // identifier of the EventReader profile + "type": "*amqp_json_map", // reader type <*file_csv> + "run_delay": "-1", // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together + "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited + "source_path": "amqp://guest:guest@localhost:5672/?queue_id=cdrs3&consumer_tag=test-key&exchange=test-exchange&exchange_type=direct&routing_key=test-key",// read data from this path + // "processed_path": "/var/spool/cgrates/ers/out", // move processed data here + "tenant": "cgrates.org", // tenant used by import + "filters": [], // limit parsing based on the filters + "flags": [], // flags to influence the event processing + "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value + {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, + ], + }, + ], +}, +}`) + if err != nil { + t.Fatal(err) + } + utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + + rdrEvents = make(chan *erEvent, 1) + rdrErr = make(chan error, 1) + rdrExit = make(chan struct{}, 1) + + if rdr, err = NewAMQPER(cfg, 1, rdrEvents, + rdrErr, new(engine.FilterS), rdrExit); err != nil { + t.Fatal(err) + } + connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + if err != nil { + t.Fatal(err) + } + defer connection.Close() + + channel, err := connection.Channel() + if err != nil { + t.Fatal(err) + } + + rdr.Serve() + randomCGRID := utils.UUIDSha1Prefix() + if err = channel.Publish( + "test-exchange", // publish to an exchange + "test-key", // routing to 0 or more queues + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: utils.CONTENT_JSON, + Body: []byte(fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID)), + DeliveryMode: amqp.Persistent, // 1=non-persistent, 2=persistent + }, + ); err != nil { + t.Fatal(err) + } + select { + case err = <-rdrErr: + t.Error(err) + case ev := <-rdrEvents: + if ev.rdrCfg.ID != "amqp" { + t.Errorf("Expected 'kakfa' received `%s`", ev.rdrCfg.ID) + } + expected := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: ev.cgrEvent.ID, + Time: ev.cgrEvent.Time, + Event: map[string]interface{}{ + "CGRID": randomCGRID, + }, + } + if !reflect.DeepEqual(ev.cgrEvent, expected) { + t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent)) + } + case <-time.After(10 * time.Second): + t.Fatal("Timeout") + } + + if _, err := channel.QueueDelete("cdrs3", false, false, false); err != nil { + t.Fatal(err) + } + rdrExit <- struct{}{} +} diff --git a/ers/amqp_test.go b/ers/amqp_test.go new file mode 100644 index 000000000..7e4823c86 --- /dev/null +++ b/ers/amqp_test.go @@ -0,0 +1,67 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ers + +import ( + "testing" +) + +func TestAMQPSetURL(t *testing.T) { + k := new(AMQPER) + expKafka := &AMQPER{ + dialURL: "amqp://localhost:2013", + queueID: "cdrs", + tag: "new", + } + url := "amqp://localhost:2013?queue_id=cdrs&consumer_tag=new" + if err := k.setURL(url); err != nil { + t.Fatal(err) + } else if expKafka.dialURL != k.dialURL { + t.Errorf("Expected: %s ,received: %s", expKafka.dialURL, k.dialURL) + } else if expKafka.queueID != k.queueID { + t.Errorf("Expected: %s ,received: %s", expKafka.queueID, k.queueID) + } else if expKafka.tag != k.tag { + t.Errorf("Expected: %s ,received: %s", expKafka.tag, k.tag) + } + k = new(AMQPER) + expKafka = &AMQPER{ + dialURL: "amqp://localhost:2013", + queueID: "cgrates_cdrs", + tag: "cgrates", + } + url = "amqp://localhost:2013" + if err := k.setURL(url); err != nil { + t.Fatal(err) + } else if expKafka.dialURL != k.dialURL { + t.Errorf("Expected: %s ,received: %s", expKafka.dialURL, k.dialURL) + } else if expKafka.queueID != k.queueID { + t.Errorf("Expected: %s ,received: %s", expKafka.queueID, k.queueID) + } else if expKafka.tag != k.tag { + t.Errorf("Expected: %s ,received: %s", expKafka.tag, k.tag) + } + k = new(AMQPER) + expKafka = &AMQPER{ + dialURL: "amqp://localhost:2013", + queueID: "cgrates", + tag: "cgrates", + } + if err := k.setURL("127.0.0.1:2013?queue_id=cdrs&consumer_tag=new"); err == nil { + t.Errorf("Expected error received: %v", err) + } +} diff --git a/ers/kafka_it_test.go b/ers/kafka_it_test.go index 7ed4c1670..7b104f418 100644 --- a/ers/kafka_it_test.go +++ b/ers/kafka_it_test.go @@ -37,7 +37,7 @@ var ( rdrEvents chan *erEvent rdrErr chan error rdrExit chan struct{} - kfk EventReader + rdr EventReader ) func TestKafkaER(t *testing.T) { @@ -70,7 +70,7 @@ func TestKafkaER(t *testing.T) { rdrErr = make(chan error, 1) rdrExit = make(chan struct{}, 1) - if kfk, err = NewKafkaER(cfg, 1, rdrEvents, + if rdr, err = NewKafkaER(cfg, 1, rdrEvents, rdrErr, new(engine.FilterS), rdrExit); err != nil { t.Fatal(err) } @@ -87,7 +87,7 @@ func TestKafkaER(t *testing.T) { ) w.Close() - kfk.Serve() + rdr.Serve() select { case err = <-rdrErr: diff --git a/ers/reader.go b/ers/reader.go index b29c97d61..6830b3bf9 100644 --- a/ers/reader.go +++ b/ers/reader.go @@ -54,6 +54,8 @@ func NewEventReader(cfg *config.CGRConfig, cfgIdx int, return NewFlatstoreER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) case utils.MetaJSON: return NewJSONFileER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) + case utils.MetaAMQPjsonMap: + return NewAMQPER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) } return } diff --git a/ers/sql_it_test.go b/ers/sql_it_test.go index 5b829cc0b..42de3e326 100644 --- a/ers/sql_it_test.go +++ b/ers/sql_it_test.go @@ -45,7 +45,7 @@ var ( testSQLEmptyTable, testSQLPoster, - testSQLInitDB, + testSQLAddData, testSQLReader2, testSQLStop, @@ -136,11 +136,12 @@ func (_ *testModelSql) TableName() string { func testSQLInitDBs(t *testing.T) { var err error - if db, err = gorm.Open("mysql", fmt.Sprintf(dbConnString, "cgrates")); err != nil { + var db2 *gorm.DB + if db2, err = gorm.Open("mysql", fmt.Sprintf(dbConnString, "cgrates")); err != nil { t.Fatal(err) } - if _, err = db.DB().Exec(`CREATE DATABASE IF NOT EXISTS cgrates2;`); err != nil { + if _, err = db2.DB().Exec(`CREATE DATABASE IF NOT EXISTS cgrates2;`); err != nil { t.Fatal(err) } } @@ -166,6 +167,8 @@ func testSQLInitDB(t *testing.T) { t.Fatal(err) } } + tx.Commit() + tx = db.Begin() tx = tx.Table(utils.CDRsTBL) cdrSql := cdr.AsCDRsql() cdrSql.CreatedAt = time.Now() @@ -178,6 +181,19 @@ func testSQLInitDB(t *testing.T) { time.Sleep(10 * time.Millisecond) } +func testSQLAddData(t *testing.T) { + tx := db.Begin() + tx = tx.Table(utils.CDRsTBL) + cdrSql := cdr.AsCDRsql() + cdrSql.CreatedAt = time.Now() + saved := tx.Save(cdrSql) + if saved.Error != nil { + tx.Rollback() + t.Fatal(saved.Error) + } + tx.Commit() + time.Sleep(10 * time.Millisecond) +} func testSQLReader(t *testing.T) { rdrEvents = make(chan *erEvent, 1) rdrErr = make(chan error, 1) @@ -292,6 +308,9 @@ func testSQLPoster(t *testing.T) { } func testSQLStop(t *testing.T) { + if _, err := db.DB().Exec(`DROP DATABASE cgrates2;`); err != nil { + t.Fatal(err) + } rdrExit <- struct{}{} db = db.DropTable("cdrs2") db = db.DropTable("cdrs") diff --git a/packages/debian/changelog b/packages/debian/changelog index da535a8bb..7a8d185d9 100644 --- a/packages/debian/changelog +++ b/packages/debian/changelog @@ -95,6 +95,7 @@ cgrates (0.11.0~dev) UNRELEASED; urgency=medium * [CGR-CONSOLE] Uniformize the commands between profile and subsystem * [StatS] Add rounding operation for duration metric (e.g. acd, tcd, etc...) * [DispatcherH] Added DispatcherH subsystem + * [ERs] Added support for *amqp_json_map type -- DanB Wed, 19 Feb 2020 13:25:52 +0200 diff --git a/services/datadb.go b/services/datadb.go index 2c7ee2661..33a8a3aae 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -162,7 +162,7 @@ func (db *DataDBService) needsConnectionReload() bool { // GetDMChan returns the DataManager chanel func (db *DataDBService) GetDMChan() chan *engine.DataManager { - db.Lock() - defer db.Unlock() + db.RLock() + defer db.RUnlock() return db.dbchan }