From 3e004016c5ec5050e608869615cb0a74ce17c62f Mon Sep 17 00:00:00 2001 From: adragusin Date: Wed, 24 Jun 2020 18:00:21 +0300 Subject: [PATCH 1/3] Updated filters migration --- migrator/filters.go | 210 ++++++++++++++++++++++++++------------------ 1 file changed, 123 insertions(+), 87 deletions(-) diff --git a/migrator/filters.go b/migrator/filters.go index 074096d99..abcda5d73 100644 --- a/migrator/filters.go +++ b/migrator/filters.go @@ -191,23 +191,7 @@ func migrateInlineFilterV2(fl string) string { return fmt.Sprintf("%s::~%s", ruleSplt[0], utils.MetaReq+utils.NestingSep+strings.Join(ruleSplt[2:], utils.InInFieldSep)) } -func (m *Migrator) migrateRequestFilterV1() (err error) { - for { - fl, err := m.dmIN.getV1Filter() - if err != nil && err != utils.ErrNoMoreData { - return err - } - if err == utils.ErrNoMoreData { - break - } - if m.dryRun || fl == nil { - continue - } - if err := m.dmOut.DataManager().SetFilter(migrateFilterV1(fl), true); err != nil { - return err - } - m.stats[utils.RQF]++ - } +func (m *Migrator) migrateOthersv1() (err error) { if err = m.migrateResourceProfileFiltersV1(); err != nil { return err } @@ -229,34 +213,26 @@ func (m *Migrator) migrateRequestFilterV1() (err error) { if err = m.migrateDispatcherProfileFiltersV1(); err != nil { return err } - vrs := engine.Versions{utils.RQF: engine.CurrentDataDBVersions()[utils.RQF]} - if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when updating Filters version into dataDB", err.Error())) - } return } -func (m *Migrator) migrateRequestFilterV2() (err error) { - for { - fl, err := m.dmIN.getV1Filter() - if err != nil && err != utils.ErrNoMoreData { - return err - } - if err == utils.ErrNoMoreData { - break - } - if m.dryRun || fl == nil { - continue - } - if err := m.dmOut.DataManager().SetFilter(migrateFilterV2(fl), true); err != nil { - return fmt.Errorf("Error: <%s> when setting filter with tenant: <%s> and id: <%s> after migration", - err.Error(), fl.Tenant, fl.ID) - } - m.stats[utils.RQF]++ +func (m *Migrator) migrateRequestFilterV1() (fltr *engine.Filter, err error) { + // var v1Fltr *v1Filter + v1Fltr, err := m.dmIN.getV1Filter() + fmt.Println("inside migrateRequestFilterV1 ================ err: ", err) + fmt.Println("inside migrateRequestFilterV1 ================ fltr: ", utils.ToJSON(v1Fltr)) + if err != nil { + fmt.Println("err!=nil: ", err) + return } + if v1Fltr == nil { + return nil, utils.ErrNoMoreData + } + fltr = migrateFilterV1(v1Fltr) + return +} + +func (m *Migrator) migrateOthersV2() (err error) { if err = m.migrateResourceProfileFiltersV2(); err != nil { return fmt.Errorf("Error: <%s> when trying to migrate filter for ResourceProfiles", err.Error()) @@ -285,41 +261,30 @@ func (m *Migrator) migrateRequestFilterV2() (err error) { return fmt.Errorf("Error: <%s> when trying to migrate filter for DispatcherProfiles", err.Error()) } - vrs := engine.Versions{utils.RQF: engine.CurrentDataDBVersions()[utils.RQF]} - if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when updating Filters version into dataDB", err.Error())) - } return } -func (m *Migrator) migrateRequestFilterV3() (err error) { - for { - fl, err := m.dmIN.getV1Filter() - if err != nil && err != utils.ErrNoMoreData { - return err - } - if err == utils.ErrNoMoreData { - break - } - if m.dryRun || fl == nil { - continue - } - if err := m.dmOut.DataManager().SetFilter(migrateFilterV3(fl), true); err != nil { - return fmt.Errorf("Error: <%s> when setting filter with tenant: <%s> and id: <%s> after migration", - err.Error(), fl.Tenant, fl.ID) - } - m.stats[utils.RQF]++ +func (m *Migrator) migrateRequestFilterV2() (fltr *engine.Filter, err error) { + var v1Fltr *v1Filter + if v1Fltr, err = m.dmIN.getV1Filter(); err != nil { + return nil, err } - vrs := engine.Versions{utils.RQF: engine.CurrentDataDBVersions()[utils.RQF]} - if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when updating Filters version into dataDB", err.Error())) + if err == utils.ErrNoMoreData { + return nil, nil } + fltr = migrateFilterV2(v1Fltr) + return +} + +func (m *Migrator) migrateRequestFilterV3() (fltr *engine.Filter, err error) { + var v1Fltr *v1Filter + if v1Fltr, err = m.dmIN.getV1Filter(); err != nil { + return nil, err + } + if v1Fltr == nil { + return + } + fltr = migrateFilterV3(v1Fltr) return } @@ -338,26 +303,97 @@ func (m *Migrator) migrateFilters() (err error) { utils.UndefinedVersion, "version number is not defined for ActionTriggers model") } - switch vrs[utils.RQF] { - case 3: - if err = m.migrateRequestFilterV3(); err != nil { - return err + migrated := true + migratedFrom := 0 + var fltr *engine.Filter + for { + fmt.Println("first_for") + version := vrs[utils.RQF] + for { + fmt.Println("second_for\n\tversion = ", version) + switch version { + case current[utils.RQF]: + migrated = false + if m.sameDataDB { + break + } + if err = m.migrateCurrentRequestFilter(); err != nil { + return err + } + case 1: + fmt.Println("case 1 :AICI") + if fltr, err = m.migrateRequestFilterV1(); err != nil && err != utils.ErrNoMoreData { + return err + } + migratedFrom = 1 + version = 4 + case 2: + fmt.Println("case 2: AICI") + if fltr, err = m.migrateRequestFilterV2(); err != nil && err != utils.ErrNoMoreData { + return err + } + migratedFrom = 2 + version = 4 + case 3: + fmt.Println("case 3: AICI") + if fltr, err = m.migrateRequestFilterV3(); err != nil && err != utils.ErrNoMoreData { + return err + } + migratedFrom = 3 + version = 4 + } + fmt.Println("end_switch_statement\n\tversion = ", version) + fmt.Println("current[utils.RQF]: ", current[utils.RQF]) + if version == current[utils.RQF] || err == utils.ErrNoMoreData { + break + } } - case 2: - if err = m.migrateRequestFilterV2(); err != nil { - return err - } - case 1: - if err = m.migrateRequestFilterV1(); err != nil { - return err - } - case current[utils.RQF]: - if m.sameDataDB { + fmt.Println("err: ", err) + fmt.Println("!migrated: ", !migrated) + if err == utils.ErrNoMoreData || !migrated { break } - if err = m.migrateCurrentRequestFilter(); err != nil { - return err + fmt.Println("before this check !m.dryRun && migrated") + if !m.dryRun && migrated { + //set filters + switch migratedFrom { + case 1: + fmt.Println("SET___case 1 :AICI") + if err := m.migrateOthersv1(); err != nil { + return err + } + if err := m.dmOut.DataManager().SetFilter(fltr, true); err != nil { + return fmt.Errorf("Error: <%s> when setting filter with tenant: <%s> and id: <%s> after migration", + err.Error(), fltr.Tenant, fltr.ID) + } + case 2: + fmt.Println("SET___case 2: AICI") + if err := m.migrateOthersV2(); err != nil { + return err + } + if err := m.dmOut.DataManager().SetFilter(fltr, true); err != nil { + return fmt.Errorf("Error: <%s> when setting filter with tenant: <%s> and id: <%s> after migration", + err.Error(), fltr.Tenant, fltr.ID) + } + case 3: + fmt.Println("SET___case 3: AICI") + if err := m.dmOut.DataManager().SetFilter(fltr, true); err != nil { + return fmt.Errorf("Error: <%s> when setting filter with tenant: <%s> and id: <%s> after migration", + err.Error(), fltr.Tenant, fltr.ID) + } + } } + m.stats[utils.RQF]++ + } + if m.dryRun || !migrated { + return nil + } + vrs = engine.Versions{utils.RQF: engine.CurrentDataDBVersions()[utils.RQF]} + if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { + return utils.NewCGRError(utils.Migrator, + utils.ServerErrorCaps, + err.Error(), + fmt.Sprintf("error: <%s> when updating Filters version into dataDB", err.Error())) } return m.ensureIndexesDataDB(engine.ColFlt) } From f1ff03d7c24e3afc095f8de8562690b235d2320c Mon Sep 17 00:00:00 2001 From: adragusin Date: Thu, 25 Jun 2020 14:44:44 +0300 Subject: [PATCH 2/3] Finished filter migration --- migrator/filters.go | 48 ++++++++++++++++++--------------------------- 1 file changed, 19 insertions(+), 29 deletions(-) diff --git a/migrator/filters.go b/migrator/filters.go index abcda5d73..25b625a5a 100644 --- a/migrator/filters.go +++ b/migrator/filters.go @@ -45,11 +45,11 @@ func (m *Migrator) migrateCurrentRequestFilter() (err error) { if m.dryRun || fl == nil { continue } - if err := m.dmIN.DataManager().RemoveFilter(tntID[0], tntID[1], - utils.NonTransactional, true); err != nil { + if err := m.dmOut.DataManager().SetFilter(fl, true); err != nil { return err } - if err := m.dmOut.DataManager().SetFilter(fl, true); err != nil { + if err := m.dmIN.DataManager().RemoveFilter(tntID[0], tntID[1], + utils.NonTransactional, true); err != nil { return err } m.stats[utils.RQF]++ @@ -217,16 +217,12 @@ func (m *Migrator) migrateOthersv1() (err error) { } func (m *Migrator) migrateRequestFilterV1() (fltr *engine.Filter, err error) { - // var v1Fltr *v1Filter - v1Fltr, err := m.dmIN.getV1Filter() - fmt.Println("inside migrateRequestFilterV1 ================ err: ", err) - fmt.Println("inside migrateRequestFilterV1 ================ fltr: ", utils.ToJSON(v1Fltr)) - if err != nil { - fmt.Println("err!=nil: ", err) + var v1Fltr *v1Filter + if v1Fltr, err = m.dmIN.getV1Filter(); err != nil { return } if v1Fltr == nil { - return nil, utils.ErrNoMoreData + return } fltr = migrateFilterV1(v1Fltr) return @@ -307,10 +303,8 @@ func (m *Migrator) migrateFilters() (err error) { migratedFrom := 0 var fltr *engine.Filter for { - fmt.Println("first_for") version := vrs[utils.RQF] for { - fmt.Println("second_for\n\tversion = ", version) switch version { case current[utils.RQF]: migrated = false @@ -320,63 +314,47 @@ func (m *Migrator) migrateFilters() (err error) { if err = m.migrateCurrentRequestFilter(); err != nil { return err } + version = 4 case 1: - fmt.Println("case 1 :AICI") if fltr, err = m.migrateRequestFilterV1(); err != nil && err != utils.ErrNoMoreData { return err } migratedFrom = 1 version = 4 case 2: - fmt.Println("case 2: AICI") if fltr, err = m.migrateRequestFilterV2(); err != nil && err != utils.ErrNoMoreData { return err } migratedFrom = 2 version = 4 case 3: - fmt.Println("case 3: AICI") if fltr, err = m.migrateRequestFilterV3(); err != nil && err != utils.ErrNoMoreData { return err } migratedFrom = 3 version = 4 } - fmt.Println("end_switch_statement\n\tversion = ", version) - fmt.Println("current[utils.RQF]: ", current[utils.RQF]) if version == current[utils.RQF] || err == utils.ErrNoMoreData { break } } - fmt.Println("err: ", err) - fmt.Println("!migrated: ", !migrated) if err == utils.ErrNoMoreData || !migrated { break } - fmt.Println("before this check !m.dryRun && migrated") if !m.dryRun && migrated { //set filters switch migratedFrom { case 1: - fmt.Println("SET___case 1 :AICI") - if err := m.migrateOthersv1(); err != nil { - return err - } if err := m.dmOut.DataManager().SetFilter(fltr, true); err != nil { return fmt.Errorf("Error: <%s> when setting filter with tenant: <%s> and id: <%s> after migration", err.Error(), fltr.Tenant, fltr.ID) } case 2: - fmt.Println("SET___case 2: AICI") - if err := m.migrateOthersV2(); err != nil { - return err - } if err := m.dmOut.DataManager().SetFilter(fltr, true); err != nil { return fmt.Errorf("Error: <%s> when setting filter with tenant: <%s> and id: <%s> after migration", err.Error(), fltr.Tenant, fltr.ID) } case 3: - fmt.Println("SET___case 3: AICI") if err := m.dmOut.DataManager().SetFilter(fltr, true); err != nil { return fmt.Errorf("Error: <%s> when setting filter with tenant: <%s> and id: <%s> after migration", err.Error(), fltr.Tenant, fltr.ID) @@ -388,6 +366,18 @@ func (m *Migrator) migrateFilters() (err error) { if m.dryRun || !migrated { return nil } + + switch migratedFrom { + case 1: + if err := m.migrateOthersv1(); err != nil { + return err + } + case 2: + if err := m.migrateOthersV2(); err != nil { + return err + } + } + vrs = engine.Versions{utils.RQF: engine.CurrentDataDBVersions()[utils.RQF]} if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { return utils.NewCGRError(utils.Migrator, From 3d5fc613e38440f9f114c478b6e4299bf025e52d Mon Sep 17 00:00:00 2001 From: adragusin Date: Fri, 26 Jun 2020 18:10:40 +0300 Subject: [PATCH 3/3] Updated thresholds migration --- migrator/thresholds.go | 168 +++++++++++++++++---------------- migrator/thresholds_it_test.go | 4 +- 2 files changed, 87 insertions(+), 85 deletions(-) diff --git a/migrator/thresholds.go b/migrator/thresholds.go index 90b546939..14417b910 100644 --- a/migrator/thresholds.go +++ b/migrator/thresholds.go @@ -101,46 +101,15 @@ func (m *Migrator) migrateCurrentThresholds() (err error) { return } -func (m *Migrator) migrateV2ActionTriggers() (err error) { +func (m *Migrator) migrateV2ActionTriggers() (thp *engine.ThresholdProfile, th *engine.Threshold, filter *engine.Filter, err error) { var v2ACT *v2ActionTrigger - for { - v2ACT, err = m.dmIN.getV2ActionTrigger() - if err != nil && err != utils.ErrNoMoreData { - return err - } - if err == utils.ErrNoMoreData { - break - } - if v2ACT.ID != "" { - thp, th, filter, err := v2ACT.AsThreshold() - if err != nil { - return err - } - if m.dryRun { - continue - } - if err := m.dmOut.DataManager().SetFilter(filter, true); err != nil { - return err - } - if err := m.dmOut.DataManager().SetThreshold(th); err != nil { - return err - } - if err := m.dmOut.DataManager().SetThresholdProfile(thp, true); err != nil { - return err - } - m.stats[utils.Thresholds] += 1 - } - } - if m.dryRun { + if v2ACT, err = m.dmIN.getV2ActionTrigger(); err != nil { return } - // All done, update version wtih current one - vrs := engine.Versions{utils.Thresholds: engine.CurrentStorDBVersions()[utils.Thresholds]} - if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when updating Thresholds version into dataDB", err.Error())) + if v2ACT.ID != "" { + if thp, th, filter, err = v2ACT.AsThreshold(); err != nil { + return + } } return } @@ -162,50 +131,22 @@ func (m *Migrator) removeV2Thresholds() (err error) { return } -func (m *Migrator) migrateV2Thresholds() (err error) { +func (m *Migrator) migrateV2Thresholds() (v3 *engine.ThresholdProfile, err error) { var v2T *v2Threshold - for { - v2T, err = m.dmIN.getV2ThresholdProfile() - if err != nil && err != utils.ErrNoMoreData { - return err - } - if err == utils.ErrNoMoreData { - break - } - if v2T == nil || m.dryRun { - continue - } - th := v2T.V2toV3Threshold() - - if err = m.dmOut.DataManager().SetThresholdProfile(th, true); err != nil { - return err - } - m.stats[utils.Thresholds] += 1 - } - if m.dryRun { + if v2T, err = m.dmIN.getV2ThresholdProfile(); err != nil { return } - - if !m.sameDataDB { - if err = m.removeV2Thresholds(); err != nil && err != utils.ErrNoMoreData { - return - } - } - // All done, update version wtih current one - vrs := engine.Versions{utils.Thresholds: engine.CurrentDataDBVersions()[utils.Thresholds]} - if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when updating Thresholds version into dataDB", err.Error())) + if v2T == nil { + return } + v3 = v2T.V2toV3Threshold() return } func (m *Migrator) migrateThresholds() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() - vrs, err = m.dmIN.DataManager().DataDB().GetVersions("") + vrs, err = m.dmIN.DataManager().DataDB().GetVersions(utils.EmptyString) if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, @@ -217,23 +158,84 @@ func (m *Migrator) migrateThresholds() (err error) { utils.UndefinedVersion, "version number is not defined for ActionTriggers model") } - switch vrs[utils.Thresholds] { - case current[utils.Thresholds]: - if m.sameDataDB { + migrated := true + migratedFrom := 0 + var thp *engine.ThresholdProfile + var th *engine.Threshold + var filter *engine.Filter + var v3 *engine.ThresholdProfile + for { + version := vrs[utils.Thresholds] + for { + switch version { + case current[utils.Thresholds]: + migrated = false + if m.sameDataDB { + break + } + if err = m.migrateCurrentThresholds(); err != nil { + return err + } + case 1: + if thp, th, filter, err = m.migrateV2ActionTriggers(); err != nil && err != utils.ErrNoMoreData { + return err + } + version = 3 + migratedFrom = 1 + case 2: + if v3, err = m.migrateV2Thresholds(); err != nil && err != utils.ErrNoMoreData { + return err + } + version = 3 + migratedFrom = 2 + } + if version == current[utils.Thresholds] || err == utils.ErrNoMoreData { + break + } + } + if err == utils.ErrNoMoreData || !migrated { break } - if err = m.migrateCurrentThresholds(); err != nil { - return err + + if !m.dryRun && migrated { + //set threshond + switch migratedFrom { + case 1: + if err := m.dmOut.DataManager().SetFilter(filter, true); err != nil { + return err + } + if err := m.dmOut.DataManager().SetThreshold(th); err != nil { + return err + } + if err := m.dmOut.DataManager().SetThresholdProfile(thp, true); err != nil { + return err + } + case 2: + if err = m.dmOut.DataManager().SetThresholdProfile(v3, true); err != nil { + return err + } + } + } - case 1: - if err = m.migrateV2ActionTriggers(); err != nil { - return err - } - case 2: - if err = m.migrateV2Thresholds(); err != nil { - return err + m.stats[utils.Thresholds]++ + } + if m.dryRun || !migrated { + return nil + } + // remove old threshonds + if !m.sameDataDB && migratedFrom == 2 { + if err = m.removeV2Thresholds(); err != nil && err != utils.ErrNoMoreData { + return } } + // All done, update version wtih current one + vrs = engine.Versions{utils.Thresholds: engine.CurrentDataDBVersions()[utils.Thresholds]} + if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { + return utils.NewCGRError(utils.Migrator, + utils.ServerErrorCaps, + err.Error(), + fmt.Sprintf("error: <%s> when updating Thresholds version into dataDB", err.Error())) + } return m.ensureIndexesDataDB(engine.ColTps) } diff --git a/migrator/thresholds_it_test.go b/migrator/thresholds_it_test.go index 8b09c552a..921b2b11b 100644 --- a/migrator/thresholds_it_test.go +++ b/migrator/thresholds_it_test.go @@ -345,7 +345,7 @@ func testTrsITMigrateAndMove(t *testing.T) { t.Error("Error when getting Thresholds ", err.Error()) } if !reflect.DeepEqual(tresProf2, result) { - t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(tresProf2), utils.ToJSON(result)) + t.Errorf("Expecting: %+v,\nReceived: %+v", utils.ToJSON(tresProf2), utils.ToJSON(result)) } result, err = trsMigrator.dmOut.DataManager().GetThresholdProfile(tresProf3.Tenant, tresProf3.ID, false, false, utils.NonTransactional) @@ -353,7 +353,7 @@ func testTrsITMigrateAndMove(t *testing.T) { t.Error("Error when getting Thresholds ", err.Error()) } if !reflect.DeepEqual(tresProf3, result) { - t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(tresProf3), utils.ToJSON(result)) + t.Errorf("Expecting: %+v,\nReceived: %+v", utils.ToJSON(tresProf3), utils.ToJSON(result)) } case utils.Move: