From dca4cd00388be67e11806d54eeb3bbfed24d9555 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Sat, 1 Jun 2013 16:45:37 +0300 Subject: [PATCH] added db loader --- cmd/cgr-loader/cgr-loader.go | 109 ++++- .../mysql/create_tariffplan_tables.sql | 2 + rater/{csvreader.go => loader_csv.go} | 0 .../{csvreader_test.go => loader_csv_test.go} | 0 rater/loader_db.go | 420 ++++++++++++++++++ ...csvreader_helpers.go => loader_helpers.go} | 0 rater/storage_redigo.go | 2 +- 7 files changed, 510 insertions(+), 23 deletions(-) rename rater/{csvreader.go => loader_csv.go} (100%) rename rater/{csvreader_test.go => loader_csv_test.go} (100%) create mode 100644 rater/loader_db.go rename rater/{csvreader_helpers.go => loader_helpers.go} (100%) diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index a96d3f6c0..46d3a87c6 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -30,6 +30,7 @@ import ( const ( POSTGRES = "postgres" + MYSQL = "mysql" MONGO = "mongo" REDIS = "redis" ) @@ -44,6 +45,7 @@ var ( db_pass = flag.String("dbpass", "", "The database user's password.") flush = flag.Bool("flush", false, "Flush the database before importing") + dataDbId = flag.String("tpid", "", "The tariff plan id from the database") dataPath = flag.String("path", ".", "The path containing the data files") version = flag.Bool("version", false, "Prints the application version.") @@ -71,6 +73,91 @@ func main() { fmt.Println("CGRateS " + rater.VERSION) return } + var err error + var getter rater.DataStorage + switch *db_type { + case REDIS: + db_nb, err := strconv.Atoi(*db_name) + if err != nil { + log.Fatal("Redis db name must be an integer!") + } + if *db_port != "" { + *db_host += ":" + *db_port + } + getter, err = rater.NewGosexyStorage(*db_host, db_nb, *db_pass) + case MONGO: + getter, err = rater.NewMongoStorage(*db_host, *db_port, *db_name, *db_user, *db_pass) + case MYSQL: + getter, err = rater.NewMySQLStorage(*db_host, *db_port, *db_name, *db_user, *db_pass) + case POSTGRES: + getter, err = rater.NewPostgresStorage(*db_host, *db_port, *db_name, *db_user, *db_pass) + default: + log.Fatal("Unknown data db type, exiting!") + } + + if err != nil { + log.Fatalf("Could not open database connection: %v", err) + } + + if *dataDbId != "" && *dataPath != "" { + log.Fatal("You can read either from db or from files, not both.") + } + if *dataPath != "" { + loadFromCSVFile(getter) + } + if *dataDbId != "" { + loadFromDb(getter) + } +} + +func loadFromDb(getter rater.DataStorage) { + // TODO: how do we read from db + //dbr := rater.NewDbReader(getter, *dataDbId) + var dbr *rater.DbReader + err := dbr.LoadDestinations() + if err != nil { + log.Fatal(err) + } + err = dbr.LoadRates() + if err != nil { + log.Fatal(err) + } + err = dbr.LoadTimings() + if err != nil { + log.Fatal(err) + } + err = dbr.LoadRateTimings() + if err != nil { + log.Fatal(err) + } + err = dbr.LoadRatingProfiles() + if err != nil { + log.Fatal(err) + } + err = dbr.LoadActions() + if err != nil { + log.Fatal(err) + } + err = dbr.LoadActionTimings() + if err != nil { + log.Fatal(err) + } + err = dbr.LoadActionTriggers() + if err != nil { + log.Fatal(err) + } + err = dbr.LoadAccountActions() + if err != nil { + log.Fatal(err) + } + + // write maps to database + if err := dbr.WriteToDatabase(getter, *flush, true); err != nil { + log.Fatal("Could not write to database: ", err) + } +} + +func loadFromCSVFile(getter rater.DataStorage) { dataFilesValidators := []*validator{ &validator{destinationsFn, regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\d+.?\d*){1}$`), @@ -145,28 +232,6 @@ func main() { if err != nil { log.Fatal(err) } - var getter rater.DataStorage - switch *db_type { - case REDIS: - db_nb, err := strconv.Atoi(*db_name) - if err != nil { - log.Fatal("Redis db name must be an integer!") - } - if *db_port != "" { - *db_host += ":" + *db_port - } - getter, err = rater.NewGosexyStorage(*db_host, db_nb, *db_pass) - case MONGO: - getter, err = rater.NewMongoStorage(*db_host, *db_port, *db_name, *db_user, *db_pass) - case POSTGRES: - getter, err = rater.NewPostgresStorage(*db_host, *db_port, *db_name, *db_user, *db_pass) - default: - log.Fatal("Unknown data db type, exiting!") - } - - if err != nil { - log.Fatalf("Could not open database connection: %v", err) - } // write maps to database if err := csvr.WriteToDatabase(getter, *flush, true); err != nil { diff --git a/data/storage/mysql/create_tariffplan_tables.sql b/data/storage/mysql/create_tariffplan_tables.sql index 3b9ddbd9c..5004825fb 100644 --- a/data/storage/mysql/create_tariffplan_tables.sql +++ b/data/storage/mysql/create_tariffplan_tables.sql @@ -1,6 +1,7 @@ -- -- Table structure for table `tp_timings` -- +??? all float instead int CREATE TABLE `tp_timings` ( `id` int(11) NOT NULL AUTO_INCREMENT, @@ -39,6 +40,7 @@ CREATE TABLE `tp_rates` ( `destinations_tag` varchar(24) NOT NULL, `connect_fee` DECIMAL(5,4) NOT NULL, `rate` DECIMAL(5,4) NOT NULL, +??? priced_units `rate_increments` INT(11) NOT NULL, PRIMARY KEY (`id`), KEY `tpid` (`tpid`) diff --git a/rater/csvreader.go b/rater/loader_csv.go similarity index 100% rename from rater/csvreader.go rename to rater/loader_csv.go diff --git a/rater/csvreader_test.go b/rater/loader_csv_test.go similarity index 100% rename from rater/csvreader_test.go rename to rater/loader_csv_test.go diff --git a/rater/loader_db.go b/rater/loader_db.go new file mode 100644 index 000000000..6d3d531f9 --- /dev/null +++ b/rater/loader_db.go @@ -0,0 +1,420 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2013 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package rater + +import ( + "database/sql" + "errors" + "fmt" + "log" + "time" +) + +type DbReader struct { + tpid string + db *sql.DB + actions map[string][]*Action + actionsTimings map[string][]*ActionTiming + actionsTriggers map[string][]*ActionTrigger + accountActions []*UserBalance + destinations []*Destination + rates map[string][]*Rate + timings map[string][]*Timing + activationPeriods map[string]*ActivationPeriod + ratingProfiles map[string]*RatingProfile +} + +func NewDbReader(db *sql.DB, tpid string) *DbReader { + c := new(DbReader) + c.db = db + c.tpid = tpid + c.actions = make(map[string][]*Action) + c.actionsTimings = make(map[string][]*ActionTiming) + c.actionsTriggers = make(map[string][]*ActionTrigger) + c.rates = make(map[string][]*Rate) + c.timings = make(map[string][]*Timing) + c.activationPeriods = make(map[string]*ActivationPeriod) + c.ratingProfiles = make(map[string]*RatingProfile) + return c +} + +func (dbr *DbReader) WriteToDatabase(storage DataStorage, flush, verbose bool) (err error) { + if flush { + storage.Flush() + } + if verbose { + log.Print("Destinations") + } + for _, d := range dbr.destinations { + err = storage.SetDestination(d) + if err != nil { + return err + } + if verbose { + log.Print(d.Id, " : ", d.Prefixes) + } + } + if verbose { + log.Print("Rating profiles") + } + for _, rp := range dbr.ratingProfiles { + err = storage.SetRatingProfile(rp) + if err != nil { + return err + } + if verbose { + log.Print(rp.Id) + } + } + if verbose { + log.Print("Action timings") + } + for k, ats := range dbr.actionsTimings { + err = storage.SetActionTimings(k, ats) + if err != nil { + return err + } + if verbose { + log.Println(k) + } + } + if verbose { + log.Print("Actions") + } + for k, as := range dbr.actions { + err = storage.SetActions(k, as) + if err != nil { + return err + } + if verbose { + log.Println(k) + } + } + if verbose { + log.Print("Account actions") + } + for _, ub := range dbr.accountActions { + err = storage.SetUserBalance(ub) + if err != nil { + return err + } + if verbose { + log.Println(ub.Id) + } + } + return +} + +func (dbr *DbReader) LoadDestinations() error { + rows, err := dbr.db.Query("SELECT * FROM tp_destinations WHERE tpid=?", dbr.tpid) + if err != nil { + return err + } + for rows.Next() { + var id int + var tpid, tag, prefix string + if err := rows.Scan(id, tpid, &tag, &prefix); err != nil { + return err + } + var dest *Destination + for _, d := range dbr.destinations { + if d.Id == tag { + dest = d + break + } + } + if dest == nil { + dest = &Destination{Id: tag} + dbr.destinations = append(dbr.destinations, dest) + } + dest.Prefixes = append(dest.Prefixes, prefix) + } + return rows.Err() +} + +func (dbr *DbReader) LoadRates() error { + rows, err := dbr.db.Query("SELECT * FROM tp_rates WHERE tpid=?", dbr.tpid) + if err != nil { + return err + } + for rows.Next() { + var id int + var tpid, tag, destinations_tag string + var connect_fee, rate, priced_units, rate_increments float64 + if err := rows.Scan(&id, &tpid, &tag, &destinations_tag, &connect_fee, &rate, &priced_units, &rate_increments); err != nil { + return err + } + + r := &Rate{ + DestinationsTag: destinations_tag, + ConnectFee: connect_fee, + Price: rate, + PricedUnits: priced_units, + RateIncrements: rate_increments, + } + + dbr.rates[tag] = append(dbr.rates[tag], r) + } + return rows.Err() +} + +func (dbr *DbReader) LoadTimings() error { + rows, err := dbr.db.Query("SELECT * FROM tp_timings WHERE tpid=?", dbr.tpid) + if err != nil { + return err + } + for rows.Next() { + var id int + var tpid, tag, years, months, month_days, week_days, start_time string + if err := rows.Scan(&id, &tpid, &tag, &years, &months, &month_days, &week_days, &start_time); err != nil { + return err + } + t := NewTiming(years, months, month_days, week_days, start_time) + dbr.timings[tag] = append(dbr.timings[tag], t) + } + return rows.Err() +} + +func (dbr *DbReader) LoadRateTimings() error { + rows, err := dbr.db.Query("SELECT * FROM tp_rate_timings WHERE tpid=?", dbr.tpid) + if err != nil { + return err + } + for rows.Next() { + var id int + var weight float64 + var tpid, tag, rates_tag, timings_tag string + if err := rows.Scan(&id, &tpid, &tag, &rates_tag, &timings_tag, &weight); err != nil { + return err + } + ts, exists := dbr.timings[timings_tag] + if !exists { + return errors.New(fmt.Sprintf("Could not get timing for tag %v", timings_tag)) + } + for _, t := range ts { + rt := &RateTiming{ + RatesTag: rates_tag, + Weight: weight, + timing: t, + } + rs, exists := dbr.rates[rates_tag] + if !exists { + return errors.New(fmt.Sprintf("Could not find rate for tag %v", rates_tag)) + } + for _, r := range rs { + _, exists := dbr.activationPeriods[tag] + if !exists { + dbr.activationPeriods[tag] = &ActivationPeriod{} + } + dbr.activationPeriods[tag].AddIntervalIfNotPresent(rt.GetInterval(r)) + } + } + } + return rows.Err() +} + +func (dbr *DbReader) LoadRatingProfiles() error { + rows, err := dbr.db.Query("SELECT * FROM tp_rate_profiles WHERE tpid=?", dbr.tpid) + if err != nil { + return err + } + for rows.Next() { + var id int + var tpid, tenant, tor, direction, subject, fallbacksubject, rates_timing_tag, activation_time string + + if err := rows.Scan(&id, &tpid, &tenant, &tor, &direction, &subject, &fallbacksubject, &rates_timing_tag, &activation_time); err != nil { + return err + } + at, err := time.Parse(time.RFC3339, activation_time) + if err != nil { + return errors.New(fmt.Sprintf("Cannot parse activation time from %v", activation_time)) + } + key := fmt.Sprintf("%s:%s:%s:%s", direction, tenant, tor, subject) + rp, ok := dbr.ratingProfiles[key] + if !ok { + rp = &RatingProfile{Id: key} + dbr.ratingProfiles[key] = rp + } + for _, d := range dbr.destinations { + ap, exists := dbr.activationPeriods[rates_timing_tag] + if !exists { + return errors.New(fmt.Sprintf("Could not load rating timing for tag: %v", rates_timing_tag)) + } + newAP := &ActivationPeriod{ActivationTime: at} + //copy(newAP.Intervals, ap.Intervals) + newAP.Intervals = append(newAP.Intervals, ap.Intervals...) + rp.AddActivationPeriodIfNotPresent(d.Id, newAP) + if fallbacksubject != "" { + rp.FallbackKey = fmt.Sprintf("%s:%s:%s:%s", direction, tenant, tor, fallbacksubject) + } + } + } + return rows.Err() +} + +func (dbr *DbReader) LoadActions() error { + rows, err := dbr.db.Query("SELECT * FROM tp_actions WHERE tpid=?", dbr.tpid) + if err != nil { + return err + } + for rows.Next() { + var id int + var units, rate, minutes_weight, weight float64 + var tpid, tag, action, balances_tag, direction, destinations_tag, rate_type string + if err := rows.Scan(&id, &tpid, &tag, &action, &balances_tag, &direction, &units, &destinations_tag, &rate_type, &rate, &minutes_weight, &weight); err != nil { + return err + } + var a *Action + if balances_tag != MINUTES { + a = &Action{ + ActionType: action, + BalanceId: balances_tag, + Direction: direction, + Units: units, + } + } else { + var percent, price float64 + if rate_type == PERCENT { + percent = rate + } + if rate_type == ABSOLUTE { + price = rate + } + a = &Action{ + Id: GenUUID(), + ActionType: action, + BalanceId: balances_tag, + Direction: direction, + Weight: weight, + MinuteBucket: &MinuteBucket{ + Seconds: units, + Weight: minutes_weight, + Price: price, + Percent: percent, + DestinationId: destinations_tag, + }, + } + } + dbr.actions[tag] = append(dbr.actions[tag], a) + } + return rows.Err() +} + +func (dbr *DbReader) LoadActionTimings() error { + rows, err := dbr.db.Query("SELECT * FROM tp_action_timings WHERE tpid=?", dbr.tpid) + if err != nil { + return err + } + for rows.Next() { + var id int + var weight float64 + var tpid, tag, actions_tag, timings_tag string + if err := rows.Scan(&id, &tpid, &tag, &actions_tag, &timings_tag, &weight); err != nil { + return err + } + _, exists := dbr.actions[actions_tag] + if !exists { + return errors.New(fmt.Sprintf("ActionTiming: Could not load the action for tag: %v", actions_tag)) + } + ts, exists := dbr.timings[timings_tag] + if !exists { + return errors.New(fmt.Sprintf("ActionTiming: Could not load the timing for tag: %v", timings_tag)) + } + for _, t := range ts { + at := &ActionTiming{ + Id: GenUUID(), + Tag: timings_tag, + Weight: weight, + Timing: &Interval{ + Months: t.Months, + MonthDays: t.MonthDays, + WeekDays: t.WeekDays, + StartTime: t.StartTime, + }, + ActionsId: actions_tag, + } + dbr.actionsTimings[tag] = append(dbr.actionsTimings[tag], at) + } + } + return rows.Err() +} + +func (dbr *DbReader) LoadActionTriggers() error { + rows, err := dbr.db.Query("SELECT * FROM tp_action_triggers WHERE tpid=?", dbr.tpid) + if err != nil { + return err + } + for rows.Next() { + var id int + var threshold, weight float64 + var tpid, tag, balances_tag, direction, destinations_tag, actions_tag string + if err := rows.Scan(&id, &tpid, &tag, &balances_tag, &direction, &threshold, &destinations_tag, &actions_tag, &weight); err != nil { + return err + } + + at := &ActionTrigger{ + Id: GenUUID(), + BalanceId: balances_tag, + Direction: direction, + ThresholdValue: threshold, + DestinationId: destinations_tag, + ActionsId: actions_tag, + Weight: weight, + } + dbr.actionsTriggers[tag] = append(dbr.actionsTriggers[tag], at) + } + return rows.Err() +} + +func (dbr *DbReader) LoadAccountActions() error { + rows, err := dbr.db.Query("SELECT * FROM tp_account_actions WHERE tpid=?", dbr.tpid) + if err != nil { + return err + } + for rows.Next() { + var id int + var tpid, tenant, account, direction, action_timings_tag, action_triggers_tag string + if err := rows.Scan(&id, &tpid, &tenant, &account, &direction, &action_timings_tag, &action_triggers_tag); err != nil { + return err + } + + tag := fmt.Sprintf("%s:%s:%s", direction, tenant, account) + aTriggers, exists := dbr.actionsTriggers[action_triggers_tag] + if action_triggers_tag != "" && !exists { + // only return error if there was something ther for the tag + return errors.New(fmt.Sprintf("Could not get action triggers for tag %v", action_triggers_tag)) + } + ub := &UserBalance{ + Type: UB_TYPE_PREPAID, + Id: tag, + ActionTriggers: aTriggers, + } + dbr.accountActions = append(dbr.accountActions, ub) + + aTimings, exists := dbr.actionsTimings[action_timings_tag] + if !exists { + log.Printf("Could not get action timing for tag %v", action_timings_tag) + // must not continue here + } + for _, at := range aTimings { + at.UserBalanceIds = append(at.UserBalanceIds, tag) + } + } + return rows.Err() +} diff --git a/rater/csvreader_helpers.go b/rater/loader_helpers.go similarity index 100% rename from rater/csvreader_helpers.go rename to rater/loader_helpers.go diff --git a/rater/storage_redigo.go b/rater/storage_redigo.go index e85605c47..5581c5a71 100644 --- a/rater/storage_redigo.go +++ b/rater/storage_redigo.go @@ -101,7 +101,7 @@ func (rs *RedigoStorage) GetActions(key string) (as Actions, err error) { } func (rs *RedigoStorage) SetActions(key string, as Actions) (err error) { - result, err := rs.ms.Marshal(as) + result, err := rs.ms.Marshal(&as) _, err = rs.db.Do("set", ACTION_PREFIX+key, result) return }