From f69086e612633f4311b70b9034cbbd5f14f45cd3 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 26 Feb 2017 18:30:35 +0100 Subject: [PATCH] Adding database handling for accounts migration --- cmd/cgr-loader/cgr-loader.go | 2 +- engine/storage_mongo_datadb.go | 6 ++-- engine/version.go | 2 +- migrator/accounts.go | 64 ++++++++++++++++++++++++++++++++-- migrator/migrator.go | 13 +++++-- utils/consts.go | 1 + 6 files changed, 79 insertions(+), 9 deletions(-) diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 8c978cd26..910a909c3 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -224,7 +224,7 @@ func main() { if err != nil { log.Fatal(err) } - if err := migrator.NewMigrator(ratingDb, accountDb, *datadb_type, storDB, *stor_db_type).Migrate(*migrate); err != nil { + if err := migrator.NewMigrator(ratingDb, accountDb, *datadb_type, *dbdata_encoding, storDB, *stor_db_type).Migrate(*migrate); err != nil { log.Fatal(err) } log.Print("Done migrating!") diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 7e6d6fa91..b7bee7a67 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -332,9 +332,9 @@ func (ms *MongoStorage) Marshaler() Marshaler { return ms.ms } -// CloneSession returns a clone of the existing session so we can perform queries from outside of engine package -func (ms *MongoStorage) CloneSession() *mgo.Session { - return ms.session.Copy() +// DB returnes a database object with cloned session inside +func (ms *MongoStorage) DB() *mgo.Database { + return ms.session.Copy().DB(ms.db) } func (ms *MongoStorage) SelectDatabase(dbName string) (err error) { diff --git a/engine/version.go b/engine/version.go index 2a2f145ae..5d938f018 100644 --- a/engine/version.go +++ b/engine/version.go @@ -240,7 +240,7 @@ func (sv *StructVersion) CompareAndMigrate(dbVer *StructVersion) []*MigrationInf } func CurrentStorDBVersions() Versions { - return Versions{utils.COST_DETAILS: 2} + return Versions{utils.COST_DETAILS: 2, utils.Accounts: 2} } // Versions will keep trac of various item versions diff --git a/migrator/accounts.go b/migrator/accounts.go index 9bcb46d19..63772a936 100644 --- a/migrator/accounts.go +++ b/migrator/accounts.go @@ -25,12 +25,72 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "gopkg.in/mgo.v2/bson" +) + +const ( + v1AccountDBPrefix = "ubl_" + v1AccountTBL = "userbalances" ) func (m *Migrator) migrateAccounts() (err error) { + var acntV1Keys []string + acntV1Keys, err = m.dataDB.GetKeysForPrefix(v1AccountDBPrefix) + if err != nil { + return + } + for _, acntV1Key := range acntV1Keys { + v1Acnt, err := m.getV1AccountFromDB(acntV1Key) + if err != nil { + return err + } + if acnt := v1Acnt.AsAccount(); acnt != nil { + if err = m.dataDB.SetAccount(acnt); err != nil { + return err + } + } + } + // All done, update version wtih current one + vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]} + if err = m.dataDB.SetVersions(vrs); err != nil { + return utils.NewCGRError(utils.Migrator, + utils.ServerErrorCaps, + err.Error(), + fmt.Sprintf("error: <%s> when updating CostDetails version into StorDB", err.Error())) + } return } +func (m *Migrator) getV1AccountFromDB(key string) (*v1Account, error) { + switch m.dataDBType { + case utils.REDIS: + dataDB := m.dataDB.(*engine.RedisStorage) + if strVal, err := dataDB.Cmd("GET", key).Bytes(); err != nil { + return nil, err + } else { + v1Acnt := &v1Account{Id: key} + if err := m.mrshlr.Unmarshal(strVal, v1Acnt); err != nil { + return nil, err + } + return v1Acnt, nil + } + case utils.MONGO: + dataDB := m.dataDB.(*engine.MongoStorage) + mgoDB := dataDB.DB() + defer mgoDB.Session.Close() + v1Acnt := new(v1Account) + if err := mgoDB.C(v1AccountTBL).Find(bson.M{"id": key}).One(v1Acnt); err != nil { + return nil, err + } + return v1Acnt, nil + default: + return nil, utils.NewCGRError(utils.Migrator, + utils.ServerErrorCaps, + utils.UnsupportedDB, + fmt.Sprintf("error: unsupported: <%s> for getV1AccountFromDB method", m.dataDBType)) + } +} + type v1Account struct { Id string BalanceMap map[string]v1BalanceChain @@ -73,9 +133,9 @@ func (b *v1Balance) IsDefault() bool { b.Disabled == false } -func (v1Acc v1Account) AsAccount() (ac engine.Account) { +func (v1Acc v1Account) AsAccount() (ac *engine.Account) { // transfer data into new structure - ac = engine.Account{ + ac = &engine.Account{ ID: v1Acc.Id, BalanceMap: make(map[string]engine.Balances, len(v1Acc.BalanceMap)), UnitCounters: make(engine.UnitCounters, len(v1Acc.UnitCounters)), diff --git a/migrator/migrator.go b/migrator/migrator.go index 2b9cc4bfe..489a79890 100644 --- a/migrator/migrator.go +++ b/migrator/migrator.go @@ -24,8 +24,16 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewMigrator(tpDB engine.RatingStorage, dataDB engine.AccountingStorage, dataDBType string, storDB engine.Storage, storDBType string) *Migrator { - return &Migrator{tpDB: tpDB, dataDB: dataDB, dataDBType: dataDBType, storDB: storDB, storDBType: storDBType} +func NewMigrator(tpDB engine.RatingStorage, dataDB engine.AccountingStorage, dataDBType, dataDBEncoding string, + storDB engine.Storage, storDBType string) *Migrator { + var mrshlr engine.Marshaler + if dataDBEncoding == utils.MSGPACK { + mrshlr = engine.NewCodecMsgpackMarshaler() + } else if dataDBEncoding == utils.JSON { + mrshlr = new(engine.JSONMarshaler) + } + return &Migrator{tpDB: tpDB, dataDB: dataDB, dataDBType: dataDBType, + storDB: storDB, storDBType: storDBType, mrshlr: mrshlr} } type Migrator struct { @@ -34,6 +42,7 @@ type Migrator struct { dataDBType string storDB engine.Storage storDBType string + mrshlr engine.Marshaler } // Migrate implements the tasks to migrate, used as a dispatcher to the individual methods diff --git a/utils/consts.go b/utils/consts.go index 037110ce9..8fdf7545c 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -368,4 +368,5 @@ const ( CDRPoster = "cdr" MetaFileCSV = "*file_csv" MetaFileFWV = "*file_fwv" + Accounts = "Accounts" )