This commit is contained in:
DanB
2015-04-04 17:47:52 +02:00
16 changed files with 239 additions and 179 deletions

View File

@@ -80,7 +80,7 @@ func (self *ApierV1) RemActionTiming(attrs AttrRemActionTiming, reply *string) e
return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing)
}
}
_, err := engine.AccLock.Guard(engine.ACTION_TIMING_PREFIX, func() (float64, error) {
_, err := engine.AccLock.Guard(func() (interface{}, error) {
ats, err := self.AccountDb.GetActionTimings(attrs.ActionPlanId)
if err != nil {
return 0, err
@@ -92,7 +92,7 @@ func (self *ApierV1) RemActionTiming(attrs AttrRemActionTiming, reply *string) e
return 0, err
}
return 0, nil
})
}, engine.ACTION_TIMING_PREFIX)
if err != nil {
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
}
@@ -130,7 +130,7 @@ func (self *ApierV1) RemAccountActionTriggers(attrs AttrRemAcntActionTriggers, r
return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing)
}
balanceId := utils.AccountKey(attrs.Tenant, attrs.Account, attrs.Direction)
_, err := engine.AccLock.Guard(balanceId, func() (float64, error) {
_, err := engine.AccLock.Guard(func() (interface{}, error) {
ub, err := self.AccountDb.GetAccount(balanceId)
if err != nil {
return 0, err
@@ -150,7 +150,7 @@ func (self *ApierV1) RemAccountActionTriggers(attrs AttrRemAcntActionTriggers, r
return 0, err
}
return 0, nil
})
}, balanceId)
if err != nil {
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
}
@@ -174,7 +174,7 @@ func (self *ApierV1) SetAccount(attr AttrSetAccount, reply *string) error {
balanceId := utils.AccountKey(attr.Tenant, attr.Account, attr.Direction)
var ub *engine.Account
var ats engine.ActionPlan
_, err := engine.AccLock.Guard(balanceId, func() (float64, error) {
_, err := engine.AccLock.Guard(func() (interface{}, error) {
if bal, _ := self.AccountDb.GetAccount(balanceId); bal != nil {
ub = bal
} else { // Not found in db, create it here
@@ -199,17 +199,17 @@ func (self *ApierV1) SetAccount(attr AttrSetAccount, reply *string) error {
return 0, err
}
return 0, nil
})
}, balanceId)
if err != nil {
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
}
if len(ats) != 0 {
_, err := engine.AccLock.Guard(engine.ACTION_TIMING_PREFIX, func() (float64, error) { // ToDo: Try locking it above on read somehow
_, err := engine.AccLock.Guard(func() (interface{}, error) { // ToDo: Try locking it above on read somehow
if err := self.AccountDb.SetActionTimings(attr.ActionPlanId, ats); err != nil {
return 0, err
}
return 0, nil
})
}, engine.ACTION_TIMING_PREFIX)
if err != nil {
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
}

View File

@@ -588,7 +588,7 @@ func (self *ApierV1) AddTriggeredAction(attr AttrAddActionTrigger, reply *string
}
tag := utils.AccountKey(attr.Tenant, attr.Account, attr.BalanceDirection)
_, err = engine.AccLock.Guard(tag, func() (float64, error) {
_, err = engine.AccLock.Guard(func() (interface{}, error) {
userBalance, err := self.AccountDb.GetAccount(tag)
if err != nil {
return 0, err
@@ -600,7 +600,7 @@ func (self *ApierV1) AddTriggeredAction(attr AttrAddActionTrigger, reply *string
return 0, err
}
return 0, nil
})
}, tag)
if err != nil {
*reply = err.Error()
return err
@@ -658,7 +658,7 @@ func (self *ApierV1) ResetTriggeredActions(attr AttrResetTriggeredAction, reply
}
}
accID := utils.AccountKey(attr.Tenant, attr.Account, attr.Direction)
_, err := engine.AccLock.Guard(accID, func() (float64, error) {
_, err := engine.AccLock.Guard(func() (interface{}, error) {
acc, err := self.AccountDb.GetAccount(accID)
if err != nil {
return 0, err
@@ -670,7 +670,7 @@ func (self *ApierV1) ResetTriggeredActions(attr AttrResetTriggeredAction, reply
return 0, err
}
return 0, nil
})
}, accID)
if err != nil {
*reply = err.Error()
return err
@@ -685,12 +685,12 @@ func (self *ApierV1) LoadAccountActions(attrs utils.TPAccountActions, reply *str
return fmt.Errorf("%s:%s", utils.ERR_MANDATORY_IE_MISSING, "TPid")
}
dbReader := engine.NewDbReader(self.StorDb, self.RatingDb, self.AccountDb, attrs.TPid)
if _, err := engine.AccLock.Guard(attrs.KeyId(), func() (float64, error) {
if _, err := engine.AccLock.Guard(func() (interface{}, error) {
if err := dbReader.LoadAccountActionsFiltered(&attrs); err != nil {
return 0, err
}
return 0, nil
}); err != nil {
}, attrs.KeyId()); err != nil {
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
}
// ToDo: Get the action keys loaded by dbReader so we reload only these in cache

View File

@@ -73,12 +73,12 @@ func (self *ApierV2) LoadAccountActions(attrs AttrLoadAccountActions, reply *str
tpAa := &utils.TPAccountActions{TPid: attrs.TPid}
tpAa.SetAccountActionsId(attrs.AccountActionsId)
if _, err := engine.AccLock.Guard(attrs.AccountActionsId, func() (float64, error) {
if _, err := engine.AccLock.Guard(func() (interface{}, error) {
if err := dbReader.LoadAccountActionsFiltered(tpAa); err != nil {
return 0, err
}
return 0, nil
}); err != nil {
}, attrs.AccountActionsId); err != nil {
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
}
// ToDo: Get the action keys loaded by dbReader so we reload only these in cache

View File

@@ -566,19 +566,15 @@ func (ub *Account) GetSharedGroups() (groups []string) {
return
}
func (account *Account) GetUniqueSharedGroupMembers(destination, direction, category, unitType string) ([]string, error) {
creditBalances := account.getBalancesForPrefix(destination, category, account.BalanceMap[CREDIT+direction], "")
unitBalances := account.getBalancesForPrefix(destination, category, account.BalanceMap[unitType+direction], "")
func (account *Account) GetUniqueSharedGroupMembers(cd *CallDescriptor) ([]string, error) {
var balances []*Balance
balances = append(balances, account.getBalancesForPrefix(cd.Destination, cd.Category, account.BalanceMap[CREDIT+cd.Direction], "")...)
balances = append(balances, account.getBalancesForPrefix(cd.Destination, cd.Category, account.BalanceMap[cd.TOR+cd.Direction], "")...)
// gather all shared group ids
var sharedGroupIds []string
for _, cb := range creditBalances {
if cb.SharedGroup != "" {
sharedGroupIds = append(sharedGroupIds, cb.SharedGroup)
}
}
for _, mb := range unitBalances {
if mb.SharedGroup != "" {
sharedGroupIds = append(sharedGroupIds, mb.SharedGroup)
for _, b := range balances {
if b.SharedGroup != "" {
sharedGroupIds = append(sharedGroupIds, b.SharedGroup)
}
}
var memberIds []string
@@ -588,7 +584,7 @@ func (account *Account) GetUniqueSharedGroupMembers(destination, direction, cate
Logger.Warning(fmt.Sprintf("Could not get shared group: %v", sgID))
return nil, err
}
for _, memberId := range sharedGroup.GetMembersExceptUser(account.Id) {
for _, memberId := range sharedGroup.MemberIds {
if !utils.IsSliceMember(memberIds, memberId) {
memberIds = append(memberIds, memberId)
}

View File

@@ -22,66 +22,25 @@ import (
"sync"
)
var AccLock *AccountLock
func init() {
AccLock = NewAccountLock()
}
// global package variable
var AccLock = &AccountLock{queue: make(map[string]chan bool)}
type AccountLock struct {
queue map[string]chan bool
sync.RWMutex
mu sync.Mutex
}
func NewAccountLock() *AccountLock {
return &AccountLock{queue: make(map[string]chan bool)}
}
func (cm *AccountLock) GuardGetCost(name string, handler func() (*CallCost, error)) (reply *CallCost, err error) {
cm.RLock()
lock, exists := AccLock.queue[name]
cm.RUnlock()
if !exists {
cm.Lock()
lock = make(chan bool, 1)
AccLock.queue[name] = lock
cm.Unlock()
}
lock <- true
reply, err = handler()
<-lock
return
}
func (cm *AccountLock) Guard(name string, handler func() (float64, error)) (reply float64, err error) {
cm.RLock()
lock, exists := AccLock.queue[name]
cm.RUnlock()
if !exists {
cm.Lock()
lock = make(chan bool, 1)
AccLock.queue[name] = lock
cm.Unlock()
}
lock <- true
reply, err = handler()
<-lock
return
}
func (cm *AccountLock) GuardMany(names []string, handler func() (float64, error)) (reply float64, err error) {
func (cm *AccountLock) Guard(handler func() (interface{}, error), names ...string) (reply interface{}, err error) {
cm.mu.Lock()
for _, name := range names {
cm.RLock()
lock, exists := AccLock.queue[name]
cm.RUnlock()
if !exists {
cm.Lock()
lock = make(chan bool, 1)
AccLock.queue[name] = lock
cm.Unlock()
}
lock <- true
}
cm.mu.Unlock()
reply, err = handler()
for _, name := range names {
lock := AccLock.queue[name]

View File

@@ -25,23 +25,23 @@ import (
)
func ATestAccountLock(t *testing.T) {
go AccLock.Guard("1", func() (float64, error) {
go AccLock.Guard(func() (interface{}, error) {
log.Print("first 1")
time.Sleep(1 * time.Second)
log.Print("end first 1")
return 0, nil
})
go AccLock.Guard("2", func() (float64, error) {
}, "1")
go AccLock.Guard(func() (interface{}, error) {
log.Print("first 2")
time.Sleep(1 * time.Second)
log.Print("end first 2")
return 0, nil
})
go AccLock.Guard("1", func() (float64, error) {
}, "2")
go AccLock.Guard(func() (interface{}, error) {
log.Print("second 1")
time.Sleep(1 * time.Second)
log.Print("end second 1")
return 0, nil
})
}, "1")
time.Sleep(3 * time.Second)
}

View File

@@ -257,7 +257,7 @@ func (at *ActionTiming) Execute() (err error) {
return
}
for _, ubId := range at.AccountIds {
_, err := AccLock.Guard(ubId, func() (float64, error) {
_, err := AccLock.Guard(func() (interface{}, error) {
ub, err := accountingStorage.GetAccount(ubId)
if err != nil {
Logger.Warning(fmt.Sprintf("Could not get user balances for this id: %s. Skipping!", ubId))
@@ -270,7 +270,7 @@ func (at *ActionTiming) Execute() (err error) {
//Logger.Info(fmt.Sprintf("After execute, account: %+v", ub))
accountingStorage.SetAccount(ub)
return 0, nil
})
}, ubId)
if err != nil {
Logger.Warning(fmt.Sprintf("Error executing action timing: %v", err))
}

View File

@@ -501,11 +501,11 @@ func (cd *CallDescriptor) GetMaxSessionDuration() (duration time.Duration, err e
Logger.Err(fmt.Sprintf("Could not get user balance for <%s>: %s.", cd.GetAccountKey(), err.Error()))
return 0, err
} else {
if memberIds, err := account.GetUniqueSharedGroupMembers(cd.Destination, cd.Direction, cd.Category, cd.TOR); err == nil {
AccLock.GuardMany(memberIds, func() (float64, error) {
if memberIds, err := account.GetUniqueSharedGroupMembers(cd); err == nil {
AccLock.Guard(func() (interface{}, error) {
duration, err = cd.getMaxSessionDuration(account)
return 0, err
})
}, memberIds...)
} else {
return 0, err
}
@@ -550,11 +550,11 @@ func (cd *CallDescriptor) Debit() (cc *CallCost, err error) {
Logger.Err(fmt.Sprintf("Could not get user balance for <%s>: %s.", cd.GetAccountKey(), err.Error()))
return nil, err
} else {
if memberIds, err := account.GetUniqueSharedGroupMembers(cd.Destination, cd.Direction, cd.Category, cd.TOR); err == nil {
AccLock.GuardMany(memberIds, func() (float64, error) {
if memberIds, err := account.GetUniqueSharedGroupMembers(cd); err == nil {
AccLock.Guard(func() (interface{}, error) {
cc, err = cd.debit(account, false, true)
return 0, err
})
}, memberIds...)
} else {
return nil, err
}
@@ -572,8 +572,8 @@ func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) {
return nil, err
} else {
//log.Printf("ACC: %+v", account)
if memberIds, err := account.GetUniqueSharedGroupMembers(cd.Destination, cd.Direction, cd.Category, cd.TOR); err == nil {
AccLock.GuardMany(memberIds, func() (float64, error) {
if memberIds, err := account.GetUniqueSharedGroupMembers(cd); err == nil {
AccLock.Guard(func() (interface{}, error) {
remainingDuration, err := cd.getMaxSessionDuration(account)
//log.Print("AFTER MAX SESSION: ", cd)
if err != nil || remainingDuration == 0 {
@@ -589,7 +589,7 @@ func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) {
cc, err = cd.debit(account, false, true)
//log.Print(balanceMap[0].Value, balanceMap[1].Value)
return 0, err
})
}, memberIds...)
} else {
return nil, err
}
@@ -657,7 +657,7 @@ func (cd *CallDescriptor) Clone() *CallDescriptor {
}
}
func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) {
func (cd *CallDescriptor) GetLCR(stats StatsInterface) (LCRCost, error) {
lcr, err := dataStorage.GetLCR(cd.GetLCRKey(""), false)
if err != nil || lcr == nil {
// try the *any customer
@@ -666,26 +666,24 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) {
}
}
lcr.Sort()
lcrCost := &LCRCost{
TimeSpans: []*LCRTimeSpan{&LCRTimeSpan{StartTime: cd.TimeStart}},
}
lcrCost := LCRCost{&LCRTimeSpan{StartTime: cd.TimeStart}}
for _, lcrActivation := range lcr.Activations {
// TODO: filer entry by destination
lcrEntry := lcrActivation.GetLCREntryForPrefix(cd.Destination)
if lcrActivation.ActivationTime.Before(cd.TimeStart) ||
lcrActivation.ActivationTime.Equal(cd.TimeStart) {
lcrCost.TimeSpans[0].Entry = lcrEntry
lcrCost[0].Entry = lcrEntry
} else {
if lcrActivation.ActivationTime.Before(cd.TimeEnd) {
// add lcr timespan
lcrCost.TimeSpans = append(lcrCost.TimeSpans, &LCRTimeSpan{
lcrCost = append(lcrCost, &LCRTimeSpan{
StartTime: lcrActivation.ActivationTime,
Entry: lcrEntry,
})
}
}
}
for _, ts := range lcrCost.TimeSpans {
for _, ts := range lcrCost {
if ts.Entry.Strategy == LCR_STRATEGY_STATIC {
for _, supplier := range ts.Entry.GetSuppliers() {
lcrCD := cd.Clone()
@@ -718,8 +716,9 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) {
if cd.account, err = accountingStorage.GetAccount(cd.GetAccountKey()); err != nil {
continue
}
if ts.Entry.Strategy == LCR_STRATEGY_QOS_WITH_THRESHOLD {
// get stats and filter suppliers by qos thresholds
var asr, acd float64
var qosSortParams []string
if ts.Entry.Strategy == LCR_STRATEGY_QOS || ts.Entry.Strategy == LCR_STRATEGY_QOS_WITH_THRESHOLD {
rpfKey := utils.ConcatenatedKey(ratingProfileSearchKey, supplier)
if rpf, err := dataStorage.GetRatingProfile(rpfKey, false); err == nil || rpf != nil {
rpf.RatingPlanActivations.Sort()
@@ -749,19 +748,27 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) {
}
asrValues.Sort()
acdValues.Sort()
asrMin, asrMax, acdMin, acdMax := ts.Entry.GetQOSLimits()
// skip current supplier if off limits
if asrMin > 0 && asrValues[0] < asrMin {
continue
asr = utils.Avg(asrValues)
acd = utils.Avg(acdValues)
if ts.Entry.Strategy == LCR_STRATEGY_QOS_WITH_THRESHOLD {
qosSortParams = ts.Entry.GetQOSSortParams()
}
if asrMax > 0 && asrValues[len(asrValues)-1] > asrMax {
continue
}
if acdMin > 0 && acdValues[0] < float64(acdMin) {
continue
}
if acdMax > 0 && acdValues[len(acdValues)-1] > float64(acdMax) {
continue
if ts.Entry.Strategy == LCR_STRATEGY_QOS_WITH_THRESHOLD {
// filter suppliers by qos thresholds
asrMin, asrMax, acdMin, acdMax := ts.Entry.GetQOSLimits()
// skip current supplier if off limits
if asrMin > 0 && asrValues[0] < asrMin {
continue
}
if asrMax > 0 && asrValues[len(asrValues)-1] > asrMax {
continue
}
if acdMin > 0 && acdValues[0] < float64(acdMin) {
continue
}
if acdMax > 0 && acdValues[len(acdValues)-1] > float64(acdMax) {
continue
}
}
}
}
@@ -774,6 +781,11 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) {
ts.SupplierCosts = append(ts.SupplierCosts, &LCRSupplierCost{
Supplier: supplier,
Cost: cc.Cost,
QOS: map[string]float64{
"ASR": asr,
"ACD": acd,
},
qosSortParams: qosSortParams,
})
}
}

View File

@@ -57,9 +57,7 @@ type LCREntry struct {
precision int
}
type LCRCost struct {
TimeSpans []*LCRTimeSpan
}
type LCRCost []*LCRTimeSpan
type LCRTimeSpan struct {
StartTime time.Time
@@ -68,9 +66,11 @@ type LCRTimeSpan struct {
}
type LCRSupplierCost struct {
Supplier string
Cost float64
Error error
Supplier string
Cost float64
Error error
QOS map[string]float64
qosSortParams []string
}
func (lcr *LCR) GetId() string {
@@ -94,7 +94,7 @@ func (lcr *LCR) Sort() {
}
func (le *LCREntry) GetSuppliers() []string {
suppliers := strings.Split(le.StrategyParams, ";")
suppliers := strings.Split(le.StrategyParams, utils.INFIELD_SEP)
for i := 0; i < len(suppliers); i++ {
suppliers[i] = strings.TrimSpace(suppliers[i])
}
@@ -103,7 +103,7 @@ func (le *LCREntry) GetSuppliers() []string {
func (le *LCREntry) GetQOSLimits() (minASR, maxASR float64, minACD, maxACD time.Duration) {
// MIN_ASR;MAX_ASR;MIN_ACD;MAX_ACD
params := strings.Split(le.StrategyParams, ";")
params := strings.Split(le.StrategyParams, utils.INFIELD_SEP)
if len(params) == 4 {
var err error
if minASR, err = strconv.ParseFloat(params[0], 64); err != nil {
@@ -122,6 +122,14 @@ func (le *LCREntry) GetQOSLimits() (minASR, maxASR float64, minACD, maxACD time.
return
}
func (le *LCREntry) GetQOSSortParams() []string {
// ASR;ACD
if params := strings.Split(le.StrategyParams, utils.INFIELD_SEP); len(params) > 0 {
return params
}
return []string{ASR, ACD}
}
type LCREntriesSorter []*LCREntry
func (es LCREntriesSorter) Len() int {
@@ -177,6 +185,8 @@ func (lts *LCRTimeSpan) Sort() {
sort.Sort(LowestSupplierCostSorter(lts.SupplierCosts))
case LCR_STRATEGY_HIGHEST:
sort.Sort(HighestSupplierCostSorter(lts.SupplierCosts))
case LCR_STRATEGY_QOS:
sort.Sort(QOSSorter(lts.SupplierCosts))
}
}
@@ -207,3 +217,25 @@ func (hscs HighestSupplierCostSorter) Swap(i, j int) {
func (hscs HighestSupplierCostSorter) Less(i, j int) bool {
return hscs[i].Cost > hscs[j].Cost
}
type QOSSorter []*LCRSupplierCost
func (qoss QOSSorter) Len() int {
return len(qoss)
}
func (qoss QOSSorter) Swap(i, j int) {
qoss[i], qoss[j] = qoss[j], qoss[i]
}
func (qoss QOSSorter) Less(i, j int) bool {
for _, param := range qoss[i].qosSortParams {
if qoss[i].QOS[param] < qoss[j].QOS[param] {
return true
}
if qoss[i].QOS[param] == qoss[j].QOS[param] {
continue
}
}
return false
}

View File

@@ -17,3 +17,72 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"sort"
"testing"
)
func TestLcrQOSSorter(t *testing.T) {
s := QOSSorter{
&LCRSupplierCost{
QOS: map[string]float64{
"ASR": 3,
"ACD": 3,
},
qosSortParams: []string{ASR, ACD},
},
&LCRSupplierCost{
QOS: map[string]float64{
"ASR": 1,
"ACD": 1,
},
qosSortParams: []string{ASR, ACD},
},
&LCRSupplierCost{
QOS: map[string]float64{
"ASR": 2,
"ACD": 2,
},
qosSortParams: []string{ASR, ACD},
},
}
sort.Sort(s)
if s[0].QOS[ASR] != 1 ||
s[1].QOS[ASR] != 2 ||
s[2].QOS[ASR] != 3 {
t.Error("Lcr qos sort failed: ", s)
}
}
func TestLcrQOSSorterOACD(t *testing.T) {
s := QOSSorter{
&LCRSupplierCost{
QOS: map[string]float64{
"ASR": 1,
"ACD": 3,
},
qosSortParams: []string{ASR, ACD},
},
&LCRSupplierCost{
QOS: map[string]float64{
"ASR": 1,
"ACD": 1,
},
qosSortParams: []string{ASR, ACD},
},
&LCRSupplierCost{
QOS: map[string]float64{
"ASR": 1,
"ACD": 2,
},
qosSortParams: []string{ASR, ACD},
},
}
sort.Sort(s)
if s[0].QOS[ACD] != 1 ||
s[1].QOS[ACD] != 2 ||
s[2].QOS[ACD] != 3 {
t.Error("Lcr qos sort failed: ", s)
}
}

View File

@@ -55,13 +55,13 @@ func (rs *Responder) GetCost(arg CallDescriptor, reply *CallCost) (err error) {
r, e := rs.getCallCost(&arg, "Responder.GetCost")
*reply, err = *r, e
} else {
r, e := AccLock.GuardGetCost(arg.GetAccountKey(), func() (*CallCost, error) {
r, e := AccLock.Guard(func() (interface{}, error) {
return arg.GetCost()
})
}, arg.GetAccountKey())
if e != nil {
return e
} else if r != nil {
*reply = *r
*reply = *r.(*CallCost)
}
}
return
@@ -72,9 +72,7 @@ func (rs *Responder) Debit(arg CallDescriptor, reply *CallCost) (err error) {
r, e := rs.getCallCost(&arg, "Responder.Debit")
*reply, err = *r, e
} else {
r, e := AccLock.GuardGetCost(arg.GetAccountKey(), func() (*CallCost, error) {
return arg.Debit()
})
r, e := arg.Debit()
if e != nil {
return e
} else if r != nil {
@@ -89,9 +87,7 @@ func (rs *Responder) MaxDebit(arg CallDescriptor, reply *CallCost) (err error) {
r, e := rs.getCallCost(&arg, "Responder.MaxDebit")
*reply, err = *r, e
} else {
r, e := AccLock.GuardGetCost(arg.GetAccountKey(), func() (*CallCost, error) {
return arg.MaxDebit()
})
r, e := arg.MaxDebit()
if e != nil {
return e
} else if r != nil {
@@ -105,10 +101,10 @@ func (rs *Responder) RefundIncrements(arg CallDescriptor, reply *float64) (err e
if rs.Bal != nil {
*reply, err = rs.callMethod(&arg, "Responder.RefundIncrements")
} else {
r, e := AccLock.Guard(arg.GetAccountKey(), func() (float64, error) {
r, e := AccLock.Guard(func() (interface{}, error) {
return arg.RefundIncrements()
})
*reply, err = r, e
}, arg.GetAccountKey())
*reply, err = r.(float64), e
}
return
}
@@ -117,11 +113,8 @@ func (rs *Responder) GetMaxSessionTime(arg CallDescriptor, reply *float64) (err
if rs.Bal != nil {
*reply, err = rs.callMethod(&arg, "Responder.GetMaxSessionTime")
} else {
r, e := AccLock.Guard(arg.GetAccountKey(), func() (float64, error) {
d, err := arg.GetMaxSessionDuration()
return float64(d), err
})
*reply, err = r, e
r, e := arg.GetMaxSessionDuration()
*reply, err = float64(r), e
}
return
}
@@ -244,7 +237,7 @@ func (rs *Responder) ProcessCdr(cdr *StoredCdr, reply *string) error {
func (rs *Responder) GetLCR(cd *CallDescriptor, reply *LCRCost) error {
lcrCost, err := cd.GetLCR(rs.Stats)
*reply = *lcrCost
*reply = lcrCost
return err
}
@@ -252,10 +245,10 @@ func (rs *Responder) FlushCache(arg CallDescriptor, reply *float64) (err error)
if rs.Bal != nil {
*reply, err = rs.callMethod(&arg, "Responder.FlushCache")
} else {
r, e := AccLock.Guard(arg.GetAccountKey(), func() (float64, error) {
r, e := AccLock.Guard(func() (interface{}, error) {
return 0, arg.FlushCache()
})
*reply, err = r, e
}, arg.GetAccountKey())
*reply, err = r.(float64), e
}
return
}
@@ -294,11 +287,10 @@ func (rs *Responder) getCallCost(key *CallDescriptor, method string) (reply *Cal
Logger.Info("<Balancer> Waiting for raters to register...")
time.Sleep(1 * time.Second) // wait one second and retry
} else {
reply = &CallCost{}
reply, err = AccLock.GuardGetCost(key.GetAccountKey(), func() (*CallCost, error) {
_, err = AccLock.Guard(func() (interface{}, error) {
err = client.Call(method, *key, reply)
return reply, err
})
}, key.GetAccountKey())
if err != nil {
Logger.Err(fmt.Sprintf("<Balancer> Got en error from rater: %v", err))
}
@@ -318,10 +310,10 @@ func (rs *Responder) callMethod(key *CallDescriptor, method string) (reply float
Logger.Info("Waiting for raters to register...")
time.Sleep(1 * time.Second) // wait one second and retry
} else {
reply, err = AccLock.Guard(key.GetAccountKey(), func() (float64, error) {
_, err = AccLock.Guard(func() (interface{}, error) {
err = client.Call(method, *key, &reply)
return reply, err
})
}, key.GetAccountKey())
if err != nil {
Logger.Info(fmt.Sprintf("Got en error from rater: %v", err))
}

View File

@@ -49,18 +49,6 @@ type SharingParameters struct {
RatingSubject string
}
func (sg *SharedGroup) GetMembersExceptUser(ubId string) []string {
for i, m := range sg.MemberIds {
if m == ubId {
a := make([]string, len(sg.MemberIds))
copy(a, sg.MemberIds)
a[i], a = a[len(a)-1], a[:len(a)-1]
return a
}
}
return sg.MemberIds
}
func (sg *SharedGroup) SortBalancesByStrategy(myBalance *Balance, bc BalanceChain) BalanceChain {
sharingParameters := sg.AccountParameters[utils.ANY]
if sp, hasParamsForAccount := sg.AccountParameters[myBalance.account.Id]; hasParamsForAccount {

View File

@@ -18,24 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"reflect"
"testing"
)
func TestSharedGroupGetMembersExcept(t *testing.T) {
sg := &SharedGroup{
MemberIds: []string{"1", "2", "3"},
}
a1 := sg.GetMembersExceptUser("1")
a2 := sg.GetMembersExceptUser("2")
a3 := sg.GetMembersExceptUser("3")
if !reflect.DeepEqual(a1, []string{"3", "2"}) ||
!reflect.DeepEqual(a2, []string{"1", "3"}) ||
!reflect.DeepEqual(a3, []string{"1", "2"}) {
t.Error("Error getting shared group members: ", a1, a2, a3)
}
}
import "testing"
func TestSharedPopBalanceByStrategyLow(t *testing.T) {
bc := BalanceChain{

View File

@@ -108,10 +108,10 @@ func (s *Scheduler) LoadActionTimings(storage engine.AccountingStorage) {
newAts = append(newAts, at)
}
if toBeSaved {
engine.AccLock.Guard(engine.ACTION_TIMING_PREFIX, func() (float64, error) {
engine.AccLock.Guard(func() (interface{}, error) {
storage.SetActionTimings(key, newAts)
return 0, nil
})
}, engine.ACTION_TIMING_PREFIX)
}
}
sort.Sort(s.queue)

View File

@@ -41,3 +41,14 @@ func SliceMemberHasPrefix(ss []string, prfx string) bool {
}
return false
}
func Avg(values []float64) float64 {
if len(values) == 0 {
return 0.0
}
var sum float64
for _, val := range values {
sum += val
}
return sum / float64(len(values))
}

View File

@@ -436,3 +436,21 @@ func TestConcatenatedKey(t *testing.T) {
t.Error("Unexpected key value received: ", key)
}
}
func TestAvg(t *testing.T) {
values := []float64{1, 2, 3}
result := Avg(values)
expected := 2.0
if expected != result {
t.Errorf("Wrong Avg: expected %v got %v", expected, result)
}
}
func TestAvgEmpty(t *testing.T) {
values := []float64{}
result := Avg(values)
expected := 0.0
if expected != result {
t.Errorf("Wrong Avg: expected %v got %v", expected, result)
}
}