Refactored actions

This commit is contained in:
edwardro22
2017-09-03 17:42:14 +00:00
parent 34ef8255dd
commit 8a86f04897
7 changed files with 102 additions and 163 deletions

View File

@@ -34,9 +34,7 @@ const (
func (m *Migrator) migrateAccounts() (err error) {
var v1Acnt *v1Account
// for infinit pana cand vine err
for {
for {
v1Acnt,err=m.oldDataDB.getv1Account()
if err!=nil&&err!=utils.ErrNoMoreData{
return err
@@ -44,14 +42,11 @@ func (m *Migrator) migrateAccounts() (err error) {
if err==utils.ErrNoMoreData{break}
if v1Acnt != nil {
acnt := v1Acnt.AsAccount()
if err = m.dataDB.SetAccount(acnt); err != nil {
return err
}
}
}
// All done, update version wtih current one
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {

View File

@@ -19,11 +19,11 @@ package migrator
import (
"fmt"
"log"
// /"log"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"gopkg.in/mgo.v2/bson"
//"gopkg.in/mgo.v2/bson"
)
type v1Action struct {
@@ -40,58 +40,28 @@ type v1Action struct {
type v1Actions []*v1Action
func (m *Migrator) migrateActions() (err error) {
switch m.dataDBType {
case utils.REDIS:
var v1ACs *v1Actions
var acts engine.Actions
var actv1keys []string
actv1keys, err = m.dataDB.GetKeysForPrefix(utils.ACTION_PREFIX)
if err != nil {
return
}
for _, actv1key := range actv1keys {
v1acts, err := m.getV1ActionFromDB(actv1key)
if err != nil {
for {
v1ACs,err=m.oldDataDB.getV1Actions()
if err!=nil&&err!=utils.ErrNoMoreData{
return err
}
if v1acts == nil {
log.Print("No Actions found with key:", actv1key)
} else {
for _, v1act := range *v1acts {
act := v1act.AsAction()
acts = append(acts, act)
}
if err==utils.ErrNoMoreData{break}
if *v1ACs != nil {
for _, v1ac := range *v1ACs{
act := v1ac.AsAction()
acts = append(acts, act)
}
}
if err := m.dataDB.SetActions(acts[0].Id, acts, utils.NonTransactional); err != nil {
if err := m.dataDB.SetActions(acts[0].Id, acts, utils.NonTransactional); err != nil {
return err
}
}
}
// All done, update version wtih current one
vrs := engine.Versions{utils.ACTION_PREFIX: engine.CurrentStorDBVersions()[utils.ACTION_PREFIX]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
}
return
case utils.MONGO:
dataDB := m.dataDB.(*engine.MongoStorage)
mgoDB := dataDB.DB()
defer mgoDB.Session.Close()
var acts engine.Actions
var v1act v1Action
iter := mgoDB.C(utils.ACTION_PREFIX).Find(nil).Iter()
for iter.Next(&v1act) {
act := v1act.AsAction()
acts = append(acts, act)
}
if err := m.dataDB.SetActions(acts[0].Id, acts, utils.NonTransactional); err != nil {
return err
}
// All done, update version wtih current one
vrs := engine.Versions{utils.ACTION_PREFIX: engine.CurrentStorDBVersions()[utils.ACTION_PREFIX]}
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
@@ -100,41 +70,6 @@ func (m *Migrator) migrateActions() (err error) {
}
return
default:
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
utils.UnsupportedDB,
fmt.Sprintf("error: unsupported: <%s> for migrateActions method", m.dataDBType))
}
}
func (m *Migrator) getV1ActionFromDB(key string) (v1act *v1Actions, err error) {
switch m.dataDBType {
case utils.REDIS:
dataDB := m.dataDB.(*engine.RedisStorage)
if strVal, err := dataDB.Cmd("GET", key).Bytes(); err != nil {
return nil, err
} else {
if err := m.mrshlr.Unmarshal(strVal, &v1act); err != nil {
return nil, err
}
return v1act, nil
}
case utils.MONGO:
dataDB := m.dataDB.(*engine.MongoStorage)
mgoDB := dataDB.DB()
defer mgoDB.Session.Close()
v1act := new(v1Actions)
if err := mgoDB.C(utils.ACTION_PREFIX).Find(bson.M{"id": key}).One(v1act); err != nil {
return nil, err
}
return v1act, nil
default:
return nil, utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
utils.UnsupportedDB,
fmt.Sprintf("error: unsupported: <%s> for getV1ActionPlansFromDB method", m.dataDBType))
}
}
func (v1Act v1Action) AsAction() (act *engine.Action) {

View File

@@ -19,7 +19,7 @@ package migrator
import (
"fmt"
"log"
//"log"
"strings"
"time"
@@ -54,20 +54,14 @@ func (at *v1ActionPlan) IsASAP() bool {
func (m *Migrator) migrateActionPlans() (err error) {
var v1APs *v1ActionPlans
for {
log.Print("Done migrating!")
v1APs,err=m.oldDataDB.getV1ActionPlans()
if err!=nil&&err!=utils.ErrNoMoreData{
return err
}
log.Print("after get !",err, v1APs)
if err==utils.ErrNoMoreData{break}
log.Print("after err check !")
if *v1APs != nil {
for _, v1ap := range *v1APs{
ap := v1ap.AsActionPlan()
log.Print("ActionPlan !",ap)
if err = m.dataDB.SetActionPlan(ap.Id, ap, true, utils.NonTransactional); err != nil {
return err
}

View File

@@ -80,7 +80,7 @@ var sTestsITMigrator = []func(t *testing.T){
testMigratorAccounts,
testMigratorActionPlans,
// testMigratorActionTriggers,
// testMigratorActions,
testMigratorActions,
// testMigratorSharedGroups,
}
@@ -237,7 +237,6 @@ func testMigratorActionPlans(t *testing.T) {
if err != nil {
t.Error("Error when getting ActionPlan ", err.Error())
}
if ap.Id != result.Id || !reflect.DeepEqual(ap.AccountIDs, result.AccountIDs) {
t.Errorf("Expecting: %+v, received: %+v", *ap, result)
} else if !reflect.DeepEqual(ap.ActionTimings[0].Timing, result.ActionTimings[0].Timing) {
@@ -245,40 +244,28 @@ func testMigratorActionPlans(t *testing.T) {
} else if ap.ActionTimings[0].Weight != result.ActionTimings[0].Weight || ap.ActionTimings[0].ActionsID != result.ActionTimings[0].ActionsID {
t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Weight, result.ActionTimings[0].Weight)
}
case dbtype == utils.MONGO:
case dbtype == utils.MONGO:
err := mig.oldDataDB.setV1ActionPlans(v1ap)
if err != nil {
t.Error("Error when setting v1 ActionPlans ", err.Error())
}
log.Print("dadada!")
log.Print("dadada!")
err = mig.Migrate(utils.MetaActionPlans )
if err != nil {
t.Error("Error when migrating ActionPlans ", err.Error())
}
log.Print("dadada!")
//result
_, err = mig.dataDB.GetActionPlan(ap.Id, true, utils.NonTransactional)
log.Print("dadada!")
result, err := mig.dataDB.GetActionPlan(ap.Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting ActionPlan ", err.Error())
}
log.Print("dadada!")
//if ap.Id != result.Id || !reflect.DeepEqual(ap.AccountIDs, result.AccountIDs) {
// t.Errorf("Expecting: %+v, received: %+v", *ap, result)
//} //else if !reflect.DeepEqual(ap.ActionTimings[0].Timing, result.ActionTimings.Timing) {
// t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Timing, result.ActionTimings[0].Timing)
//} else if ap.ActionTimings[0].Weight != result.ActionTimings[0].Weight || ap.ActionTimings[0].ActionsID != result.ActionTimings[0].ActionsID {
// t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Weight, result.ActionTimings[0].Weight)
//}
if ap.Id != result.Id || !reflect.DeepEqual(ap.AccountIDs, result.AccountIDs) {
t.Errorf("Expecting: %+v, received: %+v", *ap, result)
} else if !reflect.DeepEqual(ap.ActionTimings[0].Timing, result.ActionTimings[0].Timing) {
t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Timing, result.ActionTimings[0].Timing)
} else if ap.ActionTimings[0].Weight != result.ActionTimings[0].Weight || ap.ActionTimings[0].ActionsID != result.ActionTimings[0].ActionsID {
t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Weight, result.ActionTimings[0].Weight)
}
}
}
}
//3
/*
@@ -428,65 +415,51 @@ func testMigratorActionTriggers(t *testing.T) {
}
}
*/
//4
func testMigratorActions(t *testing.T) {
v1act := v1Actions{&v1Action{Id: "test", ActionType: "", BalanceType: "", Direction: "INBOUND", ExtraParameters: "", ExpirationString: "", Balance: &v1Balance{Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}}
act := engine.Actions{&engine.Action{Id: "test", ActionType: "", ExtraParameters: "", ExpirationString: "", Weight: 0.00, Balance: &engine.BalanceFilter{Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}}
v1act := &v1Actions{&v1Action{Id: "test", ActionType: "", BalanceType: "", Direction: "INBOUND", ExtraParameters: "", ExpirationString: "", Balance: &v1Balance{Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}}
act := &engine.Actions{&engine.Action{Id: "test", ActionType: "", ExtraParameters: "", ExpirationString: "", Weight: 0.00, Balance: &engine.BalanceFilter{Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}}
switch {
case dbtype == utils.REDIS:
bit, err := mig.mrshlr.Marshal(v1act)
if err != nil {
t.Error("Error when marshaling ", err.Error())
}
setv1id := utils.ACTION_PREFIX + v1act[0].Id
err = mig.SetV1onRedis(setv1id, bit)
err := mig.oldDataDB.setV1Actions(v1act)
if err != nil {
t.Error("Error when setting v1 Actions ", err.Error())
}
_, err = mig.getV1ActionFromDB(setv1id)
if err != nil {
t.Error("Error when getting v1 Actions ", err.Error())
}
err = mig.Migrate(utils.MetaActions)
if err != nil {
t.Error("Error when migrating Actions ", err.Error())
}
result, err := mig.dataDB.GetActions(v1act[0].Id, true, utils.NonTransactional)
result, err := mig.dataDB.GetActions((*v1act)[0].Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting Actions ", err.Error())
}
if !reflect.DeepEqual(act, result) {
t.Errorf("Expecting: %+v, received: %+v", act, result)
if !reflect.DeepEqual(*act, result) {
t.Errorf("Expecting: %+v, received: %+v", *act, result)
}
case dbtype == utils.MONGO:
err := mig.SetV1onMongoAction(utils.ACTION_PREFIX, &v1act)
err := mig.oldDataDB.setV1Actions(v1act)
if err != nil {
t.Error("Error when setting v1 Actions ", err.Error())
}
err = mig.Migrate("migrateActions")
err = mig.Migrate(utils.MetaActions)
if err != nil {
t.Error("Error when migrating Actions ", err.Error())
}
result, err := mig.dataDB.GetActions(v1act[0].Id, true, utils.NonTransactional)
result, err := mig.dataDB.GetActions((*v1act)[0].Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting Actions ", err.Error())
}
if !reflect.DeepEqual(act[0].Balance.Timings, result[0].Balance.Timings) {
t.Errorf("Expecting: %+v, received: %+v", act[0].Balance.Timings, result[0].Balance.Timings)
}
err = mig.DropV1Colection(utils.ACTION_PREFIX)
if err != nil {
t.Error("Error when flushing v1 Actions ", err.Error())
}
if !reflect.DeepEqual(*act, result) {
t.Errorf("Expecting: %+v, received: %+v", *act, result)
}
}
}
// 5
/*
func testMigratorSharedGroups(t *testing.T) {
v1sg := &v1SharedGroup{
Id: "Test",

View File

@@ -23,4 +23,6 @@ getv1Account() (v1Acnt *v1Account, err error)
setV1Account( x *v1Account) (err error)
getV1ActionPlans() (v1aps *v1ActionPlans, err error)
setV1ActionPlans(x *v1ActionPlans) (err error)
getV1Actions() (v1acs *v1Actions, err error)
setV1Actions(x *v1Actions) (err error)
}

View File

@@ -19,13 +19,12 @@ package migrator
import (
"fmt"
"log"
// "log"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/cgrates/engine"
"gopkg.in/mgo.v2"
// "gopkg.in/mgo.v2/bson"
)
type v1Mongo struct{
@@ -101,22 +100,17 @@ var strct *AtKeyValue
v1ms.qryIter = v1ms.session.DB(v1ms.db).C("actiontimings").Find(nil).Iter()
}
v1ms.qryIter.Next(&strct)
log.Print("Done migrating!",strct)
if strct==nil{
v1ms.qryIter=nil
return nil,utils.ErrNoMoreData
}
v1aps=&strct.Value
return v1aps,nil
}
//set
func (v1ms *v1Mongo) setV1Actions(x *v1ActionPlans) (err error) {
func (v1ms *v1Mongo) setV1ActionPlans(x *v1ActionPlans) (err error) {
key:=utils.ACTION_PLAN_PREFIX + (*x)[0].Id
log.Print("Done migrating!",(*x)[0])
if err := v1ms.session.DB(v1ms.db).C("actiontimings").Insert(&AtKeyValue{key, *x}); err != nil {
return err
}
@@ -125,24 +119,23 @@ func (v1ms *v1Mongo) setV1Actions(x *v1ActionPlans) (err error) {
//Actions methods
//get
func (v1ms *v1Mongo) getV1ActionPlans() (v1aps *v1ActionPlans, err error){
var strct *AtKeyValue
func (v1ms *v1Mongo) getV1Actions() (v1acs *v1Actions, err error){
var strct *AcKeyValue
if v1ms.qryIter==nil{
v1ms.qryIter = v1ms.session.DB(v1ms.db).C("actiontimings").Find(nil).Iter()
v1ms.qryIter = v1ms.session.DB(v1ms.db).C("actions").Find(nil).Iter()
}
v1ms.qryIter.Next(&strct)
log.Print("Done migrating!",strct)
if strct==nil{
v1ms.qryIter=nil
return nil,utils.ErrNoMoreData
}
v1aps=&strct.Value
return v1aps,nil
v1acs=&strct.Value
return v1acs,nil
}
func (v1ms *v1Mongo) setV1onMongoAction(key string, x *v1Actions) (err error) {
func (v1ms *v1Mongo) setV1Actions(x *v1Actions) (err error) {
key:=utils.ACTION_PREFIX + (*x)[0].Id
if err := v1ms.session.DB(v1ms.db).C("actions").Insert(&AcKeyValue{key, *x}); err != nil {
return err
}

View File

@@ -101,6 +101,8 @@ func (v1rs *v1Redis) getKeysForPrefix(prefix string) ([]string, error) {
return r.List()
}
//Account methods
//get
func (v1rs *v1Redis) getv1Account() (v1Acnt *v1Account, err error){
if v1rs.qryIdx==nil{
v1rs.dataKeys, err = v1rs.getKeysForPrefix(v1AccountDBPrefix);
@@ -128,6 +130,7 @@ v1rs.qryIdx=nil
return v1Acnt,nil
}
//set
func (v1rs *v1Redis) setV1Account( x *v1Account) (err error) {
key:=v1AccountDBPrefix + x.Id
bit, err := v1rs.ms.Marshal(x)
@@ -140,6 +143,8 @@ if err = v1rs.cmd("SET", key, bit).Err; err != nil {
return
}
//ActionPlans methods
//get
func (v1rs *v1Redis) getV1ActionPlans() (v1aps *v1ActionPlans, err error){
if v1rs.qryIdx==nil{
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_PLAN_PREFIX);
@@ -166,8 +171,50 @@ v1rs.qryIdx=nil
return v1aps,nil
}
func (v1rs *v1Redis) setV1ActionPlans( x *v1ActionPlans) (err error) {
//set
func (v1rs *v1Redis) setV1ActionPlans( x *v1ActionPlans) (err error) {
key:=utils.ACTION_PLAN_PREFIX + (*x)[0].Id
bit, err := v1rs.ms.Marshal(x)
if err != nil {
return err
}
if err = v1rs.cmd("SET", key, bit).Err; err != nil {
return err
}
return
}
//Actions methods
//get
func (v1rs *v1Redis) getV1Actions() (v1acs *v1Actions, err error){
if v1rs.qryIdx==nil{
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_PREFIX);
if err != nil {
return
}else if len(v1rs.dataKeys)==0{
return nil,utils.ErrNotFound
}
v1rs.qryIdx=utils.IntPointer(0)
}
if *v1rs.qryIdx<=len(v1rs.dataKeys)-1{
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
if err != nil {
return nil ,err
}
if err := v1rs.ms.Unmarshal(strVal, &v1acs); err != nil {
return nil,err
}
*v1rs.qryIdx=*v1rs.qryIdx+1
}else{
v1rs.qryIdx=nil
return nil,utils.ErrNoMoreData
}
return v1acs,nil
}
//set
func (v1rs *v1Redis) setV1Actions(x *v1Actions) (err error){
key:=utils.ACTION_PREFIX + (*x)[0].Id
bit, err := v1rs.ms.Marshal(x)
if err != nil {
return err