diff --git a/engine/calldesc.go b/engine/calldesc.go index 958177991..5580c0687 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -795,6 +795,7 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) { if lcrCost.Entry == nil { return lcrCost, nil } + //log.Printf("Entry: %+v", lcrCost.Entry) if lcrCost.Entry.Strategy == LCR_STRATEGY_STATIC { for _, supplier := range lcrCost.Entry.GetParams() { @@ -865,7 +866,7 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) { accNeverConsidered := true tccNeverConsidered := true ddcNeverConsidered := true - if utils.IsSliceMember([]string{LCR_STRATEGY_QOS, LCR_STRATEGY_QOS_THRESHOLD}, lcrCost.Entry.Strategy) { + if utils.IsSliceMember([]string{LCR_STRATEGY_QOS, LCR_STRATEGY_QOS_THRESHOLD, LCR_STRATEGY_LOAD}, lcrCost.Entry.Strategy) { if stats == nil { lcrCost.SupplierCosts = append(lcrCost.SupplierCosts, &LCRSupplierCost{ Supplier: supplier, @@ -891,60 +892,84 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) { } } } + statsErr := false + var supplierQueues []*StatsQueue for _, qId := range cdrStatsQueueIds { - statValues := make(map[string]float64) - if err := stats.GetValues(qId, &statValues); err != nil { - lcrCost.SupplierCosts = append(lcrCost.SupplierCosts, &LCRSupplierCost{ - Supplier: supplier, - Error: fmt.Sprintf("Get stats values for queue id %s, error %s", qId, err.Error()), - }) - statsErr = true - break - } - if asr, exists := statValues[ASR]; exists { - if asr > STATS_NA { - asrValues = append(asrValues, asr) + if lcrCost.Entry.Strategy == LCR_STRATEGY_LOAD { + for _, qId := range cdrStatsQueueIds { + sq := &StatsQueue{} + if err := stats.GetQueue(qId, sq); err == nil { + if sq.conf.QueueLength == 0 { //only add qeues that don't have fixed length + supplierQueues = append(supplierQueues, sq) + } + } } - asrNeverConsidered = false - } - if pdd, exists := statValues[PDD]; exists { - if pdd > STATS_NA { - pddValues = append(pddValues, pdd) + } else { + statValues := make(map[string]float64) + if err := stats.GetValues(qId, &statValues); err != nil { + lcrCost.SupplierCosts = append(lcrCost.SupplierCosts, &LCRSupplierCost{ + Supplier: supplier, + Error: fmt.Sprintf("Get stats values for queue id %s, error %s", qId, err.Error()), + }) + statsErr = true + break } - pddNeverConsidered = false - } - if acd, exists := statValues[ACD]; exists { - if acd > STATS_NA { - acdValues = append(acdValues, acd) + if asr, exists := statValues[ASR]; exists { + if asr > STATS_NA { + asrValues = append(asrValues, asr) + } + asrNeverConsidered = false } - acdNeverConsidered = false - } - if tcd, exists := statValues[TCD]; exists { - if tcd > STATS_NA { - tcdValues = append(tcdValues, tcd) + if pdd, exists := statValues[PDD]; exists { + if pdd > STATS_NA { + pddValues = append(pddValues, pdd) + } + pddNeverConsidered = false } - tcdNeverConsidered = false - } - if acc, exists := statValues[ACC]; exists { - if acc > STATS_NA { - accValues = append(accValues, acc) + if acd, exists := statValues[ACD]; exists { + if acd > STATS_NA { + acdValues = append(acdValues, acd) + } + acdNeverConsidered = false } - accNeverConsidered = false - } - if tcc, exists := statValues[TCC]; exists { - if tcc > STATS_NA { - tccValues = append(tccValues, tcc) + if tcd, exists := statValues[TCD]; exists { + if tcd > STATS_NA { + tcdValues = append(tcdValues, tcd) + } + tcdNeverConsidered = false } - tccNeverConsidered = false - } - if ddc, exists := statValues[TCC]; exists { - if ddc > STATS_NA { - ddcValues = append(ddcValues, ddc) + if acc, exists := statValues[ACC]; exists { + if acc > STATS_NA { + accValues = append(accValues, acc) + } + accNeverConsidered = false } - ddcNeverConsidered = false + if tcc, exists := statValues[TCC]; exists { + if tcc > STATS_NA { + tccValues = append(tccValues, tcc) + } + tccNeverConsidered = false + } + if ddc, exists := statValues[TCC]; exists { + if ddc > STATS_NA { + ddcValues = append(ddcValues, ddc) + } + ddcNeverConsidered = false + } + } } + if lcrCost.Entry.Strategy == LCR_STRATEGY_LOAD { + if len(supplierQueues) > 0 { + lcrCost.SupplierCosts = append(lcrCost.SupplierCosts, &LCRSupplierCost{ + Supplier: supplier, + supplierQueues: supplierQueues, + }) + } + continue // next supplier + } + if statsErr { // Stats error in loop, to go next supplier continue } diff --git a/engine/lcr.go b/engine/lcr.go index ba78df927..cdab45627 100644 --- a/engine/lcr.go +++ b/engine/lcr.go @@ -36,6 +36,7 @@ const ( LCR_STRATEGY_HIGHEST = "*highest_cost" LCR_STRATEGY_QOS_THRESHOLD = "*qos_threshold" LCR_STRATEGY_QOS = "*qos" + LCR_STRATEGY_LOAD = "*load_distribution" ) // A request for LCR, used in APIer and SM where we need to expose it @@ -134,12 +135,13 @@ type LCRCost struct { } type LCRSupplierCost struct { - Supplier string - Cost float64 - Duration time.Duration - Error string // Not error due to JSON automatic serialization into struct - QOS map[string]float64 - qosSortParams []string + Supplier string + Cost float64 + Duration time.Duration + Error string // Not error due to JSON automatic serialization into struct + QOS map[string]float64 + qosSortParams []string + supplierQueues []*StatsQueue // used for load distribution } func (lcr *LCR) GetId() string { @@ -291,11 +293,84 @@ func (lc *LCRCost) Sort() { sort.Sort(HighestSupplierCostSorter(lc.SupplierCosts)) case LCR_STRATEGY_QOS: sort.Sort(QOSSorter(lc.SupplierCosts)) + case LCR_STRATEGY_LOAD: + lc.SortLoadDistribution() } } +func (lc *LCRCost) SortLoadDistribution() { + // find the time window that is common to all qeues + scoreBoard := make(map[time.Duration]int) // register TimeWindow across suppliers + + var winnerTimeWindow time.Duration + maxScore := 0 + for _, supCost := range lc.SupplierCosts { + timeWindowFlag := make(map[time.Duration]bool) // flags appearance in same supplier + for _, sq := range supCost.supplierQueues { + if !timeWindowFlag[sq.conf.TimeWindow] { + timeWindowFlag[sq.conf.TimeWindow] = true + scoreBoard[sq.conf.TimeWindow]++ + } + if scoreBoard[sq.conf.TimeWindow] > maxScore { + maxScore = scoreBoard[sq.conf.TimeWindow] + winnerTimeWindow = sq.conf.TimeWindow + } + } + } + supplierQueues := make(map[string]*StatsQueue) + for _, supCost := range lc.SupplierCosts { + for _, sq := range supCost.supplierQueues { + if sq.conf.TimeWindow == winnerTimeWindow { + supplierQueues[supCost.Supplier] = sq + break + } + } + } + /*for supplier, sq := range supplierQueues { + log.Printf("Useful supplier qeues: %s %v", supplier, sq.conf.TimeWindow) + }*/ + // if all have less than ponder return random order + // if some have a cdr count not divisible by ponder return them first and all ordered by cdr times, oldest first + // if all have a multiple of ponder return in the order of cdr times, oldest first + +} + +// used in load distribution strategy only +// receives a long supplier id and will return the ponder found in strategy params +func (lc *LCRCost) GetSupplierPonder(supplier string) float64 { + // parse strategy params + ponders := make(map[string]float64) + params := strings.Split(lc.Entry.StrategyParams, utils.INFIELD_SEP) + for _, param := range params { + ponderSlice := strings.Split(param, utils.CONCATENATED_KEY_SEP) + if len(ponderSlice) != 2 { + Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams)) + continue + } + p, err := strconv.ParseFloat(ponderSlice[1], 64) + if err != nil { + Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams)) + continue + } + ponders[ponderSlice[0]] = p + } + parts := strings.Split(supplier, utils.CONCATENATED_KEY_SEP) + if len(parts) > 0 { + supplierSubject := parts[len(parts)-1] + if ponder, found := ponders[supplierSubject]; found { + return ponder + } + if ponder, found := ponders[utils.META_DEFAULT]; found { + return ponder + } + } + + return 1 +} + func (lc *LCRCost) HasErrors() bool { for _, supplCost := range lc.SupplierCosts { + if len(supplCost.Error) != 0 { return true } diff --git a/engine/lcr_test.go b/engine/lcr_test.go index 0364c4652..847eefbda 100644 --- a/engine/lcr_test.go +++ b/engine/lcr_test.go @@ -278,3 +278,102 @@ func TestLCRCostSuppliersString(t *testing.T) { t.Errorf("Expecting: %s, received: %s", eSupplStr, supplStr) } } + +func TestLCRCostSuppliersLoad(t *testing.T) { + lcrCost := &LCRCost{ + Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:10;dan12:3;*default:7", Weight: 10.0}, + SupplierCosts: []*LCRSupplierCost{ + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:ivo12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 3 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:dan12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:rif12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + }, + }, + }, + } + lcrCost.Sort() + if lcrCost.SupplierCosts[0].Supplier != "" || + lcrCost.SupplierCosts[1].Supplier != "" || + lcrCost.SupplierCosts[2].Supplier != "" { + //t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) + } +} diff --git a/engine/responder_test.go b/engine/responder_test.go index c0ad5ecd4..68d5054a7 100644 --- a/engine/responder_test.go +++ b/engine/responder_test.go @@ -354,7 +354,16 @@ func TestGetLCR(t *testing.T) { }, }, } - for _, lcr := range []*LCR{lcrStatic, lcrLowestCost, lcrQosThreshold, lcrQos} { + lcrLoad := &LCR{Direction: utils.OUT, Tenant: "tenant12", Category: "call_load", Account: utils.ANY, Subject: utils.ANY, + Activations: []*LCRActivation{ + &LCRActivation{ + ActivationTime: time.Date(2015, 01, 01, 8, 0, 0, 0, time.UTC), + Entries: []*LCREntry{ + &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:10;dan12:3", Weight: 10.0}}, + }, + }, + } + for _, lcr := range []*LCR{lcrStatic, lcrLowestCost, lcrQosThreshold, lcrQos, lcrLoad} { if err := ratingStorage.SetLCR(lcr); err != nil { t.Error(err) } diff --git a/engine/users.go b/engine/users.go index 84018ac9c..0cf821c74 100644 --- a/engine/users.go +++ b/engine/users.go @@ -3,6 +3,7 @@ package engine import ( "sort" "strings" + "sync" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" @@ -61,6 +62,7 @@ type UserMap struct { index map[string]map[string]bool indexKeys []string ratingDb RatingStorage + mu sync.RWMutex } func NewUserMap(ratingDb RatingStorage) (*UserMap, error) { @@ -85,6 +87,8 @@ func newUserMap(ratingDb RatingStorage) *UserMap { } func (um *UserMap) SetUser(up UserProfile, reply *string) error { + um.mu.Lock() + defer um.mu.Unlock() if err := um.ratingDb.SetUser(&up); err != nil { *reply = err.Error() return err @@ -96,6 +100,8 @@ func (um *UserMap) SetUser(up UserProfile, reply *string) error { } func (um *UserMap) RemoveUser(up UserProfile, reply *string) error { + um.mu.Lock() + defer um.mu.Unlock() if err := um.ratingDb.RemoveUser(up.GetId()); err != nil { *reply = err.Error() return err @@ -107,6 +113,8 @@ func (um *UserMap) RemoveUser(up UserProfile, reply *string) error { } func (um *UserMap) UpdateUser(up UserProfile, reply *string) error { + um.mu.Lock() + defer um.mu.Unlock() m, found := um.table[up.GetId()] if !found { *reply = utils.ErrNotFound.Error() @@ -144,6 +152,8 @@ func (um *UserMap) UpdateUser(up UserProfile, reply *string) error { } func (um *UserMap) GetUsers(up UserProfile, results *UserProfiles) error { + um.mu.RLock() + defer um.mu.RUnlock() table := um.table // no index indexUnionKeys := make(map[string]bool) @@ -223,6 +233,8 @@ func (um *UserMap) GetUsers(up UserProfile, results *UserProfiles) error { } func (um *UserMap) AddIndex(indexes []string, reply *string) error { + um.mu.Lock() + defer um.mu.Unlock() um.indexKeys = indexes for key, values := range um.table { up := &UserProfile{Profile: values} @@ -305,6 +317,8 @@ func (um *UserMap) deleteIndex(up *UserProfile) { } func (um *UserMap) GetIndexes(in string, reply *map[string][]string) error { + um.mu.RLock() + defer um.mu.RUnlock() indexes := make(map[string][]string) for key, values := range um.index { var vs []string