This commit is contained in:
TeoV
2018-08-30 06:21:41 -04:00
committed by Dan Christian Bogos
parent 116a978713
commit 739edadea8
11 changed files with 170 additions and 40 deletions

View File

@@ -144,37 +144,52 @@ func main() {
// outDataDB
if *outDataDBType == utils.MetaDataDB {
mgrCfg.MigratorCgrConfig.OutDataDBType = mgrCfg.DataDbType
if dfltCfg.MigratorCgrConfig.OutDataDBType == mgrCfg.MigratorCgrConfig.OutDataDBType {
mgrCfg.MigratorCgrConfig.OutDataDBType = mgrCfg.DataDbType
}
} else {
mgrCfg.MigratorCgrConfig.OutDataDBType = strings.TrimPrefix(*outDataDBType, "*")
}
if *outDataDBHost == utils.MetaDataDB {
mgrCfg.MigratorCgrConfig.OutDataDBHost = mgrCfg.DataDbHost
if dfltCfg.MigratorCgrConfig.OutDataDBHost == mgrCfg.MigratorCgrConfig.OutDataDBHost {
mgrCfg.MigratorCgrConfig.OutDataDBHost = mgrCfg.DataDbHost
}
} else {
mgrCfg.MigratorCgrConfig.OutDataDBHost = *outDataDBHost
}
if *outDataDBPort == utils.MetaDataDB {
mgrCfg.MigratorCgrConfig.OutDataDBPort = mgrCfg.DataDbPort
if dfltCfg.MigratorCgrConfig.OutDataDBPort == mgrCfg.MigratorCgrConfig.OutDataDBPort {
mgrCfg.MigratorCgrConfig.OutDataDBPort = mgrCfg.DataDbPort
}
} else {
mgrCfg.MigratorCgrConfig.OutDataDBPort = *outDataDBPort
}
if *outDataDBName == utils.MetaDataDB {
mgrCfg.MigratorCgrConfig.OutDataDBName = mgrCfg.DataDbName
if dfltCfg.MigratorCgrConfig.OutDataDBName == mgrCfg.MigratorCgrConfig.OutDataDBName {
mgrCfg.MigratorCgrConfig.OutDataDBName = mgrCfg.DataDbName
}
} else {
mgrCfg.MigratorCgrConfig.OutDataDBName = *outDataDBName
}
if *outDataDBUser == utils.MetaDataDB {
mgrCfg.MigratorCgrConfig.OutDataDBUser = mgrCfg.DataDbUser
if dfltCfg.MigratorCgrConfig.OutDataDBUser == mgrCfg.MigratorCgrConfig.OutDataDBUser {
mgrCfg.MigratorCgrConfig.OutDataDBUser = mgrCfg.DataDbUser
}
} else {
mgrCfg.MigratorCgrConfig.OutDataDBUser = *outDataDBUser
}
if *outDataDBPass == utils.MetaDataDB {
mgrCfg.MigratorCgrConfig.OutDataDBPassword = mgrCfg.DataDbPass
if dfltCfg.MigratorCgrConfig.OutDataDBPassword == mgrCfg.MigratorCgrConfig.OutDataDBPassword {
mgrCfg.MigratorCgrConfig.OutDataDBPassword = mgrCfg.DataDbPass
}
} else {
mgrCfg.MigratorCgrConfig.OutDataDBPassword = *outDataDBPass
}
if *outDBDataEncoding == utils.MetaDataDB {
mgrCfg.MigratorCgrConfig.OutDataDBEncoding = mgrCfg.DBDataEncoding
if dfltCfg.MigratorCgrConfig.OutDataDBEncoding == mgrCfg.MigratorCgrConfig.OutDataDBEncoding {
mgrCfg.MigratorCgrConfig.OutDataDBEncoding = mgrCfg.DBDataEncoding
}
} else {
mgrCfg.MigratorCgrConfig.OutDataDBEncoding = *outDBDataEncoding
}
@@ -201,12 +216,16 @@ func main() {
// outStorDB
if *outStorDBType == utils.MetaStorDB {
mgrCfg.MigratorCgrConfig.OutStorDBType = mgrCfg.StorDBType
if dfltCfg.MigratorCgrConfig.OutStorDBType == mgrCfg.MigratorCgrConfig.OutStorDBType {
mgrCfg.MigratorCgrConfig.OutStorDBType = mgrCfg.StorDBType
}
} else {
mgrCfg.MigratorCgrConfig.OutStorDBType = strings.TrimPrefix(*outStorDBType, "*")
}
if *outStorDBHost == utils.MetaStorDB {
mgrCfg.MigratorCgrConfig.OutStorDBHost = mgrCfg.StorDBHost
if dfltCfg.MigratorCgrConfig.OutStorDBHost == mgrCfg.MigratorCgrConfig.OutStorDBHost {
mgrCfg.MigratorCgrConfig.OutStorDBHost = mgrCfg.StorDBHost
}
} else {
mgrCfg.MigratorCgrConfig.OutStorDBHost = *outStorDBHost
}
@@ -216,17 +235,23 @@ func main() {
mgrCfg.MigratorCgrConfig.OutStorDBPort = *outStorDBPort
}
if *outStorDBName == utils.MetaStorDB {
mgrCfg.MigratorCgrConfig.OutStorDBName = mgrCfg.StorDBName
if dfltCfg.MigratorCgrConfig.OutStorDBName == mgrCfg.MigratorCgrConfig.OutStorDBName {
mgrCfg.MigratorCgrConfig.OutStorDBName = mgrCfg.StorDBName
}
} else {
mgrCfg.MigratorCgrConfig.OutStorDBName = *outStorDBName
}
if *outStorDBUser == utils.MetaStorDB {
mgrCfg.MigratorCgrConfig.OutStorDBUser = mgrCfg.StorDBUser
if dfltCfg.MigratorCgrConfig.OutStorDBUser == mgrCfg.MigratorCgrConfig.OutStorDBUser {
mgrCfg.MigratorCgrConfig.OutStorDBUser = mgrCfg.StorDBUser
}
} else {
mgrCfg.MigratorCgrConfig.OutStorDBUser = *outStorDBUser
}
if *outStorDBPass == utils.MetaStorDB {
mgrCfg.MigratorCgrConfig.OutStorDBPassword = mgrCfg.StorDBPass
if dfltCfg.MigratorCgrConfig.OutStorDBPassword == mgrCfg.MigratorCgrConfig.OutStorDBPassword {
mgrCfg.MigratorCgrConfig.OutStorDBPassword = mgrCfg.StorDBPass
}
} else {
mgrCfg.MigratorCgrConfig.OutStorDBPassword = *outStorDBPass
}

View File

@@ -1518,3 +1518,32 @@ func TestCgrMigratorCfgDefault(t *testing.T) {
t.Errorf("received: %+v, expecting: %+v", utils.ToJSON(cgrCfg.MigratorCgrConfig), utils.ToJSON(eMgrCfg))
}
}
func TestCgrMigratorCfg2(t *testing.T) {
JSN_CFG := `
{
"migrator": {
"out_datadb_type": "redis",
"out_datadb_host": "0.0.0.0",
"out_datadb_port": "9999",
"out_datadb_name": "9999",
"out_datadb_user": "cgrates",
"out_datadb_password": "",
"out_datadb_encoding" : "msgpack",
"out_stordb_type": "mysql",
"out_stordb_host": "0.0.0.0",
"out_stordb_port": "9999",
"out_stordb_name": "cgrates",
"out_stordb_user": "cgrates",
"out_stordb_password": "",
},
}`
if cgrCfg, err := NewCGRConfigFromJsonString(JSN_CFG); err != nil {
t.Error(err)
} else if cgrCfg.MigratorCgrConfig.OutDataDBHost != "0.0.0.0" {
t.Errorf("Expected: 0.0.0.0 , received: %+v", cgrCfg.MigratorCgrConfig.OutDataDBHost)
} else if cgrCfg.MigratorCgrConfig.OutDataDBPort != "9999" {
t.Errorf("Expected: 9999, received: %+v", cgrCfg.MigratorCgrConfig.OutDataDBPassword)
}
}

View File

@@ -0,0 +1,28 @@
{
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_type": "redis", // data_db type: <redis|mongo>
"db_port": 9999, // data_db port to reach the database
"db_name": "9999", // data_db database name to connect to
},
"stor_db": { // database used to store offline tariff plans and CDRs
"db_host": "*internal",
},
"migrator": {
"out_datadb_type": "mongo",
"out_datadb_host": "127.0.0.1",
"out_datadb_port": "27017",
"out_datadb_name": "data_backup",
"out_datadb_user": "",
"out_datadb_password":"",
"out_stordb_type": "mongo",
"out_datadb_host": "127.0.0.1",
"out_stordb_port": "27017",
"out_stordb_name": "store-backup",
"out_stordb_user": "",
"out_stordb_password": "",
},
}

View File

@@ -49,7 +49,7 @@ func ConfigureDataStorage(db_type, host, port, name, user, pass, marshaler strin
case utils.MONGO:
d, err = NewMongoStorage(host, port, name, user, pass, utils.DataDB, nil, cacheCfg, loadHistorySize)
dm = NewDataManager(d.(DataDB))
case utils.MetaInternal:
case utils.INTERNAL:
if marshaler == utils.JSON {
d, err = NewMapStorageJson()
} else {
@@ -58,7 +58,7 @@ func ConfigureDataStorage(db_type, host, port, name, user, pass, marshaler strin
dm = NewDataManager(d.(DataDB))
default:
err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s or '%s'",
db_type, utils.REDIS, utils.MONGO, utils.MetaInternal))
db_type, utils.REDIS, utils.MONGO, utils.INTERNAL))
}
if err != nil {
return nil, err
@@ -76,11 +76,11 @@ func ConfigureStorStorage(db_type, host, port, name, user, pass, marshaler strin
d, err = NewPostgresStorage(host, port, name, user, pass, maxConn, maxIdleConn, connMaxLifetime)
case utils.MYSQL:
d, err = NewMySQLStorage(host, port, name, user, pass, maxConn, maxIdleConn, connMaxLifetime)
case utils.MetaInternal:
case utils.INTERNAL:
d, err = NewMapStorage()
default:
err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are [%s, %s, %s, %s]",
db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES, utils.MetaInternal))
db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES, utils.INTERNAL))
}
if err != nil {
return nil, err
@@ -98,11 +98,11 @@ func ConfigureLoadStorage(db_type, host, port, name, user, pass, marshaler strin
d, err = NewMySQLStorage(host, port, name, user, pass, maxConn, maxIdleConn, connMaxLifetime)
case utils.MONGO:
d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil, 1)
case utils.MetaInternal:
case utils.INTERNAL:
d, err = NewMapStorage()
default:
err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are [%s, %s, %s, %s]",
db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES, utils.MetaInternal))
db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES, utils.INTERNAL))
}
if err != nil {
return nil, err
@@ -120,11 +120,11 @@ func ConfigureCdrStorage(db_type, host, port, name, user, pass string,
d, err = NewMySQLStorage(host, port, name, user, pass, maxConn, maxIdleConn, connMaxLifetime)
case utils.MONGO:
d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil, 1)
case utils.MetaInternal:
case utils.INTERNAL:
d, err = NewMapStorage()
default:
err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are [%s, %s, %s, %s]",
db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES, utils.MetaInternal))
db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES, utils.INTERNAL))
}
if err != nil {
return nil, err
@@ -142,11 +142,11 @@ func ConfigureStorDB(db_type, host, port, name, user, pass string,
d, err = NewMySQLStorage(host, port, name, user, pass, maxConn, maxIdleConn, connMaxLifetime)
case utils.MONGO:
d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil, 1)
case utils.MetaInternal:
case utils.INTERNAL:
d, err = NewMapStorage()
default:
err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are [%s, %s, %s, %s]",
db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES, utils.MetaInternal))
db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES, utils.INTERNAL))
}
if err != nil {
return nil, err

View File

@@ -77,6 +77,9 @@ func (m *Migrator) migrateV1Accounts() (err error) {
if err = m.dmOut.DataManager().DataDB().SetAccount(acnt); err != nil {
return err
}
if err = m.dmIN.remV1Account(v1Acnt.Id); err != nil {
return err
}
m.stats[utils.Accounts] += 1
}
}
@@ -110,6 +113,9 @@ func (m *Migrator) migrateV2Accounts() (err error) {
if err = m.dmOut.DataManager().DataDB().SetAccount(acnt); err != nil {
return err
}
if err = m.dmIN.remV2Account(v2Acnt.ID); err != nil {
return err
}
m.stats[utils.Accounts] += 1
}
}

View File

@@ -50,7 +50,7 @@ var sTestsAccIT = []func(t *testing.T){
testAccITMigrateAndMove,
}
func TestAccountITRedis(t *testing.T) {
func TestAccountMigrateITRedis(t *testing.T) {
var err error
accPathIn = path.Join(*dataDir, "conf", "samples", "tutmysql")
accCfgIn, err = config.NewCGRConfigFromFolder(accPathIn)
@@ -67,7 +67,7 @@ func TestAccountITRedis(t *testing.T) {
}
}
func TestAccountITMongo(t *testing.T) {
func TestAccountMigrateITMongo(t *testing.T) {
var err error
accPathIn = path.Join(*dataDir, "conf", "samples", "tutmongo")
accCfgIn, err = config.NewCGRConfigFromFolder(accPathIn)
@@ -166,6 +166,10 @@ func testAccITFlush(t *testing.T) {
if err := engine.SetDBVersions(accMigrator.dmOut.DataManager().DataDB()); err != nil {
t.Error("Error ", err.Error())
}
accMigrator.dmIN.DataManager().DataDB().Flush("")
if err := engine.SetDBVersions(accMigrator.dmIN.DataManager().DataDB()); err != nil {
t.Error("Error ", err.Error())
}
}
func testAccITMigrateAndMove(t *testing.T) {
@@ -244,10 +248,12 @@ func testAccITMigrateAndMove(t *testing.T) {
}
switch accAction {
case utils.Migrate:
// set v1Account
err := accMigrator.dmIN.setV1Account(v1Acc)
if err != nil {
t.Error("Error when setting v1 Accounts ", err.Error())
}
//set version for account : 1
currentVersion := engine.Versions{
utils.StatS: 2,
utils.Thresholds: 2,
@@ -260,46 +266,52 @@ func testAccITMigrateAndMove(t *testing.T) {
if err != nil {
t.Error("Error when setting version for Accounts ", err.Error())
}
//check if version was set correctly
if vrs, err := accMigrator.dmOut.DataManager().DataDB().GetVersions(""); err != nil {
t.Error(err)
} else if vrs[utils.Accounts] != 1 {
t.Errorf("Unexpected version returned: %d", vrs[utils.Accounts])
}
//migrate account
err, _ = accMigrator.Migrate([]string{utils.MetaAccounts})
if err != nil {
t.Error("Error when migrating Accounts ", err.Error())
}
//check if version was updated
if vrs, err := accMigrator.dmOut.DataManager().DataDB().GetVersions(""); err != nil {
t.Error(err)
} else if vrs[utils.Accounts] != 3 {
t.Errorf("Unexpected version returned: %d", vrs[utils.Accounts])
}
//check if account was migrate correctly
result, err := accMigrator.dmOut.DataManager().DataDB().GetAccount(testAccount.ID)
if err != nil {
t.Error("Error when getting Accounts ", err.Error())
}
if !reflect.DeepEqual(testAccount.BalanceMap["*voice"][0], result.BalanceMap["*voice"][0]) {
t.Errorf("Expecting: %+v, received: %+v", testAccount.BalanceMap["*voice"][0], result.BalanceMap["*voice"][0])
} else if !reflect.DeepEqual(testAccount, result) {
if !reflect.DeepEqual(testAccount, result) {
t.Errorf("Expecting: %+v, received: %+v", testAccount, result)
}
case utils.Move:
if err := accMigrator.dmIN.DataManager().DataDB().SetAccount(testAccount); err != nil {
log.Print("GOT ERR DMIN", err)
//check if old account was deleted
if _, err = accMigrator.dmIN.getv1Account(); err != utils.ErrNoMoreData {
t.Error("Error should be not found : ", err)
}
case utils.Move:
//set an account in dmIN
if err := accMigrator.dmIN.DataManager().DataDB().SetAccount(testAccount); err != nil {
t.Error(err)
}
//set versions for account
currentVersion := engine.CurrentDataDBVersions()
err := accMigrator.dmOut.DataManager().DataDB().SetVersions(currentVersion, false)
if err != nil {
t.Error("Error when setting version for Accounts ", err.Error())
}
//migrate accounts
err, _ = accMigrator.Migrate([]string{utils.MetaAccounts})
if err != nil {
t.Error("Error when accMigratorrating Accounts ", err.Error())
}
//check if account was migrate correctly
result, err := accMigrator.dmOut.DataManager().DataDB().GetAccount(testAccount.ID)
if err != nil {
t.Error(err)
@@ -307,6 +319,7 @@ func testAccITMigrateAndMove(t *testing.T) {
if !reflect.DeepEqual(testAccount, result) {
t.Errorf("Expecting: %+v, received: %+v", testAccount, result)
}
//check if old account was deleted
result, err = accMigrator.dmIN.DataManager().DataDB().GetAccount(testAccount.ID)
if err != utils.ErrNotFound {
t.Error(err)

View File

@@ -25,6 +25,7 @@ import (
type MigratorDataDB interface {
getv1Account() (v1Acnt *v1Account, err error)
setV1Account(x *v1Account) (err error)
remV1Account(id string) (err error)
getV1ActionPlans() (v1aps *v1ActionPlans, err error)
setV1ActionPlans(x *v1ActionPlans) (err error)
getV1Actions() (v1acs *v1Actions, err error)
@@ -39,6 +40,7 @@ type MigratorDataDB interface {
setV2ActionTrigger(x *v2ActionTrigger) (err error)
getv2Account() (v2Acnt *v2Account, err error)
setV2Account(x *v2Account) (err error)
remV2Account(id string) (err error)
getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err error)
setV1AttributeProfile(x *v1AttributeProfile) (err error)
getV2ThresholdProfile() (v2T *v2Threshold, err error)

View File

@@ -42,9 +42,11 @@ func NewMigratorDataDB(db_type, host, port, name, user, pass, marshaler string,
case utils.MONGO:
d = newMongoMigrator(dm)
db = d.(MigratorDataDB)
case utils.INTERNAL:
//do nothing for the moment
default:
err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s'",
db_type, utils.REDIS, utils.MONGO))
err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s or '%s'",
db_type, utils.REDIS, utils.MONGO, utils.INTERNAL))
}
return d, nil
}
@@ -67,9 +69,11 @@ func NewMigratorStorDB(db_type, host, port, name, user, pass string,
case utils.POSTGRES:
d = newMigratorSQL(storDb)
db = d.(MigratorStorDB)
case utils.INTERNAL:
//for the momen do nothing
default:
err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s'",
db_type, utils.MONGO, utils.MYSQL))
err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are [%s, %s, %s, %s]",
db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES, utils.INTERNAL))
}
return d, nil
}

View File

@@ -84,6 +84,11 @@ func (v1ms *mongoMigrator) setV1Account(x *v1Account) (err error) {
return
}
//rem
func (v1ms *mongoMigrator) remV1Account(id string) (err error) {
return v1ms.mgoDB.DB().C(v1AccountDBPrefix).Remove(bson.M{"id": id})
}
//V2
//get
func (v1ms *mongoMigrator) getv2Account() (v2Acnt *v2Account, err error) {
@@ -108,6 +113,11 @@ func (v1ms *mongoMigrator) setV2Account(x *v2Account) (err error) {
return
}
//rem
func (v1ms *mongoMigrator) remV2Account(id string) (err error) {
return v1ms.mgoDB.DB().C(v2AccountsCol).Remove(bson.M{"id": id})
}
//Action methods
//get
func (v1ms *mongoMigrator) getV1ActionPlans() (v1aps *v1ActionPlans, err error) {

View File

@@ -50,7 +50,7 @@ func (v1rs *redisMigrator) getv1Account() (v1Acnt *v1Account, err error) {
if err != nil {
return
} else if len(v1rs.dataKeys) == 0 {
return nil, utils.ErrNotFound
return nil, utils.ErrNoMoreData
}
v1rs.qryIdx = utils.IntPointer(0)
}
@@ -84,6 +84,12 @@ func (v1rs *redisMigrator) setV1Account(x *v1Account) (err error) {
return
}
//rem
func (v1rs *redisMigrator) remV1Account(id string) (err error) {
key := v1AccountDBPrefix + id
return v1rs.rds.Cmd("DEL", key).Err
}
//V2
//get
func (v1rs *redisMigrator) getv2Account() (v2Acnt *v2Account, err error) {
@@ -92,7 +98,7 @@ func (v1rs *redisMigrator) getv2Account() (v2Acnt *v2Account, err error) {
if err != nil {
return
} else if len(v1rs.dataKeys) == 0 {
return nil, utils.ErrNotFound
return nil, utils.ErrNoMoreData
}
v1rs.qryIdx = utils.IntPointer(0)
}
@@ -126,6 +132,12 @@ func (v1rs *redisMigrator) setV2Account(x *v2Account) (err error) {
return
}
//rem
func (v1rs *redisMigrator) remV2Account(id string) (err error) {
key := utils.ACCOUNT_PREFIX + id
return v1rs.rds.Cmd("DEL", key).Err
}
//ActionPlans methods
//get
func (v1rs *redisMigrator) getV1ActionPlans() (v1aps *v1ActionPlans, err error) {

View File

@@ -96,6 +96,7 @@ const (
POSTGRES = "postgres"
MYSQL = "mysql"
MONGO = "mongo"
INTERNAL = "internal"
DataManager = "DataManager"
REDIS = "redis"
MAPSTOR = "mapstor"