From 3937e4f33bde6cf46f3b7c89514dd539a295c2b7 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 23 Oct 2015 18:49:59 +0300 Subject: [PATCH] added account migrator --- cmd/cgr-loader/cgr-loader.go | 44 ++++++ cmd/cgr-loader/migrator_rc8.go | 266 +++++++++++++++++++++++++++++++++ 2 files changed, 310 insertions(+) create mode 100644 cmd/cgr-loader/migrator_rc8.go diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index d958881a6..d23a26441 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -24,6 +24,7 @@ import ( "log" "net/rpc" "path" + "strconv" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -34,6 +35,7 @@ import ( var ( //separator = flag.String("separator", ",", "Default field separator") cgrConfig, _ = config.NewDefaultCGRConfig() + migrateRC8 = flag.Bool("migrate_rc8", false, "Migrate Accounts and ActionTriggers to RC8 structures") tpdb_type = flag.String("tpdb_type", cgrConfig.TpDbType, "The type of the TariffPlan database ") tpdb_host = flag.String("tpdb_host", cgrConfig.TpDbHost, "The TariffPlan host to connect to.") tpdb_port = flag.String("tpdb_port", cgrConfig.TpDbPort, "The TariffPlan port to bind to.") @@ -88,6 +90,48 @@ func main() { var storDb engine.LoadStorage var rater, cdrstats, users *rpc.Client var loader engine.LoadReader + if *migrateRC8 { + var db_nb int + db_nb, err = strconv.Atoi(*datadb_name) + if err != nil { + utils.Logger.Crit("Redis db name must be an integer!") + return + } + host := *datadb_host + if *datadb_port != "" { + host += ":" + *datadb_port + } + migratorRC8acc, err := NewMigratorRC8(host, db_nb, *datadb_pass, *dbdata_encoding) + if err != nil { + utils.Logger.Crit(err.Error()) + return + } + if err := migratorRC8acc.migrateAccounts(); err != nil { + utils.Logger.Crit(err.Error()) + return + } + + db_nb, err = strconv.Atoi(*tpdb_name) + if err != nil { + utils.Logger.Crit("Redis db name must be an integer!") + return + } + host = *tpdb_host + if *tpdb_port != "" { + host += ":" + *tpdb_port + } + migratorRC8atr, err := NewMigratorRC8(host, db_nb, *tpdb_pass, *dbdata_encoding) + if err != nil { + utils.Logger.Crit(err.Error()) + return + } + if err := migratorRC8atr.migrateActionTriggers(); err != nil { + utils.Logger.Crit(err.Error()) + return + } + + return + } // Init necessary db connections, only if not already if !*dryRun { // make sure we do not need db connections on dry run, also not importing into any stordb if *fromStorDb { diff --git a/cmd/cgr-loader/migrator_rc8.go b/cmd/cgr-loader/migrator_rc8.go new file mode 100644 index 000000000..79b560e24 --- /dev/null +++ b/cmd/cgr-loader/migrator_rc8.go @@ -0,0 +1,266 @@ +package main + +import ( + "fmt" + "strings" + "time" + + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "github.com/hoisie/redis" +) + +const OLD_ACCOUNT_PREFIX = "ubl_" + +type MigratorRC8 struct { + dbNb int + db *redis.Client + ms engine.Marshaler +} + +func NewMigratorRC8(address string, db int, pass, mrshlerStr string) (*MigratorRC8, error) { + ndb := &redis.Client{Addr: address, Db: db, Password: pass} + + var mrshler engine.Marshaler + if mrshlerStr == utils.MSGPACK { + mrshler = engine.NewCodecMsgpackMarshaler() + } else if mrshlerStr == utils.JSON { + mrshler = new(engine.JSONMarshaler) + } else { + return nil, fmt.Errorf("Unsupported marshaler: %v", mrshlerStr) + } + return &MigratorRC8{db: ndb, dbNb: db, ms: mrshler}, nil +} + +type Account struct { + Id string + BalanceMap map[string]BalanceChain + UnitCounters []*UnitsCounter + ActionTriggers ActionTriggers + AllowNegative bool + Disabled bool +} +type BalanceChain []*Balance + +type Balance struct { + Uuid string //system wide unique + Id string // account wide unique + Value float64 + ExpirationDate time.Time + Weight float64 + DestinationIds string + RatingSubject string + Category string + SharedGroup string + Timings []*engine.RITiming + TimingIDs string + Disabled bool + precision int + account *Account + dirty bool +} + +type UnitsCounter struct { + Direction string + BalanceType string + // Units float64 + Balances BalanceChain // first balance is the general one (no destination) +} + +type ActionTriggers []*ActionTrigger + +type ActionTrigger struct { + Id string + ThresholdType string + ThresholdValue float64 + Recurrent bool + MinSleep time.Duration + BalanceId string + BalanceType string + BalanceDirection string + BalanceDestinationIds string + BalanceWeight float64 + BalanceExpirationDate time.Time + BalanceTimingTags string + BalanceRatingSubject string + BalanceCategory string + BalanceSharedGroup string + BalanceDisabled bool + Weight float64 + ActionsId string + MinQueuedItems int + Executed bool +} + +func (mig MigratorRC8) migrateAccounts() error { + keys, err := mig.db.Keys(OLD_ACCOUNT_PREFIX + "*") + if err != nil { + return err + } + newAccounts := make([]*engine.Account, len(keys)) + // get existing accounts + for keyIndex, key := range keys { + values, err := mig.db.Get(key) + if err != nil { + continue + } + var oldAcc Account + if err = mig.ms.Unmarshal(values, &oldAcc); err != nil { + return err + } + // transfer data into new structurse + newAcc := &engine.Account{ + Id: oldAcc.Id, + BalanceMap: make(map[string]engine.BalanceChain, len(oldAcc.BalanceMap)), + UnitCounters: make([]*engine.UnitsCounter, len(oldAcc.UnitCounters)), + ActionTriggers: make(engine.ActionTriggers, len(oldAcc.ActionTriggers)), + AllowNegative: oldAcc.AllowNegative, + Disabled: oldAcc.Disabled, + } + // balances + for oldBalKey, oldBalChain := range oldAcc.BalanceMap { + keyElements := strings.Split(oldBalKey, "*") + if len(key) != 4 { + return fmt.Errorf("Malformed balance key in %s: %s", oldAcc.Id, oldBalKey) + } + newBalKey := "*" + keyElements[1] + newBalDirection := "*" + keyElements[3] + newAcc.BalanceMap[newBalKey] = make(engine.BalanceChain, len(oldBalChain)) + for index, oldBal := range oldBalChain { + newAcc.BalanceMap[newBalKey][index] = &engine.Balance{ + Uuid: oldBal.Uuid, + Id: oldBal.Id, + Value: oldBal.Value, + Directions: utils.ParseStringMap(newBalDirection), + ExpirationDate: oldBal.ExpirationDate, + Weight: oldBal.Weight, + DestinationIds: utils.ParseStringMap(oldBal.DestinationIds), + RatingSubject: oldBal.RatingSubject, + Categories: utils.ParseStringMap(oldBal.Category), + SharedGroup: oldBal.SharedGroup, + Timings: oldBal.Timings, + TimingIDs: oldBal.TimingIDs, + Disabled: oldBal.Disabled, + } + } + } + // unit counters + for _, oldUc := range oldAcc.UnitCounters { + newUc := &engine.UnitsCounter{ + BalanceType: oldUc.BalanceType, + Balances: make(engine.BalanceChain, len(oldUc.Balances)), + } + for index, oldUcBal := range oldUc.Balances { + newUc.Balances[index] = &engine.Balance{ + Uuid: oldUcBal.Uuid, + Id: oldUcBal.Id, + Value: oldUcBal.Value, + Directions: utils.ParseStringMap(oldUc.Direction), + ExpirationDate: oldUcBal.ExpirationDate, + Weight: oldUcBal.Weight, + DestinationIds: utils.ParseStringMap(oldUcBal.DestinationIds), + RatingSubject: oldUcBal.RatingSubject, + Categories: utils.ParseStringMap(oldUcBal.Category), + SharedGroup: oldUcBal.SharedGroup, + Timings: oldUcBal.Timings, + TimingIDs: oldUcBal.TimingIDs, + Disabled: oldUcBal.Disabled, + } + } + } + // action triggers + + for index, oldAtr := range oldAcc.ActionTriggers { + newAcc.ActionTriggers[index] = &engine.ActionTrigger{ + Id: oldAtr.Id, + ThresholdType: oldAtr.ThresholdType, + ThresholdValue: oldAtr.ThresholdValue, + Recurrent: oldAtr.Recurrent, + MinSleep: oldAtr.MinSleep, + BalanceId: oldAtr.BalanceId, + BalanceType: oldAtr.BalanceType, + BalanceDirections: utils.ParseStringMap(oldAtr.BalanceDirection), + BalanceDestinationIds: utils.ParseStringMap(oldAtr.BalanceDestinationIds), + BalanceWeight: oldAtr.BalanceWeight, + BalanceExpirationDate: oldAtr.BalanceExpirationDate, + BalanceTimingTags: oldAtr.BalanceTimingTags, + BalanceRatingSubject: oldAtr.BalanceRatingSubject, + BalanceCategories: utils.ParseStringMap(oldAtr.BalanceCategory), + BalanceSharedGroup: oldAtr.BalanceSharedGroup, + BalanceDisabled: oldAtr.BalanceDisabled, + Weight: oldAtr.Weight, + ActionsId: oldAtr.ActionsId, + MinQueuedItems: oldAtr.MinQueuedItems, + Executed: oldAtr.Executed, + } + } + newAccounts[keyIndex] = newAcc + } + // write data back + for _, newAcc := range newAccounts { + result, err := mig.ms.Marshal(newAcc) + if err != nil { + return err + } + if err := mig.db.Set(utils.ACCOUNT_PREFIX+newAcc.Id, result); err != nil { + return err + } + } + // delete old data + _, err = mig.db.Del(OLD_ACCOUNT_PREFIX + "*") + return err +} + +func (mig MigratorRC8) migrateActionTriggers() error { + keys, err := mig.db.Keys(utils.ACTION_TRIGGER_PREFIX + "*") + if err != nil { + return err + } + newAtrsMap := make(map[string]engine.ActionTriggers, len(keys)) + for _, key := range keys { + var oldAtrs ActionTriggers + var values []byte + if values, err = mig.db.Get(key); err == nil { + if err := mig.ms.Unmarshal(values, &oldAtrs); err != nil { + return err + } + } + newAtrs := make(engine.ActionTriggers, len(oldAtrs)) + for index, oldAtr := range oldAtrs { + newAtrs[index] = &engine.ActionTrigger{ + Id: oldAtr.Id, + ThresholdType: oldAtr.ThresholdType, + ThresholdValue: oldAtr.ThresholdValue, + Recurrent: oldAtr.Recurrent, + MinSleep: oldAtr.MinSleep, + BalanceId: oldAtr.BalanceId, + BalanceType: oldAtr.BalanceType, + BalanceDirections: utils.ParseStringMap(oldAtr.BalanceDirection), + BalanceDestinationIds: utils.ParseStringMap(oldAtr.BalanceDestinationIds), + BalanceWeight: oldAtr.BalanceWeight, + BalanceExpirationDate: oldAtr.BalanceExpirationDate, + BalanceTimingTags: oldAtr.BalanceTimingTags, + BalanceRatingSubject: oldAtr.BalanceRatingSubject, + BalanceCategories: utils.ParseStringMap(oldAtr.BalanceCategory), + BalanceSharedGroup: oldAtr.BalanceSharedGroup, + BalanceDisabled: oldAtr.BalanceDisabled, + Weight: oldAtr.Weight, + ActionsId: oldAtr.ActionsId, + MinQueuedItems: oldAtr.MinQueuedItems, + Executed: oldAtr.Executed, + } + } + newAtrsMap[key] = newAtrs + } + // write data back + for key, atrs := range newAtrsMap { + result, err := mig.ms.Marshal(&atrs) + if err != nil { + return err + } + if err = mig.db.Set(key, result); err != nil { + return err + } + } + return nil +}