diff --git a/cmd/cgr-migrator/cgr-migrator.go b/cmd/cgr-migrator/cgr-migrator.go index 6b41a9046..5de8c6936 100755 --- a/cmd/cgr-migrator/cgr-migrator.go +++ b/cmd/cgr-migrator/cgr-migrator.go @@ -144,37 +144,52 @@ func main() { // outDataDB if *outDataDBType == utils.MetaDataDB { - mgrCfg.MigratorCgrConfig.OutDataDBType = mgrCfg.DataDbType + if dfltCfg.MigratorCgrConfig.OutDataDBType == mgrCfg.MigratorCgrConfig.OutDataDBType { + mgrCfg.MigratorCgrConfig.OutDataDBType = mgrCfg.DataDbType + } } else { mgrCfg.MigratorCgrConfig.OutDataDBType = strings.TrimPrefix(*outDataDBType, "*") } + if *outDataDBHost == utils.MetaDataDB { - mgrCfg.MigratorCgrConfig.OutDataDBHost = mgrCfg.DataDbHost + if dfltCfg.MigratorCgrConfig.OutDataDBHost == mgrCfg.MigratorCgrConfig.OutDataDBHost { + mgrCfg.MigratorCgrConfig.OutDataDBHost = mgrCfg.DataDbHost + } } else { mgrCfg.MigratorCgrConfig.OutDataDBHost = *outDataDBHost } if *outDataDBPort == utils.MetaDataDB { - mgrCfg.MigratorCgrConfig.OutDataDBPort = mgrCfg.DataDbPort + if dfltCfg.MigratorCgrConfig.OutDataDBPort == mgrCfg.MigratorCgrConfig.OutDataDBPort { + mgrCfg.MigratorCgrConfig.OutDataDBPort = mgrCfg.DataDbPort + } } else { mgrCfg.MigratorCgrConfig.OutDataDBPort = *outDataDBPort } if *outDataDBName == utils.MetaDataDB { - mgrCfg.MigratorCgrConfig.OutDataDBName = mgrCfg.DataDbName + if dfltCfg.MigratorCgrConfig.OutDataDBName == mgrCfg.MigratorCgrConfig.OutDataDBName { + mgrCfg.MigratorCgrConfig.OutDataDBName = mgrCfg.DataDbName + } } else { mgrCfg.MigratorCgrConfig.OutDataDBName = *outDataDBName } if *outDataDBUser == utils.MetaDataDB { - mgrCfg.MigratorCgrConfig.OutDataDBUser = mgrCfg.DataDbUser + if dfltCfg.MigratorCgrConfig.OutDataDBUser == mgrCfg.MigratorCgrConfig.OutDataDBUser { + mgrCfg.MigratorCgrConfig.OutDataDBUser = mgrCfg.DataDbUser + } } else { mgrCfg.MigratorCgrConfig.OutDataDBUser = *outDataDBUser } if *outDataDBPass == utils.MetaDataDB { - mgrCfg.MigratorCgrConfig.OutDataDBPassword = mgrCfg.DataDbPass + if dfltCfg.MigratorCgrConfig.OutDataDBPassword == mgrCfg.MigratorCgrConfig.OutDataDBPassword { + mgrCfg.MigratorCgrConfig.OutDataDBPassword = mgrCfg.DataDbPass + } } else { mgrCfg.MigratorCgrConfig.OutDataDBPassword = *outDataDBPass } if *outDBDataEncoding == utils.MetaDataDB { - mgrCfg.MigratorCgrConfig.OutDataDBEncoding = mgrCfg.DBDataEncoding + if dfltCfg.MigratorCgrConfig.OutDataDBEncoding == mgrCfg.MigratorCgrConfig.OutDataDBEncoding { + mgrCfg.MigratorCgrConfig.OutDataDBEncoding = mgrCfg.DBDataEncoding + } } else { mgrCfg.MigratorCgrConfig.OutDataDBEncoding = *outDBDataEncoding } @@ -201,12 +216,16 @@ func main() { // outStorDB if *outStorDBType == utils.MetaStorDB { - mgrCfg.MigratorCgrConfig.OutStorDBType = mgrCfg.StorDBType + if dfltCfg.MigratorCgrConfig.OutStorDBType == mgrCfg.MigratorCgrConfig.OutStorDBType { + mgrCfg.MigratorCgrConfig.OutStorDBType = mgrCfg.StorDBType + } } else { mgrCfg.MigratorCgrConfig.OutStorDBType = strings.TrimPrefix(*outStorDBType, "*") } if *outStorDBHost == utils.MetaStorDB { - mgrCfg.MigratorCgrConfig.OutStorDBHost = mgrCfg.StorDBHost + if dfltCfg.MigratorCgrConfig.OutStorDBHost == mgrCfg.MigratorCgrConfig.OutStorDBHost { + mgrCfg.MigratorCgrConfig.OutStorDBHost = mgrCfg.StorDBHost + } } else { mgrCfg.MigratorCgrConfig.OutStorDBHost = *outStorDBHost } @@ -216,17 +235,23 @@ func main() { mgrCfg.MigratorCgrConfig.OutStorDBPort = *outStorDBPort } if *outStorDBName == utils.MetaStorDB { - mgrCfg.MigratorCgrConfig.OutStorDBName = mgrCfg.StorDBName + if dfltCfg.MigratorCgrConfig.OutStorDBName == mgrCfg.MigratorCgrConfig.OutStorDBName { + mgrCfg.MigratorCgrConfig.OutStorDBName = mgrCfg.StorDBName + } } else { mgrCfg.MigratorCgrConfig.OutStorDBName = *outStorDBName } if *outStorDBUser == utils.MetaStorDB { - mgrCfg.MigratorCgrConfig.OutStorDBUser = mgrCfg.StorDBUser + if dfltCfg.MigratorCgrConfig.OutStorDBUser == mgrCfg.MigratorCgrConfig.OutStorDBUser { + mgrCfg.MigratorCgrConfig.OutStorDBUser = mgrCfg.StorDBUser + } } else { mgrCfg.MigratorCgrConfig.OutStorDBUser = *outStorDBUser } if *outStorDBPass == utils.MetaStorDB { - mgrCfg.MigratorCgrConfig.OutStorDBPassword = mgrCfg.StorDBPass + if dfltCfg.MigratorCgrConfig.OutStorDBPassword == mgrCfg.MigratorCgrConfig.OutStorDBPassword { + mgrCfg.MigratorCgrConfig.OutStorDBPassword = mgrCfg.StorDBPass + } } else { mgrCfg.MigratorCgrConfig.OutStorDBPassword = *outStorDBPass } diff --git a/config/config_test.go b/config/config_test.go index 9e306d282..3f07e2ba7 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -1518,3 +1518,32 @@ func TestCgrMigratorCfgDefault(t *testing.T) { t.Errorf("received: %+v, expecting: %+v", utils.ToJSON(cgrCfg.MigratorCgrConfig), utils.ToJSON(eMgrCfg)) } } + +func TestCgrMigratorCfg2(t *testing.T) { + JSN_CFG := ` +{ +"migrator": { + "out_datadb_type": "redis", + "out_datadb_host": "0.0.0.0", + "out_datadb_port": "9999", + "out_datadb_name": "9999", + "out_datadb_user": "cgrates", + "out_datadb_password": "", + "out_datadb_encoding" : "msgpack", + "out_stordb_type": "mysql", + "out_stordb_host": "0.0.0.0", + "out_stordb_port": "9999", + "out_stordb_name": "cgrates", + "out_stordb_user": "cgrates", + "out_stordb_password": "", +}, +}` + + if cgrCfg, err := NewCGRConfigFromJsonString(JSN_CFG); err != nil { + t.Error(err) + } else if cgrCfg.MigratorCgrConfig.OutDataDBHost != "0.0.0.0" { + t.Errorf("Expected: 0.0.0.0 , received: %+v", cgrCfg.MigratorCgrConfig.OutDataDBHost) + } else if cgrCfg.MigratorCgrConfig.OutDataDBPort != "9999" { + t.Errorf("Expected: 9999, received: %+v", cgrCfg.MigratorCgrConfig.OutDataDBPassword) + } +} diff --git a/data/conf/samples/migratortest/cgrates.json b/data/conf/samples/migratortest/cgrates.json new file mode 100755 index 000000000..19d47df05 --- /dev/null +++ b/data/conf/samples/migratortest/cgrates.json @@ -0,0 +1,28 @@ +{ + +"data_db": { // database used to store runtime data (eg: accounts, cdr stats) + "db_type": "redis", // data_db type: + "db_port": 9999, // data_db port to reach the database + "db_name": "9999", // data_db database name to connect to +}, + +"stor_db": { // database used to store offline tariff plans and CDRs +"db_host": "*internal", +}, + +"migrator": { +"out_datadb_type": "mongo", +"out_datadb_host": "127.0.0.1", +"out_datadb_port": "27017", +"out_datadb_name": "data_backup", +"out_datadb_user": "", +"out_datadb_password":"", +"out_stordb_type": "mongo", +"out_datadb_host": "127.0.0.1", +"out_stordb_port": "27017", +"out_stordb_name": "store-backup", +"out_stordb_user": "", +"out_stordb_password": "", +}, + +} \ No newline at end of file diff --git a/engine/storage_utils.go b/engine/storage_utils.go index 0cf173887..d24cd4b6c 100644 --- a/engine/storage_utils.go +++ b/engine/storage_utils.go @@ -49,7 +49,7 @@ func ConfigureDataStorage(db_type, host, port, name, user, pass, marshaler strin case utils.MONGO: d, err = NewMongoStorage(host, port, name, user, pass, utils.DataDB, nil, cacheCfg, loadHistorySize) dm = NewDataManager(d.(DataDB)) - case utils.MetaInternal: + case utils.INTERNAL: if marshaler == utils.JSON { d, err = NewMapStorageJson() } else { @@ -58,7 +58,7 @@ func ConfigureDataStorage(db_type, host, port, name, user, pass, marshaler strin dm = NewDataManager(d.(DataDB)) default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s or '%s'", - db_type, utils.REDIS, utils.MONGO, utils.MetaInternal)) + db_type, utils.REDIS, utils.MONGO, utils.INTERNAL)) } if err != nil { return nil, err @@ -76,11 +76,11 @@ func ConfigureStorStorage(db_type, host, port, name, user, pass, marshaler strin d, err = NewPostgresStorage(host, port, name, user, pass, maxConn, maxIdleConn, connMaxLifetime) case utils.MYSQL: d, err = NewMySQLStorage(host, port, name, user, pass, maxConn, maxIdleConn, connMaxLifetime) - case utils.MetaInternal: + case utils.INTERNAL: d, err = NewMapStorage() default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are [%s, %s, %s, %s]", - db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES, utils.MetaInternal)) + db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES, utils.INTERNAL)) } if err != nil { return nil, err @@ -98,11 +98,11 @@ func ConfigureLoadStorage(db_type, host, port, name, user, pass, marshaler strin d, err = NewMySQLStorage(host, port, name, user, pass, maxConn, maxIdleConn, connMaxLifetime) case utils.MONGO: d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil, 1) - case utils.MetaInternal: + case utils.INTERNAL: d, err = NewMapStorage() default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are [%s, %s, %s, %s]", - db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES, utils.MetaInternal)) + db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES, utils.INTERNAL)) } if err != nil { return nil, err @@ -120,11 +120,11 @@ func ConfigureCdrStorage(db_type, host, port, name, user, pass string, d, err = NewMySQLStorage(host, port, name, user, pass, maxConn, maxIdleConn, connMaxLifetime) case utils.MONGO: d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil, 1) - case utils.MetaInternal: + case utils.INTERNAL: d, err = NewMapStorage() default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are [%s, %s, %s, %s]", - db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES, utils.MetaInternal)) + db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES, utils.INTERNAL)) } if err != nil { return nil, err @@ -142,11 +142,11 @@ func ConfigureStorDB(db_type, host, port, name, user, pass string, d, err = NewMySQLStorage(host, port, name, user, pass, maxConn, maxIdleConn, connMaxLifetime) case utils.MONGO: d, err = NewMongoStorage(host, port, name, user, pass, utils.StorDB, cdrsIndexes, nil, 1) - case utils.MetaInternal: + case utils.INTERNAL: d, err = NewMapStorage() default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are [%s, %s, %s, %s]", - db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES, utils.MetaInternal)) + db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES, utils.INTERNAL)) } if err != nil { return nil, err diff --git a/migrator/accounts.go b/migrator/accounts.go index 1e4d7b5db..a56bce916 100755 --- a/migrator/accounts.go +++ b/migrator/accounts.go @@ -77,6 +77,9 @@ func (m *Migrator) migrateV1Accounts() (err error) { if err = m.dmOut.DataManager().DataDB().SetAccount(acnt); err != nil { return err } + if err = m.dmIN.remV1Account(v1Acnt.Id); err != nil { + return err + } m.stats[utils.Accounts] += 1 } } @@ -110,6 +113,9 @@ func (m *Migrator) migrateV2Accounts() (err error) { if err = m.dmOut.DataManager().DataDB().SetAccount(acnt); err != nil { return err } + if err = m.dmIN.remV2Account(v2Acnt.ID); err != nil { + return err + } m.stats[utils.Accounts] += 1 } } diff --git a/migrator/accounts_it_test.go b/migrator/accounts_it_test.go index 5977fc456..cd8348f88 100755 --- a/migrator/accounts_it_test.go +++ b/migrator/accounts_it_test.go @@ -50,7 +50,7 @@ var sTestsAccIT = []func(t *testing.T){ testAccITMigrateAndMove, } -func TestAccountITRedis(t *testing.T) { +func TestAccountMigrateITRedis(t *testing.T) { var err error accPathIn = path.Join(*dataDir, "conf", "samples", "tutmysql") accCfgIn, err = config.NewCGRConfigFromFolder(accPathIn) @@ -67,7 +67,7 @@ func TestAccountITRedis(t *testing.T) { } } -func TestAccountITMongo(t *testing.T) { +func TestAccountMigrateITMongo(t *testing.T) { var err error accPathIn = path.Join(*dataDir, "conf", "samples", "tutmongo") accCfgIn, err = config.NewCGRConfigFromFolder(accPathIn) @@ -166,6 +166,10 @@ func testAccITFlush(t *testing.T) { if err := engine.SetDBVersions(accMigrator.dmOut.DataManager().DataDB()); err != nil { t.Error("Error ", err.Error()) } + accMigrator.dmIN.DataManager().DataDB().Flush("") + if err := engine.SetDBVersions(accMigrator.dmIN.DataManager().DataDB()); err != nil { + t.Error("Error ", err.Error()) + } } func testAccITMigrateAndMove(t *testing.T) { @@ -244,10 +248,12 @@ func testAccITMigrateAndMove(t *testing.T) { } switch accAction { case utils.Migrate: + // set v1Account err := accMigrator.dmIN.setV1Account(v1Acc) if err != nil { t.Error("Error when setting v1 Accounts ", err.Error()) } + //set version for account : 1 currentVersion := engine.Versions{ utils.StatS: 2, utils.Thresholds: 2, @@ -260,46 +266,52 @@ func testAccITMigrateAndMove(t *testing.T) { if err != nil { t.Error("Error when setting version for Accounts ", err.Error()) } - + //check if version was set correctly if vrs, err := accMigrator.dmOut.DataManager().DataDB().GetVersions(""); err != nil { t.Error(err) } else if vrs[utils.Accounts] != 1 { t.Errorf("Unexpected version returned: %d", vrs[utils.Accounts]) } - + //migrate account err, _ = accMigrator.Migrate([]string{utils.MetaAccounts}) if err != nil { t.Error("Error when migrating Accounts ", err.Error()) } - + //check if version was updated if vrs, err := accMigrator.dmOut.DataManager().DataDB().GetVersions(""); err != nil { t.Error(err) } else if vrs[utils.Accounts] != 3 { t.Errorf("Unexpected version returned: %d", vrs[utils.Accounts]) } - + //check if account was migrate correctly result, err := accMigrator.dmOut.DataManager().DataDB().GetAccount(testAccount.ID) if err != nil { t.Error("Error when getting Accounts ", err.Error()) } - if !reflect.DeepEqual(testAccount.BalanceMap["*voice"][0], result.BalanceMap["*voice"][0]) { - t.Errorf("Expecting: %+v, received: %+v", testAccount.BalanceMap["*voice"][0], result.BalanceMap["*voice"][0]) - } else if !reflect.DeepEqual(testAccount, result) { + if !reflect.DeepEqual(testAccount, result) { t.Errorf("Expecting: %+v, received: %+v", testAccount, result) } - case utils.Move: - if err := accMigrator.dmIN.DataManager().DataDB().SetAccount(testAccount); err != nil { - log.Print("GOT ERR DMIN", err) + //check if old account was deleted + if _, err = accMigrator.dmIN.getv1Account(); err != utils.ErrNoMoreData { + t.Error("Error should be not found : ", err) } + case utils.Move: + //set an account in dmIN + if err := accMigrator.dmIN.DataManager().DataDB().SetAccount(testAccount); err != nil { + t.Error(err) + } + //set versions for account currentVersion := engine.CurrentDataDBVersions() err := accMigrator.dmOut.DataManager().DataDB().SetVersions(currentVersion, false) if err != nil { t.Error("Error when setting version for Accounts ", err.Error()) } + //migrate accounts err, _ = accMigrator.Migrate([]string{utils.MetaAccounts}) if err != nil { t.Error("Error when accMigratorrating Accounts ", err.Error()) } + //check if account was migrate correctly result, err := accMigrator.dmOut.DataManager().DataDB().GetAccount(testAccount.ID) if err != nil { t.Error(err) @@ -307,6 +319,7 @@ func testAccITMigrateAndMove(t *testing.T) { if !reflect.DeepEqual(testAccount, result) { t.Errorf("Expecting: %+v, received: %+v", testAccount, result) } + //check if old account was deleted result, err = accMigrator.dmIN.DataManager().DataDB().GetAccount(testAccount.ID) if err != utils.ErrNotFound { t.Error(err) diff --git a/migrator/migrator_datadb.go b/migrator/migrator_datadb.go index e1c5e51f3..c25d1c6a0 100644 --- a/migrator/migrator_datadb.go +++ b/migrator/migrator_datadb.go @@ -25,6 +25,7 @@ import ( type MigratorDataDB interface { getv1Account() (v1Acnt *v1Account, err error) setV1Account(x *v1Account) (err error) + remV1Account(id string) (err error) getV1ActionPlans() (v1aps *v1ActionPlans, err error) setV1ActionPlans(x *v1ActionPlans) (err error) getV1Actions() (v1acs *v1Actions, err error) @@ -39,6 +40,7 @@ type MigratorDataDB interface { setV2ActionTrigger(x *v2ActionTrigger) (err error) getv2Account() (v2Acnt *v2Account, err error) setV2Account(x *v2Account) (err error) + remV2Account(id string) (err error) getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err error) setV1AttributeProfile(x *v1AttributeProfile) (err error) getV2ThresholdProfile() (v2T *v2Threshold, err error) diff --git a/migrator/migrator_utils.go b/migrator/migrator_utils.go index 50892c3e5..37cdfe46b 100644 --- a/migrator/migrator_utils.go +++ b/migrator/migrator_utils.go @@ -42,9 +42,11 @@ func NewMigratorDataDB(db_type, host, port, name, user, pass, marshaler string, case utils.MONGO: d = newMongoMigrator(dm) db = d.(MigratorDataDB) + case utils.INTERNAL: + //do nothing for the moment default: - err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s'", - db_type, utils.REDIS, utils.MONGO)) + err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s or '%s'", + db_type, utils.REDIS, utils.MONGO, utils.INTERNAL)) } return d, nil } @@ -67,9 +69,11 @@ func NewMigratorStorDB(db_type, host, port, name, user, pass string, case utils.POSTGRES: d = newMigratorSQL(storDb) db = d.(MigratorStorDB) + case utils.INTERNAL: + //for the momen do nothing default: - err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s'", - db_type, utils.MONGO, utils.MYSQL)) + err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are [%s, %s, %s, %s]", + db_type, utils.MYSQL, utils.MONGO, utils.POSTGRES, utils.INTERNAL)) } return d, nil } diff --git a/migrator/storage_mongo_datadb.go b/migrator/storage_mongo_datadb.go index 6b3e31f23..51dd2964f 100644 --- a/migrator/storage_mongo_datadb.go +++ b/migrator/storage_mongo_datadb.go @@ -84,6 +84,11 @@ func (v1ms *mongoMigrator) setV1Account(x *v1Account) (err error) { return } +//rem +func (v1ms *mongoMigrator) remV1Account(id string) (err error) { + return v1ms.mgoDB.DB().C(v1AccountDBPrefix).Remove(bson.M{"id": id}) +} + //V2 //get func (v1ms *mongoMigrator) getv2Account() (v2Acnt *v2Account, err error) { @@ -108,6 +113,11 @@ func (v1ms *mongoMigrator) setV2Account(x *v2Account) (err error) { return } +//rem +func (v1ms *mongoMigrator) remV2Account(id string) (err error) { + return v1ms.mgoDB.DB().C(v2AccountsCol).Remove(bson.M{"id": id}) +} + //Action methods //get func (v1ms *mongoMigrator) getV1ActionPlans() (v1aps *v1ActionPlans, err error) { diff --git a/migrator/storage_redis.go b/migrator/storage_redis.go index 06e859649..16a0ac84b 100644 --- a/migrator/storage_redis.go +++ b/migrator/storage_redis.go @@ -50,7 +50,7 @@ func (v1rs *redisMigrator) getv1Account() (v1Acnt *v1Account, err error) { if err != nil { return } else if len(v1rs.dataKeys) == 0 { - return nil, utils.ErrNotFound + return nil, utils.ErrNoMoreData } v1rs.qryIdx = utils.IntPointer(0) } @@ -84,6 +84,12 @@ func (v1rs *redisMigrator) setV1Account(x *v1Account) (err error) { return } +//rem +func (v1rs *redisMigrator) remV1Account(id string) (err error) { + key := v1AccountDBPrefix + id + return v1rs.rds.Cmd("DEL", key).Err +} + //V2 //get func (v1rs *redisMigrator) getv2Account() (v2Acnt *v2Account, err error) { @@ -92,7 +98,7 @@ func (v1rs *redisMigrator) getv2Account() (v2Acnt *v2Account, err error) { if err != nil { return } else if len(v1rs.dataKeys) == 0 { - return nil, utils.ErrNotFound + return nil, utils.ErrNoMoreData } v1rs.qryIdx = utils.IntPointer(0) } @@ -126,6 +132,12 @@ func (v1rs *redisMigrator) setV2Account(x *v2Account) (err error) { return } +//rem +func (v1rs *redisMigrator) remV2Account(id string) (err error) { + key := utils.ACCOUNT_PREFIX + id + return v1rs.rds.Cmd("DEL", key).Err +} + //ActionPlans methods //get func (v1rs *redisMigrator) getV1ActionPlans() (v1aps *v1ActionPlans, err error) { diff --git a/utils/consts.go b/utils/consts.go index 226a91f33..baa36e22f 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -96,6 +96,7 @@ const ( POSTGRES = "postgres" MYSQL = "mysql" MONGO = "mongo" + INTERNAL = "internal" DataManager = "DataManager" REDIS = "redis" MAPSTOR = "mapstor"