Updated thresholds migration

This commit is contained in:
adragusin
2020-06-26 18:10:40 +03:00
parent f1ff03d7c2
commit 3d5fc613e3
2 changed files with 87 additions and 85 deletions

View File

@@ -101,46 +101,15 @@ func (m *Migrator) migrateCurrentThresholds() (err error) {
return
}
func (m *Migrator) migrateV2ActionTriggers() (err error) {
func (m *Migrator) migrateV2ActionTriggers() (thp *engine.ThresholdProfile, th *engine.Threshold, filter *engine.Filter, err error) {
var v2ACT *v2ActionTrigger
for {
v2ACT, err = m.dmIN.getV2ActionTrigger()
if err != nil && err != utils.ErrNoMoreData {
return err
}
if err == utils.ErrNoMoreData {
break
}
if v2ACT.ID != "" {
thp, th, filter, err := v2ACT.AsThreshold()
if err != nil {
return err
}
if m.dryRun {
continue
}
if err := m.dmOut.DataManager().SetFilter(filter, true); err != nil {
return err
}
if err := m.dmOut.DataManager().SetThreshold(th); err != nil {
return err
}
if err := m.dmOut.DataManager().SetThresholdProfile(thp, true); err != nil {
return err
}
m.stats[utils.Thresholds] += 1
}
}
if m.dryRun {
if v2ACT, err = m.dmIN.getV2ActionTrigger(); err != nil {
return
}
// All done, update version wtih current one
vrs := engine.Versions{utils.Thresholds: engine.CurrentStorDBVersions()[utils.Thresholds]}
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Thresholds version into dataDB", err.Error()))
if v2ACT.ID != "" {
if thp, th, filter, err = v2ACT.AsThreshold(); err != nil {
return
}
}
return
}
@@ -162,50 +131,22 @@ func (m *Migrator) removeV2Thresholds() (err error) {
return
}
func (m *Migrator) migrateV2Thresholds() (err error) {
func (m *Migrator) migrateV2Thresholds() (v3 *engine.ThresholdProfile, err error) {
var v2T *v2Threshold
for {
v2T, err = m.dmIN.getV2ThresholdProfile()
if err != nil && err != utils.ErrNoMoreData {
return err
}
if err == utils.ErrNoMoreData {
break
}
if v2T == nil || m.dryRun {
continue
}
th := v2T.V2toV3Threshold()
if err = m.dmOut.DataManager().SetThresholdProfile(th, true); err != nil {
return err
}
m.stats[utils.Thresholds] += 1
}
if m.dryRun {
if v2T, err = m.dmIN.getV2ThresholdProfile(); err != nil {
return
}
if !m.sameDataDB {
if err = m.removeV2Thresholds(); err != nil && err != utils.ErrNoMoreData {
return
}
}
// All done, update version wtih current one
vrs := engine.Versions{utils.Thresholds: engine.CurrentDataDBVersions()[utils.Thresholds]}
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Thresholds version into dataDB", err.Error()))
if v2T == nil {
return
}
v3 = v2T.V2toV3Threshold()
return
}
func (m *Migrator) migrateThresholds() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmIN.DataManager().DataDB().GetVersions("")
vrs, err = m.dmIN.DataManager().DataDB().GetVersions(utils.EmptyString)
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
@@ -217,23 +158,84 @@ func (m *Migrator) migrateThresholds() (err error) {
utils.UndefinedVersion,
"version number is not defined for ActionTriggers model")
}
switch vrs[utils.Thresholds] {
case current[utils.Thresholds]:
if m.sameDataDB {
migrated := true
migratedFrom := 0
var thp *engine.ThresholdProfile
var th *engine.Threshold
var filter *engine.Filter
var v3 *engine.ThresholdProfile
for {
version := vrs[utils.Thresholds]
for {
switch version {
case current[utils.Thresholds]:
migrated = false
if m.sameDataDB {
break
}
if err = m.migrateCurrentThresholds(); err != nil {
return err
}
case 1:
if thp, th, filter, err = m.migrateV2ActionTriggers(); err != nil && err != utils.ErrNoMoreData {
return err
}
version = 3
migratedFrom = 1
case 2:
if v3, err = m.migrateV2Thresholds(); err != nil && err != utils.ErrNoMoreData {
return err
}
version = 3
migratedFrom = 2
}
if version == current[utils.Thresholds] || err == utils.ErrNoMoreData {
break
}
}
if err == utils.ErrNoMoreData || !migrated {
break
}
if err = m.migrateCurrentThresholds(); err != nil {
return err
if !m.dryRun && migrated {
//set threshond
switch migratedFrom {
case 1:
if err := m.dmOut.DataManager().SetFilter(filter, true); err != nil {
return err
}
if err := m.dmOut.DataManager().SetThreshold(th); err != nil {
return err
}
if err := m.dmOut.DataManager().SetThresholdProfile(thp, true); err != nil {
return err
}
case 2:
if err = m.dmOut.DataManager().SetThresholdProfile(v3, true); err != nil {
return err
}
}
}
case 1:
if err = m.migrateV2ActionTriggers(); err != nil {
return err
}
case 2:
if err = m.migrateV2Thresholds(); err != nil {
return err
m.stats[utils.Thresholds]++
}
if m.dryRun || !migrated {
return nil
}
// remove old threshonds
if !m.sameDataDB && migratedFrom == 2 {
if err = m.removeV2Thresholds(); err != nil && err != utils.ErrNoMoreData {
return
}
}
// All done, update version wtih current one
vrs = engine.Versions{utils.Thresholds: engine.CurrentDataDBVersions()[utils.Thresholds]}
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Thresholds version into dataDB", err.Error()))
}
return m.ensureIndexesDataDB(engine.ColTps)
}

View File

@@ -345,7 +345,7 @@ func testTrsITMigrateAndMove(t *testing.T) {
t.Error("Error when getting Thresholds ", err.Error())
}
if !reflect.DeepEqual(tresProf2, result) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(tresProf2), utils.ToJSON(result))
t.Errorf("Expecting: %+v,\nReceived: %+v", utils.ToJSON(tresProf2), utils.ToJSON(result))
}
result, err = trsMigrator.dmOut.DataManager().GetThresholdProfile(tresProf3.Tenant, tresProf3.ID, false, false, utils.NonTransactional)
@@ -353,7 +353,7 @@ func testTrsITMigrateAndMove(t *testing.T) {
t.Error("Error when getting Thresholds ", err.Error())
}
if !reflect.DeepEqual(tresProf3, result) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(tresProf3), utils.ToJSON(result))
t.Errorf("Expecting: %+v,\nReceived: %+v", utils.ToJSON(tresProf3), utils.ToJSON(result))
}
case utils.Move: