From aca4c5a8619e8d3554c9aa5a235b0c50409406d0 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Sat, 8 Jun 2013 19:12:24 +0300 Subject: [PATCH] first draft of DataStorage loading --- rater/action_timing.go | 17 +-- rater/actions_test.go | 4 +- rater/loader_db.go | 235 ++++++++++++------------------------- rater/loader_helpers.go | 4 + rater/ratingprofile.go | 7 +- rater/storage_gosexy.go | 9 +- rater/storage_interface.go | 5 +- rater/storage_map.go | 32 ++++- rater/storage_mongo.go | 30 ++++- rater/storage_mysql.go | 152 +++++++++++++++++++++--- rater/storage_postgres.go | 230 +++++++++++++++++++++++++++++++++++- rater/storage_redigo.go | 32 ++++- rater/storage_redis.go | 32 ++++- rater/userbalance.go | 13 +- scheduler/scheduler.go | 2 +- 15 files changed, 585 insertions(+), 219 deletions(-) diff --git a/rater/action_timing.go b/rater/action_timing.go index ad0539cb4..eb4d1189f 100644 --- a/rater/action_timing.go +++ b/rater/action_timing.go @@ -35,14 +35,15 @@ const ( ) type ActionTiming struct { - Id string // uniquely identify the timing - Tag string // informative purpos only - UserBalanceIds []string - Timing *Interval - Weight float64 - ActionsId string - actions Actions - stCache time.Time + Id string // uniquely identify the timing + Tag string // informative purpos only + UserBalanceIds []string + Timing *Interval + Weight float64 + ActionsId string + actions Actions + stCache time.Time + actionsTag, timingsTag string // used only for loading } type ActionTimings []*ActionTiming diff --git a/rater/actions_test.go b/rater/actions_test.go index 62078c8b6..7193faa1d 100644 --- a/rater/actions_test.go +++ b/rater/actions_test.go @@ -713,7 +713,7 @@ func TestActionTriggerLogging(t *testing.T) { storageGetter.LogActionTrigger("rif", RATER_SOURCE, at, as) //expected := "rif*some_uuid;MONETARY;OUT;NAT;TEST_ACTIONS;100;10;false*|TOPUP|MONETARY|OUT|10|0" var key string - atMap, _ := storageGetter.GetAllActionTimings() + atMap, _ := storageGetter.GetAllActionTimings("") for k, v := range atMap { _ = k _ = v @@ -755,7 +755,7 @@ func TestActionTimingLogging(t *testing.T) { storageGetter.LogActionTiming(SCHED_SOURCE, at, as) //expected := "some uuid|test|one,two,three|;1,2,3,4,5,6,7,8,9,10,11,12;1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31;1,2,3,4,5;18:00:00;00:00:00;10;0;1;60;1|10|TEST_ACTIONS*|TOPUP|MONETARY|OUT|10|0" var key string - atMap, _ := storageGetter.GetAllActionTimings() + atMap, _ := storageGetter.GetAllActionTimings("") for k, v := range atMap { _ = k _ = v diff --git a/rater/loader_db.go b/rater/loader_db.go index 4178e13fe..32d411857 100644 --- a/rater/loader_db.go +++ b/rater/loader_db.go @@ -19,7 +19,10 @@ along with this program. If not, see package rater import ( + "errors" + "fmt" "log" + "time" ) type DbReader struct { @@ -121,35 +124,35 @@ func (dbr *DbReader) LoadDestinations(tpid string) (err error) { return } -func (dbr *DbReader) LoadRates(tpid string) error { - dbr.rates, err := dbr.storDB.GetAllRates(tpid) - return err +func (dbr *DbReader) LoadRates(tpid string) (err error) { + dbr.rates, err = dbr.storDB.GetAllRates(tpid) + return err } -func (dbr *DbReader) LoadTimings(tpid string) error { - dbr.timings, err := dbr.storDB.GetAllTimings(tpid) +func (dbr *DbReader) LoadTimings(tpid string) (err error) { + dbr.timings, err = dbr.storDB.GetAllTimings(tpid) return err } func (dbr *DbReader) LoadRateTimings(tpid string) error { rts, err := dbr.storDB.GetAllRateTimings(tpid) if err != nil { - return nil, err + return err } for _, rt := range rts { ts, exists := dbr.timings[rt.TimingsTag] if !exists { - return errors.New(fmt.Sprintf("Could not get timing for tag %v", timings_tag)) + return errors.New(fmt.Sprintf("Could not get timing for tag %v", rt.TimingsTag)) } for _, t := range ts { rateTiming := &RateTiming{ - RatesTag: rates_tag, - Weight: weight, + RatesTag: rt.RatesTag, + Weight: rt.Weight, timing: t, } - rs, exists := dbr.rates[rates_tag] + rs, exists := dbr.rates[rt.RatesTag] if !exists { - return errors.New(fmt.Sprintf("Could not find rate for tag %v", rates_tag)) + return errors.New(fmt.Sprintf("Could not find rate for tag %v", rt.RatesTag)) } for _, r := range rs { _, exists := dbr.activationPeriods[rt.Tag] @@ -158,182 +161,100 @@ func (dbr *DbReader) LoadRateTimings(tpid string) error { } dbr.activationPeriods[rt.Tag].AddIntervalIfNotPresent(rateTiming.GetInterval(r)) } - } - } + } + } return nil } func (dbr *DbReader) LoadRatingProfiles(tpid string) error { rpfs, err := dbr.storDB.GetAllRatingProfiles(tpid) + if err != nil { + return err + } for _, rp := range rpfs { + at, err := time.Parse(time.RFC3339, rp.activationTime) + if err != nil { + return errors.New(fmt.Sprintf("Cannot parse activation time from %v", rp.activationTime)) + } for _, d := range dbr.destinations { - ap, exists := dbr.activationPeriods[rates_timing_tag] + ap, exists := dbr.activationPeriods[rp.ratesTimingTag] if !exists { - return errors.New(fmt.Sprintf("Could not load rating timing for tag: %v", rates_timing_tag)) + return errors.New(fmt.Sprintf("Could not load rating timing for tag: %v", rp.ratesTimingTag)) } newAP := &ActivationPeriod{ActivationTime: at} //copy(newAP.Intervals, ap.Intervals) newAP.Intervals = append(newAP.Intervals, ap.Intervals...) rp.AddActivationPeriodIfNotPresent(d.Id, newAP) - if fallbacksubject != "" { - rp.FallbackKey = fmt.Sprintf("%s:%s:%s:%s", direction, tenant, tor, fallbacksubject) + if rp.fallbackSubject != "" { + rp.FallbackKey = fmt.Sprintf("%s:%s:%s:%s", rp.direction, rp.tenant, rp.tor, rp.fallbackSubject) } } } return nil } -func (dbr *DbReader) LoadActions(tpid string) error { - /*rows, err := dbr.db.Query("SELECT * FROM tp_actions WHERE tpid=?", tpid) - if err != nil { - return err - } - for rows.Next() { - var id int - var units, rate, minutes_weight, weight float64 - var tpid, tag, action, balances_tag, direction, destinations_tag, rate_type string - if err := rows.Scan(&id, &tpid, &tag, &action, &balances_tag, &direction, &units, &destinations_tag, &rate_type, &rate, &minutes_weight, &weight); err != nil { - return err - } - var a *Action - if balances_tag != MINUTES { - a = &Action{ - ActionType: action, - BalanceId: balances_tag, - Direction: direction, - Units: units, - } - } else { - var percent, price float64 - if rate_type == PERCENT { - percent = rate - } - if rate_type == ABSOLUTE { - price = rate - } - a = &Action{ - Id: GenUUID(), - ActionType: action, - BalanceId: balances_tag, - Direction: direction, - Weight: weight, - MinuteBucket: &MinuteBucket{ - Seconds: units, - Weight: minutes_weight, - Price: price, - Percent: percent, - DestinationId: destinations_tag, - }, - } - } - dbr.actions[tag] = append(dbr.actions[tag], a) - } - return rows.Err()*/ - return nil +func (dbr *DbReader) LoadActions(tpid string) (err error) { + dbr.actions, err = dbr.storDB.GetAllActions(tpid) + return err } -func (dbr *DbReader) LoadActionTimings(tpid string) error { - /*rows, err := dbr.db.Query("SELECT * FROM tp_action_timings WHERE tpid=?", tpid) +func (dbr *DbReader) LoadActionTimings(tpid string) (err error) { + atsMap, err := dbr.storDB.GetAllActionTimings(tpid) if err != nil { return err } - for rows.Next() { - var id int - var weight float64 - var tpid, tag, actions_tag, timings_tag string - if err := rows.Scan(&id, &tpid, &tag, &actions_tag, &timings_tag, &weight); err != nil { - return err + for tag, ats := range atsMap { + for _, at := range ats { + _, exists := dbr.actions[at.ActionsId] + if !exists { + return errors.New(fmt.Sprintf("ActionTiming: Could not load the action for tag: %v", at.ActionsId)) + } + ts, exists := dbr.timings[at.Tag] + if !exists { + return errors.New(fmt.Sprintf("ActionTiming: Could not load the timing for tag: %v", at.Tag)) + } + for _, t := range ts { + actTmg := &ActionTiming{ + Id: GenUUID(), + Tag: at.Tag, + Weight: at.Weight, + Timing: &Interval{ + Months: t.Months, + MonthDays: t.MonthDays, + WeekDays: t.WeekDays, + StartTime: t.StartTime, + }, + ActionsId: at.ActionsId, + } + dbr.actionsTimings[tag] = append(dbr.actionsTimings[tag], actTmg) + } } - _, exists := dbr.actions[actions_tag] + } + return err +} + +func (dbr *DbReader) LoadActionTriggers(tpid string) (err error) { + dbr.actionsTriggers, err = dbr.storDB.GetAllActionTriggers(tpid) + return err +} + +func (dbr *DbReader) LoadAccountActions(tpid string) (err error) { + dbr.accountActions, err = dbr.storDB.GetAllUserBalances(tpid) + for _, ub := range dbr.accountActions { + aTimings, exists := dbr.actionsTimings[ub.actionTimingsTag] if !exists { - return errors.New(fmt.Sprintf("ActionTiming: Could not load the action for tag: %v", actions_tag)) - } - ts, exists := dbr.timings[timings_tag] - if !exists { - return errors.New(fmt.Sprintf("ActionTiming: Could not load the timing for tag: %v", timings_tag)) - } - for _, t := range ts { - at := &ActionTiming{ - Id: GenUUID(), - Tag: timings_tag, - Weight: weight, - Timing: &Interval{ - Months: t.Months, - MonthDays: t.MonthDays, - WeekDays: t.WeekDays, - StartTime: t.StartTime, - }, - ActionsId: actions_tag, - } - dbr.actionsTimings[tag] = append(dbr.actionsTimings[tag], at) - } - } - return rows.Err()*/ - return nil -} - -func (dbr *DbReader) LoadActionTriggers(tpid string) error { - /*rows, err := dbr.db.Query("SELECT * FROM tp_action_triggers WHERE tpid=?", tpid) - if err != nil { - return err - } - for rows.Next() { - var id int - var threshold, weight float64 - var tpid, tag, balances_tag, direction, destinations_tag, actions_tag string - if err := rows.Scan(&id, &tpid, &tag, &balances_tag, &direction, &threshold, &destinations_tag, &actions_tag, &weight); err != nil { - return err - } - - at := &ActionTrigger{ - Id: GenUUID(), - BalanceId: balances_tag, - Direction: direction, - ThresholdValue: threshold, - DestinationId: destinations_tag, - ActionsId: actions_tag, - Weight: weight, - } - dbr.actionsTriggers[tag] = append(dbr.actionsTriggers[tag], at) - } - return rows.Err()*/ - return nil -} - -func (dbr *DbReader) LoadAccountActions() error { - /*rows, err := dbr.db.Query("SELECT * FROM tp_account_actions WHERE tpid=?", tpid) - if err != nil { - return err - } - for rows.Next() { - var id int - var tpid, tenant, account, direction, action_timings_tag, action_triggers_tag string - if err := rows.Scan(&id, &tpid, &tenant, &account, &direction, &action_timings_tag, &action_triggers_tag); err != nil { - return err - } - - tag := fmt.Sprintf("%s:%s:%s", direction, tenant, account) - aTriggers, exists := dbr.actionsTriggers[action_triggers_tag] - if action_triggers_tag != "" && !exists { - // only return error if there was something ther for the tag - return errors.New(fmt.Sprintf("Could not get action triggers for tag %v", action_triggers_tag)) - } - ub := &UserBalance{ - Type: UB_TYPE_PREPAID, - Id: tag, - ActionTriggers: aTriggers, - } - dbr.accountActions = append(dbr.accountActions, ub) - - aTimings, exists := dbr.actionsTimings[action_timings_tag] - if !exists { - log.Printf("Could not get action timing for tag %v", action_timings_tag) + log.Printf("Could not get action timing for tag %v", ub.actionTimingsTag) // must not continue here } for _, at := range aTimings { - at.UserBalanceIds = append(at.UserBalanceIds, tag) + aTriggers, exists := dbr.actionsTriggers[ub.actionTriggersTag] + if ub.actionTriggersTag != "" && !exists { + // only return error if there was something ther for the tag + return errors.New(fmt.Sprintf("Could not get action triggers for tag %v", ub.actionTriggersTag)) + } + ub.ActionTriggers = aTriggers + at.UserBalanceIds = append(at.UserBalanceIds, ub.Id) } } - return rows.Err()*/ return nil } diff --git a/rater/loader_helpers.go b/rater/loader_helpers.go index bfbaa3a75..4b7296fd3 100644 --- a/rater/loader_helpers.go +++ b/rater/loader_helpers.go @@ -133,6 +133,10 @@ func (rt *RateTiming) GetInterval(r *Rate) (i *Interval) { return } +type AccountAction struct { + Tenant, Account, Direction, ActionTimingsTag, ActionTriggersTag string +} + func ValidateCSVData(fn string, re *regexp.Regexp) (err error) { fin, err := os.Open(fn) if err != nil { diff --git a/rater/ratingprofile.go b/rater/ratingprofile.go index 6002b581a..e8242f566 100644 --- a/rater/ratingprofile.go +++ b/rater/ratingprofile.go @@ -29,9 +29,10 @@ const ( ) type RatingProfile struct { - Id string - FallbackKey string - DestinationMap map[string][]*ActivationPeriod + Id string + FallbackKey string + DestinationMap map[string][]*ActivationPeriod + tenant, tor, direction, subject, fallbackSubject, ratesTimingTag, activationTime string // used only for loading } // Adds an activation period that applyes to current rating profile if not already present. diff --git a/rater/storage_gosexy.go b/rater/storage_gosexy.go index 762a9489c..17a4f61ad 100644 --- a/rater/storage_gosexy.go +++ b/rater/storage_gosexy.go @@ -150,12 +150,12 @@ func (rs *GosexyStorage) SetActionTimings(key string, ats ActionTimings) (err er return } -func (rs *GosexyStorage) GetAllActionTimings(tpid string) (ats map[string]ActionTimings, err error) { +func (rs *GosexyStorage) GetAllActionTimings(tpid string) (ats map[string][]*ActionTiming, err error) { keys, err := rs.db.Keys(ACTION_TIMING_PREFIX + tpid + "*") if err != nil { return nil, err } - ats = make(map[string]ActionTimings, len(keys)) + ats = make(map[string][]*ActionTiming, len(keys)) for _, key := range keys { values, err := rs.db.Get(key) if err != nil { @@ -163,7 +163,7 @@ func (rs *GosexyStorage) GetAllActionTimings(tpid string) (ats map[string]Action } var tempAts ActionTimings err = rs.ms.Unmarshal([]byte(values), &tempAts) - ats[key[len(ACTION_TIMING_PREFIX):]] = tempAts + ats[key[len(ACTION_TIMING_PREFIX+tpid):]] = tempAts } return @@ -248,3 +248,6 @@ func (rs *GosexyStorage) GetAllActions(string) (map[string][]*Action, error) { func (rs *GosexyStorage) GetAllActionTriggers(string) (map[string][]*ActionTrigger, error) { return nil, nil } +func (rs *GosexyStorage) GetAllUserBalances(string) ([]*UserBalance, error) { + return nil, nil +} diff --git a/rater/storage_interface.go b/rater/storage_interface.go index 9afa982c2..89cf54b64 100644 --- a/rater/storage_interface.go +++ b/rater/storage_interface.go @@ -63,7 +63,7 @@ type DataStorage interface { SetUserBalance(*UserBalance) error GetActionTimings(string) (ActionTimings, error) SetActionTimings(string, ActionTimings) error - GetAllActionTimings(string) (map[string]ActionTimings, error) + GetAllActionTimings(string) (map[string][]*ActionTiming, error) SetCdr(utils.CDR) error SetRatedCdr(utils.CDR, *CallCost) error //GetAllActionTimingsLogs() (map[string]ActionsTimings, error) @@ -78,8 +78,9 @@ type DataStorage interface { GetAllTimings(string) (map[string][]*Timing, error) GetAllRateTimings(string) ([]*RateTiming, error) GetAllRatingProfiles(string) (map[string]*RatingProfile, error) - GetAllAllActions(string) (map[string][]*Action, error) + GetAllActions(string) (map[string][]*Action, error) GetAllActionTriggers(string) (map[string][]*ActionTrigger, error) + GetAllUserBalances(string) ([]*UserBalance, error) } type Marshaler interface { diff --git a/rater/storage_map.go b/rater/storage_map.go index 20ef89f0b..68be5b5de 100644 --- a/rater/storage_map.go +++ b/rater/storage_map.go @@ -124,15 +124,15 @@ func (ms *MapStorage) SetActionTimings(key string, ats ActionTimings) (err error return } -func (ms *MapStorage) GetAllActionTimings() (ats map[string]ActionTimings, err error) { - ats = make(map[string]ActionTimings) +func (ms *MapStorage) GetAllActionTimings(tpid string) (ats map[string][]*ActionTiming, err error) { + ats = make(map[string][]*ActionTiming) for key, value := range ms.dict { - if !strings.Contains(key, ACTION_TIMING_PREFIX) { + if !strings.Contains(key, ACTION_TIMING_PREFIX+tpid) { continue } var tempAts ActionTimings err = ms.ms.Unmarshal(value, &tempAts) - ats[key[len(ACTION_TIMING_PREFIX):]] = tempAts + ats[key[len(ACTION_TIMING_PREFIX+tpid):]] = tempAts } return @@ -192,6 +192,28 @@ func (ms *MapStorage) SetRatedCdr(utils.CDR, *CallCost) error { return nil } -func (ms *MapStorage) GetDestinations(tpid string) ([]*Destination, error) { +func (ms *MapStorage) GetAllDestinations(tpid string) ([]*Destination, error) { + return nil, nil +} + +func (ms *MapStorage) GetAllRates(string) (map[string][]*Rate, error) { + return nil, nil +} +func (ms *MapStorage) GetAllTimings(string) (map[string][]*Timing, error) { + return nil, nil +} +func (ms *MapStorage) GetAllRateTimings(string) ([]*RateTiming, error) { + return nil, nil +} +func (ms *MapStorage) GetAllRatingProfiles(string) (map[string]*RatingProfile, error) { + return nil, nil +} +func (ms *MapStorage) GetAllActions(string) (map[string][]*Action, error) { + return nil, nil +} +func (ms *MapStorage) GetAllActionTriggers(string) (map[string][]*ActionTrigger, error) { + return nil, nil +} +func (ms *MapStorage) GetAllUserBalances(string) ([]*UserBalance, error) { return nil, nil } diff --git a/rater/storage_mongo.go b/rater/storage_mongo.go index 6d1df35c9..0748512ee 100644 --- a/rater/storage_mongo.go +++ b/rater/storage_mongo.go @@ -176,10 +176,10 @@ func (ms *MongoStorage) SetActionTimings(key string, ats ActionTimings) error { return ms.db.C("actiontimings").Insert(&AtKeyValue{key, ats}) } -func (ms *MongoStorage) GetAllActionTimings() (ats map[string]ActionTimings, err error) { +func (ms *MongoStorage) GetAllActionTimings(tpid string) (ats map[string][]*ActionTiming, err error) { result := AtKeyValue{} iter := ms.db.C("actiontimings").Find(nil).Iter() - ats = make(map[string]ActionTimings) + ats = make(map[string][]*ActionTiming) for iter.Next(&result) { ats[result.Key] = result.Value } @@ -220,3 +220,29 @@ func (ms *MongoStorage) SetRatedCdr(utils.CDR, *CallCost) error { func (ms *MongoStorage) GetDestinations(tpid string) ([]*Destination, error) { return nil, nil } + +func (ms *MongoStorage) GetAllDestinations(tpid string) ([]*Destination, error) { + return nil, nil +} + +func (ms *MongoStorage) GetAllRates(string) (map[string][]*Rate, error) { + return nil, nil +} +func (ms *MongoStorage) GetAllTimings(string) (map[string][]*Timing, error) { + return nil, nil +} +func (ms *MongoStorage) GetAllRateTimings(string) ([]*RateTiming, error) { + return nil, nil +} +func (ms *MongoStorage) GetAllRatingProfiles(string) (map[string]*RatingProfile, error) { + return nil, nil +} +func (ms *MongoStorage) GetAllActions(string) (map[string][]*Action, error) { + return nil, nil +} +func (ms *MongoStorage) GetAllActionTriggers(string) (map[string][]*ActionTrigger, error) { + return nil, nil +} +func (ms *MongoStorage) GetAllUserBalances(string) ([]*UserBalance, error) { + return nil, nil +} diff --git a/rater/storage_mysql.go b/rater/storage_mysql.go index 61c4217ef..e16f567e0 100644 --- a/rater/storage_mysql.go +++ b/rater/storage_mysql.go @@ -77,7 +77,30 @@ func (mys *MySQLStorage) GetActionTimings(key string) (ats ActionTimings, err er func (mys *MySQLStorage) SetActionTimings(key string, ats ActionTimings) (err error) { return } -func (mys *MySQLStorage) GetAllActionTimings() (ats map[string]ActionTimings, err error) { return } +func (mys *MySQLStorage) GetAllActionTimings(tpid string) (ats map[string][]*ActionTiming, err error) { + ats = make(map[string][]*ActionTiming) + rows, err := mys.Db.Query("SELECT * FROM tp_action_timings WHERE tpid=?", tpid) + if err != nil { + return nil, err + } + for rows.Next() { + var id int + var weight float64 + var tpid, tag, actions_tag, timings_tag string + if err := rows.Scan(&id, &tpid, &tag, &actions_tag, &timings_tag, &weight); err != nil { + return nil, err + } + + at := &ActionTiming{ + Id: GenUUID(), + Tag: timings_tag, + Weight: weight, + ActionsId: actions_tag, + } + ats[tag] = append(ats[tag], at) + } + return ats, rows.Err() +} func (mys *MySQLStorage) LogCallCost(uuid, source string, cc *CallCost) (err error) { if mys.Db == nil { @@ -225,9 +248,9 @@ func (mys *MySQLStorage) GetAllRates(tpid string) (map[string][]*Rate, error) { return rts, rows.Err() } -func (mys *MySQLStorage) GetAllTimings(string) (map[string][]*Timing, error) { +func (mys *MySQLStorage) GetAllTimings(tpid string) (map[string][]*Timing, error) { tms := make(map[string][]*Timing) - rows, err := dbr.db.Query("SELECT * FROM tp_timings WHERE tpid=?", tpid) + rows, err := mys.Db.Query("SELECT * FROM tp_timings WHERE tpid=?", tpid) if err != nil { return nil, err } @@ -243,7 +266,7 @@ func (mys *MySQLStorage) GetAllTimings(string) (map[string][]*Timing, error) { return tms, rows.Err() } -func (mys *MySQLStorage) GetAllRateTimings(string) ([]*RateTiming, error) { +func (mys *MySQLStorage) GetAllRateTimings(tpid string) ([]*RateTiming, error) { var rts []*RateTiming rows, err := mys.Db.Query("SELECT * FROM tp_rate_timings WHERE tpid=?", tpid) if err != nil { @@ -254,7 +277,7 @@ func (mys *MySQLStorage) GetAllRateTimings(string) ([]*RateTiming, error) { var weight float64 var tpid, tag, rates_tag, timings_tag string if err := rows.Scan(&id, &tpid, &tag, &rates_tag, &timings_tag, &weight); err != nil { - return err + return nil, err } rt := &RateTiming{ Tag: tag, @@ -267,22 +290,18 @@ func (mys *MySQLStorage) GetAllRateTimings(string) ([]*RateTiming, error) { return rts, rows.Err() } -func (mys *MySQLStorage) GetAllRatingProfiles(string) (map[string]*RatingProfile, error) { +func (mys *MySQLStorage) GetAllRatingProfiles(tpid string) (map[string]*RatingProfile, error) { rpfs := make(map[string]*RatingProfile) rows, err := mys.Db.Query("SELECT * FROM tp_rate_profiles WHERE tpid=?", tpid) if err != nil { - return err + return nil, err } for rows.Next() { var id int var tpid, tenant, tor, direction, subject, fallbacksubject, rates_timing_tag, activation_time string if err := rows.Scan(&id, &tpid, &tenant, &tor, &direction, &subject, &fallbacksubject, &rates_timing_tag, &activation_time); err != nil { - return err - } - at, err := time.Parse(time.RFC3339, activation_time) - if err != nil { - return errors.New(fmt.Sprintf("Cannot parse activation time from %v", activation_time)) + return nil, err } key := fmt.Sprintf("%s:%s:%s:%s", direction, tenant, tor, subject) rp, ok := rpfs[key] @@ -290,13 +309,112 @@ func (mys *MySQLStorage) GetAllRatingProfiles(string) (map[string]*RatingProfile rp = &RatingProfile{Id: key} rpfs[key] = rp } - + rp.tor = tor + rp.direction = direction + rp.subject = subject + rp.fallbackSubject = fallbacksubject + rp.ratesTimingTag = rates_timing_tag + rp.activationTime = activation_time } return rpfs, rows.Err() } -func (mys *MySQLStorage) GetAllActions(string) (map[string][]*Action, error) { - return nil, nil +func (mys *MySQLStorage) GetAllActions(tpid string) (map[string][]*Action, error) { + as := make(map[string][]*Action) + rows, err := mys.Db.Query("SELECT * FROM tp_actions WHERE tpid=?", tpid) + if err != nil { + return nil, err + } + for rows.Next() { + var id int + var units, rate, minutes_weight, weight float64 + var tpid, tag, action, balances_tag, direction, destinations_tag, rate_type string + if err := rows.Scan(&id, &tpid, &tag, &action, &balances_tag, &direction, &units, &destinations_tag, &rate_type, &rate, &minutes_weight, &weight); err != nil { + return nil, err + } + var a *Action + if balances_tag != MINUTES { + a = &Action{ + ActionType: action, + BalanceId: balances_tag, + Direction: direction, + Units: units, + } + } else { + var percent, price float64 + if rate_type == PERCENT { + percent = rate + } + if rate_type == ABSOLUTE { + price = rate + } + a = &Action{ + Id: GenUUID(), + ActionType: action, + BalanceId: balances_tag, + Direction: direction, + Weight: weight, + MinuteBucket: &MinuteBucket{ + Seconds: units, + Weight: minutes_weight, + Price: price, + Percent: percent, + DestinationId: destinations_tag, + }, + } + } + as[tag] = append(as[tag], a) + } + return as, rows.Err() } -func (mys *MySQLStorage) GetAllActionTriggers(string) (map[string][]*ActionTrigger, error) { - return nil, nil +func (mys *MySQLStorage) GetAllActionTriggers(tpid string) (map[string][]*ActionTrigger, error) { + ats := make(map[string][]*ActionTrigger) + rows, err := mys.Db.Query("SELECT * FROM tp_action_triggers WHERE tpid=?", tpid) + if err != nil { + return nil, err + } + for rows.Next() { + var id int + var threshold, weight float64 + var tpid, tag, balances_tag, direction, destinations_tag, actions_tag string + if err := rows.Scan(&id, &tpid, &tag, &balances_tag, &direction, &threshold, &destinations_tag, &actions_tag, &weight); err != nil { + return nil, err + } + + at := &ActionTrigger{ + Id: GenUUID(), + BalanceId: balances_tag, + Direction: direction, + ThresholdValue: threshold, + DestinationId: destinations_tag, + ActionsId: actions_tag, + Weight: weight, + } + ats[tag] = append(ats[tag], at) + } + return ats, rows.Err() +} + +func (mys *MySQLStorage) GetAllUserBalances(tpid string) ([]*UserBalance, error) { + var ubs []*UserBalance + rows, err := mys.Db.Query("SELECT * FROM tp_account_actions WHERE tpid=?", tpid) + if err != nil { + return nil, err + } + for rows.Next() { + var id int + var tpid, tenant, account, direction, action_timings_tag, action_triggers_tag string + if err := rows.Scan(&id, &tpid, &tenant, &account, &direction, &action_timings_tag, &action_triggers_tag); err != nil { + return nil, err + } + + tag := fmt.Sprintf("%s:%s:%s", direction, tenant, account) + ub := &UserBalance{ + Type: UB_TYPE_PREPAID, + Id: tag, + actionTriggersTag: action_triggers_tag, + actionTimingsTag: action_timings_tag, + } + ubs = append(ubs, ub) + } + return ubs, rows.Err() } diff --git a/rater/storage_postgres.go b/rater/storage_postgres.go index e234826e9..49c43b995 100644 --- a/rater/storage_postgres.go +++ b/rater/storage_postgres.go @@ -77,7 +77,9 @@ func (psl *PostgresStorage) GetActionTimings(key string) (ats ActionTimings, err func (psl *PostgresStorage) SetActionTimings(key string, ats ActionTimings) (err error) { return } -func (psl *PostgresStorage) GetAllActionTimings() (ats map[string]ActionTimings, err error) { return } +func (psl *PostgresStorage) GetAllActionTimings(tpid string) (ats map[string][]*ActionTiming, err error) { + return +} func (psl *PostgresStorage) LogCallCost(uuid, source string, cc *CallCost) (err error) { if psl.Db == nil { @@ -173,6 +175,228 @@ func (psl *PostgresStorage) SetRatedCdr(cdr utils.CDR, cc *CallCost) (err error) return } -func (psl *PostgresStorage) GetDestinations(tpid string) ([]*Destination, error) { - return nil, nil +func (psl *PostgresStorage) GetAllDestinations(tpid string) ([]*Destination, error) { + var dests []*Destination + rows, err := psl.Db.Query("SELECT * FROM tp_destinations WHERE tpid=?", tpid) + if err != nil { + return nil, err + } + for rows.Next() { + var id int + var tpid, tag, prefix string + if err := rows.Scan(id, tpid, &tag, &prefix); err != nil { + return nil, err + } + var dest *Destination + for _, d := range dests { + if d.Id == tag { + dest = d + break + } + } + if dest == nil { + dest = &Destination{Id: tag} + dests = append(dests, dest) + } + dest.Prefixes = append(dest.Prefixes, prefix) + } + return dests, rows.Err() +} + +func (psl *PostgresStorage) GetAllRates(tpid string) (map[string][]*Rate, error) { + rts := make(map[string][]*Rate) + rows, err := psl.Db.Query("SELECT * FROM tp_rates WHERE tpid=?", tpid) + if err != nil { + return nil, err + } + for rows.Next() { + var id int + var tpid, tag, destinations_tag string + var connect_fee, rate, priced_units, rate_increments float64 + if err := rows.Scan(&id, &tpid, &tag, &destinations_tag, &connect_fee, &rate, &priced_units, &rate_increments); err != nil { + return nil, err + } + + r := &Rate{ + DestinationsTag: destinations_tag, + ConnectFee: connect_fee, + Price: rate, + PricedUnits: priced_units, + RateIncrements: rate_increments, + } + + rts[tag] = append(rts[tag], r) + } + return rts, rows.Err() +} + +func (psl *PostgresStorage) GetAllTimings(tpid string) (map[string][]*Timing, error) { + tms := make(map[string][]*Timing) + rows, err := psl.Db.Query("SELECT * FROM tp_timings WHERE tpid=?", tpid) + if err != nil { + return nil, err + } + for rows.Next() { + var id int + var tpid, tag, years, months, month_days, week_days, start_time string + if err := rows.Scan(&id, &tpid, &tag, &years, &months, &month_days, &week_days, &start_time); err != nil { + return nil, err + } + t := NewTiming(years, months, month_days, week_days, start_time) + tms[tag] = append(tms[tag], t) + } + return tms, rows.Err() +} + +func (psl *PostgresStorage) GetAllRateTimings(tpid string) ([]*RateTiming, error) { + var rts []*RateTiming + rows, err := psl.Db.Query("SELECT * FROM tp_rate_timings WHERE tpid=?", tpid) + if err != nil { + return nil, err + } + for rows.Next() { + var id int + var weight float64 + var tpid, tag, rates_tag, timings_tag string + if err := rows.Scan(&id, &tpid, &tag, &rates_tag, &timings_tag, &weight); err != nil { + return nil, err + } + rt := &RateTiming{ + Tag: tag, + RatesTag: rates_tag, + Weight: weight, + TimingsTag: timings_tag, + } + rts = append(rts, rt) + } + return rts, rows.Err() +} + +func (psl *PostgresStorage) GetAllRatingProfiles(tpid string) (map[string]*RatingProfile, error) { + rpfs := make(map[string]*RatingProfile) + rows, err := psl.Db.Query("SELECT * FROM tp_rate_profiles WHERE tpid=?", tpid) + if err != nil { + return nil, err + } + for rows.Next() { + var id int + var tpid, tenant, tor, direction, subject, fallbacksubject, rates_timing_tag, activation_time string + + if err := rows.Scan(&id, &tpid, &tenant, &tor, &direction, &subject, &fallbacksubject, &rates_timing_tag, &activation_time); err != nil { + return nil, err + } + key := fmt.Sprintf("%s:%s:%s:%s", direction, tenant, tor, subject) + rp, ok := rpfs[key] + if !ok { + rp = &RatingProfile{Id: key} + rpfs[key] = rp + } + rp.tor = tor + rp.direction = direction + rp.subject = subject + rp.fallbackSubject = fallbacksubject + rp.ratesTimingTag = rates_timing_tag + rp.activationTime = activation_time + } + return rpfs, rows.Err() +} +func (psl *PostgresStorage) GetAllActions(tpid string) (map[string][]*Action, error) { + as := make(map[string][]*Action) + rows, err := psl.Db.Query("SELECT * FROM tp_actions WHERE tpid=?", tpid) + if err != nil { + return nil, err + } + for rows.Next() { + var id int + var units, rate, minutes_weight, weight float64 + var tpid, tag, action, balances_tag, direction, destinations_tag, rate_type string + if err := rows.Scan(&id, &tpid, &tag, &action, &balances_tag, &direction, &units, &destinations_tag, &rate_type, &rate, &minutes_weight, &weight); err != nil { + return nil, err + } + var a *Action + if balances_tag != MINUTES { + a = &Action{ + ActionType: action, + BalanceId: balances_tag, + Direction: direction, + Units: units, + } + } else { + var percent, price float64 + if rate_type == PERCENT { + percent = rate + } + if rate_type == ABSOLUTE { + price = rate + } + a = &Action{ + Id: GenUUID(), + ActionType: action, + BalanceId: balances_tag, + Direction: direction, + Weight: weight, + MinuteBucket: &MinuteBucket{ + Seconds: units, + Weight: minutes_weight, + Price: price, + Percent: percent, + DestinationId: destinations_tag, + }, + } + } + as[tag] = append(as[tag], a) + } + return as, rows.Err() +} +func (psl *PostgresStorage) GetAllActionTriggers(tpid string) (map[string][]*ActionTrigger, error) { + ats := make(map[string][]*ActionTrigger) + rows, err := psl.Db.Query("SELECT * FROM tp_action_triggers WHERE tpid=?", tpid) + if err != nil { + return nil, err + } + for rows.Next() { + var id int + var threshold, weight float64 + var tpid, tag, balances_tag, direction, destinations_tag, actions_tag string + if err := rows.Scan(&id, &tpid, &tag, &balances_tag, &direction, &threshold, &destinations_tag, &actions_tag, &weight); err != nil { + return nil, err + } + + at := &ActionTrigger{ + Id: GenUUID(), + BalanceId: balances_tag, + Direction: direction, + ThresholdValue: threshold, + DestinationId: destinations_tag, + ActionsId: actions_tag, + Weight: weight, + } + ats[tag] = append(ats[tag], at) + } + return ats, rows.Err() +} + +func (psl *PostgresStorage) GetAllUserBalances(tpid string) ([]*UserBalance, error) { + var ubs []*UserBalance + rows, err := psl.Db.Query("SELECT * FROM tp_account_actions WHERE tpid=?", tpid) + if err != nil { + return nil, err + } + for rows.Next() { + var id int + var tpid, tenant, account, direction, action_timings_tag, action_triggers_tag string + if err := rows.Scan(&id, &tpid, &tenant, &account, &direction, &action_timings_tag, &action_triggers_tag); err != nil { + return nil, err + } + + tag := fmt.Sprintf("%s:%s:%s", direction, tenant, account) + ub := &UserBalance{ + Type: UB_TYPE_PREPAID, + Id: tag, + actionTriggersTag: action_triggers_tag, + actionTimingsTag: action_timings_tag, + } + ubs = append(ubs, ub) + } + return ubs, rows.Err() } diff --git a/rater/storage_redigo.go b/rater/storage_redigo.go index 7abf19f29..06bc54d3e 100644 --- a/rater/storage_redigo.go +++ b/rater/storage_redigo.go @@ -142,8 +142,8 @@ func (rs *RedigoStorage) SetActionTimings(key string, ats ActionTimings) (err er return } -func (rs *RedigoStorage) GetAllActionTimings() (ats map[string]ActionTimings, err error) { - reply, err := redis.Values(rs.db.Do("keys", ACTION_TIMING_PREFIX+"*")) +func (rs *RedigoStorage) GetAllActionTimings(tpid string) (ats map[string][]*ActionTiming, err error) { + reply, err := redis.Values(rs.db.Do("keys", ACTION_TIMING_PREFIX+tpid+"*")) if err != nil { return nil, err } @@ -153,7 +153,7 @@ func (rs *RedigoStorage) GetAllActionTimings() (ats map[string]ActionTimings, er keys = append(keys, string(v)) } } - ats = make(map[string]ActionTimings, len(keys)) + ats = make(map[string][]*ActionTiming, len(keys)) for _, key := range keys { values, err := redis.Bytes(rs.db.Do("get", key)) if err != nil { @@ -161,7 +161,7 @@ func (rs *RedigoStorage) GetAllActionTimings() (ats map[string]ActionTimings, er } var tempAts ActionTimings err = rs.ms.Unmarshal(values, &tempAts) - ats[key[len(ACTION_TIMING_PREFIX):]] = tempAts + ats[key[len(ACTION_TIMING_PREFIX+tpid):]] = tempAts } return @@ -224,6 +224,28 @@ func (rs *RedigoStorage) SetRatedCdr(utils.CDR, *CallCost) error { return nil } -func (rs *RedigoStorage) GetDestinations(tpid string) ([]*Destination, error) { +func (ms *RedigoStorage) GetAllDestinations(tpid string) ([]*Destination, error) { + return nil, nil +} + +func (ms *RedigoStorage) GetAllRates(string) (map[string][]*Rate, error) { + return nil, nil +} +func (ms *RedigoStorage) GetAllTimings(string) (map[string][]*Timing, error) { + return nil, nil +} +func (ms *RedigoStorage) GetAllRateTimings(string) ([]*RateTiming, error) { + return nil, nil +} +func (ms *RedigoStorage) GetAllRatingProfiles(string) (map[string]*RatingProfile, error) { + return nil, nil +} +func (ms *RedigoStorage) GetAllActions(string) (map[string][]*Action, error) { + return nil, nil +} +func (ms *RedigoStorage) GetAllActionTriggers(string) (map[string][]*ActionTrigger, error) { + return nil, nil +} +func (ms *RedigoStorage) GetAllUserBalances(string) ([]*UserBalance, error) { return nil, nil } diff --git a/rater/storage_redis.go b/rater/storage_redis.go index bd8cddc30..001dbc892 100644 --- a/rater/storage_redis.go +++ b/rater/storage_redis.go @@ -169,12 +169,12 @@ func (rs *RedisStorage) SetActionTimings(key string, ats ActionTimings) (err err return } -func (rs *RedisStorage) GetAllActionTimings() (ats map[string]ActionTimings, err error) { - keys, err := rs.db.Cmd("keys", ACTION_TIMING_PREFIX+"*").List() +func (rs *RedisStorage) GetAllActionTimings(tpid string) (ats map[string][]*ActionTiming, err error) { + keys, err := rs.db.Cmd("keys", ACTION_TIMING_PREFIX+tpid+"*").List() if err != nil { return } - ats = make(map[string]ActionTimings, len(keys)) + ats = make(map[string][]*ActionTiming, len(keys)) for _, key := range keys { values, err := rs.db.Cmd("get", key).Bytes() if err != nil { @@ -182,7 +182,7 @@ func (rs *RedisStorage) GetAllActionTimings() (ats map[string]ActionTimings, err } var tempAts ActionTimings err = rs.ms.Unmarshal(values, &tempAts) - ats[key[len(ACTION_TIMING_PREFIX):]] = tempAts + ats[key[len(ACTION_TIMING_PREFIX+tpid):]] = tempAts } return @@ -251,6 +251,28 @@ func (rs *RedisStorage) SetRatedCdr(utils.CDR, *CallCost) error { return nil } -func (rs *RedisStorage) GetDestinations(tpid string) ([]*Destination, error) { +func (ms *RedisStorage) GetAllDestinations(tpid string) ([]*Destination, error) { + return nil, nil +} + +func (ms *RedisStorage) GetAllRates(string) (map[string][]*Rate, error) { + return nil, nil +} +func (ms *RedisStorage) GetAllTimings(string) (map[string][]*Timing, error) { + return nil, nil +} +func (ms *RedisStorage) GetAllRateTimings(string) ([]*RateTiming, error) { + return nil, nil +} +func (ms *RedisStorage) GetAllRatingProfiles(string) (map[string]*RatingProfile, error) { + return nil, nil +} +func (ms *RedisStorage) GetAllActions(string) (map[string][]*Action, error) { + return nil, nil +} +func (ms *RedisStorage) GetAllActionTriggers(string) (map[string][]*ActionTrigger, error) { + return nil, nil +} +func (ms *RedisStorage) GetAllUserBalances(string) ([]*UserBalance, error) { return nil, nil } diff --git a/rater/userbalance.go b/rater/userbalance.go index 332d8360a..a2db6882c 100644 --- a/rater/userbalance.go +++ b/rater/userbalance.go @@ -44,12 +44,13 @@ const ( Structure containing information about user's credit (minutes, cents, sms...).' */ type UserBalance struct { - Id string - Type string // prepaid-postpaid - BalanceMap map[string]float64 - MinuteBuckets []*MinuteBucket - UnitCounters []*UnitsCounter - ActionTriggers ActionTriggerPriotityList + Id string + Type string // prepaid-postpaid + BalanceMap map[string]float64 + MinuteBuckets []*MinuteBucket + UnitCounters []*UnitsCounter + ActionTriggers ActionTriggerPriotityList + actionTriggersTag, actionTimingsTag string // used only for loading } /* diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 492a28f9a..23865143f 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -64,7 +64,7 @@ func (s *Scheduler) Loop() { } func (s *Scheduler) LoadActionTimings(storage rater.DataStorage) { - actionTimings, err := storage.GetAllActionTimings() + actionTimings, err := storage.GetAllActionTimings("") if err != nil { rater.Logger.Warning(fmt.Sprintf("Cannot get action timings: %v", err)) }