Updated ActionTriggers migration process

This commit is contained in:
adragusin
2020-06-19 17:57:54 +03:00
committed by Dan Christian Bogos
parent 35e3893a8d
commit a23b501c05
6 changed files with 105 additions and 40 deletions

View File

@@ -74,48 +74,44 @@ func (m *Migrator) migrateCurrentActionTrigger() (err error) {
return
}
func (m *Migrator) migrateV1ActionTrigger() (err error) {
func (m *Migrator) migrateV1ActionTrigger() (acts engine.ActionTriggers, err error) {
var v1ACTs *v1ActionTriggers
var acts engine.ActionTriggers
for {
v1ACTs, err = m.dmIN.getV1ActionTriggers()
if err != nil && err != utils.ErrNoMoreData {
return err
}
if err == utils.ErrNoMoreData {
break
}
if *v1ACTs == nil || m.dryRun {
continue
}
for _, v1ac := range *v1ACTs {
act := v1ac.AsActionTrigger()
acts = append(acts, act)
}
if err := m.dmOut.DataManager().SetActionTriggers(acts[0].ID, acts, utils.NonTransactional); err != nil {
return err
}
m.stats[utils.ActionTriggers] += 1
v1ACTs, err = m.dmIN.getV1ActionTriggers()
if err != nil {
return nil, err
}
if v1ACTs == nil {
return nil, nil
}
for _, v1ac := range *v1ACTs {
act := v1ac.AsActionTrigger()
acts = append(acts, act)
}
if m.dryRun {
return
}
// All done, update version wtih current one
vrs := engine.Versions{utils.ActionTriggers: engine.CurrentDataDBVersions()[utils.ActionTriggers]}
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating ActionTriggers version into DataDB", err.Error()))
}
return
}
func (m *Migrator) removeV1ActionTriggers() (err error) {
var v1ACTs *v1ActionTriggers
for {
if v1ACTs, err = m.dmIN.getV1ActionTriggers(); err != nil && err != utils.ErrNoMoreData {
return err
}
if v1ACTs == nil {
return nil
}
if err = m.dmIN.remV1ActionTriggers(v1ACTs); err != nil {
return err
}
}
}
func (m *Migrator) migrateActionTriggers() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmIN.DataManager().DataDB().GetVersions("")
vrs, err = m.dmIN.DataManager().DataDB().GetVersions(utils.EmptyString)
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
@@ -127,19 +123,67 @@ func (m *Migrator) migrateActionTriggers() (err error) {
utils.UndefinedVersion,
"version number is not defined for ActionTriggers model")
}
switch vrs[utils.ActionTriggers] {
case current[utils.ActionTriggers]:
if m.sameDataDB {
migrated := true
migratedFrom := 0
var v2 engine.ActionTriggers
for {
version := vrs[utils.ActionTriggers]
for {
switch version {
case current[utils.ActionTriggers]:
if m.sameDataDB {
migrated = false
break
}
if err = m.migrateCurrentActionTrigger(); err != nil {
return err
}
migrated = false
case 1:
fmt.Println("migration starts")
if v2, err = m.migrateV1ActionTrigger(); err != nil && err != utils.ErrNoMoreData {
return err
}
migratedFrom = 1
version = 2
}
if version == current[utils.ActionTriggers] || err == utils.ErrNoMoreData {
break
}
}
if err == utils.ErrNoMoreData || !migrated {
break
}
if err = m.migrateCurrentActionTrigger(); err != nil {
return err
if !m.dryRun && migrated {
//set action triggers
if err := m.dmOut.DataManager().SetActionTriggers(v2[0].ID, v2, utils.NonTransactional); err != nil {
return err
}
}
case 1:
if err = m.migrateV1ActionTrigger(); err != nil {
return err
m.stats[utils.ActionTriggers] += 1
}
if m.dryRun || !migrated {
return nil
}
// remove old action triggers
if !m.sameDataDB {
switch migratedFrom {
case 1:
if err = m.removeV1ActionTriggers(); err != nil {
return
}
}
}
// All done, update version wtih current one
vrs = engine.Versions{utils.ActionTriggers: engine.CurrentDataDBVersions()[utils.ActionTriggers]}
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating ActionTriggers version into DataDB", err.Error()))
}
return m.ensureIndexesDataDB(engine.ColAtr)
}

View File

@@ -230,7 +230,7 @@ func testActTrgITMigrateAndMove(t *testing.T) {
t.Error("Error when getting ActionTriggers ", err.Error())
}
if !reflect.DeepEqual(actTrg, result) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(actTrg), utils.ToJSON(result))
t.Errorf("Expecting: %+v,\nReceived: %+v", utils.ToJSON(actTrg), utils.ToJSON(result))
}
// utils.tojson si verificat
case utils.Move:

View File

@@ -33,6 +33,7 @@ type MigratorDataDB interface {
setV1Actions(x *v1Actions) (err error)
getV1ActionTriggers() (v1acts *v1ActionTriggers, err error)
setV1ActionTriggers(x *v1ActionTriggers) (err error)
remV1ActionTriggers(x *v1ActionTriggers) (err error)
getV1SharedGroup() (v1acts *v1SharedGroup, err error)
setV1SharedGroup(x *v1SharedGroup) (err error)
getV1Stats() (v1st *v1Stat, err error)

View File

@@ -112,6 +112,11 @@ func (iDBMig *internalMigrator) setV1ActionTriggers(x *v1ActionTriggers) (err er
return utils.ErrNotImplemented
}
//rem
func (iDBMig *internalMigrator) remV1ActionTriggers(x *v1ActionTriggers) (err error) {
return utils.ErrNotImplemented
}
//SharedGroup methods
//get
func (iDBMig *internalMigrator) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {

View File

@@ -236,6 +236,15 @@ func (v1ms *mongoMigrator) setV1ActionTriggers(act *v1ActionTriggers) (err error
return
}
//rem
func (v1ms *mongoMigrator) remV1ActionTriggers(x *v1ActionTriggers) (err error) {
for _, item := range *x {
_, err = v1ms.mgoDB.DB().Collection(v1ActionTriggersCol).DeleteOne(v1ms.mgoDB.GetContext(), bson.M{"id": item.Id})
return
}
return
}
//Actions methods
//get
func (v1ms *mongoMigrator) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {

View File

@@ -278,6 +278,12 @@ func (v1rs *redisMigrator) setV1ActionTriggers(x *v1ActionTriggers) (err error)
return
}
//rem
func (v1rs *redisMigrator) remV1ActionTriggers(x *v1ActionTriggers) (err error) {
key := utils.ACTION_TRIGGER_PREFIX + (*x)[0].Id
return v1rs.rds.Cmd("DEL", key).Err
}
//SharedGroup methods
//get
func (v1rs *redisMigrator) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {