mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 10:06:24 +05:00
MongoDB driver does not longer handle indexing on each connection
This commit is contained in:
@@ -110,7 +110,7 @@ func testV2CDRsInjectUnratedCdr(t *testing.T) {
|
||||
cdrsCfg.StorDBMaxOpenConns, cdrsCfg.StorDBMaxIdleConns)
|
||||
case "cdrsv2mongo":
|
||||
db, err = engine.NewMongoStorage(cdrsCfg.StorDBHost, cdrsCfg.StorDBPort, cdrsCfg.StorDBName,
|
||||
cdrsCfg.StorDBUser, cdrsCfg.StorDBPass, cdrsCfg.StorDBCDRSIndexes, nil, 10)
|
||||
cdrsCfg.StorDBUser, cdrsCfg.StorDBPass, utils.StorDB, cdrsCfg.StorDBCDRSIndexes, nil, 10)
|
||||
}
|
||||
if err != nil {
|
||||
t.Error("Error on opening database connection: ", err)
|
||||
|
||||
@@ -227,15 +227,12 @@ func testTPitDestinations(t *testing.T) {
|
||||
}
|
||||
}
|
||||
// Test get
|
||||
/*
|
||||
FixMe for mongodb
|
||||
var rplyDst *utils.TPDestination
|
||||
if err := tpRPC.Call("ApierV2.GetTPDestination", v1.AttrGetTPDestination{testTPid, dstDEMobile.DestinationId}, &rplyDst); err != nil {
|
||||
t.Error("Calling ApierV2.GetTPDestination, got error: ", err.Error())
|
||||
} else if len(dstDEMobile.Prefixes) != len(rplyDst.Prefixes) {
|
||||
t.Errorf("Calling ApierV2.GetTPDestination expected: %v, received: %v", dstDEMobile, rplyDst)
|
||||
}
|
||||
*/
|
||||
var rplyDst *utils.TPDestination
|
||||
if err := tpRPC.Call("ApierV2.GetTPDestination", v1.AttrGetTPDestination{testTPid, dstDEMobile.DestinationId}, &rplyDst); err != nil {
|
||||
t.Error("Calling ApierV2.GetTPDestination, got error: ", err.Error())
|
||||
} else if len(dstDEMobile.Prefixes) != len(rplyDst.Prefixes) {
|
||||
t.Errorf("Calling ApierV2.GetTPDestination expected: %v, received: %v", dstDEMobile, rplyDst)
|
||||
}
|
||||
// Test remove
|
||||
if err := tpRPC.Call("ApierV2.RemTPDestination", v1.AttrGetTPDestination{testTPid, dstDUMMY.DestinationId}, &reply); err != nil {
|
||||
t.Error("Calling ApierV1.RemTPTiming, got error: ", err.Error())
|
||||
@@ -250,5 +247,4 @@ func testTPitDestinations(t *testing.T) {
|
||||
} else if len(expectedDstIds) != len(rplyDstIds) {
|
||||
t.Errorf("Calling ApierV2.GetTPDestinationIds expected: %v, received: %v", expectedDstIds, rplyDstIds)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -13,20 +13,17 @@
|
||||
"tariffplan_db": { // database used to store active tariff plan configuration
|
||||
"db_type": "mongo", // stor database type to use: <mysql|postgres>
|
||||
"db_port": 27017, // the port to reach the stordb
|
||||
"db_name": "tpdb",
|
||||
},
|
||||
|
||||
|
||||
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
|
||||
"db_type": "mongo", // stor database type to use: <mysql|postgres>
|
||||
"db_port": 27017, // the port to reach the stordb
|
||||
"db_name": "datadb",
|
||||
},
|
||||
|
||||
"stor_db": {
|
||||
"db_type": "mongo", // stor database type to use: <mysql|postgres>
|
||||
"db_port": 27017, // the port to reach the stordb
|
||||
"db_name": "stordb",
|
||||
},
|
||||
|
||||
|
||||
|
||||
@@ -47,11 +47,11 @@ func init() {
|
||||
ratingStorage, _ = NewMapStorage()
|
||||
accountingStorage, _ = NewMapStorage()
|
||||
case "mongo":
|
||||
ratingStorage, err = NewMongoStorage("127.0.0.1", "27017", "cgrates_rating_test", "", "", nil, &config.CacheConfig{RatingPlans: &config.CacheParamConfig{Precache: true}}, 10)
|
||||
ratingStorage, err = NewMongoStorage("127.0.0.1", "27017", "cgrates_rating_test", "", "", utils.TariffPlanDB, nil, &config.CacheConfig{RatingPlans: &config.CacheParamConfig{Precache: true}}, 10)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
accountingStorage, err = NewMongoStorage("127.0.0.1", "27017", "cgrates_accounting_test", "", "", nil, &config.CacheConfig{RatingPlans: &config.CacheParamConfig{Precache: true}}, 10)
|
||||
accountingStorage, err = NewMongoStorage("127.0.0.1", "27017", "cgrates_accounting_test", "", "", utils.DataDB, nil, &config.CacheConfig{RatingPlans: &config.CacheParamConfig{Precache: true}}, 10)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -85,20 +85,7 @@ var (
|
||||
CostLow = strings.ToLower(utils.COST)
|
||||
)
|
||||
|
||||
type MongoStorage struct {
|
||||
session *mgo.Session
|
||||
db string
|
||||
ms Marshaler
|
||||
cacheCfg *config.CacheConfig
|
||||
loadHistorySize int
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) conn(col string) (*mgo.Session, *mgo.Collection) {
|
||||
sessionCopy := ms.session.Copy()
|
||||
return sessionCopy, sessionCopy.DB(ms.db).C(col)
|
||||
}
|
||||
|
||||
func NewMongoStorage(host, port, db, user, pass string, cdrsIndexes []string, cacheCfg *config.CacheConfig, loadHistorySize int) (*MongoStorage, error) {
|
||||
func NewMongoStorage(host, port, db, user, pass, storageType string, cdrsIndexes []string, cacheCfg *config.CacheConfig, loadHistorySize int) (ms *MongoStorage, err error) {
|
||||
address := fmt.Sprintf("%s:%s", host, port)
|
||||
if user != "" && pass != "" {
|
||||
address = fmt.Sprintf("%s:%s@%s", user, pass, address)
|
||||
@@ -107,169 +94,169 @@ func NewMongoStorage(host, port, db, user, pass string, cdrsIndexes []string, ca
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ndb := session.DB(db)
|
||||
session.SetMode(mgo.Strong, true)
|
||||
index := mgo.Index{
|
||||
return &MongoStorage{db: db, session: session, storageType: storageType, ms: NewCodecMsgpackMarshaler(), cacheCfg: cacheCfg, loadHistorySize: loadHistorySize, cdrsIndexes: cdrsIndexes}, nil
|
||||
}
|
||||
|
||||
type MongoStorage struct {
|
||||
session *mgo.Session
|
||||
db string
|
||||
storageType string // tariffplandb, datadb, stordb
|
||||
ms Marshaler
|
||||
cacheCfg *config.CacheConfig
|
||||
loadHistorySize int
|
||||
cdrsIndexes []string
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) conn(col string) (*mgo.Session, *mgo.Collection) {
|
||||
sessionCopy := ms.session.Copy()
|
||||
return sessionCopy, sessionCopy.DB(ms.db).C(col)
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) EnsureIndexes() (err error) {
|
||||
dbSession, _ := ms.conn("")
|
||||
defer dbSession.Close()
|
||||
db := dbSession.DB(ms.db)
|
||||
idx := mgo.Index{
|
||||
Key: []string{"key"},
|
||||
Unique: true, // Prevent two documents from having the same index key
|
||||
DropDups: false, // Drop documents with the same index key as a previously indexed one
|
||||
Background: false, // Build index in background and return immediately
|
||||
Sparse: false, // Only index documents containing the Key fields
|
||||
}
|
||||
collections := []string{colAct, colApl, colAtr, colDcs, colAls, colRls, colUsr, colLcr, colLht, colRpl, colDst, colRds}
|
||||
for _, col := range collections {
|
||||
if err = ndb.C(col).EnsureIndex(index); err != nil {
|
||||
return nil, err
|
||||
for _, col := range []string{colAct, colApl, colAtr, colDcs, colAls, colRls, colUsr, colLcr, colLht, colRpl, colDst, colRds} {
|
||||
if err = db.C(col).EnsureIndex(idx); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
index = mgo.Index{
|
||||
idx = mgo.Index{
|
||||
Key: []string{"id"},
|
||||
Unique: true,
|
||||
DropDups: false,
|
||||
Background: false,
|
||||
Sparse: false,
|
||||
}
|
||||
collections = []string{colRpf, colShg, colAcc, colCrs}
|
||||
for _, col := range collections {
|
||||
if err = ndb.C(col).EnsureIndex(index); err != nil {
|
||||
return nil, err
|
||||
for _, col := range []string{colRpf, colShg, colAcc, colCrs} {
|
||||
if err = db.C(col).EnsureIndex(idx); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
index = mgo.Index{
|
||||
idx = mgo.Index{
|
||||
Key: []string{"tpid", "tag"},
|
||||
Unique: true,
|
||||
DropDups: false,
|
||||
Background: false,
|
||||
Sparse: false,
|
||||
}
|
||||
collections = []string{utils.TBL_TP_TIMINGS, utils.TBL_TP_DESTINATIONS, utils.TBL_TP_DESTINATION_RATES, utils.TBL_TP_RATING_PLANS, utils.TBL_TP_SHARED_GROUPS, utils.TBL_TP_CDR_STATS, utils.TBL_TP_ACTIONS, utils.TBL_TP_ACTION_PLANS, utils.TBL_TP_ACTION_TRIGGERS}
|
||||
for _, col := range collections {
|
||||
if err = ndb.C(col).EnsureIndex(index); err != nil {
|
||||
return nil, err
|
||||
for _, col := range []string{utils.TBL_TP_TIMINGS, utils.TBL_TP_DESTINATIONS, utils.TBL_TP_DESTINATION_RATES, utils.TBL_TP_RATING_PLANS,
|
||||
utils.TBL_TP_SHARED_GROUPS, utils.TBL_TP_CDR_STATS, utils.TBL_TP_ACTIONS, utils.TBL_TP_ACTION_PLANS, utils.TBL_TP_ACTION_TRIGGERS} {
|
||||
if err = db.C(col).EnsureIndex(idx); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
index = mgo.Index{
|
||||
idx = mgo.Index{
|
||||
Key: []string{"tpid", "direction", "tenant", "category", "subject", "loadid"},
|
||||
Unique: true,
|
||||
DropDups: false,
|
||||
Background: false,
|
||||
Sparse: false,
|
||||
}
|
||||
collections = []string{utils.TBL_TP_RATE_PROFILES}
|
||||
for _, col := range collections {
|
||||
if err = ndb.C(col).EnsureIndex(index); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = db.C(utils.TBL_TP_RATE_PROFILES).EnsureIndex(idx); err != nil {
|
||||
return
|
||||
}
|
||||
index = mgo.Index{
|
||||
idx = mgo.Index{
|
||||
Key: []string{"tpid", "direction", "tenant", "category", "account", "subject"},
|
||||
Unique: true,
|
||||
DropDups: false,
|
||||
Background: false,
|
||||
Sparse: false,
|
||||
}
|
||||
collections = []string{utils.TBL_TP_LCRS}
|
||||
for _, col := range collections {
|
||||
if err = ndb.C(col).EnsureIndex(index); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = db.C(utils.TBL_TP_LCRS).EnsureIndex(idx); err != nil {
|
||||
return
|
||||
}
|
||||
index = mgo.Index{
|
||||
idx = mgo.Index{
|
||||
Key: []string{"tpid", "tenant", "username"},
|
||||
Unique: true,
|
||||
DropDups: false,
|
||||
Background: false,
|
||||
Sparse: false,
|
||||
}
|
||||
collections = []string{utils.TBL_TP_USERS}
|
||||
for _, col := range collections {
|
||||
if err = ndb.C(col).EnsureIndex(index); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = db.C(utils.TBL_TP_USERS).EnsureIndex(idx); err != nil {
|
||||
return
|
||||
}
|
||||
index = mgo.Index{
|
||||
idx = mgo.Index{
|
||||
Key: []string{"tpid", "direction", "tenant", "category", "account", "subject", "context"},
|
||||
Unique: true,
|
||||
DropDups: false,
|
||||
Background: false,
|
||||
Sparse: false,
|
||||
}
|
||||
collections = []string{utils.TBL_TP_LCRS}
|
||||
for _, col := range collections {
|
||||
if err = ndb.C(col).EnsureIndex(index); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = db.C(utils.TBL_TP_LCRS).EnsureIndex(idx); err != nil {
|
||||
return
|
||||
}
|
||||
index = mgo.Index{
|
||||
idx = mgo.Index{
|
||||
Key: []string{"tpid", "direction", "tenant", "category", "subject", "account", "loadid"},
|
||||
Unique: true,
|
||||
DropDups: false,
|
||||
Background: false,
|
||||
Sparse: false,
|
||||
}
|
||||
collections = []string{utils.TBL_TP_DERIVED_CHARGERS}
|
||||
for _, col := range collections {
|
||||
if err = ndb.C(col).EnsureIndex(index); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = db.C(utils.TBL_TP_DERIVED_CHARGERS).EnsureIndex(idx); err != nil {
|
||||
return
|
||||
}
|
||||
index = mgo.Index{
|
||||
idx = mgo.Index{
|
||||
Key: []string{"tpid", "direction", "tenant", "account", "loadid"},
|
||||
Unique: true,
|
||||
DropDups: false,
|
||||
Background: false,
|
||||
Sparse: false,
|
||||
}
|
||||
collections = []string{utils.TBL_TP_DERIVED_CHARGERS}
|
||||
for _, col := range collections {
|
||||
if err = ndb.C(col).EnsureIndex(index); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = db.C(utils.TBL_TP_DERIVED_CHARGERS).EnsureIndex(idx); err != nil {
|
||||
return
|
||||
}
|
||||
index = mgo.Index{
|
||||
idx = mgo.Index{
|
||||
Key: []string{CGRIDLow, RunIDLow, OriginIDLow},
|
||||
Unique: true,
|
||||
DropDups: false,
|
||||
Background: false,
|
||||
Sparse: false,
|
||||
}
|
||||
if err = ndb.C(utils.TBL_CDRS).EnsureIndex(index); err != nil {
|
||||
return nil, err
|
||||
if err = db.C(utils.TBL_CDRS).EnsureIndex(idx); err != nil {
|
||||
return
|
||||
}
|
||||
for _, idxKey := range cdrsIndexes {
|
||||
index = mgo.Index{
|
||||
for _, idxKey := range ms.cdrsIndexes {
|
||||
idx = mgo.Index{
|
||||
Key: []string{idxKey},
|
||||
Unique: false,
|
||||
DropDups: false,
|
||||
Background: false,
|
||||
Sparse: false,
|
||||
}
|
||||
if err = ndb.C(utils.TBL_CDRS).EnsureIndex(index); err != nil {
|
||||
return nil, err
|
||||
if err = db.C(utils.TBL_CDRS).EnsureIndex(idx); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
index = mgo.Index{
|
||||
idx = mgo.Index{
|
||||
Key: []string{CGRIDLow, RunIDLow},
|
||||
Unique: true,
|
||||
DropDups: false,
|
||||
Background: false,
|
||||
Sparse: false,
|
||||
}
|
||||
if err = ndb.C(utils.TBLSMCosts).EnsureIndex(index); err != nil {
|
||||
return nil, err
|
||||
if err = db.C(utils.TBLSMCosts).EnsureIndex(idx); err != nil {
|
||||
return
|
||||
}
|
||||
index = mgo.Index{
|
||||
idx = mgo.Index{
|
||||
Key: []string{OriginHostLow, OriginIDLow},
|
||||
Unique: false,
|
||||
DropDups: false,
|
||||
Background: false,
|
||||
Sparse: false,
|
||||
}
|
||||
if err = ndb.C(utils.TBLSMCosts).EnsureIndex(index); err != nil {
|
||||
return nil, err
|
||||
if err = db.C(utils.TBLSMCosts).EnsureIndex(idx); err != nil {
|
||||
return
|
||||
}
|
||||
return &MongoStorage{db: db, session: session, ms: NewCodecMsgpackMarshaler(), cacheCfg: cacheCfg, loadHistorySize: loadHistorySize}, err
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) getColNameForPrefix(prefix string) (name string, ok bool) {
|
||||
@@ -327,7 +314,6 @@ func (ms *MongoStorage) RebuildReverseForPrefix(prefix string) error {
|
||||
if !ok {
|
||||
return utils.ErrInvalidKey
|
||||
}
|
||||
|
||||
session, col := ms.conn(colName)
|
||||
defer session.Close()
|
||||
|
||||
|
||||
@@ -38,7 +38,8 @@ func TestMGOitConnect(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if mgoITDB, err = NewMongoStorage(mgoITCfg.StorDBHost, mgoITCfg.StorDBPort, mgoITCfg.StorDBName, mgoITCfg.StorDBUser, mgoITCfg.StorDBPass, nil, mgoITCfg.CacheConfig, mgoITCfg.LoadHistorySize); err != nil {
|
||||
if mgoITDB, err = NewMongoStorage(mgoITCfg.StorDBHost, mgoITCfg.StorDBPort, mgoITCfg.StorDBName, mgoITCfg.StorDBUser, mgoITCfg.StorDBPass,
|
||||
utils.StorDB, nil, mgoITCfg.CacheConfig, mgoITCfg.LoadHistorySize); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,7 +43,7 @@ func ConfigureRatingStorage(db_type, host, port, name, user, pass, marshaler str
|
||||
}
|
||||
d, err = NewRedisStorage(host, db_nb, pass, marshaler, utils.REDIS_MAX_CONNS, cacheCfg, loadHistorySize)
|
||||
case utils.MONGO:
|
||||
d, err = NewMongoStorage(host, port, name, user, pass, nil, cacheCfg, loadHistorySize)
|
||||
d, err = NewMongoStorage(host, port, name, user, pass, utils.TariffPlanDB, nil, cacheCfg, loadHistorySize)
|
||||
db = d.(RatingStorage)
|
||||
default:
|
||||
err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s'",
|
||||
@@ -70,7 +70,7 @@ func ConfigureAccountingStorage(db_type, host, port, name, user, pass, marshaler
|
||||
}
|
||||
d, err = NewRedisStorage(host, db_nb, pass, marshaler, utils.REDIS_MAX_CONNS, cacheCfg, loadHistorySize)
|
||||
case utils.MONGO:
|
||||
d, err = NewMongoStorage(host, port, name, user, pass, nil, cacheCfg, loadHistorySize)
|
||||
d, err = NewMongoStorage(host, port, name, user, pass, utils.DataDB, nil, cacheCfg, loadHistorySize)
|
||||
db = d.(AccountingStorage)
|
||||
default:
|
||||
err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s'",
|
||||
@@ -99,7 +99,7 @@ func ConfigureStorStorage(db_type, host, port, name, user, pass, marshaler strin
|
||||
d, err = NewRedisStorage(host, db_nb, pass, marshaler)
|
||||
*/
|
||||
case utils.MONGO:
|
||||
d, err = NewMongoStorage(host, port, name, user, pass, cdrsIndexes, nil, 1)
|
||||
d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil, 1)
|
||||
case utils.POSTGRES:
|
||||
d, err = NewPostgresStorage(host, port, name, user, pass, maxConn, maxIdleConn)
|
||||
case utils.MYSQL:
|
||||
@@ -122,7 +122,7 @@ func ConfigureLoadStorage(db_type, host, port, name, user, pass, marshaler strin
|
||||
case utils.MYSQL:
|
||||
d, err = NewMySQLStorage(host, port, name, user, pass, maxConn, maxIdleConn)
|
||||
case utils.MONGO:
|
||||
d, err = NewMongoStorage(host, port, name, user, pass, cdrsIndexes, nil, 1)
|
||||
d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil, 1)
|
||||
default:
|
||||
err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are [%s, %s, %s]",
|
||||
db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES))
|
||||
@@ -141,7 +141,7 @@ func ConfigureCdrStorage(db_type, host, port, name, user, pass string, maxConn,
|
||||
case utils.MYSQL:
|
||||
d, err = NewMySQLStorage(host, port, name, user, pass, maxConn, maxIdleConn)
|
||||
case utils.MONGO:
|
||||
d, err = NewMongoStorage(host, port, name, user, pass, cdrsIndexes, nil, 1)
|
||||
d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil, 1)
|
||||
default:
|
||||
err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are [%s, %s, %s]",
|
||||
db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES))
|
||||
|
||||
@@ -334,4 +334,7 @@ const (
|
||||
EVT_ACTION_TRIGGER_FIRED = "ACTION_TRIGGER_FIRED"
|
||||
EVT_ACTION_TIMING_FIRED = "ACTION_TRIGGER_FIRED"
|
||||
SMAsterisk = "sm_asterisk"
|
||||
TariffPlanDB = "tariffplan_db"
|
||||
DataDB = "data_db"
|
||||
StorDB = "stor_db"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user