added action migration

This commit is contained in:
Radu Ioan Fericean
2015-10-23 19:21:16 +03:00
parent 3937e4f33b
commit d131b90d4f
2 changed files with 87 additions and 7 deletions

View File

@@ -35,7 +35,7 @@ import (
var (
//separator = flag.String("separator", ",", "Default field separator")
cgrConfig, _ = config.NewDefaultCGRConfig()
migrateRC8 = flag.Bool("migrate_rc8", false, "Migrate Accounts and ActionTriggers to RC8 structures")
migrateRC8 = flag.Bool("migrate_rc8", false, "Migrate Accounts, Actions and ActionTriggers to RC8 structures")
tpdb_type = flag.String("tpdb_type", cgrConfig.TpDbType, "The type of the TariffPlan database <redis>")
tpdb_host = flag.String("tpdb_host", cgrConfig.TpDbHost, "The TariffPlan host to connect to.")
tpdb_port = flag.String("tpdb_port", cgrConfig.TpDbPort, "The TariffPlan port to bind to.")
@@ -120,16 +120,20 @@ func main() {
if *tpdb_port != "" {
host += ":" + *tpdb_port
}
migratorRC8atr, err := NewMigratorRC8(host, db_nb, *tpdb_pass, *dbdata_encoding)
migratorRC8rat, err := NewMigratorRC8(host, db_nb, *tpdb_pass, *dbdata_encoding)
if err != nil {
utils.Logger.Crit(err.Error())
return
}
if err := migratorRC8atr.migrateActionTriggers(); err != nil {
if err := migratorRC8rat.migrateActionTriggers(); err != nil {
utils.Logger.Crit(err.Error())
return
}
if err := migratorRC8rat.migrateActions(); err != nil {
utils.Logger.Crit(err.Error())
return
}
utils.Logger.Info("Done!")
return
}
// Init necessary db connections, only if not already

View File

@@ -91,6 +91,18 @@ type ActionTrigger struct {
MinQueuedItems int
Executed bool
}
type Actions []*Action
type Action struct {
Id string
ActionType string
BalanceType string
Direction string
ExtraParameters string
ExpirationString string
Weight float64
Balance *Balance
}
func (mig MigratorRC8) migrateAccounts() error {
keys, err := mig.db.Keys(OLD_ACCOUNT_PREFIX + "*")
@@ -100,6 +112,7 @@ func (mig MigratorRC8) migrateAccounts() error {
newAccounts := make([]*engine.Account, len(keys))
// get existing accounts
for keyIndex, key := range keys {
utils.Logger.Info(fmt.Sprintf("Migrating account: %s...", key))
values, err := mig.db.Get(key)
if err != nil {
continue
@@ -117,14 +130,20 @@ func (mig MigratorRC8) migrateAccounts() error {
AllowNegative: oldAcc.AllowNegative,
Disabled: oldAcc.Disabled,
}
// fix id
idElements := strings.Split(newAcc.Id, utils.CONCATENATED_KEY_SEP)
if len(idElements) != 3 {
return fmt.Errorf("Malformed account ID %s", oldAcc.Id)
}
newAcc.Id = fmt.Sprintf("%s:%s", idElements[1], idElements[2])
// balances
for oldBalKey, oldBalChain := range oldAcc.BalanceMap {
keyElements := strings.Split(oldBalKey, "*")
if len(key) != 4 {
if len(keyElements) != 3 {
return fmt.Errorf("Malformed balance key in %s: %s", oldAcc.Id, oldBalKey)
}
newBalKey := "*" + keyElements[1]
newBalDirection := "*" + keyElements[3]
newBalDirection := "*" + keyElements[2]
newAcc.BalanceMap[newBalKey] = make(engine.BalanceChain, len(oldBalChain))
for index, oldBal := range oldBalChain {
newAcc.BalanceMap[newBalKey][index] = &engine.Balance{
@@ -169,7 +188,6 @@ func (mig MigratorRC8) migrateAccounts() error {
}
}
// action triggers
for index, oldAtr := range oldAcc.ActionTriggers {
newAcc.ActionTriggers[index] = &engine.ActionTrigger{
Id: oldAtr.Id,
@@ -207,6 +225,7 @@ func (mig MigratorRC8) migrateAccounts() error {
}
}
// delete old data
utils.Logger.Info(fmt.Sprintf("Deleting old accounts: %s...", OLD_ACCOUNT_PREFIX+"*"))
_, err = mig.db.Del(OLD_ACCOUNT_PREFIX + "*")
return err
}
@@ -218,6 +237,7 @@ func (mig MigratorRC8) migrateActionTriggers() error {
}
newAtrsMap := make(map[string]engine.ActionTriggers, len(keys))
for _, key := range keys {
utils.Logger.Info(fmt.Sprintf("Migrating action trigger: %s...", key))
var oldAtrs ActionTriggers
var values []byte
if values, err = mig.db.Get(key); err == nil {
@@ -264,3 +284,59 @@ func (mig MigratorRC8) migrateActionTriggers() error {
}
return nil
}
func (mig MigratorRC8) migrateActions() error {
keys, err := mig.db.Keys(utils.ACTION_PREFIX + "*")
if err != nil {
return err
}
newAcsMap := make(map[string]engine.Actions, len(keys))
for _, key := range keys {
utils.Logger.Info(fmt.Sprintf("Migrating action: %s...", key))
var oldAcs Actions
var values []byte
if values, err = mig.db.Get(key); err == nil {
if err := mig.ms.Unmarshal(values, &oldAcs); err != nil {
return err
}
}
newAcs := make(engine.Actions, len(oldAcs))
for index, oldAc := range oldAcs {
newAcs[index] = &engine.Action{
Id: oldAc.Id,
ActionType: oldAc.ActionType,
BalanceType: oldAc.BalanceType,
ExtraParameters: oldAc.ExtraParameters,
ExpirationString: oldAc.ExpirationString,
Weight: oldAc.Weight,
Balance: &engine.Balance{
Uuid: oldAc.Balance.Uuid,
Id: oldAc.Balance.Id,
Value: oldAc.Balance.Value,
Directions: utils.ParseStringMap(oldAc.Direction),
ExpirationDate: oldAc.Balance.ExpirationDate,
Weight: oldAc.Balance.Weight,
DestinationIds: utils.ParseStringMap(oldAc.Balance.DestinationIds),
RatingSubject: oldAc.Balance.RatingSubject,
Categories: utils.ParseStringMap(oldAc.Balance.Category),
SharedGroup: oldAc.Balance.SharedGroup,
Timings: oldAc.Balance.Timings,
TimingIDs: oldAc.Balance.TimingIDs,
Disabled: oldAc.Balance.Disabled,
},
}
}
newAcsMap[key] = newAcs
}
// write data back
for key, acs := range newAcsMap {
result, err := mig.ms.Marshal(&acs)
if err != nil {
return err
}
if err = mig.db.Set(key, result); err != nil {
return err
}
}
return nil
}