mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Separeted mongo storeDB and dataDB Versions
This commit is contained in:
committed by
Dan Christian Bogos
parent
d4494b9a9e
commit
7446aea042
@@ -63,7 +63,7 @@ func TestDMitMongo(t *testing.T) {
|
||||
dataDB, err := NewMongoStorage(mgoITCfg.StorDbCfg().StorDBHost,
|
||||
mgoITCfg.StorDbCfg().StorDBPort, mgoITCfg.StorDbCfg().StorDBName,
|
||||
mgoITCfg.StorDbCfg().StorDBUser, mgoITCfg.StorDbCfg().StorDBPass,
|
||||
utils.StorDB, nil, mgoITCfg.CacheCfg())
|
||||
utils.StorDB, nil, mgoITCfg.CacheCfg(), false)
|
||||
if err != nil {
|
||||
t.Fatal("Could not connect to Mongo", err.Error())
|
||||
}
|
||||
|
||||
@@ -85,7 +85,7 @@ func TestFilterIndexerITMongo(t *testing.T) {
|
||||
mongoDB, err := NewMongoStorage(mgoITCfg.StorDbCfg().StorDBHost,
|
||||
mgoITCfg.StorDbCfg().StorDBPort, mgoITCfg.StorDbCfg().StorDBName,
|
||||
mgoITCfg.StorDbCfg().StorDBUser, mgoITCfg.StorDbCfg().StorDBPass,
|
||||
utils.StorDB, nil, mgoITCfg.CacheCfg())
|
||||
utils.StorDB, nil, mgoITCfg.CacheCfg(), false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -116,7 +116,7 @@ func TestOnStorITMongo(t *testing.T) {
|
||||
if mgoITdb, err = NewMongoStorage(mgoITCfg.StorDbCfg().StorDBHost,
|
||||
mgoITCfg.StorDbCfg().StorDBPort, mgoITCfg.StorDbCfg().StorDBName,
|
||||
mgoITCfg.StorDbCfg().StorDBUser, mgoITCfg.StorDbCfg().StorDBPass,
|
||||
utils.StorDB, nil, mgoITCfg.CacheCfg()); err != nil {
|
||||
utils.StorDB, nil, mgoITCfg.CacheCfg(), false); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
onStorCfg = mgoITCfg.StorDbCfg().StorDBName
|
||||
|
||||
@@ -126,8 +126,8 @@ func TimeDecodeValue1(dc bsoncodec.DecodeContext, vr bsonrw.ValueReader, val ref
|
||||
}
|
||||
|
||||
// NewMongoStorage givese new mongo driver
|
||||
func NewMongoStorage(host, port, db, user, pass, storageType string,
|
||||
cdrsIndexes []string, cacheCfg config.CacheCfg) (ms *MongoStorage, err error) {
|
||||
func NewMongoStorage(host, port, db, user, pass, storageType string, cdrsIndexes []string,
|
||||
cacheCfg config.CacheCfg, isDataDB bool) (ms *MongoStorage, err error) {
|
||||
url := host
|
||||
if port != "" {
|
||||
url += ":" + port
|
||||
@@ -172,6 +172,7 @@ func NewMongoStorage(host, port, db, user, pass, storageType string,
|
||||
ms: NewCodecMsgpackMarshaler(),
|
||||
cacheCfg: cacheCfg,
|
||||
cdrsIndexes: cdrsIndexes,
|
||||
isDataDB: isDataDB,
|
||||
}
|
||||
if err = ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) error {
|
||||
if col, err := ms.client.Database(dbName).ListCollections(sctx, nil, options.ListCollections().SetNameOnly(true)); err != nil {
|
||||
@@ -210,11 +211,15 @@ type MongoStorage struct {
|
||||
ctx context.Context
|
||||
db string
|
||||
storageType string // datadb, stordb
|
||||
|
||||
ms Marshaler
|
||||
cacheCfg config.CacheCfg
|
||||
cdrsIndexes []string
|
||||
cnter *utils.Counter
|
||||
isDataDB bool
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) IsDataDB() bool {
|
||||
return ms.isDataDB
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) EnusureIndex(colName string, uniq bool, keys ...string) error {
|
||||
|
||||
@@ -48,7 +48,7 @@ func ConfigureDataStorage(db_type, host, port, name, user, pass, marshaler strin
|
||||
d, err = NewRedisStorage(host, db_nb, pass, marshaler, utils.REDIS_MAX_CONNS, cacheCfg, sentinelName)
|
||||
dm = NewDataManager(d.(DataDB))
|
||||
case utils.MONGO:
|
||||
d, err = NewMongoStorage(host, port, name, user, pass, utils.DataDB, nil, cacheCfg)
|
||||
d, err = NewMongoStorage(host, port, name, user, pass, utils.DataDB, nil, cacheCfg, true)
|
||||
dm = NewDataManager(d.(DataDB))
|
||||
case utils.INTERNAL:
|
||||
if marshaler == utils.JSON {
|
||||
@@ -72,7 +72,7 @@ func ConfigureStorStorage(db_type, host, port, name, user, pass, marshaler strin
|
||||
var d Storage
|
||||
switch db_type {
|
||||
case utils.MONGO:
|
||||
d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil)
|
||||
d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil, false)
|
||||
case utils.POSTGRES:
|
||||
d, err = NewPostgresStorage(host, port, name, user, pass, maxConn, maxIdleConn, connMaxLifetime)
|
||||
case utils.MYSQL:
|
||||
@@ -98,7 +98,7 @@ func ConfigureLoadStorage(db_type, host, port, name, user, pass, marshaler strin
|
||||
case utils.MYSQL:
|
||||
d, err = NewMySQLStorage(host, port, name, user, pass, maxConn, maxIdleConn, connMaxLifetime)
|
||||
case utils.MONGO:
|
||||
d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil)
|
||||
d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil, false)
|
||||
case utils.INTERNAL:
|
||||
d, err = NewMapStorage()
|
||||
default:
|
||||
@@ -120,7 +120,7 @@ func ConfigureCdrStorage(db_type, host, port, name, user, pass string,
|
||||
case utils.MYSQL:
|
||||
d, err = NewMySQLStorage(host, port, name, user, pass, maxConn, maxIdleConn, connMaxLifetime)
|
||||
case utils.MONGO:
|
||||
d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil)
|
||||
d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil, false)
|
||||
case utils.INTERNAL:
|
||||
d, err = NewMapStorage()
|
||||
default:
|
||||
@@ -142,7 +142,7 @@ func ConfigureStorDB(db_type, host, port, name, user, pass string,
|
||||
case utils.MYSQL:
|
||||
d, err = NewMySQLStorage(host, port, name, user, pass, maxConn, maxIdleConn, connMaxLifetime)
|
||||
case utils.MONGO:
|
||||
d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil)
|
||||
d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil, false)
|
||||
case utils.INTERNAL:
|
||||
d, err = NewMapStorage()
|
||||
default:
|
||||
|
||||
@@ -107,7 +107,7 @@ func TestStorDBitMongo(t *testing.T) {
|
||||
if storDB, err = NewMongoStorage(cfg.StorDbCfg().StorDBHost,
|
||||
cfg.StorDbCfg().StorDBPort, cfg.StorDbCfg().StorDBName,
|
||||
cfg.StorDbCfg().StorDBUser, cfg.StorDbCfg().StorDBPass,
|
||||
utils.StorDB, cfg.StorDbCfg().StorDBCDRSIndexes, nil); err != nil {
|
||||
utils.StorDB, cfg.StorDbCfg().StorDBCDRSIndexes, nil, false); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
storDB2ndDBname = "todo"
|
||||
|
||||
@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package engine
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -58,7 +57,9 @@ type Versions map[string]int64 // map[item]versionNr
|
||||
func CheckVersions(storage Storage) error {
|
||||
// get current db version
|
||||
storType := storage.GetStorageType()
|
||||
x := CurrentDBVersions(storType)
|
||||
isDataDB := isDataDB(storage)
|
||||
|
||||
x := CurrentDBVersions(storType, isDataDB)
|
||||
dbVersion, err := storage.GetVersions("")
|
||||
if err == utils.ErrNotFound {
|
||||
empty, err := storage.IsDBEmpty()
|
||||
@@ -66,50 +67,68 @@ func CheckVersions(storage Storage) error {
|
||||
return err
|
||||
}
|
||||
if !empty {
|
||||
msg := "Migration needed: please backup cgrates data and run : <cgr-migrator>"
|
||||
return errors.New(msg)
|
||||
return fmt.Errorf("Migration needed: please backup cgrates data and run : <cgr-migrator>")
|
||||
}
|
||||
// no data, write version
|
||||
if err := SetDBVersions(storage); err != nil {
|
||||
// no data, safe to write version
|
||||
if err := OverwriteDBVersions(storage); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
} else {
|
||||
// comparing versions
|
||||
message := dbVersion.Compare(x, storType)
|
||||
if len(message) > 2 {
|
||||
// write the new values
|
||||
msg := "Migration needed: please backup cgr data and run : <" + message + ">"
|
||||
return errors.New(msg)
|
||||
message := dbVersion.Compare(x, storType, isDataDB)
|
||||
if message != "" {
|
||||
return fmt.Errorf("Migration needed: please backup cgr data and run : <%s>", message)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func SetDBVersions(storage Storage) (err error) {
|
||||
storType := storage.GetStorageType()
|
||||
x := CurrentDBVersions(storType)
|
||||
// relevant only for mongoDB
|
||||
func isDataDB(storage Storage) bool {
|
||||
conv, ok := storage.(*MongoStorage)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
return conv.IsDataDB()
|
||||
}
|
||||
|
||||
func setDBVersions(storage Storage, overwrite bool) (err error) {
|
||||
x := CurrentDBVersions(storage.GetStorageType(), isDataDB(storage))
|
||||
// no data, write version
|
||||
if err = storage.SetVersions(x, false); err != nil {
|
||||
if err = storage.SetVersions(x, overwrite); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("Could not write current version to db: %v", err))
|
||||
return err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (vers Versions) Compare(curent Versions, storType string) string {
|
||||
var x map[string]string
|
||||
func SetDBVersions(storage Storage) (err error) {
|
||||
return setDBVersions(storage, false)
|
||||
}
|
||||
|
||||
func OverwriteDBVersions(storage Storage) (err error) {
|
||||
return setDBVersions(storage, true)
|
||||
}
|
||||
|
||||
func (vers Versions) Compare(curent Versions, storType string, isDataDB bool) string {
|
||||
var message map[string]string
|
||||
switch storType {
|
||||
case utils.MONGO, utils.MAPSTOR:
|
||||
x = allVers
|
||||
case utils.MONGO:
|
||||
if isDataDB {
|
||||
message = dataDBVers
|
||||
} else {
|
||||
message = storDBVers
|
||||
}
|
||||
case utils.MAPSTOR:
|
||||
message = allVers
|
||||
case utils.POSTGRES, utils.MYSQL:
|
||||
x = storDBVers
|
||||
message = storDBVers
|
||||
case utils.REDIS:
|
||||
x = dataDBVers
|
||||
message = dataDBVers
|
||||
}
|
||||
for y, val := range x {
|
||||
if vers[y] != curent[y] {
|
||||
return val
|
||||
for subsis, reason := range message {
|
||||
if vers[subsis] != curent[subsis] {
|
||||
return reason
|
||||
}
|
||||
}
|
||||
return ""
|
||||
@@ -177,10 +196,9 @@ func CurrentStorDBVersions() Versions {
|
||||
}
|
||||
}
|
||||
|
||||
func CurrentDBVersions(storType string) Versions {
|
||||
func CurrentAllDBVersions() Versions {
|
||||
dataDbVersions := CurrentDataDBVersions()
|
||||
storDbVersions := CurrentStorDBVersions()
|
||||
|
||||
allVersions := make(Versions)
|
||||
for k, v := range dataDbVersions {
|
||||
allVersions[k] = v
|
||||
@@ -188,14 +206,22 @@ func CurrentDBVersions(storType string) Versions {
|
||||
for k, v := range storDbVersions {
|
||||
allVersions[k] = v
|
||||
}
|
||||
return allVersions
|
||||
}
|
||||
|
||||
func CurrentDBVersions(storType string, isDataDB bool) Versions {
|
||||
switch storType {
|
||||
case utils.MONGO, utils.MAPSTOR:
|
||||
return allVersions
|
||||
case utils.MONGO:
|
||||
if isDataDB {
|
||||
return CurrentDataDBVersions()
|
||||
}
|
||||
return CurrentStorDBVersions()
|
||||
case utils.MAPSTOR:
|
||||
return CurrentAllDBVersions()
|
||||
case utils.POSTGRES, utils.MYSQL:
|
||||
return storDbVersions
|
||||
return CurrentStorDBVersions()
|
||||
case utils.REDIS:
|
||||
return dataDbVersions
|
||||
return CurrentDataDBVersions()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -36,9 +36,7 @@ func TestVersionCompare(t *testing.T) {
|
||||
q := Versions{utils.Accounts: 2, utils.Actions: 2,
|
||||
utils.ActionTriggers: 2, utils.ActionPlans: 2,
|
||||
utils.SharedGroups: 1, utils.CostDetails: 2}
|
||||
c := Versions{utils.Accounts: 2, utils.Actions: 2,
|
||||
utils.ActionTriggers: 2, utils.ActionPlans: 2,
|
||||
utils.SharedGroups: 2, utils.CostDetails: 1}
|
||||
c := Versions{utils.CostDetails: 1}
|
||||
a := Versions{utils.Accounts: 2, utils.Actions: 2,
|
||||
utils.ActionTriggers: 2, utils.ActionPlans: 2,
|
||||
utils.SharedGroups: 2, utils.CostDetails: 2,
|
||||
@@ -47,27 +45,27 @@ func TestVersionCompare(t *testing.T) {
|
||||
utils.ActionTriggers: 2, utils.ActionPlans: 2,
|
||||
utils.SharedGroups: 2, utils.CostDetails: 2,
|
||||
utils.SessionSCosts: 2}
|
||||
message1 := y.Compare(x, utils.MONGO)
|
||||
message1 := y.Compare(x, utils.MONGO, true)
|
||||
if message1 != "cgr-migrator -migrate=*accounts" {
|
||||
t.Errorf("Error failed to compare to curent version expected: %s received: %s", "cgr-migrator -migrate=*accounts", message1)
|
||||
}
|
||||
message2 := z.Compare(x, utils.MONGO)
|
||||
message2 := z.Compare(x, utils.MONGO, true)
|
||||
if message2 != "cgr-migrator -migrate=*action_plans" {
|
||||
t.Errorf("Error failed to compare to curent version expected: %s received: %s", "cgr-migrator -migrate=*action_plans", message2)
|
||||
}
|
||||
message3 := q.Compare(x, utils.MONGO)
|
||||
message3 := q.Compare(x, utils.MONGO, true)
|
||||
if message3 != "cgr-migrator -migrate=*shared_groups" {
|
||||
t.Errorf("Error failed to compare to curent version expected: %s received: %s", "cgr-migrator -migrate=*shared_groups", message3)
|
||||
}
|
||||
message4 := c.Compare(x, utils.MONGO)
|
||||
message4 := c.Compare(x, utils.MONGO, false)
|
||||
if message4 != "cgr-migrator -migrate=*cost_details" {
|
||||
t.Errorf("Error failed to compare to curent version expected: %s received: %s", "cgr-migrator -migrate=*cost_details", message4)
|
||||
}
|
||||
message5 := a.Compare(b, utils.MYSQL)
|
||||
message5 := a.Compare(b, utils.MYSQL, false)
|
||||
if message5 != "cgr-migrator -migrate=*sessions_costs" {
|
||||
t.Errorf("Error failed to compare to curent version expected: %s received: %s", "cgr-migrator -migrate=*sessions_costs", message5)
|
||||
}
|
||||
message6 := a.Compare(b, utils.POSTGRES)
|
||||
message6 := a.Compare(b, utils.POSTGRES, false)
|
||||
if message6 != "cgr-migrator -migrate=*sessions_costs" {
|
||||
t.Errorf("Error failed to compare to curent version expected: %s received: %s", "cgr-migrator -migrate=*sessions_costs", message6)
|
||||
}
|
||||
|
||||
@@ -156,12 +156,12 @@ func testVersion(t *testing.T) {
|
||||
|
||||
storType := dm3.DataDB().GetStorageType()
|
||||
switch storType {
|
||||
case utils.MONGO, utils.MAPSTOR:
|
||||
case utils.MAPSTOR:
|
||||
currentVersion = allVersions
|
||||
testVersion = allVersions
|
||||
testVersion[utils.Accounts] = 1
|
||||
test = "Migration needed: please backup cgr data and run : <cgr-migrator -migrate=*accounts>"
|
||||
case utils.REDIS:
|
||||
case utils.MONGO, utils.REDIS:
|
||||
currentVersion = dataDbVersions
|
||||
testVersion = dataDbVersions
|
||||
testVersion[utils.Accounts] = 1
|
||||
@@ -198,12 +198,12 @@ func testVersion(t *testing.T) {
|
||||
}
|
||||
storType = storageDb.GetStorageType()
|
||||
switch storType {
|
||||
case utils.MONGO, utils.MAPSTOR:
|
||||
case utils.MAPSTOR:
|
||||
currentVersion = allVersions
|
||||
testVersion = allVersions
|
||||
testVersion[utils.Accounts] = 1
|
||||
test = "Migration needed: please backup cgr data and run : <cgr-migrator -migrate=*accounts>"
|
||||
case utils.POSTGRES, utils.MYSQL:
|
||||
case utils.MONGO, utils.POSTGRES, utils.MYSQL:
|
||||
currentVersion = storDbVersions
|
||||
testVersion = allVersions
|
||||
testVersion[utils.CostDetails] = 1
|
||||
@@ -228,7 +228,7 @@ func testVersion(t *testing.T) {
|
||||
if err := storageDb.SetVersions(testVersion, false); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if err := CheckVersions(storageDb); err.Error() != test {
|
||||
if err := CheckVersions(storageDb); err != nil && err.Error() != test {
|
||||
t.Error(err)
|
||||
}
|
||||
if err = storageDb.RemoveVersions(testVersion); err != nil {
|
||||
|
||||
@@ -69,15 +69,13 @@ func (m *Migrator) Migrate(taskIDs []string) (err error, stats map[string]int) {
|
||||
fmt.Sprintf("task <%s> is not a supported migration task", taskID))
|
||||
case utils.MetaSetVersions:
|
||||
if m.dryRun != true {
|
||||
if err := m.dmOut.DataManager().DataDB().SetVersions(
|
||||
engine.CurrentDBVersions(m.dmOut.DataManager().DataDB().GetStorageType()), true); err != nil {
|
||||
if err := engine.OverwriteDBVersions(m.dmOut.DataManager().DataDB()); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating CostDetails version into StorDB", err.Error())), nil
|
||||
}
|
||||
if err := m.storDBOut.StorDB().SetVersions(
|
||||
engine.CurrentDBVersions(m.storDBOut.StorDB().GetStorageType()), true); err != nil {
|
||||
if err := engine.OverwriteDBVersions(m.storDBOut.StorDB()); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
|
||||
Reference in New Issue
Block a user