diff --git a/cmd/cgr-migrator/cgr-migrator.go b/cmd/cgr-migrator/cgr-migrator.go index 9563240b2..7716f422d 100755 --- a/cmd/cgr-migrator/cgr-migrator.go +++ b/cmd/cgr-migrator/cgr-migrator.go @@ -74,41 +74,41 @@ var ( ) func main() { -// flag.Parse() -// if *version { -// fmt.Println(utils.GetCGRVersion()) -// return -// } -// if migrate != nil && *migrate != "" { // Run migrator + flag.Parse() + if *version { + fmt.Println(utils.GetCGRVersion()) + return + } +if migrate != nil && *migrate != "" { // Run migrator -// dataDB, err := engine.ConfigureDataStorage(*dataDBType, *dataDBHost, *dataDBPort, *dataDBName, *dataDBUser, *dataDBPass, *dbDataEncoding, config.CgrConfig().CacheConfig, *loadHistorySize) -// if err != nil { -// log.Fatal(err) -// } -// oldDataDB, err := engine.ConfigureDataStorage(*oldDataDBType, *oldDataDBHost, *oldDataDBPort, *oldDataDBName, *oldDataDBUser, *oldDataDBPass, *oldDBDataEncoding, config.CgrConfig().CacheConfig, *oldLoadHistorySize) -// if err != nil { -// log.Fatal(err) -// } -// storDB, err := engine.ConfigureStorStorage(*storDBType, *storDBHost, *storDBPort, *storDBName, *storDBUser, *storDBPass, *dbDataEncoding, -// config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes) -// if err != nil { -// log.Fatal(err) -// } -// oldstorDB, err := engine.ConfigureStorStorage(*oldStorDBType, *oldStorDBHost, *oldStorDBPort, *oldStorDBName, *oldStorDBUser, *oldStorDBPass, *oldDBDataEncoding, -// config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes) -// if err != nil { -// log.Fatal(err) -// } -// m,err := migrator.NewMigrator(dataDB, *dataDBType, *dbDataEncoding, storDB, *storDBType,oldDataDB,*oldDataDBType,*oldDBDataEncoding,oldstorDB,*oldStorDBType) -// if err != nil { -// log.Fatal(err) -// } -// err = m.Migrate(*migrate); -// if err != nil { -// log.Fatal(err) -// } + dataDB, err := engine.ConfigureDataStorage(*dataDBType, *dataDBHost, *dataDBPort, *dataDBName, *dataDBUser, *dataDBPass, *dbDataEncoding, config.CgrConfig().CacheConfig, *loadHistorySize) + if err != nil { + log.Fatal(err) + } + oldDataDB, err := migrator.ConfigureV1DataStorage(*oldDataDBType, *oldDataDBHost, *oldDataDBPort, *oldDataDBName, *oldDataDBUser, *oldDataDBPass, *oldDBDataEncoding) + if err != nil { + log.Fatal(err) + } + storDB, err := engine.ConfigureStorStorage(*storDBType, *storDBHost, *storDBPort, *storDBName, *storDBUser, *storDBPass, *dbDataEncoding, + config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes) + if err != nil { + log.Fatal(err) + } + oldstorDB, err := engine.ConfigureStorStorage(*oldStorDBType, *oldStorDBHost, *oldStorDBPort, *oldStorDBName, *oldStorDBUser, *oldStorDBPass, *oldDBDataEncoding, + config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes) + if err != nil { + log.Fatal(err) + } + m,err := migrator.NewMigrator(dataDB, *dataDBType, *dbDataEncoding, storDB, *storDBType,oldDataDB,*oldDataDBType,*oldDBDataEncoding,oldstorDB,*oldStorDBType) + if err != nil { + log.Fatal(err) + } + err = m.Migrate(*migrate); + if err != nil { + log.Fatal(err) + } -// log.Print("Done migrating!") -// return -// } + log.Print("Done migrating!") + return + } } diff --git a/config/config_defaults.go b/config/config_defaults.go index fa367dd08..e275f2d6a 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -86,7 +86,8 @@ const CGRATES_CFG_JSON = ` "data_db": { // database used to store runtime data (eg: accounts, cdr stats) "db_type": "redis", // data_db type: - "db_host": "127.0.0.1", // data_db host address + "db_host": "192.168.100.40", // data_db host address + //"db_host": "127.0.0.1", // data_db host address "db_port": 6379, // data_db port to reach the database "db_name": "10", // data_db database name to connect to "db_user": "cgrates", // username to use when connecting to data_db @@ -97,7 +98,8 @@ const CGRATES_CFG_JSON = ` "stor_db": { // database used to store offline tariff plans and CDRs "db_type": "mysql", // stor database type to use: - "db_host": "127.0.0.1", // the host to connect to + "db_host": "192.168.100.40", // data_db host address + //"db_host": "127.0.0.1", // the host to connect to "db_port": 3306, // the port to reach the stordb "db_name": "cgrates", // stor database name "db_user": "cgrates", // username to use when connecting to stordb diff --git a/data/conf/samples/tutmongo/cgrates.json b/data/conf/samples/tutmongo/cgrates.json index 8ec932ecb..55286e6ca 100644 --- a/data/conf/samples/tutmongo/cgrates.json +++ b/data/conf/samples/tutmongo/cgrates.json @@ -15,12 +15,15 @@ "data_db": { "db_type": "mongo", + "db_host": "127.0.0.1", "db_port": 27017, }, "stor_db": { "db_type": "mongo", + "db_host": "127.0.0.1", "db_port": 27017, + "db_password":"", }, diff --git a/data/conf/samples/tutpostgres/cgrates.json b/data/conf/samples/tutpostgres/cgrates.json index 26584eccd..2d51322d6 100644 --- a/data/conf/samples/tutpostgres/cgrates.json +++ b/data/conf/samples/tutpostgres/cgrates.json @@ -1,4 +1,4 @@ -{ + { // CGRateS Configuration file // // Used for cgradmin diff --git a/migrator/accounts.go b/migrator/accounts.go index 849d2564b..80db4c842 100755 --- a/migrator/accounts.go +++ b/migrator/accounts.go @@ -25,7 +25,6 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" - "gopkg.in/mgo.v2/bson" ) const ( @@ -34,92 +33,34 @@ const ( ) func (m *Migrator) migrateAccounts() (err error) { - switch m.dataDBType { - case utils.REDIS: - var acntV1Keys []string - acntV1Keys, err = m.oldDataDB.GetKeysForPrefix(v1AccountDBPrefix) - if err != nil { - return - } - for _, acntV1Key := range acntV1Keys { - v1Acnt, err := m.getV1AccountFromDB(acntV1Key) - if err != nil { + var v1Acnt *v1Account + // for infinit pana cand vine err + for { + + v1Acnt,err=m.oldDataDB.getv1Account() + if err!=nil&&err!=utils.ErrNoMoreData{ return err } + if err==utils.ErrNoMoreData{break} if v1Acnt != nil { acnt := v1Acnt.AsAccount() + if err = m.dataDB.SetAccount(acnt); err != nil { return err } } } - // All done, update version wtih current one - vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]} - if err = m.dataDB.SetVersions(vrs, false); err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error())) - } - return - case utils.MONGO: - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - var accn v1Account - iter := mgoDB.C(v1AccountDBPrefix).Find(nil).Iter() - for iter.Next(&accn) { - if acnt := accn.AsAccount(); acnt != nil { - if err = m.dataDB.SetAccount(acnt); err != nil { - return err - } - } - } - // All done, update version wtih current one - vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]} - if err = m.dataDB.SetVersions(vrs, false); err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error())) - } - return - default: - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - utils.UnsupportedDB, - fmt.Sprintf("error: unsupported: <%s> for migrateAccounts method", m.dataDBType)) - } -} -func (m *Migrator) getV1AccountFromDB(key string) (*v1Account, error) { - switch m.oldDataDBType { - case utils.REDIS: - dataDB := m.oldDataDB.(*engine.RedisStorage) - if strVal, err := dataDB.Cmd("GET", key).Bytes(); err != nil { - return nil, err - } else { - v1Acnt := &v1Account{Id: key} - if err := m.mrshlr.Unmarshal(strVal, v1Acnt); err != nil { - return nil, err - } - return v1Acnt, nil + + // All done, update version wtih current one + vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]} + if err = m.dataDB.SetVersions(vrs, false); err != nil { + return utils.NewCGRError(utils.Migrator, + utils.ServerErrorCaps, + err.Error(), + fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error())) } - case utils.MONGO: - dataDB := m.oldDataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - v1Acnt := new(v1Account) - if err := mgoDB.C(v1AccountTBL).Find(bson.M{"id": key}).One(v1Acnt); err != nil { - return nil, err - } - return v1Acnt, nil - default: - return nil, utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - utils.UnsupportedDB, - fmt.Sprintf("error: unsupported: <%s> for getV1AccountFromDB method", m.oldDataDBType)) - } + return } type v1Account struct { diff --git a/migrator/migrator.go b/migrator/migrator.go index eeefbe8ab..1e8222f5f 100755 --- a/migrator/migrator.go +++ b/migrator/migrator.go @@ -23,7 +23,7 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewMigrator(dataDB engine.DataDB, dataDBType, dataDBEncoding string, storDB engine.Storage, storDBType string, oldDataDB engine.DataDB, oldDataDBType, oldDataDBEncoding string, oldStorDB engine.Storage, oldStorDBType string) (m *Migrator, err error) { +func NewMigrator(dataDB engine.DataDB, dataDBType, dataDBEncoding string, storDB engine.Storage, storDBType string, oldDataDB v1DataDB, oldDataDBType, oldDataDBEncoding string, oldStorDB engine.Storage, oldStorDBType string) (m *Migrator, err error) { var mrshlr engine.Marshaler var oldmrshlr engine.Marshaler if dataDBEncoding == utils.MSGPACK { @@ -50,7 +50,7 @@ type Migrator struct { storDB engine.Storage storDBType string mrshlr engine.Marshaler - oldDataDB engine.DataDB + oldDataDB v1DataDB oldDataDBType string oldStorDB engine.Storage oldStorDBType string diff --git a/migrator/migrator_it_test.go b/migrator/migrator_it_test.go index 77cc51e76..911c2b7db 100644 --- a/migrator/migrator_it_test.go +++ b/migrator/migrator_it_test.go @@ -19,7 +19,7 @@ package migrator import ( "flag" // "fmt" - // "path" + "path" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -37,8 +37,7 @@ var ( onStorCfg string dbtype string mig *Migrator - migrate = flag.String("migrate", "", "Fire up automatic migration <*cost_details|*set_versions>") - version = flag.Bool("version", false, "Prints the application version.") + dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here") dataDBType = flag.String("datadb_type", config.CgrConfig().DataDbType, "The type of the DataDb database ") dataDBHost = flag.String("datadb_host", config.CgrConfig().DataDbHost, "The DataDb host to connect to.") @@ -73,17 +72,17 @@ var ( dbDataEncoding = flag.String("dbdata_encoding", config.CgrConfig().DBDataEncoding, "The encoding used to store object data in strings") oldDBDataEncoding = flag.String("old_dbdata_encoding", config.CgrConfig().DBDataEncoding, "The encoding used to store object data in strings") -) + ) // subtests to be executed for each migrator var sTestsITMigrator = []func(t *testing.T){ testOnStorITFlush, testMigratorAccounts, - testMigratorActionPlans, - testMigratorActionTriggers, - testMigratorActions, - testMigratorSharedGroups, -} +// testMigratorActionPlans, +// testMigratorActionTriggers, +// testMigratorActions, +// testMigratorSharedGroups, + } func TestOnStorITRedisConnect(t *testing.T) { dataDB, err := engine.ConfigureDataStorage(*dataDBType, *dataDBHost, *dataDBPort, *dataDBName, *dataDBUser, *dataDBPass, *dbDataEncoding, config.CgrConfig().CacheConfig, *loadHistorySize) @@ -117,28 +116,43 @@ func TestOnStorITRedis(t *testing.T) { } } -// func TestOnStorITMongoConnect(t *testing.T) { -// cdrsMongoCfgPath := path.Join(*dataDir, "conf", "samples", "cdrsv2mongo") -// mgoITCfg, err := config.NewCGRConfigFromFolder(cdrsMongoCfgPath) -// if err != nil { -// t.Fatal(err) -// } -// if mgoITdb, err = engine.NewMongoStorage(mgoITCfg.StorDBHost, mgoITCfg.StorDBPort, mgoITCfg.StorDBName, mgoITCfg.StorDBUser, db_passwd, -// utils.StorDB, nil, mgoITCfg.CacheConfig, mgoITCfg.LoadHistorySize); err != nil { -// t.Fatal(err) -// } -// mongo = mgoITCfg -// onStorCfg = mgoITCfg.StorDBName -// mig = NewMigrator(mgoITdb, mgoITdb, utils.MONGO, utils.JSON, mgoITdb, utils.MONGO) -// } +func TestOnStorITMongoConnect(t *testing.T) { + + cdrsMongoCfgPath := path.Join(*dataDir, "conf", "samples", "tutmongo") + mgoITCfg, err := config.NewCGRConfigFromFolder(cdrsMongoCfgPath) + if err != nil { + t.Fatal(err) + } + dataDB, err := engine.ConfigureDataStorage(mgoITCfg.DataDbType, mgoITCfg.DataDbHost, mgoITCfg.DataDbPort, mgoITCfg.DataDbName, mgoITCfg.DataDbUser, mgoITCfg.DataDbPass, mgoITCfg.DBDataEncoding, mgoITCfg.CacheConfig, *loadHistorySize) + if err != nil { + log.Fatal(err) + } + oldDataDB, err := engine.ConfigureDataStorage(mgoITCfg.DataDbType, mgoITCfg.DataDbHost, mgoITCfg.DataDbPort, mgoITCfg.DataDbName, mgoITCfg.DataDbUser, mgoITCfg.DataDbPass, mgoITCfg.DBDataEncoding, mgoITCfg.CacheConfig, *oldLoadHistorySize) + if err != nil { + log.Fatal(err) + } + storDB, err := engine.ConfigureStorStorage(mgoITCfg.StorDBType, mgoITCfg.StorDBHost, mgoITCfg.StorDBPort, mgoITCfg.StorDBName, mgoITCfg.StorDBUser, mgoITCfg.StorDBPass, mgoITCfg.DBDataEncoding, + config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes) + if err != nil { + log.Fatal(err) + } + oldstorDB, err := engine.ConfigureStorStorage(mgoITCfg.StorDBType, mgoITCfg.StorDBHost, mgoITCfg.StorDBPort, mgoITCfg.StorDBName, mgoITCfg.StorDBUser, mgoITCfg.StorDBPass, mgoITCfg.DBDataEncoding, + config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes) + if err != nil { + log.Fatal(err) + } + mig, err = NewMigrator(dataDB, mgoITCfg.DataDbType,mgoITCfg.DBDataEncoding, storDB, mgoITCfg.StorDBType, oldDataDB, mgoITCfg.DataDbType, mgoITCfg.DBDataEncoding, oldstorDB, mgoITCfg.StorDBType) + if err != nil { + log.Fatal(err) + } +} -// func TestOnStorITMongo(t *testing.T) { -// dbtype = utils.MONGO -// onStor = mgoITdb -// for _, stest := range sTestsITMigrator { -// t.Run("TestITMigratorOnMongo", stest) -// } -// } +func TestOnStorITMongo(t *testing.T) { + dbtype = utils.MONGO + for _, stest := range sTestsITMigrator { + t.Run("TestITMigratorOnMongo", stest) + } +} func testOnStorITFlush(t *testing.T) { switch { @@ -149,7 +163,7 @@ func testOnStorITFlush(t *testing.T) { t.Error("Error when flushing Redis ", err.Error()) } case dbtype == utils.MONGO: - err := engine.InitDataDb(mongo) + err := mig.dataDB.Flush("") if err != nil { t.Error("Error when flushing Mongo ", err.Error()) } @@ -193,9 +207,9 @@ func testMigratorAccounts(t *testing.T) { } else if !reflect.DeepEqual(testAccount, result) { t.Errorf("Expecting: %+v, received: %+v", testAccount, result) } - /* + case dbtype == utils.MONGO: - err := mig.SetV1onMongoAccount(v1AccountDBPrefix, v1Acc) + err := mig.SetV1onMongoAccount(v1Acc) if err != nil { t.Error("Error when marshaling ", err.Error()) } @@ -210,14 +224,12 @@ func testMigratorAccounts(t *testing.T) { if !reflect.DeepEqual(testAccount, result) { t.Errorf("Expecting: %+v, received: %+v", testAccount, result) } - */ } } //2 - +/* func testMigratorActionPlans(t *testing.T) { - v1ap := v1ActionPlans{&v1ActionPlan{Id: "test", AccountIds: []string{"one"}, Timing: &engine.RateInterval{Timing: &engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}} ap := &engine.ActionPlan{Id: "test", AccountIDs: utils.StringMap{"one": true}, ActionTimings: []*engine.ActionTiming{&engine.ActionTiming{Timing: &engine.RateInterval{Timing: &engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}} switch { @@ -233,7 +245,7 @@ func testMigratorActionPlans(t *testing.T) { } _, err = mig.getV1ActionPlansFromDB(setv1id) if err != nil { - t.Error("Error when setting v1 ActionPlan ", err.Error()) + t.Error("Error when getting v1 ActionPlan ", err.Error()) } err = mig.Migrate(utils.MetaActionPlans) if err != nil { @@ -252,12 +264,18 @@ func testMigratorActionPlans(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Weight, result.ActionTimings[0].Weight) } - /* + case dbtype == utils.MONGO: - err := mig.SetV1onMongoActionPlan(utils.ACTION_PLAN_PREFIX, v1ap) + err := mig.SetV1onMongoActionPlan("actions", &v1ap) if err != nil { t.Error("Error when setting v1 ActionPlans ", err.Error()) } + log.Print("dadada!") + _, err = mig.getV1ActionPlansFromDB("") + if err != nil { + t.Error("Error when getting v1 ActionPlan ", err.Error()) + } + log.Print("dadada!") err = mig.Migrate("migrateActionPlans") if err != nil { t.Error("Error when migrating ActionPlans ", err.Error()) @@ -268,12 +286,12 @@ func testMigratorActionPlans(t *testing.T) { } if ap.Id != result.Id || !reflect.DeepEqual(ap.AccountIDs, result.AccountIDs) { t.Errorf("Expecting: %+v, received: %+v", *ap, result) - } else if !reflect.DeepEqual(ap.ActionTimings[0].Timing, result.ActionTimings[0].Timing) { - t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Timing, result.ActionTimings[0].Timing) - } else if ap.ActionTimings[0].Weight != result.ActionTimings[0].Weight || ap.ActionTimings[0].ActionsID != result.ActionTimings[0].ActionsID { - t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Weight, result.ActionTimings[0].Weight) - } - */ + } //else if !reflect.DeepEqual(ap.ActionTimings[0].Timing, result.ActionTimings.Timing) { + // t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Timing, result.ActionTimings[0].Timing) + //} else if ap.ActionTimings[0].Weight != result.ActionTimings[0].Weight || ap.ActionTimings[0].ActionsID != result.ActionTimings[0].ActionsID { + // t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Weight, result.ActionTimings[0].Weight) + //} + } } @@ -339,7 +357,7 @@ func testMigratorActionTriggers(t *testing.T) { } if !reflect.DeepEqual(atrs[0].ID, result[0].ID) { t.Errorf("Expecting: %+v, received: %+v", atrs[0].ID, result[0].ID) - } /*else if !reflect.DeepEqual(atrs[0].UniqueID, result[0].UniqueID) { + } else if !reflect.DeepEqual(atrs[0].UniqueID, result[0].UniqueID) { t.Errorf("Expecting: %+v, received: %+v", atrs[0].UniqueID, result[0].UniqueID) } else if !reflect.DeepEqual(atrs[0].ThresholdType, result[0].ThresholdType) { t.Errorf("Expecting: %+v, received: %+v", atrs[0].ThresholdType, result[0].ThresholdType) @@ -400,10 +418,10 @@ func testMigratorActionTriggers(t *testing.T) { } else if !reflect.DeepEqual(atrs[0].Balance.Blocker, result[0].Balance.Blocker) { t.Errorf("Expecting: %+v, received: %+v", atrs[0].Balance.Blocker, result[0].Balance.Blocker) } - */ - /* + + case dbtype == utils.MONGO: - err := mig.SetV1onMongoActionTrigger(utils.ACTION_TRIGGER_PREFIX, v1atrs) + err := mig.SetV1onMongoActionTrigger(utils.ACTION_TRIGGER_PREFIX, &v1atrs) if err != nil { t.Error("Error when setting v1 ActionTriggers ", err.Error()) } @@ -411,7 +429,7 @@ func testMigratorActionTriggers(t *testing.T) { if err != nil { t.Error("Error when migrating ActionTriggers ", err.Error()) } - result, err := mig.dataDB.GetActionTriggers(v1atrs.Id, true, utils.NonTransactional) + result, err := mig.dataDB.GetActionTriggers(v1atrs[0].Id, true, utils.NonTransactional) if err != nil { t.Error("Error when getting ActionTriggers ", err.Error()) } @@ -422,7 +440,7 @@ func testMigratorActionTriggers(t *testing.T) { if err != nil { t.Error("Error when flushing v1 ActionTriggers ", err.Error()) } - */ + } } @@ -457,9 +475,9 @@ func testMigratorActions(t *testing.T) { if !reflect.DeepEqual(act, result) { t.Errorf("Expecting: %+v, received: %+v", act, result) } - /* + case dbtype == utils.MONGO: - err := mig.SetV1onMongoAction(utils.ACTION_PREFIX, v1act) + err := mig.SetV1onMongoAction(utils.ACTION_PREFIX, &v1act) if err != nil { t.Error("Error when setting v1 Actions ", err.Error()) } @@ -467,7 +485,7 @@ func testMigratorActions(t *testing.T) { if err != nil { t.Error("Error when migrating Actions ", err.Error()) } - result, err := mig.dataDB.GetActions(v1act.Id, true, utils.NonTransactional) + result, err := mig.dataDB.GetActions(v1act[0].Id, true, utils.NonTransactional) if err != nil { t.Error("Error when getting Actions ", err.Error()) } @@ -478,7 +496,7 @@ func testMigratorActions(t *testing.T) { if err != nil { t.Error("Error when flushing v1 Actions ", err.Error()) } - */ + } } @@ -521,7 +539,7 @@ func testMigratorSharedGroups(t *testing.T) { if !reflect.DeepEqual(sg, result) { t.Errorf("Expecting: %+v, received: %+v", sg, result) } - /* + case dbtype == utils.MONGO: err := mig.SetV1onMongoSharedGroup(utils.SHARED_GROUP_PREFIX, v1sg) if err != nil { @@ -538,6 +556,7 @@ func testMigratorSharedGroups(t *testing.T) { if !reflect.DeepEqual(sg, result) { t.Errorf("Expecting: %+v, received: %+v", sg, result) } - */ + } } +*/ diff --git a/migrator/setv1.go b/migrator/setv1.go index 7652ece3e..b96824369 100644 --- a/migrator/setv1.go +++ b/migrator/setv1.go @@ -1,95 +1,140 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH +// /* +// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ +// You should have received a copy of the GNU General Public License +// along with this program. If not, see +// */ package migrator -import "github.com/cgrates/cgrates/engine" +// import ( +// "fmt" + +// "github.com/cgrates/cgrates/engine" +// "github.com/cgrates/cgrates/utils" +// "gopkg.in/mgo.v2/bson" +// ) -func (m *Migrator) SetV1onOldRedis(key string, bl []byte) (err error) { - dataDB := m.oldDataDB.(*engine.RedisStorage) - if err = dataDB.Cmd("SET", key, bl).Err; err != nil { - return err - } - return -} +// type AcKeyValue struct { +// Key string +// Value v1Actions +// } +// type AtKeyValue struct { +// Key string +// Value v1ActionPlans +// } -func (m *Migrator) SetV1onRedis(key string, bl []byte) (err error) { - dataDB := m.dataDB.(*engine.RedisStorage) - if err = dataDB.Cmd("SET", key, bl).Err; err != nil { - return err - } - return -} +// func (m *Migrator) SetV1onOldRedis(key string, bl []byte) (err error) { +// dataDB := m.oldDataDB.(*engine.RedisStorage) +// if err = dataDB.Cmd("SET", key, bl).Err; err != nil { +// return err +// } +// return +// } -func (m *Migrator) SetV1onMongoAccount(pref string, x *v1Account) (err error) { - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - if err := mgoDB.C(pref).Insert(x); err != nil { - return err - } - return -} +// func (m *Migrator) SetV1onRedis(key string, bl []byte) (err error) { +// dataDB := m.dataDB.(*engine.RedisStorage) +// if err = dataDB.Cmd("SET", key, bl).Err; err != nil { +// return err +// } +// return +// } -func (m *Migrator) SetV1onMongoAction(pref string, x *v1Action) (err error) { - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - if err := mgoDB.C(pref).Insert(x); err != nil { - return err - } - return -} +// func (m *Migrator) SetV1onMongoAccount( x *v1Account) (err error) { +// dataDB := m.dataDB.(*engine.MongoStorage) +// mgoDB := dataDB.DB() +// defer mgoDB.Session.Close() +// if err := mgoDB.C("userbalances").Insert(x); err != nil { +// return err +// } +// return +// } -func (m *Migrator) SetV1onMongoActionPlan(pref string, x *v1ActionPlan) (err error) { - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - if err := mgoDB.C(pref).Insert(x); err != nil { - return err - } - return -} +// func (m *Migrator) SetV1onMongoAction(key string, x *v1Actions) (err error) { +// dataDB := m.dataDB.(*engine.MongoStorage) +// mgoDB := dataDB.DB() +// defer mgoDB.Session.Close() +// if err := mgoDB.C("actions").Insert(&AcKeyValue{key, *x}); err != nil { +// return err +// } +// return +// } -func (m *Migrator) SetV1onMongoActionTrigger(pref string, x *v1ActionTrigger) (err error) { - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - if err := mgoDB.C(pref).Insert(x); err != nil { - return err - } - return -} +// func (m *Migrator) SetV1onMongoActionPlan(key string, x *v1ActionPlans) (err error) { +// dataDB := m.dataDB.(*engine.MongoStorage) +// mgoDB := dataDB.DB() +// defer mgoDB.Session.Close() +// if err := mgoDB.C("actiontimings").Insert(&AtKeyValue{key, *x}); err != nil { +// return err +// } +// return +// } -func (m *Migrator) SetV1onMongoSharedGroup(pref string, x *v1SharedGroup) (err error) { - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - if err := mgoDB.C(pref).Insert(x); err != nil { - return err - } - return -} -func (m *Migrator) DropV1Colection(pref string) (err error) { - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - if err := mgoDB.C(pref).DropCollection(); err != nil { - return err - } - return -} +// func (m *Migrator) SetV1onMongoActionTrigger(pref string, x *v1ActionTriggers) (err error) { +// dataDB := m.dataDB.(*engine.MongoStorage) +// mgoDB := dataDB.DB() +// defer mgoDB.Session.Close() +// if err := mgoDB.C(pref).Insert(x); err != nil { +// return err +// } +// return +// } + +// func (m *Migrator) SetV1onMongoSharedGroup(pref string, x *v1SharedGroup) (err error) { +// dataDB := m.dataDB.(*engine.MongoStorage) +// mgoDB := dataDB.DB() +// defer mgoDB.Session.Close() +// if err := mgoDB.C(pref).Insert(x); err != nil { +// return err +// } +// return +// } +// func (m *Migrator) DropV1Colection(pref string) (err error) { +// dataDB := m.dataDB.(*engine.MongoStorage) +// mgoDB := dataDB.DB() +// defer mgoDB.Session.Close() +// if err := mgoDB.C(pref).DropCollection(); err != nil { +// return err +// } +// return +// } + +// func (m *Migrator) getV1AccountFromDB(key string) (*v1Account, error) { +// switch m.oldDataDBType { +// case utils.REDIS: +// dataDB := m.oldDataDB.(*engine.RedisStorage) +// if strVal, err := dataDB.Cmd("GET", key).Bytes(); err != nil { +// return nil, err +// } else { +// v1Acnt := &v1Account{Id: key} +// if err := m.mrshlr.Unmarshal(strVal, v1Acnt); err != nil { +// return nil, err +// } +// return v1Acnt, nil +// } +// case utils.MONGO: +// dataDB := m.oldDataDB.(*engine.MongoStorage) +// mgoDB := dataDB.DB() +// defer mgoDB.Session.Close() +// v1Acnt := new(v1Account) +// if err := mgoDB.C(v1AccountTBL).Find(bson.M{"id": key}).One(v1Acnt); err != nil { +// return nil, err +// } +// return v1Acnt, nil +// default: +// return nil, utils.NewCGRError(utils.Migrator, +// utils.ServerErrorCaps, +// utils.UnsupportedDB, +// fmt.Sprintf("error: unsupported: <%s> for getV1AccountFromDB method", m.oldDataDBType)) +// } +// } \ No newline at end of file diff --git a/migrator/v1DataDB.go b/migrator/v1DataDB.go new file mode 100644 index 000000000..38aa7b700 --- /dev/null +++ b/migrator/v1DataDB.go @@ -0,0 +1,23 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package migrator + +type v1DataDB interface{ +getv1Account() (v1Acnt *v1Account, err error) +getKeysForPrefix(prefix string) ([]string, error) +} \ No newline at end of file diff --git a/migrator/v1Migrator_Utils.go b/migrator/v1Migrator_Utils.go new file mode 100644 index 000000000..fa9b90b7a --- /dev/null +++ b/migrator/v1Migrator_Utils.go @@ -0,0 +1,53 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package migrator + +import ( + "errors" + "fmt" + "strconv" + + "github.com/cgrates/cgrates/utils" +) + +func ConfigureV1DataStorage(db_type, host, port, name, user, pass, marshaler string) (db v1DataDB, err error) { + var d v1DataDB + switch db_type { + case utils.REDIS: + var db_nb int + db_nb, err = strconv.Atoi(name) + if err != nil { + utils.Logger.Crit("Redis db name must be an integer!") + return nil, err + } + if port != "" { + host += ":" + port + } + d, err = newv1RedisStorage(host, db_nb, pass, marshaler) + case utils.MONGO: + d, err = NewMongoStorage(host, port, name, user, pass, utils.DataDB, nil) + db = d.(v1DataDB) + default: + err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s'", + db_type, utils.REDIS, utils.MONGO)) + } + if err != nil { + return nil, err + } + return d, nil +} diff --git a/migrator/v1Mongo.go b/migrator/v1Mongo.go new file mode 100644 index 000000000..12dfe707a --- /dev/null +++ b/migrator/v1Mongo.go @@ -0,0 +1,68 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package migrator + +import ( + "fmt" + + "github.com/cgrates/cgrates/engine" + "gopkg.in/mgo.v2" +) + +type v1Mongo struct{ + session *mgo.Session + db string + v1ms engine.Marshaler + qryIter *mgo.Iter + +} +func NewMongoStorage(host, port, db, user, pass, storageType string, cdrsIndexes []string) (v1ms *v1Mongo, err error) { + url := host + if port != "" { + url += ":" + port + } + if user != "" && pass != "" { + url = fmt.Sprintf("%s:%s@%s", user, pass, url) + } + if db != "" { + url += "/" + db + } + session, err := mgo.Dial(url) + if err != nil { + return nil, err + } + session.SetMode(mgo.Strong, true) + v1ms = &v1Mongo{db: db, session: session, v1ms: engine.NewCodecMsgpackMarshaler()} + return +} + + func (v1ms *v1Mongo) getv1Account() (v1Acnt *v1Account, err error){ + if v1ms.qryIter==nil{ + v1ms.qryIter = v1ms.session.DB(v1ms.db).C(v1AccountDBPrefix).Find(nil).Iter() + } + v1ms.qryIter.Next(&v1Acnt) + + if v1Acnt==nil{ + v1ms.qryIter=nil + } + return v1Acnt,nil + } + +func (v1ms *v1Mongo) getKeysForPrefix(prefix string) ([]string, error){ +return nil,nil +} \ No newline at end of file diff --git a/migrator/v1Redis.go b/migrator/v1Redis.go new file mode 100644 index 000000000..0f7b6e604 --- /dev/null +++ b/migrator/v1Redis.go @@ -0,0 +1,136 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package migrator + +import ( +"fmt" +"log" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/cgrates/engine" + + "github.com/mediocregopher/radix.v2/redis" + "github.com/mediocregopher/radix.v2/pool" +) +type v1Redis struct{ + dbPool *pool.Pool + ms engine.Marshaler + dataKeys []string + qryIdx *int + +} + +func newv1RedisStorage(address string, db int, pass, mrshlerStr string) (*v1Redis, error) { + df := func(network, addr string) (*redis.Client, error) { + client, err := redis.Dial(network, addr) + if err != nil { + return nil, err + } + if len(pass) != 0 { + if err = client.Cmd("AUTH", pass).Err; err != nil { + client.Close() + return nil, err + } + } + if db != 0 { + if err = client.Cmd("SELECT", db).Err; err != nil { + client.Close() + return nil, err + } + } + return client, nil + } + p, err := pool.NewCustom("tcp", address, 1, df) + if err != nil { + return nil, err + } + var mrshler engine.Marshaler + if mrshlerStr == utils.MSGPACK { + mrshler = engine.NewCodecMsgpackMarshaler() + } else if mrshlerStr == utils.JSON { + mrshler = new(engine.JSONMarshaler) + } else { + return nil, fmt.Errorf("Unsupported marshaler: %v", mrshlerStr) + } + return &v1Redis{dbPool: p,ms: mrshler}, nil +} + +// This CMD function get a connection from the pool. +// Handles automatic failover in case of network disconnects +func (v1rs *v1Redis) cmd(cmd string, args ...interface{}) *redis.Resp { + c1, err := v1rs.dbPool.Get() + if err != nil { + return redis.NewResp(err) + } + result := c1.Cmd(cmd, args...) + if result.IsType(redis.IOErr) { // Failover mecanism + utils.Logger.Warning(fmt.Sprintf(" error <%s>, attempting failover.", result.Err.Error())) + c2, err := v1rs.dbPool.Get() + if err == nil { + if result2 := c2.Cmd(cmd, args...); !result2.IsType(redis.IOErr) { + v1rs.dbPool.Put(c2) + return result2 + } + } + } else { + v1rs.dbPool.Put(c1) + } + return result +} + + +func (v1rs *v1Redis) getKeysForPrefix(prefix string) ([]string, error) { + r := v1rs.cmd("KEYS", prefix+"*") + if r.Err != nil { + return nil, r.Err + } + return r.List() +} + +func (v1rs *v1Redis) getv1Account() (v1Acnt *v1Account, err error){ +if v1rs.qryIdx==nil{ + v1rs.dataKeys, err = v1rs.getKeysForPrefix(v1AccountDBPrefix); + log.Print("#1 Done migrating!",v1rs.dataKeys) + + if err != nil { + return + }else if len(v1rs.dataKeys)==0{ + return nil,utils.ErrNotFound + } + v1rs.qryIdx=utils.IntPointer(0) + log.Print("#2 Done migrating!",*v1rs.qryIdx) + + } +if *v1rs.qryIdx<=len(v1rs.dataKeys)-1{ + log.Print("#3 Done migrating!",v1rs.dataKeys[*v1rs.qryIdx]) + +strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes() + if err != nil { + return nil ,err + } + v1Acnt = &v1Account{Id: v1rs.dataKeys[*v1rs.qryIdx]} + if err := v1rs.ms.Unmarshal(strVal, v1Acnt); err != nil { + return nil,err + } +log.Print("#4 Done migrating!",*v1rs.qryIdx) +*v1rs.qryIdx=*v1rs.qryIdx+1 +}else{ +v1rs.qryIdx=utils.IntPointer(-1) + return nil,utils.ErrNoMoreData + } + return v1Acnt,nil +} \ No newline at end of file diff --git a/utils/errors.go b/utils/errors.go index 2e9a215b8..5e079f55a 100644 --- a/utils/errors.go +++ b/utils/errors.go @@ -23,6 +23,7 @@ import ( ) var ( + ErrNoMoreData = errors.New("NO_MORE_DATA") ErrNotImplemented = errors.New("NOT_IMPLEMENTED") ErrNotFound = errors.New("NOT_FOUND") ErrTimedOut = errors.New("TIMED_OUT")