From d131b90d4fc72ddaaa13fa63ae3d8545cf525897 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 23 Oct 2015 19:21:16 +0300 Subject: [PATCH] added action migration --- cmd/cgr-loader/cgr-loader.go | 12 +++-- cmd/cgr-loader/migrator_rc8.go | 82 ++++++++++++++++++++++++++++++++-- 2 files changed, 87 insertions(+), 7 deletions(-) diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index d23a26441..1c1603e28 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -35,7 +35,7 @@ import ( var ( //separator = flag.String("separator", ",", "Default field separator") cgrConfig, _ = config.NewDefaultCGRConfig() - migrateRC8 = flag.Bool("migrate_rc8", false, "Migrate Accounts and ActionTriggers to RC8 structures") + migrateRC8 = flag.Bool("migrate_rc8", false, "Migrate Accounts, Actions and ActionTriggers to RC8 structures") tpdb_type = flag.String("tpdb_type", cgrConfig.TpDbType, "The type of the TariffPlan database ") tpdb_host = flag.String("tpdb_host", cgrConfig.TpDbHost, "The TariffPlan host to connect to.") tpdb_port = flag.String("tpdb_port", cgrConfig.TpDbPort, "The TariffPlan port to bind to.") @@ -120,16 +120,20 @@ func main() { if *tpdb_port != "" { host += ":" + *tpdb_port } - migratorRC8atr, err := NewMigratorRC8(host, db_nb, *tpdb_pass, *dbdata_encoding) + migratorRC8rat, err := NewMigratorRC8(host, db_nb, *tpdb_pass, *dbdata_encoding) if err != nil { utils.Logger.Crit(err.Error()) return } - if err := migratorRC8atr.migrateActionTriggers(); err != nil { + if err := migratorRC8rat.migrateActionTriggers(); err != nil { utils.Logger.Crit(err.Error()) return } - + if err := migratorRC8rat.migrateActions(); err != nil { + utils.Logger.Crit(err.Error()) + return + } + utils.Logger.Info("Done!") return } // Init necessary db connections, only if not already diff --git a/cmd/cgr-loader/migrator_rc8.go b/cmd/cgr-loader/migrator_rc8.go index 79b560e24..45a29a7a5 100644 --- a/cmd/cgr-loader/migrator_rc8.go +++ b/cmd/cgr-loader/migrator_rc8.go @@ -91,6 +91,18 @@ type ActionTrigger struct { MinQueuedItems int Executed bool } +type Actions []*Action + +type Action struct { + Id string + ActionType string + BalanceType string + Direction string + ExtraParameters string + ExpirationString string + Weight float64 + Balance *Balance +} func (mig MigratorRC8) migrateAccounts() error { keys, err := mig.db.Keys(OLD_ACCOUNT_PREFIX + "*") @@ -100,6 +112,7 @@ func (mig MigratorRC8) migrateAccounts() error { newAccounts := make([]*engine.Account, len(keys)) // get existing accounts for keyIndex, key := range keys { + utils.Logger.Info(fmt.Sprintf("Migrating account: %s...", key)) values, err := mig.db.Get(key) if err != nil { continue @@ -117,14 +130,20 @@ func (mig MigratorRC8) migrateAccounts() error { AllowNegative: oldAcc.AllowNegative, Disabled: oldAcc.Disabled, } + // fix id + idElements := strings.Split(newAcc.Id, utils.CONCATENATED_KEY_SEP) + if len(idElements) != 3 { + return fmt.Errorf("Malformed account ID %s", oldAcc.Id) + } + newAcc.Id = fmt.Sprintf("%s:%s", idElements[1], idElements[2]) // balances for oldBalKey, oldBalChain := range oldAcc.BalanceMap { keyElements := strings.Split(oldBalKey, "*") - if len(key) != 4 { + if len(keyElements) != 3 { return fmt.Errorf("Malformed balance key in %s: %s", oldAcc.Id, oldBalKey) } newBalKey := "*" + keyElements[1] - newBalDirection := "*" + keyElements[3] + newBalDirection := "*" + keyElements[2] newAcc.BalanceMap[newBalKey] = make(engine.BalanceChain, len(oldBalChain)) for index, oldBal := range oldBalChain { newAcc.BalanceMap[newBalKey][index] = &engine.Balance{ @@ -169,7 +188,6 @@ func (mig MigratorRC8) migrateAccounts() error { } } // action triggers - for index, oldAtr := range oldAcc.ActionTriggers { newAcc.ActionTriggers[index] = &engine.ActionTrigger{ Id: oldAtr.Id, @@ -207,6 +225,7 @@ func (mig MigratorRC8) migrateAccounts() error { } } // delete old data + utils.Logger.Info(fmt.Sprintf("Deleting old accounts: %s...", OLD_ACCOUNT_PREFIX+"*")) _, err = mig.db.Del(OLD_ACCOUNT_PREFIX + "*") return err } @@ -218,6 +237,7 @@ func (mig MigratorRC8) migrateActionTriggers() error { } newAtrsMap := make(map[string]engine.ActionTriggers, len(keys)) for _, key := range keys { + utils.Logger.Info(fmt.Sprintf("Migrating action trigger: %s...", key)) var oldAtrs ActionTriggers var values []byte if values, err = mig.db.Get(key); err == nil { @@ -264,3 +284,59 @@ func (mig MigratorRC8) migrateActionTriggers() error { } return nil } + +func (mig MigratorRC8) migrateActions() error { + keys, err := mig.db.Keys(utils.ACTION_PREFIX + "*") + if err != nil { + return err + } + newAcsMap := make(map[string]engine.Actions, len(keys)) + for _, key := range keys { + utils.Logger.Info(fmt.Sprintf("Migrating action: %s...", key)) + var oldAcs Actions + var values []byte + if values, err = mig.db.Get(key); err == nil { + if err := mig.ms.Unmarshal(values, &oldAcs); err != nil { + return err + } + } + newAcs := make(engine.Actions, len(oldAcs)) + for index, oldAc := range oldAcs { + newAcs[index] = &engine.Action{ + Id: oldAc.Id, + ActionType: oldAc.ActionType, + BalanceType: oldAc.BalanceType, + ExtraParameters: oldAc.ExtraParameters, + ExpirationString: oldAc.ExpirationString, + Weight: oldAc.Weight, + Balance: &engine.Balance{ + Uuid: oldAc.Balance.Uuid, + Id: oldAc.Balance.Id, + Value: oldAc.Balance.Value, + Directions: utils.ParseStringMap(oldAc.Direction), + ExpirationDate: oldAc.Balance.ExpirationDate, + Weight: oldAc.Balance.Weight, + DestinationIds: utils.ParseStringMap(oldAc.Balance.DestinationIds), + RatingSubject: oldAc.Balance.RatingSubject, + Categories: utils.ParseStringMap(oldAc.Balance.Category), + SharedGroup: oldAc.Balance.SharedGroup, + Timings: oldAc.Balance.Timings, + TimingIDs: oldAc.Balance.TimingIDs, + Disabled: oldAc.Balance.Disabled, + }, + } + } + newAcsMap[key] = newAcs + } + // write data back + for key, acs := range newAcsMap { + result, err := mig.ms.Marshal(&acs) + if err != nil { + return err + } + if err = mig.db.Set(key, result); err != nil { + return err + } + } + return nil +}