mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
backup
This commit is contained in:
@@ -74,41 +74,41 @@ var (
|
||||
)
|
||||
|
||||
func main() {
|
||||
// flag.Parse()
|
||||
// if *version {
|
||||
// fmt.Println(utils.GetCGRVersion())
|
||||
// return
|
||||
// }
|
||||
// if migrate != nil && *migrate != "" { // Run migrator
|
||||
flag.Parse()
|
||||
if *version {
|
||||
fmt.Println(utils.GetCGRVersion())
|
||||
return
|
||||
}
|
||||
if migrate != nil && *migrate != "" { // Run migrator
|
||||
|
||||
// dataDB, err := engine.ConfigureDataStorage(*dataDBType, *dataDBHost, *dataDBPort, *dataDBName, *dataDBUser, *dataDBPass, *dbDataEncoding, config.CgrConfig().CacheConfig, *loadHistorySize)
|
||||
// if err != nil {
|
||||
// log.Fatal(err)
|
||||
// }
|
||||
// oldDataDB, err := engine.ConfigureDataStorage(*oldDataDBType, *oldDataDBHost, *oldDataDBPort, *oldDataDBName, *oldDataDBUser, *oldDataDBPass, *oldDBDataEncoding, config.CgrConfig().CacheConfig, *oldLoadHistorySize)
|
||||
// if err != nil {
|
||||
// log.Fatal(err)
|
||||
// }
|
||||
// storDB, err := engine.ConfigureStorStorage(*storDBType, *storDBHost, *storDBPort, *storDBName, *storDBUser, *storDBPass, *dbDataEncoding,
|
||||
// config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
|
||||
// if err != nil {
|
||||
// log.Fatal(err)
|
||||
// }
|
||||
// oldstorDB, err := engine.ConfigureStorStorage(*oldStorDBType, *oldStorDBHost, *oldStorDBPort, *oldStorDBName, *oldStorDBUser, *oldStorDBPass, *oldDBDataEncoding,
|
||||
// config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
|
||||
// if err != nil {
|
||||
// log.Fatal(err)
|
||||
// }
|
||||
// m,err := migrator.NewMigrator(dataDB, *dataDBType, *dbDataEncoding, storDB, *storDBType,oldDataDB,*oldDataDBType,*oldDBDataEncoding,oldstorDB,*oldStorDBType)
|
||||
// if err != nil {
|
||||
// log.Fatal(err)
|
||||
// }
|
||||
// err = m.Migrate(*migrate);
|
||||
// if err != nil {
|
||||
// log.Fatal(err)
|
||||
// }
|
||||
dataDB, err := engine.ConfigureDataStorage(*dataDBType, *dataDBHost, *dataDBPort, *dataDBName, *dataDBUser, *dataDBPass, *dbDataEncoding, config.CgrConfig().CacheConfig, *loadHistorySize)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
oldDataDB, err := migrator.ConfigureV1DataStorage(*oldDataDBType, *oldDataDBHost, *oldDataDBPort, *oldDataDBName, *oldDataDBUser, *oldDataDBPass, *oldDBDataEncoding)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
storDB, err := engine.ConfigureStorStorage(*storDBType, *storDBHost, *storDBPort, *storDBName, *storDBUser, *storDBPass, *dbDataEncoding,
|
||||
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
oldstorDB, err := engine.ConfigureStorStorage(*oldStorDBType, *oldStorDBHost, *oldStorDBPort, *oldStorDBName, *oldStorDBUser, *oldStorDBPass, *oldDBDataEncoding,
|
||||
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
m,err := migrator.NewMigrator(dataDB, *dataDBType, *dbDataEncoding, storDB, *storDBType,oldDataDB,*oldDataDBType,*oldDBDataEncoding,oldstorDB,*oldStorDBType)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
err = m.Migrate(*migrate);
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// log.Print("Done migrating!")
|
||||
// return
|
||||
// }
|
||||
log.Print("Done migrating!")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,7 +86,8 @@ const CGRATES_CFG_JSON = `
|
||||
|
||||
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
|
||||
"db_type": "redis", // data_db type: <redis|mongo>
|
||||
"db_host": "127.0.0.1", // data_db host address
|
||||
"db_host": "192.168.100.40", // data_db host address
|
||||
//"db_host": "127.0.0.1", // data_db host address
|
||||
"db_port": 6379, // data_db port to reach the database
|
||||
"db_name": "10", // data_db database name to connect to
|
||||
"db_user": "cgrates", // username to use when connecting to data_db
|
||||
@@ -97,7 +98,8 @@ const CGRATES_CFG_JSON = `
|
||||
|
||||
"stor_db": { // database used to store offline tariff plans and CDRs
|
||||
"db_type": "mysql", // stor database type to use: <mongo|mysql|postgres>
|
||||
"db_host": "127.0.0.1", // the host to connect to
|
||||
"db_host": "192.168.100.40", // data_db host address
|
||||
//"db_host": "127.0.0.1", // the host to connect to
|
||||
"db_port": 3306, // the port to reach the stordb
|
||||
"db_name": "cgrates", // stor database name
|
||||
"db_user": "cgrates", // username to use when connecting to stordb
|
||||
|
||||
@@ -15,12 +15,15 @@
|
||||
|
||||
"data_db": {
|
||||
"db_type": "mongo",
|
||||
"db_host": "127.0.0.1",
|
||||
"db_port": 27017,
|
||||
},
|
||||
|
||||
"stor_db": {
|
||||
"db_type": "mongo",
|
||||
"db_host": "127.0.0.1",
|
||||
"db_port": 27017,
|
||||
"db_password":"",
|
||||
},
|
||||
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
{
|
||||
{
|
||||
// CGRateS Configuration file
|
||||
//
|
||||
// Used for cgradmin
|
||||
|
||||
@@ -25,7 +25,6 @@ import (
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -34,92 +33,34 @@ const (
|
||||
)
|
||||
|
||||
func (m *Migrator) migrateAccounts() (err error) {
|
||||
switch m.dataDBType {
|
||||
case utils.REDIS:
|
||||
var acntV1Keys []string
|
||||
acntV1Keys, err = m.oldDataDB.GetKeysForPrefix(v1AccountDBPrefix)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
for _, acntV1Key := range acntV1Keys {
|
||||
v1Acnt, err := m.getV1AccountFromDB(acntV1Key)
|
||||
if err != nil {
|
||||
var v1Acnt *v1Account
|
||||
// for infinit pana cand vine err
|
||||
for {
|
||||
|
||||
v1Acnt,err=m.oldDataDB.getv1Account()
|
||||
if err!=nil&&err!=utils.ErrNoMoreData{
|
||||
return err
|
||||
}
|
||||
if err==utils.ErrNoMoreData{break}
|
||||
if v1Acnt != nil {
|
||||
acnt := v1Acnt.AsAccount()
|
||||
|
||||
if err = m.dataDB.SetAccount(acnt); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]}
|
||||
if err = m.dataDB.SetVersions(vrs, false); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
|
||||
}
|
||||
return
|
||||
case utils.MONGO:
|
||||
dataDB := m.dataDB.(*engine.MongoStorage)
|
||||
mgoDB := dataDB.DB()
|
||||
defer mgoDB.Session.Close()
|
||||
var accn v1Account
|
||||
iter := mgoDB.C(v1AccountDBPrefix).Find(nil).Iter()
|
||||
for iter.Next(&accn) {
|
||||
if acnt := accn.AsAccount(); acnt != nil {
|
||||
if err = m.dataDB.SetAccount(acnt); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]}
|
||||
if err = m.dataDB.SetVersions(vrs, false); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
|
||||
}
|
||||
return
|
||||
default:
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
utils.UnsupportedDB,
|
||||
fmt.Sprintf("error: unsupported: <%s> for migrateAccounts method", m.dataDBType))
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Migrator) getV1AccountFromDB(key string) (*v1Account, error) {
|
||||
switch m.oldDataDBType {
|
||||
case utils.REDIS:
|
||||
dataDB := m.oldDataDB.(*engine.RedisStorage)
|
||||
if strVal, err := dataDB.Cmd("GET", key).Bytes(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
v1Acnt := &v1Account{Id: key}
|
||||
if err := m.mrshlr.Unmarshal(strVal, v1Acnt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v1Acnt, nil
|
||||
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]}
|
||||
if err = m.dataDB.SetVersions(vrs, false); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
|
||||
}
|
||||
case utils.MONGO:
|
||||
dataDB := m.oldDataDB.(*engine.MongoStorage)
|
||||
mgoDB := dataDB.DB()
|
||||
defer mgoDB.Session.Close()
|
||||
v1Acnt := new(v1Account)
|
||||
if err := mgoDB.C(v1AccountTBL).Find(bson.M{"id": key}).One(v1Acnt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v1Acnt, nil
|
||||
default:
|
||||
return nil, utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
utils.UnsupportedDB,
|
||||
fmt.Sprintf("error: unsupported: <%s> for getV1AccountFromDB method", m.oldDataDBType))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type v1Account struct {
|
||||
|
||||
@@ -23,7 +23,7 @@ import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func NewMigrator(dataDB engine.DataDB, dataDBType, dataDBEncoding string, storDB engine.Storage, storDBType string, oldDataDB engine.DataDB, oldDataDBType, oldDataDBEncoding string, oldStorDB engine.Storage, oldStorDBType string) (m *Migrator, err error) {
|
||||
func NewMigrator(dataDB engine.DataDB, dataDBType, dataDBEncoding string, storDB engine.Storage, storDBType string, oldDataDB v1DataDB, oldDataDBType, oldDataDBEncoding string, oldStorDB engine.Storage, oldStorDBType string) (m *Migrator, err error) {
|
||||
var mrshlr engine.Marshaler
|
||||
var oldmrshlr engine.Marshaler
|
||||
if dataDBEncoding == utils.MSGPACK {
|
||||
@@ -50,7 +50,7 @@ type Migrator struct {
|
||||
storDB engine.Storage
|
||||
storDBType string
|
||||
mrshlr engine.Marshaler
|
||||
oldDataDB engine.DataDB
|
||||
oldDataDB v1DataDB
|
||||
oldDataDBType string
|
||||
oldStorDB engine.Storage
|
||||
oldStorDBType string
|
||||
|
||||
@@ -19,7 +19,7 @@ package migrator
|
||||
import (
|
||||
"flag"
|
||||
// "fmt"
|
||||
// "path"
|
||||
"path"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -37,8 +37,7 @@ var (
|
||||
onStorCfg string
|
||||
dbtype string
|
||||
mig *Migrator
|
||||
migrate = flag.String("migrate", "", "Fire up automatic migration <*cost_details|*set_versions>")
|
||||
version = flag.Bool("version", false, "Prints the application version.")
|
||||
dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
|
||||
|
||||
dataDBType = flag.String("datadb_type", config.CgrConfig().DataDbType, "The type of the DataDb database <redis>")
|
||||
dataDBHost = flag.String("datadb_host", config.CgrConfig().DataDbHost, "The DataDb host to connect to.")
|
||||
@@ -73,17 +72,17 @@ var (
|
||||
|
||||
dbDataEncoding = flag.String("dbdata_encoding", config.CgrConfig().DBDataEncoding, "The encoding used to store object data in strings")
|
||||
oldDBDataEncoding = flag.String("old_dbdata_encoding", config.CgrConfig().DBDataEncoding, "The encoding used to store object data in strings")
|
||||
)
|
||||
)
|
||||
|
||||
// subtests to be executed for each migrator
|
||||
var sTestsITMigrator = []func(t *testing.T){
|
||||
testOnStorITFlush,
|
||||
testMigratorAccounts,
|
||||
testMigratorActionPlans,
|
||||
testMigratorActionTriggers,
|
||||
testMigratorActions,
|
||||
testMigratorSharedGroups,
|
||||
}
|
||||
// testMigratorActionPlans,
|
||||
// testMigratorActionTriggers,
|
||||
// testMigratorActions,
|
||||
// testMigratorSharedGroups,
|
||||
}
|
||||
|
||||
func TestOnStorITRedisConnect(t *testing.T) {
|
||||
dataDB, err := engine.ConfigureDataStorage(*dataDBType, *dataDBHost, *dataDBPort, *dataDBName, *dataDBUser, *dataDBPass, *dbDataEncoding, config.CgrConfig().CacheConfig, *loadHistorySize)
|
||||
@@ -117,28 +116,43 @@ func TestOnStorITRedis(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// func TestOnStorITMongoConnect(t *testing.T) {
|
||||
// cdrsMongoCfgPath := path.Join(*dataDir, "conf", "samples", "cdrsv2mongo")
|
||||
// mgoITCfg, err := config.NewCGRConfigFromFolder(cdrsMongoCfgPath)
|
||||
// if err != nil {
|
||||
// t.Fatal(err)
|
||||
// }
|
||||
// if mgoITdb, err = engine.NewMongoStorage(mgoITCfg.StorDBHost, mgoITCfg.StorDBPort, mgoITCfg.StorDBName, mgoITCfg.StorDBUser, db_passwd,
|
||||
// utils.StorDB, nil, mgoITCfg.CacheConfig, mgoITCfg.LoadHistorySize); err != nil {
|
||||
// t.Fatal(err)
|
||||
// }
|
||||
// mongo = mgoITCfg
|
||||
// onStorCfg = mgoITCfg.StorDBName
|
||||
// mig = NewMigrator(mgoITdb, mgoITdb, utils.MONGO, utils.JSON, mgoITdb, utils.MONGO)
|
||||
// }
|
||||
func TestOnStorITMongoConnect(t *testing.T) {
|
||||
|
||||
cdrsMongoCfgPath := path.Join(*dataDir, "conf", "samples", "tutmongo")
|
||||
mgoITCfg, err := config.NewCGRConfigFromFolder(cdrsMongoCfgPath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
dataDB, err := engine.ConfigureDataStorage(mgoITCfg.DataDbType, mgoITCfg.DataDbHost, mgoITCfg.DataDbPort, mgoITCfg.DataDbName, mgoITCfg.DataDbUser, mgoITCfg.DataDbPass, mgoITCfg.DBDataEncoding, mgoITCfg.CacheConfig, *loadHistorySize)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
oldDataDB, err := engine.ConfigureDataStorage(mgoITCfg.DataDbType, mgoITCfg.DataDbHost, mgoITCfg.DataDbPort, mgoITCfg.DataDbName, mgoITCfg.DataDbUser, mgoITCfg.DataDbPass, mgoITCfg.DBDataEncoding, mgoITCfg.CacheConfig, *oldLoadHistorySize)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
storDB, err := engine.ConfigureStorStorage(mgoITCfg.StorDBType, mgoITCfg.StorDBHost, mgoITCfg.StorDBPort, mgoITCfg.StorDBName, mgoITCfg.StorDBUser, mgoITCfg.StorDBPass, mgoITCfg.DBDataEncoding,
|
||||
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
oldstorDB, err := engine.ConfigureStorStorage(mgoITCfg.StorDBType, mgoITCfg.StorDBHost, mgoITCfg.StorDBPort, mgoITCfg.StorDBName, mgoITCfg.StorDBUser, mgoITCfg.StorDBPass, mgoITCfg.DBDataEncoding,
|
||||
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
mig, err = NewMigrator(dataDB, mgoITCfg.DataDbType,mgoITCfg.DBDataEncoding, storDB, mgoITCfg.StorDBType, oldDataDB, mgoITCfg.DataDbType, mgoITCfg.DBDataEncoding, oldstorDB, mgoITCfg.StorDBType)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// func TestOnStorITMongo(t *testing.T) {
|
||||
// dbtype = utils.MONGO
|
||||
// onStor = mgoITdb
|
||||
// for _, stest := range sTestsITMigrator {
|
||||
// t.Run("TestITMigratorOnMongo", stest)
|
||||
// }
|
||||
// }
|
||||
func TestOnStorITMongo(t *testing.T) {
|
||||
dbtype = utils.MONGO
|
||||
for _, stest := range sTestsITMigrator {
|
||||
t.Run("TestITMigratorOnMongo", stest)
|
||||
}
|
||||
}
|
||||
|
||||
func testOnStorITFlush(t *testing.T) {
|
||||
switch {
|
||||
@@ -149,7 +163,7 @@ func testOnStorITFlush(t *testing.T) {
|
||||
t.Error("Error when flushing Redis ", err.Error())
|
||||
}
|
||||
case dbtype == utils.MONGO:
|
||||
err := engine.InitDataDb(mongo)
|
||||
err := mig.dataDB.Flush("")
|
||||
if err != nil {
|
||||
t.Error("Error when flushing Mongo ", err.Error())
|
||||
}
|
||||
@@ -193,9 +207,9 @@ func testMigratorAccounts(t *testing.T) {
|
||||
} else if !reflect.DeepEqual(testAccount, result) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", testAccount, result)
|
||||
}
|
||||
/*
|
||||
|
||||
case dbtype == utils.MONGO:
|
||||
err := mig.SetV1onMongoAccount(v1AccountDBPrefix, v1Acc)
|
||||
err := mig.SetV1onMongoAccount(v1Acc)
|
||||
if err != nil {
|
||||
t.Error("Error when marshaling ", err.Error())
|
||||
}
|
||||
@@ -210,14 +224,12 @@ func testMigratorAccounts(t *testing.T) {
|
||||
if !reflect.DeepEqual(testAccount, result) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", testAccount, result)
|
||||
}
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
//2
|
||||
|
||||
/*
|
||||
func testMigratorActionPlans(t *testing.T) {
|
||||
|
||||
v1ap := v1ActionPlans{&v1ActionPlan{Id: "test", AccountIds: []string{"one"}, Timing: &engine.RateInterval{Timing: &engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}
|
||||
ap := &engine.ActionPlan{Id: "test", AccountIDs: utils.StringMap{"one": true}, ActionTimings: []*engine.ActionTiming{&engine.ActionTiming{Timing: &engine.RateInterval{Timing: &engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}}
|
||||
switch {
|
||||
@@ -233,7 +245,7 @@ func testMigratorActionPlans(t *testing.T) {
|
||||
}
|
||||
_, err = mig.getV1ActionPlansFromDB(setv1id)
|
||||
if err != nil {
|
||||
t.Error("Error when setting v1 ActionPlan ", err.Error())
|
||||
t.Error("Error when getting v1 ActionPlan ", err.Error())
|
||||
}
|
||||
err = mig.Migrate(utils.MetaActionPlans)
|
||||
if err != nil {
|
||||
@@ -252,12 +264,18 @@ func testMigratorActionPlans(t *testing.T) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Weight, result.ActionTimings[0].Weight)
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
case dbtype == utils.MONGO:
|
||||
err := mig.SetV1onMongoActionPlan(utils.ACTION_PLAN_PREFIX, v1ap)
|
||||
err := mig.SetV1onMongoActionPlan("actions", &v1ap)
|
||||
if err != nil {
|
||||
t.Error("Error when setting v1 ActionPlans ", err.Error())
|
||||
}
|
||||
log.Print("dadada!")
|
||||
_, err = mig.getV1ActionPlansFromDB("")
|
||||
if err != nil {
|
||||
t.Error("Error when getting v1 ActionPlan ", err.Error())
|
||||
}
|
||||
log.Print("dadada!")
|
||||
err = mig.Migrate("migrateActionPlans")
|
||||
if err != nil {
|
||||
t.Error("Error when migrating ActionPlans ", err.Error())
|
||||
@@ -268,12 +286,12 @@ func testMigratorActionPlans(t *testing.T) {
|
||||
}
|
||||
if ap.Id != result.Id || !reflect.DeepEqual(ap.AccountIDs, result.AccountIDs) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", *ap, result)
|
||||
} else if !reflect.DeepEqual(ap.ActionTimings[0].Timing, result.ActionTimings[0].Timing) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Timing, result.ActionTimings[0].Timing)
|
||||
} else if ap.ActionTimings[0].Weight != result.ActionTimings[0].Weight || ap.ActionTimings[0].ActionsID != result.ActionTimings[0].ActionsID {
|
||||
t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Weight, result.ActionTimings[0].Weight)
|
||||
}
|
||||
*/
|
||||
} //else if !reflect.DeepEqual(ap.ActionTimings[0].Timing, result.ActionTimings.Timing) {
|
||||
// t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Timing, result.ActionTimings[0].Timing)
|
||||
//} else if ap.ActionTimings[0].Weight != result.ActionTimings[0].Weight || ap.ActionTimings[0].ActionsID != result.ActionTimings[0].ActionsID {
|
||||
// t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Weight, result.ActionTimings[0].Weight)
|
||||
//}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -339,7 +357,7 @@ func testMigratorActionTriggers(t *testing.T) {
|
||||
}
|
||||
if !reflect.DeepEqual(atrs[0].ID, result[0].ID) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", atrs[0].ID, result[0].ID)
|
||||
} /*else if !reflect.DeepEqual(atrs[0].UniqueID, result[0].UniqueID) {
|
||||
} else if !reflect.DeepEqual(atrs[0].UniqueID, result[0].UniqueID) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", atrs[0].UniqueID, result[0].UniqueID)
|
||||
} else if !reflect.DeepEqual(atrs[0].ThresholdType, result[0].ThresholdType) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", atrs[0].ThresholdType, result[0].ThresholdType)
|
||||
@@ -400,10 +418,10 @@ func testMigratorActionTriggers(t *testing.T) {
|
||||
} else if !reflect.DeepEqual(atrs[0].Balance.Blocker, result[0].Balance.Blocker) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", atrs[0].Balance.Blocker, result[0].Balance.Blocker)
|
||||
}
|
||||
*/
|
||||
/*
|
||||
|
||||
|
||||
case dbtype == utils.MONGO:
|
||||
err := mig.SetV1onMongoActionTrigger(utils.ACTION_TRIGGER_PREFIX, v1atrs)
|
||||
err := mig.SetV1onMongoActionTrigger(utils.ACTION_TRIGGER_PREFIX, &v1atrs)
|
||||
if err != nil {
|
||||
t.Error("Error when setting v1 ActionTriggers ", err.Error())
|
||||
}
|
||||
@@ -411,7 +429,7 @@ func testMigratorActionTriggers(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error("Error when migrating ActionTriggers ", err.Error())
|
||||
}
|
||||
result, err := mig.dataDB.GetActionTriggers(v1atrs.Id, true, utils.NonTransactional)
|
||||
result, err := mig.dataDB.GetActionTriggers(v1atrs[0].Id, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
t.Error("Error when getting ActionTriggers ", err.Error())
|
||||
}
|
||||
@@ -422,7 +440,7 @@ func testMigratorActionTriggers(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error("Error when flushing v1 ActionTriggers ", err.Error())
|
||||
}
|
||||
*/
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -457,9 +475,9 @@ func testMigratorActions(t *testing.T) {
|
||||
if !reflect.DeepEqual(act, result) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", act, result)
|
||||
}
|
||||
/*
|
||||
|
||||
case dbtype == utils.MONGO:
|
||||
err := mig.SetV1onMongoAction(utils.ACTION_PREFIX, v1act)
|
||||
err := mig.SetV1onMongoAction(utils.ACTION_PREFIX, &v1act)
|
||||
if err != nil {
|
||||
t.Error("Error when setting v1 Actions ", err.Error())
|
||||
}
|
||||
@@ -467,7 +485,7 @@ func testMigratorActions(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error("Error when migrating Actions ", err.Error())
|
||||
}
|
||||
result, err := mig.dataDB.GetActions(v1act.Id, true, utils.NonTransactional)
|
||||
result, err := mig.dataDB.GetActions(v1act[0].Id, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
t.Error("Error when getting Actions ", err.Error())
|
||||
}
|
||||
@@ -478,7 +496,7 @@ func testMigratorActions(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error("Error when flushing v1 Actions ", err.Error())
|
||||
}
|
||||
*/
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -521,7 +539,7 @@ func testMigratorSharedGroups(t *testing.T) {
|
||||
if !reflect.DeepEqual(sg, result) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", sg, result)
|
||||
}
|
||||
/*
|
||||
|
||||
case dbtype == utils.MONGO:
|
||||
err := mig.SetV1onMongoSharedGroup(utils.SHARED_GROUP_PREFIX, v1sg)
|
||||
if err != nil {
|
||||
@@ -538,6 +556,7 @@ func testMigratorSharedGroups(t *testing.T) {
|
||||
if !reflect.DeepEqual(sg, result) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", sg, result)
|
||||
}
|
||||
*/
|
||||
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
@@ -1,95 +1,140 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
// /*
|
||||
// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
// Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
// */
|
||||
package migrator
|
||||
|
||||
import "github.com/cgrates/cgrates/engine"
|
||||
// import (
|
||||
// "fmt"
|
||||
|
||||
// "github.com/cgrates/cgrates/engine"
|
||||
// "github.com/cgrates/cgrates/utils"
|
||||
// "gopkg.in/mgo.v2/bson"
|
||||
// )
|
||||
|
||||
func (m *Migrator) SetV1onOldRedis(key string, bl []byte) (err error) {
|
||||
dataDB := m.oldDataDB.(*engine.RedisStorage)
|
||||
if err = dataDB.Cmd("SET", key, bl).Err; err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
}
|
||||
// type AcKeyValue struct {
|
||||
// Key string
|
||||
// Value v1Actions
|
||||
// }
|
||||
// type AtKeyValue struct {
|
||||
// Key string
|
||||
// Value v1ActionPlans
|
||||
// }
|
||||
|
||||
func (m *Migrator) SetV1onRedis(key string, bl []byte) (err error) {
|
||||
dataDB := m.dataDB.(*engine.RedisStorage)
|
||||
if err = dataDB.Cmd("SET", key, bl).Err; err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
}
|
||||
// func (m *Migrator) SetV1onOldRedis(key string, bl []byte) (err error) {
|
||||
// dataDB := m.oldDataDB.(*engine.RedisStorage)
|
||||
// if err = dataDB.Cmd("SET", key, bl).Err; err != nil {
|
||||
// return err
|
||||
// }
|
||||
// return
|
||||
// }
|
||||
|
||||
func (m *Migrator) SetV1onMongoAccount(pref string, x *v1Account) (err error) {
|
||||
dataDB := m.dataDB.(*engine.MongoStorage)
|
||||
mgoDB := dataDB.DB()
|
||||
defer mgoDB.Session.Close()
|
||||
if err := mgoDB.C(pref).Insert(x); err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
}
|
||||
// func (m *Migrator) SetV1onRedis(key string, bl []byte) (err error) {
|
||||
// dataDB := m.dataDB.(*engine.RedisStorage)
|
||||
// if err = dataDB.Cmd("SET", key, bl).Err; err != nil {
|
||||
// return err
|
||||
// }
|
||||
// return
|
||||
// }
|
||||
|
||||
func (m *Migrator) SetV1onMongoAction(pref string, x *v1Action) (err error) {
|
||||
dataDB := m.dataDB.(*engine.MongoStorage)
|
||||
mgoDB := dataDB.DB()
|
||||
defer mgoDB.Session.Close()
|
||||
if err := mgoDB.C(pref).Insert(x); err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
}
|
||||
// func (m *Migrator) SetV1onMongoAccount( x *v1Account) (err error) {
|
||||
// dataDB := m.dataDB.(*engine.MongoStorage)
|
||||
// mgoDB := dataDB.DB()
|
||||
// defer mgoDB.Session.Close()
|
||||
// if err := mgoDB.C("userbalances").Insert(x); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// return
|
||||
// }
|
||||
|
||||
func (m *Migrator) SetV1onMongoActionPlan(pref string, x *v1ActionPlan) (err error) {
|
||||
dataDB := m.dataDB.(*engine.MongoStorage)
|
||||
mgoDB := dataDB.DB()
|
||||
defer mgoDB.Session.Close()
|
||||
if err := mgoDB.C(pref).Insert(x); err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
}
|
||||
// func (m *Migrator) SetV1onMongoAction(key string, x *v1Actions) (err error) {
|
||||
// dataDB := m.dataDB.(*engine.MongoStorage)
|
||||
// mgoDB := dataDB.DB()
|
||||
// defer mgoDB.Session.Close()
|
||||
// if err := mgoDB.C("actions").Insert(&AcKeyValue{key, *x}); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// return
|
||||
// }
|
||||
|
||||
func (m *Migrator) SetV1onMongoActionTrigger(pref string, x *v1ActionTrigger) (err error) {
|
||||
dataDB := m.dataDB.(*engine.MongoStorage)
|
||||
mgoDB := dataDB.DB()
|
||||
defer mgoDB.Session.Close()
|
||||
if err := mgoDB.C(pref).Insert(x); err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
}
|
||||
// func (m *Migrator) SetV1onMongoActionPlan(key string, x *v1ActionPlans) (err error) {
|
||||
// dataDB := m.dataDB.(*engine.MongoStorage)
|
||||
// mgoDB := dataDB.DB()
|
||||
// defer mgoDB.Session.Close()
|
||||
// if err := mgoDB.C("actiontimings").Insert(&AtKeyValue{key, *x}); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// return
|
||||
// }
|
||||
|
||||
func (m *Migrator) SetV1onMongoSharedGroup(pref string, x *v1SharedGroup) (err error) {
|
||||
dataDB := m.dataDB.(*engine.MongoStorage)
|
||||
mgoDB := dataDB.DB()
|
||||
defer mgoDB.Session.Close()
|
||||
if err := mgoDB.C(pref).Insert(x); err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
}
|
||||
func (m *Migrator) DropV1Colection(pref string) (err error) {
|
||||
dataDB := m.dataDB.(*engine.MongoStorage)
|
||||
mgoDB := dataDB.DB()
|
||||
defer mgoDB.Session.Close()
|
||||
if err := mgoDB.C(pref).DropCollection(); err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
}
|
||||
// func (m *Migrator) SetV1onMongoActionTrigger(pref string, x *v1ActionTriggers) (err error) {
|
||||
// dataDB := m.dataDB.(*engine.MongoStorage)
|
||||
// mgoDB := dataDB.DB()
|
||||
// defer mgoDB.Session.Close()
|
||||
// if err := mgoDB.C(pref).Insert(x); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// return
|
||||
// }
|
||||
|
||||
// func (m *Migrator) SetV1onMongoSharedGroup(pref string, x *v1SharedGroup) (err error) {
|
||||
// dataDB := m.dataDB.(*engine.MongoStorage)
|
||||
// mgoDB := dataDB.DB()
|
||||
// defer mgoDB.Session.Close()
|
||||
// if err := mgoDB.C(pref).Insert(x); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// return
|
||||
// }
|
||||
// func (m *Migrator) DropV1Colection(pref string) (err error) {
|
||||
// dataDB := m.dataDB.(*engine.MongoStorage)
|
||||
// mgoDB := dataDB.DB()
|
||||
// defer mgoDB.Session.Close()
|
||||
// if err := mgoDB.C(pref).DropCollection(); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// return
|
||||
// }
|
||||
|
||||
// func (m *Migrator) getV1AccountFromDB(key string) (*v1Account, error) {
|
||||
// switch m.oldDataDBType {
|
||||
// case utils.REDIS:
|
||||
// dataDB := m.oldDataDB.(*engine.RedisStorage)
|
||||
// if strVal, err := dataDB.Cmd("GET", key).Bytes(); err != nil {
|
||||
// return nil, err
|
||||
// } else {
|
||||
// v1Acnt := &v1Account{Id: key}
|
||||
// if err := m.mrshlr.Unmarshal(strVal, v1Acnt); err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// return v1Acnt, nil
|
||||
// }
|
||||
// case utils.MONGO:
|
||||
// dataDB := m.oldDataDB.(*engine.MongoStorage)
|
||||
// mgoDB := dataDB.DB()
|
||||
// defer mgoDB.Session.Close()
|
||||
// v1Acnt := new(v1Account)
|
||||
// if err := mgoDB.C(v1AccountTBL).Find(bson.M{"id": key}).One(v1Acnt); err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// return v1Acnt, nil
|
||||
// default:
|
||||
// return nil, utils.NewCGRError(utils.Migrator,
|
||||
// utils.ServerErrorCaps,
|
||||
// utils.UnsupportedDB,
|
||||
// fmt.Sprintf("error: unsupported: <%s> for getV1AccountFromDB method", m.oldDataDBType))
|
||||
// }
|
||||
// }
|
||||
23
migrator/v1DataDB.go
Normal file
23
migrator/v1DataDB.go
Normal file
@@ -0,0 +1,23 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package migrator
|
||||
|
||||
type v1DataDB interface{
|
||||
getv1Account() (v1Acnt *v1Account, err error)
|
||||
getKeysForPrefix(prefix string) ([]string, error)
|
||||
}
|
||||
53
migrator/v1Migrator_Utils.go
Normal file
53
migrator/v1Migrator_Utils.go
Normal file
@@ -0,0 +1,53 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package migrator
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func ConfigureV1DataStorage(db_type, host, port, name, user, pass, marshaler string) (db v1DataDB, err error) {
|
||||
var d v1DataDB
|
||||
switch db_type {
|
||||
case utils.REDIS:
|
||||
var db_nb int
|
||||
db_nb, err = strconv.Atoi(name)
|
||||
if err != nil {
|
||||
utils.Logger.Crit("Redis db name must be an integer!")
|
||||
return nil, err
|
||||
}
|
||||
if port != "" {
|
||||
host += ":" + port
|
||||
}
|
||||
d, err = newv1RedisStorage(host, db_nb, pass, marshaler)
|
||||
case utils.MONGO:
|
||||
d, err = NewMongoStorage(host, port, name, user, pass, utils.DataDB, nil)
|
||||
db = d.(v1DataDB)
|
||||
default:
|
||||
err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s'",
|
||||
db_type, utils.REDIS, utils.MONGO))
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return d, nil
|
||||
}
|
||||
68
migrator/v1Mongo.go
Normal file
68
migrator/v1Mongo.go
Normal file
@@ -0,0 +1,68 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package migrator
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"gopkg.in/mgo.v2"
|
||||
)
|
||||
|
||||
type v1Mongo struct{
|
||||
session *mgo.Session
|
||||
db string
|
||||
v1ms engine.Marshaler
|
||||
qryIter *mgo.Iter
|
||||
|
||||
}
|
||||
func NewMongoStorage(host, port, db, user, pass, storageType string, cdrsIndexes []string) (v1ms *v1Mongo, err error) {
|
||||
url := host
|
||||
if port != "" {
|
||||
url += ":" + port
|
||||
}
|
||||
if user != "" && pass != "" {
|
||||
url = fmt.Sprintf("%s:%s@%s", user, pass, url)
|
||||
}
|
||||
if db != "" {
|
||||
url += "/" + db
|
||||
}
|
||||
session, err := mgo.Dial(url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
session.SetMode(mgo.Strong, true)
|
||||
v1ms = &v1Mongo{db: db, session: session, v1ms: engine.NewCodecMsgpackMarshaler()}
|
||||
return
|
||||
}
|
||||
|
||||
func (v1ms *v1Mongo) getv1Account() (v1Acnt *v1Account, err error){
|
||||
if v1ms.qryIter==nil{
|
||||
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(v1AccountDBPrefix).Find(nil).Iter()
|
||||
}
|
||||
v1ms.qryIter.Next(&v1Acnt)
|
||||
|
||||
if v1Acnt==nil{
|
||||
v1ms.qryIter=nil
|
||||
}
|
||||
return v1Acnt,nil
|
||||
}
|
||||
|
||||
func (v1ms *v1Mongo) getKeysForPrefix(prefix string) ([]string, error){
|
||||
return nil,nil
|
||||
}
|
||||
136
migrator/v1Redis.go
Normal file
136
migrator/v1Redis.go
Normal file
@@ -0,0 +1,136 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package migrator
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
|
||||
"github.com/mediocregopher/radix.v2/redis"
|
||||
"github.com/mediocregopher/radix.v2/pool"
|
||||
)
|
||||
type v1Redis struct{
|
||||
dbPool *pool.Pool
|
||||
ms engine.Marshaler
|
||||
dataKeys []string
|
||||
qryIdx *int
|
||||
|
||||
}
|
||||
|
||||
func newv1RedisStorage(address string, db int, pass, mrshlerStr string) (*v1Redis, error) {
|
||||
df := func(network, addr string) (*redis.Client, error) {
|
||||
client, err := redis.Dial(network, addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(pass) != 0 {
|
||||
if err = client.Cmd("AUTH", pass).Err; err != nil {
|
||||
client.Close()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if db != 0 {
|
||||
if err = client.Cmd("SELECT", db).Err; err != nil {
|
||||
client.Close()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return client, nil
|
||||
}
|
||||
p, err := pool.NewCustom("tcp", address, 1, df)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var mrshler engine.Marshaler
|
||||
if mrshlerStr == utils.MSGPACK {
|
||||
mrshler = engine.NewCodecMsgpackMarshaler()
|
||||
} else if mrshlerStr == utils.JSON {
|
||||
mrshler = new(engine.JSONMarshaler)
|
||||
} else {
|
||||
return nil, fmt.Errorf("Unsupported marshaler: %v", mrshlerStr)
|
||||
}
|
||||
return &v1Redis{dbPool: p,ms: mrshler}, nil
|
||||
}
|
||||
|
||||
// This CMD function get a connection from the pool.
|
||||
// Handles automatic failover in case of network disconnects
|
||||
func (v1rs *v1Redis) cmd(cmd string, args ...interface{}) *redis.Resp {
|
||||
c1, err := v1rs.dbPool.Get()
|
||||
if err != nil {
|
||||
return redis.NewResp(err)
|
||||
}
|
||||
result := c1.Cmd(cmd, args...)
|
||||
if result.IsType(redis.IOErr) { // Failover mecanism
|
||||
utils.Logger.Warning(fmt.Sprintf("<RedisStorage> error <%s>, attempting failover.", result.Err.Error()))
|
||||
c2, err := v1rs.dbPool.Get()
|
||||
if err == nil {
|
||||
if result2 := c2.Cmd(cmd, args...); !result2.IsType(redis.IOErr) {
|
||||
v1rs.dbPool.Put(c2)
|
||||
return result2
|
||||
}
|
||||
}
|
||||
} else {
|
||||
v1rs.dbPool.Put(c1)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
|
||||
func (v1rs *v1Redis) getKeysForPrefix(prefix string) ([]string, error) {
|
||||
r := v1rs.cmd("KEYS", prefix+"*")
|
||||
if r.Err != nil {
|
||||
return nil, r.Err
|
||||
}
|
||||
return r.List()
|
||||
}
|
||||
|
||||
func (v1rs *v1Redis) getv1Account() (v1Acnt *v1Account, err error){
|
||||
if v1rs.qryIdx==nil{
|
||||
v1rs.dataKeys, err = v1rs.getKeysForPrefix(v1AccountDBPrefix);
|
||||
log.Print("#1 Done migrating!",v1rs.dataKeys)
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}else if len(v1rs.dataKeys)==0{
|
||||
return nil,utils.ErrNotFound
|
||||
}
|
||||
v1rs.qryIdx=utils.IntPointer(0)
|
||||
log.Print("#2 Done migrating!",*v1rs.qryIdx)
|
||||
|
||||
}
|
||||
if *v1rs.qryIdx<=len(v1rs.dataKeys)-1{
|
||||
log.Print("#3 Done migrating!",v1rs.dataKeys[*v1rs.qryIdx])
|
||||
|
||||
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
|
||||
if err != nil {
|
||||
return nil ,err
|
||||
}
|
||||
v1Acnt = &v1Account{Id: v1rs.dataKeys[*v1rs.qryIdx]}
|
||||
if err := v1rs.ms.Unmarshal(strVal, v1Acnt); err != nil {
|
||||
return nil,err
|
||||
}
|
||||
log.Print("#4 Done migrating!",*v1rs.qryIdx)
|
||||
*v1rs.qryIdx=*v1rs.qryIdx+1
|
||||
}else{
|
||||
v1rs.qryIdx=utils.IntPointer(-1)
|
||||
return nil,utils.ErrNoMoreData
|
||||
}
|
||||
return v1Acnt,nil
|
||||
}
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
ErrNoMoreData = errors.New("NO_MORE_DATA")
|
||||
ErrNotImplemented = errors.New("NOT_IMPLEMENTED")
|
||||
ErrNotFound = errors.New("NOT_FOUND")
|
||||
ErrTimedOut = errors.New("TIMED_OUT")
|
||||
|
||||
Reference in New Issue
Block a user