This commit is contained in:
DanB
2015-07-31 09:16:08 +02:00
5 changed files with 273 additions and 51 deletions

View File

@@ -795,6 +795,7 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) {
if lcrCost.Entry == nil {
return lcrCost, nil
}
//log.Printf("Entry: %+v", lcrCost.Entry)
if lcrCost.Entry.Strategy == LCR_STRATEGY_STATIC {
for _, supplier := range lcrCost.Entry.GetParams() {
@@ -865,7 +866,7 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) {
accNeverConsidered := true
tccNeverConsidered := true
ddcNeverConsidered := true
if utils.IsSliceMember([]string{LCR_STRATEGY_QOS, LCR_STRATEGY_QOS_THRESHOLD}, lcrCost.Entry.Strategy) {
if utils.IsSliceMember([]string{LCR_STRATEGY_QOS, LCR_STRATEGY_QOS_THRESHOLD, LCR_STRATEGY_LOAD}, lcrCost.Entry.Strategy) {
if stats == nil {
lcrCost.SupplierCosts = append(lcrCost.SupplierCosts, &LCRSupplierCost{
Supplier: supplier,
@@ -891,60 +892,84 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) {
}
}
}
statsErr := false
var supplierQueues []*StatsQueue
for _, qId := range cdrStatsQueueIds {
statValues := make(map[string]float64)
if err := stats.GetValues(qId, &statValues); err != nil {
lcrCost.SupplierCosts = append(lcrCost.SupplierCosts, &LCRSupplierCost{
Supplier: supplier,
Error: fmt.Sprintf("Get stats values for queue id %s, error %s", qId, err.Error()),
})
statsErr = true
break
}
if asr, exists := statValues[ASR]; exists {
if asr > STATS_NA {
asrValues = append(asrValues, asr)
if lcrCost.Entry.Strategy == LCR_STRATEGY_LOAD {
for _, qId := range cdrStatsQueueIds {
sq := &StatsQueue{}
if err := stats.GetQueue(qId, sq); err == nil {
if sq.conf.QueueLength == 0 { //only add qeues that don't have fixed length
supplierQueues = append(supplierQueues, sq)
}
}
}
asrNeverConsidered = false
}
if pdd, exists := statValues[PDD]; exists {
if pdd > STATS_NA {
pddValues = append(pddValues, pdd)
} else {
statValues := make(map[string]float64)
if err := stats.GetValues(qId, &statValues); err != nil {
lcrCost.SupplierCosts = append(lcrCost.SupplierCosts, &LCRSupplierCost{
Supplier: supplier,
Error: fmt.Sprintf("Get stats values for queue id %s, error %s", qId, err.Error()),
})
statsErr = true
break
}
pddNeverConsidered = false
}
if acd, exists := statValues[ACD]; exists {
if acd > STATS_NA {
acdValues = append(acdValues, acd)
if asr, exists := statValues[ASR]; exists {
if asr > STATS_NA {
asrValues = append(asrValues, asr)
}
asrNeverConsidered = false
}
acdNeverConsidered = false
}
if tcd, exists := statValues[TCD]; exists {
if tcd > STATS_NA {
tcdValues = append(tcdValues, tcd)
if pdd, exists := statValues[PDD]; exists {
if pdd > STATS_NA {
pddValues = append(pddValues, pdd)
}
pddNeverConsidered = false
}
tcdNeverConsidered = false
}
if acc, exists := statValues[ACC]; exists {
if acc > STATS_NA {
accValues = append(accValues, acc)
if acd, exists := statValues[ACD]; exists {
if acd > STATS_NA {
acdValues = append(acdValues, acd)
}
acdNeverConsidered = false
}
accNeverConsidered = false
}
if tcc, exists := statValues[TCC]; exists {
if tcc > STATS_NA {
tccValues = append(tccValues, tcc)
if tcd, exists := statValues[TCD]; exists {
if tcd > STATS_NA {
tcdValues = append(tcdValues, tcd)
}
tcdNeverConsidered = false
}
tccNeverConsidered = false
}
if ddc, exists := statValues[TCC]; exists {
if ddc > STATS_NA {
ddcValues = append(ddcValues, ddc)
if acc, exists := statValues[ACC]; exists {
if acc > STATS_NA {
accValues = append(accValues, acc)
}
accNeverConsidered = false
}
ddcNeverConsidered = false
if tcc, exists := statValues[TCC]; exists {
if tcc > STATS_NA {
tccValues = append(tccValues, tcc)
}
tccNeverConsidered = false
}
if ddc, exists := statValues[TCC]; exists {
if ddc > STATS_NA {
ddcValues = append(ddcValues, ddc)
}
ddcNeverConsidered = false
}
}
}
if lcrCost.Entry.Strategy == LCR_STRATEGY_LOAD {
if len(supplierQueues) > 0 {
lcrCost.SupplierCosts = append(lcrCost.SupplierCosts, &LCRSupplierCost{
Supplier: supplier,
supplierQueues: supplierQueues,
})
}
continue // next supplier
}
if statsErr { // Stats error in loop, to go next supplier
continue
}

View File

@@ -36,6 +36,7 @@ const (
LCR_STRATEGY_HIGHEST = "*highest_cost"
LCR_STRATEGY_QOS_THRESHOLD = "*qos_threshold"
LCR_STRATEGY_QOS = "*qos"
LCR_STRATEGY_LOAD = "*load_distribution"
)
// A request for LCR, used in APIer and SM where we need to expose it
@@ -134,12 +135,13 @@ type LCRCost struct {
}
type LCRSupplierCost struct {
Supplier string
Cost float64
Duration time.Duration
Error string // Not error due to JSON automatic serialization into struct
QOS map[string]float64
qosSortParams []string
Supplier string
Cost float64
Duration time.Duration
Error string // Not error due to JSON automatic serialization into struct
QOS map[string]float64
qosSortParams []string
supplierQueues []*StatsQueue // used for load distribution
}
func (lcr *LCR) GetId() string {
@@ -291,11 +293,84 @@ func (lc *LCRCost) Sort() {
sort.Sort(HighestSupplierCostSorter(lc.SupplierCosts))
case LCR_STRATEGY_QOS:
sort.Sort(QOSSorter(lc.SupplierCosts))
case LCR_STRATEGY_LOAD:
lc.SortLoadDistribution()
}
}
func (lc *LCRCost) SortLoadDistribution() {
// find the time window that is common to all qeues
scoreBoard := make(map[time.Duration]int) // register TimeWindow across suppliers
var winnerTimeWindow time.Duration
maxScore := 0
for _, supCost := range lc.SupplierCosts {
timeWindowFlag := make(map[time.Duration]bool) // flags appearance in same supplier
for _, sq := range supCost.supplierQueues {
if !timeWindowFlag[sq.conf.TimeWindow] {
timeWindowFlag[sq.conf.TimeWindow] = true
scoreBoard[sq.conf.TimeWindow]++
}
if scoreBoard[sq.conf.TimeWindow] > maxScore {
maxScore = scoreBoard[sq.conf.TimeWindow]
winnerTimeWindow = sq.conf.TimeWindow
}
}
}
supplierQueues := make(map[string]*StatsQueue)
for _, supCost := range lc.SupplierCosts {
for _, sq := range supCost.supplierQueues {
if sq.conf.TimeWindow == winnerTimeWindow {
supplierQueues[supCost.Supplier] = sq
break
}
}
}
/*for supplier, sq := range supplierQueues {
log.Printf("Useful supplier qeues: %s %v", supplier, sq.conf.TimeWindow)
}*/
// if all have less than ponder return random order
// if some have a cdr count not divisible by ponder return them first and all ordered by cdr times, oldest first
// if all have a multiple of ponder return in the order of cdr times, oldest first
}
// used in load distribution strategy only
// receives a long supplier id and will return the ponder found in strategy params
func (lc *LCRCost) GetSupplierPonder(supplier string) float64 {
// parse strategy params
ponders := make(map[string]float64)
params := strings.Split(lc.Entry.StrategyParams, utils.INFIELD_SEP)
for _, param := range params {
ponderSlice := strings.Split(param, utils.CONCATENATED_KEY_SEP)
if len(ponderSlice) != 2 {
Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams))
continue
}
p, err := strconv.ParseFloat(ponderSlice[1], 64)
if err != nil {
Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams))
continue
}
ponders[ponderSlice[0]] = p
}
parts := strings.Split(supplier, utils.CONCATENATED_KEY_SEP)
if len(parts) > 0 {
supplierSubject := parts[len(parts)-1]
if ponder, found := ponders[supplierSubject]; found {
return ponder
}
if ponder, found := ponders[utils.META_DEFAULT]; found {
return ponder
}
}
return 1
}
func (lc *LCRCost) HasErrors() bool {
for _, supplCost := range lc.SupplierCosts {
if len(supplCost.Error) != 0 {
return true
}

View File

@@ -278,3 +278,102 @@ func TestLCRCostSuppliersString(t *testing.T) {
t.Errorf("Expecting: %s, received: %s", eSupplStr, supplStr)
}
}
func TestLCRCostSuppliersLoad(t *testing.T) {
lcrCost := &LCRCost{
Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:10;dan12:3;*default:7", Weight: 10.0},
SupplierCosts: []*LCRSupplierCost{
&LCRSupplierCost{
Supplier: "*out:tenant12:call:ivo12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 3 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 1 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
},
},
&LCRSupplierCost{
Supplier: "*out:tenant12:call:dan12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
},
},
&LCRSupplierCost{
Supplier: "*out:tenant12:call:rif12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 1 * time.Minute,
},
},
},
},
},
}
lcrCost.Sort()
if lcrCost.SupplierCosts[0].Supplier != "" ||
lcrCost.SupplierCosts[1].Supplier != "" ||
lcrCost.SupplierCosts[2].Supplier != "" {
//t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost))
}
}

View File

@@ -354,7 +354,16 @@ func TestGetLCR(t *testing.T) {
},
},
}
for _, lcr := range []*LCR{lcrStatic, lcrLowestCost, lcrQosThreshold, lcrQos} {
lcrLoad := &LCR{Direction: utils.OUT, Tenant: "tenant12", Category: "call_load", Account: utils.ANY, Subject: utils.ANY,
Activations: []*LCRActivation{
&LCRActivation{
ActivationTime: time.Date(2015, 01, 01, 8, 0, 0, 0, time.UTC),
Entries: []*LCREntry{
&LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:10;dan12:3", Weight: 10.0}},
},
},
}
for _, lcr := range []*LCR{lcrStatic, lcrLowestCost, lcrQosThreshold, lcrQos, lcrLoad} {
if err := ratingStorage.SetLCR(lcr); err != nil {
t.Error(err)
}

View File

@@ -3,6 +3,7 @@ package engine
import (
"sort"
"strings"
"sync"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
@@ -61,6 +62,7 @@ type UserMap struct {
index map[string]map[string]bool
indexKeys []string
ratingDb RatingStorage
mu sync.RWMutex
}
func NewUserMap(ratingDb RatingStorage) (*UserMap, error) {
@@ -85,6 +87,8 @@ func newUserMap(ratingDb RatingStorage) *UserMap {
}
func (um *UserMap) SetUser(up UserProfile, reply *string) error {
um.mu.Lock()
defer um.mu.Unlock()
if err := um.ratingDb.SetUser(&up); err != nil {
*reply = err.Error()
return err
@@ -96,6 +100,8 @@ func (um *UserMap) SetUser(up UserProfile, reply *string) error {
}
func (um *UserMap) RemoveUser(up UserProfile, reply *string) error {
um.mu.Lock()
defer um.mu.Unlock()
if err := um.ratingDb.RemoveUser(up.GetId()); err != nil {
*reply = err.Error()
return err
@@ -107,6 +113,8 @@ func (um *UserMap) RemoveUser(up UserProfile, reply *string) error {
}
func (um *UserMap) UpdateUser(up UserProfile, reply *string) error {
um.mu.Lock()
defer um.mu.Unlock()
m, found := um.table[up.GetId()]
if !found {
*reply = utils.ErrNotFound.Error()
@@ -144,6 +152,8 @@ func (um *UserMap) UpdateUser(up UserProfile, reply *string) error {
}
func (um *UserMap) GetUsers(up UserProfile, results *UserProfiles) error {
um.mu.RLock()
defer um.mu.RUnlock()
table := um.table // no index
indexUnionKeys := make(map[string]bool)
@@ -223,6 +233,8 @@ func (um *UserMap) GetUsers(up UserProfile, results *UserProfiles) error {
}
func (um *UserMap) AddIndex(indexes []string, reply *string) error {
um.mu.Lock()
defer um.mu.Unlock()
um.indexKeys = indexes
for key, values := range um.table {
up := &UserProfile{Profile: values}
@@ -305,6 +317,8 @@ func (um *UserMap) deleteIndex(up *UserProfile) {
}
func (um *UserMap) GetIndexes(in string, reply *map[string][]string) error {
um.mu.RLock()
defer um.mu.RUnlock()
indexes := make(map[string][]string)
for key, values := range um.index {
var vs []string