Migrator fixes for mongo accounts v2->v3

This commit is contained in:
DanB
2018-04-24 19:33:24 +02:00
parent d08fc21b72
commit 23bd29f91d
5 changed files with 48 additions and 28 deletions

View File

@@ -48,7 +48,6 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en
<-cacheS.GetPrecacheChannel(utils.CacheActionPlans)
<-cacheS.GetPrecacheChannel(utils.CacheAccountActionPlans)
<-cacheS.GetPrecacheChannel(utils.CacheActionTriggers)
<-cacheS.GetPrecacheChannel(utils.CacheActionTriggers)
<-cacheS.GetPrecacheChannel(utils.CacheSharedGroups)
<-cacheS.GetPrecacheChannel(utils.CacheLCRRules)
<-cacheS.GetPrecacheChannel(utils.CacheDerivedChargers)

View File

@@ -125,18 +125,29 @@ func main() {
}
var dmIN *engine.DataManager
dmIN, _ = engine.ConfigureDataStorage(*inDataDBType, *inDataDBHost, *inDataDBPort,
*inDataDBName, *inDataDBUser, *inDataDBPass, *dbDataEncoding, config.CgrConfig().CacheCfg(), *loadHistorySize)
instorDB, err := engine.ConfigureStorStorage(*inStorDBType, *inStorDBHost, *inStorDBPort, *inStorDBName, *inStorDBUser, *inStorDBPass, *inDBDataEncoding,
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
if err != nil {
var err error
if dmIN, err = engine.ConfigureDataStorage(*inDataDBType, *inDataDBHost, *inDataDBPort,
*inDataDBName, *inDataDBUser, *inDataDBPass, *dbDataEncoding,
config.CgrConfig().CacheCfg(), *loadHistorySize); err != nil {
log.Fatal(err)
}
var instorDB engine.Storage
if instorDB, err = engine.ConfigureStorStorage(*inStorDBType, *inStorDBHost, *inStorDBPort,
*inStorDBName, *inStorDBUser, *inStorDBPass, *inDBDataEncoding,
config.CgrConfig().StorDBMaxOpenConns,
config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime,
config.CgrConfig().StorDBCDRSIndexes); err != nil {
log.Fatal(err)
}
var dmOUT *engine.DataManager
dmOUT, _ = engine.ConfigureDataStorage(*outDataDBType, *outDataDBHost, *outDataDBPort,
*outDataDBName, *outDataDBUser, *outDataDBPass, *dbDataEncoding, config.CgrConfig().CacheCfg(), *loadHistorySize)
outDataDB, err := migrator.ConfigureV1DataStorage(*outDataDBType, *outDataDBHost, *outDataDBPort, *outDataDBName, *outDataDBUser, *outDataDBPass, *dbDataEncoding)
if err != nil {
if dmOUT, err = engine.ConfigureDataStorage(*outDataDBType, *outDataDBHost, *outDataDBPort,
*outDataDBName, *outDataDBUser, *outDataDBPass, *dbDataEncoding,
config.CgrConfig().CacheCfg(), *loadHistorySize); err != nil {
log.Fatal(err)
}
var outDataDB migrator.MigratorDataDB
if outDataDB, err = migrator.ConfigureV1DataStorage(*outDataDBType, *outDataDBHost, *outDataDBPort,
*outDataDBName, *outDataDBUser, *outDataDBPass, *dbDataEncoding); err != nil {
log.Fatal(err)
}

View File

@@ -80,7 +80,7 @@ func (m *Migrator) migrateV1Accounts() (err error) {
}
if m.dryRun != true {
// All done, update version wtih current one
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]}
vrs := engine.Versions{utils.Accounts: engine.CurrentDataDBVersions()[utils.Accounts]}
if err = m.dmOut.DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
@@ -113,7 +113,7 @@ func (m *Migrator) migrateV2Accounts() (err error) {
}
if m.dryRun != true {
// All done, update version wtih current one
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]}
vrs := engine.Versions{utils.Accounts: engine.CurrentDataDBVersions()[utils.Accounts]}
if err = m.dmOut.DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
@@ -126,8 +126,7 @@ func (m *Migrator) migrateV2Accounts() (err error) {
func (m *Migrator) migrateAccounts() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataDB().GetVersions(utils.TBLVersions)
vrs, err = m.dmIN.DataDB().GetVersions(utils.TBLVersions)
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
@@ -137,8 +136,9 @@ func (m *Migrator) migrateAccounts() (err error) {
return utils.NewCGRError(utils.Migrator,
utils.MandatoryIEMissingCaps,
utils.UndefinedVersion,
"version number is not defined for ActionTriggers model")
"version number is not defined for Actions")
}
current := engine.CurrentDataDBVersions()
switch vrs[utils.Accounts] {
case current[utils.Accounts]:
if m.sameDataDB {

View File

@@ -28,7 +28,8 @@ import (
func NewMigrator(dmIN *engine.DataManager, dmOut *engine.DataManager, dataDBType, dataDBEncoding string,
storDB engine.Storage, storDBType string, oldDataDB MigratorDataDB, oldDataDBType, oldDataDBEncoding string,
oldStorDB engine.Storage, oldStorDBType string, dryRun bool, sameDataDB bool, sameStorDB bool, datadb_versions bool, stordb_versions bool) (m *Migrator, err error) {
oldStorDB engine.Storage, oldStorDBType string, dryRun bool, sameDataDB bool, sameStorDB bool,
datadb_versions bool, stordb_versions bool) (m *Migrator, err error) {
var mrshlr engine.Marshaler
var oldmrshlr engine.Marshaler
if dataDBEncoding == utils.MSGPACK {
@@ -86,12 +87,7 @@ func (m *Migrator) Migrate(taskIDs []string) (err error, stats map[string]int) {
fmt.Sprintf("task <%s> is not a supported migration task", taskID))
case utils.MetaSetVersions:
if m.dryRun != true {
if err := m.storDB.SetVersions(engine.CurrentDBVersions(m.storDBType), true); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating CostDetails version into StorDB", err.Error())), nil
}
if err := m.dmOut.DataDB().SetVersions(engine.CurrentDBVersions(m.dataDBType), true); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
@@ -105,6 +101,13 @@ func (m *Migrator) Migrate(taskIDs []string) (err error, stats map[string]int) {
}
log.Print("After migrate, DataDB versions :", vrs)
}
if err := m.storDB.SetVersions(engine.CurrentDBVersions(m.storDBType), true); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating CostDetails version into StorDB", err.Error())), nil
}
if m.stordb_versions {
vrs, err := m.storDB.GetVersions(utils.TBLVersions)
if err != nil {
@@ -112,6 +115,7 @@ func (m *Migrator) Migrate(taskIDs []string) (err error, stats map[string]int) {
}
log.Print("After migrate, StorDB versions :", vrs)
}
} else {
log.Print("Cannot dryRun SetVersions!")
}

View File

@@ -26,6 +26,12 @@ import (
"github.com/cgrates/mgo"
)
const (
v2AccountsCol = "accounts"
v1ActionTriggersCol = "action_triggers"
v1AttributeProfilesCol = "attribute_profiles"
)
type v1Mongo struct {
session *mgo.Session
db string
@@ -95,7 +101,7 @@ func (v1ms *v1Mongo) setV1Account(x *v1Account) (err error) {
//get
func (v1ms *v1Mongo) getv2Account() (v2Acnt *v2Account, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(utils.ACCOUNT_PREFIX).Find(nil).Iter()
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(v2AccountsCol).Find(nil).Iter()
}
v1ms.qryIter.Next(&v2Acnt)
@@ -109,7 +115,7 @@ func (v1ms *v1Mongo) getv2Account() (v2Acnt *v2Account, err error) {
//set
func (v1ms *v1Mongo) setV2Account(x *v2Account) (err error) {
if err := v1ms.session.DB(v1ms.db).C(utils.ACCOUNT_PREFIX).Insert(x); err != nil {
if err := v1ms.session.DB(v1ms.db).C(v2AccountsCol).Insert(x); err != nil {
return err
}
return
@@ -227,7 +233,7 @@ func (v1ms *v1Mongo) setV1Stats(x *v1Stat) (err error) {
//get
func (v1ms *v1Mongo) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(utils.ACTION_TRIGGER_PREFIX).Find(nil).Iter()
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(v1ActionTriggersCol).Find(nil).Iter()
}
v1ms.qryIter.Next(&v2at)
if v2at == nil {
@@ -240,7 +246,7 @@ func (v1ms *v1Mongo) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) {
//set
func (v1ms *v1Mongo) setV2ActionTrigger(x *v2ActionTrigger) (err error) {
if err := v1ms.session.DB(v1ms.db).C(utils.ACTION_TRIGGER_PREFIX).Insert(x); err != nil {
if err := v1ms.session.DB(v1ms.db).C(v1ActionTriggersCol).Insert(x); err != nil {
return err
}
return
@@ -250,7 +256,7 @@ func (v1ms *v1Mongo) setV2ActionTrigger(x *v2ActionTrigger) (err error) {
//get
func (v1ms *v1Mongo) getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(utils.AttributeProfilePrefix).Find(nil).Iter()
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(v1AttributeProfilesCol).Find(nil).Iter()
}
v1ms.qryIter.Next(&v1attrPrf)
if v1attrPrf == nil {
@@ -263,7 +269,7 @@ func (v1ms *v1Mongo) getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err
//set
func (v1ms *v1Mongo) setV1AttributeProfile(x *v1AttributeProfile) (err error) {
if err := v1ms.session.DB(v1ms.db).C(utils.AttributeProfilePrefix).Insert(x); err != nil {
if err := v1ms.session.DB(v1ms.db).C(v1AttributeProfilesCol).Insert(x); err != nil {
return err
}
return