Updated actions migration process

This commit is contained in:
adragusin
2020-06-22 18:01:34 +03:00
committed by Dan Christian Bogos
parent 6379062dba
commit 20e493b4dc
7 changed files with 85 additions and 36 deletions

View File

@@ -62,41 +62,36 @@ func (m *Migrator) migrateCurrentActions() (err error) {
return
}
func (m *Migrator) migrateV1Actions() (err error) {
var v1ACs *v1Actions
var acts engine.Actions
func (m *Migrator) removeV1Actions() (err error) {
var v1 *v1Actions
for {
v1ACs, err = m.dmIN.getV1Actions()
if err != nil && err != utils.ErrNoMoreData {
if v1, err = m.dmIN.getV1Actions(); err != nil && err != utils.ErrNoMoreData {
return err
}
if err == utils.ErrNoMoreData {
break
if v1 == nil {
return nil
}
if *v1ACs == nil || m.dryRun {
continue
}
for _, v1ac := range *v1ACs {
act := v1ac.AsAction()
acts = append(acts, act)
}
if err := m.dmOut.DataManager().SetActions(acts[0].Id, acts, utils.NonTransactional); err != nil {
if err = m.dmIN.remV1Actions(*v1); err != nil {
return err
}
m.stats[utils.Actions] += 1
}
if m.dryRun {
}
func (m *Migrator) migrateV1Actions() (acts engine.Actions, err error) {
var v1ACs *v1Actions
if v1ACs, err = m.dmIN.getV1Actions(); err != nil {
return nil, err
}
if *v1ACs == nil {
return
}
// All done, update version wtih current one
vrs := engine.Versions{utils.Actions: engine.CurrentStorDBVersions()[utils.Actions]}
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Actions version into dataDB", err.Error()))
for _, v1ac := range *v1ACs {
act := v1ac.AsAction()
acts = append(acts, act)
}
return
}
@@ -115,19 +110,55 @@ func (m *Migrator) migrateActions() (err error) {
utils.UndefinedVersion,
"version number is not defined for ActionTriggers model")
}
switch vrs[utils.Actions] {
case current[utils.Actions]:
if m.sameDataDB {
migrated := true
var acts engine.Actions
for {
version := vrs[utils.Actions]
for {
switch version {
case current[utils.Actions]:
migrated = false
if m.sameDataDB {
break
}
if err = m.migrateCurrentActions(); err != nil {
return err
}
case 1:
if acts, err = m.migrateV1Actions(); err != nil && err != utils.ErrNoMoreData {
return err
}
version = 2
}
if version == current[utils.Actions] || err == utils.ErrNoMoreData {
break
}
}
if err == utils.ErrNoMoreData || !migrated {
break
}
if err = m.migrateCurrentActions(); err != nil {
return err
}
case 1:
if err = m.migrateV1Actions(); err != nil {
return err
if !m.dryRun && migrated {
if err := m.dmOut.DataManager().SetActions(acts[0].Id, acts, utils.NonTransactional); err != nil {
return err
}
}
m.stats[utils.Actions] += 1
}
if m.dryRun || !migrated {
return nil
}
// remove old actions
// All done, update version wtih current one
vrs = engine.Versions{utils.Actions: engine.CurrentStorDBVersions()[utils.Actions]}
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Actions version into dataDB", err.Error()))
}
return m.ensureIndexesDataDB(engine.ColAct)
}

View File

@@ -173,7 +173,7 @@ func testActITConnect(t *testing.T) {
actCfgOut.DataDbCfg().DataDbHost, actCfgOut.DataDbCfg().DataDbPort,
actCfgOut.DataDbCfg().DataDbName, actCfgOut.DataDbCfg().DataDbUser,
actCfgOut.DataDbCfg().DataDbPass, actCfgOut.GeneralCfg().DBDataEncoding,
config.CgrConfig().CacheCfg(), "", accCfgOut.DataDbCfg().Items)
config.CgrConfig().CacheCfg(), "", actCfgOut.DataDbCfg().Items)
if err != nil {
log.Fatal(err)
}

View File

@@ -140,7 +140,6 @@ func (m *Migrator) migrateActionTriggers() (err error) {
}
migrated = false
case 1:
fmt.Println("migration starts")
if v2, err = m.migrateV1ActionTrigger(); err != nil && err != utils.ErrNoMoreData {
return err
}

View File

@@ -31,6 +31,7 @@ type MigratorDataDB interface {
remV1ActionPlans(x *v1ActionPlans) (err error)
getV1Actions() (v1acs *v1Actions, err error)
setV1Actions(x *v1Actions) (err error)
remV1Actions(x v1Actions) (err error)
getV1ActionTriggers() (v1acts *v1ActionTriggers, err error)
setV1ActionTriggers(x *v1ActionTriggers) (err error)
remV1ActionTriggers(x *v1ActionTriggers) (err error)

View File

@@ -101,6 +101,11 @@ func (iDBMig *internalMigrator) setV1Actions(x *v1Actions) (err error) {
return utils.ErrNotImplemented
}
//rem
func (iDBMig *internalMigrator) remV1Actions(x v1Actions) (err error) {
return utils.ErrNotImplemented
}
//ActionTriggers methods
//get
func (iDBMig *internalMigrator) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) {

View File

@@ -204,6 +204,12 @@ func (v1ms *mongoMigrator) setV1Actions(x *v1Actions) (err error) {
return
}
//rem
func (v1ms *mongoMigrator) remV1Actions(x v1Actions) (err error) {
_, err = v1ms.mgoDB.DB().Collection("actions").DeleteOne(v1ms.mgoDB.GetContext(), bson.M{"id": x[0].Id})
return
}
//ActionTriggers methods
//get
func (v1ms *mongoMigrator) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) {

View File

@@ -237,6 +237,13 @@ func (v1rs *redisMigrator) setV1Actions(x *v1Actions) (err error) {
return
}
//rem
func (v1rs *redisMigrator) remV1Actions(x v1Actions) (err error) {
key := utils.ACTION_PREFIX + x[0].Id
return v1rs.rds.Cmd("DEL", key).Err
}
//ActionTriggers methods
//get
func (v1rs *redisMigrator) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) {