mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Merge pull request #2246 from adragusin/master
Updated filters migration
This commit is contained in:
@@ -45,11 +45,11 @@ func (m *Migrator) migrateCurrentRequestFilter() (err error) {
|
||||
if m.dryRun || fl == nil {
|
||||
continue
|
||||
}
|
||||
if err := m.dmIN.DataManager().RemoveFilter(tntID[0], tntID[1],
|
||||
utils.NonTransactional, true); err != nil {
|
||||
if err := m.dmOut.DataManager().SetFilter(fl, true); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := m.dmOut.DataManager().SetFilter(fl, true); err != nil {
|
||||
if err := m.dmIN.DataManager().RemoveFilter(tntID[0], tntID[1],
|
||||
utils.NonTransactional, true); err != nil {
|
||||
return err
|
||||
}
|
||||
m.stats[utils.RQF]++
|
||||
@@ -191,23 +191,7 @@ func migrateInlineFilterV2(fl string) string {
|
||||
return fmt.Sprintf("%s::~%s", ruleSplt[0], utils.MetaReq+utils.NestingSep+strings.Join(ruleSplt[2:], utils.InInFieldSep))
|
||||
}
|
||||
|
||||
func (m *Migrator) migrateRequestFilterV1() (err error) {
|
||||
for {
|
||||
fl, err := m.dmIN.getV1Filter()
|
||||
if err != nil && err != utils.ErrNoMoreData {
|
||||
return err
|
||||
}
|
||||
if err == utils.ErrNoMoreData {
|
||||
break
|
||||
}
|
||||
if m.dryRun || fl == nil {
|
||||
continue
|
||||
}
|
||||
if err := m.dmOut.DataManager().SetFilter(migrateFilterV1(fl), true); err != nil {
|
||||
return err
|
||||
}
|
||||
m.stats[utils.RQF]++
|
||||
}
|
||||
func (m *Migrator) migrateOthersv1() (err error) {
|
||||
if err = m.migrateResourceProfileFiltersV1(); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -229,34 +213,22 @@ func (m *Migrator) migrateRequestFilterV1() (err error) {
|
||||
if err = m.migrateDispatcherProfileFiltersV1(); err != nil {
|
||||
return err
|
||||
}
|
||||
vrs := engine.Versions{utils.RQF: engine.CurrentDataDBVersions()[utils.RQF]}
|
||||
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating Filters version into dataDB", err.Error()))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (m *Migrator) migrateRequestFilterV2() (err error) {
|
||||
for {
|
||||
fl, err := m.dmIN.getV1Filter()
|
||||
if err != nil && err != utils.ErrNoMoreData {
|
||||
return err
|
||||
}
|
||||
if err == utils.ErrNoMoreData {
|
||||
break
|
||||
}
|
||||
if m.dryRun || fl == nil {
|
||||
continue
|
||||
}
|
||||
if err := m.dmOut.DataManager().SetFilter(migrateFilterV2(fl), true); err != nil {
|
||||
return fmt.Errorf("Error: <%s> when setting filter with tenant: <%s> and id: <%s> after migration",
|
||||
err.Error(), fl.Tenant, fl.ID)
|
||||
}
|
||||
m.stats[utils.RQF]++
|
||||
func (m *Migrator) migrateRequestFilterV1() (fltr *engine.Filter, err error) {
|
||||
var v1Fltr *v1Filter
|
||||
if v1Fltr, err = m.dmIN.getV1Filter(); err != nil {
|
||||
return
|
||||
}
|
||||
if v1Fltr == nil {
|
||||
return
|
||||
}
|
||||
fltr = migrateFilterV1(v1Fltr)
|
||||
return
|
||||
}
|
||||
|
||||
func (m *Migrator) migrateOthersV2() (err error) {
|
||||
if err = m.migrateResourceProfileFiltersV2(); err != nil {
|
||||
return fmt.Errorf("Error: <%s> when trying to migrate filter for ResourceProfiles",
|
||||
err.Error())
|
||||
@@ -285,41 +257,30 @@ func (m *Migrator) migrateRequestFilterV2() (err error) {
|
||||
return fmt.Errorf("Error: <%s> when trying to migrate filter for DispatcherProfiles",
|
||||
err.Error())
|
||||
}
|
||||
vrs := engine.Versions{utils.RQF: engine.CurrentDataDBVersions()[utils.RQF]}
|
||||
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating Filters version into dataDB", err.Error()))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (m *Migrator) migrateRequestFilterV3() (err error) {
|
||||
for {
|
||||
fl, err := m.dmIN.getV1Filter()
|
||||
if err != nil && err != utils.ErrNoMoreData {
|
||||
return err
|
||||
}
|
||||
if err == utils.ErrNoMoreData {
|
||||
break
|
||||
}
|
||||
if m.dryRun || fl == nil {
|
||||
continue
|
||||
}
|
||||
if err := m.dmOut.DataManager().SetFilter(migrateFilterV3(fl), true); err != nil {
|
||||
return fmt.Errorf("Error: <%s> when setting filter with tenant: <%s> and id: <%s> after migration",
|
||||
err.Error(), fl.Tenant, fl.ID)
|
||||
}
|
||||
m.stats[utils.RQF]++
|
||||
func (m *Migrator) migrateRequestFilterV2() (fltr *engine.Filter, err error) {
|
||||
var v1Fltr *v1Filter
|
||||
if v1Fltr, err = m.dmIN.getV1Filter(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
vrs := engine.Versions{utils.RQF: engine.CurrentDataDBVersions()[utils.RQF]}
|
||||
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating Filters version into dataDB", err.Error()))
|
||||
if err == utils.ErrNoMoreData {
|
||||
return nil, nil
|
||||
}
|
||||
fltr = migrateFilterV2(v1Fltr)
|
||||
return
|
||||
}
|
||||
|
||||
func (m *Migrator) migrateRequestFilterV3() (fltr *engine.Filter, err error) {
|
||||
var v1Fltr *v1Filter
|
||||
if v1Fltr, err = m.dmIN.getV1Filter(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if v1Fltr == nil {
|
||||
return
|
||||
}
|
||||
fltr = migrateFilterV3(v1Fltr)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -338,27 +299,92 @@ func (m *Migrator) migrateFilters() (err error) {
|
||||
utils.UndefinedVersion,
|
||||
"version number is not defined for ActionTriggers model")
|
||||
}
|
||||
switch vrs[utils.RQF] {
|
||||
case 3:
|
||||
if err = m.migrateRequestFilterV3(); err != nil {
|
||||
migrated := true
|
||||
migratedFrom := 0
|
||||
var fltr *engine.Filter
|
||||
for {
|
||||
version := vrs[utils.RQF]
|
||||
for {
|
||||
switch version {
|
||||
case current[utils.RQF]:
|
||||
migrated = false
|
||||
if m.sameDataDB {
|
||||
break
|
||||
}
|
||||
if err = m.migrateCurrentRequestFilter(); err != nil {
|
||||
return err
|
||||
}
|
||||
version = 4
|
||||
case 1:
|
||||
if fltr, err = m.migrateRequestFilterV1(); err != nil && err != utils.ErrNoMoreData {
|
||||
return err
|
||||
}
|
||||
migratedFrom = 1
|
||||
version = 4
|
||||
case 2:
|
||||
if fltr, err = m.migrateRequestFilterV2(); err != nil && err != utils.ErrNoMoreData {
|
||||
return err
|
||||
}
|
||||
migratedFrom = 2
|
||||
version = 4
|
||||
case 3:
|
||||
if fltr, err = m.migrateRequestFilterV3(); err != nil && err != utils.ErrNoMoreData {
|
||||
return err
|
||||
}
|
||||
migratedFrom = 3
|
||||
version = 4
|
||||
}
|
||||
if version == current[utils.RQF] || err == utils.ErrNoMoreData {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err == utils.ErrNoMoreData || !migrated {
|
||||
break
|
||||
}
|
||||
if !m.dryRun && migrated {
|
||||
//set filters
|
||||
switch migratedFrom {
|
||||
case 1:
|
||||
if err := m.dmOut.DataManager().SetFilter(fltr, true); err != nil {
|
||||
return fmt.Errorf("Error: <%s> when setting filter with tenant: <%s> and id: <%s> after migration",
|
||||
err.Error(), fltr.Tenant, fltr.ID)
|
||||
}
|
||||
case 2:
|
||||
if err := m.dmOut.DataManager().SetFilter(fltr, true); err != nil {
|
||||
return fmt.Errorf("Error: <%s> when setting filter with tenant: <%s> and id: <%s> after migration",
|
||||
err.Error(), fltr.Tenant, fltr.ID)
|
||||
}
|
||||
case 3:
|
||||
if err := m.dmOut.DataManager().SetFilter(fltr, true); err != nil {
|
||||
return fmt.Errorf("Error: <%s> when setting filter with tenant: <%s> and id: <%s> after migration",
|
||||
err.Error(), fltr.Tenant, fltr.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
m.stats[utils.RQF]++
|
||||
}
|
||||
if m.dryRun || !migrated {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch migratedFrom {
|
||||
case 1:
|
||||
if err := m.migrateOthersv1(); err != nil {
|
||||
return err
|
||||
}
|
||||
case 2:
|
||||
if err = m.migrateRequestFilterV2(); err != nil {
|
||||
return err
|
||||
}
|
||||
case 1:
|
||||
if err = m.migrateRequestFilterV1(); err != nil {
|
||||
return err
|
||||
}
|
||||
case current[utils.RQF]:
|
||||
if m.sameDataDB {
|
||||
break
|
||||
}
|
||||
if err = m.migrateCurrentRequestFilter(); err != nil {
|
||||
if err := m.migrateOthersV2(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
vrs = engine.Versions{utils.RQF: engine.CurrentDataDBVersions()[utils.RQF]}
|
||||
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating Filters version into dataDB", err.Error()))
|
||||
}
|
||||
return m.ensureIndexesDataDB(engine.ColFlt)
|
||||
}
|
||||
|
||||
|
||||
@@ -101,46 +101,15 @@ func (m *Migrator) migrateCurrentThresholds() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (m *Migrator) migrateV2ActionTriggers() (err error) {
|
||||
func (m *Migrator) migrateV2ActionTriggers() (thp *engine.ThresholdProfile, th *engine.Threshold, filter *engine.Filter, err error) {
|
||||
var v2ACT *v2ActionTrigger
|
||||
for {
|
||||
v2ACT, err = m.dmIN.getV2ActionTrigger()
|
||||
if err != nil && err != utils.ErrNoMoreData {
|
||||
return err
|
||||
}
|
||||
if err == utils.ErrNoMoreData {
|
||||
break
|
||||
}
|
||||
if v2ACT.ID != "" {
|
||||
thp, th, filter, err := v2ACT.AsThreshold()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if m.dryRun {
|
||||
continue
|
||||
}
|
||||
if err := m.dmOut.DataManager().SetFilter(filter, true); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := m.dmOut.DataManager().SetThreshold(th); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := m.dmOut.DataManager().SetThresholdProfile(thp, true); err != nil {
|
||||
return err
|
||||
}
|
||||
m.stats[utils.Thresholds] += 1
|
||||
}
|
||||
}
|
||||
if m.dryRun {
|
||||
if v2ACT, err = m.dmIN.getV2ActionTrigger(); err != nil {
|
||||
return
|
||||
}
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.Thresholds: engine.CurrentStorDBVersions()[utils.Thresholds]}
|
||||
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating Thresholds version into dataDB", err.Error()))
|
||||
if v2ACT.ID != "" {
|
||||
if thp, th, filter, err = v2ACT.AsThreshold(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -162,50 +131,22 @@ func (m *Migrator) removeV2Thresholds() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (m *Migrator) migrateV2Thresholds() (err error) {
|
||||
func (m *Migrator) migrateV2Thresholds() (v3 *engine.ThresholdProfile, err error) {
|
||||
var v2T *v2Threshold
|
||||
for {
|
||||
v2T, err = m.dmIN.getV2ThresholdProfile()
|
||||
if err != nil && err != utils.ErrNoMoreData {
|
||||
return err
|
||||
}
|
||||
if err == utils.ErrNoMoreData {
|
||||
break
|
||||
}
|
||||
if v2T == nil || m.dryRun {
|
||||
continue
|
||||
}
|
||||
th := v2T.V2toV3Threshold()
|
||||
|
||||
if err = m.dmOut.DataManager().SetThresholdProfile(th, true); err != nil {
|
||||
return err
|
||||
}
|
||||
m.stats[utils.Thresholds] += 1
|
||||
}
|
||||
if m.dryRun {
|
||||
if v2T, err = m.dmIN.getV2ThresholdProfile(); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if !m.sameDataDB {
|
||||
if err = m.removeV2Thresholds(); err != nil && err != utils.ErrNoMoreData {
|
||||
return
|
||||
}
|
||||
}
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.Thresholds: engine.CurrentDataDBVersions()[utils.Thresholds]}
|
||||
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating Thresholds version into dataDB", err.Error()))
|
||||
if v2T == nil {
|
||||
return
|
||||
}
|
||||
v3 = v2T.V2toV3Threshold()
|
||||
return
|
||||
}
|
||||
|
||||
func (m *Migrator) migrateThresholds() (err error) {
|
||||
var vrs engine.Versions
|
||||
current := engine.CurrentDataDBVersions()
|
||||
vrs, err = m.dmIN.DataManager().DataDB().GetVersions("")
|
||||
vrs, err = m.dmIN.DataManager().DataDB().GetVersions(utils.EmptyString)
|
||||
if err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
@@ -217,23 +158,84 @@ func (m *Migrator) migrateThresholds() (err error) {
|
||||
utils.UndefinedVersion,
|
||||
"version number is not defined for ActionTriggers model")
|
||||
}
|
||||
switch vrs[utils.Thresholds] {
|
||||
case current[utils.Thresholds]:
|
||||
if m.sameDataDB {
|
||||
migrated := true
|
||||
migratedFrom := 0
|
||||
var thp *engine.ThresholdProfile
|
||||
var th *engine.Threshold
|
||||
var filter *engine.Filter
|
||||
var v3 *engine.ThresholdProfile
|
||||
for {
|
||||
version := vrs[utils.Thresholds]
|
||||
for {
|
||||
switch version {
|
||||
case current[utils.Thresholds]:
|
||||
migrated = false
|
||||
if m.sameDataDB {
|
||||
break
|
||||
}
|
||||
if err = m.migrateCurrentThresholds(); err != nil {
|
||||
return err
|
||||
}
|
||||
case 1:
|
||||
if thp, th, filter, err = m.migrateV2ActionTriggers(); err != nil && err != utils.ErrNoMoreData {
|
||||
return err
|
||||
}
|
||||
version = 3
|
||||
migratedFrom = 1
|
||||
case 2:
|
||||
if v3, err = m.migrateV2Thresholds(); err != nil && err != utils.ErrNoMoreData {
|
||||
return err
|
||||
}
|
||||
version = 3
|
||||
migratedFrom = 2
|
||||
}
|
||||
if version == current[utils.Thresholds] || err == utils.ErrNoMoreData {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err == utils.ErrNoMoreData || !migrated {
|
||||
break
|
||||
}
|
||||
if err = m.migrateCurrentThresholds(); err != nil {
|
||||
return err
|
||||
|
||||
if !m.dryRun && migrated {
|
||||
//set threshond
|
||||
switch migratedFrom {
|
||||
case 1:
|
||||
if err := m.dmOut.DataManager().SetFilter(filter, true); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := m.dmOut.DataManager().SetThreshold(th); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := m.dmOut.DataManager().SetThresholdProfile(thp, true); err != nil {
|
||||
return err
|
||||
}
|
||||
case 2:
|
||||
if err = m.dmOut.DataManager().SetThresholdProfile(v3, true); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
case 1:
|
||||
if err = m.migrateV2ActionTriggers(); err != nil {
|
||||
return err
|
||||
}
|
||||
case 2:
|
||||
if err = m.migrateV2Thresholds(); err != nil {
|
||||
return err
|
||||
m.stats[utils.Thresholds]++
|
||||
}
|
||||
if m.dryRun || !migrated {
|
||||
return nil
|
||||
}
|
||||
// remove old threshonds
|
||||
if !m.sameDataDB && migratedFrom == 2 {
|
||||
if err = m.removeV2Thresholds(); err != nil && err != utils.ErrNoMoreData {
|
||||
return
|
||||
}
|
||||
}
|
||||
// All done, update version wtih current one
|
||||
vrs = engine.Versions{utils.Thresholds: engine.CurrentDataDBVersions()[utils.Thresholds]}
|
||||
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating Thresholds version into dataDB", err.Error()))
|
||||
}
|
||||
return m.ensureIndexesDataDB(engine.ColTps)
|
||||
}
|
||||
|
||||
|
||||
@@ -345,7 +345,7 @@ func testTrsITMigrateAndMove(t *testing.T) {
|
||||
t.Error("Error when getting Thresholds ", err.Error())
|
||||
}
|
||||
if !reflect.DeepEqual(tresProf2, result) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(tresProf2), utils.ToJSON(result))
|
||||
t.Errorf("Expecting: %+v,\nReceived: %+v", utils.ToJSON(tresProf2), utils.ToJSON(result))
|
||||
}
|
||||
|
||||
result, err = trsMigrator.dmOut.DataManager().GetThresholdProfile(tresProf3.Tenant, tresProf3.ID, false, false, utils.NonTransactional)
|
||||
@@ -353,7 +353,7 @@ func testTrsITMigrateAndMove(t *testing.T) {
|
||||
t.Error("Error when getting Thresholds ", err.Error())
|
||||
}
|
||||
if !reflect.DeepEqual(tresProf3, result) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(tresProf3), utils.ToJSON(result))
|
||||
t.Errorf("Expecting: %+v,\nReceived: %+v", utils.ToJSON(tresProf3), utils.ToJSON(result))
|
||||
}
|
||||
|
||||
case utils.Move:
|
||||
|
||||
Reference in New Issue
Block a user