mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Fixed migrator methods
This commit is contained in:
@@ -91,6 +91,38 @@ func (m *Migrator) getV1AccountFromDB(key string) (*v1Account, error) {
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Migrator) SetV1onRedis(key string, bl []byte) (err error) {
|
||||
dataDB := m.dataDB.(*engine.RedisStorage)
|
||||
if err = dataDB.Cmd("SET", key, bl).Err; err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (m *Migrator) FlushRedis() (err error) {
|
||||
dataDB := m.dataDB.(*engine.RedisStorage)
|
||||
if err = dataDB.Cmd("FLUSHALL").Err; err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (m *Migrator) FlushMongo() (err error) {
|
||||
dataDB := m.dataDB.(*engine.MongoStorage)
|
||||
mgoDB := dataDB.DB()
|
||||
defer mgoDB.Session.Close()
|
||||
cols, err := mgoDB.CollectionNames()
|
||||
for _, col := range cols {
|
||||
if err := mgoDB.C(col).DropCollection(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type v1Account struct {
|
||||
Id string
|
||||
BalanceMap map[string]v1BalanceChain
|
||||
|
||||
@@ -18,8 +18,11 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package migrator
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
)
|
||||
|
||||
type v1Action struct {
|
||||
@@ -35,8 +38,67 @@ type v1Action struct {
|
||||
|
||||
type v1Actions []*v1Action
|
||||
|
||||
func (v1Act v1Action) AsAction() (act engine.Action) {
|
||||
act = engine.Action{
|
||||
func (m *Migrator) migrateActions() (err error) {
|
||||
var acts engine.Actions
|
||||
var actv1keys []string
|
||||
actv1keys, err = m.tpDB.GetKeysForPrefix(utils.ACTION_PREFIX)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
for _, actv1key := range actv1keys {
|
||||
v1act, err := m.getV1ActionFromDB(actv1key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
act := v1act.AsAction()
|
||||
acts = append(acts, act)
|
||||
if err := m.tpDB.SetActions(act.Id, acts, utils.NonTransactional); err != nil {
|
||||
return err
|
||||
}
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.ACTION_PREFIX: engine.CurrentStorDBVersions()[utils.ACTION_PREFIX]}
|
||||
if err = m.tpDB.SetVersions(vrs); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (m *Migrator) getV1ActionFromDB(key string) (v1act *v1Action, err error) {
|
||||
switch m.dataDBType {
|
||||
case utils.REDIS:
|
||||
tpDB := m.tpDB.(*engine.RedisStorage)
|
||||
if strVal, err := tpDB.Cmd("GET", key).Bytes(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
v1act := &v1Action{Id: key}
|
||||
if err := m.mrshlr.Unmarshal(strVal, v1act); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v1act, nil
|
||||
}
|
||||
case utils.MONGO:
|
||||
tpDB := m.tpDB.(*engine.MongoStorage)
|
||||
mgoDB := tpDB.DB()
|
||||
defer mgoDB.Session.Close()
|
||||
v1act := new(v1Action)
|
||||
if err := mgoDB.C(v1AccountTBL).Find(bson.M{"id": key}).One(v1act); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v1act, nil
|
||||
default:
|
||||
return nil, utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
utils.UnsupportedDB,
|
||||
fmt.Sprintf("error: unsupported: <%s> for getV1ActionPlansFromDB method", m.dataDBType))
|
||||
}
|
||||
}
|
||||
|
||||
func (v1Act v1Action) AsAction() (act *engine.Action) {
|
||||
act = &engine.Action{
|
||||
Id: v1Act.Id,
|
||||
ActionType: v1Act.ActionType,
|
||||
ExtraParameters: v1Act.ExtraParameters,
|
||||
|
||||
@@ -22,10 +22,16 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// const (
|
||||
// v1AccountDBPrefix = "ubl_"
|
||||
// )
|
||||
|
||||
type v1ActionPlan struct {
|
||||
Uuid string // uniquely identify the timing
|
||||
Id string // informative purpose only
|
||||
@@ -46,15 +52,72 @@ func (at *v1ActionPlan) IsASAP() bool {
|
||||
return at.Timing.Timing.StartTime == utils.ASAP
|
||||
}
|
||||
|
||||
func (v1AP v1ActionPlan) AsActionPlan() (ap engine.ActionPlan) {
|
||||
func (m *Migrator) migrateActionPlans() (err error) {
|
||||
var apsv1keys []string
|
||||
apsv1keys, err = m.tpDB.GetKeysForPrefix(utils.ACTION_PLAN_PREFIX)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
for _, apsv1key := range apsv1keys {
|
||||
v1aps, err := m.getV1ActionPlansFromDB(apsv1key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
aps := v1aps.AsActionPlan()
|
||||
if err = m.tpDB.SetActionPlan(aps.Id, aps, true, utils.NonTransactional); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.ACTION_PLAN_PREFIX: engine.CurrentStorDBVersions()[utils.ACTION_PLAN_PREFIX]}
|
||||
if err = m.tpDB.SetVersions(vrs); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating ActionPlans version into StorDB", err.Error()))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (m *Migrator) getV1ActionPlansFromDB(key string) (v1aps *v1ActionPlan, err error) {
|
||||
switch m.dataDBType {
|
||||
case utils.REDIS:
|
||||
tpDB := m.tpDB.(*engine.RedisStorage)
|
||||
if strVal, err := tpDB.Cmd("GET", key).Bytes(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
v1aps := &v1ActionPlan{Id: key}
|
||||
if err := m.mrshlr.Unmarshal(strVal, v1aps); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v1aps, nil
|
||||
}
|
||||
case utils.MONGO:
|
||||
tpDB := m.tpDB.(*engine.MongoStorage)
|
||||
mgoDB := tpDB.DB()
|
||||
defer mgoDB.Session.Close()
|
||||
v1aps := new(v1ActionPlan)
|
||||
if err := mgoDB.C(v1AccountTBL).Find(bson.M{"id": key}).One(v1aps); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v1aps, nil
|
||||
default:
|
||||
return nil, utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
utils.UnsupportedDB,
|
||||
fmt.Sprintf("error: unsupported: <%s> for getV1ActionPlansFromDB method", m.dataDBType))
|
||||
}
|
||||
}
|
||||
|
||||
func (v1AP v1ActionPlan) AsActionPlan() (ap *engine.ActionPlan) {
|
||||
for idx, actionId := range v1AP.AccountIds {
|
||||
idElements := strings.Split(actionId, utils.CONCATENATED_KEY_SEP)
|
||||
if len(idElements) != 3 {
|
||||
idElements := strings.Split(actionId, "_")
|
||||
if len(idElements) != 2 {
|
||||
continue
|
||||
}
|
||||
v1AP.AccountIds[idx] = fmt.Sprintf("%s:%s", idElements[1], idElements[2])
|
||||
v1AP.AccountIds[idx] = idElements[1]
|
||||
}
|
||||
ap = engine.ActionPlan{
|
||||
ap = &engine.ActionPlan{
|
||||
Id: v1AP.Id,
|
||||
AccountIDs: make(utils.StringMap),
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ func TestV1ActionsAsActions(t *testing.T) {
|
||||
v1act := &v1Action{Id: "", ActionType: "", BalanceType: "", Direction: "INBOUND", ExtraParameters: "", ExpirationString: "", Balance: &v1Balance{}}
|
||||
act := &engine.Action{Id: "", ActionType: "", ExtraParameters: "", ExpirationString: "", Weight: 0.00, Balance: &engine.BalanceFilter{}}
|
||||
newact := v1act.AsAction()
|
||||
if !reflect.DeepEqual(*act, newact) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", *act, newact)
|
||||
if !reflect.DeepEqual(act, newact) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", act, newact)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
package migrator
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
@@ -33,10 +36,73 @@ type v1ActionTrigger struct {
|
||||
|
||||
type v1ActionTriggers []*v1ActionTrigger
|
||||
|
||||
func (v1Act v1ActionTrigger) AsActionTrigger() (at engine.ActionTrigger) {
|
||||
func (m *Migrator) migrateActionTriggers() (err error) {
|
||||
var atrrs engine.ActionTriggers
|
||||
var v1atrskeys []string
|
||||
v1atrskeys, err = m.tpDB.GetKeysForPrefix(utils.ACTION_TRIGGER_PREFIX)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
for _, v1atrskey := range v1atrskeys {
|
||||
v1atrs, err := m.getV1ActionTriggerFromDB(v1atrskey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
v1atr := v1atrs
|
||||
if v1atrs != nil {
|
||||
atr := v1atr.AsActionTrigger()
|
||||
atrrs = append(atrrs, atr)
|
||||
|
||||
at = engine.ActionTrigger{
|
||||
UniqueID: v1Act.Id,
|
||||
if err := m.tpDB.SetActionTriggers(atr.ID, atrrs, utils.NonTransactional); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.ACTION_TRIGGER_PREFIX: engine.CurrentStorDBVersions()[utils.ACTION_TRIGGER_PREFIX]}
|
||||
if err = m.tpDB.SetVersions(vrs); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (m *Migrator) getV1ActionTriggerFromDB(key string) (v1Atr *v1ActionTrigger, err error) {
|
||||
switch m.dataDBType {
|
||||
case utils.REDIS:
|
||||
tpDB := m.tpDB.(*engine.RedisStorage)
|
||||
if strVal, err := tpDB.Cmd("GET", key).Bytes(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
v1Atr := &v1ActionTrigger{Id: key}
|
||||
if err := m.mrshlr.Unmarshal(strVal, &v1Atr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v1Atr, nil
|
||||
}
|
||||
case utils.MONGO:
|
||||
tpDB := m.tpDB.(*engine.MongoStorage)
|
||||
mgoDB := tpDB.DB()
|
||||
defer mgoDB.Session.Close()
|
||||
v1Atr := new(v1ActionTrigger)
|
||||
if err := mgoDB.C(v1AccountTBL).Find(bson.M{"id": key}).One(v1Atr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v1Atr, nil
|
||||
default:
|
||||
return nil, utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
utils.UnsupportedDB,
|
||||
fmt.Sprintf("error: unsupported: <%s> for getV1ActionTriggerFromDB method", m.dataDBType))
|
||||
}
|
||||
}
|
||||
|
||||
func (v1Act v1ActionTrigger) AsActionTrigger() (at *engine.ActionTrigger) {
|
||||
at = &engine.ActionTrigger{
|
||||
ID: v1Act.Id,
|
||||
// UniqueID: utils.GenUUID(),
|
||||
ThresholdType: v1Act.ThresholdType,
|
||||
ThresholdValue: v1Act.ThresholdValue,
|
||||
Recurrent: v1Act.Recurrent,
|
||||
|
||||
@@ -26,10 +26,11 @@ import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
var v1ActionTriggers1 = `{"BalanceType": "*monetary","BalanceDirection": "*out","ThresholdType":"*max_balance", "ThresholdValue" :2, "ActionsId": "TEST_ACTIONS", "Executed": true}`
|
||||
var v1ActionTriggers1 = `{"Id" : "Test","BalanceType": "*monetary","BalanceDirection": "*out","ThresholdType":"*max_balance", "ThresholdValue" :2, "ActionsId": "TEST_ACTIONS", "Executed": true}`
|
||||
|
||||
func TestV1ActionTriggersAsActionTriggers(t *testing.T) {
|
||||
atrs := &engine.ActionTrigger{
|
||||
ID: "Test",
|
||||
Balance: &engine.BalanceFilter{
|
||||
Type: utils.StringPointer(utils.MONETARY),
|
||||
Directions: utils.StringMapPointer(utils.NewStringMap(utils.OUT)),
|
||||
@@ -44,7 +45,7 @@ func TestV1ActionTriggersAsActionTriggers(t *testing.T) {
|
||||
t.Error(err)
|
||||
}
|
||||
newatrs := v1actstrgrs.AsActionTrigger()
|
||||
if !reflect.DeepEqual(*atrs, newatrs) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", *atrs, newatrs)
|
||||
if !reflect.DeepEqual(atrs, newatrs) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", atrs, newatrs)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,6 +64,15 @@ func (m *Migrator) Migrate(taskID string) (err error) {
|
||||
err = m.migrateCostDetails()
|
||||
case utils.MetaAccounts:
|
||||
err = m.migrateAccounts()
|
||||
case "migrateActionPlans":
|
||||
err = m.migrateActionPlans()
|
||||
case "migrateActionTriggers":
|
||||
err = m.migrateActionTriggers()
|
||||
case "migrateActions":
|
||||
err = m.migrateActions()
|
||||
case "migrateSharedGroups":
|
||||
err = m.migrateSharedGroups()
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
303
migrator/migrator_it_test.go
Normal file
303
migrator/migrator_it_test.go
Normal file
@@ -0,0 +1,303 @@
|
||||
// +build integration
|
||||
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package migrator
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"path"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
var (
|
||||
rdsITdb *engine.RedisStorage
|
||||
mgoITdb *engine.MongoStorage
|
||||
onStor engine.DataDB
|
||||
onStorCfg string
|
||||
dbtype string
|
||||
mig *Migrator
|
||||
dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
|
||||
)
|
||||
|
||||
// subtests to be executed for each migrator
|
||||
var sTestsOnStorIT = []func(t *testing.T){
|
||||
testOnStorITFlush,
|
||||
testMigratorAccounts,
|
||||
testMigratorActionPlans,
|
||||
testMigratorActionTriggers,
|
||||
testMigratorActions,
|
||||
testMigratorSharedGroups,
|
||||
}
|
||||
|
||||
func TestOnStorITRedisConnect(t *testing.T) {
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
rdsITdb, err := engine.NewRedisStorage(fmt.Sprintf("%s:%s", cfg.TpDbHost, cfg.TpDbPort), 4, cfg.TpDbPass, cfg.DBDataEncoding, utils.REDIS_MAX_CONNS, nil, 1)
|
||||
if err != nil {
|
||||
t.Fatal("Could not connect to Redis", err.Error())
|
||||
}
|
||||
onStorCfg = cfg.DataDbName
|
||||
mig = NewMigrator(rdsITdb, rdsITdb, utils.REDIS, utils.JSON, rdsITdb, utils.REDIS)
|
||||
}
|
||||
|
||||
func TestOnStorITRedis(t *testing.T) {
|
||||
dbtype = utils.REDIS
|
||||
onStor = rdsITdb
|
||||
for _, stest := range sTestsOnStorIT {
|
||||
t.Run("TestOnStorITRedis", stest)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOnStorITMongoConnect(t *testing.T) {
|
||||
cdrsMongoCfgPath := path.Join(*dataDir, "conf", "samples", "cdrsv2mongo")
|
||||
mgoITCfg, err := config.NewCGRConfigFromFolder(cdrsMongoCfgPath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if mgoITdb, err = engine.NewMongoStorage(mgoITCfg.StorDBHost, mgoITCfg.StorDBPort, mgoITCfg.StorDBName, mgoITCfg.StorDBUser, mgoITCfg.StorDBPass,
|
||||
utils.StorDB, nil, mgoITCfg.CacheConfig, mgoITCfg.LoadHistorySize); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
onStorCfg = mgoITCfg.StorDBName
|
||||
mig = NewMigrator(mgoITdb, mgoITdb, utils.MONGO, utils.JSON, mgoITdb, utils.MONGO)
|
||||
}
|
||||
|
||||
func TestOnStorITMongo(t *testing.T) {
|
||||
dbtype = utils.MONGO
|
||||
onStor = mgoITdb
|
||||
for _, stest := range sTestsOnStorIT {
|
||||
t.Run("TestOnStorITMongo", stest)
|
||||
}
|
||||
}
|
||||
|
||||
func testOnStorITFlush(t *testing.T) {
|
||||
switch {
|
||||
case dbtype == utils.REDIS:
|
||||
err := mig.FlushRedis()
|
||||
if err != nil {
|
||||
t.Error("Error when flushing redis ", err.Error())
|
||||
}
|
||||
case dbtype == utils.MONGO:
|
||||
err := mig.FlushMongo()
|
||||
if err != nil {
|
||||
t.Error("Error when flushing redis ", err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testMigratorAccounts(t *testing.T) {
|
||||
v1b := &v1Balance{Value: 10, Weight: 10, DestinationIds: "NAT", ExpirationDate: time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Local()}
|
||||
v1Acc := &v1Account{Id: "OUT:CUSTOMER_1:rif", BalanceMap: map[string]v1BalanceChain{utils.VOICE: v1BalanceChain{v1b}, utils.MONETARY: v1BalanceChain{&v1Balance{Value: 21, ExpirationDate: time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Local()}}}}
|
||||
v2 := &engine.Balance{Uuid: "", ID: "", Value: 10, Directions: utils.StringMap{"*OUT": true}, ExpirationDate: time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Local(), Weight: 10, DestinationIDs: utils.StringMap{"NAT": true}, RatingSubject: "", Categories: utils.NewStringMap(""), SharedGroups: utils.NewStringMap(""), TimingIDs: utils.NewStringMap("")}
|
||||
m2 := &engine.Balance{Uuid: "", ID: "", Value: 21, Directions: utils.StringMap{"*OUT": true}, ExpirationDate: time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Local(), DestinationIDs: utils.NewStringMap(""), RatingSubject: "", Categories: utils.NewStringMap(""), SharedGroups: utils.NewStringMap(""), TimingIDs: utils.NewStringMap("")}
|
||||
testAccount := &engine.Account{ID: "CUSTOMER_1:rif", BalanceMap: map[string]engine.Balances{utils.VOICE: engine.Balances{v2}, utils.MONETARY: engine.Balances{m2}}, UnitCounters: engine.UnitCounters{}, ActionTriggers: engine.ActionTriggers{}}
|
||||
switch {
|
||||
case dbtype == utils.REDIS:
|
||||
bit, err := mig.mrshlr.Marshal(v1Acc)
|
||||
if err != nil {
|
||||
t.Error("Error when marshaling ", err.Error())
|
||||
}
|
||||
err = mig.SetV1onRedis(v1AccountDBPrefix+v1Acc.Id, bit)
|
||||
if err != nil {
|
||||
t.Error("Error when setting v1 acc ", err.Error())
|
||||
}
|
||||
|
||||
err = mig.Migrate(utils.MetaAccounts)
|
||||
if err != nil {
|
||||
t.Error("Error when migrating accounts ", err.Error())
|
||||
}
|
||||
result, err := mig.dataDB.GetAccount(testAccount.ID)
|
||||
if err != nil {
|
||||
t.Error("Error when getting account ", err.Error())
|
||||
}
|
||||
if !reflect.DeepEqual(testAccount, result) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", testAccount, result)
|
||||
}
|
||||
case dbtype == utils.MONGO:
|
||||
t.Errorf("not yet")
|
||||
}
|
||||
}
|
||||
|
||||
func testMigratorActionPlans(t *testing.T) {
|
||||
v1ap := &v1ActionPlan{Id: "test", AccountIds: []string{"one"}, Timing: &engine.RateInterval{Timing: new(engine.RITiming)}}
|
||||
ap := &engine.ActionPlan{Id: "test", AccountIDs: utils.StringMap{"one": true}, ActionTimings: []*engine.ActionTiming{&engine.ActionTiming{Timing: &engine.RateInterval{Timing: new(engine.RITiming)}}}}
|
||||
switch {
|
||||
case dbtype == utils.REDIS:
|
||||
bit, err := mig.mrshlr.Marshal(v1ap)
|
||||
if err != nil {
|
||||
t.Error("Error when marshaling ", err.Error())
|
||||
}
|
||||
setv1id := utils.ACTION_PLAN_PREFIX + v1ap.Id
|
||||
err = mig.SetV1onRedis(setv1id, bit)
|
||||
if err != nil {
|
||||
t.Error("Error when setting v1 ActionPlan ", err.Error())
|
||||
}
|
||||
err = mig.Migrate("migrateActionPlans")
|
||||
if err != nil {
|
||||
t.Error("Error when migrating ActionPlans ", err.Error())
|
||||
}
|
||||
result, err := mig.tpDB.GetActionPlan(ap.Id, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
t.Error("Error when getting ActionPlan ", err.Error())
|
||||
}
|
||||
if ap.Id != result.Id || !reflect.DeepEqual(ap.AccountIDs, result.AccountIDs) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", *ap, result)
|
||||
} else if !reflect.DeepEqual(ap.ActionTimings[0].Timing, result.ActionTimings[0].Timing) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Timing, result.ActionTimings[0].Timing)
|
||||
} else if ap.ActionTimings[0].Weight != result.ActionTimings[0].Weight || ap.ActionTimings[0].ActionsID != result.ActionTimings[0].ActionsID {
|
||||
t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Weight, result.ActionTimings[0].Weight)
|
||||
}
|
||||
case dbtype == utils.MONGO:
|
||||
t.Errorf("not yet")
|
||||
}
|
||||
}
|
||||
|
||||
func testMigratorActionTriggers(t *testing.T) {
|
||||
tim := time.Date(0001, time.January, 1, 0, 0, 0, 0, time.UTC)
|
||||
v1atrs := &v1ActionTrigger{
|
||||
Id: "Test",
|
||||
BalanceType: "*monetary",
|
||||
BalanceDirection: "*out",
|
||||
ThresholdType: "*max_balance",
|
||||
ThresholdValue: 2,
|
||||
ActionsId: "TEST_ACTIONS",
|
||||
Executed: true,
|
||||
}
|
||||
|
||||
atrs := engine.ActionTriggers{
|
||||
&engine.ActionTrigger{
|
||||
ID: "Test",
|
||||
Balance: &engine.BalanceFilter{
|
||||
Type: utils.StringPointer(utils.MONETARY),
|
||||
Directions: utils.StringMapPointer(utils.NewStringMap(utils.OUT)),
|
||||
},
|
||||
ExpirationDate: tim,
|
||||
LastExecutionTime: tim,
|
||||
ActivationDate: tim,
|
||||
ThresholdType: utils.TRIGGER_MAX_BALANCE,
|
||||
ThresholdValue: 2,
|
||||
ActionsID: "TEST_ACTIONS",
|
||||
Executed: true,
|
||||
},
|
||||
}
|
||||
|
||||
switch {
|
||||
case dbtype == utils.REDIS:
|
||||
bit, err := mig.mrshlr.Marshal(v1atrs)
|
||||
if err != nil {
|
||||
t.Error("Error when marshaling ", err.Error())
|
||||
}
|
||||
setv1id := utils.ACTION_TRIGGER_PREFIX + v1atrs.Id
|
||||
err = mig.SetV1onRedis(setv1id, bit)
|
||||
if err != nil {
|
||||
t.Error("Error when setting v1 ActionTriggers ", err.Error())
|
||||
}
|
||||
err = mig.Migrate("migrateActionTriggers")
|
||||
if err != nil {
|
||||
t.Error("Error when migrating ActionTriggers ", err.Error())
|
||||
}
|
||||
result, err := mig.tpDB.GetActionTriggers(v1atrs.Id, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
t.Error("Error when getting ActionTriggers ", err.Error())
|
||||
}
|
||||
if !reflect.DeepEqual(atrs, result) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", atrs, result)
|
||||
}
|
||||
case dbtype == utils.MONGO:
|
||||
t.Errorf("not yet")
|
||||
}
|
||||
}
|
||||
|
||||
func testMigratorActions(t *testing.T) {
|
||||
v1act := &v1Action{Id: "test", ActionType: "", BalanceType: "", Direction: "INBOUND", ExtraParameters: "", ExpirationString: "", Balance: &v1Balance{}}
|
||||
act := engine.Actions{&engine.Action{Id: "test", ActionType: "", ExtraParameters: "", ExpirationString: "", Weight: 0.00, Balance: &engine.BalanceFilter{}}}
|
||||
switch {
|
||||
case dbtype == utils.REDIS:
|
||||
bit, err := mig.mrshlr.Marshal(v1act)
|
||||
if err != nil {
|
||||
t.Error("Error when marshaling ", err.Error())
|
||||
}
|
||||
setv1id := utils.ACTION_PREFIX + v1act.Id
|
||||
err = mig.SetV1onRedis(setv1id, bit)
|
||||
if err != nil {
|
||||
t.Error("Error when setting v1 Actions ", err.Error())
|
||||
}
|
||||
|
||||
err = mig.Migrate("migrateActions")
|
||||
if err != nil {
|
||||
t.Error("Error when migrating Actions ", err.Error())
|
||||
}
|
||||
result, err := mig.tpDB.GetActions(v1act.Id, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
t.Error("Error when getting Actions ", err.Error())
|
||||
}
|
||||
if !reflect.DeepEqual(act, result) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", act, result)
|
||||
}
|
||||
case dbtype == utils.MONGO:
|
||||
t.Errorf("not yet")
|
||||
}
|
||||
}
|
||||
|
||||
func testMigratorSharedGroups(t *testing.T) {
|
||||
v1sg := &v1SharedGroup{
|
||||
Id: "Test",
|
||||
AccountParameters: map[string]*engine.SharingParameters{
|
||||
"test": &engine.SharingParameters{Strategy: "*highest"},
|
||||
},
|
||||
MemberIds: []string{"1", "2", "3"},
|
||||
}
|
||||
sg := &engine.SharedGroup{
|
||||
Id: "Test",
|
||||
AccountParameters: map[string]*engine.SharingParameters{
|
||||
"test": &engine.SharingParameters{Strategy: "*highest"},
|
||||
},
|
||||
MemberIds: utils.NewStringMap("1", "2", "3"),
|
||||
}
|
||||
|
||||
switch {
|
||||
case dbtype == utils.REDIS:
|
||||
bit, err := mig.mrshlr.Marshal(v1sg)
|
||||
if err != nil {
|
||||
t.Error("Error when marshaling ", err.Error())
|
||||
}
|
||||
setv1id := utils.SHARED_GROUP_PREFIX + v1sg.Id
|
||||
err = mig.SetV1onRedis(setv1id, bit)
|
||||
if err != nil {
|
||||
t.Error("Error when setting v1 Actions ", err.Error())
|
||||
}
|
||||
|
||||
err = mig.Migrate("migrateSharedGroups")
|
||||
if err != nil {
|
||||
t.Error("Error when migrating Actions ", err.Error())
|
||||
}
|
||||
result, err := mig.tpDB.GetSharedGroup(v1sg.Id, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
t.Error("Error when getting Actions ", err.Error())
|
||||
}
|
||||
if !reflect.DeepEqual(sg, result) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", sg, result)
|
||||
}
|
||||
case dbtype == utils.MONGO:
|
||||
t.Errorf("not yet")
|
||||
}
|
||||
}
|
||||
@@ -18,8 +18,11 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package migrator
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
)
|
||||
|
||||
type v1SharedGroup struct {
|
||||
@@ -28,8 +31,66 @@ type v1SharedGroup struct {
|
||||
MemberIds []string
|
||||
}
|
||||
|
||||
func (v1SG v1SharedGroup) AsSharedGroup() (sg engine.SharedGroup) {
|
||||
sg = engine.SharedGroup{
|
||||
func (m *Migrator) migrateSharedGroups() (err error) {
|
||||
var sgv1keys []string
|
||||
sgv1keys, err = m.tpDB.GetKeysForPrefix(utils.SHARED_GROUP_PREFIX)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
for _, sgv1key := range sgv1keys {
|
||||
v1sg, err := m.getv1SharedGroupFromDB(sgv1key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sg := v1sg.AsSharedGroup()
|
||||
if err = m.tpDB.SetSharedGroup(sg, utils.NonTransactional); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.SHARED_GROUP_PREFIX: engine.CurrentStorDBVersions()[utils.SHARED_GROUP_PREFIX]}
|
||||
if err = m.tpDB.SetVersions(vrs); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating SharedGroup version into tpDB", err.Error()))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (m *Migrator) getv1SharedGroupFromDB(key string) (*v1SharedGroup, error) {
|
||||
switch m.dataDBType {
|
||||
case utils.REDIS:
|
||||
tpDB := m.tpDB.(*engine.RedisStorage)
|
||||
if strVal, err := tpDB.Cmd("GET", key).Bytes(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
v1SG := &v1SharedGroup{Id: key}
|
||||
if err := m.mrshlr.Unmarshal(strVal, v1SG); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v1SG, nil
|
||||
}
|
||||
case utils.MONGO:
|
||||
tpDB := m.tpDB.(*engine.MongoStorage)
|
||||
mgoDB := tpDB.DB()
|
||||
defer mgoDB.Session.Close()
|
||||
v1SG := new(v1SharedGroup)
|
||||
if err := mgoDB.C(v1AccountTBL).Find(bson.M{"id": key}).One(v1SG); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v1SG, nil
|
||||
default:
|
||||
return nil, utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
utils.UnsupportedDB,
|
||||
fmt.Sprintf("error: unsupported: <%s> for getv1SharedGroupFromDB method", m.dataDBType))
|
||||
}
|
||||
}
|
||||
|
||||
func (v1SG v1SharedGroup) AsSharedGroup() (sg *engine.SharedGroup) {
|
||||
sg = &engine.SharedGroup{
|
||||
Id: v1SG.Id,
|
||||
AccountParameters: v1SG.AccountParameters,
|
||||
MemberIds: make(utils.StringMap),
|
||||
|
||||
@@ -41,8 +41,8 @@ func TestV1SharedGroupAsSharedGroup(t *testing.T) {
|
||||
MemberIds: utils.NewStringMap("1", "2", "3"),
|
||||
}
|
||||
newsg := v1sg.AsSharedGroup()
|
||||
if !reflect.DeepEqual(*sg, newsg) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", *sg, newsg)
|
||||
if !reflect.DeepEqual(sg, newsg) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", sg, newsg)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user