From 6e535f11af678cbfa87a03a9b59145bef9c7e06f Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 16 Nov 2016 20:58:01 +0100 Subject: [PATCH] MongoDB driver does not longer handle indexing on each connection --- apier/v2/cdrs_it_test.go | 2 +- apier/v2/tp_it_test.go | 16 +-- data/conf/samples/tutmongo/cgrates.json | 3 - engine/calldesc.go | 4 +- engine/storage_mongo_datadb.go | 150 +++++++++----------- engine/storage_mongo_it_test.go | 3 +- engine/storage_utils.go | 10 +- integration_tests.sh => integration_test.sh | 0 utils/consts.go | 3 + 9 files changed, 87 insertions(+), 104 deletions(-) rename integration_tests.sh => integration_test.sh (100%) diff --git a/apier/v2/cdrs_it_test.go b/apier/v2/cdrs_it_test.go index 25644c26e..882909f85 100644 --- a/apier/v2/cdrs_it_test.go +++ b/apier/v2/cdrs_it_test.go @@ -110,7 +110,7 @@ func testV2CDRsInjectUnratedCdr(t *testing.T) { cdrsCfg.StorDBMaxOpenConns, cdrsCfg.StorDBMaxIdleConns) case "cdrsv2mongo": db, err = engine.NewMongoStorage(cdrsCfg.StorDBHost, cdrsCfg.StorDBPort, cdrsCfg.StorDBName, - cdrsCfg.StorDBUser, cdrsCfg.StorDBPass, cdrsCfg.StorDBCDRSIndexes, nil, 10) + cdrsCfg.StorDBUser, cdrsCfg.StorDBPass, utils.StorDB, cdrsCfg.StorDBCDRSIndexes, nil, 10) } if err != nil { t.Error("Error on opening database connection: ", err) diff --git a/apier/v2/tp_it_test.go b/apier/v2/tp_it_test.go index b55592646..517b2dab8 100644 --- a/apier/v2/tp_it_test.go +++ b/apier/v2/tp_it_test.go @@ -227,15 +227,12 @@ func testTPitDestinations(t *testing.T) { } } // Test get - /* - FixMe for mongodb - var rplyDst *utils.TPDestination - if err := tpRPC.Call("ApierV2.GetTPDestination", v1.AttrGetTPDestination{testTPid, dstDEMobile.DestinationId}, &rplyDst); err != nil { - t.Error("Calling ApierV2.GetTPDestination, got error: ", err.Error()) - } else if len(dstDEMobile.Prefixes) != len(rplyDst.Prefixes) { - t.Errorf("Calling ApierV2.GetTPDestination expected: %v, received: %v", dstDEMobile, rplyDst) - } - */ + var rplyDst *utils.TPDestination + if err := tpRPC.Call("ApierV2.GetTPDestination", v1.AttrGetTPDestination{testTPid, dstDEMobile.DestinationId}, &rplyDst); err != nil { + t.Error("Calling ApierV2.GetTPDestination, got error: ", err.Error()) + } else if len(dstDEMobile.Prefixes) != len(rplyDst.Prefixes) { + t.Errorf("Calling ApierV2.GetTPDestination expected: %v, received: %v", dstDEMobile, rplyDst) + } // Test remove if err := tpRPC.Call("ApierV2.RemTPDestination", v1.AttrGetTPDestination{testTPid, dstDUMMY.DestinationId}, &reply); err != nil { t.Error("Calling ApierV1.RemTPTiming, got error: ", err.Error()) @@ -250,5 +247,4 @@ func testTPitDestinations(t *testing.T) { } else if len(expectedDstIds) != len(rplyDstIds) { t.Errorf("Calling ApierV2.GetTPDestinationIds expected: %v, received: %v", expectedDstIds, rplyDstIds) } - } diff --git a/data/conf/samples/tutmongo/cgrates.json b/data/conf/samples/tutmongo/cgrates.json index 406f01cf5..a6cc6e53f 100644 --- a/data/conf/samples/tutmongo/cgrates.json +++ b/data/conf/samples/tutmongo/cgrates.json @@ -13,20 +13,17 @@ "tariffplan_db": { // database used to store active tariff plan configuration "db_type": "mongo", // stor database type to use: "db_port": 27017, // the port to reach the stordb - "db_name": "tpdb", }, "data_db": { // database used to store runtime data (eg: accounts, cdr stats) "db_type": "mongo", // stor database type to use: "db_port": 27017, // the port to reach the stordb - "db_name": "datadb", }, "stor_db": { "db_type": "mongo", // stor database type to use: "db_port": 27017, // the port to reach the stordb - "db_name": "stordb", }, diff --git a/engine/calldesc.go b/engine/calldesc.go index c37c1e730..41e732988 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -47,11 +47,11 @@ func init() { ratingStorage, _ = NewMapStorage() accountingStorage, _ = NewMapStorage() case "mongo": - ratingStorage, err = NewMongoStorage("127.0.0.1", "27017", "cgrates_rating_test", "", "", nil, &config.CacheConfig{RatingPlans: &config.CacheParamConfig{Precache: true}}, 10) + ratingStorage, err = NewMongoStorage("127.0.0.1", "27017", "cgrates_rating_test", "", "", utils.TariffPlanDB, nil, &config.CacheConfig{RatingPlans: &config.CacheParamConfig{Precache: true}}, 10) if err != nil { log.Fatal(err) } - accountingStorage, err = NewMongoStorage("127.0.0.1", "27017", "cgrates_accounting_test", "", "", nil, &config.CacheConfig{RatingPlans: &config.CacheParamConfig{Precache: true}}, 10) + accountingStorage, err = NewMongoStorage("127.0.0.1", "27017", "cgrates_accounting_test", "", "", utils.DataDB, nil, &config.CacheConfig{RatingPlans: &config.CacheParamConfig{Precache: true}}, 10) if err != nil { log.Fatal(err) } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 385e80524..ac0c573bc 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -85,20 +85,7 @@ var ( CostLow = strings.ToLower(utils.COST) ) -type MongoStorage struct { - session *mgo.Session - db string - ms Marshaler - cacheCfg *config.CacheConfig - loadHistorySize int -} - -func (ms *MongoStorage) conn(col string) (*mgo.Session, *mgo.Collection) { - sessionCopy := ms.session.Copy() - return sessionCopy, sessionCopy.DB(ms.db).C(col) -} - -func NewMongoStorage(host, port, db, user, pass string, cdrsIndexes []string, cacheCfg *config.CacheConfig, loadHistorySize int) (*MongoStorage, error) { +func NewMongoStorage(host, port, db, user, pass, storageType string, cdrsIndexes []string, cacheCfg *config.CacheConfig, loadHistorySize int) (ms *MongoStorage, err error) { address := fmt.Sprintf("%s:%s", host, port) if user != "" && pass != "" { address = fmt.Sprintf("%s:%s@%s", user, pass, address) @@ -107,169 +94,169 @@ func NewMongoStorage(host, port, db, user, pass string, cdrsIndexes []string, ca if err != nil { return nil, err } - - ndb := session.DB(db) session.SetMode(mgo.Strong, true) - index := mgo.Index{ + return &MongoStorage{db: db, session: session, storageType: storageType, ms: NewCodecMsgpackMarshaler(), cacheCfg: cacheCfg, loadHistorySize: loadHistorySize, cdrsIndexes: cdrsIndexes}, nil +} + +type MongoStorage struct { + session *mgo.Session + db string + storageType string // tariffplandb, datadb, stordb + ms Marshaler + cacheCfg *config.CacheConfig + loadHistorySize int + cdrsIndexes []string +} + +func (ms *MongoStorage) conn(col string) (*mgo.Session, *mgo.Collection) { + sessionCopy := ms.session.Copy() + return sessionCopy, sessionCopy.DB(ms.db).C(col) +} + +func (ms *MongoStorage) EnsureIndexes() (err error) { + dbSession, _ := ms.conn("") + defer dbSession.Close() + db := dbSession.DB(ms.db) + idx := mgo.Index{ Key: []string{"key"}, Unique: true, // Prevent two documents from having the same index key DropDups: false, // Drop documents with the same index key as a previously indexed one Background: false, // Build index in background and return immediately Sparse: false, // Only index documents containing the Key fields } - collections := []string{colAct, colApl, colAtr, colDcs, colAls, colRls, colUsr, colLcr, colLht, colRpl, colDst, colRds} - for _, col := range collections { - if err = ndb.C(col).EnsureIndex(index); err != nil { - return nil, err + for _, col := range []string{colAct, colApl, colAtr, colDcs, colAls, colRls, colUsr, colLcr, colLht, colRpl, colDst, colRds} { + if err = db.C(col).EnsureIndex(idx); err != nil { + return } } - index = mgo.Index{ + idx = mgo.Index{ Key: []string{"id"}, Unique: true, DropDups: false, Background: false, Sparse: false, } - collections = []string{colRpf, colShg, colAcc, colCrs} - for _, col := range collections { - if err = ndb.C(col).EnsureIndex(index); err != nil { - return nil, err + for _, col := range []string{colRpf, colShg, colAcc, colCrs} { + if err = db.C(col).EnsureIndex(idx); err != nil { + return } } - index = mgo.Index{ + idx = mgo.Index{ Key: []string{"tpid", "tag"}, Unique: true, DropDups: false, Background: false, Sparse: false, } - collections = []string{utils.TBL_TP_TIMINGS, utils.TBL_TP_DESTINATIONS, utils.TBL_TP_DESTINATION_RATES, utils.TBL_TP_RATING_PLANS, utils.TBL_TP_SHARED_GROUPS, utils.TBL_TP_CDR_STATS, utils.TBL_TP_ACTIONS, utils.TBL_TP_ACTION_PLANS, utils.TBL_TP_ACTION_TRIGGERS} - for _, col := range collections { - if err = ndb.C(col).EnsureIndex(index); err != nil { - return nil, err + for _, col := range []string{utils.TBL_TP_TIMINGS, utils.TBL_TP_DESTINATIONS, utils.TBL_TP_DESTINATION_RATES, utils.TBL_TP_RATING_PLANS, + utils.TBL_TP_SHARED_GROUPS, utils.TBL_TP_CDR_STATS, utils.TBL_TP_ACTIONS, utils.TBL_TP_ACTION_PLANS, utils.TBL_TP_ACTION_TRIGGERS} { + if err = db.C(col).EnsureIndex(idx); err != nil { + return } } - index = mgo.Index{ + idx = mgo.Index{ Key: []string{"tpid", "direction", "tenant", "category", "subject", "loadid"}, Unique: true, DropDups: false, Background: false, Sparse: false, } - collections = []string{utils.TBL_TP_RATE_PROFILES} - for _, col := range collections { - if err = ndb.C(col).EnsureIndex(index); err != nil { - return nil, err - } + if err = db.C(utils.TBL_TP_RATE_PROFILES).EnsureIndex(idx); err != nil { + return } - index = mgo.Index{ + idx = mgo.Index{ Key: []string{"tpid", "direction", "tenant", "category", "account", "subject"}, Unique: true, DropDups: false, Background: false, Sparse: false, } - collections = []string{utils.TBL_TP_LCRS} - for _, col := range collections { - if err = ndb.C(col).EnsureIndex(index); err != nil { - return nil, err - } + if err = db.C(utils.TBL_TP_LCRS).EnsureIndex(idx); err != nil { + return } - index = mgo.Index{ + idx = mgo.Index{ Key: []string{"tpid", "tenant", "username"}, Unique: true, DropDups: false, Background: false, Sparse: false, } - collections = []string{utils.TBL_TP_USERS} - for _, col := range collections { - if err = ndb.C(col).EnsureIndex(index); err != nil { - return nil, err - } + if err = db.C(utils.TBL_TP_USERS).EnsureIndex(idx); err != nil { + return } - index = mgo.Index{ + idx = mgo.Index{ Key: []string{"tpid", "direction", "tenant", "category", "account", "subject", "context"}, Unique: true, DropDups: false, Background: false, Sparse: false, } - collections = []string{utils.TBL_TP_LCRS} - for _, col := range collections { - if err = ndb.C(col).EnsureIndex(index); err != nil { - return nil, err - } + if err = db.C(utils.TBL_TP_LCRS).EnsureIndex(idx); err != nil { + return } - index = mgo.Index{ + idx = mgo.Index{ Key: []string{"tpid", "direction", "tenant", "category", "subject", "account", "loadid"}, Unique: true, DropDups: false, Background: false, Sparse: false, } - collections = []string{utils.TBL_TP_DERIVED_CHARGERS} - for _, col := range collections { - if err = ndb.C(col).EnsureIndex(index); err != nil { - return nil, err - } + if err = db.C(utils.TBL_TP_DERIVED_CHARGERS).EnsureIndex(idx); err != nil { + return } - index = mgo.Index{ + idx = mgo.Index{ Key: []string{"tpid", "direction", "tenant", "account", "loadid"}, Unique: true, DropDups: false, Background: false, Sparse: false, } - collections = []string{utils.TBL_TP_DERIVED_CHARGERS} - for _, col := range collections { - if err = ndb.C(col).EnsureIndex(index); err != nil { - return nil, err - } + if err = db.C(utils.TBL_TP_DERIVED_CHARGERS).EnsureIndex(idx); err != nil { + return } - index = mgo.Index{ + idx = mgo.Index{ Key: []string{CGRIDLow, RunIDLow, OriginIDLow}, Unique: true, DropDups: false, Background: false, Sparse: false, } - if err = ndb.C(utils.TBL_CDRS).EnsureIndex(index); err != nil { - return nil, err + if err = db.C(utils.TBL_CDRS).EnsureIndex(idx); err != nil { + return } - for _, idxKey := range cdrsIndexes { - index = mgo.Index{ + for _, idxKey := range ms.cdrsIndexes { + idx = mgo.Index{ Key: []string{idxKey}, Unique: false, DropDups: false, Background: false, Sparse: false, } - if err = ndb.C(utils.TBL_CDRS).EnsureIndex(index); err != nil { - return nil, err + if err = db.C(utils.TBL_CDRS).EnsureIndex(idx); err != nil { + return } } - index = mgo.Index{ + idx = mgo.Index{ Key: []string{CGRIDLow, RunIDLow}, Unique: true, DropDups: false, Background: false, Sparse: false, } - if err = ndb.C(utils.TBLSMCosts).EnsureIndex(index); err != nil { - return nil, err + if err = db.C(utils.TBLSMCosts).EnsureIndex(idx); err != nil { + return } - index = mgo.Index{ + idx = mgo.Index{ Key: []string{OriginHostLow, OriginIDLow}, Unique: false, DropDups: false, Background: false, Sparse: false, } - if err = ndb.C(utils.TBLSMCosts).EnsureIndex(index); err != nil { - return nil, err + if err = db.C(utils.TBLSMCosts).EnsureIndex(idx); err != nil { + return } - return &MongoStorage{db: db, session: session, ms: NewCodecMsgpackMarshaler(), cacheCfg: cacheCfg, loadHistorySize: loadHistorySize}, err + return } func (ms *MongoStorage) getColNameForPrefix(prefix string) (name string, ok bool) { @@ -327,7 +314,6 @@ func (ms *MongoStorage) RebuildReverseForPrefix(prefix string) error { if !ok { return utils.ErrInvalidKey } - session, col := ms.conn(colName) defer session.Close() diff --git a/engine/storage_mongo_it_test.go b/engine/storage_mongo_it_test.go index 02b4a6442..a28372c2e 100644 --- a/engine/storage_mongo_it_test.go +++ b/engine/storage_mongo_it_test.go @@ -38,7 +38,8 @@ func TestMGOitConnect(t *testing.T) { if err != nil { t.Fatal(err) } - if mgoITDB, err = NewMongoStorage(mgoITCfg.StorDBHost, mgoITCfg.StorDBPort, mgoITCfg.StorDBName, mgoITCfg.StorDBUser, mgoITCfg.StorDBPass, nil, mgoITCfg.CacheConfig, mgoITCfg.LoadHistorySize); err != nil { + if mgoITDB, err = NewMongoStorage(mgoITCfg.StorDBHost, mgoITCfg.StorDBPort, mgoITCfg.StorDBName, mgoITCfg.StorDBUser, mgoITCfg.StorDBPass, + utils.StorDB, nil, mgoITCfg.CacheConfig, mgoITCfg.LoadHistorySize); err != nil { t.Fatal(err) } } diff --git a/engine/storage_utils.go b/engine/storage_utils.go index 285fcd842..99b1d0f60 100644 --- a/engine/storage_utils.go +++ b/engine/storage_utils.go @@ -43,7 +43,7 @@ func ConfigureRatingStorage(db_type, host, port, name, user, pass, marshaler str } d, err = NewRedisStorage(host, db_nb, pass, marshaler, utils.REDIS_MAX_CONNS, cacheCfg, loadHistorySize) case utils.MONGO: - d, err = NewMongoStorage(host, port, name, user, pass, nil, cacheCfg, loadHistorySize) + d, err = NewMongoStorage(host, port, name, user, pass, utils.TariffPlanDB, nil, cacheCfg, loadHistorySize) db = d.(RatingStorage) default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s'", @@ -70,7 +70,7 @@ func ConfigureAccountingStorage(db_type, host, port, name, user, pass, marshaler } d, err = NewRedisStorage(host, db_nb, pass, marshaler, utils.REDIS_MAX_CONNS, cacheCfg, loadHistorySize) case utils.MONGO: - d, err = NewMongoStorage(host, port, name, user, pass, nil, cacheCfg, loadHistorySize) + d, err = NewMongoStorage(host, port, name, user, pass, utils.DataDB, nil, cacheCfg, loadHistorySize) db = d.(AccountingStorage) default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s'", @@ -99,7 +99,7 @@ func ConfigureStorStorage(db_type, host, port, name, user, pass, marshaler strin d, err = NewRedisStorage(host, db_nb, pass, marshaler) */ case utils.MONGO: - d, err = NewMongoStorage(host, port, name, user, pass, cdrsIndexes, nil, 1) + d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil, 1) case utils.POSTGRES: d, err = NewPostgresStorage(host, port, name, user, pass, maxConn, maxIdleConn) case utils.MYSQL: @@ -122,7 +122,7 @@ func ConfigureLoadStorage(db_type, host, port, name, user, pass, marshaler strin case utils.MYSQL: d, err = NewMySQLStorage(host, port, name, user, pass, maxConn, maxIdleConn) case utils.MONGO: - d, err = NewMongoStorage(host, port, name, user, pass, cdrsIndexes, nil, 1) + d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil, 1) default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are [%s, %s, %s]", db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES)) @@ -141,7 +141,7 @@ func ConfigureCdrStorage(db_type, host, port, name, user, pass string, maxConn, case utils.MYSQL: d, err = NewMySQLStorage(host, port, name, user, pass, maxConn, maxIdleConn) case utils.MONGO: - d, err = NewMongoStorage(host, port, name, user, pass, cdrsIndexes, nil, 1) + d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil, 1) default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are [%s, %s, %s]", db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES)) diff --git a/integration_tests.sh b/integration_test.sh similarity index 100% rename from integration_tests.sh rename to integration_test.sh diff --git a/utils/consts.go b/utils/consts.go index 1985c6aaa..1f0a28663 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -334,4 +334,7 @@ const ( EVT_ACTION_TRIGGER_FIRED = "ACTION_TRIGGER_FIRED" EVT_ACTION_TIMING_FIRED = "ACTION_TRIGGER_FIRED" SMAsterisk = "sm_asterisk" + TariffPlanDB = "tariffplan_db" + DataDB = "data_db" + StorDB = "stor_db" )