mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Adding database handling for accounts migration
This commit is contained in:
@@ -224,7 +224,7 @@ func main() {
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := migrator.NewMigrator(ratingDb, accountDb, *datadb_type, storDB, *stor_db_type).Migrate(*migrate); err != nil {
|
||||
if err := migrator.NewMigrator(ratingDb, accountDb, *datadb_type, *dbdata_encoding, storDB, *stor_db_type).Migrate(*migrate); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
log.Print("Done migrating!")
|
||||
|
||||
@@ -332,9 +332,9 @@ func (ms *MongoStorage) Marshaler() Marshaler {
|
||||
return ms.ms
|
||||
}
|
||||
|
||||
// CloneSession returns a clone of the existing session so we can perform queries from outside of engine package
|
||||
func (ms *MongoStorage) CloneSession() *mgo.Session {
|
||||
return ms.session.Copy()
|
||||
// DB returnes a database object with cloned session inside
|
||||
func (ms *MongoStorage) DB() *mgo.Database {
|
||||
return ms.session.Copy().DB(ms.db)
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SelectDatabase(dbName string) (err error) {
|
||||
|
||||
@@ -240,7 +240,7 @@ func (sv *StructVersion) CompareAndMigrate(dbVer *StructVersion) []*MigrationInf
|
||||
}
|
||||
|
||||
func CurrentStorDBVersions() Versions {
|
||||
return Versions{utils.COST_DETAILS: 2}
|
||||
return Versions{utils.COST_DETAILS: 2, utils.Accounts: 2}
|
||||
}
|
||||
|
||||
// Versions will keep trac of various item versions
|
||||
|
||||
@@ -25,12 +25,72 @@ import (
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
)
|
||||
|
||||
const (
|
||||
v1AccountDBPrefix = "ubl_"
|
||||
v1AccountTBL = "userbalances"
|
||||
)
|
||||
|
||||
func (m *Migrator) migrateAccounts() (err error) {
|
||||
var acntV1Keys []string
|
||||
acntV1Keys, err = m.dataDB.GetKeysForPrefix(v1AccountDBPrefix)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
for _, acntV1Key := range acntV1Keys {
|
||||
v1Acnt, err := m.getV1AccountFromDB(acntV1Key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if acnt := v1Acnt.AsAccount(); acnt != nil {
|
||||
if err = m.dataDB.SetAccount(acnt); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]}
|
||||
if err = m.dataDB.SetVersions(vrs); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating CostDetails version into StorDB", err.Error()))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (m *Migrator) getV1AccountFromDB(key string) (*v1Account, error) {
|
||||
switch m.dataDBType {
|
||||
case utils.REDIS:
|
||||
dataDB := m.dataDB.(*engine.RedisStorage)
|
||||
if strVal, err := dataDB.Cmd("GET", key).Bytes(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
v1Acnt := &v1Account{Id: key}
|
||||
if err := m.mrshlr.Unmarshal(strVal, v1Acnt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v1Acnt, nil
|
||||
}
|
||||
case utils.MONGO:
|
||||
dataDB := m.dataDB.(*engine.MongoStorage)
|
||||
mgoDB := dataDB.DB()
|
||||
defer mgoDB.Session.Close()
|
||||
v1Acnt := new(v1Account)
|
||||
if err := mgoDB.C(v1AccountTBL).Find(bson.M{"id": key}).One(v1Acnt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v1Acnt, nil
|
||||
default:
|
||||
return nil, utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
utils.UnsupportedDB,
|
||||
fmt.Sprintf("error: unsupported: <%s> for getV1AccountFromDB method", m.dataDBType))
|
||||
}
|
||||
}
|
||||
|
||||
type v1Account struct {
|
||||
Id string
|
||||
BalanceMap map[string]v1BalanceChain
|
||||
@@ -73,9 +133,9 @@ func (b *v1Balance) IsDefault() bool {
|
||||
b.Disabled == false
|
||||
}
|
||||
|
||||
func (v1Acc v1Account) AsAccount() (ac engine.Account) {
|
||||
func (v1Acc v1Account) AsAccount() (ac *engine.Account) {
|
||||
// transfer data into new structure
|
||||
ac = engine.Account{
|
||||
ac = &engine.Account{
|
||||
ID: v1Acc.Id,
|
||||
BalanceMap: make(map[string]engine.Balances, len(v1Acc.BalanceMap)),
|
||||
UnitCounters: make(engine.UnitCounters, len(v1Acc.UnitCounters)),
|
||||
|
||||
@@ -24,8 +24,16 @@ import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func NewMigrator(tpDB engine.RatingStorage, dataDB engine.AccountingStorage, dataDBType string, storDB engine.Storage, storDBType string) *Migrator {
|
||||
return &Migrator{tpDB: tpDB, dataDB: dataDB, dataDBType: dataDBType, storDB: storDB, storDBType: storDBType}
|
||||
func NewMigrator(tpDB engine.RatingStorage, dataDB engine.AccountingStorage, dataDBType, dataDBEncoding string,
|
||||
storDB engine.Storage, storDBType string) *Migrator {
|
||||
var mrshlr engine.Marshaler
|
||||
if dataDBEncoding == utils.MSGPACK {
|
||||
mrshlr = engine.NewCodecMsgpackMarshaler()
|
||||
} else if dataDBEncoding == utils.JSON {
|
||||
mrshlr = new(engine.JSONMarshaler)
|
||||
}
|
||||
return &Migrator{tpDB: tpDB, dataDB: dataDB, dataDBType: dataDBType,
|
||||
storDB: storDB, storDBType: storDBType, mrshlr: mrshlr}
|
||||
}
|
||||
|
||||
type Migrator struct {
|
||||
@@ -34,6 +42,7 @@ type Migrator struct {
|
||||
dataDBType string
|
||||
storDB engine.Storage
|
||||
storDBType string
|
||||
mrshlr engine.Marshaler
|
||||
}
|
||||
|
||||
// Migrate implements the tasks to migrate, used as a dispatcher to the individual methods
|
||||
|
||||
@@ -368,4 +368,5 @@ const (
|
||||
CDRPoster = "cdr"
|
||||
MetaFileCSV = "*file_csv"
|
||||
MetaFileFWV = "*file_fwv"
|
||||
Accounts = "Accounts"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user