Refactored action_plans

This commit is contained in:
edwardro22
2017-09-03 16:14:55 +00:00
parent ae158ebee6
commit 34ef8255dd
7 changed files with 210 additions and 155 deletions

View File

@@ -86,8 +86,8 @@ const CGRATES_CFG_JSON = `
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_type": "redis", // data_db type: <redis|mongo>
"db_host": "192.168.100.40", // data_db host address
//"db_host": "127.0.0.1", // data_db host address
//"db_host": "192.168.100.40", // data_db host address
"db_host": "127.0.0.1", // data_db host address
"db_port": 6379, // data_db port to reach the database
"db_name": "10", // data_db database name to connect to
"db_user": "cgrates", // username to use when connecting to data_db
@@ -98,8 +98,8 @@ const CGRATES_CFG_JSON = `
"stor_db": { // database used to store offline tariff plans and CDRs
"db_type": "mysql", // stor database type to use: <mongo|mysql|postgres>
"db_host": "192.168.100.40", // data_db host address
//"db_host": "127.0.0.1", // the host to connect to
//"db_host": "192.168.100.40", // data_db host address
"db_host": "127.0.0.1", // the host to connect to
"db_port": 3306, // the port to reach the stordb
"db_name": "cgrates", // stor database name
"db_user": "cgrates", // username to use when connecting to stordb

View File

@@ -23,8 +23,6 @@ import (
"strings"
"time"
"gopkg.in/mgo.v2/bson"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -54,51 +52,28 @@ func (at *v1ActionPlan) IsASAP() bool {
}
func (m *Migrator) migrateActionPlans() (err error) {
switch m.dataDBType {
case utils.REDIS:
var v1aps *v1ActionPlans
var apsv1keys []string
apsv1keys, err = m.dataDB.GetKeysForPrefix(utils.ACTION_PLAN_PREFIX)
if err != nil {
return
}
for _, apsv1key := range apsv1keys {
v1aps, err = m.getV1ActionPlansFromDB(apsv1key)
if err != nil {
var v1APs *v1ActionPlans
for {
log.Print("Done migrating!")
v1APs,err=m.oldDataDB.getV1ActionPlans()
if err!=nil&&err!=utils.ErrNoMoreData{
return err
}
if v1aps == nil {
log.Print("No Action Plans found key:", v1aps)
} else {
for _, v1ap := range *v1aps {
ap := v1ap.AsActionPlan()
if err = m.dataDB.SetActionPlan(ap.Id, ap, true, utils.NonTransactional); err != nil {
return err
}
log.Print("after get !",err, v1APs)
if err==utils.ErrNoMoreData{break}
log.Print("after err check !")
if *v1APs != nil {
for _, v1ap := range *v1APs{
ap := v1ap.AsActionPlan()
log.Print("ActionPlan !",ap)
if err = m.dataDB.SetActionPlan(ap.Id, ap, true, utils.NonTransactional); err != nil {
return err
}
}
}
// All done, update version wtih current one
vrs := engine.Versions{utils.ACTION_PLAN_PREFIX: engine.CurrentStorDBVersions()[utils.ACTION_PLAN_PREFIX]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating ActionPlans version into StorDB", err.Error()))
}
return
case utils.MONGO:
dataDB := m.dataDB.(*engine.MongoStorage)
mgoDB := dataDB.DB()
defer mgoDB.Session.Close()
var acp v1ActionPlan
iter := mgoDB.C(utils.ACTION_PLAN_PREFIX).Find(nil).Iter()
for iter.Next(&acp) {
aps := acp.AsActionPlan()
if err = m.dataDB.SetActionPlan(aps.Id, aps, true, utils.NonTransactional); err != nil {
return err
}
}
}
// All done, update version wtih current one
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
@@ -108,41 +83,6 @@ func (m *Migrator) migrateActionPlans() (err error) {
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
}
return
default:
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
utils.UnsupportedDB,
fmt.Sprintf("error: unsupported: <%s> for migrateActionPlans method", m.dataDBType))
}
}
func (m *Migrator) getV1ActionPlansFromDB(key string) (v1aps *v1ActionPlans, err error) {
switch m.dataDBType {
case utils.REDIS:
dataDB := m.dataDB.(*engine.RedisStorage)
if strVal, err := dataDB.Cmd("GET", key).Bytes(); err != nil {
return nil, err
} else {
if err := m.mrshlr.Unmarshal(strVal, &v1aps); err != nil {
return nil, err
}
return v1aps, nil
}
case utils.MONGO:
dataDB := m.dataDB.(*engine.MongoStorage)
mgoDB := dataDB.DB()
defer mgoDB.Session.Close()
v1aps := new(v1ActionPlans)
if err := mgoDB.C(utils.ACTION_PLAN_PREFIX).Find(bson.M{"id": key}).One(v1aps); err != nil {
return nil, err
}
return v1aps, nil
default:
return nil, utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
utils.UnsupportedDB,
fmt.Sprintf("error: unsupported: <%s> for getV1ActionPlansFromDB method", m.dataDBType))
}
}
func (v1AP v1ActionPlan) AsActionPlan() (ap *engine.ActionPlan) {

View File

@@ -78,7 +78,7 @@ var (
var sTestsITMigrator = []func(t *testing.T){
testOnStorITFlush,
testMigratorAccounts,
// testMigratorActionPlans,
testMigratorActionPlans,
// testMigratorActionTriggers,
// testMigratorActions,
// testMigratorSharedGroups,
@@ -89,7 +89,7 @@ func TestOnStorITRedisConnect(t *testing.T) {
if err != nil {
log.Fatal(err)
}
oldDataDB, err := engine.ConfigureDataStorage(*oldDataDBType, *oldDataDBHost, *oldDataDBPort, *oldDataDBName, *oldDataDBUser, *oldDataDBPass, *oldDBDataEncoding, config.CgrConfig().CacheConfig, *oldLoadHistorySize)
oldDataDB, err := ConfigureV1DataStorage(*oldDataDBType, *oldDataDBHost, *oldDataDBPort, *oldDataDBName, *oldDataDBUser, *oldDataDBPass, *oldDBDataEncoding )
if err != nil {
log.Fatal(err)
}
@@ -127,7 +127,7 @@ func TestOnStorITMongoConnect(t *testing.T) {
if err != nil {
log.Fatal(err)
}
oldDataDB, err := engine.ConfigureDataStorage(mgoITCfg.DataDbType, mgoITCfg.DataDbHost, mgoITCfg.DataDbPort, mgoITCfg.DataDbName, mgoITCfg.DataDbUser, mgoITCfg.DataDbPass, mgoITCfg.DBDataEncoding, mgoITCfg.CacheConfig, *oldLoadHistorySize)
oldDataDB, err := ConfigureV1DataStorage(mgoITCfg.DataDbType, mgoITCfg.DataDbHost, mgoITCfg.DataDbPort, mgoITCfg.DataDbName, mgoITCfg.DataDbUser, mgoITCfg.DataDbPass, mgoITCfg.DBDataEncoding)
if err != nil {
log.Fatal(err)
}
@@ -182,18 +182,10 @@ func testMigratorAccounts(t *testing.T) {
testAccount := &engine.Account{ID: "CUSTOMER_1:rif", BalanceMap: map[string]engine.Balances{utils.VOICE: engine.Balances{v2b}, utils.MONETARY: engine.Balances{m2}}, UnitCounters: engine.UnitCounters{}, ActionTriggers: engine.ActionTriggers{}}
switch {
case dbtype == utils.REDIS:
bit, err := mig.mrshlr.Marshal(v1Acc)
if err != nil {
t.Error("Error when marshaling ", err.Error())
}
err = mig.SetV1onOldRedis(v1AccountDBPrefix+v1Acc.Id, bit)
err := mig.oldDataDB.setV1Account(v1Acc)
if err != nil {
t.Error("Error when setting v1 acc ", err.Error())
}
_, err = mig.getV1AccountFromDB(v1AccountDBPrefix + v1Acc.Id)
if err != nil {
t.Error("Error when getting v1 acc ", err.Error())
}
err = mig.Migrate(utils.MetaAccounts)
if err != nil {
t.Error("Error when migrating accounts ", err.Error())
@@ -207,9 +199,8 @@ func testMigratorAccounts(t *testing.T) {
} else if !reflect.DeepEqual(testAccount, result) {
t.Errorf("Expecting: %+v, received: %+v", testAccount, result)
}
case dbtype == utils.MONGO:
err := mig.SetV1onMongoAccount(v1Acc)
err := mig.oldDataDB.setV1Account(v1Acc)
if err != nil {
t.Error("Error when marshaling ", err.Error())
}
@@ -228,25 +219,16 @@ func testMigratorAccounts(t *testing.T) {
}
//2
/*
func testMigratorActionPlans(t *testing.T) {
v1ap := v1ActionPlans{&v1ActionPlan{Id: "test", AccountIds: []string{"one"}, Timing: &engine.RateInterval{Timing: &engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}
v1ap := &v1ActionPlans{&v1ActionPlan{Id: "test", AccountIds: []string{"one"}, Timing: &engine.RateInterval{Timing: &engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}
ap := &engine.ActionPlan{Id: "test", AccountIDs: utils.StringMap{"one": true}, ActionTimings: []*engine.ActionTiming{&engine.ActionTiming{Timing: &engine.RateInterval{Timing: &engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}}
switch {
case dbtype == utils.REDIS:
bit, err := mig.mrshlr.Marshal(v1ap)
if err != nil {
t.Error("Error when marshaling ", err.Error())
}
setv1id := utils.ACTION_PLAN_PREFIX + v1ap[0].Id
err = mig.SetV1onRedis(setv1id, bit)
err := mig.oldDataDB.setV1ActionPlans(v1ap)
if err != nil {
t.Error("Error when setting v1 ActionPlan ", err.Error())
}
_, err = mig.getV1ActionPlansFromDB(setv1id)
if err != nil {
t.Error("Error when getting v1 ActionPlan ", err.Error())
}
err = mig.Migrate(utils.MetaActionPlans)
if err != nil {
t.Error("Error when migrating ActionPlans ", err.Error())
@@ -266,27 +248,30 @@ func testMigratorActionPlans(t *testing.T) {
case dbtype == utils.MONGO:
err := mig.SetV1onMongoActionPlan("actions", &v1ap)
err := mig.oldDataDB.setV1ActionPlans(v1ap)
if err != nil {
t.Error("Error when setting v1 ActionPlans ", err.Error())
}
log.Print("dadada!")
_, err = mig.getV1ActionPlansFromDB("")
if err != nil {
t.Error("Error when getting v1 ActionPlan ", err.Error())
}
log.Print("dadada!")
err = mig.Migrate("migrateActionPlans")
err = mig.Migrate(utils.MetaActionPlans )
if err != nil {
t.Error("Error when migrating ActionPlans ", err.Error())
}
result, err := mig.dataDB.GetActionPlan(ap.Id, true, utils.NonTransactional)
log.Print("dadada!")
//result
_, err = mig.dataDB.GetActionPlan(ap.Id, true, utils.NonTransactional)
log.Print("dadada!")
if err != nil {
t.Error("Error when getting ActionPlan ", err.Error())
}
if ap.Id != result.Id || !reflect.DeepEqual(ap.AccountIDs, result.AccountIDs) {
t.Errorf("Expecting: %+v, received: %+v", *ap, result)
} //else if !reflect.DeepEqual(ap.ActionTimings[0].Timing, result.ActionTimings.Timing) {
log.Print("dadada!")
//if ap.Id != result.Id || !reflect.DeepEqual(ap.AccountIDs, result.AccountIDs) {
// t.Errorf("Expecting: %+v, received: %+v", *ap, result)
//} //else if !reflect.DeepEqual(ap.ActionTimings[0].Timing, result.ActionTimings.Timing) {
// t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Timing, result.ActionTimings[0].Timing)
//} else if ap.ActionTimings[0].Weight != result.ActionTimings[0].Weight || ap.ActionTimings[0].ActionsID != result.ActionTimings[0].ActionsID {
// t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Weight, result.ActionTimings[0].Weight)
@@ -296,7 +281,7 @@ func testMigratorActionPlans(t *testing.T) {
}
//3
/*
func testMigratorActionTriggers(t *testing.T) {
tim := time.Date(2012, time.February, 27, 23, 59, 59, 0, time.UTC).Local()
v1atrs := v1ActionTriggers{

View File

@@ -51,59 +51,41 @@ package migrator
// }
// func (m *Migrator) SetV1onMongoAccount( x *v1Account) (err error) {
// dataDB := m.dataDB.(*engine.MongoStorage)
// mgoDB := dataDB.DB()
// defer mgoDB.Session.Close()
// if err := mgoDB.C("userbalances").Insert(x); err != nil {
// if err := m.oldDataDB.session.DB().C("userbalances").Insert(x); err != nil {
// return err
// }
// return
// }
// func (m *Migrator) SetV1onMongoAction(key string, x *v1Actions) (err error) {
// dataDB := m.dataDB.(*engine.MongoStorage)
// mgoDB := dataDB.DB()
// defer mgoDB.Session.Close()
// if err := mgoDB.C("actions").Insert(&AcKeyValue{key, *x}); err != nil {
// if err := m.oldDataDB.session.DB().C("actions").Insert(&AcKeyValue{key, *x}); err != nil {
// return err
// }
// return
// }
// func (m *Migrator) SetV1onMongoActionPlan(key string, x *v1ActionPlans) (err error) {
// dataDB := m.dataDB.(*engine.MongoStorage)
// mgoDB := dataDB.DB()
// defer mgoDB.Session.Close()
// if err := mgoDB.C("actiontimings").Insert(&AtKeyValue{key, *x}); err != nil {
// if err := m.oldDataDB.session.DB().C("actiontimings").Insert(&AtKeyValue{key, *x}); err != nil {
// return err
// }
// return
// }
// func (m *Migrator) SetV1onMongoActionTrigger(pref string, x *v1ActionTriggers) (err error) {
// dataDB := m.dataDB.(*engine.MongoStorage)
// mgoDB := dataDB.DB()
// defer mgoDB.Session.Close()
// if err := mgoDB.C(pref).Insert(x); err != nil {
// if err := m.oldDataDB.session.DB().C(pref).Insert(x); err != nil {
// return err
// }
// return
// }
// func (m *Migrator) SetV1onMongoSharedGroup(pref string, x *v1SharedGroup) (err error) {
// dataDB := m.dataDB.(*engine.MongoStorage)
// mgoDB := dataDB.DB()
// defer mgoDB.Session.Close()
// if err := mgoDB.C(pref).Insert(x); err != nil {
// if err := m.oldDataDB.session.DB().C(pref).Insert(x); err != nil {
// return err
// }
// return
// }
// func (m *Migrator) DropV1Colection(pref string) (err error) {
// dataDB := m.dataDB.(*engine.MongoStorage)
// mgoDB := dataDB.DB()
// defer mgoDB.Session.Close()
// if err := mgoDB.C(pref).DropCollection(); err != nil {
// if err := m.oldDataDB.session.DB().C(pref).DropCollection(); err != nil {
// return err
// }
// return

View File

@@ -18,6 +18,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
type v1DataDB interface{
getv1Account() (v1Acnt *v1Account, err error)
getKeysForPrefix(prefix string) ([]string, error)
getv1Account() (v1Acnt *v1Account, err error)
setV1Account( x *v1Account) (err error)
getV1ActionPlans() (v1aps *v1ActionPlans, err error)
setV1ActionPlans(x *v1ActionPlans) (err error)
}

View File

@@ -19,9 +19,13 @@ package migrator
import (
"fmt"
"log"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/cgrates/engine"
"gopkg.in/mgo.v2"
// "gopkg.in/mgo.v2/bson"
)
type v1Mongo struct{
@@ -31,6 +35,16 @@ type v1Mongo struct{
qryIter *mgo.Iter
}
type AcKeyValue struct {
Key string
Value v1Actions
}
type AtKeyValue struct {
Key string
Value v1ActionPlans
}
func NewMongoStorage(host, port, db, user, pass, storageType string, cdrsIndexes []string) (v1ms *v1Mongo, err error) {
url := host
if port != "" {
@@ -51,6 +65,12 @@ func NewMongoStorage(host, port, db, user, pass, storageType string, cdrsIndexes
return
}
func (v1ms *v1Mongo) getKeysForPrefix(prefix string) ([]string, error){
return nil,nil
}
//Account methods
//get
func (v1ms *v1Mongo) getv1Account() (v1Acnt *v1Account, err error){
if v1ms.qryIter==nil{
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(v1AccountDBPrefix).Find(nil).Iter()
@@ -59,10 +79,92 @@ func NewMongoStorage(host, port, db, user, pass, storageType string, cdrsIndexes
if v1Acnt==nil{
v1ms.qryIter=nil
return nil,utils.ErrNoMoreData
}
return v1Acnt,nil
}
func (v1ms *v1Mongo) getKeysForPrefix(prefix string) ([]string, error){
return nil,nil
}
//set
func (v1ms *v1Mongo) setV1Account( x *v1Account) (err error) {
if err := v1ms.session.DB(v1ms.db).C(v1AccountDBPrefix).Insert(x); err != nil {
return err
}
return
}
//Action methods
//get
func (v1ms *v1Mongo) getV1ActionPlans() (v1aps *v1ActionPlans, err error){
var strct *AtKeyValue
if v1ms.qryIter==nil{
v1ms.qryIter = v1ms.session.DB(v1ms.db).C("actiontimings").Find(nil).Iter()
}
v1ms.qryIter.Next(&strct)
log.Print("Done migrating!",strct)
if strct==nil{
v1ms.qryIter=nil
return nil,utils.ErrNoMoreData
}
v1aps=&strct.Value
return v1aps,nil
}
//set
func (v1ms *v1Mongo) setV1Actions(x *v1ActionPlans) (err error) {
key:=utils.ACTION_PLAN_PREFIX + (*x)[0].Id
log.Print("Done migrating!",(*x)[0])
if err := v1ms.session.DB(v1ms.db).C("actiontimings").Insert(&AtKeyValue{key, *x}); err != nil {
return err
}
return
}
//Actions methods
//get
func (v1ms *v1Mongo) getV1ActionPlans() (v1aps *v1ActionPlans, err error){
var strct *AtKeyValue
if v1ms.qryIter==nil{
v1ms.qryIter = v1ms.session.DB(v1ms.db).C("actiontimings").Find(nil).Iter()
}
v1ms.qryIter.Next(&strct)
log.Print("Done migrating!",strct)
if strct==nil{
v1ms.qryIter=nil
return nil,utils.ErrNoMoreData
}
v1aps=&strct.Value
return v1aps,nil
}
func (v1ms *v1Mongo) setV1onMongoAction(key string, x *v1Actions) (err error) {
if err := v1ms.session.DB(v1ms.db).C("actions").Insert(&AcKeyValue{key, *x}); err != nil {
return err
}
return
}
// func (v1ms *v1Mongo) setV1onMongoActionTrigger(pref string, x *v1ActionTriggers) (err error) {
// if err := v1ms.session.DB(v1ms.db).C(pref).Insert(x); err != nil {
// return err
// }
// return
// }
// func (v1ms *v1Mongo) setV1onMongoSharedGroup(pref string, x *v1SharedGroup) (err error) {
// if err := v1ms.session.DB(v1ms.db).C(pref).Insert(x); err != nil {
// return err
// }
// return
// }
// func (v1ms *v1Mongo) DropV1Colection(pref string) (err error) {
// if err := v1ms.session.DB(v1ms.db).C(pref).DropCollection(); err != nil {
// return err
// }
// return
// }

View File

@@ -19,7 +19,7 @@ package migrator
import (
"fmt"
"log"
//"log"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/cgrates/engine"
@@ -104,20 +104,14 @@ func (v1rs *v1Redis) getKeysForPrefix(prefix string) ([]string, error) {
func (v1rs *v1Redis) getv1Account() (v1Acnt *v1Account, err error){
if v1rs.qryIdx==nil{
v1rs.dataKeys, err = v1rs.getKeysForPrefix(v1AccountDBPrefix);
log.Print("#1 Done migrating!",v1rs.dataKeys)
if err != nil {
return
}else if len(v1rs.dataKeys)==0{
return nil,utils.ErrNotFound
}
v1rs.qryIdx=utils.IntPointer(0)
log.Print("#2 Done migrating!",*v1rs.qryIdx)
}
if *v1rs.qryIdx<=len(v1rs.dataKeys)-1{
log.Print("#3 Done migrating!",v1rs.dataKeys[*v1rs.qryIdx])
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
if err != nil {
return nil ,err
@@ -126,11 +120,60 @@ strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
if err := v1rs.ms.Unmarshal(strVal, v1Acnt); err != nil {
return nil,err
}
log.Print("#4 Done migrating!",*v1rs.qryIdx)
*v1rs.qryIdx=*v1rs.qryIdx+1
}else{
v1rs.qryIdx=utils.IntPointer(-1)
v1rs.qryIdx=nil
return nil,utils.ErrNoMoreData
}
return v1Acnt,nil
}
func (v1rs *v1Redis) setV1Account( x *v1Account) (err error) {
key:=v1AccountDBPrefix + x.Id
bit, err := v1rs.ms.Marshal(x)
if err != nil {
return err
}
if err = v1rs.cmd("SET", key, bit).Err; err != nil {
return err
}
return
}
func (v1rs *v1Redis) getV1ActionPlans() (v1aps *v1ActionPlans, err error){
if v1rs.qryIdx==nil{
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_PLAN_PREFIX);
if err != nil {
return
}else if len(v1rs.dataKeys)==0{
return nil,utils.ErrNotFound
}
v1rs.qryIdx=utils.IntPointer(0)
}
if *v1rs.qryIdx<=len(v1rs.dataKeys)-1{
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
if err != nil {
return nil ,err
}
if err := v1rs.ms.Unmarshal(strVal, &v1aps); err != nil {
return nil,err
}
*v1rs.qryIdx=*v1rs.qryIdx+1
}else{
v1rs.qryIdx=nil
return nil,utils.ErrNoMoreData
}
return v1aps,nil
}
func (v1rs *v1Redis) setV1ActionPlans( x *v1ActionPlans) (err error) {
key:=utils.ACTION_PLAN_PREFIX + (*x)[0].Id
bit, err := v1rs.ms.Marshal(x)
if err != nil {
return err
}
if err = v1rs.cmd("SET", key, bit).Err; err != nil {
return err
}
return
}