From 547024a7304d9d619b545ff14f4b4703490e6e42 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 8 Jul 2020 17:54:05 +0300 Subject: [PATCH] Added migration for *rsr filters --- engine/version.go | 10 +- migrator/accounts.go | 21 ++-- migrator/action.go | 19 ++-- migrator/action_plan.go | 44 ++++----- migrator/action_trigger.go | 25 ++--- migrator/alias.go | 8 +- migrator/attributes_it_test.go | 46 ++++----- migrator/cdrs.go | 18 ++-- migrator/chargers.go | 61 ++++++++++-- migrator/derived_chargers.go | 16 +-- migrator/derived_chargers_it_test.go | 6 +- migrator/derived_chargers_test.go | 6 +- migrator/destinations.go | 41 ++++++-- migrator/dispatchers.go | 69 +++++++++++-- migrator/filters.go | 84 +++++++++++----- migrator/filters_it_test.go | 14 +-- migrator/filters_test.go | 94 ++++++++++++++++++ migrator/load_ids.go | 2 +- migrator/migrator_datadb.go | 7 ++ migrator/rating_plan.go | 40 ++++++-- migrator/rating_profile.go | 40 ++++++-- migrator/resource.go | 39 ++++++-- migrator/routes.go | 67 +++++++++++-- migrator/session_costs.go | 6 +- migrator/sharedgroup.go | 9 +- migrator/stats.go | 52 +++++++--- migrator/storage_map_datadb.go | 20 ++++ migrator/storage_mongo_datadb.go | 95 ++++++++++++++++++ migrator/storage_redis.go | 140 ++++++++++++++++++++++++++- migrator/thresholds.go | 64 +++++++----- migrator/timings.go | 7 +- migrator/tp_account_actions.go | 2 +- migrator/tp_action_plans.go | 2 +- 33 files changed, 932 insertions(+), 242 deletions(-) diff --git a/engine/version.go b/engine/version.go index 54de38a26..04df8f59f 100644 --- a/engine/version.go +++ b/engine/version.go @@ -144,14 +144,14 @@ func (vers Versions) Compare(curent Versions, storType string, isDataDB bool) st // CurrentDataDBVersions returns the needed DataDB versions func CurrentDataDBVersions() Versions { return Versions{ - utils.StatS: 3, + utils.StatS: 4, utils.Accounts: 3, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 3, utils.SharedGroups: 2, - utils.Thresholds: 3, - utils.Routes: 1, + utils.Thresholds: 4, + utils.Routes: 2, utils.Attributes: 6, utils.Timing: 1, utils.RQF: 5, @@ -161,8 +161,8 @@ func CurrentDataDBVersions() Versions { utils.ReverseDestinations: 1, utils.RatingPlan: 1, utils.RatingProfile: 1, - utils.Chargers: 1, - utils.Dispatchers: 1, + utils.Chargers: 2, + utils.Dispatchers: 2, utils.LoadIDsVrs: 1, utils.RateProfiles: 1, } diff --git a/migrator/accounts.go b/migrator/accounts.go index 1bdd1fc68..2fdbe5ae7 100755 --- a/migrator/accounts.go +++ b/migrator/accounts.go @@ -20,6 +20,7 @@ package migrator import ( "errors" + "fmt" "log" "strings" "time" @@ -55,7 +56,7 @@ func (m *Migrator) migrateCurrentAccounts() (err error) { if err := m.dmIN.DataManager().RemoveAccount(idg); err != nil { return err } - m.stats[utils.Accounts] += 1 + m.stats[utils.Accounts]++ } return } @@ -129,34 +130,34 @@ func (m *Migrator) migrateAccounts() (err error) { var v3Acnt *engine.Account for { version := vrs[utils.Accounts] + migratedFrom = int(version) for { switch version { + default: + return fmt.Errorf("Unsupported version %v", version) case current[utils.Accounts]: + migrated = false if m.sameDataDB { - migrated = false break } if err = m.migrateCurrentAccounts(); err != nil { - return err + return } version = 3 - migrated = false case 1: //migrate v1 to v3 if v3Acnt, err = m.migrateV1Accounts(); err != nil && err != utils.ErrNoMoreData { return err } else if err == utils.ErrNoMoreData { break } - migratedFrom = 1 version = 3 case 2: //migrate v2 to v3 if v3Acnt, err = m.migrateV2Accounts(); err != nil && err != utils.ErrNoMoreData { - return err + return } else if err == utils.ErrNoMoreData { break } version = 3 - migratedFrom = 2 } if version == current[utils.Accounts] || err == utils.ErrNoMoreData { break @@ -166,12 +167,12 @@ func (m *Migrator) migrateAccounts() (err error) { break } - if !m.dryRun && migrated { + if !m.dryRun { if err = m.dmOut.DataManager().SetAccount(v3Acnt); err != nil { - return err + return } } - m.stats[utils.Accounts] += 1 + m.stats[utils.Accounts]++ } if m.dryRun || !migrated { diff --git a/migrator/action.go b/migrator/action.go index 259c90710..5bd4d9ddc 100644 --- a/migrator/action.go +++ b/migrator/action.go @@ -19,6 +19,7 @@ along with this program. If not, see package migrator import ( + "fmt" "strings" "github.com/cgrates/cgrates/engine" @@ -56,7 +57,7 @@ func (m *Migrator) migrateCurrentActions() (err error) { if err := m.dmOut.DataManager().SetActions(idg, acts, utils.NonTransactional); err != nil { return err } - m.stats[utils.Actions] += 1 + m.stats[utils.Actions]++ } return } @@ -106,17 +107,19 @@ func (m *Migrator) migrateActions() (err error) { version := vrs[utils.Actions] for { switch version { + default: + return fmt.Errorf("Unsupported version %v", version) case current[utils.Actions]: migrated = false if m.sameDataDB { break } if err = m.migrateCurrentActions(); err != nil { - return err + return } case 1: if acts, err = m.migrateV1Actions(); err != nil && err != utils.ErrNoMoreData { - return err + return } version = 2 } @@ -127,12 +130,12 @@ func (m *Migrator) migrateActions() (err error) { if err == utils.ErrNoMoreData || !migrated { break } - if !m.dryRun && migrated { - if err := m.dmOut.DataManager().SetActions(acts[0].Id, acts, utils.NonTransactional); err != nil { - return err + if !m.dryRun { + if err = m.dmOut.DataManager().SetActions(acts[0].Id, acts, utils.NonTransactional); err != nil { + return } } - m.stats[utils.Actions] += 1 + m.stats[utils.Actions]++ } if m.dryRun || !migrated { @@ -142,7 +145,7 @@ func (m *Migrator) migrateActions() (err error) { // All done, update version wtih current one if err = m.setVersions(utils.Actions); err != nil { - return err + return } return m.ensureIndexesDataDB(engine.ColAct) diff --git a/migrator/action_plan.go b/migrator/action_plan.go index 13339c9c7..9fb992f23 100644 --- a/migrator/action_plan.go +++ b/migrator/action_plan.go @@ -51,24 +51,24 @@ func (m *Migrator) migrateCurrentActionPlans() (err error) { var ids []string ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.ACTION_PLAN_PREFIX) if err != nil { - return err + return } for _, id := range ids { idg := strings.TrimPrefix(id, utils.ACTION_PLAN_PREFIX) - acts, err := m.dmIN.DataManager().GetActionPlan(idg, true, utils.NonTransactional) - if err != nil { - return err + var acts *engine.ActionPlan + if acts, err = m.dmIN.DataManager().GetActionPlan(idg, true, utils.NonTransactional); err != nil { + return } if acts == nil || m.dryRun { continue } - if err := m.dmOut.DataManager().SetActionPlan(idg, acts, true, utils.NonTransactional); err != nil { - return err + if err = m.dmOut.DataManager().SetActionPlan(idg, acts, true, utils.NonTransactional); err != nil { + return } - if err := m.dmIN.DataManager().RemoveActionPlan(idg, utils.NonTransactional); err != nil { - return err + if err = m.dmIN.DataManager().RemoveActionPlan(idg, utils.NonTransactional); err != nil { + return } - m.stats[utils.ActionPlans] += 1 + m.stats[utils.ActionPlans]++ } return } @@ -76,13 +76,13 @@ func (m *Migrator) removeV1ActionPlans() (err error) { var v1 *v1ActionPlans for { if v1, err = m.dmIN.getV1ActionPlans(); err != nil && err != utils.ErrNoMoreData { - return err + return } if v1 == nil { return nil } if err = m.dmIN.remV1ActionPlans(v1); err != nil { - return err + return } } } @@ -111,7 +111,7 @@ func (m *Migrator) migrateActionPlans() (err error) { return fmt.Errorf("Storage type %s could not be cated to <*engine.RedisStorage>", m.dmIN.DataManager().DataDB().GetStorageType()) } if err = redisDB.RebbuildActionPlanKeys(); err != nil { - return err + return } } migrated := true @@ -119,21 +119,23 @@ func (m *Migrator) migrateActionPlans() (err error) { var v3 []*engine.ActionPlan for { version := vrs[utils.ActionPlans] + migratedFrom = int(version) for { switch version { + default: + return fmt.Errorf("Unsupported version %v", version) case current[utils.ActionPlans]: migrated = false if m.sameDataDB { break } if err = m.migrateCurrentActionPlans(); err != nil && err != utils.ErrNoMoreData { - return err + return } case 1: if v3, err = m.migrateV1ActionPlans(); err != nil && err != utils.ErrNoMoreData { - return err + return } - migratedFrom = 1 version = 3 case 2: // neded to rebuild action plan indexes for redis // All done, update version wtih current one @@ -144,7 +146,6 @@ func (m *Migrator) migrateActionPlans() (err error) { err.Error(), fmt.Sprintf("error: <%s> when updating ActionPlans version into dataDB", err.Error())) } - migratedFrom = 2 version = 3 } if version == current[utils.ActionPlans] || err == utils.ErrNoMoreData { @@ -155,11 +156,11 @@ func (m *Migrator) migrateActionPlans() (err error) { break } - if !m.dryRun && migrated { + if !m.dryRun { //set action plan for _, ap := range v3 { if err = m.dmOut.DataManager().SetActionPlan(ap.Id, ap, true, utils.NonTransactional); err != nil { - return err + return } } } @@ -170,8 +171,7 @@ func (m *Migrator) migrateActionPlans() (err error) { } // remove old action plans if !m.sameDataDB { - switch migratedFrom { - case 1: + if migratedFrom == 1 { if err = m.removeV1ActionPlans(); err != nil { return } @@ -186,8 +186,8 @@ func (m *Migrator) migrateActionPlans() (err error) { } func (v1AP v1ActionPlan) AsActionPlan() (ap *engine.ActionPlan) { - for idx, actionId := range v1AP.AccountIds { - idElements := strings.Split(actionId, "_") + for idx, actionID := range v1AP.AccountIds { + idElements := strings.Split(actionID, "_") if len(idElements) != 2 { continue } diff --git a/migrator/action_trigger.go b/migrator/action_trigger.go index 4de981558..7287e8a1e 100644 --- a/migrator/action_trigger.go +++ b/migrator/action_trigger.go @@ -19,6 +19,7 @@ along with this program. If not, see package migrator import ( + "fmt" "strings" "time" @@ -120,22 +121,23 @@ func (m *Migrator) migrateActionTriggers() (err error) { var v2 engine.ActionTriggers for { version := vrs[utils.ActionTriggers] + migratedFrom = int(version) for { switch version { + default: + return fmt.Errorf("Unsupported version %v", version) case current[utils.ActionTriggers]: + migrated = false if m.sameDataDB { - migrated = false break } if err = m.migrateCurrentActionTrigger(); err != nil { - return err + return } - migrated = false case 1: if v2, err = m.migrateV1ActionTrigger(); err != nil && err != utils.ErrNoMoreData { - return err + return } - migratedFrom = 1 version = 2 } if version == current[utils.ActionTriggers] || err == utils.ErrNoMoreData { @@ -145,21 +147,20 @@ func (m *Migrator) migrateActionTriggers() (err error) { if err == utils.ErrNoMoreData || !migrated { break } - if !m.dryRun && migrated { + if !m.dryRun { //set action triggers - if err := m.dmOut.DataManager().SetActionTriggers(v2[0].ID, v2, utils.NonTransactional); err != nil { - return err + if err = m.dmOut.DataManager().SetActionTriggers(v2[0].ID, v2, utils.NonTransactional); err != nil { + return } } - m.stats[utils.ActionTriggers] += 1 + m.stats[utils.ActionTriggers]++ } if m.dryRun || !migrated { return nil } // remove old action triggers if !m.sameDataDB { - switch migratedFrom { - case 1: + if migratedFrom == 1 { if err = m.removeV1ActionTriggers(); err != nil { return } @@ -168,7 +169,7 @@ func (m *Migrator) migrateActionTriggers() (err error) { // All done, update version wtih current one if err = m.setVersions(utils.ActionTriggers); err != nil { - return err + return } return m.ensureIndexesDataDB(engine.ColAtr) diff --git a/migrator/alias.go b/migrator/alias.go index b47a42261..a5a11091e 100644 --- a/migrator/alias.go +++ b/migrator/alias.go @@ -40,8 +40,8 @@ type v1Alias struct { } var ( - ALIASES_PREFIX = "als_" - Alias = "Alias" + AliasesPrefix = "als_" + Alias = "Alias" ) type v1AliasValues []*v1AliasValue @@ -184,7 +184,7 @@ func (m *Migrator) migrateAlias2Attributes() (err error) { if err := m.dmOut.DataManager().SetAttributeProfile(attr, true); err != nil { return err } - m.stats[Alias] += 1 + m.stats[Alias]++ } if m.dryRun { return @@ -207,7 +207,7 @@ func (m *Migrator) migrateAlias2Attributes() (err error) { func (m *Migrator) migrateAlias() (err error) { if err = m.migrateAlias2Attributes(); err != nil { - return err + return } return m.ensureIndexesDataDB(engine.ColAttr) } diff --git a/migrator/attributes_it_test.go b/migrator/attributes_it_test.go index 088383065..eff9af41f 100755 --- a/migrator/attributes_it_test.go +++ b/migrator/attributes_it_test.go @@ -273,7 +273,7 @@ func testAttrITMigrateOnlyVersion(t *testing.T) { if vrs, err := attrMigrator.dmOut.DataManager().DataDB().GetVersions(""); err != nil { t.Error(err) - } else if vrs[utils.Attributes] != 5 { + } else if vrs[utils.Attributes] != 6 { t.Errorf("Unexpected version returned: %d", vrs[utils.Attributes]) } if attrMigrator.stats[utils.Attributes] != 0 { @@ -365,7 +365,7 @@ func testAttrITMigrateAndMove(t *testing.T) { if vrs, err := attrMigrator.dmOut.DataManager().DataDB().GetVersions(""); err != nil { t.Error(err) - } else if vrs[utils.Attributes] != 5 { + } else if vrs[utils.Attributes] != 6 { t.Errorf("Unexpected version returned: %d", vrs[utils.Attributes]) } result, err := attrMigrator.dmOut.DataManager().GetAttributeProfile("cgrates.org", @@ -457,7 +457,7 @@ func testAttrITMigrateV2(t *testing.T) { ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), }, Attributes: []*v2Attribute{ - &v2Attribute{ + { FieldName: "FL1", Initial: "In1", Substitute: config.NewRSRParsersMustCompile("Al1", utils.INFIELD_SEP), @@ -510,7 +510,7 @@ func testAttrITMigrateV2(t *testing.T) { if vrs, err := attrMigrator.dmOut.DataManager().DataDB().GetVersions(""); err != nil { t.Error(err) - } else if vrs[utils.Attributes] != 5 { + } else if vrs[utils.Attributes] != 6 { t.Errorf("Unexpected version returned: %d", vrs[utils.Attributes]) } result, err := attrMigrator.dmOut.DataManager().GetAttributeProfile("cgrates.org", @@ -540,7 +540,7 @@ func testAttrITMigrateV3(t *testing.T) { ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), }, Attributes: []*v3Attribute{ - &v3Attribute{ + { FilterIDs: []string{"*string:FL1:In1"}, FieldName: "FL1", Substitute: config.NewRSRParsersMustCompile("Al1", utils.INFIELD_SEP), @@ -592,7 +592,7 @@ func testAttrITMigrateV3(t *testing.T) { if vrs, err := attrMigrator.dmOut.DataManager().DataDB().GetVersions(""); err != nil { t.Error(err) - } else if vrs[utils.Attributes] != 5 { + } else if vrs[utils.Attributes] != 6 { t.Errorf("Unexpected version returned: %d", vrs[utils.Attributes]) } result, err := attrMigrator.dmOut.DataManager().GetAttributeProfile("cgrates.org", @@ -625,7 +625,7 @@ func testAttrITMigrateV4(t *testing.T) { ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), }, Attributes: []*v4Attribute{ - &v4Attribute{ + { FilterIDs: []string{"*string:~*req.FL1:In1"}, FieldName: "FL1", Value: config.NewRSRParsersMustCompile("~Category:s/(.*)/${1}_UK_Mobile_Vodafone_GBRVF/", utils.INFIELD_SEP), @@ -678,7 +678,7 @@ func testAttrITMigrateV4(t *testing.T) { if vrs, err := attrMigrator.dmOut.DataManager().DataDB().GetVersions(""); err != nil { t.Error(err) - } else if vrs[utils.Attributes] != 5 { + } else if vrs[utils.Attributes] != 6 { t.Errorf("Unexpected version returned: %d", vrs[utils.Attributes]) } result, err := attrMigrator.dmOut.DataManager().GetAttributeProfile("cgrates.org", @@ -754,7 +754,7 @@ func testAttrITV1ToV5(t *testing.T) { ExpiryTime: time.Date(2020, 4, 18, 14, 25, 0, 0, time.UTC), }, Attributes: []*engine.Attribute{ - &engine.Attribute{ + { FilterIDs: []string{"*string:FL1:In1"}, Path: utils.MetaReq + utils.NestingSep + "FL1", Type: utils.MetaVariable, @@ -772,7 +772,7 @@ func testAttrITV1ToV5(t *testing.T) { ExpiryTime: time.Date(2020, 4, 18, 14, 25, 0, 0, time.UTC), }, Attributes: []*engine.Attribute{ - &engine.Attribute{ + { FilterIDs: []string{"*string:FL1:In1"}, Path: utils.MetaReq + utils.NestingSep + "FL1", Type: utils.MetaVariable, @@ -788,7 +788,7 @@ func testAttrITV1ToV5(t *testing.T) { // check the version if vrs, err := attrMigrator.dmOut.DataManager().DataDB().GetVersions(""); err != nil { t.Error(err) - } else if vrs[utils.Attributes] != 5 { + } else if vrs[utils.Attributes] != 6 { t.Errorf("Unexpected version returned: %d", vrs[utils.Attributes]) } @@ -844,7 +844,7 @@ func testAttrITV2ToV5(t *testing.T) { ExpiryTime: time.Date(2020, 4, 18, 14, 25, 0, 0, time.UTC), }, Attributes: []*v2Attribute{ - &v2Attribute{ + { FieldName: "FL1", Initial: "In1", Substitute: sbstPrsr, @@ -863,7 +863,7 @@ func testAttrITV2ToV5(t *testing.T) { ExpiryTime: time.Date(2020, 4, 18, 14, 25, 0, 0, time.UTC), }, Attributes: []*v2Attribute{ - &v2Attribute{ + { FieldName: "FL1", Initial: "In1", Substitute: sbstPrsr, @@ -893,7 +893,7 @@ func testAttrITV2ToV5(t *testing.T) { ExpiryTime: time.Date(2020, 4, 18, 14, 25, 0, 0, time.UTC), }, Attributes: []*engine.Attribute{ - &engine.Attribute{ + { FilterIDs: []string{"*string:FL1:In1"}, Path: utils.MetaReq + utils.NestingSep + "FL1", Type: utils.MetaVariable, @@ -911,7 +911,7 @@ func testAttrITV2ToV5(t *testing.T) { ExpiryTime: time.Date(2020, 4, 18, 14, 25, 0, 0, time.UTC), }, Attributes: []*engine.Attribute{ - &engine.Attribute{ + { FilterIDs: []string{"*string:FL1:In1"}, Path: utils.MetaReq + utils.NestingSep + "FL1", Type: utils.MetaVariable, @@ -927,7 +927,7 @@ func testAttrITV2ToV5(t *testing.T) { //check the version if vrs, err := attrMigrator.dmOut.DataManager().DataDB().GetVersions(""); err != nil { t.Error(err) - } else if vrs[utils.Attributes] != 5 { + } else if vrs[utils.Attributes] != 6 { t.Errorf("Unexpected version returned: %d", vrs[utils.Attributes]) } @@ -974,7 +974,7 @@ func testAttrITV3ToV5(t *testing.T) { ExpiryTime: time.Date(2020, 4, 18, 14, 25, 0, 0, time.UTC), }, Attributes: []*v3Attribute{ - &v3Attribute{ + { FieldName: "FL1", Substitute: sbstPrsr, FilterIDs: []string{"*string:FL1:In1"}, @@ -992,7 +992,7 @@ func testAttrITV3ToV5(t *testing.T) { ExpiryTime: time.Date(2020, 4, 18, 14, 25, 0, 0, time.UTC), }, Attributes: []*v3Attribute{ - &v3Attribute{ + { FieldName: "FL1", Substitute: sbstPrsr, FilterIDs: []string{"*string:FL1:In1"}, @@ -1021,7 +1021,7 @@ func testAttrITV3ToV5(t *testing.T) { ExpiryTime: time.Date(2020, 4, 18, 14, 25, 0, 0, time.UTC), }, Attributes: []*engine.Attribute{ - &engine.Attribute{ + { FilterIDs: []string{"*string:FL1:In1"}, Path: utils.MetaReq + utils.NestingSep + "FL1", Type: utils.MetaVariable, @@ -1039,7 +1039,7 @@ func testAttrITV3ToV5(t *testing.T) { ExpiryTime: time.Date(2020, 4, 18, 14, 25, 0, 0, time.UTC), }, Attributes: []*engine.Attribute{ - &engine.Attribute{ + { FilterIDs: []string{"*string:FL1:In1"}, Path: utils.MetaReq + utils.NestingSep + "FL1", Type: utils.MetaVariable, @@ -1055,7 +1055,7 @@ func testAttrITV3ToV5(t *testing.T) { //check the version if vrs, err := attrMigrator.dmOut.DataManager().DataDB().GetVersions(""); err != nil { t.Error(err) - } else if vrs[utils.Attributes] != 5 { + } else if vrs[utils.Attributes] != 6 { t.Errorf("Unexpected version returned: %d", vrs[utils.Attributes]) } @@ -1112,7 +1112,7 @@ func testAttrITdryRunV2ToV5(t *testing.T) { ExpiryTime: time.Date(2020, 4, 18, 14, 25, 0, 0, time.UTC), }, Attributes: []*v2Attribute{ - &v2Attribute{ + { FieldName: "FL1", Initial: "In1", Substitute: sbstPrsr, @@ -1171,7 +1171,7 @@ func testAttrITdryRunV3ToV5(t *testing.T) { ExpiryTime: time.Date(2020, 4, 18, 14, 25, 0, 0, time.UTC), }, Attributes: []*v3Attribute{ - &v3Attribute{ + { FieldName: "FL1", Substitute: sbstPrsr, FilterIDs: []string{"*string:FL1:In1"}, diff --git a/migrator/cdrs.go b/migrator/cdrs.go index 7f11366c1..9fe05fdc4 100755 --- a/migrator/cdrs.go +++ b/migrator/cdrs.go @@ -20,6 +20,7 @@ package migrator import ( "encoding/json" + "fmt" "time" "github.com/cgrates/cgrates/engine" @@ -54,15 +55,20 @@ func (m *Migrator) migrateCDRs() (err error) { for { version := vrs[utils.CDRs] for { - switch vrs[utils.CDRs] { + switch version { + default: + return fmt.Errorf("Unsupported version %v", version) case current[utils.CDRs]: migrated = false + if m.sameStorDB { + break + } if err = m.migrateCurrentCDRs(); err != nil { - return err + return } case 1: if v2, err = m.migrateV1CDRs(); err != nil && err != utils.ErrNoMoreData { - return err + return } version = 2 } @@ -74,17 +80,17 @@ func (m *Migrator) migrateCDRs() (err error) { break } - if !m.dryRun && migrated { + if !m.dryRun { //set action plan if err = m.storDBOut.StorDB().SetCDR(v2, true); err != nil { - return err + return } } m.stats[utils.CDRs]++ } // All done, update version wtih current one if err = m.setVersions(utils.CDRs); err != nil { - return err + return } return m.ensureIndexesStorDB(engine.ColCDRs) } diff --git a/migrator/chargers.go b/migrator/chargers.go index 9b2be9cd5..5b83ed1b7 100755 --- a/migrator/chargers.go +++ b/migrator/chargers.go @@ -19,6 +19,7 @@ along with this program. If not, see package migrator import ( + "errors" "fmt" "strings" @@ -51,7 +52,7 @@ func (m *Migrator) migrateCurrentCharger() (err error) { tntID[1], utils.NonTransactional, false); err != nil { return err } - m.stats[utils.Chargers] += 1 + m.stats[utils.Chargers]++ } return } @@ -62,14 +63,62 @@ func (m *Migrator) migrateChargers() (err error) { if vrs, err = m.getVersions(utils.Chargers); err != nil { return } - switch vrs[utils.Chargers] { - case current[utils.Chargers]: - if m.sameDataDB { + migrated := true + var v2 *engine.ChargerProfile + for { + version := vrs[utils.Chargers] + for { + switch version { + default: + return fmt.Errorf("Unsupported version %v", version) + case current[utils.Chargers]: + migrated = false + if m.sameDataDB { + break + } + if err = m.migrateCurrentCharger(); err != nil { + return + } + case 1: + if v2, err = m.migrateV1ToV2Chargers(); err != nil && err != utils.ErrNoMoreData { + return + } else if err == utils.ErrNoMoreData { + break + } + version = 2 + } + if version == current[utils.Chargers] || err == utils.ErrNoMoreData { + break + } + } + if err == utils.ErrNoMoreData || !migrated { break } - if err = m.migrateCurrentCharger(); err != nil { - return err + + if !m.dryRun { + //set action plan + if err = m.dmOut.DataManager().SetChargerProfile(v2, true); err != nil { + return + } } + m.stats[utils.Chargers]++ + } + // All done, update version wtih current one + if err = m.setVersions(utils.Chargers); err != nil { + return } return m.ensureIndexesDataDB(engine.ColCpp) } + +func (m *Migrator) migrateV1ToV2Chargers() (v4Cpp *engine.ChargerProfile, err error) { + v4Cpp, err = m.dmIN.getV1ChargerProfile() + if err != nil { + return nil, err + } else if v4Cpp == nil { + return nil, errors.New("Charger NIL") + } + if v4Cpp.FilterIDs, err = migrateInlineFilterV4(v4Cpp.FilterIDs); err != nil { + return nil, err + } + return +} diff --git a/migrator/derived_chargers.go b/migrator/derived_chargers.go index 3c8313345..d28d7886f 100644 --- a/migrator/derived_chargers.go +++ b/migrator/derived_chargers.go @@ -141,7 +141,11 @@ func derivedChargers2Charger(dc *v1DerivedCharger, tenant string, key string, fi if strings.HasPrefix(filter, utils.DynamicDataPrefix) { filter = filter[1:] } - ch.FilterIDs = append(ch.FilterIDs, "*rsr::"+utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep+filter) + flt, err := migrateInlineFilterV4([]string{"*rsr::" + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + filter}) + if err != nil { + return + } + ch.FilterIDs = append(ch.FilterIDs, flt...) } return } @@ -193,10 +197,10 @@ func (m *Migrator) removeV1DerivedChargers() (err error) { break } if err != nil { - return err + return } if err = m.dmIN.remV1DerivedChargers(dck.Key); err != nil { - return err + return } } return @@ -210,13 +214,13 @@ func (m *Migrator) migrateV1DerivedChargers() (err error) { break } if err != nil { - return err + return } if dck == nil || m.dryRun { continue } if err = m.derivedChargers2Chargers(dck); err != nil { - return err + return } m.stats[utils.DerivedChargersV]++ @@ -240,7 +244,7 @@ func (m *Migrator) migrateV1DerivedChargers() (err error) { func (m *Migrator) migrateDerivedChargers() (err error) { if err = m.migrateV1DerivedChargers(); err != nil { - return err + return } return m.ensureIndexesDataDB(engine.ColCpp, engine.ColAttr) } diff --git a/migrator/derived_chargers_it_test.go b/migrator/derived_chargers_it_test.go index 58e27612f..f53774909 100644 --- a/migrator/derived_chargers_it_test.go +++ b/migrator/derived_chargers_it_test.go @@ -121,7 +121,7 @@ func testDCITMigrateAndMove(t *testing.T) { dcGetMapKeys = func(m utils.StringMap) (keys []string) { //make sure destination are in order keys = make([]string, len(m)) i := 0 - for k, _ := range m { + for k := range m { keys[i] = k i += 1 } @@ -135,7 +135,7 @@ func testDCITMigrateAndMove(t *testing.T) { Chargers: []*v1DerivedCharger{ { RunID: "RunID", - RunFilters: "~filterhdr1:s/(.+)/special_run3/", + RunFilters: "~filterhdr1(.+)", RequestTypeField: utils.MetaDefault, CategoryField: utils.MetaDefault, @@ -176,7 +176,7 @@ func testDCITMigrateAndMove(t *testing.T) { FilterIDs: []string{ "*destinations:~*req.Destination:1001;1002;1003", "*string:~*req.Account:1003", - "*rsr::~*req.filterhdr1:s/(.+)/special_run3/", + "*rsr:~*req.filterhdr1:.+", }, ActivationInterval: nil, RunID: "RunID", diff --git a/migrator/derived_chargers_test.go b/migrator/derived_chargers_test.go index 64a53e154..77ae648d4 100644 --- a/migrator/derived_chargers_test.go +++ b/migrator/derived_chargers_test.go @@ -222,7 +222,7 @@ func TestDerivedChargers2Charger(t *testing.T) { FilterIDs: []string{ "*string:~*req.Category:*voice1", "*string:~*req.Account:1001", - "*rsr::~*req.Header4:s/a/${1}b/{*duration_seconds&*round:2}(b&c)", + "*rsr:~*req.Header4:b;c", }, ActivationInterval: nil, RunID: "runID", @@ -233,7 +233,7 @@ func TestDerivedChargers2Charger(t *testing.T) { { DC: &v1DerivedCharger{ RunID: "runID2", - RunFilters: "^1003", + RunFilters: "~Account(^1003)", AccountField: "^1003", }, Tenant: defaultTenant, @@ -242,7 +242,7 @@ func TestDerivedChargers2Charger(t *testing.T) { Expected: &engine.ChargerProfile{ Tenant: defaultTenant, ID: "key2", - FilterIDs: []string{"*rsr::~*req.1003"}, + FilterIDs: []string{"*rsr:~*req.Account:^1003"}, ActivationInterval: nil, RunID: "runID2", AttributeIDs: make([]string, 0), diff --git a/migrator/destinations.go b/migrator/destinations.go index 2c1d1c60d..45dc82dff 100644 --- a/migrator/destinations.go +++ b/migrator/destinations.go @@ -44,7 +44,7 @@ func (m *Migrator) migrateCurrentDestinations() (err error) { if err := m.dmOut.DataManager().SetDestination(dst, utils.NonTransactional); err != nil { return err } - m.stats[utils.Destinations] += 1 + m.stats[utils.Destinations]++ } return } @@ -55,13 +55,40 @@ func (m *Migrator) migrateDestinations() (err error) { if vrs, err = m.getVersions(utils.Destinations); err != nil { return } - - switch vrs[utils.Destinations] { - case current[utils.Destinations]: - if m.sameDataDB { - return + migrated := true + for { + version := vrs[utils.Destinations] + for { + switch version { + default: + return fmt.Errorf("Unsupported version %v", version) + case current[utils.Destinations]: + migrated = false + if m.sameDataDB { + break + } + if err = m.migrateCurrentDestinations(); err != nil { + return + } + } + if version == current[utils.Destinations] || err == utils.ErrNoMoreData { + break + } } - return m.migrateCurrentDestinations() + if err == utils.ErrNoMoreData || !migrated { + break + } + + // if !m.dryRun { + // if err = m.dmIN.DataManager().SetDestination(v2, true); err != nil { + // return + // } + // } + m.stats[utils.Destinations]++ + } + // All done, update version wtih current one + if err = m.setVersions(utils.Destinations); err != nil { + return } return } diff --git a/migrator/dispatchers.go b/migrator/dispatchers.go index 7a079d158..4c63c60bf 100644 --- a/migrator/dispatchers.go +++ b/migrator/dispatchers.go @@ -19,6 +19,7 @@ along with this program. If not, see package migrator import ( + "errors" "fmt" "strings" @@ -30,7 +31,7 @@ func (m *Migrator) migrateCurrentDispatcher() (err error) { var ids []string ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.DispatcherProfilePrefix) if err != nil { - return err + return } for _, id := range ids { tntID := strings.SplitN(strings.TrimPrefix(id, utils.DispatcherProfilePrefix), utils.InInFieldSep, 2) @@ -51,7 +52,7 @@ func (m *Migrator) migrateCurrentDispatcher() (err error) { tntID[1], utils.NonTransactional, false); err != nil { return err } - m.stats[utils.Dispatchers] += 1 + m.stats[utils.Dispatchers]++ } return } @@ -91,17 +92,65 @@ func (m *Migrator) migrateDispatchers() (err error) { if vrs, err = m.getVersions(utils.Dispatchers); err != nil { return } - switch vrs[utils.Dispatchers] { - case current[utils.Dispatchers]: - if m.sameDataDB { + migrated := true + var v2 *engine.DispatcherProfile + for { + version := vrs[utils.Dispatchers] + for { + switch version { + default: + return fmt.Errorf("Unsupported version %v", version) + case current[utils.Dispatchers]: + migrated = false + if m.sameDataDB { + break + } + if err = m.migrateCurrentDispatcher(); err != nil { + return + } + if err = m.migrateCurrentDispatcherHost(); err != nil { + return + } + case 1: + if v2, err = m.migrateV1ToV2Dispatchers(); err != nil && err != utils.ErrNoMoreData { + return + } else if err == utils.ErrNoMoreData { + break + } + version = 2 + } + if version == current[utils.Dispatchers] || err == utils.ErrNoMoreData { + break + } + } + if err == utils.ErrNoMoreData || !migrated { break } - if err = m.migrateCurrentDispatcher(); err != nil { - return err - } - if err = m.migrateCurrentDispatcherHost(); err != nil { - return err + + if !m.dryRun { + //set action plan + if err = m.dmOut.DataManager().SetDispatcherProfile(v2, true); err != nil { + return + } } + m.stats[utils.Dispatchers]++ + } + // All done, update version wtih current one + if err = m.setVersions(utils.Dispatchers); err != nil { + return } return m.ensureIndexesDataDB(engine.ColDpp, engine.ColDph) } + +func (m *Migrator) migrateV1ToV2Dispatchers() (v4Cpp *engine.DispatcherProfile, err error) { + v4Cpp, err = m.dmIN.getV1DispatcherProfile() + if err != nil { + return nil, err + } else if v4Cpp == nil { + return nil, errors.New("Dispatcher NIL") + } + if v4Cpp.FilterIDs, err = migrateInlineFilterV4(v4Cpp.FilterIDs); err != nil { + return nil, err + } + return +} diff --git a/migrator/filters.go b/migrator/filters.go index 486d041e2..19d134989 100644 --- a/migrator/filters.go +++ b/migrator/filters.go @@ -20,6 +20,7 @@ package migrator import ( "fmt" + "regexp" "strings" "github.com/cgrates/cgrates/config" @@ -179,16 +180,12 @@ func migrateInlineFilterV2(fl string) string { } if ruleSplt[0] != utils.MetaRSR { - if strings.HasPrefix(ruleSplt[1], utils.DynamicDataPrefix) { - // remove dynamic data prefix from fieldName - ruleSplt[1] = ruleSplt[1][1:] - } + // remove dynamic data prefix from fieldName + ruleSplt[1] = strings.TrimPrefix(ruleSplt[1], utils.DynamicDataPrefix) return fmt.Sprintf("%s:~%s:%s", ruleSplt[0], utils.MetaReq+utils.NestingSep+ruleSplt[1], strings.Join(ruleSplt[2:], utils.InInFieldSep)) } // in case of *rsr filter we need to add the prefix at fieldValue - if strings.HasPrefix(ruleSplt[2], utils.DynamicDataPrefix) { - // remove dynamic data prefix from fieldName - ruleSplt[2] = ruleSplt[2][1:] - } + // remove dynamic data prefix from fieldName + ruleSplt[2] = strings.TrimPrefix(ruleSplt[2], utils.DynamicDataPrefix) return fmt.Sprintf("%s::~%s", ruleSplt[0], utils.MetaReq+utils.NestingSep+strings.Join(ruleSplt[2:], utils.InInFieldSep)) } @@ -329,6 +326,11 @@ func (m *Migrator) migrateFilters() (err error) { if fltr, err = m.migrateRequestFilterV4(v4Fltr); err != nil && err != utils.ErrNoMoreData { return } + + // remove the filter to not compile the old rule on set + // if err = m.dmOut.DataManager().DataDB().SetFilterDrv(fltr); err != nil { + // return + // } version = 5 } if version == current[utils.RQF] || err == utils.ErrNoMoreData { @@ -822,28 +824,60 @@ func (m *Migrator) migrateRequestFilterV4(v4Fltr *engine.Filter) (fltr *engine.F } for _, rule := range v4Fltr.Rules { if rule.Type != utils.MetaRSR && - rule.Type == utils.MetaNotRSR { + rule.Type != utils.MetaNotRSR { fltr.Rules = append(fltr.Rules, rule) continue } for _, val := range rule.Values { - if !strings.HasSuffix(val, utils.FilterValEnd) { // is not a filter so we ignore this value - continue + el, vals, err := migrateRSRFilterV4(val) + if err != nil { + return nil, fmt.Errorf("%s for filter<%s>", err.Error(), fltr.TenantID()) } - fltrStart := strings.Index(val, utils.FilterValStart) - if fltrStart < 1 { - return nil, fmt.Errorf("invalid RSRFilter start rule in string: <%s> for filter<%s>", val, fltr.TenantID()) + if len(vals) == 0 { // is not a filter so we ignore this value + continue } fltr.Rules = append(fltr.Rules, &engine.FilterRule{ Type: rule.Type, - Element: val[:fltrStart], - Values: strings.Split(val[fltrStart+1:len(val)-1], utils.ANDSep), + Element: el, + Values: vals, }) } } return } +var ( + spltRgxp = regexp.MustCompile(`:s\/`) +) + +func migrateRSRFilterV4(rsr string) (el string, vals []string, err error) { + if !strings.HasSuffix(rsr, utils.FilterValEnd) { // is not a filter so we ignore this value + return + } + fltrStart := strings.Index(rsr, utils.FilterValStart) + if fltrStart < 1 { + err = fmt.Errorf("invalid RSRFilter start rule in string: <%s> ", rsr) + return + } + vals = strings.Split(rsr[fltrStart+1:len(rsr)-1], utils.ANDSep) + el = rsr[:fltrStart] + + if idxConverters := strings.Index(el, "{*"); idxConverters != -1 { // converters in the string + if !strings.HasSuffix(el, "}") { + err = fmt.Errorf("invalid converter terminator in rule: <%s>", el) + return + } + el = el[:idxConverters] + } + if !strings.HasPrefix(el, utils.DynamicDataPrefix) || + len(el) == 1 { // special case when RSR is defined as static attribute + return + } + // dynamic content via attributeNames + el = spltRgxp.Split(el, -1)[0] + return +} + func migrateInlineFilterV4(v4fltIDs []string) (fltrIDs []string, err error) { fltrIDs = make([]string, 0, len(v4fltIDs)) for _, v4flt := range v4fltIDs { @@ -857,17 +891,17 @@ func migrateInlineFilterV4(v4fltIDs []string) (fltrIDs []string, err error) { } else { fltrIDs = append(fltrIDs, v4flt) } - for _, val := range strings.Split(v4flt, utils.InInFieldSep) { - if !strings.HasSuffix(val, utils.FilterValEnd) { // is not a filter so we ignore this value + for _, val := range strings.Split(v4flt, utils.INFIELD_SEP) { + el, vals, err := migrateRSRFilterV4(val) + if err != nil { + return nil, err + } + if len(vals) == 0 { // is not a filter so we ignore this value continue } - fltrStart := strings.Index(val, utils.FilterValStart) - if fltrStart < 1 { - return nil, fmt.Errorf("invalid RSRFilter start rule in string: <%s> ", val) - } - fltrVal := val[fltrStart+1 : len(val)-1] - fltrIDs = append(fltrIDs, fltr+val[:fltrStart]+utils.InInFieldSep+ - strings.Replace(fltrVal, utils.ANDSep, utils.INFIELD_SEP, strings.Count(fltrVal, utils.ANDSep))) + + fltrIDs = append(fltrIDs, fltr+el+utils.InInFieldSep+ + strings.Join(vals, utils.INFIELD_SEP)) } } return diff --git a/migrator/filters_it_test.go b/migrator/filters_it_test.go index 7e4a1c60e..ecebe050e 100644 --- a/migrator/filters_it_test.go +++ b/migrator/filters_it_test.go @@ -218,7 +218,7 @@ func testFltrITMigrateAndMove(t *testing.T) { //check if version was updated if vrs, err := fltrMigrator.dmOut.DataManager().DataDB().GetVersions(""); err != nil { t.Error(err) - } else if vrs[utils.RQF] != 4 { + } else if vrs[utils.RQF] != 5 { t.Errorf("Unexpected version returned: %d", vrs[utils.RQF]) } //check if Filters was migrate correctly @@ -302,8 +302,8 @@ func testFltrITMigratev2(t *testing.T) { }, { Type: utils.MetaRSR, - FieldName: "~Tenant", - Values: []string{`~^cgr.*\.org$`}, + FieldName: utils.EmptyString, + Values: []string{`~Tenant(~^cgr.*\.org$)`}, }, }, } @@ -387,7 +387,7 @@ func testFltrITMigratev2(t *testing.T) { //check if version was updated if vrs, err := fltrMigrator.dmOut.DataManager().DataDB().GetVersions(""); err != nil { t.Error(err) - } else if vrs[utils.RQF] != 4 { + } else if vrs[utils.RQF] != 5 { t.Errorf("Unexpected version returned: %d", vrs[utils.RQF]) } //check if Filters was migrate correctly @@ -443,8 +443,8 @@ func testFltrITMigratev3(t *testing.T) { }, { Type: utils.MetaRSR, - FieldName: "~*req.Tenant", - Values: []string{"~^cgr.*\\.org$"}, + FieldName: utils.EmptyString, + Values: []string{"~*req.Tenant(~^cgr.*\\.org$)"}, }, }, } @@ -493,7 +493,7 @@ func testFltrITMigratev3(t *testing.T) { //check if version was updated if vrs, err := fltrMigrator.dmOut.DataManager().DataDB().GetVersions(""); err != nil { t.Error(err) - } else if vrs[utils.RQF] != 4 { + } else if vrs[utils.RQF] != 5 { t.Errorf("Unexpected version returned: %d", vrs[utils.RQF]) } //check if Filters was migrate correctly diff --git a/migrator/filters_test.go b/migrator/filters_test.go index b6d370ab0..632c9e956 100644 --- a/migrator/filters_test.go +++ b/migrator/filters_test.go @@ -281,3 +281,97 @@ func TestFiltersInlineV2Migrate(t *testing.T) { } } + +func TestMigrateInlineFilterV4(t *testing.T) { + flts := []string{ + "*string:*~req.Account:1001", + "*rsr::*~req.Destination", + "*notrsr::*~req.MaxUsage(<0);*req.Account(^10)", + } + exp := []string{ + "*string:*~req.Account:1001", + "*notrsr:*~req.MaxUsage:<0", + "*notrsr:*req.Account:^10", + } + if rply, err := migrateInlineFilterV4(flts); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(exp, rply) { + t.Errorf("Expected: %s,received: %s", exp, rply) + } + + flts = []string{ + "*string:*~req.Account:1001", + "*rsr::*~req.Destination)", + } + if _, err := migrateInlineFilterV4(flts); err == nil { + t.Error("Expected error received none") + } + flts = []string{ + "*rsr::*~req.Destination{*(1001)", + } + if _, err := migrateInlineFilterV4(flts); err == nil { + t.Error("Expected error received none") + } +} + +func TestMigrateRequestFilterV4(t *testing.T) { + flt := &engine.Filter{ + Tenant: "cgrates.org", + ID: "FLT_1", + Rules: []*engine.FilterRule{ + { + Type: utils.MetaString, + Element: "~*req.Account", + Values: []string{"1001"}, + }, + { + Type: utils.MetaRSR, + Element: utils.EmptyString, + Values: []string{"~*req.Account"}, + }, + { + Type: utils.MetaRSR, + Element: utils.EmptyString, + Values: []string{"~*req.Destination(^1001&1$)"}, + }, + }, + } + exp := &engine.Filter{ + Tenant: "cgrates.org", + ID: "FLT_1", + Rules: []*engine.FilterRule{ + { + Type: utils.MetaString, + Element: "~*req.Account", + Values: []string{"1001"}, + }, + { + Type: utils.MetaRSR, + Element: "~*req.Destination", + Values: []string{"^1001", "1$"}, + }, + }, + } + m := new(Migrator) + if rply, err := m.migrateRequestFilterV4(flt); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(exp, rply) { + t.Errorf("Expected: %s,received: %s", utils.ToJSON(exp), utils.ToJSON(rply)) + } + + flt = &engine.Filter{ + Tenant: "cgrates.org", + ID: "FLT_1", + Rules: []*engine.FilterRule{ + { + Type: utils.MetaRSR, + Element: utils.EmptyString, + Values: []string{"~*req.Destination^1001&1$)"}, + }, + }, + } + + if _, err := m.migrateRequestFilterV4(flt); err == nil { + t.Error("Expected error received none") + } +} diff --git a/migrator/load_ids.go b/migrator/load_ids.go index 35046c985..28c29656b 100644 --- a/migrator/load_ids.go +++ b/migrator/load_ids.go @@ -33,7 +33,7 @@ func (m *Migrator) migrateLoadIDs() (err error) { if err = m.dmOut.DataManager().DataDB().RemoveLoadIDsDrv(); err != nil { return } - if err = m.setVersions(utils.ActionTriggers); err != nil { + if err = m.setVersions(utils.LoadIDsVrs); err != nil { return err } } diff --git a/migrator/migrator_datadb.go b/migrator/migrator_datadb.go index bf5482b4a..a3ea977fc 100644 --- a/migrator/migrator_datadb.go +++ b/migrator/migrator_datadb.go @@ -81,6 +81,13 @@ type MigratorDataDB interface { setSupplier(spl *SupplierProfile) (err error) remSupplier(tenant, id string) (err error) + getV1ChargerProfile() (v1chrPrf *engine.ChargerProfile, err error) + getV1DispatcherProfile() (v1chrPrf *engine.DispatcherProfile, err error) + getV1RouteProfile() (v1chrPrf *engine.RouteProfile, err error) + + getV3Stats() (v1st *engine.StatQueueProfile, err error) + getV3ThresholdProfile() (v2T *engine.ThresholdProfile, err error) + DataManager() *engine.DataManager close() } diff --git a/migrator/rating_plan.go b/migrator/rating_plan.go index c867aa75d..0575c9574 100644 --- a/migrator/rating_plan.go +++ b/migrator/rating_plan.go @@ -19,6 +19,7 @@ along with this program. If not, see package migrator import ( + "fmt" "strings" "github.com/cgrates/cgrates/engine" @@ -46,7 +47,7 @@ func (m *Migrator) migrateCurrentRatingPlans() (err error) { if err := m.dmIN.DataManager().RemoveRatingPlan(idg, utils.NonTransactional); err != nil { return err } - m.stats[utils.RatingPlan] += 1 + m.stats[utils.RatingPlan]++ } return } @@ -58,14 +59,39 @@ func (m *Migrator) migrateRatingPlans() (err error) { return } - switch vrs[utils.RatingPlan] { - case current[utils.RatingPlan]: - if m.sameDataDB { + migrated := true + for { + version := vrs[utils.RatingPlan] + for { + switch version { + default: + return fmt.Errorf("Unsupported version %v", version) + case current[utils.RatingPlan]: + migrated = false + if m.sameDataDB { + break + } + if err = m.migrateCurrentRatingPlans(); err != nil { + return + } + } + if version == current[utils.RatingPlan] || err == utils.ErrNoMoreData { + break + } + } + if err == utils.ErrNoMoreData || !migrated { break } - if err = m.migrateCurrentRatingPlans(); err != nil { - return err - } + // if !m.dryRun { + // if err = m.dmIN.DataManager().SetRatingPlan(v2, true); err != nil { + // return + // } + // } + m.stats[utils.RatingPlan]++ + } + // All done, update version wtih current one + if err = m.setVersions(utils.RatingPlan); err != nil { + return } return m.ensureIndexesDataDB(engine.ColRpl) } diff --git a/migrator/rating_profile.go b/migrator/rating_profile.go index f64a240db..48a9d877f 100644 --- a/migrator/rating_profile.go +++ b/migrator/rating_profile.go @@ -19,6 +19,7 @@ along with this program. If not, see package migrator import ( + "fmt" "strings" "github.com/cgrates/cgrates/engine" @@ -46,7 +47,7 @@ func (m *Migrator) migrateCurrentRatingProfiles() (err error) { if err := m.dmIN.DataManager().RemoveRatingProfile(idg, utils.NonTransactional); err != nil { return err } - m.stats[utils.RatingProfile] += 1 + m.stats[utils.RatingProfile]++ } return } @@ -58,14 +59,39 @@ func (m *Migrator) migrateRatingProfiles() (err error) { return } - switch vrs[utils.RatingProfile] { - case current[utils.RatingProfile]: - if m.sameDataDB { + migrated := true + for { + version := vrs[utils.RatingProfile] + for { + switch version { + default: + return fmt.Errorf("Unsupported version %v", version) + case current[utils.RatingProfile]: + migrated = false + if m.sameDataDB { + break + } + if err = m.migrateCurrentRatingProfiles(); err != nil { + return err + } + } + if version == current[utils.RatingProfile] || err == utils.ErrNoMoreData { + break + } + } + if err == utils.ErrNoMoreData || !migrated { break } - if err = m.migrateCurrentRatingProfiles(); err != nil { - return err - } + // if !m.dryRun { + // if err = m.dmIN.DataManager().SetRatingProfile(v2, true); err != nil { + // return + // } + // } + m.stats[utils.RatingProfile]++ + } + // All done, update version wtih current one + if err = m.setVersions(utils.RatingProfile); err != nil { + return } return m.ensureIndexesDataDB(engine.ColRpf) } diff --git a/migrator/resource.go b/migrator/resource.go index c96da28fe..10ad9721f 100644 --- a/migrator/resource.go +++ b/migrator/resource.go @@ -50,7 +50,7 @@ func (m *Migrator) migrateCurrentResource() (err error) { if err := m.dmIN.DataManager().RemoveResourceProfile(tntID[0], tntID[1], utils.NonTransactional, false); err != nil { return err } - m.stats[utils.Resource] += 1 + m.stats[utils.Resource]++ } return } @@ -62,14 +62,39 @@ func (m *Migrator) migrateResources() (err error) { return } - switch vrs[utils.Resource] { - case current[utils.Resource]: - if m.sameDataDB { + migrated := true + for { + version := vrs[utils.Resource] + for { + switch version { + default: + return fmt.Errorf("Unsupported version %v", version) + case current[utils.Resource]: + migrated = false + if m.sameDataDB { + break + } + if err = m.migrateCurrentResource(); err != nil { + return + } + } + if version == current[utils.Resource] || err == utils.ErrNoMoreData { + break + } + } + if err == utils.ErrNoMoreData || !migrated { break } - if err = m.migrateCurrentResource(); err != nil { - return err - } + // if !m.dryRun { + // if err = m.dmIN.DataManager().SetResourceProfile(v2, true); err != nil { + // return + // } + // } + m.stats[utils.Resource]++ + } + // All done, update version wtih current one + if err = m.setVersions(utils.Resource); err != nil { + return } return m.ensureIndexesDataDB(engine.ColRsP) } diff --git a/migrator/routes.go b/migrator/routes.go index 52d225e3c..de5296ee9 100644 --- a/migrator/routes.go +++ b/migrator/routes.go @@ -19,6 +19,7 @@ along with this program. If not, see package migrator import ( + "errors" "fmt" "strings" @@ -62,10 +63,10 @@ func (m *Migrator) removeSupplier() (err error) { break } if err != nil { - return err + return } if err = m.dmIN.remSupplier(spp.Tenant, spp.ID); err != nil { - return err + return } } return @@ -88,7 +89,7 @@ func (m *Migrator) migrateFromSupplierToRoute() (err error) { if err := m.dmOut.DataManager().SetRouteProfile(convertSupplierToRoute(spp), true); err != nil { return err } - m.stats[utils.DerivedChargersV] += 1 + m.stats[utils.DerivedChargersV]++ } if m.dryRun { return @@ -142,23 +143,56 @@ func (m *Migrator) migrateRouteProfiles() (err error) { if vrs, err = m.getVersions(utils.ActionTriggers); err != nil { return } - if routeVersion, has := vrs[utils.Routes]; !has { + routeVersion, has := vrs[utils.Routes] + if !has { if vrs[utils.RQF] != current[utils.RQF] { return fmt.Errorf("please migrate the filters before migrating the routes") } if err = m.migrateFromSupplierToRoute(); err != nil { return } - } else { - switch routeVersion { - case current[utils.Routes]: - if m.sameDataDB { + } + migrated := true + var v2 *engine.RouteProfile + for { + version := routeVersion + for { + switch version { + default: + return fmt.Errorf("Unsupported version %v", version) + case current[utils.Routes]: + migrated = false + if m.sameDataDB { + break + } + if err = m.migrateCurrentRouteProfile(); err != nil { + return err + } + case 1: + if v2, err = m.migrateV1ToV2Routes(); err != nil && err != utils.ErrNoMoreData { + return + } else if err == utils.ErrNoMoreData { + break + } + version = 2 + } + if version == current[utils.Routes] || err == utils.ErrNoMoreData { break } - if err = m.migrateCurrentRouteProfile(); err != nil { - return err + } + if err == utils.ErrNoMoreData || !migrated { + break + } + if !m.dryRun { + if err = m.dmIN.DataManager().SetRouteProfile(v2, true); err != nil { + return } } + m.stats[utils.Routes]++ + } + // All done, update version wtih current one + if err = m.setVersions(utils.Routes); err != nil { + return } return m.ensureIndexesDataDB(engine.ColRts) @@ -190,3 +224,16 @@ func convertSupplierToRoute(spp *SupplierProfile) (route *engine.RouteProfile) { } return } + +func (m *Migrator) migrateV1ToV2Routes() (v4Cpp *engine.RouteProfile, err error) { + v4Cpp, err = m.dmIN.getV1RouteProfile() + if err != nil { + return nil, err + } else if v4Cpp == nil { + return nil, errors.New("Dispatcher NIL") + } + if v4Cpp.FilterIDs, err = migrateInlineFilterV4(v4Cpp.FilterIDs); err != nil { + return nil, err + } + return +} diff --git a/migrator/session_costs.go b/migrator/session_costs.go index 2bd77e277..93407694a 100644 --- a/migrator/session_costs.go +++ b/migrator/session_costs.go @@ -61,7 +61,9 @@ func (m *Migrator) migrateSessionSCosts() (err error) { utils.UndefinedVersion, "version number is not defined for SessionsCosts model") } - switch vrs[utils.SessionSCosts] { + switch version := vrs[utils.SessionSCosts]; version { + default: + return fmt.Errorf("Unsupported version %v", version) case 0, 1: if err = m.migrateV1SessionSCosts(); err != nil { return err @@ -115,7 +117,7 @@ func (m *Migrator) migrateV2SessionSCosts() (err error) { if err = m.storDBIn.remV2SMCost(v2Cost); err != nil { return err } - m.stats[utils.SessionSCosts] += 1 + m.stats[utils.SessionSCosts]++ } if m.dryRun { return diff --git a/migrator/sharedgroup.go b/migrator/sharedgroup.go index d2ba96e92..bcb1b1a93 100644 --- a/migrator/sharedgroup.go +++ b/migrator/sharedgroup.go @@ -19,6 +19,7 @@ along with this program. If not, see package migrator import ( + "fmt" "strings" "github.com/cgrates/cgrates/engine" @@ -49,7 +50,7 @@ func (m *Migrator) migrateCurrentSharedGroups() (err error) { if err := m.dmOut.DataManager().SetSharedGroup(sgs, utils.NonTransactional); err != nil { return err } - m.stats[utils.SharedGroups] += 1 + m.stats[utils.SharedGroups]++ } return } @@ -71,7 +72,7 @@ func (m *Migrator) migrateV1SharedGroups() (err error) { if err = m.dmOut.DataManager().SetSharedGroup(acnt, utils.NonTransactional); err != nil { return err } - m.stats[utils.SharedGroups] += 1 + m.stats[utils.SharedGroups]++ } // All done, update version wtih current one if err = m.setVersions(utils.SharedGroups); err != nil { @@ -86,7 +87,9 @@ func (m *Migrator) migrateSharedGroups() (err error) { if vrs, err = m.getVersions(utils.SharedGroups); err != nil { return } - switch vrs[utils.SharedGroups] { + switch version := vrs[utils.SharedGroups]; version { + default: + return fmt.Errorf("Unsupported version %v", version) case current[utils.SharedGroups]: if m.sameDataDB { break diff --git a/migrator/stats.go b/migrator/stats.go index 021ef8651..661e9bb94 100644 --- a/migrator/stats.go +++ b/migrator/stats.go @@ -137,7 +137,6 @@ func (m *Migrator) migrateV1Stats() (filter *engine.Filter, v2Stats *engine.Stat if filter, v2Stats, sts, err = v1Sts.AsStatQP(); err != nil { return nil, nil, nil, err } - m.stats[utils.StatS]++ } return } @@ -179,37 +178,46 @@ func (m *Migrator) migrateStats() (err error) { } migrated := true var filter *engine.Filter - var sts *engine.StatQueueProfile + var v3sts *engine.StatQueueProfile + var v4sts *engine.StatQueueProfile var v2Stats *engine.StatQueue var v3Stats *engine.StatQueue for { version := vrs[utils.StatS] for { switch version { + default: + return fmt.Errorf("Unsupported version %v", version) case current[utils.StatS]: + migrated = false if m.sameDataDB { - migrated = false break } if err = m.migrateCurrentStats(); err != nil { - return err + return } version = 3 - migrated = false case 1: // migrate from V1 to V2 - if filter, v2Stats, sts, err = m.migrateV1Stats(); err != nil && err != utils.ErrNoMoreData { - return err + if filter, v2Stats, v3sts, err = m.migrateV1Stats(); err != nil && err != utils.ErrNoMoreData { + return } else if err == utils.ErrNoMoreData { break } version = 2 case 2: // migrate from V2 to V3 (actual) if v3Stats, err = m.migrateV2Stats(v2Stats); err != nil && err != utils.ErrNoMoreData { - return err + return } else if err == utils.ErrNoMoreData { break } version = 3 + case 3: + if v4sts, err = m.migrateV3ToV4Stats(v3sts); err != nil && err != utils.ErrNoMoreData { + return + } else if err == utils.ErrNoMoreData { + break + } + version = 4 } if version == current[utils.StatS] || err == utils.ErrNoMoreData { break @@ -218,20 +226,21 @@ func (m *Migrator) migrateStats() (err error) { if err == utils.ErrNoMoreData || !migrated { break } - if !m.dryRun && migrated { + if !m.dryRun { if vrs[utils.StatS] == 1 { - if err := m.dmOut.DataManager().SetFilter(filter, true); err != nil { - return err - } - if err := m.dmOut.DataManager().SetStatQueueProfile(sts, true); err != nil { - return err + if err = m.dmOut.DataManager().SetFilter(filter, true); err != nil { + return } } // Set the fresh-migrated Stats into DB + if err = m.dmOut.DataManager().SetStatQueueProfile(v4sts, true); err != nil { + return + } if err = m.dmOut.DataManager().SetStatQueue(v3Stats); err != nil { - return err + return } } + m.stats[utils.StatS]++ } if m.dryRun || !migrated { return nil @@ -427,3 +436,16 @@ func (v1Sts v1Stat) AsStatQP() (filter *engine.Filter, sq *engine.StatQueue, stq } return filter, sq, stq, nil } + +func (m *Migrator) migrateV3ToV4Stats(v3sts *engine.StatQueueProfile) (v4Cpp *engine.StatQueueProfile, err error) { + if v3sts == nil { + // read data from DataDB + if v3sts, err = m.dmIN.getV3Stats(); err != nil { + return + } + } + if v3sts.FilterIDs, err = migrateInlineFilterV4(v3sts.FilterIDs); err != nil { + return + } + return v3sts, nil +} diff --git a/migrator/storage_map_datadb.go b/migrator/storage_map_datadb.go index c2dbd3fac..0a55ac0ff 100755 --- a/migrator/storage_map_datadb.go +++ b/migrator/storage_map_datadb.go @@ -139,6 +139,10 @@ func (iDBMig *internalMigrator) getV1Stats() (v1st *v1Stat, err error) { return nil, utils.ErrNotImplemented } +func (iDBMig *internalMigrator) getV3Stats() (v1st *engine.StatQueueProfile, err error) { + return nil, utils.ErrNotImplemented +} + //set func (iDBMig *internalMigrator) setV1Stats(x *v1Stat) (err error) { return utils.ErrNotImplemented @@ -181,6 +185,10 @@ func (iDBMig *internalMigrator) getV2ThresholdProfile() (v2T *v2Threshold, err e return nil, utils.ErrNotImplemented } +func (iDBMig *internalMigrator) getV3ThresholdProfile() (v2T *engine.ThresholdProfile, err error) { + return nil, utils.ErrNotImplemented +} + //set func (iDBMig *internalMigrator) setV2ThresholdProfile(x *v2Threshold) (err error) { return utils.ErrNotImplemented @@ -327,3 +335,15 @@ func (iDBMig *internalMigrator) remSupplier(tenant, id string) (err error) { } func (iDBMig *internalMigrator) close() {} + +func (iDBMig *internalMigrator) getV1ChargerProfile() (v1chrPrf *engine.ChargerProfile, err error) { + return nil, utils.ErrNotImplemented +} + +func (iDBMig *internalMigrator) getV1DispatcherProfile() (v1chrPrf *engine.DispatcherProfile, err error) { + return nil, utils.ErrNotImplemented +} + +func (iDBMig *internalMigrator) getV1RouteProfile() (v1chrPrf *engine.RouteProfile, err error) { + return nil, utils.ErrNotImplemented +} diff --git a/migrator/storage_mongo_datadb.go b/migrator/storage_mongo_datadb.go index eaba15186..e629a5cb1 100644 --- a/migrator/storage_mongo_datadb.go +++ b/migrator/storage_mongo_datadb.go @@ -299,6 +299,25 @@ func (v1ms *mongoMigrator) getV1Stats() (v1st *v1Stat, err error) { return v1st, nil } +func (v1ms *mongoMigrator) getV3Stats() (v1st *engine.StatQueueProfile, err error) { + if v1ms.cursor == nil { + v1ms.cursor, err = v1ms.mgoDB.DB().Collection(engine.ColSqp).Find(v1ms.mgoDB.GetContext(), bson.D{}) + if err != nil { + return nil, err + } + } + if !(*v1ms.cursor).Next(v1ms.mgoDB.GetContext()) { + (*v1ms.cursor).Close(v1ms.mgoDB.GetContext()) + v1ms.cursor = nil + return nil, utils.ErrNoMoreData + } + v1st = new(engine.StatQueueProfile) + if err := (*v1ms.cursor).Decode(v1st); err != nil { + return nil, err + } + return v1st, nil +} + //set func (v1ms *mongoMigrator) setV1Stats(x *v1Stat) (err error) { _, err = v1ms.mgoDB.DB().Collection(utils.CDR_STATS_PREFIX).InsertOne(v1ms.mgoDB.GetContext(), x) @@ -406,6 +425,25 @@ func (v1ms *mongoMigrator) getV2ThresholdProfile() (v2T *v2Threshold, err error) return v2T, nil } +func (v1ms *mongoMigrator) getV3ThresholdProfile() (v2T *engine.ThresholdProfile, err error) { + if v1ms.cursor == nil { + v1ms.cursor, err = v1ms.mgoDB.DB().Collection(engine.ColTps).Find(v1ms.mgoDB.GetContext(), bson.D{}) + if err != nil { + return nil, err + } + } + if !(*v1ms.cursor).Next(v1ms.mgoDB.GetContext()) { + (*v1ms.cursor).Close(v1ms.mgoDB.GetContext()) + v1ms.cursor = nil + return nil, utils.ErrNoMoreData + } + v2T = new(engine.ThresholdProfile) + if err := (*v1ms.cursor).Decode(v2T); err != nil { + return nil, err + } + return v2T, nil +} + //set func (v1ms *mongoMigrator) setV2ThresholdProfile(x *v2Threshold) (err error) { _, err = v1ms.mgoDB.DB().Collection(v2ThresholdProfileCol).InsertOne(v1ms.mgoDB.GetContext(), x) @@ -776,3 +814,60 @@ func (v1ms *mongoMigrator) remSupplier(tenant, id string) (err error) { _, err = v1ms.mgoDB.DB().Collection(ColSpp).DeleteOne(v1ms.mgoDB.GetContext(), bson.M{"tenant": tenant, "id": id}) return } + +func (v1ms *mongoMigrator) getV1ChargerProfile() (v1chrPrf *engine.ChargerProfile, err error) { + if v1ms.cursor == nil { + v1ms.cursor, err = v1ms.mgoDB.DB().Collection(engine.ColCpp).Find(v1ms.mgoDB.GetContext(), bson.D{}) + if err != nil { + return nil, err + } + } + if !(*v1ms.cursor).Next(v1ms.mgoDB.GetContext()) { + (*v1ms.cursor).Close(v1ms.mgoDB.GetContext()) + v1ms.cursor = nil + return nil, utils.ErrNoMoreData + } + v1chrPrf = new(engine.ChargerProfile) + if err := (*v1ms.cursor).Decode(v1chrPrf); err != nil { + return nil, err + } + return +} + +func (v1ms *mongoMigrator) getV1DispatcherProfile() (v1dppPrf *engine.DispatcherProfile, err error) { + if v1ms.cursor == nil { + v1ms.cursor, err = v1ms.mgoDB.DB().Collection(engine.ColDpp).Find(v1ms.mgoDB.GetContext(), bson.D{}) + if err != nil { + return nil, err + } + } + if !(*v1ms.cursor).Next(v1ms.mgoDB.GetContext()) { + (*v1ms.cursor).Close(v1ms.mgoDB.GetContext()) + v1ms.cursor = nil + return nil, utils.ErrNoMoreData + } + v1dppPrf = new(engine.DispatcherProfile) + if err := (*v1ms.cursor).Decode(v1dppPrf); err != nil { + return nil, err + } + return +} + +func (v1ms *mongoMigrator) getV1RouteProfile() (v1dppPrf *engine.RouteProfile, err error) { + if v1ms.cursor == nil { + v1ms.cursor, err = v1ms.mgoDB.DB().Collection(engine.ColRpp).Find(v1ms.mgoDB.GetContext(), bson.D{}) + if err != nil { + return nil, err + } + } + if !(*v1ms.cursor).Next(v1ms.mgoDB.GetContext()) { + (*v1ms.cursor).Close(v1ms.mgoDB.GetContext()) + v1ms.cursor = nil + return nil, utils.ErrNoMoreData + } + v1dppPrf = new(engine.RouteProfile) + if err := (*v1ms.cursor).Decode(v1dppPrf); err != nil { + return nil, err + } + return +} diff --git a/migrator/storage_redis.go b/migrator/storage_redis.go index 7f1e28971..bff55bc58 100644 --- a/migrator/storage_redis.go +++ b/migrator/storage_redis.go @@ -360,6 +360,32 @@ func (v1rs *redisMigrator) getV1Stats() (v1st *v1Stat, err error) { return v1st, nil } +func (v1rs *redisMigrator) getV3Stats() (v1st *engine.StatQueueProfile, err error) { + if v1rs.qryIdx == nil { + v1rs.dataKeys, err = v1rs.rds.GetKeysForPrefix(utils.StatQueueProfilePrefix) + if err != nil { + return + } else if len(v1rs.dataKeys) == 0 { + return nil, utils.ErrNoMoreData + } + v1rs.qryIdx = utils.IntPointer(0) + } + if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 { + strVal, err := v1rs.rds.Cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes() + if err != nil { + return nil, err + } + if err := v1rs.rds.Marshaler().Unmarshal(strVal, &v1st); err != nil { + return nil, err + } + *v1rs.qryIdx = *v1rs.qryIdx + 1 + } else { + v1rs.qryIdx = nil + return nil, utils.ErrNoMoreData + } + return v1st, nil +} + //set func (v1rs *redisMigrator) setV1Stats(x *v1Stat) (err error) { key := utils.CDR_STATS_PREFIX + x.Id @@ -525,6 +551,32 @@ func (v1rs *redisMigrator) getV2ThresholdProfile() (v2T *v2Threshold, err error) return v2Th, nil } +func (v1rs *redisMigrator) getV3ThresholdProfile() (v2T *engine.ThresholdProfile, err error) { + if v1rs.qryIdx == nil { + v1rs.dataKeys, err = v1rs.rds.GetKeysForPrefix(utils.ThresholdProfilePrefix) + if err != nil { + return + } else if len(v1rs.dataKeys) == 0 { + return nil, utils.ErrNoMoreData + } + v1rs.qryIdx = utils.IntPointer(0) + } + if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 { + strVal, err := v1rs.rds.Cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes() + if err != nil { + return nil, err + } + if err := v1rs.rds.Marshaler().Unmarshal(strVal, &v2T); err != nil { + return nil, err + } + *v1rs.qryIdx = *v1rs.qryIdx + 1 + } else { + v1rs.qryIdx = nil + return nil, utils.ErrNoMoreData + } + return v2T, nil +} + //set func (v1rs *redisMigrator) setV2ThresholdProfile(x *v2Threshold) (err error) { key := utils.ThresholdProfilePrefix + utils.ConcatenatedKey(x.Tenant, x.ID) @@ -549,7 +601,7 @@ func (v1rs *redisMigrator) remV2ThresholdProfile(tenant, id string) (err error) func (v1rs *redisMigrator) getV1Alias() (v1a *v1Alias, err error) { v1a = &v1Alias{Values: make(v1AliasValues, 0)} if v1rs.qryIdx == nil { - v1rs.dataKeys, err = v1rs.rds.GetKeysForPrefix(ALIASES_PREFIX) + v1rs.dataKeys, err = v1rs.rds.GetKeysForPrefix(AliasesPrefix) if err != nil { return } else if len(v1rs.dataKeys) == 0 { @@ -563,7 +615,7 @@ func (v1rs *redisMigrator) getV1Alias() (v1a *v1Alias, err error) { if err != nil { return nil, err } - v1a.SetId(strings.TrimPrefix(key, ALIASES_PREFIX)) + v1a.SetId(strings.TrimPrefix(key, AliasesPrefix)) if err := v1rs.rds.Marshaler().Unmarshal(strVal, &v1a.Values); err != nil { return nil, err } @@ -582,7 +634,7 @@ func (v1rs *redisMigrator) setV1Alias(al *v1Alias) (err error) { if err != nil { return } - key := ALIASES_PREFIX + al.GetId() + key := AliasesPrefix + al.GetId() if err = v1rs.rds.Cmd("SET", key, result).Err; err != nil { return } @@ -596,7 +648,7 @@ func (v1rs *redisMigrator) remV1Alias(key string) (err error) { var values []byte if values, err = v1rs.rds.Cmd("GET", - ALIASES_PREFIX+key).Bytes(); err != nil { + AliasesPrefix+key).Bytes(); err != nil { if err == redis.ErrRespNil { // did not find the destination err = utils.ErrNotFound } @@ -608,7 +660,7 @@ func (v1rs *redisMigrator) remV1Alias(key string) (err error) { return err } - err = v1rs.rds.Cmd("DEL", ALIASES_PREFIX+key).Err + err = v1rs.rds.Cmd("DEL", AliasesPrefix+key).Err if err != nil { return err } @@ -1003,3 +1055,81 @@ func (v1rs *redisMigrator) remSupplier(tenant, id string) (err error) { key := SupplierProfilePrefix + utils.ConcatenatedKey(tenant, id) return v1rs.rds.Cmd("DEL", key).Err } + +func (v1rs *redisMigrator) getV1ChargerProfile() (v1chrPrf *engine.ChargerProfile, err error) { + if v1rs.qryIdx == nil { + v1rs.dataKeys, err = v1rs.rds.GetKeysForPrefix(utils.ChargerProfilePrefix) + if err != nil { + return + } else if len(v1rs.dataKeys) == 0 { + return nil, utils.ErrNoMoreData + } + v1rs.qryIdx = utils.IntPointer(0) + } + if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 { + strVal, err := v1rs.rds.Cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes() + if err != nil { + return nil, err + } + if err := v1rs.rds.Marshaler().Unmarshal(strVal, &v1chrPrf); err != nil { + return nil, err + } + *v1rs.qryIdx = *v1rs.qryIdx + 1 + } else { + v1rs.qryIdx = nil + return nil, utils.ErrNoMoreData + } + return +} + +func (v1rs *redisMigrator) getV1DispatcherProfile() (v1chrPrf *engine.DispatcherProfile, err error) { + if v1rs.qryIdx == nil { + v1rs.dataKeys, err = v1rs.rds.GetKeysForPrefix(utils.DispatcherProfilePrefix) + if err != nil { + return + } else if len(v1rs.dataKeys) == 0 { + return nil, utils.ErrNoMoreData + } + v1rs.qryIdx = utils.IntPointer(0) + } + if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 { + strVal, err := v1rs.rds.Cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes() + if err != nil { + return nil, err + } + if err := v1rs.rds.Marshaler().Unmarshal(strVal, &v1chrPrf); err != nil { + return nil, err + } + *v1rs.qryIdx = *v1rs.qryIdx + 1 + } else { + v1rs.qryIdx = nil + return nil, utils.ErrNoMoreData + } + return +} + +func (v1rs *redisMigrator) getV1RouteProfile() (v1chrPrf *engine.RouteProfile, err error) { + if v1rs.qryIdx == nil { + v1rs.dataKeys, err = v1rs.rds.GetKeysForPrefix(utils.RouteProfilePrefix) + if err != nil { + return + } else if len(v1rs.dataKeys) == 0 { + return nil, utils.ErrNoMoreData + } + v1rs.qryIdx = utils.IntPointer(0) + } + if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 { + strVal, err := v1rs.rds.Cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes() + if err != nil { + return nil, err + } + if err := v1rs.rds.Marshaler().Unmarshal(strVal, &v1chrPrf); err != nil { + return nil, err + } + *v1rs.qryIdx = *v1rs.qryIdx + 1 + } else { + v1rs.qryIdx = nil + return nil, utils.ErrNoMoreData + } + return +} diff --git a/migrator/thresholds.go b/migrator/thresholds.go index 3cb2f4495..41cd2b173 100644 --- a/migrator/thresholds.go +++ b/migrator/thresholds.go @@ -72,7 +72,7 @@ func (m *Migrator) migrateCurrentThresholds() (err error) { if err := m.dmIN.DataManager().RemoveThreshold(tntID[0], tntID[1], utils.NonTransactional); err != nil { return err } - m.stats[utils.Thresholds] += 1 + m.stats[utils.Thresholds]++ } //ThresholdProfiles ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.ThresholdProfilePrefix) @@ -151,34 +151,42 @@ func (m *Migrator) migrateThresholds() (err error) { } migrated := true migratedFrom := 0 - var thp *engine.ThresholdProfile var th *engine.Threshold var filter *engine.Filter var v3 *engine.ThresholdProfile + var v4 *engine.ThresholdProfile for { version := vrs[utils.Thresholds] + migratedFrom = int(version) for { switch version { + default: + return fmt.Errorf("Unsupported version %v", version) case current[utils.Thresholds]: migrated = false if m.sameDataDB { break } if err = m.migrateCurrentThresholds(); err != nil { - return err + return } case 1: - if thp, th, filter, err = m.migrateV2ActionTriggers(); err != nil && err != utils.ErrNoMoreData { - return err + if v3, th, filter, err = m.migrateV2ActionTriggers(); err != nil && err != utils.ErrNoMoreData { + return } version = 3 - migratedFrom = 1 case 2: if v3, err = m.migrateV2Thresholds(); err != nil && err != utils.ErrNoMoreData { - return err + return } version = 3 - migratedFrom = 2 + case 3: + if v4, err = m.migrateV3ToV4Threshold(v3); err != nil && err != utils.ErrNoMoreData { + return + } else if err == utils.ErrNoMoreData { + break + } + version = 4 } if version == current[utils.Thresholds] || err == utils.ErrNoMoreData { break @@ -188,24 +196,19 @@ func (m *Migrator) migrateThresholds() (err error) { break } - if !m.dryRun && migrated { + if !m.dryRun { //set threshond - switch migratedFrom { - case 1: - if err := m.dmOut.DataManager().SetFilter(filter, true); err != nil { - return err + if migratedFrom == 1 { + if err = m.dmOut.DataManager().SetFilter(filter, true); err != nil { + return } - if err := m.dmOut.DataManager().SetThreshold(th); err != nil { - return err - } - if err := m.dmOut.DataManager().SetThresholdProfile(thp, true); err != nil { - return err - } - case 2: - if err = m.dmOut.DataManager().SetThresholdProfile(v3, true); err != nil { - return err + if err = m.dmOut.DataManager().SetThreshold(th); err != nil { + return } } + if err = m.dmOut.DataManager().SetThresholdProfile(v4, true); err != nil { + return + } } m.stats[utils.Thresholds]++ @@ -221,7 +224,7 @@ func (m *Migrator) migrateThresholds() (err error) { } // All done, update version wtih current one if err = m.setVersions(utils.Thresholds); err != nil { - return err + return } return m.ensureIndexesDataDB(engine.ColTps) } @@ -329,7 +332,7 @@ func (m *Migrator) SasThreshold(v2ATR *engine.ActionTrigger) (err error) { if err := m.dmOut.DataManager().SetThresholdProfile(thp, true); err != nil { return err } - m.stats[utils.Thresholds] += 1 + m.stats[utils.Thresholds]++ } // All done, update version wtih current one vrs = engine.Versions{utils.Thresholds: engine.CurrentStorDBVersions()[utils.Thresholds]} @@ -437,3 +440,16 @@ func (v2T v2Threshold) V2toV3Threshold() (th *engine.ThresholdProfile) { } return } + +func (m *Migrator) migrateV3ToV4Threshold(v3sts *engine.ThresholdProfile) (v4Cpp *engine.ThresholdProfile, err error) { + if v3sts == nil { + // read data from DataDB + if v3sts, err = m.dmIN.getV3ThresholdProfile(); err != nil { + return + } + } + if v3sts.FilterIDs, err = migrateInlineFilterV4(v3sts.FilterIDs); err != nil { + return + } + return v3sts, nil +} diff --git a/migrator/timings.go b/migrator/timings.go index e31f49a58..0c096f8cb 100644 --- a/migrator/timings.go +++ b/migrator/timings.go @@ -19,6 +19,7 @@ along with this program. If not, see package migrator import ( + "fmt" "strings" "github.com/cgrates/cgrates/engine" @@ -43,7 +44,7 @@ func (m *Migrator) migrateCurrentTiming() (err error) { if err := m.dmOut.DataManager().SetTiming(tm); err != nil { return err } - m.stats[utils.Timing] += 1 + m.stats[utils.Timing]++ } return } @@ -54,7 +55,9 @@ func (m *Migrator) migrateTimings() (err error) { if vrs, err = m.getVersions(utils.Timing); err != nil { return } - switch vrs[utils.Timing] { + switch version := vrs[utils.Timing]; version { + default: + return fmt.Errorf("Unsupported version %v", version) case current[utils.Timing]: if m.sameDataDB { break diff --git a/migrator/tp_account_actions.go b/migrator/tp_account_actions.go index 45514a629..edf171ca1 100644 --- a/migrator/tp_account_actions.go +++ b/migrator/tp_account_actions.go @@ -44,7 +44,7 @@ func (m *Migrator) migrateCurrentTPaccountAcction() (err error) { return err } } - m.stats[utils.TpAccountActionsV] += 1 + m.stats[utils.TpAccountActionsV]++ } } } diff --git a/migrator/tp_action_plans.go b/migrator/tp_action_plans.go index 061da902e..59f756d8f 100644 --- a/migrator/tp_action_plans.go +++ b/migrator/tp_action_plans.go @@ -50,7 +50,7 @@ func (m *Migrator) migrateCurrentTPactionplans() (err error) { return err } } - m.stats[utils.TpActionPlans] += 1 + m.stats[utils.TpActionPlans]++ } } }