From d4a440d0ec40ce7d33ee43a8b8188bd2322b2ad3 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 31 Jul 2015 11:27:38 +0300 Subject: [PATCH 01/10] first working load distribution lcr --- engine/lcr.go | 34 +++++++++++++++++++++++++++++----- engine/lcr_test.go | 23 ++++++++++++----------- 2 files changed, 41 insertions(+), 16 deletions(-) diff --git a/engine/lcr.go b/engine/lcr.go index cdab45627..653bcbf1e 100644 --- a/engine/lcr.go +++ b/engine/lcr.go @@ -20,6 +20,7 @@ package engine import ( "fmt" + "math/rand" "sort" "strconv" "strings" @@ -37,6 +38,12 @@ const ( LCR_STRATEGY_QOS_THRESHOLD = "*qos_threshold" LCR_STRATEGY_QOS = "*qos" LCR_STRATEGY_LOAD = "*load_distribution" + + // used for load distribution sorting + RAND_LIMIT = 99 + LOW_PRIORITY_LIMIT = 100 + MED_PRIORITY_LIMIT = 200 + HIGH_PRIORITY_LIMIT = 300 ) // A request for LCR, used in APIer and SM where we need to expose it @@ -295,6 +302,7 @@ func (lc *LCRCost) Sort() { sort.Sort(QOSSorter(lc.SupplierCosts)) case LCR_STRATEGY_LOAD: lc.SortLoadDistribution() + sort.Sort(HighestSupplierCostSorter(lc.SupplierCosts)) } } @@ -317,11 +325,11 @@ func (lc *LCRCost) SortLoadDistribution() { } } } - supplierQueues := make(map[string]*StatsQueue) + supplierQueues := make(map[*LCRSupplierCost]*StatsQueue) for _, supCost := range lc.SupplierCosts { for _, sq := range supCost.supplierQueues { if sq.conf.TimeWindow == winnerTimeWindow { - supplierQueues[supCost.Supplier] = sq + supplierQueues[supCost] = sq break } } @@ -333,13 +341,29 @@ func (lc *LCRCost) SortLoadDistribution() { // if some have a cdr count not divisible by ponder return them first and all ordered by cdr times, oldest first // if all have a multiple of ponder return in the order of cdr times, oldest first + // first put them in one of the above categories + for supCost, sq := range supplierQueues { + ponder := lc.GetSupplierPonder(supCost.Supplier) + cdrCount := len(sq.Cdrs) + if cdrCount < ponder { + supCost.Cost = float64(LOW_PRIORITY_LIMIT + rand.Intn(RAND_LIMIT)) + continue + } + if cdrCount%ponder == 0 { + supCost.Cost = float64(MED_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() + continue + } else { + supCost.Cost = float64(HIGH_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() + continue + } + } } // used in load distribution strategy only // receives a long supplier id and will return the ponder found in strategy params -func (lc *LCRCost) GetSupplierPonder(supplier string) float64 { +func (lc *LCRCost) GetSupplierPonder(supplier string) int { // parse strategy params - ponders := make(map[string]float64) + ponders := make(map[string]int) params := strings.Split(lc.Entry.StrategyParams, utils.INFIELD_SEP) for _, param := range params { ponderSlice := strings.Split(param, utils.CONCATENATED_KEY_SEP) @@ -347,7 +371,7 @@ func (lc *LCRCost) GetSupplierPonder(supplier string) float64 { Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams)) continue } - p, err := strconv.ParseFloat(ponderSlice[1], 64) + p, err := strconv.Atoi(ponderSlice[1]) if err != nil { Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams)) continue diff --git a/engine/lcr_test.go b/engine/lcr_test.go index 847eefbda..4ecb1d8bc 100644 --- a/engine/lcr_test.go +++ b/engine/lcr_test.go @@ -280,6 +280,7 @@ func TestLCRCostSuppliersString(t *testing.T) { } func TestLCRCostSuppliersLoad(t *testing.T) { + setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC) lcrCost := &LCRCost{ Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:10;dan12:3;*default:7", Weight: 10.0}, SupplierCosts: []*LCRSupplierCost{ @@ -287,21 +288,21 @@ func TestLCRCostSuppliersLoad(t *testing.T) { Supplier: "*out:tenant12:call:ivo12", supplierQueues: []*StatsQueue{ &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 3 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 1 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, @@ -313,21 +314,21 @@ func TestLCRCostSuppliersLoad(t *testing.T) { Supplier: "*out:tenant12:call:dan12", supplierQueues: []*StatsQueue{ &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 7 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 7 * time.Minute, @@ -339,28 +340,28 @@ func TestLCRCostSuppliersLoad(t *testing.T) { Supplier: "*out:tenant12:call:rif12", supplierQueues: []*StatsQueue{ &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 7 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 7 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 1 * time.Minute, @@ -374,6 +375,6 @@ func TestLCRCostSuppliersLoad(t *testing.T) { if lcrCost.SupplierCosts[0].Supplier != "" || lcrCost.SupplierCosts[1].Supplier != "" || lcrCost.SupplierCosts[2].Supplier != "" { - //t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) + t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) } } From 6110c321b59e816cacc13ee63bf11b072a97d2d6 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 31 Jul 2015 11:56:26 +0300 Subject: [PATCH 02/10] fixes and more tests for lcr --- engine/lcr.go | 4 +- engine/lcr_test.go | 120 ++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 111 insertions(+), 13 deletions(-) diff --git a/engine/lcr.go b/engine/lcr.go index 653bcbf1e..543af6e9e 100644 --- a/engine/lcr.go +++ b/engine/lcr.go @@ -350,10 +350,10 @@ func (lc *LCRCost) SortLoadDistribution() { continue } if cdrCount%ponder == 0 { - supCost.Cost = float64(MED_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() + supCost.Cost = float64(MED_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + (time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() / RAND_LIMIT) continue } else { - supCost.Cost = float64(HIGH_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() + supCost.Cost = float64(HIGH_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + (time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() / RAND_LIMIT) continue } } diff --git a/engine/lcr_test.go b/engine/lcr_test.go index 4ecb1d8bc..f158ad5d5 100644 --- a/engine/lcr_test.go +++ b/engine/lcr_test.go @@ -288,7 +288,7 @@ func TestLCRCostSuppliersLoad(t *testing.T) { Supplier: "*out:tenant12:call:ivo12", supplierQueues: []*StatsQueue{ &StatsQueue{ - Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 3 * time.Minute, @@ -314,21 +314,21 @@ func TestLCRCostSuppliersLoad(t *testing.T) { Supplier: "*out:tenant12:call:dan12", supplierQueues: []*StatsQueue{ &StatsQueue{ - Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 7 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 7 * time.Minute, @@ -340,28 +340,28 @@ func TestLCRCostSuppliersLoad(t *testing.T) { Supplier: "*out:tenant12:call:rif12", supplierQueues: []*StatsQueue{ &StatsQueue{ - Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 7 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 7 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 1 * time.Minute, @@ -372,9 +372,107 @@ func TestLCRCostSuppliersLoad(t *testing.T) { }, } lcrCost.Sort() - if lcrCost.SupplierCosts[0].Supplier != "" || - lcrCost.SupplierCosts[1].Supplier != "" || - lcrCost.SupplierCosts[2].Supplier != "" { + if lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:dan12" { + t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) + } +} + +func TestLCRCostSuppliersLoadAllRounded(t *testing.T) { + setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC) + lcrCost := &LCRCost{ + Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:3;dan12:5;*default:2", Weight: 10.0}, + SupplierCosts: []*LCRSupplierCost{ + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:ivo12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 3 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:dan12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(60 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:rif12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime.Add(100 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + }, + }, + }, + } + lcrCost.Sort() + if lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:ivo12" || + lcrCost.SupplierCosts[1].Supplier != "*out:tenant12:call:dan12" || + lcrCost.SupplierCosts[2].Supplier != "*out:tenant12:call:rif12" { t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) } } From bc82a4165ab6c6f62d05bab17b839cc90396e92b Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 31 Jul 2015 12:02:12 +0300 Subject: [PATCH 03/10] one more test --- engine/lcr_test.go | 102 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 101 insertions(+), 1 deletion(-) diff --git a/engine/lcr_test.go b/engine/lcr_test.go index f158ad5d5..f8d12133d 100644 --- a/engine/lcr_test.go +++ b/engine/lcr_test.go @@ -452,7 +452,107 @@ func TestLCRCostSuppliersLoadAllRounded(t *testing.T) { }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime.Add(100 * time.Minute)}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + }, + }, + }, + } + lcrCost.Sort() + if lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:ivo12" || + lcrCost.SupplierCosts[1].Supplier != "*out:tenant12:call:dan12" || + lcrCost.SupplierCosts[2].Supplier != "*out:tenant12:call:rif12" { + t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) + } +} + +func TestLCRCostSuppliersLoadAllOver(t *testing.T) { + setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC) + lcrCost := &LCRCost{ + Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:2;dan12:4;*default:2", Weight: 10.0}, + SupplierCosts: []*LCRSupplierCost{ + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:ivo12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 3 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:dan12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(60 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:rif12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, From bc4fd873c56e863f9f7af944b5f8a61ab5471db8 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 31 Jul 2015 13:59:45 +0300 Subject: [PATCH 04/10] load lcr missing ponders --- engine/lcr.go | 21 ++++- engine/lcr_test.go | 208 +++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 222 insertions(+), 7 deletions(-) diff --git a/engine/lcr.go b/engine/lcr.go index 543af6e9e..71e48361a 100644 --- a/engine/lcr.go +++ b/engine/lcr.go @@ -342,8 +342,14 @@ func (lc *LCRCost) SortLoadDistribution() { // if all have a multiple of ponder return in the order of cdr times, oldest first // first put them in one of the above categories + havePonderlessSuppliers := false for supCost, sq := range supplierQueues { ponder := lc.GetSupplierPonder(supCost.Supplier) + if ponder == -1 { + supCost.Cost = -1 + havePonderlessSuppliers = true + continue + } cdrCount := len(sq.Cdrs) if cdrCount < ponder { supCost.Cost = float64(LOW_PRIORITY_LIMIT + rand.Intn(RAND_LIMIT)) @@ -357,6 +363,15 @@ func (lc *LCRCost) SortLoadDistribution() { continue } } + if havePonderlessSuppliers { + var filteredSupplierCost []*LCRSupplierCost + for _, supCost := range lc.SupplierCosts { + if supCost.Cost != -1 { + filteredSupplierCost = append(filteredSupplierCost, supCost) + } + } + lc.SupplierCosts = filteredSupplierCost + } } // used in load distribution strategy only @@ -388,8 +403,10 @@ func (lc *LCRCost) GetSupplierPonder(supplier string) int { return ponder } } - - return 1 + if len(ponders) == 0 { + return 1 // use random/last cdr date sorting + } + return -1 // exclude missing suppliers } func (lc *LCRCost) HasErrors() bool { diff --git a/engine/lcr_test.go b/engine/lcr_test.go index f8d12133d..ad94fabe9 100644 --- a/engine/lcr_test.go +++ b/engine/lcr_test.go @@ -412,7 +412,7 @@ func TestLCRCostSuppliersLoadAllRounded(t *testing.T) { Supplier: "*out:tenant12:call:dan12", supplierQueues: []*StatsQueue{ &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(60 * time.Minute)}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(100 * time.Minute)}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, @@ -452,7 +452,7 @@ func TestLCRCostSuppliersLoadAllRounded(t *testing.T) { }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime.Add(300 * time.Minute)}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, @@ -481,6 +481,206 @@ func TestLCRCostSuppliersLoadAllOver(t *testing.T) { setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC) lcrCost := &LCRCost{ Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:2;dan12:4;*default:2", Weight: 10.0}, + SupplierCosts: []*LCRSupplierCost{ + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:ivo12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 3 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:dan12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(100 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:rif12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(300 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + }, + }, + }, + } + lcrCost.Sort() + if lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:ivo12" || + lcrCost.SupplierCosts[1].Supplier != "*out:tenant12:call:dan12" || + lcrCost.SupplierCosts[2].Supplier != "*out:tenant12:call:rif12" { + t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) + } +} + +func TestLCRCostSuppliersLoadAllOverMisingDefault(t *testing.T) { + setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC) + lcrCost := &LCRCost{ + Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:2;dan12:4", Weight: 10.0}, + SupplierCosts: []*LCRSupplierCost{ + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:ivo12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 3 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:dan12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(100 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:rif12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(300 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + }, + }, + }, + } + lcrCost.Sort() + if len(lcrCost.SupplierCosts) != 2 || + lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:ivo12" || + lcrCost.SupplierCosts[1].Supplier != "*out:tenant12:call:dan12" { + t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) + } +} + +func TestLCRCostSuppliersLoadAllOverMisingParams(t *testing.T) { + setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC) + lcrCost := &LCRCost{ + Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "", Weight: 10.0}, SupplierCosts: []*LCRSupplierCost{ &LCRSupplierCost{ Supplier: "*out:tenant12:call:ivo12", @@ -570,9 +770,7 @@ func TestLCRCostSuppliersLoadAllOver(t *testing.T) { }, } lcrCost.Sort() - if lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:ivo12" || - lcrCost.SupplierCosts[1].Supplier != "*out:tenant12:call:dan12" || - lcrCost.SupplierCosts[2].Supplier != "*out:tenant12:call:rif12" { + if len(lcrCost.SupplierCosts) != 3 { t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) } } From d4721b17534df277f08c3fa366eedcc05cb0b536 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 31 Jul 2015 14:16:32 +0300 Subject: [PATCH 05/10] added some load distribution lcr documentation --- docs/lcr.rst | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/docs/lcr.rst b/docs/lcr.rst index 75cab353c..a6cd02362 100644 --- a/docs/lcr.rst +++ b/docs/lcr.rst @@ -18,7 +18,7 @@ Strategy indicates supplier selection algorithm and StrategyParams will be speci \*static (filter) Will use the suppliers provided as params. StrategyParams: suppier1;supplier2;etc - + \*lowest_cost (sorting) Matching suppliers will be sorted by ascending cost. StrategyParams: None @@ -35,6 +35,13 @@ Strategy indicates supplier selection algorithm and StrategyParams will be speci The system will sort by metrics in the order of appearance. StrategyParams: metric1;metric2;etc +\*load_distribution (sorting/filter) + The system will sort the suppliers in order to achieve the specified load distribution. + - if all have less than ponder return random order + - if some have a cdr count not divisible by ponder return them first and all ordered by cdr times, oldest first + - if all have a multiple of ponder return in the order of cdr times, oldest first + StrategyParams: supplier1:ponder;supplier2:ponder;*default:ponder + ActivationTime is the date/time when the LCR entry starts to be active. Weight is used to sort the rules with the same activation time. @@ -43,7 +50,7 @@ Example +++++++ :: - + *in, cgrates.org,call,*any,*any,EU_LANDLINE,LCR_STANDARD,*static,ivo;dan;rif,2012-01-01T00:00:00Z,10 Code implementation @@ -63,7 +70,7 @@ For the QOS strategies the suppliers are searched using call descriptor paramete For the lowest/highest cost strategies the matched suppliers are sorted ascending/descending on cost. :: - + { "Entry": { "DestinationId": "*any", From aa3730d0cfbce9184b57793ab4612865ab7ce441 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 31 Jul 2015 15:51:42 +0300 Subject: [PATCH 06/10] moved users in accounting db --- cmd/cgr-engine/cgr-engine.go | 2 +- engine/storage_interface.go | 8 ++++---- engine/tp_reader.go | 4 ++-- engine/users.go | 30 +++++++++++++++--------------- engine/users_test.go | 12 ++++++------ 5 files changed, 28 insertions(+), 28 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index c4a37ebbf..1acc1077c 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -483,7 +483,7 @@ func main() { server.RpcRegisterName("ScribeV1", scribeServer) } if cfg.UserServerEnabled { - userServer, err = engine.NewUserMap(ratingDb) + userServer, err = engine.NewUserMap(accountDb) if err != nil { engine.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) exitChan <- true diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 6c05d287e..f06851e87 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -71,10 +71,6 @@ type RatingStorage interface { SetAccAlias(string, string) error RemoveAccAliases([]*TenantAccount, bool) error GetAccountAliases(string, string, bool) ([]string, error) - SetUser(*UserProfile) error - GetUser(string) (*UserProfile, error) - GetUsers() ([]*UserProfile, error) - RemoveUser(string) error } type AccountingStorage interface { @@ -87,6 +83,10 @@ type AccountingStorage interface { GetSubscribers() (map[string]*SubscriberData, error) SetSubscriber(string, *SubscriberData) error RemoveSubscriber(string) error + SetUser(*UserProfile) error + GetUser(string) (*UserProfile, error) + GetUsers() ([]*UserProfile, error) + RemoveUser(string) error } type CdrStorage interface { diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 32d53df70..ce737c131 100644 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -1063,7 +1063,7 @@ func (tpr *TpReader) LoadUsersFiltered(filter *TpUser) (bool, error) { for _, tpUser := range tpUsers { user.Profile[tpUser.AttributeName] = tpUser.AttributeValue } - tpr.ratingStorage.SetUser(user) + tpr.accountingStorage.SetUser(user) return len(tpUsers) > 0, err } @@ -1306,7 +1306,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose bool) (err error) { log.Print("Users:") } for _, u := range tpr.users { - err = tpr.ratingStorage.SetUser(u) + err = tpr.accountingStorage.SetUser(u) if err != nil { return err } diff --git a/engine/users.go b/engine/users.go index 0cf821c74..4c06d3824 100644 --- a/engine/users.go +++ b/engine/users.go @@ -58,17 +58,17 @@ type UserService interface { } type UserMap struct { - table map[string]map[string]string - index map[string]map[string]bool - indexKeys []string - ratingDb RatingStorage - mu sync.RWMutex + table map[string]map[string]string + index map[string]map[string]bool + indexKeys []string + accountingDb AccountingStorage + mu sync.RWMutex } -func NewUserMap(ratingDb RatingStorage) (*UserMap, error) { - um := newUserMap(ratingDb) +func NewUserMap(accountingDb AccountingStorage) (*UserMap, error) { + um := newUserMap(accountingDb) // load from rating db - if ups, err := um.ratingDb.GetUsers(); err == nil { + if ups, err := um.accountingDb.GetUsers(); err == nil { for _, up := range ups { um.table[up.GetId()] = up.Profile } @@ -78,18 +78,18 @@ func NewUserMap(ratingDb RatingStorage) (*UserMap, error) { return um, nil } -func newUserMap(ratingDb RatingStorage) *UserMap { +func newUserMap(accountingDb AccountingStorage) *UserMap { return &UserMap{ - table: make(map[string]map[string]string), - index: make(map[string]map[string]bool), - ratingDb: ratingDb, + table: make(map[string]map[string]string), + index: make(map[string]map[string]bool), + accountingDb: accountingDb, } } func (um *UserMap) SetUser(up UserProfile, reply *string) error { um.mu.Lock() defer um.mu.Unlock() - if err := um.ratingDb.SetUser(&up); err != nil { + if err := um.accountingDb.SetUser(&up); err != nil { *reply = err.Error() return err } @@ -102,7 +102,7 @@ func (um *UserMap) SetUser(up UserProfile, reply *string) error { func (um *UserMap) RemoveUser(up UserProfile, reply *string) error { um.mu.Lock() defer um.mu.Unlock() - if err := um.ratingDb.RemoveUser(up.GetId()); err != nil { + if err := um.accountingDb.RemoveUser(up.GetId()); err != nil { *reply = err.Error() return err } @@ -140,7 +140,7 @@ func (um *UserMap) UpdateUser(up UserProfile, reply *string) error { UserName: up.UserName, Profile: m, } - if err := um.ratingDb.SetUser(finalUp); err != nil { + if err := um.accountingDb.SetUser(finalUp); err != nil { *reply = err.Error() return err } diff --git a/engine/users_test.go b/engine/users_test.go index 86699a9cb..2cfcc4f11 100644 --- a/engine/users_test.go +++ b/engine/users_test.go @@ -19,7 +19,7 @@ var testMap = UserMap{ } func TestUsersAdd(t *testing.T) { - tm := newUserMap(ratingStorage) + tm := newUserMap(accountingStorage) var r string up := UserProfile{ Tenant: "test", @@ -40,7 +40,7 @@ func TestUsersAdd(t *testing.T) { } func TestUsersUpdate(t *testing.T) { - tm := newUserMap(ratingStorage) + tm := newUserMap(accountingStorage) var r string up := UserProfile{ Tenant: "test", @@ -71,7 +71,7 @@ func TestUsersUpdate(t *testing.T) { } func TestUsersUpdateNotFound(t *testing.T) { - tm := newUserMap(ratingStorage) + tm := newUserMap(accountingStorage) var r string up := UserProfile{ Tenant: "test", @@ -89,7 +89,7 @@ func TestUsersUpdateNotFound(t *testing.T) { } func TestUsersUpdateInit(t *testing.T) { - tm := newUserMap(ratingStorage) + tm := newUserMap(accountingStorage) var r string up := UserProfile{ Tenant: "test", @@ -115,7 +115,7 @@ func TestUsersUpdateInit(t *testing.T) { } func TestUsersRemove(t *testing.T) { - tm := newUserMap(ratingStorage) + tm := newUserMap(accountingStorage) var r string up := UserProfile{ Tenant: "test", @@ -445,7 +445,7 @@ func TestUsersGetMissingIdTwoINdex(t *testing.T) { } func TestUsersAddUpdateRemoveIndexes(t *testing.T) { - tm := newUserMap(ratingStorage) + tm := newUserMap(accountingStorage) var r string tm.AddIndex([]string{"t"}, &r) if len(tm.index) != 0 { From a09e24d5da877e0c3a794d39ce9dea0e6acafef3 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 31 Jul 2015 16:44:25 +0300 Subject: [PATCH 07/10] fix for users csv load --- cmd/cgr-engine/cgr-engine.go | 8 +---- cmd/cgr-loader/cgr-loader.go | 26 +++++++++++++++- engine/users.go | 58 ++++++++++++++++++++++++++---------- engine/users_test.go | 12 ++++---- 4 files changed, 74 insertions(+), 30 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 1acc1077c..e2a55c9e5 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -483,18 +483,12 @@ func main() { server.RpcRegisterName("ScribeV1", scribeServer) } if cfg.UserServerEnabled { - userServer, err = engine.NewUserMap(accountDb) + userServer, err = engine.NewUserMap(accountDb, cfg.UserServerIndexes) if err != nil { engine.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) exitChan <- true } server.RpcRegisterName("UsersV1", userServer) - if len(cfg.UserServerIndexes) != 0 { - var s string - if err := userServer.AddIndex(cfg.UserServerIndexes, &s); err != nil { - engine.Logger.Err(fmt.Sprintf("Error adding %v indexes to user profile service: %v", cfg.UserServerIndexes, err)) - } - } } // Register session manager service // FixMe: make sure this is thread safe diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 268bdbbf9..0aa4f8cd6 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -70,6 +70,7 @@ var ( historyServer = flag.String("history_server", cgrConfig.RPCGOBListen, "The history server address:port, empty to disable automaticautomatic history archiving") raterAddress = flag.String("rater_address", cgrConfig.RPCGOBListen, "Rater service to contact for cache reloads, empty to disable automatic cache reloads") cdrstatsAddress = flag.String("cdrstats_address", cgrConfig.RPCGOBListen, "CDRStats service to contact for data reloads, empty to disable automatic data reloads") + usersAddress = flag.String("users_address", cgrConfig.RPCGOBListen, "Users service to contact for data reloads, empty to disable automatic data reloads") runId = flag.String("runid", "", "Uniquely identify an import/load, postpended to some automatic fields") ) @@ -83,7 +84,7 @@ func main() { var ratingDb engine.RatingStorage var accountDb engine.AccountingStorage var storDb engine.LoadStorage - var rater, cdrstats *rpc.Client + var rater, cdrstats, users *rpc.Client var loader engine.LoadReader // Init necessary db connections, only if not already if !*dryRun { // make sure we do not need db connections on dry run, also not importing into any stordb @@ -206,6 +207,19 @@ func main() { } else { log.Print("WARNING: CDRStats automatic data reload is disabled!") } + if *usersAddress != "" { // Init connection to rater so we can reload it's data + if *usersAddress == *raterAddress { + users = rater + } else { + users, err = rpc.Dial("tcp", *usersAddress) + if err != nil { + log.Fatalf("Could not connect to Users API: %s", err.Error()) + return + } + } + } else { + log.Print("WARNING: Users automatic data reload is disabled!") + } // write maps to database if err := tpReader.WriteToDatabase(*flush, *verbose); err != nil { @@ -272,4 +286,14 @@ func main() { } } } + + if users != nil { + if *verbose { + log.Print("Reloading Users data") + } + var reply string + if err := cdrstats.Call("UsersV1.ReloadUsers", "", &reply); err != nil { + log.Printf("WARNING: Failed reloading users data, error: %s\n", err.Error()) + } + } } diff --git a/engine/users.go b/engine/users.go index 4c06d3824..3c3cc22ec 100644 --- a/engine/users.go +++ b/engine/users.go @@ -1,6 +1,7 @@ package engine import ( + "fmt" "sort" "strings" "sync" @@ -55,6 +56,7 @@ type UserService interface { GetUsers(UserProfile, *UserProfiles) error AddIndex([]string, *string) error GetIndexes(string, *map[string][]string) error + ReloadUsers(string, *string) error } type UserMap struct { @@ -65,25 +67,45 @@ type UserMap struct { mu sync.RWMutex } -func NewUserMap(accountingDb AccountingStorage) (*UserMap, error) { - um := newUserMap(accountingDb) +func NewUserMap(accountingDb AccountingStorage, indexes []string) (*UserMap, error) { + um := newUserMap(accountingDb, indexes) + var reply string + if err := um.ReloadUsers("", &reply); err != nil { + return nil, err + } + return um, nil +} + +func newUserMap(accountingDb AccountingStorage, indexes []string) *UserMap { + return &UserMap{ + table: make(map[string]map[string]string), + index: make(map[string]map[string]bool), + indexKeys: indexes, + accountingDb: accountingDb, + } +} + +func (um *UserMap) ReloadUsers(in string, reply *string) error { + um.mu.Lock() + defer um.mu.Unlock() // load from rating db if ups, err := um.accountingDb.GetUsers(); err == nil { for _, up := range ups { um.table[up.GetId()] = up.Profile } } else { - return nil, err + *reply = err.Error() + return err } - return um, nil -} -func newUserMap(accountingDb AccountingStorage) *UserMap { - return &UserMap{ - table: make(map[string]map[string]string), - index: make(map[string]map[string]bool), - accountingDb: accountingDb, + if len(um.indexKeys) != 0 { + var s string + if err := um.AddIndex(um.indexKeys, &s); err != nil { + Logger.Err(fmt.Sprintf("Error adding %v indexes to user profile service: %v", um.indexKeys, err)) + } } + *reply = utils.OK + return nil } func (um *UserMap) SetUser(up UserProfile, reply *string) error { @@ -94,7 +116,7 @@ func (um *UserMap) SetUser(up UserProfile, reply *string) error { return err } um.table[up.GetId()] = up.Profile - um.addIndex(&up) + um.addIndex(&up, um.indexKeys) *reply = utils.OK return nil } @@ -146,7 +168,7 @@ func (um *UserMap) UpdateUser(up UserProfile, reply *string) error { } um.table[up.GetId()] = m um.deleteIndex(oldUp) - um.addIndex(finalUp) + um.addIndex(finalUp, um.indexKeys) *reply = utils.OK return nil } @@ -235,19 +257,19 @@ func (um *UserMap) GetUsers(up UserProfile, results *UserProfiles) error { func (um *UserMap) AddIndex(indexes []string, reply *string) error { um.mu.Lock() defer um.mu.Unlock() - um.indexKeys = indexes + um.indexKeys = append(um.indexKeys, indexes...) for key, values := range um.table { up := &UserProfile{Profile: values} up.SetId(key) - um.addIndex(up) + um.addIndex(up, indexes) } *reply = utils.OK return nil } -func (um *UserMap) addIndex(up *UserProfile) { +func (um *UserMap) addIndex(up *UserProfile, indexes []string) { key := up.GetId() - for _, index := range um.indexKeys { + for _, index := range indexes { if index == "Tenant" { if up.Tenant != "" { indexKey := utils.ConcatenatedKey(index, up.Tenant) @@ -369,6 +391,10 @@ func (ps *ProxyUserService) GetIndexes(in string, reply *map[string][]string) er return ps.Client.Call("UsersV1.AddIndex", in, reply) } +func (ps *ProxyUserService) ReloadUsers(in string, reply *string) error { + return ps.Client.Call("UsersV1.ReloadUsers", in, reply) +} + // extraFields - Field name in the interface containing extraFields information func LoadUserProfile(in interface{}, extraFields string) (interface{}, error) { if userService == nil { // no user service => no fun diff --git a/engine/users_test.go b/engine/users_test.go index 2cfcc4f11..42fd3d549 100644 --- a/engine/users_test.go +++ b/engine/users_test.go @@ -19,7 +19,7 @@ var testMap = UserMap{ } func TestUsersAdd(t *testing.T) { - tm := newUserMap(accountingStorage) + tm := newUserMap(accountingStorage, nil) var r string up := UserProfile{ Tenant: "test", @@ -40,7 +40,7 @@ func TestUsersAdd(t *testing.T) { } func TestUsersUpdate(t *testing.T) { - tm := newUserMap(accountingStorage) + tm := newUserMap(accountingStorage, nil) var r string up := UserProfile{ Tenant: "test", @@ -71,7 +71,7 @@ func TestUsersUpdate(t *testing.T) { } func TestUsersUpdateNotFound(t *testing.T) { - tm := newUserMap(accountingStorage) + tm := newUserMap(accountingStorage, nil) var r string up := UserProfile{ Tenant: "test", @@ -89,7 +89,7 @@ func TestUsersUpdateNotFound(t *testing.T) { } func TestUsersUpdateInit(t *testing.T) { - tm := newUserMap(accountingStorage) + tm := newUserMap(accountingStorage, nil) var r string up := UserProfile{ Tenant: "test", @@ -115,7 +115,7 @@ func TestUsersUpdateInit(t *testing.T) { } func TestUsersRemove(t *testing.T) { - tm := newUserMap(accountingStorage) + tm := newUserMap(accountingStorage, nil) var r string up := UserProfile{ Tenant: "test", @@ -445,7 +445,7 @@ func TestUsersGetMissingIdTwoINdex(t *testing.T) { } func TestUsersAddUpdateRemoveIndexes(t *testing.T) { - tm := newUserMap(accountingStorage) + tm := newUserMap(accountingStorage, nil) var r string tm.AddIndex([]string{"t"}, &r) if len(tm.index) != 0 { From 1fdd3ef6547cd6594a7e9cf302464555b02ea892 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 31 Jul 2015 16:54:59 +0300 Subject: [PATCH 08/10] flush users on reload --- engine/lcr_test.go | 16 ++++++++-------- engine/users.go | 11 +++++++++++ 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/engine/lcr_test.go b/engine/lcr_test.go index ad94fabe9..93fb2c6d4 100644 --- a/engine/lcr_test.go +++ b/engine/lcr_test.go @@ -215,11 +215,11 @@ func TestLcrGet(t *testing.T) { func TestLcrRequestAsCallDescriptor(t *testing.T) { sTime := time.Date(2015, 04, 06, 17, 40, 0, 0, time.UTC) callDur := time.Duration(1) * time.Minute - lcrReq := &LcrRequest{Account: "1001", StartTime: sTime.String()} + lcrReq := &LcrRequest{Account: "2001", StartTime: sTime.String()} if _, err := lcrReq.AsCallDescriptor(); err == nil || err != utils.ErrMandatoryIeMissing { t.Error("Unexpected error received: %v", err) } - lcrReq = &LcrRequest{Account: "1001", Destination: "1002", StartTime: sTime.String()} + lcrReq = &LcrRequest{Account: "2001", Destination: "2002", StartTime: sTime.String()} eCd := &CallDescriptor{ Direction: utils.OUT, Tenant: config.CgrConfig().DefaultTenant, @@ -412,7 +412,7 @@ func TestLCRCostSuppliersLoadAllRounded(t *testing.T) { Supplier: "*out:tenant12:call:dan12", supplierQueues: []*StatsQueue{ &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(100 * time.Minute)}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, @@ -452,7 +452,7 @@ func TestLCRCostSuppliersLoadAllRounded(t *testing.T) { }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime.Add(300 * time.Minute)}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime.Add(400 * time.Minute)}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, @@ -512,7 +512,7 @@ func TestLCRCostSuppliersLoadAllOver(t *testing.T) { Supplier: "*out:tenant12:call:dan12", supplierQueues: []*StatsQueue{ &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(100 * time.Minute)}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, @@ -552,7 +552,7 @@ func TestLCRCostSuppliersLoadAllOver(t *testing.T) { }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(300 * time.Minute)}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(400 * time.Minute)}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, @@ -612,7 +612,7 @@ func TestLCRCostSuppliersLoadAllOverMisingDefault(t *testing.T) { Supplier: "*out:tenant12:call:dan12", supplierQueues: []*StatsQueue{ &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(100 * time.Minute)}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, @@ -652,7 +652,7 @@ func TestLCRCostSuppliersLoadAllOverMisingDefault(t *testing.T) { }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(300 * time.Minute)}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(400 * time.Minute)}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, diff --git a/engine/users.go b/engine/users.go index 3c3cc22ec..b8faf8b52 100644 --- a/engine/users.go +++ b/engine/users.go @@ -88,12 +88,23 @@ func newUserMap(accountingDb AccountingStorage, indexes []string) *UserMap { func (um *UserMap) ReloadUsers(in string, reply *string) error { um.mu.Lock() defer um.mu.Unlock() + + // backup old data + oldTable := um.table + oldIndex := um.index + um.table = make(map[string]map[string]string) + um.index = make(map[string]map[string]bool) + // load from rating db if ups, err := um.accountingDb.GetUsers(); err == nil { for _, up := range ups { um.table[up.GetId()] = up.Profile } } else { + // restore old data before return + um.table = oldTable + um.index = oldIndex + *reply = err.Error() return err } From de22799b5d444e3d8d3500b546e0438cd044170a Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 31 Jul 2015 17:04:15 +0300 Subject: [PATCH 09/10] renamed ponder to share --- docs/lcr.rst | 8 ++++---- engine/lcr.go | 44 ++++++++++++++++++++++---------------------- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/docs/lcr.rst b/docs/lcr.rst index a6cd02362..2f0bc4632 100644 --- a/docs/lcr.rst +++ b/docs/lcr.rst @@ -37,10 +37,10 @@ Strategy indicates supplier selection algorithm and StrategyParams will be speci \*load_distribution (sorting/filter) The system will sort the suppliers in order to achieve the specified load distribution. - - if all have less than ponder return random order - - if some have a cdr count not divisible by ponder return them first and all ordered by cdr times, oldest first - - if all have a multiple of ponder return in the order of cdr times, oldest first - StrategyParams: supplier1:ponder;supplier2:ponder;*default:ponder + - if all have less than share return random order + - if some have a cdr count not divisible by share return them first and all ordered by cdr times, oldest first + - if all have a multiple of share return in the order of cdr times, oldest first + StrategyParams: supplier1:share;supplier2:share;*default:share ActivationTime is the date/time when the LCR entry starts to be active. diff --git a/engine/lcr.go b/engine/lcr.go index 71e48361a..05cbe0d36 100644 --- a/engine/lcr.go +++ b/engine/lcr.go @@ -337,25 +337,25 @@ func (lc *LCRCost) SortLoadDistribution() { /*for supplier, sq := range supplierQueues { log.Printf("Useful supplier qeues: %s %v", supplier, sq.conf.TimeWindow) }*/ - // if all have less than ponder return random order - // if some have a cdr count not divisible by ponder return them first and all ordered by cdr times, oldest first - // if all have a multiple of ponder return in the order of cdr times, oldest first + // if all have less than share return random order + // if some have a cdr count not divisible by share return them first and all ordered by cdr times, oldest first + // if all have a multiple of share return in the order of cdr times, oldest first // first put them in one of the above categories - havePonderlessSuppliers := false + haveSharelessSuppliers := false for supCost, sq := range supplierQueues { - ponder := lc.GetSupplierPonder(supCost.Supplier) - if ponder == -1 { + share := lc.GetSupplierShare(supCost.Supplier) + if share == -1 { supCost.Cost = -1 - havePonderlessSuppliers = true + haveSharelessSuppliers = true continue } cdrCount := len(sq.Cdrs) - if cdrCount < ponder { + if cdrCount < share { supCost.Cost = float64(LOW_PRIORITY_LIMIT + rand.Intn(RAND_LIMIT)) continue } - if cdrCount%ponder == 0 { + if cdrCount%share == 0 { supCost.Cost = float64(MED_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + (time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() / RAND_LIMIT) continue } else { @@ -363,7 +363,7 @@ func (lc *LCRCost) SortLoadDistribution() { continue } } - if havePonderlessSuppliers { + if haveSharelessSuppliers { var filteredSupplierCost []*LCRSupplierCost for _, supCost := range lc.SupplierCosts { if supCost.Cost != -1 { @@ -375,35 +375,35 @@ func (lc *LCRCost) SortLoadDistribution() { } // used in load distribution strategy only -// receives a long supplier id and will return the ponder found in strategy params -func (lc *LCRCost) GetSupplierPonder(supplier string) int { +// receives a long supplier id and will return the share found in strategy params +func (lc *LCRCost) GetSupplierShare(supplier string) int { // parse strategy params - ponders := make(map[string]int) + shares := make(map[string]int) params := strings.Split(lc.Entry.StrategyParams, utils.INFIELD_SEP) for _, param := range params { - ponderSlice := strings.Split(param, utils.CONCATENATED_KEY_SEP) - if len(ponderSlice) != 2 { + shareSlice := strings.Split(param, utils.CONCATENATED_KEY_SEP) + if len(shareSlice) != 2 { Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams)) continue } - p, err := strconv.Atoi(ponderSlice[1]) + p, err := strconv.Atoi(shareSlice[1]) if err != nil { Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams)) continue } - ponders[ponderSlice[0]] = p + shares[shareSlice[0]] = p } parts := strings.Split(supplier, utils.CONCATENATED_KEY_SEP) if len(parts) > 0 { supplierSubject := parts[len(parts)-1] - if ponder, found := ponders[supplierSubject]; found { - return ponder + if share, found := shares[supplierSubject]; found { + return share } - if ponder, found := ponders[utils.META_DEFAULT]; found { - return ponder + if share, found := shares[utils.META_DEFAULT]; found { + return share } } - if len(ponders) == 0 { + if len(shares) == 0 { return 1 // use random/last cdr date sorting } return -1 // exclude missing suppliers From 10b92844ebd72a5d785eef2816dc7fd32ec990b2 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 31 Jul 2015 17:24:09 +0300 Subject: [PATCH 10/10] renamed share to ratio --- docs/lcr.rst | 8 ++++---- engine/lcr.go | 44 ++++++++++++++++++++++---------------------- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/docs/lcr.rst b/docs/lcr.rst index 2f0bc4632..f803f39b5 100644 --- a/docs/lcr.rst +++ b/docs/lcr.rst @@ -37,10 +37,10 @@ Strategy indicates supplier selection algorithm and StrategyParams will be speci \*load_distribution (sorting/filter) The system will sort the suppliers in order to achieve the specified load distribution. - - if all have less than share return random order - - if some have a cdr count not divisible by share return them first and all ordered by cdr times, oldest first - - if all have a multiple of share return in the order of cdr times, oldest first - StrategyParams: supplier1:share;supplier2:share;*default:share + - if all have less than ratio return random order + - if some have a cdr count not divisible by ratio return them first and all ordered by cdr times, oldest first + - if all have a multiple of ratio return in the order of cdr times, oldest first + StrategyParams: supplier1:ratio;supplier2:ratio;*default:ratio ActivationTime is the date/time when the LCR entry starts to be active. diff --git a/engine/lcr.go b/engine/lcr.go index 05cbe0d36..70099aa22 100644 --- a/engine/lcr.go +++ b/engine/lcr.go @@ -337,25 +337,25 @@ func (lc *LCRCost) SortLoadDistribution() { /*for supplier, sq := range supplierQueues { log.Printf("Useful supplier qeues: %s %v", supplier, sq.conf.TimeWindow) }*/ - // if all have less than share return random order - // if some have a cdr count not divisible by share return them first and all ordered by cdr times, oldest first - // if all have a multiple of share return in the order of cdr times, oldest first + // if all have less than ratio return random order + // if some have a cdr count not divisible by ratio return them first and all ordered by cdr times, oldest first + // if all have a multiple of ratio return in the order of cdr times, oldest first // first put them in one of the above categories - haveSharelessSuppliers := false + haveRatiolessSuppliers := false for supCost, sq := range supplierQueues { - share := lc.GetSupplierShare(supCost.Supplier) - if share == -1 { + ratio := lc.GetSupplierRatio(supCost.Supplier) + if ratio == -1 { supCost.Cost = -1 - haveSharelessSuppliers = true + haveRatiolessSuppliers = true continue } cdrCount := len(sq.Cdrs) - if cdrCount < share { + if cdrCount < ratio { supCost.Cost = float64(LOW_PRIORITY_LIMIT + rand.Intn(RAND_LIMIT)) continue } - if cdrCount%share == 0 { + if cdrCount%ratio == 0 { supCost.Cost = float64(MED_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + (time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() / RAND_LIMIT) continue } else { @@ -363,7 +363,7 @@ func (lc *LCRCost) SortLoadDistribution() { continue } } - if haveSharelessSuppliers { + if haveRatiolessSuppliers { var filteredSupplierCost []*LCRSupplierCost for _, supCost := range lc.SupplierCosts { if supCost.Cost != -1 { @@ -375,35 +375,35 @@ func (lc *LCRCost) SortLoadDistribution() { } // used in load distribution strategy only -// receives a long supplier id and will return the share found in strategy params -func (lc *LCRCost) GetSupplierShare(supplier string) int { +// receives a long supplier id and will return the ratio found in strategy params +func (lc *LCRCost) GetSupplierRatio(supplier string) int { // parse strategy params - shares := make(map[string]int) + ratios := make(map[string]int) params := strings.Split(lc.Entry.StrategyParams, utils.INFIELD_SEP) for _, param := range params { - shareSlice := strings.Split(param, utils.CONCATENATED_KEY_SEP) - if len(shareSlice) != 2 { + ratioSlice := strings.Split(param, utils.CONCATENATED_KEY_SEP) + if len(ratioSlice) != 2 { Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams)) continue } - p, err := strconv.Atoi(shareSlice[1]) + p, err := strconv.Atoi(ratioSlice[1]) if err != nil { Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams)) continue } - shares[shareSlice[0]] = p + ratios[ratioSlice[0]] = p } parts := strings.Split(supplier, utils.CONCATENATED_KEY_SEP) if len(parts) > 0 { supplierSubject := parts[len(parts)-1] - if share, found := shares[supplierSubject]; found { - return share + if ratio, found := ratios[supplierSubject]; found { + return ratio } - if share, found := shares[utils.META_DEFAULT]; found { - return share + if ratio, found := ratios[utils.META_DEFAULT]; found { + return ratio } } - if len(shares) == 0 { + if len(ratios) == 0 { return 1 // use random/last cdr date sorting } return -1 // exclude missing suppliers