diff --git a/engine/accountlock.go b/engine/accountlock.go index 9f77e078f..5309f75b8 100644 --- a/engine/accountlock.go +++ b/engine/accountlock.go @@ -30,22 +30,22 @@ func init() { type AccountLock struct { queue map[string]chan bool - sync.RWMutex + mu sync.RWMutex } func NewAccountLock() *AccountLock { return &AccountLock{queue: make(map[string]chan bool)} } -func (cm *AccountLock) GuardGetCost(name string, handler func() (*CallCost, error)) (reply *CallCost, err error) { - cm.RLock() +func (cm *AccountLock) GuardCallCost(handler func() (*CallCost, error), name string) (reply *CallCost, err error) { + cm.mu.RLock() lock, exists := AccLock.queue[name] - cm.RUnlock() + cm.mu.RUnlock() if !exists { - cm.Lock() + cm.mu.Lock() lock = make(chan bool, 1) AccLock.queue[name] = lock - cm.Unlock() + cm.mu.Unlock() } lock <- true reply, err = handler() @@ -53,32 +53,16 @@ func (cm *AccountLock) GuardGetCost(name string, handler func() (*CallCost, erro return } -func (cm *AccountLock) Guard(name string, handler func() (float64, error)) (reply float64, err error) { - cm.RLock() - lock, exists := AccLock.queue[name] - cm.RUnlock() - if !exists { - cm.Lock() - lock = make(chan bool, 1) - AccLock.queue[name] = lock - cm.Unlock() - } - lock <- true - reply, err = handler() - <-lock - return -} - -func (cm *AccountLock) GuardMany(names []string, handler func() (float64, error)) (reply float64, err error) { +func (cm *AccountLock) Guard(handler func() (float64, error), names ...string) (reply float64, err error) { for _, name := range names { - cm.RLock() + cm.mu.RLock() lock, exists := AccLock.queue[name] - cm.RUnlock() + cm.mu.RUnlock() if !exists { - cm.Lock() + cm.mu.Lock() lock = make(chan bool, 1) AccLock.queue[name] = lock - cm.Unlock() + cm.mu.Unlock() } lock <- true } diff --git a/engine/accountlock_test.go b/engine/accountlock_test.go index f9f085956..db2193cd9 100644 --- a/engine/accountlock_test.go +++ b/engine/accountlock_test.go @@ -25,23 +25,23 @@ import ( ) func ATestAccountLock(t *testing.T) { - go AccLock.Guard("1", func() (float64, error) { + go AccLock.Guard(func() (float64, error) { log.Print("first 1") time.Sleep(1 * time.Second) log.Print("end first 1") return 0, nil - }) - go AccLock.Guard("2", func() (float64, error) { + }, "1") + go AccLock.Guard(func() (float64, error) { log.Print("first 2") time.Sleep(1 * time.Second) log.Print("end first 2") return 0, nil - }) - go AccLock.Guard("1", func() (float64, error) { + }, "2") + go AccLock.Guard(func() (float64, error) { log.Print("second 1") time.Sleep(1 * time.Second) log.Print("end second 1") return 0, nil - }) + }, "1") time.Sleep(3 * time.Second) } diff --git a/engine/action_timing.go b/engine/action_timing.go index e22a5f923..dbc4b032c 100644 --- a/engine/action_timing.go +++ b/engine/action_timing.go @@ -257,7 +257,7 @@ func (at *ActionTiming) Execute() (err error) { return } for _, ubId := range at.AccountIds { - _, err := AccLock.Guard(ubId, func() (float64, error) { + _, err := AccLock.Guard(func() (float64, error) { ub, err := accountingStorage.GetAccount(ubId) if err != nil { Logger.Warning(fmt.Sprintf("Could not get user balances for this id: %s. Skipping!", ubId)) @@ -270,7 +270,7 @@ func (at *ActionTiming) Execute() (err error) { //Logger.Info(fmt.Sprintf("After execute, account: %+v", ub)) accountingStorage.SetAccount(ub) return 0, nil - }) + }, ubId) if err != nil { Logger.Warning(fmt.Sprintf("Error executing action timing: %v", err)) } diff --git a/engine/calldesc.go b/engine/calldesc.go index 082028845..809023ae8 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -502,10 +502,10 @@ func (cd *CallDescriptor) GetMaxSessionDuration() (duration time.Duration, err e return 0, err } else { if memberIds, err := account.GetUniqueSharedGroupMembers(cd.Destination, cd.Direction, cd.Category, cd.TOR); err == nil { - AccLock.GuardMany(memberIds, func() (float64, error) { + AccLock.Guard(func() (float64, error) { duration, err = cd.getMaxSessionDuration(account) return 0, err - }) + }, memberIds...) } else { return 0, err } @@ -551,10 +551,10 @@ func (cd *CallDescriptor) Debit() (cc *CallCost, err error) { return nil, err } else { if memberIds, err := account.GetUniqueSharedGroupMembers(cd.Destination, cd.Direction, cd.Category, cd.TOR); err == nil { - AccLock.GuardMany(memberIds, func() (float64, error) { + AccLock.Guard(func() (float64, error) { cc, err = cd.debit(account, false, true) return 0, err - }) + }, memberIds...) } else { return nil, err } @@ -573,7 +573,7 @@ func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) { } else { //log.Printf("ACC: %+v", account) if memberIds, err := account.GetUniqueSharedGroupMembers(cd.Destination, cd.Direction, cd.Category, cd.TOR); err == nil { - AccLock.GuardMany(memberIds, func() (float64, error) { + AccLock.Guard(func() (float64, error) { remainingDuration, err := cd.getMaxSessionDuration(account) //log.Print("AFTER MAX SESSION: ", cd) if err != nil || remainingDuration == 0 { @@ -589,7 +589,7 @@ func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) { cc, err = cd.debit(account, false, true) //log.Print(balanceMap[0].Value, balanceMap[1].Value) return 0, err - }) + }, memberIds...) } else { return nil, err } @@ -657,7 +657,7 @@ func (cd *CallDescriptor) Clone() *CallDescriptor { } } -func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) { +func (cd *CallDescriptor) GetLCR(stats StatsInterface) (LCRCost, error) { lcr, err := dataStorage.GetLCR(cd.GetLCRKey(""), false) if err != nil || lcr == nil { // try the *any customer @@ -666,26 +666,24 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) { } } lcr.Sort() - lcrCost := &LCRCost{ - TimeSpans: []*LCRTimeSpan{&LCRTimeSpan{StartTime: cd.TimeStart}}, - } + lcrCost := LCRCost{&LCRTimeSpan{StartTime: cd.TimeStart}} for _, lcrActivation := range lcr.Activations { // TODO: filer entry by destination lcrEntry := lcrActivation.GetLCREntryForPrefix(cd.Destination) if lcrActivation.ActivationTime.Before(cd.TimeStart) || lcrActivation.ActivationTime.Equal(cd.TimeStart) { - lcrCost.TimeSpans[0].Entry = lcrEntry + lcrCost[0].Entry = lcrEntry } else { if lcrActivation.ActivationTime.Before(cd.TimeEnd) { // add lcr timespan - lcrCost.TimeSpans = append(lcrCost.TimeSpans, &LCRTimeSpan{ + lcrCost = append(lcrCost, &LCRTimeSpan{ StartTime: lcrActivation.ActivationTime, Entry: lcrEntry, }) } } } - for _, ts := range lcrCost.TimeSpans { + for _, ts := range lcrCost { if ts.Entry.Strategy == LCR_STRATEGY_STATIC { for _, supplier := range ts.Entry.GetSuppliers() { lcrCD := cd.Clone() @@ -718,8 +716,9 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) { if cd.account, err = accountingStorage.GetAccount(cd.GetAccountKey()); err != nil { continue } - if ts.Entry.Strategy == LCR_STRATEGY_QOS_WITH_THRESHOLD { - // get stats and filter suppliers by qos thresholds + var asr, acd float64 + var qosSortParams []string + if ts.Entry.Strategy == LCR_STRATEGY_QOS || ts.Entry.Strategy == LCR_STRATEGY_QOS_WITH_THRESHOLD { rpfKey := utils.ConcatenatedKey(ratingProfileSearchKey, supplier) if rpf, err := dataStorage.GetRatingProfile(rpfKey, false); err == nil || rpf != nil { rpf.RatingPlanActivations.Sort() @@ -749,19 +748,27 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) { } asrValues.Sort() acdValues.Sort() - asrMin, asrMax, acdMin, acdMax := ts.Entry.GetQOSLimits() - // skip current supplier if off limits - if asrMin > 0 && asrValues[0] < asrMin { - continue + asr = utils.Avg(asrValues) + acd = utils.Avg(acdValues) + if ts.Entry.Strategy == LCR_STRATEGY_QOS_WITH_THRESHOLD { + qosSortParams = ts.Entry.GetQOSSortParams() } - if asrMax > 0 && asrValues[len(asrValues)-1] > asrMax { - continue - } - if acdMin > 0 && acdValues[0] < float64(acdMin) { - continue - } - if acdMax > 0 && acdValues[len(acdValues)-1] > float64(acdMax) { - continue + if ts.Entry.Strategy == LCR_STRATEGY_QOS_WITH_THRESHOLD { + // filter suppliers by qos thresholds + asrMin, asrMax, acdMin, acdMax := ts.Entry.GetQOSLimits() + // skip current supplier if off limits + if asrMin > 0 && asrValues[0] < asrMin { + continue + } + if asrMax > 0 && asrValues[len(asrValues)-1] > asrMax { + continue + } + if acdMin > 0 && acdValues[0] < float64(acdMin) { + continue + } + if acdMax > 0 && acdValues[len(acdValues)-1] > float64(acdMax) { + continue + } } } } @@ -774,6 +781,11 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) { ts.SupplierCosts = append(ts.SupplierCosts, &LCRSupplierCost{ Supplier: supplier, Cost: cc.Cost, + QOS: map[string]float64{ + "ASR": asr, + "ACD": acd, + }, + qosSortParams: qosSortParams, }) } } diff --git a/engine/lcr.go b/engine/lcr.go index 0ea217b29..e8bc0ba03 100644 --- a/engine/lcr.go +++ b/engine/lcr.go @@ -57,9 +57,7 @@ type LCREntry struct { precision int } -type LCRCost struct { - TimeSpans []*LCRTimeSpan -} +type LCRCost []*LCRTimeSpan type LCRTimeSpan struct { StartTime time.Time @@ -68,9 +66,11 @@ type LCRTimeSpan struct { } type LCRSupplierCost struct { - Supplier string - Cost float64 - Error error + Supplier string + Cost float64 + Error error + QOS map[string]float64 + qosSortParams []string } func (lcr *LCR) GetId() string { @@ -94,7 +94,7 @@ func (lcr *LCR) Sort() { } func (le *LCREntry) GetSuppliers() []string { - suppliers := strings.Split(le.StrategyParams, ";") + suppliers := strings.Split(le.StrategyParams, utils.INFIELD_SEP) for i := 0; i < len(suppliers); i++ { suppliers[i] = strings.TrimSpace(suppliers[i]) } @@ -103,7 +103,7 @@ func (le *LCREntry) GetSuppliers() []string { func (le *LCREntry) GetQOSLimits() (minASR, maxASR float64, minACD, maxACD time.Duration) { // MIN_ASR;MAX_ASR;MIN_ACD;MAX_ACD - params := strings.Split(le.StrategyParams, ";") + params := strings.Split(le.StrategyParams, utils.INFIELD_SEP) if len(params) == 4 { var err error if minASR, err = strconv.ParseFloat(params[0], 64); err != nil { @@ -122,6 +122,14 @@ func (le *LCREntry) GetQOSLimits() (minASR, maxASR float64, minACD, maxACD time. return } +func (le *LCREntry) GetQOSSortParams() []string { + // ASR;ACD + if params := strings.Split(le.StrategyParams, utils.INFIELD_SEP); len(params) > 0 { + return params + } + return []string{ASR, ACD} +} + type LCREntriesSorter []*LCREntry func (es LCREntriesSorter) Len() int { @@ -177,6 +185,8 @@ func (lts *LCRTimeSpan) Sort() { sort.Sort(LowestSupplierCostSorter(lts.SupplierCosts)) case LCR_STRATEGY_HIGHEST: sort.Sort(HighestSupplierCostSorter(lts.SupplierCosts)) + case LCR_STRATEGY_QOS: + sort.Sort(QOSSorter(lts.SupplierCosts)) } } @@ -207,3 +217,19 @@ func (hscs HighestSupplierCostSorter) Swap(i, j int) { func (hscs HighestSupplierCostSorter) Less(i, j int) bool { return hscs[i].Cost > hscs[j].Cost } + +type QOSSorter []*LCRSupplierCost + +func (qoss QOSSorter) Len() int { + return len(qoss) +} + +func (qoss QOSSorter) Swap(i, j int) { + qoss[i], qoss[j] = qoss[j], qoss[i] +} + +func (qoss QOSSorter) Less(i, j int) bool { + //for _, param := range qoss[i].qosSortParams + //qoss[i].Cost > qoss[j].Cost + return false +} diff --git a/engine/responder.go b/engine/responder.go index 6d349a313..d322da1eb 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -55,9 +55,9 @@ func (rs *Responder) GetCost(arg CallDescriptor, reply *CallCost) (err error) { r, e := rs.getCallCost(&arg, "Responder.GetCost") *reply, err = *r, e } else { - r, e := AccLock.GuardGetCost(arg.GetAccountKey(), func() (*CallCost, error) { + r, e := AccLock.GuardCallCost(func() (*CallCost, error) { return arg.GetCost() - }) + }, arg.GetAccountKey()) if e != nil { return e } else if r != nil { @@ -72,9 +72,9 @@ func (rs *Responder) Debit(arg CallDescriptor, reply *CallCost) (err error) { r, e := rs.getCallCost(&arg, "Responder.Debit") *reply, err = *r, e } else { - r, e := AccLock.GuardGetCost(arg.GetAccountKey(), func() (*CallCost, error) { + r, e := AccLock.GuardCallCost(func() (*CallCost, error) { return arg.Debit() - }) + }, arg.GetAccountKey()) if e != nil { return e } else if r != nil { @@ -89,9 +89,9 @@ func (rs *Responder) MaxDebit(arg CallDescriptor, reply *CallCost) (err error) { r, e := rs.getCallCost(&arg, "Responder.MaxDebit") *reply, err = *r, e } else { - r, e := AccLock.GuardGetCost(arg.GetAccountKey(), func() (*CallCost, error) { + r, e := AccLock.GuardCallCost(func() (*CallCost, error) { return arg.MaxDebit() - }) + }, arg.GetAccountKey()) if e != nil { return e } else if r != nil { @@ -105,9 +105,9 @@ func (rs *Responder) RefundIncrements(arg CallDescriptor, reply *float64) (err e if rs.Bal != nil { *reply, err = rs.callMethod(&arg, "Responder.RefundIncrements") } else { - r, e := AccLock.Guard(arg.GetAccountKey(), func() (float64, error) { + r, e := AccLock.Guard(func() (float64, error) { return arg.RefundIncrements() - }) + }, arg.GetAccountKey()) *reply, err = r, e } return @@ -117,10 +117,10 @@ func (rs *Responder) GetMaxSessionTime(arg CallDescriptor, reply *float64) (err if rs.Bal != nil { *reply, err = rs.callMethod(&arg, "Responder.GetMaxSessionTime") } else { - r, e := AccLock.Guard(arg.GetAccountKey(), func() (float64, error) { + r, e := AccLock.Guard(func() (float64, error) { d, err := arg.GetMaxSessionDuration() return float64(d), err - }) + }, arg.GetAccountKey()) *reply, err = r, e } return @@ -244,7 +244,7 @@ func (rs *Responder) ProcessCdr(cdr *StoredCdr, reply *string) error { func (rs *Responder) GetLCR(cd *CallDescriptor, reply *LCRCost) error { lcrCost, err := cd.GetLCR(rs.Stats) - *reply = *lcrCost + *reply = lcrCost return err } @@ -252,9 +252,9 @@ func (rs *Responder) FlushCache(arg CallDescriptor, reply *float64) (err error) if rs.Bal != nil { *reply, err = rs.callMethod(&arg, "Responder.FlushCache") } else { - r, e := AccLock.Guard(arg.GetAccountKey(), func() (float64, error) { + r, e := AccLock.Guard(func() (float64, error) { return 0, arg.FlushCache() - }) + }, arg.GetAccountKey()) *reply, err = r, e } return @@ -295,10 +295,10 @@ func (rs *Responder) getCallCost(key *CallDescriptor, method string) (reply *Cal time.Sleep(1 * time.Second) // wait one second and retry } else { reply = &CallCost{} - reply, err = AccLock.GuardGetCost(key.GetAccountKey(), func() (*CallCost, error) { + reply, err = AccLock.GuardCallCost(func() (*CallCost, error) { err = client.Call(method, *key, reply) return reply, err - }) + }, key.GetAccountKey()) if err != nil { Logger.Err(fmt.Sprintf(" Got en error from rater: %v", err)) } @@ -318,10 +318,10 @@ func (rs *Responder) callMethod(key *CallDescriptor, method string) (reply float Logger.Info("Waiting for raters to register...") time.Sleep(1 * time.Second) // wait one second and retry } else { - reply, err = AccLock.Guard(key.GetAccountKey(), func() (float64, error) { + reply, err = AccLock.Guard(func() (float64, error) { err = client.Call(method, *key, &reply) return reply, err - }) + }, key.GetAccountKey()) if err != nil { Logger.Info(fmt.Sprintf("Got en error from rater: %v", err)) } diff --git a/utils/slice.go b/utils/slice.go index dc5432331..a496d66dc 100644 --- a/utils/slice.go +++ b/utils/slice.go @@ -41,3 +41,14 @@ func SliceMemberHasPrefix(ss []string, prfx string) bool { } return false } + +func Avg(values []float64) float64 { + if len(values) == 0 { + return 0.0 + } + var sum float64 + for _, val := range values { + sum += val + } + return sum / float64(len(values)) +} diff --git a/utils/utils_test.go b/utils/utils_test.go index 077d96875..8fdabdb36 100644 --- a/utils/utils_test.go +++ b/utils/utils_test.go @@ -436,3 +436,21 @@ func TestConcatenatedKey(t *testing.T) { t.Error("Unexpected key value received: ", key) } } + +func TestAvg(t *testing.T) { + values := []float64{1, 2, 3} + result := Avg(values) + expected := 2.0 + if expected != result { + t.Errorf("Wrong Avg: expected %v got %v", expected, result) + } +} + +func TestAvgEmpty(t *testing.T) { + values := []float64{} + result := Avg(values) + expected := 0.0 + if expected != result { + t.Errorf("Wrong Avg: expected %v got %v", expected, result) + } +}