mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
added db loader
This commit is contained in:
@@ -30,6 +30,7 @@ import (
|
||||
|
||||
const (
|
||||
POSTGRES = "postgres"
|
||||
MYSQL = "mysql"
|
||||
MONGO = "mongo"
|
||||
REDIS = "redis"
|
||||
)
|
||||
@@ -44,6 +45,7 @@ var (
|
||||
db_pass = flag.String("dbpass", "", "The database user's password.")
|
||||
|
||||
flush = flag.Bool("flush", false, "Flush the database before importing")
|
||||
dataDbId = flag.String("tpid", "", "The tariff plan id from the database")
|
||||
dataPath = flag.String("path", ".", "The path containing the data files")
|
||||
version = flag.Bool("version", false, "Prints the application version.")
|
||||
|
||||
@@ -71,6 +73,91 @@ func main() {
|
||||
fmt.Println("CGRateS " + rater.VERSION)
|
||||
return
|
||||
}
|
||||
var err error
|
||||
var getter rater.DataStorage
|
||||
switch *db_type {
|
||||
case REDIS:
|
||||
db_nb, err := strconv.Atoi(*db_name)
|
||||
if err != nil {
|
||||
log.Fatal("Redis db name must be an integer!")
|
||||
}
|
||||
if *db_port != "" {
|
||||
*db_host += ":" + *db_port
|
||||
}
|
||||
getter, err = rater.NewGosexyStorage(*db_host, db_nb, *db_pass)
|
||||
case MONGO:
|
||||
getter, err = rater.NewMongoStorage(*db_host, *db_port, *db_name, *db_user, *db_pass)
|
||||
case MYSQL:
|
||||
getter, err = rater.NewMySQLStorage(*db_host, *db_port, *db_name, *db_user, *db_pass)
|
||||
case POSTGRES:
|
||||
getter, err = rater.NewPostgresStorage(*db_host, *db_port, *db_name, *db_user, *db_pass)
|
||||
default:
|
||||
log.Fatal("Unknown data db type, exiting!")
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("Could not open database connection: %v", err)
|
||||
}
|
||||
|
||||
if *dataDbId != "" && *dataPath != "" {
|
||||
log.Fatal("You can read either from db or from files, not both.")
|
||||
}
|
||||
if *dataPath != "" {
|
||||
loadFromCSVFile(getter)
|
||||
}
|
||||
if *dataDbId != "" {
|
||||
loadFromDb(getter)
|
||||
}
|
||||
}
|
||||
|
||||
func loadFromDb(getter rater.DataStorage) {
|
||||
// TODO: how do we read from db
|
||||
//dbr := rater.NewDbReader(getter, *dataDbId)
|
||||
var dbr *rater.DbReader
|
||||
err := dbr.LoadDestinations()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
err = dbr.LoadRates()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
err = dbr.LoadTimings()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
err = dbr.LoadRateTimings()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
err = dbr.LoadRatingProfiles()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
err = dbr.LoadActions()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
err = dbr.LoadActionTimings()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
err = dbr.LoadActionTriggers()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
err = dbr.LoadAccountActions()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// write maps to database
|
||||
if err := dbr.WriteToDatabase(getter, *flush, true); err != nil {
|
||||
log.Fatal("Could not write to database: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
func loadFromCSVFile(getter rater.DataStorage) {
|
||||
dataFilesValidators := []*validator{
|
||||
&validator{destinationsFn,
|
||||
regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\d+.?\d*){1}$`),
|
||||
@@ -145,28 +232,6 @@ func main() {
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
var getter rater.DataStorage
|
||||
switch *db_type {
|
||||
case REDIS:
|
||||
db_nb, err := strconv.Atoi(*db_name)
|
||||
if err != nil {
|
||||
log.Fatal("Redis db name must be an integer!")
|
||||
}
|
||||
if *db_port != "" {
|
||||
*db_host += ":" + *db_port
|
||||
}
|
||||
getter, err = rater.NewGosexyStorage(*db_host, db_nb, *db_pass)
|
||||
case MONGO:
|
||||
getter, err = rater.NewMongoStorage(*db_host, *db_port, *db_name, *db_user, *db_pass)
|
||||
case POSTGRES:
|
||||
getter, err = rater.NewPostgresStorage(*db_host, *db_port, *db_name, *db_user, *db_pass)
|
||||
default:
|
||||
log.Fatal("Unknown data db type, exiting!")
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("Could not open database connection: %v", err)
|
||||
}
|
||||
|
||||
// write maps to database
|
||||
if err := csvr.WriteToDatabase(getter, *flush, true); err != nil {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
--
|
||||
-- Table structure for table `tp_timings`
|
||||
--
|
||||
??? all float instead int
|
||||
|
||||
CREATE TABLE `tp_timings` (
|
||||
`id` int(11) NOT NULL AUTO_INCREMENT,
|
||||
@@ -39,6 +40,7 @@ CREATE TABLE `tp_rates` (
|
||||
`destinations_tag` varchar(24) NOT NULL,
|
||||
`connect_fee` DECIMAL(5,4) NOT NULL,
|
||||
`rate` DECIMAL(5,4) NOT NULL,
|
||||
??? priced_units
|
||||
`rate_increments` INT(11) NOT NULL,
|
||||
PRIMARY KEY (`id`),
|
||||
KEY `tpid` (`tpid`)
|
||||
|
||||
420
rater/loader_db.go
Normal file
420
rater/loader_db.go
Normal file
@@ -0,0 +1,420 @@
|
||||
/*
|
||||
Rating system designed to be used in VoIP Carriers World
|
||||
Copyright (C) 2013 ITsysCOM
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package rater
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
)
|
||||
|
||||
type DbReader struct {
|
||||
tpid string
|
||||
db *sql.DB
|
||||
actions map[string][]*Action
|
||||
actionsTimings map[string][]*ActionTiming
|
||||
actionsTriggers map[string][]*ActionTrigger
|
||||
accountActions []*UserBalance
|
||||
destinations []*Destination
|
||||
rates map[string][]*Rate
|
||||
timings map[string][]*Timing
|
||||
activationPeriods map[string]*ActivationPeriod
|
||||
ratingProfiles map[string]*RatingProfile
|
||||
}
|
||||
|
||||
func NewDbReader(db *sql.DB, tpid string) *DbReader {
|
||||
c := new(DbReader)
|
||||
c.db = db
|
||||
c.tpid = tpid
|
||||
c.actions = make(map[string][]*Action)
|
||||
c.actionsTimings = make(map[string][]*ActionTiming)
|
||||
c.actionsTriggers = make(map[string][]*ActionTrigger)
|
||||
c.rates = make(map[string][]*Rate)
|
||||
c.timings = make(map[string][]*Timing)
|
||||
c.activationPeriods = make(map[string]*ActivationPeriod)
|
||||
c.ratingProfiles = make(map[string]*RatingProfile)
|
||||
return c
|
||||
}
|
||||
|
||||
func (dbr *DbReader) WriteToDatabase(storage DataStorage, flush, verbose bool) (err error) {
|
||||
if flush {
|
||||
storage.Flush()
|
||||
}
|
||||
if verbose {
|
||||
log.Print("Destinations")
|
||||
}
|
||||
for _, d := range dbr.destinations {
|
||||
err = storage.SetDestination(d)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if verbose {
|
||||
log.Print(d.Id, " : ", d.Prefixes)
|
||||
}
|
||||
}
|
||||
if verbose {
|
||||
log.Print("Rating profiles")
|
||||
}
|
||||
for _, rp := range dbr.ratingProfiles {
|
||||
err = storage.SetRatingProfile(rp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if verbose {
|
||||
log.Print(rp.Id)
|
||||
}
|
||||
}
|
||||
if verbose {
|
||||
log.Print("Action timings")
|
||||
}
|
||||
for k, ats := range dbr.actionsTimings {
|
||||
err = storage.SetActionTimings(k, ats)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if verbose {
|
||||
log.Println(k)
|
||||
}
|
||||
}
|
||||
if verbose {
|
||||
log.Print("Actions")
|
||||
}
|
||||
for k, as := range dbr.actions {
|
||||
err = storage.SetActions(k, as)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if verbose {
|
||||
log.Println(k)
|
||||
}
|
||||
}
|
||||
if verbose {
|
||||
log.Print("Account actions")
|
||||
}
|
||||
for _, ub := range dbr.accountActions {
|
||||
err = storage.SetUserBalance(ub)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if verbose {
|
||||
log.Println(ub.Id)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (dbr *DbReader) LoadDestinations() error {
|
||||
rows, err := dbr.db.Query("SELECT * FROM tp_destinations WHERE tpid=?", dbr.tpid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for rows.Next() {
|
||||
var id int
|
||||
var tpid, tag, prefix string
|
||||
if err := rows.Scan(id, tpid, &tag, &prefix); err != nil {
|
||||
return err
|
||||
}
|
||||
var dest *Destination
|
||||
for _, d := range dbr.destinations {
|
||||
if d.Id == tag {
|
||||
dest = d
|
||||
break
|
||||
}
|
||||
}
|
||||
if dest == nil {
|
||||
dest = &Destination{Id: tag}
|
||||
dbr.destinations = append(dbr.destinations, dest)
|
||||
}
|
||||
dest.Prefixes = append(dest.Prefixes, prefix)
|
||||
}
|
||||
return rows.Err()
|
||||
}
|
||||
|
||||
func (dbr *DbReader) LoadRates() error {
|
||||
rows, err := dbr.db.Query("SELECT * FROM tp_rates WHERE tpid=?", dbr.tpid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for rows.Next() {
|
||||
var id int
|
||||
var tpid, tag, destinations_tag string
|
||||
var connect_fee, rate, priced_units, rate_increments float64
|
||||
if err := rows.Scan(&id, &tpid, &tag, &destinations_tag, &connect_fee, &rate, &priced_units, &rate_increments); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r := &Rate{
|
||||
DestinationsTag: destinations_tag,
|
||||
ConnectFee: connect_fee,
|
||||
Price: rate,
|
||||
PricedUnits: priced_units,
|
||||
RateIncrements: rate_increments,
|
||||
}
|
||||
|
||||
dbr.rates[tag] = append(dbr.rates[tag], r)
|
||||
}
|
||||
return rows.Err()
|
||||
}
|
||||
|
||||
func (dbr *DbReader) LoadTimings() error {
|
||||
rows, err := dbr.db.Query("SELECT * FROM tp_timings WHERE tpid=?", dbr.tpid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for rows.Next() {
|
||||
var id int
|
||||
var tpid, tag, years, months, month_days, week_days, start_time string
|
||||
if err := rows.Scan(&id, &tpid, &tag, &years, &months, &month_days, &week_days, &start_time); err != nil {
|
||||
return err
|
||||
}
|
||||
t := NewTiming(years, months, month_days, week_days, start_time)
|
||||
dbr.timings[tag] = append(dbr.timings[tag], t)
|
||||
}
|
||||
return rows.Err()
|
||||
}
|
||||
|
||||
func (dbr *DbReader) LoadRateTimings() error {
|
||||
rows, err := dbr.db.Query("SELECT * FROM tp_rate_timings WHERE tpid=?", dbr.tpid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for rows.Next() {
|
||||
var id int
|
||||
var weight float64
|
||||
var tpid, tag, rates_tag, timings_tag string
|
||||
if err := rows.Scan(&id, &tpid, &tag, &rates_tag, &timings_tag, &weight); err != nil {
|
||||
return err
|
||||
}
|
||||
ts, exists := dbr.timings[timings_tag]
|
||||
if !exists {
|
||||
return errors.New(fmt.Sprintf("Could not get timing for tag %v", timings_tag))
|
||||
}
|
||||
for _, t := range ts {
|
||||
rt := &RateTiming{
|
||||
RatesTag: rates_tag,
|
||||
Weight: weight,
|
||||
timing: t,
|
||||
}
|
||||
rs, exists := dbr.rates[rates_tag]
|
||||
if !exists {
|
||||
return errors.New(fmt.Sprintf("Could not find rate for tag %v", rates_tag))
|
||||
}
|
||||
for _, r := range rs {
|
||||
_, exists := dbr.activationPeriods[tag]
|
||||
if !exists {
|
||||
dbr.activationPeriods[tag] = &ActivationPeriod{}
|
||||
}
|
||||
dbr.activationPeriods[tag].AddIntervalIfNotPresent(rt.GetInterval(r))
|
||||
}
|
||||
}
|
||||
}
|
||||
return rows.Err()
|
||||
}
|
||||
|
||||
func (dbr *DbReader) LoadRatingProfiles() error {
|
||||
rows, err := dbr.db.Query("SELECT * FROM tp_rate_profiles WHERE tpid=?", dbr.tpid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for rows.Next() {
|
||||
var id int
|
||||
var tpid, tenant, tor, direction, subject, fallbacksubject, rates_timing_tag, activation_time string
|
||||
|
||||
if err := rows.Scan(&id, &tpid, &tenant, &tor, &direction, &subject, &fallbacksubject, &rates_timing_tag, &activation_time); err != nil {
|
||||
return err
|
||||
}
|
||||
at, err := time.Parse(time.RFC3339, activation_time)
|
||||
if err != nil {
|
||||
return errors.New(fmt.Sprintf("Cannot parse activation time from %v", activation_time))
|
||||
}
|
||||
key := fmt.Sprintf("%s:%s:%s:%s", direction, tenant, tor, subject)
|
||||
rp, ok := dbr.ratingProfiles[key]
|
||||
if !ok {
|
||||
rp = &RatingProfile{Id: key}
|
||||
dbr.ratingProfiles[key] = rp
|
||||
}
|
||||
for _, d := range dbr.destinations {
|
||||
ap, exists := dbr.activationPeriods[rates_timing_tag]
|
||||
if !exists {
|
||||
return errors.New(fmt.Sprintf("Could not load rating timing for tag: %v", rates_timing_tag))
|
||||
}
|
||||
newAP := &ActivationPeriod{ActivationTime: at}
|
||||
//copy(newAP.Intervals, ap.Intervals)
|
||||
newAP.Intervals = append(newAP.Intervals, ap.Intervals...)
|
||||
rp.AddActivationPeriodIfNotPresent(d.Id, newAP)
|
||||
if fallbacksubject != "" {
|
||||
rp.FallbackKey = fmt.Sprintf("%s:%s:%s:%s", direction, tenant, tor, fallbacksubject)
|
||||
}
|
||||
}
|
||||
}
|
||||
return rows.Err()
|
||||
}
|
||||
|
||||
func (dbr *DbReader) LoadActions() error {
|
||||
rows, err := dbr.db.Query("SELECT * FROM tp_actions WHERE tpid=?", dbr.tpid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for rows.Next() {
|
||||
var id int
|
||||
var units, rate, minutes_weight, weight float64
|
||||
var tpid, tag, action, balances_tag, direction, destinations_tag, rate_type string
|
||||
if err := rows.Scan(&id, &tpid, &tag, &action, &balances_tag, &direction, &units, &destinations_tag, &rate_type, &rate, &minutes_weight, &weight); err != nil {
|
||||
return err
|
||||
}
|
||||
var a *Action
|
||||
if balances_tag != MINUTES {
|
||||
a = &Action{
|
||||
ActionType: action,
|
||||
BalanceId: balances_tag,
|
||||
Direction: direction,
|
||||
Units: units,
|
||||
}
|
||||
} else {
|
||||
var percent, price float64
|
||||
if rate_type == PERCENT {
|
||||
percent = rate
|
||||
}
|
||||
if rate_type == ABSOLUTE {
|
||||
price = rate
|
||||
}
|
||||
a = &Action{
|
||||
Id: GenUUID(),
|
||||
ActionType: action,
|
||||
BalanceId: balances_tag,
|
||||
Direction: direction,
|
||||
Weight: weight,
|
||||
MinuteBucket: &MinuteBucket{
|
||||
Seconds: units,
|
||||
Weight: minutes_weight,
|
||||
Price: price,
|
||||
Percent: percent,
|
||||
DestinationId: destinations_tag,
|
||||
},
|
||||
}
|
||||
}
|
||||
dbr.actions[tag] = append(dbr.actions[tag], a)
|
||||
}
|
||||
return rows.Err()
|
||||
}
|
||||
|
||||
func (dbr *DbReader) LoadActionTimings() error {
|
||||
rows, err := dbr.db.Query("SELECT * FROM tp_action_timings WHERE tpid=?", dbr.tpid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for rows.Next() {
|
||||
var id int
|
||||
var weight float64
|
||||
var tpid, tag, actions_tag, timings_tag string
|
||||
if err := rows.Scan(&id, &tpid, &tag, &actions_tag, &timings_tag, &weight); err != nil {
|
||||
return err
|
||||
}
|
||||
_, exists := dbr.actions[actions_tag]
|
||||
if !exists {
|
||||
return errors.New(fmt.Sprintf("ActionTiming: Could not load the action for tag: %v", actions_tag))
|
||||
}
|
||||
ts, exists := dbr.timings[timings_tag]
|
||||
if !exists {
|
||||
return errors.New(fmt.Sprintf("ActionTiming: Could not load the timing for tag: %v", timings_tag))
|
||||
}
|
||||
for _, t := range ts {
|
||||
at := &ActionTiming{
|
||||
Id: GenUUID(),
|
||||
Tag: timings_tag,
|
||||
Weight: weight,
|
||||
Timing: &Interval{
|
||||
Months: t.Months,
|
||||
MonthDays: t.MonthDays,
|
||||
WeekDays: t.WeekDays,
|
||||
StartTime: t.StartTime,
|
||||
},
|
||||
ActionsId: actions_tag,
|
||||
}
|
||||
dbr.actionsTimings[tag] = append(dbr.actionsTimings[tag], at)
|
||||
}
|
||||
}
|
||||
return rows.Err()
|
||||
}
|
||||
|
||||
func (dbr *DbReader) LoadActionTriggers() error {
|
||||
rows, err := dbr.db.Query("SELECT * FROM tp_action_triggers WHERE tpid=?", dbr.tpid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for rows.Next() {
|
||||
var id int
|
||||
var threshold, weight float64
|
||||
var tpid, tag, balances_tag, direction, destinations_tag, actions_tag string
|
||||
if err := rows.Scan(&id, &tpid, &tag, &balances_tag, &direction, &threshold, &destinations_tag, &actions_tag, &weight); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
at := &ActionTrigger{
|
||||
Id: GenUUID(),
|
||||
BalanceId: balances_tag,
|
||||
Direction: direction,
|
||||
ThresholdValue: threshold,
|
||||
DestinationId: destinations_tag,
|
||||
ActionsId: actions_tag,
|
||||
Weight: weight,
|
||||
}
|
||||
dbr.actionsTriggers[tag] = append(dbr.actionsTriggers[tag], at)
|
||||
}
|
||||
return rows.Err()
|
||||
}
|
||||
|
||||
func (dbr *DbReader) LoadAccountActions() error {
|
||||
rows, err := dbr.db.Query("SELECT * FROM tp_account_actions WHERE tpid=?", dbr.tpid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for rows.Next() {
|
||||
var id int
|
||||
var tpid, tenant, account, direction, action_timings_tag, action_triggers_tag string
|
||||
if err := rows.Scan(&id, &tpid, &tenant, &account, &direction, &action_timings_tag, &action_triggers_tag); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tag := fmt.Sprintf("%s:%s:%s", direction, tenant, account)
|
||||
aTriggers, exists := dbr.actionsTriggers[action_triggers_tag]
|
||||
if action_triggers_tag != "" && !exists {
|
||||
// only return error if there was something ther for the tag
|
||||
return errors.New(fmt.Sprintf("Could not get action triggers for tag %v", action_triggers_tag))
|
||||
}
|
||||
ub := &UserBalance{
|
||||
Type: UB_TYPE_PREPAID,
|
||||
Id: tag,
|
||||
ActionTriggers: aTriggers,
|
||||
}
|
||||
dbr.accountActions = append(dbr.accountActions, ub)
|
||||
|
||||
aTimings, exists := dbr.actionsTimings[action_timings_tag]
|
||||
if !exists {
|
||||
log.Printf("Could not get action timing for tag %v", action_timings_tag)
|
||||
// must not continue here
|
||||
}
|
||||
for _, at := range aTimings {
|
||||
at.UserBalanceIds = append(at.UserBalanceIds, tag)
|
||||
}
|
||||
}
|
||||
return rows.Err()
|
||||
}
|
||||
@@ -101,7 +101,7 @@ func (rs *RedigoStorage) GetActions(key string) (as Actions, err error) {
|
||||
}
|
||||
|
||||
func (rs *RedigoStorage) SetActions(key string, as Actions) (err error) {
|
||||
result, err := rs.ms.Marshal(as)
|
||||
result, err := rs.ms.Marshal(&as)
|
||||
_, err = rs.db.Do("set", ACTION_PREFIX+key, result)
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user