From b6846ff1f5a7bc78f7be914cab210b9e086d7ba2 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 16 Jan 2019 12:32:16 +0200 Subject: [PATCH] Added migration from users to attributeProfile and integration tests --- engine/version.go | 2 +- migrator/migrator_datadb.go | 3 + migrator/storage_map_datadb.go | 16 ++ migrator/storage_mongo_datadb.go | 45 ++++++ migrator/storage_redis.go | 42 ++++++ migrator/user.go | 106 +++++++++++-- migrator/user_it_test.go | 251 +++++++++++++++++++++++++++++++ migrator/user_test.go | 138 +++++++++++++++++ 8 files changed, 588 insertions(+), 15 deletions(-) create mode 100644 migrator/user_it_test.go create mode 100644 migrator/user_test.go diff --git a/engine/version.go b/engine/version.go index 9ca5cd346..73ad19370 100644 --- a/engine/version.go +++ b/engine/version.go @@ -150,7 +150,7 @@ func CurrentDataDBVersions() Versions { utils.Resource: 1, utils.ReverseAlias: 1, utils.Alias: 2, - utils.User: 1, + utils.User: 2, utils.Subscribers: 1, utils.DerivedChargersV: 1, utils.Destinations: 1, diff --git a/migrator/migrator_datadb.go b/migrator/migrator_datadb.go index 664a1ba50..b755330bd 100644 --- a/migrator/migrator_datadb.go +++ b/migrator/migrator_datadb.go @@ -49,6 +49,9 @@ type MigratorDataDB interface { getV1Alias() (v1a *v1Alias, err error) setV1Alias(al *v1Alias) (err error) remV1Alias(key string) (err error) + getV1User() (v1u *v1UserProfile, err error) + setV1User(us *v1UserProfile) (err error) + remV1User(key string) (err error) DataManager() *engine.DataManager } diff --git a/migrator/storage_map_datadb.go b/migrator/storage_map_datadb.go index 1bc5d1ce5..704fc6dac 100755 --- a/migrator/storage_map_datadb.go +++ b/migrator/storage_map_datadb.go @@ -182,3 +182,19 @@ func (v1ms *mapMigrator) setV1Alias(al *v1Alias) (err error) { func (v1ms *mapMigrator) remV1Alias(key string) (err error) { return utils.ErrNotImplemented } + +// User methods +//get +func (v1ms *mapMigrator) getV1User() (v1u *v1UserProfile, err error) { + return nil, utils.ErrNotImplemented +} + +//set +func (v1ms *mapMigrator) setV1User(us *v1UserProfile) (err error) { + return utils.ErrNotImplemented +} + +//rem +func (v1ms *mapMigrator) remV1User(key string) (err error) { + return utils.ErrNotImplemented +} diff --git a/migrator/storage_mongo_datadb.go b/migrator/storage_mongo_datadb.go index 7ee13bb4e..4cc0478c4 100644 --- a/migrator/storage_mongo_datadb.go +++ b/migrator/storage_mongo_datadb.go @@ -32,6 +32,7 @@ const ( v1AttributeProfilesCol = "attribute_profiles" v2ThresholdProfileCol = "threshold_profiles" v1AliasCol = "aliases" + v1UserCol = "users" ) type mongoMigrator struct { @@ -438,3 +439,47 @@ func (v1ms *mongoMigrator) remV1Alias(key string) (err error) { } return } + +// User methods +//get +func (v1ms *mongoMigrator) getV1User() (v1u *v1UserProfile, err error) { + if v1ms.cursor == nil { + var cursor mongo.Cursor + cursor, err = v1ms.mgoDB.DB().Collection(v1UserCol).Find(v1ms.mgoDB.GetContext(), bson.D{}) + if err != nil { + return nil, err + } + v1ms.cursor = &cursor + } + if !(*v1ms.cursor).Next(v1ms.mgoDB.GetContext()) { + (*v1ms.cursor).Close(v1ms.mgoDB.GetContext()) + v1ms.cursor = nil + return nil, utils.ErrNoMoreData + } + var kv struct { + Key string + Value *v1UserProfile + } + if err := (*v1ms.cursor).Decode(&kv); err != nil { + return nil, err + } + return kv.Value, nil +} + +//set +func (v1ms *mongoMigrator) setV1User(us *v1UserProfile) (err error) { + _, err = v1ms.mgoDB.DB().Collection(v1UserCol).UpdateOne(v1ms.mgoDB.GetContext(), bson.M{"key": us.GetId()}, + bson.M{"$set": struct { + Key string + Value *v1UserProfile + }{Key: us.GetId(), Value: us}}, + options.Update().SetUpsert(true), + ) + return err +} + +//rem +func (v1ms *mongoMigrator) remV1User(key string) (err error) { + _, err = v1ms.mgoDB.DB().Collection(v1UserCol).DeleteOne(v1ms.mgoDB.GetContext(), bson.M{"key": key}) + return +} diff --git a/migrator/storage_redis.go b/migrator/storage_redis.go index e1dff68df..6e47f382c 100644 --- a/migrator/storage_redis.go +++ b/migrator/storage_redis.go @@ -561,3 +561,45 @@ func (v1rs *redisMigrator) remV1Alias(key string) (err error) { return v1rs.rds.Cmd("DEL", key).Err } + +// User methods +//get +func (v1rs *redisMigrator) getV1User() (v1u *v1UserProfile, err error) { + if v1rs.qryIdx == nil { + v1rs.dataKeys, err = v1rs.rds.GetKeysForPrefix(utils.USERS_PREFIX) + if err != nil { + return + } else if len(v1rs.dataKeys) == 0 { + return nil, utils.ErrNotFound + } + v1rs.qryIdx = utils.IntPointer(0) + } + if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 { + strVal, err := v1rs.rds.Cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes() + if err != nil { + return nil, err + } + v1u = new(v1UserProfile) + if err := v1rs.rds.Marshaler().Unmarshal(strVal, v1u); err != nil { + return nil, err + } + *v1rs.qryIdx = *v1rs.qryIdx + 1 + return v1u, nil + } + v1rs.qryIdx = nil + return nil, utils.ErrNoMoreData +} + +//set +func (v1rs *redisMigrator) setV1User(us *v1UserProfile) (err error) { + bit, err := v1rs.rds.Marshaler().Marshal(us) + if err != nil { + return err + } + return v1rs.rds.Cmd("SET", utils.USERS_PREFIX+us.GetId(), bit).Err +} + +//rem +func (v1rs *redisMigrator) remV1User(key string) (err error) { + return v1rs.rds.Cmd("DEL", utils.USERS_PREFIX+key).Err +} diff --git a/migrator/user.go b/migrator/user.go index 3b37c8e15..030b27e21 100644 --- a/migrator/user.go +++ b/migrator/user.go @@ -22,10 +22,94 @@ import ( "fmt" "strings" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) +type v1UserProfile struct { + Tenant string + UserName string + Masked bool //disable if true + Profile map[string]string + Weight float64 + ponder int +} + +func (ud *v1UserProfile) GetId() string { + return utils.ConcatenatedKey(ud.Tenant, ud.UserName) +} + +func (ud *v1UserProfile) SetId(id string) error { + vals := strings.Split(id, utils.CONCATENATED_KEY_SEP) + if len(vals) != 2 { + return utils.ErrInvalidKey + } + ud.Tenant = vals[0] + ud.UserName = vals[1] + return nil +} + +func userProfile2attributeProfile(user *v1UserProfile) (attr *engine.AttributeProfile) { + attr = &engine.AttributeProfile{ + Tenant: user.Tenant, + ID: user.UserName, + Contexts: []string{utils.META_ANY}, + FilterIDs: make([]string, 0), + ActivationInterval: nil, + Attributes: make([]*engine.Attribute, 0), + Blocker: false, + Weight: user.Weight, + } + for fieldname, substitute := range user.Profile { + attr.Attributes = append(attr.Attributes, &engine.Attribute{ + FieldName: fieldname, + Initial: utils.META_ANY, + Substitute: config.NewRSRParsersMustCompile(substitute, true, utils.INFIELD_SEP), + Append: true, + }) + } + return +} + +func (m *Migrator) migrateV1User2AttributeProfile() (err error) { + for { + user, err := m.dmIN.getV1User() + if err == utils.ErrNoMoreData { + break + } + if err != nil { + return err + } + if user == nil || user.Masked || m.dryRun { + continue + } + attr := userProfile2attributeProfile(user) + if len(attr.Attributes) == 0 { + continue + } + if err := m.dmIN.remV1User(user.GetId()); err != nil { + return err + } + if err := m.dmOut.DataManager().DataDB().SetAttributeProfileDrv(attr); err != nil { + return err + } + m.stats[utils.User] += 1 + } + if m.dryRun { + return + } + // All done, update version wtih current one + vrs := engine.Versions{utils.User: engine.CurrentDataDBVersions()[utils.User]} + if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { + return utils.NewCGRError(utils.Migrator, + utils.ServerErrorCaps, + err.Error(), + fmt.Sprintf("error: <%s> when updating Alias version into dataDB", err.Error())) + } + return +} + func (m *Migrator) migrateCurrentUser() (err error) { var ids []string ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.USERS_PREFIX) @@ -55,25 +139,19 @@ func (m *Migrator) migrateUser() (err error) { current := engine.CurrentDataDBVersions() vrs, err = m.dmIN.DataManager().DataDB().GetVersions("") if err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when querying oldDataDB for versions", err.Error())) + return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, + err.Error(), fmt.Sprintf("error: <%s> when querying oldDataDB for versions", err.Error())) } else if len(vrs) == 0 { - return utils.NewCGRError(utils.Migrator, - utils.MandatoryIEMissingCaps, - utils.UndefinedVersion, - "version number is not defined for ActionTriggers model") + return utils.NewCGRError(utils.Migrator, utils.MandatoryIEMissingCaps, + utils.UndefinedVersion, "version number is not defined for Users model") } switch vrs[utils.User] { + case 1: + return m.migrateV1User2AttributeProfile() case current[utils.User]: - if m.sameStorDB { - return + if !m.sameStorDB { + return m.migrateCurrentUser() } - if err := m.migrateCurrentUser(); err != nil { - return err - } - return } return } diff --git a/migrator/user_it_test.go b/migrator/user_it_test.go new file mode 100644 index 000000000..484b2ce1a --- /dev/null +++ b/migrator/user_it_test.go @@ -0,0 +1,251 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package migrator + +import ( + "log" + "path" + "reflect" + "sort" + "testing" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + usrCfgIn *config.CGRConfig + usrCfgOut *config.CGRConfig + usrMigrator *Migrator + usrAction string +) + +var sTestsUsrIT = []func(t *testing.T){ + testUsrITConnect, + testUsrITFlush, + testUsrITMigrateAndMove, +} + +func TestUserMigrateITRedis(t *testing.T) { + inPath := path.Join(*dataDir, "conf", "samples", "tutmysql") + testUsrStart("TestUserMigrateITRedis", inPath, inPath, utils.Migrate, t) +} + +func TestUserMigrateITMongo(t *testing.T) { + inPath := path.Join(*dataDir, "conf", "samples", "tutmongo") + testUsrStart("TestUserMigrateITMongo", inPath, inPath, utils.Migrate, t) +} + +func TestUserITMove(t *testing.T) { + inPath := path.Join(*dataDir, "conf", "samples", "tutmongo") + outPath := path.Join(*dataDir, "conf", "samples", "tutmysql") + testUsrStart("TestUserITMove", inPath, outPath, utils.Move, t) +} + +func TestUserITMigrateMongo2Redis(t *testing.T) { + inPath := path.Join(*dataDir, "conf", "samples", "tutmongo") + outPath := path.Join(*dataDir, "conf", "samples", "tutmysql") + testUsrStart("TestUserITMigrateMongo2Redis", inPath, outPath, utils.Migrate, t) +} + +func TestUserITMoveEncoding(t *testing.T) { + inPath := path.Join(*dataDir, "conf", "samples", "tutmongo") + outPath := path.Join(*dataDir, "conf", "samples", "tutmongojson") + testUsrStart("TestUserITMoveEncoding", inPath, outPath, utils.Move, t) +} + +func TestUserITMoveEncoding2(t *testing.T) { + inPath := path.Join(*dataDir, "conf", "samples", "tutmysql") + outPath := path.Join(*dataDir, "conf", "samples", "tutmysqljson") + testUsrStart("TestUserITMoveEncoding2", inPath, outPath, utils.Move, t) +} + +func testUsrStart(testName, inPath, outPath, action string, t *testing.T) { + var err error + usrAction = action + if usrCfgIn, err = config.NewCGRConfigFromFolder(inPath); err != nil { + t.Fatal(err) + } + if usrCfgOut, err = config.NewCGRConfigFromFolder(outPath); err != nil { + t.Fatal(err) + } + for _, stest := range sTestsUsrIT { + t.Run(testName, stest) + } +} + +func testUsrITConnect(t *testing.T) { + dataDBIn, err := NewMigratorDataDB(usrCfgIn.DataDbCfg().DataDbType, + usrCfgIn.DataDbCfg().DataDbHost, usrCfgIn.DataDbCfg().DataDbPort, + usrCfgIn.DataDbCfg().DataDbName, usrCfgIn.DataDbCfg().DataDbUser, + usrCfgIn.DataDbCfg().DataDbPass, usrCfgIn.GeneralCfg().DBDataEncoding, + config.CgrConfig().CacheCfg(), "") + if err != nil { + log.Fatal(err) + } + dataDBOut, err := NewMigratorDataDB(usrCfgOut.DataDbCfg().DataDbType, + usrCfgOut.DataDbCfg().DataDbHost, usrCfgOut.DataDbCfg().DataDbPort, + usrCfgOut.DataDbCfg().DataDbName, usrCfgOut.DataDbCfg().DataDbUser, + usrCfgOut.DataDbCfg().DataDbPass, usrCfgOut.GeneralCfg().DBDataEncoding, + config.CgrConfig().CacheCfg(), "") + if err != nil { + log.Fatal(err) + } + usrMigrator, err = NewMigrator(dataDBIn, dataDBOut, + nil, nil, false, false, false) + if err != nil { + log.Fatal(err) + } +} + +func testUsrITFlush(t *testing.T) { + usrMigrator.dmOut.DataManager().DataDB().Flush("") + if err := engine.SetDBVersions(usrMigrator.dmOut.DataManager().DataDB()); err != nil { + t.Error("Error ", err.Error()) + } + usrMigrator.dmIN.DataManager().DataDB().Flush("") + if err := engine.SetDBVersions(usrMigrator.dmIN.DataManager().DataDB()); err != nil { + t.Error("Error ", err.Error()) + } +} + +func testUsrITMigrateAndMove(t *testing.T) { + user := &v1UserProfile{ + Tenant: defaultTenant, + UserName: "1001", + Masked: false, + Profile: map[string]string{ + "Account": "1002", + "ReqType": "*prepaid", + "msisdn": "123423534646752", + }, + Weight: 10, + } + attrProf := &engine.AttributeProfile{ + Tenant: defaultTenant, + ID: "1001", + Contexts: []string{utils.META_ANY}, + FilterIDs: make([]string, 0), + ActivationInterval: nil, + Attributes: []*engine.Attribute{ + { + FieldName: "Account", + Initial: utils.META_ANY, + Substitute: config.NewRSRParsersMustCompile("1002", true, utils.INFIELD_SEP), + Append: true, + }, + { + FieldName: "ReqType", + Initial: utils.META_ANY, + Substitute: config.NewRSRParsersMustCompile("*prepaid", true, utils.INFIELD_SEP), + Append: true, + }, + { + FieldName: "msisdn", + Initial: utils.META_ANY, + Substitute: config.NewRSRParsersMustCompile("123423534646752", true, utils.INFIELD_SEP), + Append: true, + }, + }, + Blocker: false, + Weight: 10, + } + attrProf.Compile() + switch usrAction { + case utils.Migrate: + err := usrMigrator.dmIN.setV1User(user) + if err != nil { + t.Error("Error when setting v1 User ", err.Error()) + } + currentVersion := engine.Versions{utils.User: 1} + err = usrMigrator.dmIN.DataManager().DataDB().SetVersions(currentVersion, false) + if err != nil { + t.Error("Error when setting version for User ", err.Error()) + } + //check if version was set correctly + if vrs, err := usrMigrator.dmIN.DataManager().DataDB().GetVersions(""); err != nil { + t.Error(err) + } else if vrs[utils.User] != 1 { + t.Errorf("Unexpected version returned: %d", vrs[utils.User]) + } + //migrate user + err, _ = usrMigrator.Migrate([]string{utils.MetaUser}) + if err != nil { + t.Error("Error when migrating User ", err.Error()) + } + //check if version was updated + if vrs, err := usrMigrator.dmOut.DataManager().DataDB().GetVersions(""); err != nil { + t.Error(err) + } else if vrs[utils.User] != 2 { + t.Errorf("Unexpected version returned: %d", vrs[utils.User]) + } + //check if user was migrate correctly + result, err := usrMigrator.dmOut.DataManager().DataDB().GetAttributeProfileDrv(user.Tenant, user.UserName) + if err != nil { + t.Fatalf("Error when getting Attributes %v", err.Error()) + } + result.Compile() + sort.Slice(result.Attributes, func(i, j int) bool { + if result.Attributes[i].FieldName == result.Attributes[j].FieldName { + return result.Attributes[i].Initial.(string) < result.Attributes[j].Initial.(string) + } + return result.Attributes[i].FieldName < result.Attributes[j].FieldName + }) // only for test; map returns random keys + if !reflect.DeepEqual(*attrProf, *result) { + t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(attrProf), utils.ToJSON(result)) + } + //check if old account was deleted + if _, err = usrMigrator.dmIN.getV1Alias(); err != utils.ErrNoMoreData { + t.Error("Error should be not found : ", err) + } + + case utils.Move: + /* // No Move tests + if err := usrMigrator.dmIN.DataManager().DataDB().SetUserDrv(user); err != nil { + t.Error(err) + } + currentVersion := engine.CurrentDataDBVersions() + err := usrMigrator.dmOut.DataManager().DataDB().SetVersions(currentVersion, false) + if err != nil { + t.Error("Error when setting version for User ", err.Error()) + } + //migrate accounts + err, _ = usrMigrator.Migrate([]string{utils.MetaUser}) + if err != nil { + t.Error("Error when usrMigratorrating User ", err.Error()) + } + //check if account was migrate correctly + result, err := usrMigrator.dmOut.DataManager().DataDB().GetUserDrv(user.GetId(), false) + if err != nil { + t.Error(err) + } + if !reflect.DeepEqual(user, result) { + t.Errorf("Expecting: %+v, received: %+v", user, result) + } + //check if old account was deleted + result, err = usrMigrator.dmIN.DataManager().DataDB().GetUserDrv(user.GetId(), false) + if err != utils.ErrNotFound { + t.Error(err) + } + // */ + } +} diff --git a/migrator/user_test.go b/migrator/user_test.go new file mode 100644 index 000000000..75a8dc8ae --- /dev/null +++ b/migrator/user_test.go @@ -0,0 +1,138 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package migrator + +import ( + "reflect" + "sort" + "testing" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func TestUserProfile2attributeProfile(t *testing.T) { + users := map[int]*v1UserProfile{ + 0: &v1UserProfile{ + Tenant: defaultTenant, + UserName: "1001", + Masked: true, + Profile: map[string]string{}, + Weight: 10, + }, + 1: &v1UserProfile{ + Tenant: defaultTenant, + UserName: "1001", + Masked: true, + Profile: map[string]string{ + "Account": "1002", + "Subject": "call_1001", + }, + Weight: 10, + }, + 2: &v1UserProfile{ + Tenant: defaultTenant, + UserName: "1001", + Masked: false, + Profile: map[string]string{ + "Account": "1002", + "ReqType": "*prepaid", + "msisdn": "123423534646752", + }, + Weight: 10, + }, + } + expected := map[int]*engine.AttributeProfile{ + 0: { + Tenant: defaultTenant, + ID: "1001", + Contexts: []string{utils.META_ANY}, + FilterIDs: make([]string, 0), + ActivationInterval: nil, + Attributes: make([]*engine.Attribute, 0), + Blocker: false, + Weight: 10, + }, + 1: { + Tenant: defaultTenant, + ID: "1001", + Contexts: []string{utils.META_ANY}, + FilterIDs: make([]string, 0), + ActivationInterval: nil, + Attributes: []*engine.Attribute{ + { + FieldName: "Account", + Initial: utils.META_ANY, + Substitute: config.NewRSRParsersMustCompile("1002", true, utils.INFIELD_SEP), + Append: true, + }, + { + FieldName: "Subject", + Initial: utils.META_ANY, + Substitute: config.NewRSRParsersMustCompile("call_1001", true, utils.INFIELD_SEP), + Append: true, + }, + }, + Blocker: false, + Weight: 10, + }, + 2: { + Tenant: defaultTenant, + ID: "1001", + Contexts: []string{utils.META_ANY}, + FilterIDs: make([]string, 0), + ActivationInterval: nil, + Attributes: []*engine.Attribute{ + { + FieldName: "Account", + Initial: utils.META_ANY, + Substitute: config.NewRSRParsersMustCompile("1002", true, utils.INFIELD_SEP), + Append: true, + }, + { + FieldName: "ReqType", + Initial: utils.META_ANY, + Substitute: config.NewRSRParsersMustCompile("*prepaid", true, utils.INFIELD_SEP), + Append: true, + }, + { + FieldName: "msisdn", + Initial: utils.META_ANY, + Substitute: config.NewRSRParsersMustCompile("123423534646752", true, utils.INFIELD_SEP), + Append: true, + }, + }, + Blocker: false, + Weight: 10, + }, + } + for i := range expected { + rply := userProfile2attributeProfile(users[i]) + sort.Slice(rply.Attributes, func(i, j int) bool { + if rply.Attributes[i].FieldName == rply.Attributes[j].FieldName { + return rply.Attributes[i].Initial.(string) < rply.Attributes[j].Initial.(string) + } + return rply.Attributes[i].FieldName < rply.Attributes[j].FieldName + }) // only for test; map returns random keys + if !reflect.DeepEqual(expected[i], rply) { + t.Errorf("For %v expected: %s ,recived: %s ", i, utils.ToJSON(expected[i]), utils.ToJSON(rply)) + } + } +}