Started shared balances logic

This commit is contained in:
Radu Ioan Fericean
2014-01-07 22:34:40 +02:00
parent 99a9b2c987
commit 58ddae9b70
13 changed files with 206 additions and 43 deletions

View File

@@ -21,12 +21,13 @@ package apier
import (
"errors"
"fmt"
"path"
"github.com/cgrates/cgrates/cache2go"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/scheduler"
"github.com/cgrates/cgrates/utils"
"path"
)
const (
@@ -204,7 +205,7 @@ func (self *ApierV1) SetRatingProfile(attrs AttrSetRatingProfile, reply *string)
tpRpf := utils.TPRatingProfile{Tenant: attrs.Tenant, TOR: attrs.TOR, Direction: attrs.Direction, Subject: attrs.Subject}
keyId := tpRpf.KeyId()
if !attrs.Overwrite {
if exists, err := self.RatingDb.ExistsData(engine.RATING_PROFILE_PREFIX, keyId); err != nil {
if exists, err := self.RatingDb.DataExists(engine.RATING_PROFILE_PREFIX, keyId); err != nil {
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
} else if exists {
return errors.New(utils.ERR_EXISTS)
@@ -216,7 +217,7 @@ func (self *ApierV1) SetRatingProfile(attrs AttrSetRatingProfile, reply *string)
if err != nil {
return fmt.Errorf(fmt.Sprintf("%s:Cannot parse activation time from %v", utils.ERR_SERVER_ERROR, ra.ActivationTime))
}
if exists, err := self.RatingDb.ExistsData(engine.RATING_PLAN_PREFIX, ra.RatingPlanId); err != nil {
if exists, err := self.RatingDb.DataExists(engine.RATING_PLAN_PREFIX, ra.RatingPlanId); err != nil {
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
} else if !exists {
return fmt.Errorf(fmt.Sprintf("%s:RatingPlanId:%s", utils.ERR_NOT_FOUND, ra.RatingPlanId))
@@ -251,7 +252,7 @@ func (self *ApierV1) SetActions(attrs AttrSetActions, reply *string) error {
}
}
if !attrs.Overwrite {
if exists, err := self.AccountDb.ExistsData(engine.ACTION_PREFIX, attrs.ActionsId); err != nil {
if exists, err := self.AccountDb.DataExists(engine.ACTION_PREFIX, attrs.ActionsId); err != nil {
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
} else if exists {
return errors.New(utils.ERR_EXISTS)
@@ -311,7 +312,7 @@ func (self *ApierV1) SetActionTimings(attrs AttrSetActionTimings, reply *string)
}
}
if !attrs.Overwrite {
if exists, err := self.AccountDb.ExistsData(engine.ACTION_TIMING_PREFIX, attrs.ActionTimingsId); err != nil {
if exists, err := self.AccountDb.DataExists(engine.ACTION_TIMING_PREFIX, attrs.ActionTimingsId); err != nil {
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
} else if exists {
return errors.New(utils.ERR_EXISTS)
@@ -319,7 +320,7 @@ func (self *ApierV1) SetActionTimings(attrs AttrSetActionTimings, reply *string)
}
storeAtms := make(engine.ActionTimings, len(attrs.ActionTimings))
for idx, apiAtm := range attrs.ActionTimings {
if exists, err := self.AccountDb.ExistsData(engine.ACTION_PREFIX, apiAtm.ActionsId); err != nil {
if exists, err := self.AccountDb.DataExists(engine.ACTION_PREFIX, apiAtm.ActionsId); err != nil {
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
} else if !exists {
return fmt.Errorf("%s:%s", utils.ERR_BROKEN_REFERENCE, err.Error())
@@ -475,7 +476,7 @@ func (self *ApierV1) ReloadScheduler(input string, reply *string) error {
}
func (self *ApierV1) ReloadCache(attrs utils.ApiReloadCache, reply *string) error {
var dstKeys, rpKeys, rpfKeys, actKeys []string
var dstKeys, rpKeys, rpfKeys, actKeys, shgKeys []string
if len(attrs.DestinationIds) > 0 {
dstKeys = make([]string, len(attrs.DestinationIds))
for idx, dId := range attrs.DestinationIds {
@@ -500,10 +501,16 @@ func (self *ApierV1) ReloadCache(attrs utils.ApiReloadCache, reply *string) erro
actKeys[idx] = engine.ACTION_PREFIX + actId
}
}
if len(attrs.SharedGroupIds) > 0 {
shgKeys = make([]string, len(attrs.SharedGroupIds))
for idx, shgId := range attrs.SharedGroupIds {
shgKeys[idx] = engine.SHARED_GROUP_PREFIX + shgId
}
}
if err := self.RatingDb.CacheRating(dstKeys, rpKeys, rpfKeys); err != nil {
return err
}
if err := self.AccountDb.CacheAccounting(actKeys); err != nil {
if err := self.AccountDb.CacheAccounting(actKeys, shgKeys); err != nil {
return err
}
*reply = "OK"
@@ -526,8 +533,8 @@ func (self *ApierV1) GetCachedItemAge(itemId string, reply *utils.CachedItemAge)
}
cachedItemAge := new(utils.CachedItemAge)
var found bool
for idx, cacheKey := range []string{ engine.DESTINATION_PREFIX+itemId, engine.RATING_PLAN_PREFIX+itemId, engine.RATING_PROFILE_PREFIX+itemId,
engine.ACTION_PREFIX+itemId} {
for idx, cacheKey := range []string{engine.DESTINATION_PREFIX + itemId, engine.RATING_PLAN_PREFIX + itemId, engine.RATING_PROFILE_PREFIX + itemId,
engine.ACTION_PREFIX + itemId} {
if age, err := cache2go.GetKeyAge(cacheKey); err == nil {
found = true
switch idx {

View File

@@ -30,7 +30,7 @@ func init() {
type AccountLock struct {
queue map[string]chan bool
sync.Mutex
sync.RWMutex
}
func NewAccountLock() *AccountLock {
@@ -38,13 +38,15 @@ func NewAccountLock() *AccountLock {
}
func (cm *AccountLock) GuardGetCost(name string, handler func() (*CallCost, error)) (reply *CallCost, err error) {
cm.Lock()
cm.RLock()
lock, exists := AccLock.queue[name]
cm.RUnlock()
if !exists {
cm.Lock()
lock = make(chan bool, 1)
AccLock.queue[name] = lock
cm.Unlock()
}
cm.Unlock()
lock <- true
reply, err = handler()
<-lock
@@ -52,10 +54,14 @@ func (cm *AccountLock) GuardGetCost(name string, handler func() (*CallCost, erro
}
func (cm *AccountLock) Guard(name string, handler func() (float64, error)) (reply float64, err error) {
cm.RLock()
lock, exists := AccLock.queue[name]
cm.RUnlock()
if !exists {
cm.Lock()
lock = make(chan bool, 1)
AccLock.queue[name] = lock
cm.Unlock()
}
lock <- true
reply, err = handler()

View File

@@ -35,6 +35,7 @@ type Balance struct {
GroupIds []string
DestinationId string
RateSubject string
SharedGroup string
precision int
}

View File

@@ -85,3 +85,12 @@ func (cc *CallCost) CreateCallDescriptor() *CallDescriptor {
Destination: cc.Destination,
}
}
func (cc *CallCost) IsPaid() bool {
for _, ts := range cc.Timespans {
if paid, _ := ts.IsPaid(); !paid {
return false
}
}
return true
}

View File

@@ -588,7 +588,7 @@ func (cd *CallDescriptor) FlushCache() (err error) {
cache2go.XFlush()
cache2go.Flush()
dataStorage.CacheRating(nil, nil, nil)
accountingStorage.CacheAccounting(nil)
accountingStorage.CacheAccounting(nil, nil)
return nil
}

View File

@@ -335,7 +335,7 @@ func (csvr *CSVReader) LoadDestinationRates() (err error) {
}
}
if !destinationExists {
if dbExists, err := csvr.dataStorage.ExistsData(DESTINATION_PREFIX, record[1]); err != nil {
if dbExists, err := csvr.dataStorage.DataExists(DESTINATION_PREFIX, record[1]); err != nil {
return err
} else if !dbExists {
return fmt.Errorf("Could not get destination for tag %v", record[1])
@@ -418,7 +418,7 @@ func (csvr *CSVReader) LoadRatingProfiles() (err error) {
}
_, exists := csvr.ratingPlans[record[5]]
if !exists {
if dbExists, err := csvr.dataStorage.ExistsData(RATING_PLAN_PREFIX, record[5]); err != nil {
if dbExists, err := csvr.dataStorage.DataExists(RATING_PLAN_PREFIX, record[5]); err != nil {
return err
} else if !dbExists {
return errors.New(fmt.Sprintf("Could not load rating plans for tag: %v", record[5]))

View File

@@ -224,7 +224,7 @@ func (dbr *DbReader) LoadDestinationRates() (err error) {
}
}
if !destinationExists {
if dbExists, err := dbr.dataDb.ExistsData(DESTINATION_PREFIX, dr.DestinationId); err != nil {
if dbExists, err := dbr.dataDb.DataExists(DESTINATION_PREFIX, dr.DestinationId); err != nil {
return err
} else if !dbExists {
return errors.New(fmt.Sprintf("Could not get destination for tag %v", dr.DestinationId))
@@ -278,7 +278,7 @@ func (dbr *DbReader) LoadRatingProfiles() error {
}
_, exists := dbr.ratingPlans[tpRa.RatingPlanId]
if !exists {
if dbExists, err := dbr.dataDb.ExistsData(RATING_PLAN_PREFIX, tpRa.RatingPlanId); err != nil {
if dbExists, err := dbr.dataDb.DataExists(RATING_PLAN_PREFIX, tpRa.RatingPlanId); err != nil {
return err
} else if !dbExists {
return errors.New(fmt.Sprintf("Could not load rating plans for tag: %v", tpRa.RatingPlanId))
@@ -332,7 +332,7 @@ func (dbr *DbReader) LoadRatingPlanByTag(tag string) (bool, error) {
if err != nil {
return false, err
} else if len(dms) == 0 {
if dbExists, err := dbr.dataDb.ExistsData(DESTINATION_PREFIX, drate.DestinationId); err != nil {
if dbExists, err := dbr.dataDb.DataExists(DESTINATION_PREFIX, drate.DestinationId); err != nil {
return false, err
} else if !dbExists {
return false, fmt.Errorf("Could not get destination for tag %v", drate.DestinationId)
@@ -369,7 +369,7 @@ func (dbr *DbReader) LoadRatingProfileFiltered(qriedRpf *utils.TPRatingProfile)
}
_, exists := dbr.ratingPlans[tpRa.RatingPlanId]
if !exists {
if dbExists, err := dbr.dataDb.ExistsData(RATING_PLAN_PREFIX, tpRa.RatingPlanId); err != nil {
if dbExists, err := dbr.dataDb.DataExists(RATING_PLAN_PREFIX, tpRa.RatingPlanId); err != nil {
return err
} else if !dbExists {
return errors.New(fmt.Sprintf("Could not load rating plans for tag: %v", tpRa.RatingPlanId))

28
engine/sharedgroup.go Normal file
View File

@@ -0,0 +1,28 @@
/*
Rating system designed to be used in VoIP Carriers World
Copyright (C) 2013 ITsysCOM
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
type SharedGroup struct {
Id string
Strategy string
Account string
RateSubject string
Weight float64
Members []string
}

View File

@@ -36,6 +36,7 @@ const (
RATING_PLAN_PREFIX = "rpl_"
RATING_PROFILE_PREFIX = "rpf_"
ACTION_PREFIX = "act_"
SHARED_GROUP_PREFIX = "shg_"
USER_BALANCE_PREFIX = "ubl_"
DESTINATION_PREFIX = "dst_"
TEMP_DESTINATION_PREFIX = "tmp_"
@@ -69,7 +70,7 @@ Interface for storage providers.
type RatingStorage interface {
Storage
CacheRating([]string, []string, []string) error
ExistsData(string, string) (bool, error)
DataExists(string, string) (bool, error)
GetRatingPlan(string, bool) (*RatingPlan, error)
SetRatingPlan(*RatingPlan) error
GetRatingProfile(string, bool) (*RatingProfile, error)
@@ -80,10 +81,12 @@ type RatingStorage interface {
type AccountingStorage interface {
Storage
ExistsData(string, string) (bool, error)
CacheAccounting([]string) error
DataExists(string, string) (bool, error)
CacheAccounting([]string, []string) error
GetActions(string, bool) (Actions, error)
SetActions(string, Actions) error
GetSharedGroup(string, bool) (*SharedGroup, error)
SetSharedGroup(string, *SharedGroup) error
GetUserBalance(string) (*UserBalance, error)
SetUserBalance(*UserBalance) error
GetActionTimings(string) (ActionTimings, error)

View File

@@ -22,11 +22,11 @@ import (
"errors"
"fmt"
"strings"
"time"
"github.com/cgrates/cgrates/cache2go"
"github.com/cgrates/cgrates/history"
"github.com/cgrates/cgrates/utils"
"strings"
"time"
)
type MapStorage struct {
@@ -72,7 +72,7 @@ func (ms *MapStorage) CacheRating(dKeys, rpKeys, rpfKeys []string) error {
return nil
}
func (ms *MapStorage) CacheAccounting(actKeys []string) error {
func (ms *MapStorage) CacheAccounting(actKeys, shgKeys []string) error {
if actKeys == nil {
cache2go.RemPrefixKey(ACTION_PREFIX) // Forced until we can fine tune it
}
@@ -84,11 +84,22 @@ func (ms *MapStorage) CacheAccounting(actKeys []string) error {
}
}
}
if shgKeys == nil {
cache2go.RemPrefixKey(SHARED_GROUP_PREFIX) // Forced until we can fine tune it
}
for k, _ := range ms.dict {
if strings.HasPrefix(k, SHARED_GROUP_PREFIX) {
cache2go.RemKey(k)
if _, err := ms.GetSharedGroup(k[len(SHARED_GROUP_PREFIX):], true); err != nil {
return err
}
}
}
return nil
}
// Used to check if specific subject is stored using prefix key attached to entity
func (ms *MapStorage) ExistsData(categ, subject string) (bool, error) {
func (ms *MapStorage) DataExists(categ, subject string) (bool, error) {
switch categ {
case DESTINATION_PREFIX:
_, exists := ms.dict[DESTINATION_PREFIX+subject]
@@ -216,6 +227,37 @@ func (ms *MapStorage) SetActions(key string, as Actions) (err error) {
return
}
func (ms *MapStorage) GetSharedGroup(key string, checkDb bool) (sg *SharedGroup, err error) {
if values, ok := ms.dict[SHARED_GROUP_PREFIX+key]; ok {
err = ms.ms.Unmarshal(values, &sg)
} else {
return nil, errors.New("not found")
}
return
key = SHARED_GROUP_PREFIX + key
if x, err := cache2go.GetCached(key); err == nil {
return x.(*SharedGroup), nil
}
if !checkDb {
return nil, errors.New(utils.ERR_NOT_FOUND)
}
if values, ok := ms.dict[key]; ok {
err = ms.ms.Unmarshal(values, &sg)
cache2go.Cache(key, sg)
} else {
return nil, errors.New("not found")
}
return
}
func (ms *MapStorage) SetSharedGroup(key string, sg *SharedGroup) (err error) {
result, err := ms.ms.Marshal(sg)
ms.dict[SHARED_GROUP_PREFIX+key] = result
cache2go.Cache(ACTION_PREFIX+key, sg)
return
}
func (ms *MapStorage) GetUserBalance(key string) (ub *UserBalance, err error) {
if values, ok := ms.dict[USER_BALANCE_PREFIX+key]; ok {
ub = &UserBalance{Id: key}

View File

@@ -126,7 +126,7 @@ func (rs *RedisStorage) CacheRating(dKeys, rpKeys, rpfKeys []string) (err error)
return
}
func (rs *RedisStorage) CacheAccounting(actKeys []string) (err error) {
func (rs *RedisStorage) CacheAccounting(actKeys, shgKeys []string) (err error) {
if actKeys == nil {
cache2go.RemPrefixKey(ACTION_PREFIX)
}
@@ -147,11 +147,31 @@ func (rs *RedisStorage) CacheAccounting(actKeys []string) (err error) {
if len(actKeys) != 0 {
Logger.Info("Finished actions caching.")
}
if shgKeys == nil {
cache2go.RemPrefixKey(SHARED_GROUP_PREFIX)
}
if shgKeys == nil {
Logger.Info("Caching all shared groups")
if shgKeys, err = rs.db.Keys(SHARED_GROUP_PREFIX + "*"); err != nil {
return
}
} else if len(shgKeys) != 0 {
Logger.Info(fmt.Sprintf("Caching shared groups: %v", shgKeys))
}
for _, key := range shgKeys {
cache2go.RemKey(key)
if _, err = rs.GetSharedGroup(key[len(SHARED_GROUP_PREFIX):], true); err != nil {
return err
}
}
if len(shgKeys) != 0 {
Logger.Info("Finished shared groups caching.")
}
return nil
}
// Used to check if specific subject is stored using prefix key attached to entity
func (rs *RedisStorage) ExistsData(category, subject string) (bool, error) {
func (rs *RedisStorage) DataExists(category, subject string) (bool, error) {
switch category {
case DESTINATION_PREFIX, RATING_PLAN_PREFIX, RATING_PROFILE_PREFIX, ACTION_PREFIX, ACTION_TIMING_PREFIX, USER_BALANCE_PREFIX:
return rs.db.Exists(category + subject)
@@ -301,6 +321,29 @@ func (rs *RedisStorage) SetActions(key string, as Actions) (err error) {
return
}
func (rs *RedisStorage) GetSharedGroup(key string, checkDb bool) (sg *SharedGroup, err error) {
key = SHARED_GROUP_PREFIX + key
if x, err := cache2go.GetCached(key); err == nil {
return x.(*SharedGroup), nil
}
if !checkDb {
return nil, errors.New(utils.ERR_NOT_FOUND)
}
var values []byte
if values, err = rs.db.Get(key); err == nil {
err = rs.ms.Unmarshal(values, &sg)
cache2go.Cache(key, sg)
}
return
}
func (rs *RedisStorage) SetSharedGroup(key string, sg *SharedGroup) (err error) {
result, err := rs.ms.Marshal(sg)
err = rs.db.Set(ACTION_PREFIX+key, result)
cache2go.Cache(ACTION_PREFIX+key, sg)
return
}
func (rs *RedisStorage) GetUserBalance(key string) (ub *UserBalance, err error) {
var values []byte
if values, err = rs.db.Get(USER_BALANCE_PREFIX + key); err == nil {

View File

@@ -20,6 +20,7 @@ package engine
import (
"errors"
"fmt"
"time"
"github.com/cgrates/cgrates/cache2go"
@@ -180,12 +181,31 @@ func (ub *UserBalance) debitCreditBalance(cc *CallCost, count bool) error {
// debit minutes
for _, balance := range usefulMinuteBalances {
balance.DebitMinutes(cc, count, ub, usefulMoneyBalances)
if cc.IsPaid() {
return nil
} else {
// chack if it's shared
if balance.SharedGroup != "" {
sharedGroup, err := accountingStorage.GetSharedGroup(balance.SharedGroup, false)
if err != nil {
Logger.Warning(fmt.Sprintf("Could not get shared group: %s", balance.SharedGroup))
continue
}
for _, ubId := range sharedGroup.Members {
AccLock.Guard(ubId, func() (float64, error) {
_, err := accountingStorage.GetUserBalance(ubId)
if err != nil {
Logger.Warning(fmt.Sprintf("Could not get user balance: %s", ubId))
}
return 0, nil
})
}
}
}
}
allPaidWithMinutes := true
for tsIndex := 0; tsIndex < len(cc.Timespans); tsIndex++ {
ts := cc.Timespans[tsIndex]
if paid, incrementIndex := ts.IsPaid(); !paid {
allPaidWithMinutes = false
newTs := ts.SplitByIncrement(incrementIndex)
if newTs != nil {
idx := tsIndex + 1
@@ -195,9 +215,6 @@ func (ub *UserBalance) debitCreditBalance(cc *CallCost, count bool) error {
}
}
}
if allPaidWithMinutes {
return nil
}
// debit money
for _, balance := range usefulMoneyBalances {
balance.DebitMoney(cc, count, ub)
@@ -406,8 +423,7 @@ func (ub *UserBalance) initCounters() {
}
func (ub *UserBalance) CleanExpiredBalancesAndBuckets() {
for key, _ := range ub.BalanceMap {
bm := ub.BalanceMap[key]
for key, bm := range ub.BalanceMap {
for i := 0; i < len(bm); i++ {
if bm[i].IsExpired() {
// delete it
@@ -416,9 +432,16 @@ func (ub *UserBalance) CleanExpiredBalancesAndBuckets() {
}
ub.BalanceMap[key] = bm
}
for i := 0; i < len(ub.BalanceMap[MINUTES+OUTBOUND]); i++ {
if ub.BalanceMap[MINUTES+OUTBOUND][i].IsExpired() {
ub.BalanceMap[MINUTES+OUTBOUND] = append(ub.BalanceMap[MINUTES+OUTBOUND][:i], ub.BalanceMap[MINUTES+OUTBOUND][i+1:]...)
}
// returns the shared groups that this user balance belnongs to
func (ub *UserBalance) GetSharedGroups() (groups []string) {
for _, balanceChain := range ub.BalanceMap {
for _, b := range balanceChain {
if b.SharedGroup != "" {
groups = append(groups, b.SharedGroup)
}
}
}
return
}

View File

@@ -281,6 +281,7 @@ type ApiReloadCache struct {
RatingPlanIds []string
RatingProfileIds []string
ActionIds []string
SharedGroupIds []string
}
type AttrCacheStats struct { // Add in the future filters here maybe so we avoid counting complete cache
@@ -299,8 +300,8 @@ type AttrCachedItemAge struct {
}
type CachedItemAge struct {
Destination time.Duration
RatingPlan time.Duration
RatingProfile time.Duration
Action time.Duration
Destination time.Duration
RatingPlan time.Duration
RatingProfile time.Duration
Action time.Duration
}