mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Refactoring the rest of migrator
This commit is contained in:
@@ -86,7 +86,6 @@ const CGRATES_CFG_JSON = `
|
||||
|
||||
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
|
||||
"db_type": "redis", // data_db type: <redis|mongo>
|
||||
//"db_host": "192.168.100.40", // data_db host address
|
||||
"db_host": "127.0.0.1", // data_db host address
|
||||
"db_port": 6379, // data_db port to reach the database
|
||||
"db_name": "10", // data_db database name to connect to
|
||||
@@ -98,7 +97,6 @@ const CGRATES_CFG_JSON = `
|
||||
|
||||
"stor_db": { // database used to store offline tariff plans and CDRs
|
||||
"db_type": "mysql", // stor database type to use: <mongo|mysql|postgres>
|
||||
//"db_host": "192.168.100.40", // data_db host address
|
||||
"db_host": "127.0.0.1", // the host to connect to
|
||||
"db_port": 3306, // the port to reach the stordb
|
||||
"db_name": "cgrates", // stor database name
|
||||
|
||||
@@ -33,29 +33,31 @@ const (
|
||||
)
|
||||
|
||||
func (m *Migrator) migrateAccounts() (err error) {
|
||||
var v1Acnt *v1Account
|
||||
for {
|
||||
v1Acnt,err=m.oldDataDB.getv1Account()
|
||||
if err!=nil&&err!=utils.ErrNoMoreData{
|
||||
var v1Acnt *v1Account
|
||||
for {
|
||||
v1Acnt, err = m.oldDataDB.getv1Account()
|
||||
if err != nil && err != utils.ErrNoMoreData {
|
||||
return err
|
||||
}
|
||||
if err == utils.ErrNoMoreData {
|
||||
break
|
||||
}
|
||||
if v1Acnt != nil {
|
||||
acnt := v1Acnt.AsAccount()
|
||||
if err = m.dataDB.SetAccount(acnt); err != nil {
|
||||
return err
|
||||
}
|
||||
if err==utils.ErrNoMoreData{break}
|
||||
if v1Acnt != nil {
|
||||
acnt := v1Acnt.AsAccount()
|
||||
if err = m.dataDB.SetAccount(acnt); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]}
|
||||
if err = m.dataDB.SetVersions(vrs, false); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
|
||||
}
|
||||
return
|
||||
}
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]}
|
||||
if err = m.dataDB.SetVersions(vrs, false); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type v1Account struct {
|
||||
|
||||
@@ -40,36 +40,37 @@ type v1Action struct {
|
||||
type v1Actions []*v1Action
|
||||
|
||||
func (m *Migrator) migrateActions() (err error) {
|
||||
var v1ACs *v1Actions
|
||||
var acts engine.Actions
|
||||
for {
|
||||
v1ACs,err=m.oldDataDB.getV1Actions()
|
||||
if err!=nil&&err!=utils.ErrNoMoreData{
|
||||
return err
|
||||
}
|
||||
if err==utils.ErrNoMoreData{break}
|
||||
var v1ACs *v1Actions
|
||||
var acts engine.Actions
|
||||
for {
|
||||
v1ACs, err = m.oldDataDB.getV1Actions()
|
||||
if err != nil && err != utils.ErrNoMoreData {
|
||||
return err
|
||||
}
|
||||
if err == utils.ErrNoMoreData {
|
||||
break
|
||||
}
|
||||
if *v1ACs != nil {
|
||||
for _, v1ac := range *v1ACs{
|
||||
for _, v1ac := range *v1ACs {
|
||||
act := v1ac.AsAction()
|
||||
acts = append(acts, act)
|
||||
|
||||
}
|
||||
if err := m.dataDB.SetActions(acts[0].Id, acts, utils.NonTransactional); err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]}
|
||||
if err = m.dataDB.SetVersions(vrs, false); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
|
||||
}
|
||||
return
|
||||
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.ACTION_PREFIX]}
|
||||
if err = m.dataDB.SetVersions(vrs, false); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (v1Act v1Action) AsAction() (act *engine.Action) {
|
||||
|
||||
@@ -19,7 +19,6 @@ package migrator
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
//"log"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -52,15 +51,17 @@ func (at *v1ActionPlan) IsASAP() bool {
|
||||
}
|
||||
|
||||
func (m *Migrator) migrateActionPlans() (err error) {
|
||||
var v1APs *v1ActionPlans
|
||||
for {
|
||||
v1APs,err=m.oldDataDB.getV1ActionPlans()
|
||||
if err!=nil&&err!=utils.ErrNoMoreData{
|
||||
return err
|
||||
}
|
||||
if err==utils.ErrNoMoreData{break}
|
||||
var v1APs *v1ActionPlans
|
||||
for {
|
||||
v1APs, err = m.oldDataDB.getV1ActionPlans()
|
||||
if err != nil && err != utils.ErrNoMoreData {
|
||||
return err
|
||||
}
|
||||
if err == utils.ErrNoMoreData {
|
||||
break
|
||||
}
|
||||
if *v1APs != nil {
|
||||
for _, v1ap := range *v1APs{
|
||||
for _, v1ap := range *v1APs {
|
||||
ap := v1ap.AsActionPlan()
|
||||
if err = m.dataDB.SetActionPlan(ap.Id, ap, true, utils.NonTransactional); err != nil {
|
||||
return err
|
||||
@@ -68,15 +69,15 @@ func (m *Migrator) migrateActionPlans() (err error) {
|
||||
}
|
||||
}
|
||||
}
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]}
|
||||
if err = m.dataDB.SetVersions(vrs, false); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
|
||||
}
|
||||
return
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.ACTION_PLAN_PREFIX]}
|
||||
if err = m.dataDB.SetVersions(vrs, false); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (v1AP v1ActionPlan) AsActionPlan() (ap *engine.ActionPlan) {
|
||||
|
||||
@@ -2,8 +2,6 @@ package migrator
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
"log"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -38,97 +36,38 @@ type v1ActionTrigger struct {
|
||||
type v1ActionTriggers []*v1ActionTrigger
|
||||
|
||||
func (m *Migrator) migrateActionTriggers() (err error) {
|
||||
switch m.dataDBType {
|
||||
case utils.REDIS:
|
||||
var atrrs engine.ActionTriggers
|
||||
var v1atrskeys []string
|
||||
v1atrskeys, err = m.dataDB.GetKeysForPrefix(utils.ACTION_TRIGGER_PREFIX)
|
||||
if err != nil {
|
||||
return
|
||||
var v1ACTs *v1ActionTriggers
|
||||
var acts engine.ActionTriggers
|
||||
for {
|
||||
v1ACTs, err = m.oldDataDB.getV1ActionTriggers()
|
||||
if err != nil && err != utils.ErrNoMoreData {
|
||||
return err
|
||||
}
|
||||
for _, v1atrskey := range v1atrskeys {
|
||||
v1atrs, err := m.getV1ActionTriggerFromDB(v1atrskey)
|
||||
if err != nil {
|
||||
if err == utils.ErrNoMoreData {
|
||||
break
|
||||
}
|
||||
if *v1ACTs != nil {
|
||||
for _, v1ac := range *v1ACTs {
|
||||
act := v1ac.AsActionTrigger()
|
||||
acts = append(acts, act)
|
||||
|
||||
}
|
||||
if err := m.dataDB.SetActionTriggers(acts[0].ID, acts, utils.NonTransactional); err != nil {
|
||||
return err
|
||||
}
|
||||
if v1atrs == nil {
|
||||
log.Print("No Action Triggers found key:", v1atrskey)
|
||||
} else {
|
||||
for _, v1atr := range *v1atrs {
|
||||
atr := v1atr.AsActionTrigger()
|
||||
atrrs = append(atrrs, atr)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
if err := m.dataDB.SetActionTriggers(atrrs[0].ID, atrrs, utils.NonTransactional); err != nil {
|
||||
return err
|
||||
}
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.ACTION_TRIGGER_PREFIX: engine.CurrentStorDBVersions()[utils.ACTION_TRIGGER_PREFIX]}
|
||||
if err = m.dataDB.SetVersions(vrs, false); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating ActionTrigger version into StorDB", err.Error()))
|
||||
}
|
||||
return
|
||||
case utils.MONGO:
|
||||
dataDB := m.dataDB.(*engine.MongoStorage)
|
||||
mgoDB := dataDB.DB()
|
||||
defer mgoDB.Session.Close()
|
||||
var atrrs engine.ActionTriggers
|
||||
var v1atr v1ActionTrigger
|
||||
iter := mgoDB.C(utils.ACTION_TRIGGER_PREFIX).Find(nil).Iter()
|
||||
for iter.Next(&v1atr) {
|
||||
atr := v1atr.AsActionTrigger()
|
||||
atrrs = append(atrrs, atr)
|
||||
}
|
||||
if err := m.dataDB.SetActionTriggers(atrrs[0].ID, atrrs, utils.NonTransactional); err != nil {
|
||||
return err
|
||||
}
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.ACTION_TRIGGER_PREFIX: engine.CurrentStorDBVersions()[utils.ACTION_TRIGGER_PREFIX]}
|
||||
if err = m.dataDB.SetVersions(vrs, false); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating ActionTrigger version into StorDB", err.Error()))
|
||||
}
|
||||
return
|
||||
default:
|
||||
}
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.ACTION_TRIGGER_PREFIX]}
|
||||
if err = m.dataDB.SetVersions(vrs, false); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
utils.UnsupportedDB,
|
||||
fmt.Sprintf("error: unsupported: <%s> for migrateActionTriggers method", m.dataDBType))
|
||||
}
|
||||
}
|
||||
func (m *Migrator) getV1ActionTriggerFromDB(key string) (v1Atr *v1ActionTriggers, err error) {
|
||||
switch m.dataDBType {
|
||||
case utils.REDIS:
|
||||
dataDB := m.dataDB.(*engine.RedisStorage)
|
||||
if strVal, err := dataDB.Cmd("GET", key).Bytes(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
if err := m.mrshlr.Unmarshal(strVal, &v1Atr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v1Atr, nil
|
||||
}
|
||||
case utils.MONGO:
|
||||
dataDB := m.dataDB.(*engine.MongoStorage)
|
||||
mgoDB := dataDB.DB()
|
||||
defer mgoDB.Session.Close()
|
||||
v1Atr := new(v1ActionTriggers)
|
||||
if err := mgoDB.C(utils.ACTION_TRIGGER_PREFIX).Find(bson.M{"id": key}).One(v1Atr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v1Atr, nil
|
||||
default:
|
||||
return nil, utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
utils.UnsupportedDB,
|
||||
fmt.Sprintf("error: unsupported: <%s> for getV1ActionTriggerFromDB method", m.dataDBType))
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
|
||||
}
|
||||
return
|
||||
|
||||
}
|
||||
|
||||
func (v1Act v1ActionTrigger) AsActionTrigger() (at *engine.ActionTrigger) {
|
||||
|
||||
@@ -19,11 +19,11 @@ package migrator
|
||||
import (
|
||||
"flag"
|
||||
// "fmt"
|
||||
"path"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"log"
|
||||
"path"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -72,24 +72,25 @@ var (
|
||||
|
||||
dbDataEncoding = flag.String("dbdata_encoding", config.CgrConfig().DBDataEncoding, "The encoding used to store object data in strings")
|
||||
oldDBDataEncoding = flag.String("old_dbdata_encoding", config.CgrConfig().DBDataEncoding, "The encoding used to store object data in strings")
|
||||
)
|
||||
)
|
||||
|
||||
// subtests to be executed for each migrator
|
||||
var sTestsITMigrator = []func(t *testing.T){
|
||||
testOnStorITFlush,
|
||||
testMigratorAccounts,
|
||||
testMigratorActionPlans,
|
||||
// testMigratorActionTriggers,
|
||||
testMigratorActions,
|
||||
// testMigratorSharedGroups,
|
||||
}
|
||||
testMigratorActionPlans,
|
||||
testMigratorActionTriggers,
|
||||
testMigratorActions,
|
||||
testMigratorSharedGroups,
|
||||
testOnStorITFlush,
|
||||
}
|
||||
|
||||
func TestOnStorITRedisConnect(t *testing.T) {
|
||||
dataDB, err := engine.ConfigureDataStorage(*dataDBType, *dataDBHost, *dataDBPort, *dataDBName, *dataDBUser, *dataDBPass, *dbDataEncoding, config.CgrConfig().CacheConfig, *loadHistorySize)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
oldDataDB, err := ConfigureV1DataStorage(*oldDataDBType, *oldDataDBHost, *oldDataDBPort, *oldDataDBName, *oldDataDBUser, *oldDataDBPass, *oldDBDataEncoding )
|
||||
oldDataDB, err := ConfigureV1DataStorage(*oldDataDBType, *oldDataDBHost, *oldDataDBPort, *oldDataDBName, *oldDataDBUser, *oldDataDBPass, *oldDBDataEncoding)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
@@ -117,7 +118,6 @@ func TestOnStorITRedis(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestOnStorITMongoConnect(t *testing.T) {
|
||||
|
||||
cdrsMongoCfgPath := path.Join(*dataDir, "conf", "samples", "tutmongo")
|
||||
mgoITCfg, err := config.NewCGRConfigFromFolder(cdrsMongoCfgPath)
|
||||
if err != nil {
|
||||
@@ -141,7 +141,7 @@ func TestOnStorITMongoConnect(t *testing.T) {
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
mig, err = NewMigrator(dataDB, mgoITCfg.DataDbType,mgoITCfg.DBDataEncoding, storDB, mgoITCfg.StorDBType, oldDataDB, mgoITCfg.DataDbType, mgoITCfg.DBDataEncoding, oldstorDB, mgoITCfg.StorDBType)
|
||||
mig, err = NewMigrator(dataDB, mgoITCfg.DataDbType, mgoITCfg.DBDataEncoding, storDB, mgoITCfg.StorDBType, oldDataDB, mgoITCfg.DataDbType, mgoITCfg.DBDataEncoding, oldstorDB, mgoITCfg.StorDBType)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
@@ -170,8 +170,6 @@ func testOnStorITFlush(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
//1
|
||||
|
||||
func testMigratorAccounts(t *testing.T) {
|
||||
v1b := &v1Balance{Value: 10, Weight: 10, DestinationIds: "NAT", ExpirationDate: time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Local(), Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}
|
||||
v1Acc := &v1Account{Id: "*OUT:CUSTOMER_1:rif", BalanceMap: map[string]v1BalanceChain{utils.VOICE: v1BalanceChain{v1b}, utils.MONETARY: v1BalanceChain{&v1Balance{Value: 21, ExpirationDate: time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Local(), Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}}}
|
||||
@@ -199,27 +197,25 @@ func testMigratorAccounts(t *testing.T) {
|
||||
} else if !reflect.DeepEqual(testAccount, result) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", testAccount, result)
|
||||
}
|
||||
case dbtype == utils.MONGO:
|
||||
err := mig.oldDataDB.setV1Account(v1Acc)
|
||||
if err != nil {
|
||||
t.Error("Error when marshaling ", err.Error())
|
||||
}
|
||||
err = mig.Migrate(utils.MetaAccounts)
|
||||
if err != nil {
|
||||
t.Error("Error when migrating accounts ", err.Error())
|
||||
}
|
||||
result, err := mig.dataDB.GetAccount(testAccount.ID)
|
||||
if err != nil {
|
||||
t.Error("Error when getting account ", err.Error())
|
||||
}
|
||||
if !reflect.DeepEqual(testAccount, result) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", testAccount, result)
|
||||
}
|
||||
case dbtype == utils.MONGO:
|
||||
err := mig.oldDataDB.setV1Account(v1Acc)
|
||||
if err != nil {
|
||||
t.Error("Error when marshaling ", err.Error())
|
||||
}
|
||||
err = mig.Migrate(utils.MetaAccounts)
|
||||
if err != nil {
|
||||
t.Error("Error when migrating accounts ", err.Error())
|
||||
}
|
||||
result, err := mig.dataDB.GetAccount(testAccount.ID)
|
||||
if err != nil {
|
||||
t.Error("Error when getting account ", err.Error())
|
||||
}
|
||||
if !reflect.DeepEqual(testAccount, result) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", testAccount, result)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//2
|
||||
|
||||
func testMigratorActionPlans(t *testing.T) {
|
||||
v1ap := &v1ActionPlans{&v1ActionPlan{Id: "test", AccountIds: []string{"one"}, Timing: &engine.RateInterval{Timing: &engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}
|
||||
ap := &engine.ActionPlan{Id: "test", AccountIDs: utils.StringMap{"one": true}, ActionTimings: []*engine.ActionTiming{&engine.ActionTiming{Timing: &engine.RateInterval{Timing: &engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}}
|
||||
@@ -245,33 +241,31 @@ func testMigratorActionPlans(t *testing.T) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Weight, result.ActionTimings[0].Weight)
|
||||
}
|
||||
case dbtype == utils.MONGO:
|
||||
err := mig.oldDataDB.setV1ActionPlans(v1ap)
|
||||
if err != nil {
|
||||
t.Error("Error when setting v1 ActionPlans ", err.Error())
|
||||
}
|
||||
err = mig.Migrate(utils.MetaActionPlans )
|
||||
if err != nil {
|
||||
t.Error("Error when migrating ActionPlans ", err.Error())
|
||||
}
|
||||
result, err := mig.dataDB.GetActionPlan(ap.Id, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
t.Error("Error when getting ActionPlan ", err.Error())
|
||||
}
|
||||
if ap.Id != result.Id || !reflect.DeepEqual(ap.AccountIDs, result.AccountIDs) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", *ap, result)
|
||||
} else if !reflect.DeepEqual(ap.ActionTimings[0].Timing, result.ActionTimings[0].Timing) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Timing, result.ActionTimings[0].Timing)
|
||||
} else if ap.ActionTimings[0].Weight != result.ActionTimings[0].Weight || ap.ActionTimings[0].ActionsID != result.ActionTimings[0].ActionsID {
|
||||
t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Weight, result.ActionTimings[0].Weight)
|
||||
}
|
||||
err := mig.oldDataDB.setV1ActionPlans(v1ap)
|
||||
if err != nil {
|
||||
t.Error("Error when setting v1 ActionPlans ", err.Error())
|
||||
}
|
||||
err = mig.Migrate(utils.MetaActionPlans)
|
||||
if err != nil {
|
||||
t.Error("Error when migrating ActionPlans ", err.Error())
|
||||
}
|
||||
result, err := mig.dataDB.GetActionPlan(ap.Id, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
t.Error("Error when getting ActionPlan ", err.Error())
|
||||
}
|
||||
if ap.Id != result.Id || !reflect.DeepEqual(ap.AccountIDs, result.AccountIDs) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", *ap, result)
|
||||
} else if !reflect.DeepEqual(ap.ActionTimings[0].Timing, result.ActionTimings[0].Timing) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Timing, result.ActionTimings[0].Timing)
|
||||
} else if ap.ActionTimings[0].Weight != result.ActionTimings[0].Weight || ap.ActionTimings[0].ActionsID != result.ActionTimings[0].ActionsID {
|
||||
t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Weight, result.ActionTimings[0].Weight)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//3
|
||||
/*
|
||||
func testMigratorActionTriggers(t *testing.T) {
|
||||
tim := time.Date(2012, time.February, 27, 23, 59, 59, 0, time.UTC).Local()
|
||||
v1atrs := v1ActionTriggers{
|
||||
v1atrs := &v1ActionTriggers{
|
||||
&v1ActionTrigger{
|
||||
Id: "Test",
|
||||
BalanceType: "*monetary",
|
||||
@@ -303,19 +297,7 @@ func testMigratorActionTriggers(t *testing.T) {
|
||||
}
|
||||
switch {
|
||||
case dbtype == utils.REDIS:
|
||||
bit, err := mig.mrshlr.Marshal(v1atrs)
|
||||
if err != nil {
|
||||
t.Error("Error when marshaling ", err.Error())
|
||||
}
|
||||
// if err := mig.mrshlr.Unmarshal(bit, &v1Atr); err != nil {
|
||||
// t.Error("Error when setting v1 ActionTriggers ", err.Error())
|
||||
// }
|
||||
setv1id := utils.ACTION_TRIGGER_PREFIX + v1atrs[0].Id
|
||||
err = mig.SetV1onRedis(setv1id, bit)
|
||||
if err != nil {
|
||||
t.Error("Error when setting v1 ActionTriggers ", err.Error())
|
||||
}
|
||||
_, err = mig.getV1ActionTriggerFromDB(setv1id)
|
||||
err := mig.oldDataDB.setV1ActionTriggers(v1atrs)
|
||||
if err != nil {
|
||||
t.Error("Error when setting v1 ActionTriggers ", err.Error())
|
||||
}
|
||||
@@ -323,7 +305,7 @@ func testMigratorActionTriggers(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error("Error when migrating ActionTriggers ", err.Error())
|
||||
}
|
||||
result, err := mig.dataDB.GetActionTriggers(v1atrs[0].Id, true, utils.NonTransactional)
|
||||
result, err := mig.dataDB.GetActionTriggers((*v1atrs)[0].Id, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
t.Error("Error when getting ActionTriggers ", err.Error())
|
||||
}
|
||||
@@ -343,8 +325,8 @@ func testMigratorActionTriggers(t *testing.T) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", atrs[0].ExpirationDate, result[0].ExpirationDate)
|
||||
} else if !reflect.DeepEqual(atrs[0].ActivationDate, result[0].ActivationDate) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", atrs[0].ActivationDate, result[0].ActivationDate)
|
||||
} else if !reflect.DeepEqual(atrs[0].Balance, result[0].Balance) {
|
||||
// t.Errorf("Expecting: %+v, received: %+v", atrs[0].Balance, result[0].Balance)
|
||||
} else if !reflect.DeepEqual(atrs[0].Balance.Type, result[0].Balance.Type) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", atrs[0].Balance.Type, result[0].Balance.Type)
|
||||
} else if !reflect.DeepEqual(atrs[0].Weight, result[0].Weight) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", atrs[0].Weight, result[0].Weight)
|
||||
} else if !reflect.DeepEqual(atrs[0].ActionsID, result[0].ActionsID) {
|
||||
@@ -390,33 +372,14 @@ func testMigratorActionTriggers(t *testing.T) {
|
||||
} else if !reflect.DeepEqual(atrs[0].Balance.Blocker, result[0].Balance.Blocker) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", atrs[0].Balance.Blocker, result[0].Balance.Blocker)
|
||||
}
|
||||
|
||||
|
||||
case dbtype == utils.MONGO:
|
||||
err := mig.SetV1onMongoActionTrigger(utils.ACTION_TRIGGER_PREFIX, &v1atrs)
|
||||
if err != nil {
|
||||
t.Error("Error when setting v1 ActionTriggers ", err.Error())
|
||||
}
|
||||
err = mig.Migrate("migrateActionTriggers")
|
||||
if err != nil {
|
||||
t.Error("Error when migrating ActionTriggers ", err.Error())
|
||||
}
|
||||
result, err := mig.dataDB.GetActionTriggers(v1atrs[0].Id, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
t.Error("Error when getting ActionTriggers ", err.Error())
|
||||
}
|
||||
if !reflect.DeepEqual(atrs[0], result[0]) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", atrs[0], result[0])
|
||||
}
|
||||
err = mig.DropV1Colection(utils.ACTION_TRIGGER_PREFIX)
|
||||
if err != nil {
|
||||
t.Error("Error when flushing v1 ActionTriggers ", err.Error())
|
||||
}
|
||||
|
||||
case dbtype == utils.MONGO:
|
||||
err := mig.Migrate(utils.MetaActionTriggers)
|
||||
if err != nil && err != utils.ErrNotImplemented {
|
||||
t.Error("Error when migrating ActionTriggers ", err.Error())
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
*/
|
||||
//4
|
||||
|
||||
func testMigratorActions(t *testing.T) {
|
||||
v1act := &v1Actions{&v1Action{Id: "test", ActionType: "", BalanceType: "", Direction: "INBOUND", ExtraParameters: "", ExpirationString: "", Balance: &v1Balance{Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}}
|
||||
@@ -438,28 +401,26 @@ func testMigratorActions(t *testing.T) {
|
||||
if !reflect.DeepEqual(*act, result) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", *act, result)
|
||||
}
|
||||
|
||||
case dbtype == utils.MONGO:
|
||||
err := mig.oldDataDB.setV1Actions(v1act)
|
||||
if err != nil {
|
||||
t.Error("Error when setting v1 Actions ", err.Error())
|
||||
}
|
||||
err = mig.Migrate(utils.MetaActions)
|
||||
if err != nil {
|
||||
t.Error("Error when migrating Actions ", err.Error())
|
||||
}
|
||||
result, err := mig.dataDB.GetActions((*v1act)[0].Id, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
t.Error("Error when getting Actions ", err.Error())
|
||||
}
|
||||
if !reflect.DeepEqual(*act, result) {
|
||||
|
||||
case dbtype == utils.MONGO:
|
||||
err := mig.oldDataDB.setV1Actions(v1act)
|
||||
if err != nil {
|
||||
t.Error("Error when setting v1 Actions ", err.Error())
|
||||
}
|
||||
err = mig.Migrate(utils.MetaActions)
|
||||
if err != nil {
|
||||
t.Error("Error when migrating Actions ", err.Error())
|
||||
}
|
||||
result, err := mig.dataDB.GetActions((*v1act)[0].Id, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
t.Error("Error when getting Actions ", err.Error())
|
||||
}
|
||||
if !reflect.DeepEqual(*act, result) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", *act, result)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 5
|
||||
/*
|
||||
func testMigratorSharedGroups(t *testing.T) {
|
||||
v1sg := &v1SharedGroup{
|
||||
Id: "Test",
|
||||
@@ -477,12 +438,7 @@ func testMigratorSharedGroups(t *testing.T) {
|
||||
}
|
||||
switch {
|
||||
case dbtype == utils.REDIS:
|
||||
bit, err := mig.mrshlr.Marshal(v1sg)
|
||||
if err != nil {
|
||||
t.Error("Error when marshaling ", err.Error())
|
||||
}
|
||||
setv1id := utils.SHARED_GROUP_PREFIX + v1sg.Id
|
||||
err = mig.SetV1onRedis(setv1id, bit)
|
||||
err := mig.oldDataDB.setV1SharedGroup(v1sg)
|
||||
if err != nil {
|
||||
t.Error("Error when setting v1 SharedGroup ", err.Error())
|
||||
}
|
||||
@@ -497,24 +453,22 @@ func testMigratorSharedGroups(t *testing.T) {
|
||||
if !reflect.DeepEqual(sg, result) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", sg, result)
|
||||
}
|
||||
|
||||
case dbtype == utils.MONGO:
|
||||
err := mig.SetV1onMongoSharedGroup(utils.SHARED_GROUP_PREFIX, v1sg)
|
||||
if err != nil {
|
||||
t.Error("Error when setting v1 SharedGroup ", err.Error())
|
||||
}
|
||||
err = mig.Migrate("migrateSharedGroups")
|
||||
if err != nil {
|
||||
t.Error("Error when migrating SharedGroup ", err.Error())
|
||||
}
|
||||
result, err := mig.dataDB.GetSharedGroup(v1sg.Id, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
t.Error("Error when getting SharedGroup ", err.Error())
|
||||
}
|
||||
if !reflect.DeepEqual(sg, result) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", sg, result)
|
||||
}
|
||||
|
||||
case dbtype == utils.MONGO:
|
||||
err := mig.oldDataDB.setV1SharedGroup(v1sg)
|
||||
if err != nil {
|
||||
t.Error("Error when setting v1 SharedGroup ", err.Error())
|
||||
}
|
||||
err = mig.Migrate(utils.MetaSharedGroups)
|
||||
if err != nil {
|
||||
t.Error("Error when migrating SharedGroup ", err.Error())
|
||||
}
|
||||
result, err := mig.dataDB.GetSharedGroup(v1sg.Id, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
t.Error("Error when getting SharedGroup ", err.Error())
|
||||
}
|
||||
if !reflect.DeepEqual(sg, result) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", sg, result)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
@@ -19,7 +19,7 @@ package migrator
|
||||
|
||||
// import (
|
||||
// "fmt"
|
||||
|
||||
|
||||
// "github.com/cgrates/cgrates/engine"
|
||||
// "github.com/cgrates/cgrates/utils"
|
||||
// "gopkg.in/mgo.v2/bson"
|
||||
@@ -119,4 +119,4 @@ package migrator
|
||||
// utils.UnsupportedDB,
|
||||
// fmt.Sprintf("error: unsupported: <%s> for getV1AccountFromDB method", m.oldDataDBType))
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
@@ -22,7 +22,6 @@ import (
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
)
|
||||
|
||||
type v1SharedGroup struct {
|
||||
@@ -32,89 +31,31 @@ type v1SharedGroup struct {
|
||||
}
|
||||
|
||||
func (m *Migrator) migrateSharedGroups() (err error) {
|
||||
switch m.dataDBType {
|
||||
case utils.REDIS:
|
||||
var sgv1keys []string
|
||||
sgv1keys, err = m.dataDB.GetKeysForPrefix(utils.SHARED_GROUP_PREFIX)
|
||||
if err != nil {
|
||||
return
|
||||
var v1SG *v1SharedGroup
|
||||
for {
|
||||
v1SG, err = m.oldDataDB.getV1SharedGroup()
|
||||
if err != nil && err != utils.ErrNoMoreData {
|
||||
return err
|
||||
}
|
||||
for _, sgv1key := range sgv1keys {
|
||||
v1sg, err := m.getv1SharedGroupFromDB(sgv1key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sg := v1sg.AsSharedGroup()
|
||||
if err = m.dataDB.SetSharedGroup(sg, utils.NonTransactional); err != nil {
|
||||
if err == utils.ErrNoMoreData {
|
||||
break
|
||||
}
|
||||
if v1SG != nil {
|
||||
acnt := v1SG.AsSharedGroup()
|
||||
if err = m.dataDB.SetSharedGroup(acnt, utils.NonTransactional); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.SHARED_GROUP_PREFIX: engine.CurrentStorDBVersions()[utils.SHARED_GROUP_PREFIX]}
|
||||
if err = m.dataDB.SetVersions(vrs, false); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating SharedGroup version into dataDB", err.Error()))
|
||||
}
|
||||
return
|
||||
case utils.MONGO:
|
||||
dataDB := m.dataDB.(*engine.MongoStorage)
|
||||
mgoDB := dataDB.DB()
|
||||
defer mgoDB.Session.Close()
|
||||
var v1sg v1SharedGroup
|
||||
iter := mgoDB.C(utils.SHARED_GROUP_PREFIX).Find(nil).Iter()
|
||||
for iter.Next(&v1sg) {
|
||||
sg := v1sg.AsSharedGroup()
|
||||
if err = m.dataDB.SetSharedGroup(sg, utils.NonTransactional); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.SHARED_GROUP_PREFIX: engine.CurrentStorDBVersions()[utils.SHARED_GROUP_PREFIX]}
|
||||
if err = m.dataDB.SetVersions(vrs, false); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating SharedGroup version into dataDB", err.Error()))
|
||||
}
|
||||
return
|
||||
default:
|
||||
}
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]}
|
||||
if err = m.dataDB.SetVersions(vrs, false); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
utils.UnsupportedDB,
|
||||
fmt.Sprintf("error: unsupported: <%s> for migrateSharedGroups method", m.dataDBType))
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Migrator) getv1SharedGroupFromDB(key string) (*v1SharedGroup, error) {
|
||||
switch m.dataDBType {
|
||||
case utils.REDIS:
|
||||
dataDB := m.dataDB.(*engine.RedisStorage)
|
||||
if strVal, err := dataDB.Cmd("GET", key).Bytes(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
v1SG := &v1SharedGroup{Id: key}
|
||||
if err := m.mrshlr.Unmarshal(strVal, v1SG); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v1SG, nil
|
||||
}
|
||||
case utils.MONGO:
|
||||
dataDB := m.dataDB.(*engine.MongoStorage)
|
||||
mgoDB := dataDB.DB()
|
||||
defer mgoDB.Session.Close()
|
||||
v1SG := new(v1SharedGroup)
|
||||
if err := mgoDB.C(utils.SHARED_GROUP_PREFIX).Find(bson.M{"id": key}).One(v1SG); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v1SG, nil
|
||||
default:
|
||||
return nil, utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
utils.UnsupportedDB,
|
||||
fmt.Sprintf("error: unsupported: <%s> for getv1SharedGroupFromDB method", m.dataDBType))
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (v1SG v1SharedGroup) AsSharedGroup() (sg *engine.SharedGroup) {
|
||||
|
||||
@@ -17,12 +17,16 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package migrator
|
||||
|
||||
type v1DataDB interface{
|
||||
getKeysForPrefix(prefix string) ([]string, error)
|
||||
getv1Account() (v1Acnt *v1Account, err error)
|
||||
setV1Account( x *v1Account) (err error)
|
||||
getV1ActionPlans() (v1aps *v1ActionPlans, err error)
|
||||
setV1ActionPlans(x *v1ActionPlans) (err error)
|
||||
getV1Actions() (v1acs *v1Actions, err error)
|
||||
setV1Actions(x *v1Actions) (err error)
|
||||
}
|
||||
type v1DataDB interface {
|
||||
getKeysForPrefix(prefix string) ([]string, error)
|
||||
getv1Account() (v1Acnt *v1Account, err error)
|
||||
setV1Account(x *v1Account) (err error)
|
||||
getV1ActionPlans() (v1aps *v1ActionPlans, err error)
|
||||
setV1ActionPlans(x *v1ActionPlans) (err error)
|
||||
getV1Actions() (v1acs *v1Actions, err error)
|
||||
setV1Actions(x *v1Actions) (err error)
|
||||
getV1ActionTriggers() (v1acts *v1ActionTriggers, err error)
|
||||
setV1ActionTriggers(x *v1ActionTriggers) (err error)
|
||||
getV1SharedGroup() (v1acts *v1SharedGroup, err error)
|
||||
setV1SharedGroup(x *v1SharedGroup) (err error)
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ package migrator
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
// "log"
|
||||
// "log"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
|
||||
@@ -27,12 +27,11 @@ import (
|
||||
"gopkg.in/mgo.v2"
|
||||
)
|
||||
|
||||
type v1Mongo struct{
|
||||
session *mgo.Session
|
||||
db string
|
||||
v1ms engine.Marshaler
|
||||
qryIter *mgo.Iter
|
||||
|
||||
type v1Mongo struct {
|
||||
session *mgo.Session
|
||||
db string
|
||||
v1ms engine.Marshaler
|
||||
qryIter *mgo.Iter
|
||||
}
|
||||
|
||||
type AcKeyValue struct {
|
||||
@@ -64,28 +63,28 @@ func NewMongoStorage(host, port, db, user, pass, storageType string, cdrsIndexes
|
||||
return
|
||||
}
|
||||
|
||||
func (v1ms *v1Mongo) getKeysForPrefix(prefix string) ([]string, error){
|
||||
return nil,nil
|
||||
func (v1ms *v1Mongo) getKeysForPrefix(prefix string) ([]string, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
//Account methods
|
||||
//get
|
||||
func (v1ms *v1Mongo) getv1Account() (v1Acnt *v1Account, err error){
|
||||
if v1ms.qryIter==nil{
|
||||
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(v1AccountDBPrefix).Find(nil).Iter()
|
||||
func (v1ms *v1Mongo) getv1Account() (v1Acnt *v1Account, err error) {
|
||||
if v1ms.qryIter == nil {
|
||||
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(v1AccountDBPrefix).Find(nil).Iter()
|
||||
}
|
||||
v1ms.qryIter.Next(&v1Acnt)
|
||||
v1ms.qryIter.Next(&v1Acnt)
|
||||
|
||||
if v1Acnt==nil{
|
||||
v1ms.qryIter=nil
|
||||
return nil,utils.ErrNoMoreData
|
||||
if v1Acnt == nil {
|
||||
v1ms.qryIter = nil
|
||||
return nil, utils.ErrNoMoreData
|
||||
|
||||
}
|
||||
return v1Acnt,nil
|
||||
}
|
||||
return v1Acnt, nil
|
||||
}
|
||||
|
||||
//set
|
||||
func (v1ms *v1Mongo) setV1Account( x *v1Account) (err error) {
|
||||
func (v1ms *v1Mongo) setV1Account(x *v1Account) (err error) {
|
||||
if err := v1ms.session.DB(v1ms.db).C(v1AccountDBPrefix).Insert(x); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -94,23 +93,23 @@ func (v1ms *v1Mongo) setV1Account( x *v1Account) (err error) {
|
||||
|
||||
//Action methods
|
||||
//get
|
||||
func (v1ms *v1Mongo) getV1ActionPlans() (v1aps *v1ActionPlans, err error){
|
||||
var strct *AtKeyValue
|
||||
if v1ms.qryIter==nil{
|
||||
v1ms.qryIter = v1ms.session.DB(v1ms.db).C("actiontimings").Find(nil).Iter()
|
||||
func (v1ms *v1Mongo) getV1ActionPlans() (v1aps *v1ActionPlans, err error) {
|
||||
var strct *AtKeyValue
|
||||
if v1ms.qryIter == nil {
|
||||
v1ms.qryIter = v1ms.session.DB(v1ms.db).C("actiontimings").Find(nil).Iter()
|
||||
}
|
||||
v1ms.qryIter.Next(&strct)
|
||||
if strct==nil{
|
||||
v1ms.qryIter=nil
|
||||
return nil,utils.ErrNoMoreData
|
||||
v1ms.qryIter.Next(&strct)
|
||||
if strct == nil {
|
||||
v1ms.qryIter = nil
|
||||
return nil, utils.ErrNoMoreData
|
||||
}
|
||||
v1aps=&strct.Value
|
||||
return v1aps,nil
|
||||
v1aps = &strct.Value
|
||||
return v1aps, nil
|
||||
}
|
||||
|
||||
//set
|
||||
func (v1ms *v1Mongo) setV1ActionPlans(x *v1ActionPlans) (err error) {
|
||||
key:=utils.ACTION_PLAN_PREFIX + (*x)[0].Id
|
||||
key := utils.ACTION_PLAN_PREFIX + (*x)[0].Id
|
||||
if err := v1ms.session.DB(v1ms.db).C("actiontimings").Insert(&AtKeyValue{key, *x}); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -119,45 +118,60 @@ func (v1ms *v1Mongo) setV1ActionPlans(x *v1ActionPlans) (err error) {
|
||||
|
||||
//Actions methods
|
||||
//get
|
||||
func (v1ms *v1Mongo) getV1Actions() (v1acs *v1Actions, err error){
|
||||
var strct *AcKeyValue
|
||||
if v1ms.qryIter==nil{
|
||||
v1ms.qryIter = v1ms.session.DB(v1ms.db).C("actions").Find(nil).Iter()
|
||||
func (v1ms *v1Mongo) getV1Actions() (v1acs *v1Actions, err error) {
|
||||
var strct *AcKeyValue
|
||||
if v1ms.qryIter == nil {
|
||||
v1ms.qryIter = v1ms.session.DB(v1ms.db).C("actions").Find(nil).Iter()
|
||||
}
|
||||
v1ms.qryIter.Next(&strct)
|
||||
if strct==nil{
|
||||
v1ms.qryIter=nil
|
||||
return nil,utils.ErrNoMoreData
|
||||
v1ms.qryIter.Next(&strct)
|
||||
if strct == nil {
|
||||
v1ms.qryIter = nil
|
||||
return nil, utils.ErrNoMoreData
|
||||
}
|
||||
|
||||
v1acs=&strct.Value
|
||||
return v1acs,nil
|
||||
v1acs = &strct.Value
|
||||
return v1acs, nil
|
||||
}
|
||||
|
||||
//set
|
||||
func (v1ms *v1Mongo) setV1Actions(x *v1Actions) (err error) {
|
||||
key:=utils.ACTION_PREFIX + (*x)[0].Id
|
||||
key := utils.ACTION_PREFIX + (*x)[0].Id
|
||||
if err := v1ms.session.DB(v1ms.db).C("actions").Insert(&AcKeyValue{key, *x}); err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// func (v1ms *v1Mongo) setV1onMongoActionTrigger(pref string, x *v1ActionTriggers) (err error) {
|
||||
// if err := v1ms.session.DB(v1ms.db).C(pref).Insert(x); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// return
|
||||
// }
|
||||
//ActionTriggers methods
|
||||
//get
|
||||
func (v1ms *v1Mongo) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) {
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
// func (v1ms *v1Mongo) setV1onMongoSharedGroup(pref string, x *v1SharedGroup) (err error) {
|
||||
// if err := v1ms.session.DB(v1ms.db).C(pref).Insert(x); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// return
|
||||
// }
|
||||
// func (v1ms *v1Mongo) DropV1Colection(pref string) (err error) {
|
||||
// if err := v1ms.session.DB(v1ms.db).C(pref).DropCollection(); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// return
|
||||
// }
|
||||
//set
|
||||
func (v1ms *v1Mongo) setV1ActionTriggers(x *v1ActionTriggers) (err error) {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
//Actions methods
|
||||
//get
|
||||
func (v1ms *v1Mongo) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {
|
||||
if v1ms.qryIter == nil {
|
||||
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(utils.SHARED_GROUP_PREFIX).Find(nil).Iter()
|
||||
}
|
||||
v1ms.qryIter.Next(&v1sg)
|
||||
if v1sg == nil {
|
||||
v1ms.qryIter = nil
|
||||
return nil, utils.ErrNoMoreData
|
||||
|
||||
}
|
||||
return v1sg, nil
|
||||
}
|
||||
|
||||
//set
|
||||
func (v1ms *v1Mongo) setV1SharedGroup(x *v1SharedGroup) (err error) {
|
||||
if err := v1ms.session.DB(v1ms.db).C(utils.SHARED_GROUP_PREFIX).Insert(x); err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -18,20 +18,20 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package migrator
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
//"log"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"fmt"
|
||||
//"log"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
|
||||
"github.com/mediocregopher/radix.v2/redis"
|
||||
"github.com/mediocregopher/radix.v2/pool"
|
||||
"github.com/mediocregopher/radix.v2/redis"
|
||||
)
|
||||
type v1Redis struct{
|
||||
dbPool *pool.Pool
|
||||
ms engine.Marshaler
|
||||
dataKeys []string
|
||||
qryIdx *int
|
||||
|
||||
type v1Redis struct {
|
||||
dbPool *pool.Pool
|
||||
ms engine.Marshaler
|
||||
dataKeys []string
|
||||
qryIdx *int
|
||||
}
|
||||
|
||||
func newv1RedisStorage(address string, db int, pass, mrshlerStr string) (*v1Redis, error) {
|
||||
@@ -66,7 +66,7 @@ func newv1RedisStorage(address string, db int, pass, mrshlerStr string) (*v1Redi
|
||||
} else {
|
||||
return nil, fmt.Errorf("Unsupported marshaler: %v", mrshlerStr)
|
||||
}
|
||||
return &v1Redis{dbPool: p,ms: mrshler}, nil
|
||||
return &v1Redis{dbPool: p, ms: mrshler}, nil
|
||||
}
|
||||
|
||||
// This CMD function get a connection from the pool.
|
||||
@@ -80,19 +80,18 @@ func (v1rs *v1Redis) cmd(cmd string, args ...interface{}) *redis.Resp {
|
||||
if result.IsType(redis.IOErr) { // Failover mecanism
|
||||
utils.Logger.Warning(fmt.Sprintf("<RedisStorage> error <%s>, attempting failover.", result.Err.Error()))
|
||||
c2, err := v1rs.dbPool.Get()
|
||||
if err == nil {
|
||||
if result2 := c2.Cmd(cmd, args...); !result2.IsType(redis.IOErr) {
|
||||
v1rs.dbPool.Put(c2)
|
||||
return result2
|
||||
}
|
||||
if err == nil {
|
||||
if result2 := c2.Cmd(cmd, args...); !result2.IsType(redis.IOErr) {
|
||||
v1rs.dbPool.Put(c2)
|
||||
return result2
|
||||
}
|
||||
}
|
||||
} else {
|
||||
v1rs.dbPool.Put(c1)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
|
||||
func (v1rs *v1Redis) getKeysForPrefix(prefix string) ([]string, error) {
|
||||
r := v1rs.cmd("KEYS", prefix+"*")
|
||||
if r.Err != nil {
|
||||
@@ -103,41 +102,41 @@ func (v1rs *v1Redis) getKeysForPrefix(prefix string) ([]string, error) {
|
||||
|
||||
//Account methods
|
||||
//get
|
||||
func (v1rs *v1Redis) getv1Account() (v1Acnt *v1Account, err error){
|
||||
if v1rs.qryIdx==nil{
|
||||
v1rs.dataKeys, err = v1rs.getKeysForPrefix(v1AccountDBPrefix);
|
||||
func (v1rs *v1Redis) getv1Account() (v1Acnt *v1Account, err error) {
|
||||
if v1rs.qryIdx == nil {
|
||||
v1rs.dataKeys, err = v1rs.getKeysForPrefix(v1AccountDBPrefix)
|
||||
if err != nil {
|
||||
return
|
||||
}else if len(v1rs.dataKeys)==0{
|
||||
return nil,utils.ErrNotFound
|
||||
}
|
||||
v1rs.qryIdx=utils.IntPointer(0)
|
||||
}
|
||||
if *v1rs.qryIdx<=len(v1rs.dataKeys)-1{
|
||||
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
|
||||
if err != nil {
|
||||
return nil ,err
|
||||
return
|
||||
} else if len(v1rs.dataKeys) == 0 {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
v1Acnt = &v1Account{Id: v1rs.dataKeys[*v1rs.qryIdx]}
|
||||
if err := v1rs.ms.Unmarshal(strVal, v1Acnt); err != nil {
|
||||
return nil,err
|
||||
}
|
||||
*v1rs.qryIdx=*v1rs.qryIdx+1
|
||||
}else{
|
||||
v1rs.qryIdx=nil
|
||||
return nil,utils.ErrNoMoreData
|
||||
v1rs.qryIdx = utils.IntPointer(0)
|
||||
}
|
||||
return v1Acnt,nil
|
||||
if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 {
|
||||
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
v1Acnt = &v1Account{Id: v1rs.dataKeys[*v1rs.qryIdx]}
|
||||
if err := v1rs.ms.Unmarshal(strVal, v1Acnt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
*v1rs.qryIdx = *v1rs.qryIdx + 1
|
||||
} else {
|
||||
v1rs.qryIdx = nil
|
||||
return nil, utils.ErrNoMoreData
|
||||
}
|
||||
return v1Acnt, nil
|
||||
}
|
||||
|
||||
//set
|
||||
func (v1rs *v1Redis) setV1Account( x *v1Account) (err error) {
|
||||
key:=v1AccountDBPrefix + x.Id
|
||||
bit, err := v1rs.ms.Marshal(x)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = v1rs.cmd("SET", key, bit).Err; err != nil {
|
||||
func (v1rs *v1Redis) setV1Account(x *v1Account) (err error) {
|
||||
key := v1AccountDBPrefix + x.Id
|
||||
bit, err := v1rs.ms.Marshal(x)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = v1rs.cmd("SET", key, bit).Err; err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
@@ -145,40 +144,40 @@ if err = v1rs.cmd("SET", key, bit).Err; err != nil {
|
||||
|
||||
//ActionPlans methods
|
||||
//get
|
||||
func (v1rs *v1Redis) getV1ActionPlans() (v1aps *v1ActionPlans, err error){
|
||||
if v1rs.qryIdx==nil{
|
||||
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_PLAN_PREFIX);
|
||||
func (v1rs *v1Redis) getV1ActionPlans() (v1aps *v1ActionPlans, err error) {
|
||||
if v1rs.qryIdx == nil {
|
||||
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_PLAN_PREFIX)
|
||||
if err != nil {
|
||||
return
|
||||
}else if len(v1rs.dataKeys)==0{
|
||||
return nil,utils.ErrNotFound
|
||||
}
|
||||
v1rs.qryIdx=utils.IntPointer(0)
|
||||
}
|
||||
if *v1rs.qryIdx<=len(v1rs.dataKeys)-1{
|
||||
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
|
||||
if err != nil {
|
||||
return nil ,err
|
||||
return
|
||||
} else if len(v1rs.dataKeys) == 0 {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
if err := v1rs.ms.Unmarshal(strVal, &v1aps); err != nil {
|
||||
return nil,err
|
||||
}
|
||||
*v1rs.qryIdx=*v1rs.qryIdx+1
|
||||
}else{
|
||||
v1rs.qryIdx=nil
|
||||
return nil,utils.ErrNoMoreData
|
||||
v1rs.qryIdx = utils.IntPointer(0)
|
||||
}
|
||||
return v1aps,nil
|
||||
if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 {
|
||||
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := v1rs.ms.Unmarshal(strVal, &v1aps); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
*v1rs.qryIdx = *v1rs.qryIdx + 1
|
||||
} else {
|
||||
v1rs.qryIdx = nil
|
||||
return nil, utils.ErrNoMoreData
|
||||
}
|
||||
return v1aps, nil
|
||||
}
|
||||
|
||||
//set
|
||||
func (v1rs *v1Redis) setV1ActionPlans( x *v1ActionPlans) (err error) {
|
||||
key:=utils.ACTION_PLAN_PREFIX + (*x)[0].Id
|
||||
bit, err := v1rs.ms.Marshal(x)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = v1rs.cmd("SET", key, bit).Err; err != nil {
|
||||
func (v1rs *v1Redis) setV1ActionPlans(x *v1ActionPlans) (err error) {
|
||||
key := utils.ACTION_PLAN_PREFIX + (*x)[0].Id
|
||||
bit, err := v1rs.ms.Marshal(x)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = v1rs.cmd("SET", key, bit).Err; err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
@@ -186,41 +185,123 @@ if err = v1rs.cmd("SET", key, bit).Err; err != nil {
|
||||
|
||||
//Actions methods
|
||||
//get
|
||||
func (v1rs *v1Redis) getV1Actions() (v1acs *v1Actions, err error){
|
||||
if v1rs.qryIdx==nil{
|
||||
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_PREFIX);
|
||||
func (v1rs *v1Redis) getV1Actions() (v1acs *v1Actions, err error) {
|
||||
if v1rs.qryIdx == nil {
|
||||
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_PREFIX)
|
||||
if err != nil {
|
||||
return
|
||||
}else if len(v1rs.dataKeys)==0{
|
||||
return nil,utils.ErrNotFound
|
||||
}
|
||||
v1rs.qryIdx=utils.IntPointer(0)
|
||||
}
|
||||
if *v1rs.qryIdx<=len(v1rs.dataKeys)-1{
|
||||
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
|
||||
if err != nil {
|
||||
return nil ,err
|
||||
return
|
||||
} else if len(v1rs.dataKeys) == 0 {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
if err := v1rs.ms.Unmarshal(strVal, &v1acs); err != nil {
|
||||
return nil,err
|
||||
}
|
||||
*v1rs.qryIdx=*v1rs.qryIdx+1
|
||||
}else{
|
||||
v1rs.qryIdx=nil
|
||||
return nil,utils.ErrNoMoreData
|
||||
v1rs.qryIdx = utils.IntPointer(0)
|
||||
}
|
||||
return v1acs,nil
|
||||
if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 {
|
||||
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := v1rs.ms.Unmarshal(strVal, &v1acs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
*v1rs.qryIdx = *v1rs.qryIdx + 1
|
||||
} else {
|
||||
v1rs.qryIdx = nil
|
||||
return nil, utils.ErrNoMoreData
|
||||
}
|
||||
return v1acs, nil
|
||||
}
|
||||
|
||||
//set
|
||||
func (v1rs *v1Redis) setV1Actions(x *v1Actions) (err error){
|
||||
key:=utils.ACTION_PREFIX + (*x)[0].Id
|
||||
bit, err := v1rs.ms.Marshal(x)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = v1rs.cmd("SET", key, bit).Err; err != nil {
|
||||
func (v1rs *v1Redis) setV1Actions(x *v1Actions) (err error) {
|
||||
key := utils.ACTION_PREFIX + (*x)[0].Id
|
||||
bit, err := v1rs.ms.Marshal(x)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = v1rs.cmd("SET", key, bit).Err; err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
//ActionTriggers methods
|
||||
//get
|
||||
func (v1rs *v1Redis) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) {
|
||||
if v1rs.qryIdx == nil {
|
||||
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_TRIGGER_PREFIX)
|
||||
if err != nil {
|
||||
return
|
||||
} else if len(v1rs.dataKeys) == 0 {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
v1rs.qryIdx = utils.IntPointer(0)
|
||||
}
|
||||
if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 {
|
||||
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := v1rs.ms.Unmarshal(strVal, &v1acts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
*v1rs.qryIdx = *v1rs.qryIdx + 1
|
||||
} else {
|
||||
v1rs.qryIdx = nil
|
||||
return nil, utils.ErrNoMoreData
|
||||
}
|
||||
return v1acts, nil
|
||||
}
|
||||
|
||||
//set
|
||||
func (v1rs *v1Redis) setV1ActionTriggers(x *v1ActionTriggers) (err error) {
|
||||
key := utils.ACTION_TRIGGER_PREFIX + (*x)[0].Id
|
||||
bit, err := v1rs.ms.Marshal(x)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = v1rs.cmd("SET", key, bit).Err; err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
//SharedGroup methods
|
||||
//get
|
||||
func (v1rs *v1Redis) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {
|
||||
if v1rs.qryIdx == nil {
|
||||
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.SHARED_GROUP_PREFIX)
|
||||
if err != nil {
|
||||
return
|
||||
} else if len(v1rs.dataKeys) == 0 {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
v1rs.qryIdx = utils.IntPointer(0)
|
||||
}
|
||||
if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 {
|
||||
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := v1rs.ms.Unmarshal(strVal, &v1sg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
*v1rs.qryIdx = *v1rs.qryIdx + 1
|
||||
} else {
|
||||
v1rs.qryIdx = nil
|
||||
return nil, utils.ErrNoMoreData
|
||||
}
|
||||
return v1sg, nil
|
||||
}
|
||||
|
||||
//set
|
||||
func (v1rs *v1Redis) setV1SharedGroup(x *v1SharedGroup) (err error) {
|
||||
key := utils.SHARED_GROUP_PREFIX + x.Id
|
||||
bit, err := v1rs.ms.Marshal(x)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = v1rs.cmd("SET", key, bit).Err; err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user