first draft of DataStorage loading

This commit is contained in:
Radu Ioan Fericean
2013-06-08 19:12:24 +03:00
parent 7c57d3a321
commit aca4c5a861
15 changed files with 585 additions and 219 deletions

View File

@@ -35,14 +35,15 @@ const (
)
type ActionTiming struct {
Id string // uniquely identify the timing
Tag string // informative purpos only
UserBalanceIds []string
Timing *Interval
Weight float64
ActionsId string
actions Actions
stCache time.Time
Id string // uniquely identify the timing
Tag string // informative purpos only
UserBalanceIds []string
Timing *Interval
Weight float64
ActionsId string
actions Actions
stCache time.Time
actionsTag, timingsTag string // used only for loading
}
type ActionTimings []*ActionTiming

View File

@@ -713,7 +713,7 @@ func TestActionTriggerLogging(t *testing.T) {
storageGetter.LogActionTrigger("rif", RATER_SOURCE, at, as)
//expected := "rif*some_uuid;MONETARY;OUT;NAT;TEST_ACTIONS;100;10;false*|TOPUP|MONETARY|OUT|10|0"
var key string
atMap, _ := storageGetter.GetAllActionTimings()
atMap, _ := storageGetter.GetAllActionTimings("")
for k, v := range atMap {
_ = k
_ = v
@@ -755,7 +755,7 @@ func TestActionTimingLogging(t *testing.T) {
storageGetter.LogActionTiming(SCHED_SOURCE, at, as)
//expected := "some uuid|test|one,two,three|;1,2,3,4,5,6,7,8,9,10,11,12;1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31;1,2,3,4,5;18:00:00;00:00:00;10;0;1;60;1|10|TEST_ACTIONS*|TOPUP|MONETARY|OUT|10|0"
var key string
atMap, _ := storageGetter.GetAllActionTimings()
atMap, _ := storageGetter.GetAllActionTimings("")
for k, v := range atMap {
_ = k
_ = v

View File

@@ -19,7 +19,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package rater
import (
"errors"
"fmt"
"log"
"time"
)
type DbReader struct {
@@ -121,35 +124,35 @@ func (dbr *DbReader) LoadDestinations(tpid string) (err error) {
return
}
func (dbr *DbReader) LoadRates(tpid string) error {
dbr.rates, err := dbr.storDB.GetAllRates(tpid)
return err
func (dbr *DbReader) LoadRates(tpid string) (err error) {
dbr.rates, err = dbr.storDB.GetAllRates(tpid)
return err
}
func (dbr *DbReader) LoadTimings(tpid string) error {
dbr.timings, err := dbr.storDB.GetAllTimings(tpid)
func (dbr *DbReader) LoadTimings(tpid string) (err error) {
dbr.timings, err = dbr.storDB.GetAllTimings(tpid)
return err
}
func (dbr *DbReader) LoadRateTimings(tpid string) error {
rts, err := dbr.storDB.GetAllRateTimings(tpid)
if err != nil {
return nil, err
return err
}
for _, rt := range rts {
ts, exists := dbr.timings[rt.TimingsTag]
if !exists {
return errors.New(fmt.Sprintf("Could not get timing for tag %v", timings_tag))
return errors.New(fmt.Sprintf("Could not get timing for tag %v", rt.TimingsTag))
}
for _, t := range ts {
rateTiming := &RateTiming{
RatesTag: rates_tag,
Weight: weight,
RatesTag: rt.RatesTag,
Weight: rt.Weight,
timing: t,
}
rs, exists := dbr.rates[rates_tag]
rs, exists := dbr.rates[rt.RatesTag]
if !exists {
return errors.New(fmt.Sprintf("Could not find rate for tag %v", rates_tag))
return errors.New(fmt.Sprintf("Could not find rate for tag %v", rt.RatesTag))
}
for _, r := range rs {
_, exists := dbr.activationPeriods[rt.Tag]
@@ -158,182 +161,100 @@ func (dbr *DbReader) LoadRateTimings(tpid string) error {
}
dbr.activationPeriods[rt.Tag].AddIntervalIfNotPresent(rateTiming.GetInterval(r))
}
}
}
}
}
return nil
}
func (dbr *DbReader) LoadRatingProfiles(tpid string) error {
rpfs, err := dbr.storDB.GetAllRatingProfiles(tpid)
if err != nil {
return err
}
for _, rp := range rpfs {
at, err := time.Parse(time.RFC3339, rp.activationTime)
if err != nil {
return errors.New(fmt.Sprintf("Cannot parse activation time from %v", rp.activationTime))
}
for _, d := range dbr.destinations {
ap, exists := dbr.activationPeriods[rates_timing_tag]
ap, exists := dbr.activationPeriods[rp.ratesTimingTag]
if !exists {
return errors.New(fmt.Sprintf("Could not load rating timing for tag: %v", rates_timing_tag))
return errors.New(fmt.Sprintf("Could not load rating timing for tag: %v", rp.ratesTimingTag))
}
newAP := &ActivationPeriod{ActivationTime: at}
//copy(newAP.Intervals, ap.Intervals)
newAP.Intervals = append(newAP.Intervals, ap.Intervals...)
rp.AddActivationPeriodIfNotPresent(d.Id, newAP)
if fallbacksubject != "" {
rp.FallbackKey = fmt.Sprintf("%s:%s:%s:%s", direction, tenant, tor, fallbacksubject)
if rp.fallbackSubject != "" {
rp.FallbackKey = fmt.Sprintf("%s:%s:%s:%s", rp.direction, rp.tenant, rp.tor, rp.fallbackSubject)
}
}
}
return nil
}
func (dbr *DbReader) LoadActions(tpid string) error {
/*rows, err := dbr.db.Query("SELECT * FROM tp_actions WHERE tpid=?", tpid)
if err != nil {
return err
}
for rows.Next() {
var id int
var units, rate, minutes_weight, weight float64
var tpid, tag, action, balances_tag, direction, destinations_tag, rate_type string
if err := rows.Scan(&id, &tpid, &tag, &action, &balances_tag, &direction, &units, &destinations_tag, &rate_type, &rate, &minutes_weight, &weight); err != nil {
return err
}
var a *Action
if balances_tag != MINUTES {
a = &Action{
ActionType: action,
BalanceId: balances_tag,
Direction: direction,
Units: units,
}
} else {
var percent, price float64
if rate_type == PERCENT {
percent = rate
}
if rate_type == ABSOLUTE {
price = rate
}
a = &Action{
Id: GenUUID(),
ActionType: action,
BalanceId: balances_tag,
Direction: direction,
Weight: weight,
MinuteBucket: &MinuteBucket{
Seconds: units,
Weight: minutes_weight,
Price: price,
Percent: percent,
DestinationId: destinations_tag,
},
}
}
dbr.actions[tag] = append(dbr.actions[tag], a)
}
return rows.Err()*/
return nil
func (dbr *DbReader) LoadActions(tpid string) (err error) {
dbr.actions, err = dbr.storDB.GetAllActions(tpid)
return err
}
func (dbr *DbReader) LoadActionTimings(tpid string) error {
/*rows, err := dbr.db.Query("SELECT * FROM tp_action_timings WHERE tpid=?", tpid)
func (dbr *DbReader) LoadActionTimings(tpid string) (err error) {
atsMap, err := dbr.storDB.GetAllActionTimings(tpid)
if err != nil {
return err
}
for rows.Next() {
var id int
var weight float64
var tpid, tag, actions_tag, timings_tag string
if err := rows.Scan(&id, &tpid, &tag, &actions_tag, &timings_tag, &weight); err != nil {
return err
for tag, ats := range atsMap {
for _, at := range ats {
_, exists := dbr.actions[at.ActionsId]
if !exists {
return errors.New(fmt.Sprintf("ActionTiming: Could not load the action for tag: %v", at.ActionsId))
}
ts, exists := dbr.timings[at.Tag]
if !exists {
return errors.New(fmt.Sprintf("ActionTiming: Could not load the timing for tag: %v", at.Tag))
}
for _, t := range ts {
actTmg := &ActionTiming{
Id: GenUUID(),
Tag: at.Tag,
Weight: at.Weight,
Timing: &Interval{
Months: t.Months,
MonthDays: t.MonthDays,
WeekDays: t.WeekDays,
StartTime: t.StartTime,
},
ActionsId: at.ActionsId,
}
dbr.actionsTimings[tag] = append(dbr.actionsTimings[tag], actTmg)
}
}
_, exists := dbr.actions[actions_tag]
}
return err
}
func (dbr *DbReader) LoadActionTriggers(tpid string) (err error) {
dbr.actionsTriggers, err = dbr.storDB.GetAllActionTriggers(tpid)
return err
}
func (dbr *DbReader) LoadAccountActions(tpid string) (err error) {
dbr.accountActions, err = dbr.storDB.GetAllUserBalances(tpid)
for _, ub := range dbr.accountActions {
aTimings, exists := dbr.actionsTimings[ub.actionTimingsTag]
if !exists {
return errors.New(fmt.Sprintf("ActionTiming: Could not load the action for tag: %v", actions_tag))
}
ts, exists := dbr.timings[timings_tag]
if !exists {
return errors.New(fmt.Sprintf("ActionTiming: Could not load the timing for tag: %v", timings_tag))
}
for _, t := range ts {
at := &ActionTiming{
Id: GenUUID(),
Tag: timings_tag,
Weight: weight,
Timing: &Interval{
Months: t.Months,
MonthDays: t.MonthDays,
WeekDays: t.WeekDays,
StartTime: t.StartTime,
},
ActionsId: actions_tag,
}
dbr.actionsTimings[tag] = append(dbr.actionsTimings[tag], at)
}
}
return rows.Err()*/
return nil
}
func (dbr *DbReader) LoadActionTriggers(tpid string) error {
/*rows, err := dbr.db.Query("SELECT * FROM tp_action_triggers WHERE tpid=?", tpid)
if err != nil {
return err
}
for rows.Next() {
var id int
var threshold, weight float64
var tpid, tag, balances_tag, direction, destinations_tag, actions_tag string
if err := rows.Scan(&id, &tpid, &tag, &balances_tag, &direction, &threshold, &destinations_tag, &actions_tag, &weight); err != nil {
return err
}
at := &ActionTrigger{
Id: GenUUID(),
BalanceId: balances_tag,
Direction: direction,
ThresholdValue: threshold,
DestinationId: destinations_tag,
ActionsId: actions_tag,
Weight: weight,
}
dbr.actionsTriggers[tag] = append(dbr.actionsTriggers[tag], at)
}
return rows.Err()*/
return nil
}
func (dbr *DbReader) LoadAccountActions() error {
/*rows, err := dbr.db.Query("SELECT * FROM tp_account_actions WHERE tpid=?", tpid)
if err != nil {
return err
}
for rows.Next() {
var id int
var tpid, tenant, account, direction, action_timings_tag, action_triggers_tag string
if err := rows.Scan(&id, &tpid, &tenant, &account, &direction, &action_timings_tag, &action_triggers_tag); err != nil {
return err
}
tag := fmt.Sprintf("%s:%s:%s", direction, tenant, account)
aTriggers, exists := dbr.actionsTriggers[action_triggers_tag]
if action_triggers_tag != "" && !exists {
// only return error if there was something ther for the tag
return errors.New(fmt.Sprintf("Could not get action triggers for tag %v", action_triggers_tag))
}
ub := &UserBalance{
Type: UB_TYPE_PREPAID,
Id: tag,
ActionTriggers: aTriggers,
}
dbr.accountActions = append(dbr.accountActions, ub)
aTimings, exists := dbr.actionsTimings[action_timings_tag]
if !exists {
log.Printf("Could not get action timing for tag %v", action_timings_tag)
log.Printf("Could not get action timing for tag %v", ub.actionTimingsTag)
// must not continue here
}
for _, at := range aTimings {
at.UserBalanceIds = append(at.UserBalanceIds, tag)
aTriggers, exists := dbr.actionsTriggers[ub.actionTriggersTag]
if ub.actionTriggersTag != "" && !exists {
// only return error if there was something ther for the tag
return errors.New(fmt.Sprintf("Could not get action triggers for tag %v", ub.actionTriggersTag))
}
ub.ActionTriggers = aTriggers
at.UserBalanceIds = append(at.UserBalanceIds, ub.Id)
}
}
return rows.Err()*/
return nil
}

View File

@@ -133,6 +133,10 @@ func (rt *RateTiming) GetInterval(r *Rate) (i *Interval) {
return
}
type AccountAction struct {
Tenant, Account, Direction, ActionTimingsTag, ActionTriggersTag string
}
func ValidateCSVData(fn string, re *regexp.Regexp) (err error) {
fin, err := os.Open(fn)
if err != nil {

View File

@@ -29,9 +29,10 @@ const (
)
type RatingProfile struct {
Id string
FallbackKey string
DestinationMap map[string][]*ActivationPeriod
Id string
FallbackKey string
DestinationMap map[string][]*ActivationPeriod
tenant, tor, direction, subject, fallbackSubject, ratesTimingTag, activationTime string // used only for loading
}
// Adds an activation period that applyes to current rating profile if not already present.

View File

@@ -150,12 +150,12 @@ func (rs *GosexyStorage) SetActionTimings(key string, ats ActionTimings) (err er
return
}
func (rs *GosexyStorage) GetAllActionTimings(tpid string) (ats map[string]ActionTimings, err error) {
func (rs *GosexyStorage) GetAllActionTimings(tpid string) (ats map[string][]*ActionTiming, err error) {
keys, err := rs.db.Keys(ACTION_TIMING_PREFIX + tpid + "*")
if err != nil {
return nil, err
}
ats = make(map[string]ActionTimings, len(keys))
ats = make(map[string][]*ActionTiming, len(keys))
for _, key := range keys {
values, err := rs.db.Get(key)
if err != nil {
@@ -163,7 +163,7 @@ func (rs *GosexyStorage) GetAllActionTimings(tpid string) (ats map[string]Action
}
var tempAts ActionTimings
err = rs.ms.Unmarshal([]byte(values), &tempAts)
ats[key[len(ACTION_TIMING_PREFIX):]] = tempAts
ats[key[len(ACTION_TIMING_PREFIX+tpid):]] = tempAts
}
return
@@ -248,3 +248,6 @@ func (rs *GosexyStorage) GetAllActions(string) (map[string][]*Action, error) {
func (rs *GosexyStorage) GetAllActionTriggers(string) (map[string][]*ActionTrigger, error) {
return nil, nil
}
func (rs *GosexyStorage) GetAllUserBalances(string) ([]*UserBalance, error) {
return nil, nil
}

View File

@@ -63,7 +63,7 @@ type DataStorage interface {
SetUserBalance(*UserBalance) error
GetActionTimings(string) (ActionTimings, error)
SetActionTimings(string, ActionTimings) error
GetAllActionTimings(string) (map[string]ActionTimings, error)
GetAllActionTimings(string) (map[string][]*ActionTiming, error)
SetCdr(utils.CDR) error
SetRatedCdr(utils.CDR, *CallCost) error
//GetAllActionTimingsLogs() (map[string]ActionsTimings, error)
@@ -78,8 +78,9 @@ type DataStorage interface {
GetAllTimings(string) (map[string][]*Timing, error)
GetAllRateTimings(string) ([]*RateTiming, error)
GetAllRatingProfiles(string) (map[string]*RatingProfile, error)
GetAllAllActions(string) (map[string][]*Action, error)
GetAllActions(string) (map[string][]*Action, error)
GetAllActionTriggers(string) (map[string][]*ActionTrigger, error)
GetAllUserBalances(string) ([]*UserBalance, error)
}
type Marshaler interface {

View File

@@ -124,15 +124,15 @@ func (ms *MapStorage) SetActionTimings(key string, ats ActionTimings) (err error
return
}
func (ms *MapStorage) GetAllActionTimings() (ats map[string]ActionTimings, err error) {
ats = make(map[string]ActionTimings)
func (ms *MapStorage) GetAllActionTimings(tpid string) (ats map[string][]*ActionTiming, err error) {
ats = make(map[string][]*ActionTiming)
for key, value := range ms.dict {
if !strings.Contains(key, ACTION_TIMING_PREFIX) {
if !strings.Contains(key, ACTION_TIMING_PREFIX+tpid) {
continue
}
var tempAts ActionTimings
err = ms.ms.Unmarshal(value, &tempAts)
ats[key[len(ACTION_TIMING_PREFIX):]] = tempAts
ats[key[len(ACTION_TIMING_PREFIX+tpid):]] = tempAts
}
return
@@ -192,6 +192,28 @@ func (ms *MapStorage) SetRatedCdr(utils.CDR, *CallCost) error {
return nil
}
func (ms *MapStorage) GetDestinations(tpid string) ([]*Destination, error) {
func (ms *MapStorage) GetAllDestinations(tpid string) ([]*Destination, error) {
return nil, nil
}
func (ms *MapStorage) GetAllRates(string) (map[string][]*Rate, error) {
return nil, nil
}
func (ms *MapStorage) GetAllTimings(string) (map[string][]*Timing, error) {
return nil, nil
}
func (ms *MapStorage) GetAllRateTimings(string) ([]*RateTiming, error) {
return nil, nil
}
func (ms *MapStorage) GetAllRatingProfiles(string) (map[string]*RatingProfile, error) {
return nil, nil
}
func (ms *MapStorage) GetAllActions(string) (map[string][]*Action, error) {
return nil, nil
}
func (ms *MapStorage) GetAllActionTriggers(string) (map[string][]*ActionTrigger, error) {
return nil, nil
}
func (ms *MapStorage) GetAllUserBalances(string) ([]*UserBalance, error) {
return nil, nil
}

View File

@@ -176,10 +176,10 @@ func (ms *MongoStorage) SetActionTimings(key string, ats ActionTimings) error {
return ms.db.C("actiontimings").Insert(&AtKeyValue{key, ats})
}
func (ms *MongoStorage) GetAllActionTimings() (ats map[string]ActionTimings, err error) {
func (ms *MongoStorage) GetAllActionTimings(tpid string) (ats map[string][]*ActionTiming, err error) {
result := AtKeyValue{}
iter := ms.db.C("actiontimings").Find(nil).Iter()
ats = make(map[string]ActionTimings)
ats = make(map[string][]*ActionTiming)
for iter.Next(&result) {
ats[result.Key] = result.Value
}
@@ -220,3 +220,29 @@ func (ms *MongoStorage) SetRatedCdr(utils.CDR, *CallCost) error {
func (ms *MongoStorage) GetDestinations(tpid string) ([]*Destination, error) {
return nil, nil
}
func (ms *MongoStorage) GetAllDestinations(tpid string) ([]*Destination, error) {
return nil, nil
}
func (ms *MongoStorage) GetAllRates(string) (map[string][]*Rate, error) {
return nil, nil
}
func (ms *MongoStorage) GetAllTimings(string) (map[string][]*Timing, error) {
return nil, nil
}
func (ms *MongoStorage) GetAllRateTimings(string) ([]*RateTiming, error) {
return nil, nil
}
func (ms *MongoStorage) GetAllRatingProfiles(string) (map[string]*RatingProfile, error) {
return nil, nil
}
func (ms *MongoStorage) GetAllActions(string) (map[string][]*Action, error) {
return nil, nil
}
func (ms *MongoStorage) GetAllActionTriggers(string) (map[string][]*ActionTrigger, error) {
return nil, nil
}
func (ms *MongoStorage) GetAllUserBalances(string) ([]*UserBalance, error) {
return nil, nil
}

View File

@@ -77,7 +77,30 @@ func (mys *MySQLStorage) GetActionTimings(key string) (ats ActionTimings, err er
func (mys *MySQLStorage) SetActionTimings(key string, ats ActionTimings) (err error) { return }
func (mys *MySQLStorage) GetAllActionTimings() (ats map[string]ActionTimings, err error) { return }
func (mys *MySQLStorage) GetAllActionTimings(tpid string) (ats map[string][]*ActionTiming, err error) {
ats = make(map[string][]*ActionTiming)
rows, err := mys.Db.Query("SELECT * FROM tp_action_timings WHERE tpid=?", tpid)
if err != nil {
return nil, err
}
for rows.Next() {
var id int
var weight float64
var tpid, tag, actions_tag, timings_tag string
if err := rows.Scan(&id, &tpid, &tag, &actions_tag, &timings_tag, &weight); err != nil {
return nil, err
}
at := &ActionTiming{
Id: GenUUID(),
Tag: timings_tag,
Weight: weight,
ActionsId: actions_tag,
}
ats[tag] = append(ats[tag], at)
}
return ats, rows.Err()
}
func (mys *MySQLStorage) LogCallCost(uuid, source string, cc *CallCost) (err error) {
if mys.Db == nil {
@@ -225,9 +248,9 @@ func (mys *MySQLStorage) GetAllRates(tpid string) (map[string][]*Rate, error) {
return rts, rows.Err()
}
func (mys *MySQLStorage) GetAllTimings(string) (map[string][]*Timing, error) {
func (mys *MySQLStorage) GetAllTimings(tpid string) (map[string][]*Timing, error) {
tms := make(map[string][]*Timing)
rows, err := dbr.db.Query("SELECT * FROM tp_timings WHERE tpid=?", tpid)
rows, err := mys.Db.Query("SELECT * FROM tp_timings WHERE tpid=?", tpid)
if err != nil {
return nil, err
}
@@ -243,7 +266,7 @@ func (mys *MySQLStorage) GetAllTimings(string) (map[string][]*Timing, error) {
return tms, rows.Err()
}
func (mys *MySQLStorage) GetAllRateTimings(string) ([]*RateTiming, error) {
func (mys *MySQLStorage) GetAllRateTimings(tpid string) ([]*RateTiming, error) {
var rts []*RateTiming
rows, err := mys.Db.Query("SELECT * FROM tp_rate_timings WHERE tpid=?", tpid)
if err != nil {
@@ -254,7 +277,7 @@ func (mys *MySQLStorage) GetAllRateTimings(string) ([]*RateTiming, error) {
var weight float64
var tpid, tag, rates_tag, timings_tag string
if err := rows.Scan(&id, &tpid, &tag, &rates_tag, &timings_tag, &weight); err != nil {
return err
return nil, err
}
rt := &RateTiming{
Tag: tag,
@@ -267,22 +290,18 @@ func (mys *MySQLStorage) GetAllRateTimings(string) ([]*RateTiming, error) {
return rts, rows.Err()
}
func (mys *MySQLStorage) GetAllRatingProfiles(string) (map[string]*RatingProfile, error) {
func (mys *MySQLStorage) GetAllRatingProfiles(tpid string) (map[string]*RatingProfile, error) {
rpfs := make(map[string]*RatingProfile)
rows, err := mys.Db.Query("SELECT * FROM tp_rate_profiles WHERE tpid=?", tpid)
if err != nil {
return err
return nil, err
}
for rows.Next() {
var id int
var tpid, tenant, tor, direction, subject, fallbacksubject, rates_timing_tag, activation_time string
if err := rows.Scan(&id, &tpid, &tenant, &tor, &direction, &subject, &fallbacksubject, &rates_timing_tag, &activation_time); err != nil {
return err
}
at, err := time.Parse(time.RFC3339, activation_time)
if err != nil {
return errors.New(fmt.Sprintf("Cannot parse activation time from %v", activation_time))
return nil, err
}
key := fmt.Sprintf("%s:%s:%s:%s", direction, tenant, tor, subject)
rp, ok := rpfs[key]
@@ -290,13 +309,112 @@ func (mys *MySQLStorage) GetAllRatingProfiles(string) (map[string]*RatingProfile
rp = &RatingProfile{Id: key}
rpfs[key] = rp
}
rp.tor = tor
rp.direction = direction
rp.subject = subject
rp.fallbackSubject = fallbacksubject
rp.ratesTimingTag = rates_timing_tag
rp.activationTime = activation_time
}
return rpfs, rows.Err()
}
func (mys *MySQLStorage) GetAllActions(string) (map[string][]*Action, error) {
return nil, nil
func (mys *MySQLStorage) GetAllActions(tpid string) (map[string][]*Action, error) {
as := make(map[string][]*Action)
rows, err := mys.Db.Query("SELECT * FROM tp_actions WHERE tpid=?", tpid)
if err != nil {
return nil, err
}
for rows.Next() {
var id int
var units, rate, minutes_weight, weight float64
var tpid, tag, action, balances_tag, direction, destinations_tag, rate_type string
if err := rows.Scan(&id, &tpid, &tag, &action, &balances_tag, &direction, &units, &destinations_tag, &rate_type, &rate, &minutes_weight, &weight); err != nil {
return nil, err
}
var a *Action
if balances_tag != MINUTES {
a = &Action{
ActionType: action,
BalanceId: balances_tag,
Direction: direction,
Units: units,
}
} else {
var percent, price float64
if rate_type == PERCENT {
percent = rate
}
if rate_type == ABSOLUTE {
price = rate
}
a = &Action{
Id: GenUUID(),
ActionType: action,
BalanceId: balances_tag,
Direction: direction,
Weight: weight,
MinuteBucket: &MinuteBucket{
Seconds: units,
Weight: minutes_weight,
Price: price,
Percent: percent,
DestinationId: destinations_tag,
},
}
}
as[tag] = append(as[tag], a)
}
return as, rows.Err()
}
func (mys *MySQLStorage) GetAllActionTriggers(string) (map[string][]*ActionTrigger, error) {
return nil, nil
func (mys *MySQLStorage) GetAllActionTriggers(tpid string) (map[string][]*ActionTrigger, error) {
ats := make(map[string][]*ActionTrigger)
rows, err := mys.Db.Query("SELECT * FROM tp_action_triggers WHERE tpid=?", tpid)
if err != nil {
return nil, err
}
for rows.Next() {
var id int
var threshold, weight float64
var tpid, tag, balances_tag, direction, destinations_tag, actions_tag string
if err := rows.Scan(&id, &tpid, &tag, &balances_tag, &direction, &threshold, &destinations_tag, &actions_tag, &weight); err != nil {
return nil, err
}
at := &ActionTrigger{
Id: GenUUID(),
BalanceId: balances_tag,
Direction: direction,
ThresholdValue: threshold,
DestinationId: destinations_tag,
ActionsId: actions_tag,
Weight: weight,
}
ats[tag] = append(ats[tag], at)
}
return ats, rows.Err()
}
func (mys *MySQLStorage) GetAllUserBalances(tpid string) ([]*UserBalance, error) {
var ubs []*UserBalance
rows, err := mys.Db.Query("SELECT * FROM tp_account_actions WHERE tpid=?", tpid)
if err != nil {
return nil, err
}
for rows.Next() {
var id int
var tpid, tenant, account, direction, action_timings_tag, action_triggers_tag string
if err := rows.Scan(&id, &tpid, &tenant, &account, &direction, &action_timings_tag, &action_triggers_tag); err != nil {
return nil, err
}
tag := fmt.Sprintf("%s:%s:%s", direction, tenant, account)
ub := &UserBalance{
Type: UB_TYPE_PREPAID,
Id: tag,
actionTriggersTag: action_triggers_tag,
actionTimingsTag: action_timings_tag,
}
ubs = append(ubs, ub)
}
return ubs, rows.Err()
}

View File

@@ -77,7 +77,9 @@ func (psl *PostgresStorage) GetActionTimings(key string) (ats ActionTimings, err
func (psl *PostgresStorage) SetActionTimings(key string, ats ActionTimings) (err error) { return }
func (psl *PostgresStorage) GetAllActionTimings() (ats map[string]ActionTimings, err error) { return }
func (psl *PostgresStorage) GetAllActionTimings(tpid string) (ats map[string][]*ActionTiming, err error) {
return
}
func (psl *PostgresStorage) LogCallCost(uuid, source string, cc *CallCost) (err error) {
if psl.Db == nil {
@@ -173,6 +175,228 @@ func (psl *PostgresStorage) SetRatedCdr(cdr utils.CDR, cc *CallCost) (err error)
return
}
func (psl *PostgresStorage) GetDestinations(tpid string) ([]*Destination, error) {
return nil, nil
func (psl *PostgresStorage) GetAllDestinations(tpid string) ([]*Destination, error) {
var dests []*Destination
rows, err := psl.Db.Query("SELECT * FROM tp_destinations WHERE tpid=?", tpid)
if err != nil {
return nil, err
}
for rows.Next() {
var id int
var tpid, tag, prefix string
if err := rows.Scan(id, tpid, &tag, &prefix); err != nil {
return nil, err
}
var dest *Destination
for _, d := range dests {
if d.Id == tag {
dest = d
break
}
}
if dest == nil {
dest = &Destination{Id: tag}
dests = append(dests, dest)
}
dest.Prefixes = append(dest.Prefixes, prefix)
}
return dests, rows.Err()
}
func (psl *PostgresStorage) GetAllRates(tpid string) (map[string][]*Rate, error) {
rts := make(map[string][]*Rate)
rows, err := psl.Db.Query("SELECT * FROM tp_rates WHERE tpid=?", tpid)
if err != nil {
return nil, err
}
for rows.Next() {
var id int
var tpid, tag, destinations_tag string
var connect_fee, rate, priced_units, rate_increments float64
if err := rows.Scan(&id, &tpid, &tag, &destinations_tag, &connect_fee, &rate, &priced_units, &rate_increments); err != nil {
return nil, err
}
r := &Rate{
DestinationsTag: destinations_tag,
ConnectFee: connect_fee,
Price: rate,
PricedUnits: priced_units,
RateIncrements: rate_increments,
}
rts[tag] = append(rts[tag], r)
}
return rts, rows.Err()
}
func (psl *PostgresStorage) GetAllTimings(tpid string) (map[string][]*Timing, error) {
tms := make(map[string][]*Timing)
rows, err := psl.Db.Query("SELECT * FROM tp_timings WHERE tpid=?", tpid)
if err != nil {
return nil, err
}
for rows.Next() {
var id int
var tpid, tag, years, months, month_days, week_days, start_time string
if err := rows.Scan(&id, &tpid, &tag, &years, &months, &month_days, &week_days, &start_time); err != nil {
return nil, err
}
t := NewTiming(years, months, month_days, week_days, start_time)
tms[tag] = append(tms[tag], t)
}
return tms, rows.Err()
}
func (psl *PostgresStorage) GetAllRateTimings(tpid string) ([]*RateTiming, error) {
var rts []*RateTiming
rows, err := psl.Db.Query("SELECT * FROM tp_rate_timings WHERE tpid=?", tpid)
if err != nil {
return nil, err
}
for rows.Next() {
var id int
var weight float64
var tpid, tag, rates_tag, timings_tag string
if err := rows.Scan(&id, &tpid, &tag, &rates_tag, &timings_tag, &weight); err != nil {
return nil, err
}
rt := &RateTiming{
Tag: tag,
RatesTag: rates_tag,
Weight: weight,
TimingsTag: timings_tag,
}
rts = append(rts, rt)
}
return rts, rows.Err()
}
func (psl *PostgresStorage) GetAllRatingProfiles(tpid string) (map[string]*RatingProfile, error) {
rpfs := make(map[string]*RatingProfile)
rows, err := psl.Db.Query("SELECT * FROM tp_rate_profiles WHERE tpid=?", tpid)
if err != nil {
return nil, err
}
for rows.Next() {
var id int
var tpid, tenant, tor, direction, subject, fallbacksubject, rates_timing_tag, activation_time string
if err := rows.Scan(&id, &tpid, &tenant, &tor, &direction, &subject, &fallbacksubject, &rates_timing_tag, &activation_time); err != nil {
return nil, err
}
key := fmt.Sprintf("%s:%s:%s:%s", direction, tenant, tor, subject)
rp, ok := rpfs[key]
if !ok {
rp = &RatingProfile{Id: key}
rpfs[key] = rp
}
rp.tor = tor
rp.direction = direction
rp.subject = subject
rp.fallbackSubject = fallbacksubject
rp.ratesTimingTag = rates_timing_tag
rp.activationTime = activation_time
}
return rpfs, rows.Err()
}
func (psl *PostgresStorage) GetAllActions(tpid string) (map[string][]*Action, error) {
as := make(map[string][]*Action)
rows, err := psl.Db.Query("SELECT * FROM tp_actions WHERE tpid=?", tpid)
if err != nil {
return nil, err
}
for rows.Next() {
var id int
var units, rate, minutes_weight, weight float64
var tpid, tag, action, balances_tag, direction, destinations_tag, rate_type string
if err := rows.Scan(&id, &tpid, &tag, &action, &balances_tag, &direction, &units, &destinations_tag, &rate_type, &rate, &minutes_weight, &weight); err != nil {
return nil, err
}
var a *Action
if balances_tag != MINUTES {
a = &Action{
ActionType: action,
BalanceId: balances_tag,
Direction: direction,
Units: units,
}
} else {
var percent, price float64
if rate_type == PERCENT {
percent = rate
}
if rate_type == ABSOLUTE {
price = rate
}
a = &Action{
Id: GenUUID(),
ActionType: action,
BalanceId: balances_tag,
Direction: direction,
Weight: weight,
MinuteBucket: &MinuteBucket{
Seconds: units,
Weight: minutes_weight,
Price: price,
Percent: percent,
DestinationId: destinations_tag,
},
}
}
as[tag] = append(as[tag], a)
}
return as, rows.Err()
}
func (psl *PostgresStorage) GetAllActionTriggers(tpid string) (map[string][]*ActionTrigger, error) {
ats := make(map[string][]*ActionTrigger)
rows, err := psl.Db.Query("SELECT * FROM tp_action_triggers WHERE tpid=?", tpid)
if err != nil {
return nil, err
}
for rows.Next() {
var id int
var threshold, weight float64
var tpid, tag, balances_tag, direction, destinations_tag, actions_tag string
if err := rows.Scan(&id, &tpid, &tag, &balances_tag, &direction, &threshold, &destinations_tag, &actions_tag, &weight); err != nil {
return nil, err
}
at := &ActionTrigger{
Id: GenUUID(),
BalanceId: balances_tag,
Direction: direction,
ThresholdValue: threshold,
DestinationId: destinations_tag,
ActionsId: actions_tag,
Weight: weight,
}
ats[tag] = append(ats[tag], at)
}
return ats, rows.Err()
}
func (psl *PostgresStorage) GetAllUserBalances(tpid string) ([]*UserBalance, error) {
var ubs []*UserBalance
rows, err := psl.Db.Query("SELECT * FROM tp_account_actions WHERE tpid=?", tpid)
if err != nil {
return nil, err
}
for rows.Next() {
var id int
var tpid, tenant, account, direction, action_timings_tag, action_triggers_tag string
if err := rows.Scan(&id, &tpid, &tenant, &account, &direction, &action_timings_tag, &action_triggers_tag); err != nil {
return nil, err
}
tag := fmt.Sprintf("%s:%s:%s", direction, tenant, account)
ub := &UserBalance{
Type: UB_TYPE_PREPAID,
Id: tag,
actionTriggersTag: action_triggers_tag,
actionTimingsTag: action_timings_tag,
}
ubs = append(ubs, ub)
}
return ubs, rows.Err()
}

View File

@@ -142,8 +142,8 @@ func (rs *RedigoStorage) SetActionTimings(key string, ats ActionTimings) (err er
return
}
func (rs *RedigoStorage) GetAllActionTimings() (ats map[string]ActionTimings, err error) {
reply, err := redis.Values(rs.db.Do("keys", ACTION_TIMING_PREFIX+"*"))
func (rs *RedigoStorage) GetAllActionTimings(tpid string) (ats map[string][]*ActionTiming, err error) {
reply, err := redis.Values(rs.db.Do("keys", ACTION_TIMING_PREFIX+tpid+"*"))
if err != nil {
return nil, err
}
@@ -153,7 +153,7 @@ func (rs *RedigoStorage) GetAllActionTimings() (ats map[string]ActionTimings, er
keys = append(keys, string(v))
}
}
ats = make(map[string]ActionTimings, len(keys))
ats = make(map[string][]*ActionTiming, len(keys))
for _, key := range keys {
values, err := redis.Bytes(rs.db.Do("get", key))
if err != nil {
@@ -161,7 +161,7 @@ func (rs *RedigoStorage) GetAllActionTimings() (ats map[string]ActionTimings, er
}
var tempAts ActionTimings
err = rs.ms.Unmarshal(values, &tempAts)
ats[key[len(ACTION_TIMING_PREFIX):]] = tempAts
ats[key[len(ACTION_TIMING_PREFIX+tpid):]] = tempAts
}
return
@@ -224,6 +224,28 @@ func (rs *RedigoStorage) SetRatedCdr(utils.CDR, *CallCost) error {
return nil
}
func (rs *RedigoStorage) GetDestinations(tpid string) ([]*Destination, error) {
func (ms *RedigoStorage) GetAllDestinations(tpid string) ([]*Destination, error) {
return nil, nil
}
func (ms *RedigoStorage) GetAllRates(string) (map[string][]*Rate, error) {
return nil, nil
}
func (ms *RedigoStorage) GetAllTimings(string) (map[string][]*Timing, error) {
return nil, nil
}
func (ms *RedigoStorage) GetAllRateTimings(string) ([]*RateTiming, error) {
return nil, nil
}
func (ms *RedigoStorage) GetAllRatingProfiles(string) (map[string]*RatingProfile, error) {
return nil, nil
}
func (ms *RedigoStorage) GetAllActions(string) (map[string][]*Action, error) {
return nil, nil
}
func (ms *RedigoStorage) GetAllActionTriggers(string) (map[string][]*ActionTrigger, error) {
return nil, nil
}
func (ms *RedigoStorage) GetAllUserBalances(string) ([]*UserBalance, error) {
return nil, nil
}

View File

@@ -169,12 +169,12 @@ func (rs *RedisStorage) SetActionTimings(key string, ats ActionTimings) (err err
return
}
func (rs *RedisStorage) GetAllActionTimings() (ats map[string]ActionTimings, err error) {
keys, err := rs.db.Cmd("keys", ACTION_TIMING_PREFIX+"*").List()
func (rs *RedisStorage) GetAllActionTimings(tpid string) (ats map[string][]*ActionTiming, err error) {
keys, err := rs.db.Cmd("keys", ACTION_TIMING_PREFIX+tpid+"*").List()
if err != nil {
return
}
ats = make(map[string]ActionTimings, len(keys))
ats = make(map[string][]*ActionTiming, len(keys))
for _, key := range keys {
values, err := rs.db.Cmd("get", key).Bytes()
if err != nil {
@@ -182,7 +182,7 @@ func (rs *RedisStorage) GetAllActionTimings() (ats map[string]ActionTimings, err
}
var tempAts ActionTimings
err = rs.ms.Unmarshal(values, &tempAts)
ats[key[len(ACTION_TIMING_PREFIX):]] = tempAts
ats[key[len(ACTION_TIMING_PREFIX+tpid):]] = tempAts
}
return
@@ -251,6 +251,28 @@ func (rs *RedisStorage) SetRatedCdr(utils.CDR, *CallCost) error {
return nil
}
func (rs *RedisStorage) GetDestinations(tpid string) ([]*Destination, error) {
func (ms *RedisStorage) GetAllDestinations(tpid string) ([]*Destination, error) {
return nil, nil
}
func (ms *RedisStorage) GetAllRates(string) (map[string][]*Rate, error) {
return nil, nil
}
func (ms *RedisStorage) GetAllTimings(string) (map[string][]*Timing, error) {
return nil, nil
}
func (ms *RedisStorage) GetAllRateTimings(string) ([]*RateTiming, error) {
return nil, nil
}
func (ms *RedisStorage) GetAllRatingProfiles(string) (map[string]*RatingProfile, error) {
return nil, nil
}
func (ms *RedisStorage) GetAllActions(string) (map[string][]*Action, error) {
return nil, nil
}
func (ms *RedisStorage) GetAllActionTriggers(string) (map[string][]*ActionTrigger, error) {
return nil, nil
}
func (ms *RedisStorage) GetAllUserBalances(string) ([]*UserBalance, error) {
return nil, nil
}

View File

@@ -44,12 +44,13 @@ const (
Structure containing information about user's credit (minutes, cents, sms...).'
*/
type UserBalance struct {
Id string
Type string // prepaid-postpaid
BalanceMap map[string]float64
MinuteBuckets []*MinuteBucket
UnitCounters []*UnitsCounter
ActionTriggers ActionTriggerPriotityList
Id string
Type string // prepaid-postpaid
BalanceMap map[string]float64
MinuteBuckets []*MinuteBucket
UnitCounters []*UnitsCounter
ActionTriggers ActionTriggerPriotityList
actionTriggersTag, actionTimingsTag string // used only for loading
}
/*

View File

@@ -64,7 +64,7 @@ func (s *Scheduler) Loop() {
}
func (s *Scheduler) LoadActionTimings(storage rater.DataStorage) {
actionTimings, err := storage.GetAllActionTimings()
actionTimings, err := storage.GetAllActionTimings("")
if err != nil {
rater.Logger.Warning(fmt.Sprintf("Cannot get action timings: %v", err))
}