simplified account locking and more lcr qos

This commit is contained in:
Radu Ioan Fericean
2015-04-03 19:20:25 +03:00
parent a8038687fd
commit 4926e33528
8 changed files with 138 additions and 87 deletions

View File

@@ -30,22 +30,22 @@ func init() {
type AccountLock struct {
queue map[string]chan bool
sync.RWMutex
mu sync.RWMutex
}
func NewAccountLock() *AccountLock {
return &AccountLock{queue: make(map[string]chan bool)}
}
func (cm *AccountLock) GuardGetCost(name string, handler func() (*CallCost, error)) (reply *CallCost, err error) {
cm.RLock()
func (cm *AccountLock) GuardCallCost(handler func() (*CallCost, error), name string) (reply *CallCost, err error) {
cm.mu.RLock()
lock, exists := AccLock.queue[name]
cm.RUnlock()
cm.mu.RUnlock()
if !exists {
cm.Lock()
cm.mu.Lock()
lock = make(chan bool, 1)
AccLock.queue[name] = lock
cm.Unlock()
cm.mu.Unlock()
}
lock <- true
reply, err = handler()
@@ -53,32 +53,16 @@ func (cm *AccountLock) GuardGetCost(name string, handler func() (*CallCost, erro
return
}
func (cm *AccountLock) Guard(name string, handler func() (float64, error)) (reply float64, err error) {
cm.RLock()
lock, exists := AccLock.queue[name]
cm.RUnlock()
if !exists {
cm.Lock()
lock = make(chan bool, 1)
AccLock.queue[name] = lock
cm.Unlock()
}
lock <- true
reply, err = handler()
<-lock
return
}
func (cm *AccountLock) GuardMany(names []string, handler func() (float64, error)) (reply float64, err error) {
func (cm *AccountLock) Guard(handler func() (float64, error), names ...string) (reply float64, err error) {
for _, name := range names {
cm.RLock()
cm.mu.RLock()
lock, exists := AccLock.queue[name]
cm.RUnlock()
cm.mu.RUnlock()
if !exists {
cm.Lock()
cm.mu.Lock()
lock = make(chan bool, 1)
AccLock.queue[name] = lock
cm.Unlock()
cm.mu.Unlock()
}
lock <- true
}

View File

@@ -25,23 +25,23 @@ import (
)
func ATestAccountLock(t *testing.T) {
go AccLock.Guard("1", func() (float64, error) {
go AccLock.Guard(func() (float64, error) {
log.Print("first 1")
time.Sleep(1 * time.Second)
log.Print("end first 1")
return 0, nil
})
go AccLock.Guard("2", func() (float64, error) {
}, "1")
go AccLock.Guard(func() (float64, error) {
log.Print("first 2")
time.Sleep(1 * time.Second)
log.Print("end first 2")
return 0, nil
})
go AccLock.Guard("1", func() (float64, error) {
}, "2")
go AccLock.Guard(func() (float64, error) {
log.Print("second 1")
time.Sleep(1 * time.Second)
log.Print("end second 1")
return 0, nil
})
}, "1")
time.Sleep(3 * time.Second)
}

View File

@@ -257,7 +257,7 @@ func (at *ActionTiming) Execute() (err error) {
return
}
for _, ubId := range at.AccountIds {
_, err := AccLock.Guard(ubId, func() (float64, error) {
_, err := AccLock.Guard(func() (float64, error) {
ub, err := accountingStorage.GetAccount(ubId)
if err != nil {
Logger.Warning(fmt.Sprintf("Could not get user balances for this id: %s. Skipping!", ubId))
@@ -270,7 +270,7 @@ func (at *ActionTiming) Execute() (err error) {
//Logger.Info(fmt.Sprintf("After execute, account: %+v", ub))
accountingStorage.SetAccount(ub)
return 0, nil
})
}, ubId)
if err != nil {
Logger.Warning(fmt.Sprintf("Error executing action timing: %v", err))
}

View File

@@ -502,10 +502,10 @@ func (cd *CallDescriptor) GetMaxSessionDuration() (duration time.Duration, err e
return 0, err
} else {
if memberIds, err := account.GetUniqueSharedGroupMembers(cd.Destination, cd.Direction, cd.Category, cd.TOR); err == nil {
AccLock.GuardMany(memberIds, func() (float64, error) {
AccLock.Guard(func() (float64, error) {
duration, err = cd.getMaxSessionDuration(account)
return 0, err
})
}, memberIds...)
} else {
return 0, err
}
@@ -551,10 +551,10 @@ func (cd *CallDescriptor) Debit() (cc *CallCost, err error) {
return nil, err
} else {
if memberIds, err := account.GetUniqueSharedGroupMembers(cd.Destination, cd.Direction, cd.Category, cd.TOR); err == nil {
AccLock.GuardMany(memberIds, func() (float64, error) {
AccLock.Guard(func() (float64, error) {
cc, err = cd.debit(account, false, true)
return 0, err
})
}, memberIds...)
} else {
return nil, err
}
@@ -573,7 +573,7 @@ func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) {
} else {
//log.Printf("ACC: %+v", account)
if memberIds, err := account.GetUniqueSharedGroupMembers(cd.Destination, cd.Direction, cd.Category, cd.TOR); err == nil {
AccLock.GuardMany(memberIds, func() (float64, error) {
AccLock.Guard(func() (float64, error) {
remainingDuration, err := cd.getMaxSessionDuration(account)
//log.Print("AFTER MAX SESSION: ", cd)
if err != nil || remainingDuration == 0 {
@@ -589,7 +589,7 @@ func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) {
cc, err = cd.debit(account, false, true)
//log.Print(balanceMap[0].Value, balanceMap[1].Value)
return 0, err
})
}, memberIds...)
} else {
return nil, err
}
@@ -657,7 +657,7 @@ func (cd *CallDescriptor) Clone() *CallDescriptor {
}
}
func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) {
func (cd *CallDescriptor) GetLCR(stats StatsInterface) (LCRCost, error) {
lcr, err := dataStorage.GetLCR(cd.GetLCRKey(""), false)
if err != nil || lcr == nil {
// try the *any customer
@@ -666,26 +666,24 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) {
}
}
lcr.Sort()
lcrCost := &LCRCost{
TimeSpans: []*LCRTimeSpan{&LCRTimeSpan{StartTime: cd.TimeStart}},
}
lcrCost := LCRCost{&LCRTimeSpan{StartTime: cd.TimeStart}}
for _, lcrActivation := range lcr.Activations {
// TODO: filer entry by destination
lcrEntry := lcrActivation.GetLCREntryForPrefix(cd.Destination)
if lcrActivation.ActivationTime.Before(cd.TimeStart) ||
lcrActivation.ActivationTime.Equal(cd.TimeStart) {
lcrCost.TimeSpans[0].Entry = lcrEntry
lcrCost[0].Entry = lcrEntry
} else {
if lcrActivation.ActivationTime.Before(cd.TimeEnd) {
// add lcr timespan
lcrCost.TimeSpans = append(lcrCost.TimeSpans, &LCRTimeSpan{
lcrCost = append(lcrCost, &LCRTimeSpan{
StartTime: lcrActivation.ActivationTime,
Entry: lcrEntry,
})
}
}
}
for _, ts := range lcrCost.TimeSpans {
for _, ts := range lcrCost {
if ts.Entry.Strategy == LCR_STRATEGY_STATIC {
for _, supplier := range ts.Entry.GetSuppliers() {
lcrCD := cd.Clone()
@@ -718,8 +716,9 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) {
if cd.account, err = accountingStorage.GetAccount(cd.GetAccountKey()); err != nil {
continue
}
if ts.Entry.Strategy == LCR_STRATEGY_QOS_WITH_THRESHOLD {
// get stats and filter suppliers by qos thresholds
var asr, acd float64
var qosSortParams []string
if ts.Entry.Strategy == LCR_STRATEGY_QOS || ts.Entry.Strategy == LCR_STRATEGY_QOS_WITH_THRESHOLD {
rpfKey := utils.ConcatenatedKey(ratingProfileSearchKey, supplier)
if rpf, err := dataStorage.GetRatingProfile(rpfKey, false); err == nil || rpf != nil {
rpf.RatingPlanActivations.Sort()
@@ -749,19 +748,27 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) {
}
asrValues.Sort()
acdValues.Sort()
asrMin, asrMax, acdMin, acdMax := ts.Entry.GetQOSLimits()
// skip current supplier if off limits
if asrMin > 0 && asrValues[0] < asrMin {
continue
asr = utils.Avg(asrValues)
acd = utils.Avg(acdValues)
if ts.Entry.Strategy == LCR_STRATEGY_QOS_WITH_THRESHOLD {
qosSortParams = ts.Entry.GetQOSSortParams()
}
if asrMax > 0 && asrValues[len(asrValues)-1] > asrMax {
continue
}
if acdMin > 0 && acdValues[0] < float64(acdMin) {
continue
}
if acdMax > 0 && acdValues[len(acdValues)-1] > float64(acdMax) {
continue
if ts.Entry.Strategy == LCR_STRATEGY_QOS_WITH_THRESHOLD {
// filter suppliers by qos thresholds
asrMin, asrMax, acdMin, acdMax := ts.Entry.GetQOSLimits()
// skip current supplier if off limits
if asrMin > 0 && asrValues[0] < asrMin {
continue
}
if asrMax > 0 && asrValues[len(asrValues)-1] > asrMax {
continue
}
if acdMin > 0 && acdValues[0] < float64(acdMin) {
continue
}
if acdMax > 0 && acdValues[len(acdValues)-1] > float64(acdMax) {
continue
}
}
}
}
@@ -774,6 +781,11 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) {
ts.SupplierCosts = append(ts.SupplierCosts, &LCRSupplierCost{
Supplier: supplier,
Cost: cc.Cost,
QOS: map[string]float64{
"ASR": asr,
"ACD": acd,
},
qosSortParams: qosSortParams,
})
}
}

View File

@@ -57,9 +57,7 @@ type LCREntry struct {
precision int
}
type LCRCost struct {
TimeSpans []*LCRTimeSpan
}
type LCRCost []*LCRTimeSpan
type LCRTimeSpan struct {
StartTime time.Time
@@ -68,9 +66,11 @@ type LCRTimeSpan struct {
}
type LCRSupplierCost struct {
Supplier string
Cost float64
Error error
Supplier string
Cost float64
Error error
QOS map[string]float64
qosSortParams []string
}
func (lcr *LCR) GetId() string {
@@ -94,7 +94,7 @@ func (lcr *LCR) Sort() {
}
func (le *LCREntry) GetSuppliers() []string {
suppliers := strings.Split(le.StrategyParams, ";")
suppliers := strings.Split(le.StrategyParams, utils.INFIELD_SEP)
for i := 0; i < len(suppliers); i++ {
suppliers[i] = strings.TrimSpace(suppliers[i])
}
@@ -103,7 +103,7 @@ func (le *LCREntry) GetSuppliers() []string {
func (le *LCREntry) GetQOSLimits() (minASR, maxASR float64, minACD, maxACD time.Duration) {
// MIN_ASR;MAX_ASR;MIN_ACD;MAX_ACD
params := strings.Split(le.StrategyParams, ";")
params := strings.Split(le.StrategyParams, utils.INFIELD_SEP)
if len(params) == 4 {
var err error
if minASR, err = strconv.ParseFloat(params[0], 64); err != nil {
@@ -122,6 +122,14 @@ func (le *LCREntry) GetQOSLimits() (minASR, maxASR float64, minACD, maxACD time.
return
}
func (le *LCREntry) GetQOSSortParams() []string {
// ASR;ACD
if params := strings.Split(le.StrategyParams, utils.INFIELD_SEP); len(params) > 0 {
return params
}
return []string{ASR, ACD}
}
type LCREntriesSorter []*LCREntry
func (es LCREntriesSorter) Len() int {
@@ -177,6 +185,8 @@ func (lts *LCRTimeSpan) Sort() {
sort.Sort(LowestSupplierCostSorter(lts.SupplierCosts))
case LCR_STRATEGY_HIGHEST:
sort.Sort(HighestSupplierCostSorter(lts.SupplierCosts))
case LCR_STRATEGY_QOS:
sort.Sort(QOSSorter(lts.SupplierCosts))
}
}
@@ -207,3 +217,19 @@ func (hscs HighestSupplierCostSorter) Swap(i, j int) {
func (hscs HighestSupplierCostSorter) Less(i, j int) bool {
return hscs[i].Cost > hscs[j].Cost
}
type QOSSorter []*LCRSupplierCost
func (qoss QOSSorter) Len() int {
return len(qoss)
}
func (qoss QOSSorter) Swap(i, j int) {
qoss[i], qoss[j] = qoss[j], qoss[i]
}
func (qoss QOSSorter) Less(i, j int) bool {
//for _, param := range qoss[i].qosSortParams
//qoss[i].Cost > qoss[j].Cost
return false
}

View File

@@ -55,9 +55,9 @@ func (rs *Responder) GetCost(arg CallDescriptor, reply *CallCost) (err error) {
r, e := rs.getCallCost(&arg, "Responder.GetCost")
*reply, err = *r, e
} else {
r, e := AccLock.GuardGetCost(arg.GetAccountKey(), func() (*CallCost, error) {
r, e := AccLock.GuardCallCost(func() (*CallCost, error) {
return arg.GetCost()
})
}, arg.GetAccountKey())
if e != nil {
return e
} else if r != nil {
@@ -72,9 +72,9 @@ func (rs *Responder) Debit(arg CallDescriptor, reply *CallCost) (err error) {
r, e := rs.getCallCost(&arg, "Responder.Debit")
*reply, err = *r, e
} else {
r, e := AccLock.GuardGetCost(arg.GetAccountKey(), func() (*CallCost, error) {
r, e := AccLock.GuardCallCost(func() (*CallCost, error) {
return arg.Debit()
})
}, arg.GetAccountKey())
if e != nil {
return e
} else if r != nil {
@@ -89,9 +89,9 @@ func (rs *Responder) MaxDebit(arg CallDescriptor, reply *CallCost) (err error) {
r, e := rs.getCallCost(&arg, "Responder.MaxDebit")
*reply, err = *r, e
} else {
r, e := AccLock.GuardGetCost(arg.GetAccountKey(), func() (*CallCost, error) {
r, e := AccLock.GuardCallCost(func() (*CallCost, error) {
return arg.MaxDebit()
})
}, arg.GetAccountKey())
if e != nil {
return e
} else if r != nil {
@@ -105,9 +105,9 @@ func (rs *Responder) RefundIncrements(arg CallDescriptor, reply *float64) (err e
if rs.Bal != nil {
*reply, err = rs.callMethod(&arg, "Responder.RefundIncrements")
} else {
r, e := AccLock.Guard(arg.GetAccountKey(), func() (float64, error) {
r, e := AccLock.Guard(func() (float64, error) {
return arg.RefundIncrements()
})
}, arg.GetAccountKey())
*reply, err = r, e
}
return
@@ -117,10 +117,10 @@ func (rs *Responder) GetMaxSessionTime(arg CallDescriptor, reply *float64) (err
if rs.Bal != nil {
*reply, err = rs.callMethod(&arg, "Responder.GetMaxSessionTime")
} else {
r, e := AccLock.Guard(arg.GetAccountKey(), func() (float64, error) {
r, e := AccLock.Guard(func() (float64, error) {
d, err := arg.GetMaxSessionDuration()
return float64(d), err
})
}, arg.GetAccountKey())
*reply, err = r, e
}
return
@@ -244,7 +244,7 @@ func (rs *Responder) ProcessCdr(cdr *StoredCdr, reply *string) error {
func (rs *Responder) GetLCR(cd *CallDescriptor, reply *LCRCost) error {
lcrCost, err := cd.GetLCR(rs.Stats)
*reply = *lcrCost
*reply = lcrCost
return err
}
@@ -252,9 +252,9 @@ func (rs *Responder) FlushCache(arg CallDescriptor, reply *float64) (err error)
if rs.Bal != nil {
*reply, err = rs.callMethod(&arg, "Responder.FlushCache")
} else {
r, e := AccLock.Guard(arg.GetAccountKey(), func() (float64, error) {
r, e := AccLock.Guard(func() (float64, error) {
return 0, arg.FlushCache()
})
}, arg.GetAccountKey())
*reply, err = r, e
}
return
@@ -295,10 +295,10 @@ func (rs *Responder) getCallCost(key *CallDescriptor, method string) (reply *Cal
time.Sleep(1 * time.Second) // wait one second and retry
} else {
reply = &CallCost{}
reply, err = AccLock.GuardGetCost(key.GetAccountKey(), func() (*CallCost, error) {
reply, err = AccLock.GuardCallCost(func() (*CallCost, error) {
err = client.Call(method, *key, reply)
return reply, err
})
}, key.GetAccountKey())
if err != nil {
Logger.Err(fmt.Sprintf("<Balancer> Got en error from rater: %v", err))
}
@@ -318,10 +318,10 @@ func (rs *Responder) callMethod(key *CallDescriptor, method string) (reply float
Logger.Info("Waiting for raters to register...")
time.Sleep(1 * time.Second) // wait one second and retry
} else {
reply, err = AccLock.Guard(key.GetAccountKey(), func() (float64, error) {
reply, err = AccLock.Guard(func() (float64, error) {
err = client.Call(method, *key, &reply)
return reply, err
})
}, key.GetAccountKey())
if err != nil {
Logger.Info(fmt.Sprintf("Got en error from rater: %v", err))
}

View File

@@ -41,3 +41,14 @@ func SliceMemberHasPrefix(ss []string, prfx string) bool {
}
return false
}
func Avg(values []float64) float64 {
if len(values) == 0 {
return 0.0
}
var sum float64
for _, val := range values {
sum += val
}
return sum / float64(len(values))
}

View File

@@ -436,3 +436,21 @@ func TestConcatenatedKey(t *testing.T) {
t.Error("Unexpected key value received: ", key)
}
}
func TestAvg(t *testing.T) {
values := []float64{1, 2, 3}
result := Avg(values)
expected := 2.0
if expected != result {
t.Errorf("Wrong Avg: expected %v got %v", expected, result)
}
}
func TestAvgEmpty(t *testing.T) {
values := []float64{}
result := Avg(values)
expected := 0.0
if expected != result {
t.Errorf("Wrong Avg: expected %v got %v", expected, result)
}
}