started destination hash storage and colapsed debit actions

This commit is contained in:
Radu Ioan Fericean
2013-09-09 20:45:46 +03:00
parent 2f6b1f3f84
commit 372fbe0e10
13 changed files with 116 additions and 89 deletions

View File

@@ -161,11 +161,7 @@ func genericDebit(ub *UserBalance, a *Action) (err error) {
if ub.BalanceMap == nil {
ub.BalanceMap = make(map[string]BalanceChain)
}
if a.BalanceId == MINUTES {
ub.debitMinuteBalance(a.Balance)
} else {
ub.debitBalanceAction(a)
}
ub.debitBalanceAction(a)
return
}

View File

@@ -515,17 +515,6 @@ func TestActionTriggerPriotityList(t *testing.T) {
}
}
/*func TestActionLog(t *testing.T) {
a := &Action{
ActionType: "TEST",
BalanceId: "BALANCE",
Units: 10,
Weight: 11,
Balance: &Balance{},
}
logAction(nil, a)
}*/
func TestActionResetTriggres(t *testing.T) {
ub := &UserBalance{
Id: "TEST_UB",
@@ -667,7 +656,7 @@ func TestActionTopupResetMinutes(t *testing.T) {
len(ub.UnitCounters) != 1 ||
len(ub.BalanceMap[MINUTES+OUTBOUND]) != 2 ||
ub.ActionTriggers[0].Executed != true || ub.ActionTriggers[1].Executed != true {
t.Errorf("Topup reset minutes action failed: %v", ub.BalanceMap[MINUTES+OUTBOUND].GetTotalValue())
t.Errorf("Topup reset minutes action failed: %v", ub.BalanceMap[MINUTES+OUTBOUND][0])
}
}
@@ -698,15 +687,15 @@ func TestActionTopupMinutes(t *testing.T) {
UnitCounters: []*UnitsCounter{&UnitsCounter{BalanceId: CREDIT, Units: 1}},
ActionTriggers: ActionTriggerPriotityList{&ActionTrigger{BalanceId: CREDIT, ThresholdValue: 2, ActionsId: "TEST_ACTIONS", Executed: true}, &ActionTrigger{BalanceId: CREDIT, ThresholdValue: 2, ActionsId: "TEST_ACTIONS", Executed: true}},
}
a := &Action{BalanceId: MINUTES, Balance: &Balance{Value: 5, Weight: 20, SpecialPrice: 1, DestinationId: "NAT"}}
a := &Action{BalanceId: MINUTES, Direction: OUTBOUND, Balance: &Balance{Value: 5, Weight: 20, SpecialPrice: 1, DestinationId: "NAT"}}
topupAction(ub, a)
if ub.Type != UB_TYPE_PREPAID ||
ub.BalanceMap[MINUTES+OUTBOUND][0].Value != 15 ||
ub.BalanceMap[MINUTES+OUTBOUND].GetTotalValue() != 15 ||
ub.BalanceMap[CREDIT].GetTotalValue() != 100 ||
len(ub.UnitCounters) != 1 ||
len(ub.BalanceMap[MINUTES+OUTBOUND]) != 2 ||
ub.ActionTriggers[0].Executed != true || ub.ActionTriggers[1].Executed != true {
t.Error("Topup minutes action failed!", ub.BalanceMap[MINUTES+OUTBOUND][0])
t.Error("Topup minutes action failed!", ub.BalanceMap[MINUTES+OUTBOUND])
}
}
@@ -737,7 +726,7 @@ func TestActionDebitMinutes(t *testing.T) {
UnitCounters: []*UnitsCounter{&UnitsCounter{BalanceId: CREDIT, Units: 1}},
ActionTriggers: ActionTriggerPriotityList{&ActionTrigger{BalanceId: CREDIT, ThresholdValue: 2, ActionsId: "TEST_ACTIONS", Executed: true}, &ActionTrigger{BalanceId: CREDIT, ThresholdValue: 2, ActionsId: "TEST_ACTIONS", Executed: true}},
}
a := &Action{BalanceId: MINUTES, Balance: &Balance{Value: 5, Weight: 20, SpecialPrice: 1, DestinationId: "NAT"}}
a := &Action{BalanceId: MINUTES, Direction: OUTBOUND, Balance: &Balance{Value: 5, Weight: 20, SpecialPrice: 1, DestinationId: "NAT"}}
debitAction(ub, a)
if ub.Type != UB_TYPE_PREPAID ||
ub.BalanceMap[MINUTES+OUTBOUND][0].Value != 5 ||
@@ -797,8 +786,6 @@ func TestActionResetCounterMinutes(t *testing.T) {
len(ub.UnitCounters[1].MinuteBalances) != 1 ||
len(ub.BalanceMap[MINUTES]) != 2 ||
ub.ActionTriggers[0].Executed != true {
t.Logf("%#v", ub.UnitCounters[1].MinuteBalances[0])
t.Logf("%#v", ub.UnitCounters[1].MinuteBalances[1])
t.Error("Reset counters action failed!", ub.UnitCounters[1].MinuteBalances)
}
if len(ub.UnitCounters) < 2 || len(ub.UnitCounters[1].MinuteBalances) < 1 {
@@ -841,7 +828,7 @@ func TestActionTriggerLogging(t *testing.T) {
}
as, err := storageGetter.GetActions(at.ActionsId)
if err != nil {
t.Error("Error getting actions for the action timing: ", err)
t.Error("Error getting actions for the action timing: ", as, err)
}
storageLogger.LogActionTrigger("rif", RATER_SOURCE, at, as)
//expected := "rif*some_uuid;MONETARY;OUT;NAT;TEST_ACTIONS;100;10;false*|TOPUP|MONETARY|OUT|10|0"

View File

@@ -39,8 +39,10 @@ func init() {
//db_server := "127.0.0.1"
//db_server := "192.168.0.17"
m, _ := NewMapStorage()
//m, _ = NewMongoStorage(db_server, "27017", "cgrates_test", "", "")
//m, _ = NewRedisStorage(db_server+":6379", 11, "")
//m, _ := NewMongoStorage(db_server, "27017", "cgrates_test", "", "")
//m, _ := NewRedisStorage(db_server+":6379", 11, "")
//m, _ := NewRedigoStorage(db_server+":6379", 11, "")
//m, _ := NewRadixStorage(db_server+":6379", 11, "")
storageGetter, _ = m.(DataStorage)
storageLogger = storageGetter.(LogStorage)

View File

@@ -45,9 +45,6 @@ func GetDestination(dId string) (d *Destination, err error) {
return
}
/*
De-serializes the destination for the storage. Used for key-value storages.
*/
func (d *Destination) containsPrefix(prefix string) (precision int, ok bool) {
if d == nil {
return

View File

@@ -38,8 +38,11 @@ func TestDestinationStoreRestore(t *testing.T) {
func TestDestinationStorageStore(t *testing.T) {
nationale := &Destination{Id: "nat", Prefixes: []string{"0257", "0256", "0723"}}
storageGetter.SetDestination(nationale)
result, _ := storageGetter.GetDestination(nationale.Id)
err := storageGetter.SetDestination(nationale)
if err != nil {
t.Error("Error storing destination: ", err)
}
result, err := storageGetter.GetDestination(nationale.Id)
if !reflect.DeepEqual(nationale, result) {
t.Errorf("Expected %q was %q", nationale, result)
}
@@ -86,8 +89,8 @@ func TestDestinationGetNotExists(t *testing.T) {
func TestDestinationGetNotExistsCache(t *testing.T) {
GetDestination("not existing")
if _, err := cache2go.GetCached("not existing"); err == nil {
t.Error("Bad destination cached")
if d, err := cache2go.GetCached("not existing"); err == nil {
t.Error("Bad destination cached: ", d)
}
}

View File

@@ -23,11 +23,6 @@ import (
"fmt"
)
const (
// the minimum length for a destination prefix to be matched.
MIN_PREFIX_LENGTH = 2
)
type RatingProfile struct {
Id string
FallbackKey string // FallbackKey is used as complete combination of Tenant:TOR:Direction:Subject

View File

@@ -69,8 +69,8 @@ type DataStorage interface {
GetRatingProfile(string) (*RatingProfile, error)
SetRatingProfile(*RatingProfile) error
GetDestination(string) (*Destination, error)
DestinationContainsPrefix(string, string) (int, error)
SetDestination(*Destination) error
// End Apier functions
GetActions(string) (Actions, error)
SetActions(string, Actions) error
GetUserBalance(string) (*UserBalance, error)

View File

@@ -70,6 +70,20 @@ func (ms *MapStorage) GetDestination(key string) (dest *Destination, err error)
}
return
}
func (ms *MapStorage) DestinationContainsPrefix(key string, prefix string) (precision int, err error) {
if d, err := ms.GetDestination(key); err != nil {
return 0, err
} else {
for _, p := range utils.SplitPrefix(prefix) {
if precision, ok := d.containsPrefix(p); ok {
return precision, nil
}
}
return precision, nil
}
}
func (ms *MapStorage) SetDestination(dest *Destination) (err error) {
result, err := ms.ms.Marshal(dest)
ms.dict[DESTINATION_PREFIX+dest.Id] = result

View File

@@ -21,6 +21,7 @@ package engine
import (
"fmt"
"github.com/cgrates/cgrates/history"
"github.com/cgrates/cgrates/utils"
"menteslibres.net/gosexy/redis"
"strconv"
"strings"
@@ -87,20 +88,31 @@ func (rs *RedisStorage) SetRatingProfile(rp *RatingProfile) (err error) {
}
func (rs *RedisStorage) GetDestination(key string) (dest *Destination, err error) {
var values string
if values, err = rs.db.Get(DESTINATION_PREFIX + key); err == nil {
dest = &Destination{Id: key}
err = rs.ms.Unmarshal([]byte(values), dest)
var values []string
if values, err = rs.db.HKeys(DESTINATION_PREFIX + key); len(values) > 0 && err == nil {
dest = &Destination{Id: key, Prefixes: values}
}
return
}
func (rs *RedisStorage) DestinationContainsPrefix(key string, prefix string) (precision int, err error) {
var values []string
if values, err = rs.db.HMGet(DESTINATION_PREFIX+key, utils.SplitPrefix(prefix)...); err == nil {
for i, p := range values {
if p != "" {
return len(prefix) - i, nil
}
}
}
return
}
func (rs *RedisStorage) SetDestination(dest *Destination) (err error) {
var result []byte
if result, err = rs.ms.Marshal(dest); err != nil {
return
var newPrefixes []interface{}
for _, p := range dest.Prefixes {
newPrefixes = append(newPrefixes, p, "*")
}
_, err = rs.db.Set(DESTINATION_PREFIX+dest.Id, result)
_, err = rs.db.HMSet(DESTINATION_PREFIX+dest.Id, newPrefixes...)
if err == nil && historyScribe != nil {
response := 0
historyScribe.Record(&history.Record{DESTINATION_PREFIX + dest.Id, dest}, &response)

View File

@@ -100,32 +100,37 @@ func (ub *UserBalance) getSecondsForPrefix(prefix string) (seconds, credit float
return
}
// Debit seconds from specified minute bucket
func (ub *UserBalance) debitMinuteBalance(newMb *Balance) error {
if newMb == nil {
return errors.New("Nil minute bucket!")
// Debits some amount of user's specified balance adding the balance if it does not exists.
// Returns the remaining credit in user's balance.
func (ub *UserBalance) debitBalanceAction(a *Action) error {
if a == nil {
return errors.New("nil minute action!")
}
if a.Balance.Id == "" {
a.Balance.Id = utils.GenUUID()
}
if ub.BalanceMap == nil {
ub.BalanceMap = make(map[string]BalanceChain, 0)
}
found := false
for _, mb := range ub.BalanceMap[MINUTES+OUTBOUND] {
if mb.IsExpired() {
continue
id := a.BalanceId + a.Direction
for _, b := range ub.BalanceMap[id] {
if b.IsExpired() {
continue // we can clean expired balances balances here
}
if mb.Equal(newMb) {
mb.Value -= newMb.Value
if b.Equal(a.Balance) {
b.Value -= a.Balance.Value
found = true
break
}
}
// if it is not found and the Seconds are negative (topup)
// then we add it to the list
if !found && newMb.Value <= 0 {
newMb.Value = -newMb.Value
ub.BalanceMap[MINUTES+OUTBOUND] = append(ub.BalanceMap[MINUTES+OUTBOUND], newMb)
if !found && a.Balance.Value <= 0 {
a.Balance.Value = -a.Balance.Value
ub.BalanceMap[id] = append(ub.BalanceMap[id], a.Balance)
}
return nil
return nil //ub.BalanceMap[id].GetTotalValue()
}
/*
@@ -179,29 +184,6 @@ func (ub *UserBalance) debitMinutesBalance(amount float64, prefix string, count
return nil
}
// Debits some amount of user's specified balance adding the balance if it does not exists.
// Returns the remaining credit in user's balance.
func (ub *UserBalance) debitBalanceAction(a *Action) float64 {
newBalance := &Balance{Id: utils.GenUUID()}
if a.Balance != nil {
newBalance.ExpirationDate = a.Balance.ExpirationDate
newBalance.Weight = a.Balance.Weight
}
found := false
id := a.BalanceId + a.Direction
for _, b := range ub.BalanceMap[id] {
if b.Equal(newBalance) {
b.Value -= a.Balance.Value
found = true
}
}
if !found {
newBalance.Value -= a.Balance.Value
ub.BalanceMap[id] = append(ub.BalanceMap[id], newBalance)
}
return ub.BalanceMap[a.BalanceId+OUTBOUND].GetTotalValue()
}
/*
Debits some amount of user's specified balance. Returns the remaining credit in user's balance.
*/

View File

@@ -344,7 +344,8 @@ func TestUserBalancedebitBalance(t *testing.T) {
BalanceMap: map[string]BalanceChain{SMS: BalanceChain{&Balance{Value: 14}}, TRAFFIC: BalanceChain{&Balance{Value: 1204}}, MINUTES + OUTBOUND: BalanceChain{&Balance{Weight: 20, SpecialPrice: 1, DestinationId: "NAT"}, &Balance{Weight: 10, SpecialPrice: 10, SpecialPriceType: PRICE_ABSOLUTE, DestinationId: "RET"}}},
}
newMb := &Balance{Weight: 20, SpecialPrice: 1, DestinationId: "NEW"}
ub.debitMinuteBalance(newMb)
a := &Action{BalanceId: MINUTES, Direction: OUTBOUND, Balance: newMb}
ub.debitBalanceAction(a)
if len(ub.BalanceMap[MINUTES+OUTBOUND]) != 3 || ub.BalanceMap[MINUTES+OUTBOUND][2] != newMb {
t.Error("Error adding minute bucket!", len(ub.BalanceMap[MINUTES+OUTBOUND]), ub.BalanceMap[MINUTES+OUTBOUND])
}
@@ -358,7 +359,8 @@ func TestUserBalancedebitBalanceExists(t *testing.T) {
BalanceMap: map[string]BalanceChain{SMS + OUTBOUND: BalanceChain{&Balance{Value: 14}}, TRAFFIC + OUTBOUND: BalanceChain{&Balance{Value: 1024}}, MINUTES + OUTBOUND: BalanceChain{&Balance{Value: 15, Weight: 20, SpecialPrice: 1, DestinationId: "NAT"}, &Balance{Weight: 10, SpecialPrice: 10, SpecialPriceType: PRICE_ABSOLUTE, DestinationId: "RET"}}},
}
newMb := &Balance{Value: -10, Weight: 20, SpecialPrice: 1, DestinationId: "NAT"}
ub.debitMinuteBalance(newMb)
a := &Action{BalanceId: MINUTES, Direction: OUTBOUND, Balance: newMb}
ub.debitBalanceAction(a)
if len(ub.BalanceMap[MINUTES+OUTBOUND]) != 2 || ub.BalanceMap[MINUTES+OUTBOUND][0].Value != 25 {
t.Error("Error adding minute bucket!")
}
@@ -370,7 +372,7 @@ func TestUserBalanceAddMinuteNil(t *testing.T) {
Type: UB_TYPE_POSTPAID,
BalanceMap: map[string]BalanceChain{SMS + OUTBOUND: BalanceChain{&Balance{Value: 14}}, TRAFFIC + OUTBOUND: BalanceChain{&Balance{Value: 1024}}, MINUTES + OUTBOUND: BalanceChain{&Balance{Weight: 20, SpecialPrice: 1, DestinationId: "NAT"}, &Balance{Weight: 10, SpecialPrice: 10, SpecialPriceType: PRICE_ABSOLUTE, DestinationId: "RET"}}},
}
ub.debitMinuteBalance(nil)
ub.debitBalanceAction(nil)
if len(ub.BalanceMap[MINUTES+OUTBOUND]) != 2 {
t.Error("Error adding minute bucket!")
}
@@ -381,15 +383,18 @@ func TestUserBalanceAddMinutBucketEmpty(t *testing.T) {
mb2 := &Balance{Value: -10, DestinationId: "NAT"}
mb3 := &Balance{Value: -10, DestinationId: "OTHER"}
ub := &UserBalance{}
ub.debitMinuteBalance(mb1)
a := &Action{BalanceId: MINUTES, Direction: OUTBOUND, Balance: mb1}
ub.debitBalanceAction(a)
if len(ub.BalanceMap[MINUTES+OUTBOUND]) != 1 {
t.Error("Error adding minute bucket: ", ub.BalanceMap[MINUTES+OUTBOUND])
}
ub.debitMinuteBalance(mb2)
a = &Action{BalanceId: MINUTES, Direction: OUTBOUND, Balance: mb2}
ub.debitBalanceAction(a)
if len(ub.BalanceMap[MINUTES+OUTBOUND]) != 1 || ub.BalanceMap[MINUTES+OUTBOUND][0].Value != 20 {
t.Error("Error adding minute bucket: ", ub.BalanceMap[MINUTES+OUTBOUND])
}
ub.debitMinuteBalance(mb3)
a = &Action{BalanceId: MINUTES, Direction: OUTBOUND, Balance: mb3}
ub.debitBalanceAction(a)
if len(ub.BalanceMap[MINUTES+OUTBOUND]) != 2 {
t.Error("Error adding minute bucket: ", ub.BalanceMap[MINUTES+OUTBOUND])
}

View File

@@ -121,9 +121,29 @@ func ParseDate(date string) (expDate time.Time, err error) {
return expDate, err
}
// returns a number equeal or larger than the peram that exactly
// is divisible to 60
func RoundToMinute(seconds float64) float64 {
if math.Mod(seconds, 60) == 0 {
return seconds
}
return (60 - math.Mod(seconds, 60)) + seconds
}
func SplitPrefix(prefix string) []string {
var subs []string
max := len(prefix)
for i := 0; i < len(prefix)-1; i++ {
subs = append(subs, prefix[:max-i])
}
return subs
}
func SplitPrefixInterface(prefix string) []interface{} {
var subs []interface{}
max := len(prefix)
for i := 0; i < len(prefix)-1; i++ {
subs = append(subs, prefix[:max-i])
}
return subs
}

View File

@@ -227,3 +227,17 @@ func TestRoundToMinute(t *testing.T) {
t.Errorf("Error rounding to minute5: expected %v was %v", expected, result)
}
}
func TestSplitPrefix(t *testing.T) {
a := SplitPrefix("0123456789")
if len(a) != 9 {
t.Error("Error splitting prefix: ", a)
}
}
func TestSplitPrefixEmpty(t *testing.T) {
a := SplitPrefix("")
if len(a) != 0 {
t.Error("Error splitting prefix: ", a)
}
}