diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index c4a37ebbf..e2a55c9e5 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -483,18 +483,12 @@ func main() { server.RpcRegisterName("ScribeV1", scribeServer) } if cfg.UserServerEnabled { - userServer, err = engine.NewUserMap(ratingDb) + userServer, err = engine.NewUserMap(accountDb, cfg.UserServerIndexes) if err != nil { engine.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) exitChan <- true } server.RpcRegisterName("UsersV1", userServer) - if len(cfg.UserServerIndexes) != 0 { - var s string - if err := userServer.AddIndex(cfg.UserServerIndexes, &s); err != nil { - engine.Logger.Err(fmt.Sprintf("Error adding %v indexes to user profile service: %v", cfg.UserServerIndexes, err)) - } - } } // Register session manager service // FixMe: make sure this is thread safe diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 268bdbbf9..0aa4f8cd6 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -70,6 +70,7 @@ var ( historyServer = flag.String("history_server", cgrConfig.RPCGOBListen, "The history server address:port, empty to disable automaticautomatic history archiving") raterAddress = flag.String("rater_address", cgrConfig.RPCGOBListen, "Rater service to contact for cache reloads, empty to disable automatic cache reloads") cdrstatsAddress = flag.String("cdrstats_address", cgrConfig.RPCGOBListen, "CDRStats service to contact for data reloads, empty to disable automatic data reloads") + usersAddress = flag.String("users_address", cgrConfig.RPCGOBListen, "Users service to contact for data reloads, empty to disable automatic data reloads") runId = flag.String("runid", "", "Uniquely identify an import/load, postpended to some automatic fields") ) @@ -83,7 +84,7 @@ func main() { var ratingDb engine.RatingStorage var accountDb engine.AccountingStorage var storDb engine.LoadStorage - var rater, cdrstats *rpc.Client + var rater, cdrstats, users *rpc.Client var loader engine.LoadReader // Init necessary db connections, only if not already if !*dryRun { // make sure we do not need db connections on dry run, also not importing into any stordb @@ -206,6 +207,19 @@ func main() { } else { log.Print("WARNING: CDRStats automatic data reload is disabled!") } + if *usersAddress != "" { // Init connection to rater so we can reload it's data + if *usersAddress == *raterAddress { + users = rater + } else { + users, err = rpc.Dial("tcp", *usersAddress) + if err != nil { + log.Fatalf("Could not connect to Users API: %s", err.Error()) + return + } + } + } else { + log.Print("WARNING: Users automatic data reload is disabled!") + } // write maps to database if err := tpReader.WriteToDatabase(*flush, *verbose); err != nil { @@ -272,4 +286,14 @@ func main() { } } } + + if users != nil { + if *verbose { + log.Print("Reloading Users data") + } + var reply string + if err := cdrstats.Call("UsersV1.ReloadUsers", "", &reply); err != nil { + log.Printf("WARNING: Failed reloading users data, error: %s\n", err.Error()) + } + } } diff --git a/docs/lcr.rst b/docs/lcr.rst index 75cab353c..f803f39b5 100644 --- a/docs/lcr.rst +++ b/docs/lcr.rst @@ -18,7 +18,7 @@ Strategy indicates supplier selection algorithm and StrategyParams will be speci \*static (filter) Will use the suppliers provided as params. StrategyParams: suppier1;supplier2;etc - + \*lowest_cost (sorting) Matching suppliers will be sorted by ascending cost. StrategyParams: None @@ -35,6 +35,13 @@ Strategy indicates supplier selection algorithm and StrategyParams will be speci The system will sort by metrics in the order of appearance. StrategyParams: metric1;metric2;etc +\*load_distribution (sorting/filter) + The system will sort the suppliers in order to achieve the specified load distribution. + - if all have less than ratio return random order + - if some have a cdr count not divisible by ratio return them first and all ordered by cdr times, oldest first + - if all have a multiple of ratio return in the order of cdr times, oldest first + StrategyParams: supplier1:ratio;supplier2:ratio;*default:ratio + ActivationTime is the date/time when the LCR entry starts to be active. Weight is used to sort the rules with the same activation time. @@ -43,7 +50,7 @@ Example +++++++ :: - + *in, cgrates.org,call,*any,*any,EU_LANDLINE,LCR_STANDARD,*static,ivo;dan;rif,2012-01-01T00:00:00Z,10 Code implementation @@ -63,7 +70,7 @@ For the QOS strategies the suppliers are searched using call descriptor paramete For the lowest/highest cost strategies the matched suppliers are sorted ascending/descending on cost. :: - + { "Entry": { "DestinationId": "*any", diff --git a/engine/lcr.go b/engine/lcr.go index cdab45627..70099aa22 100644 --- a/engine/lcr.go +++ b/engine/lcr.go @@ -20,6 +20,7 @@ package engine import ( "fmt" + "math/rand" "sort" "strconv" "strings" @@ -37,6 +38,12 @@ const ( LCR_STRATEGY_QOS_THRESHOLD = "*qos_threshold" LCR_STRATEGY_QOS = "*qos" LCR_STRATEGY_LOAD = "*load_distribution" + + // used for load distribution sorting + RAND_LIMIT = 99 + LOW_PRIORITY_LIMIT = 100 + MED_PRIORITY_LIMIT = 200 + HIGH_PRIORITY_LIMIT = 300 ) // A request for LCR, used in APIer and SM where we need to expose it @@ -295,6 +302,7 @@ func (lc *LCRCost) Sort() { sort.Sort(QOSSorter(lc.SupplierCosts)) case LCR_STRATEGY_LOAD: lc.SortLoadDistribution() + sort.Sort(HighestSupplierCostSorter(lc.SupplierCosts)) } } @@ -317,11 +325,11 @@ func (lc *LCRCost) SortLoadDistribution() { } } } - supplierQueues := make(map[string]*StatsQueue) + supplierQueues := make(map[*LCRSupplierCost]*StatsQueue) for _, supCost := range lc.SupplierCosts { for _, sq := range supCost.supplierQueues { if sq.conf.TimeWindow == winnerTimeWindow { - supplierQueues[supCost.Supplier] = sq + supplierQueues[supCost] = sq break } } @@ -329,43 +337,76 @@ func (lc *LCRCost) SortLoadDistribution() { /*for supplier, sq := range supplierQueues { log.Printf("Useful supplier qeues: %s %v", supplier, sq.conf.TimeWindow) }*/ - // if all have less than ponder return random order - // if some have a cdr count not divisible by ponder return them first and all ordered by cdr times, oldest first - // if all have a multiple of ponder return in the order of cdr times, oldest first + // if all have less than ratio return random order + // if some have a cdr count not divisible by ratio return them first and all ordered by cdr times, oldest first + // if all have a multiple of ratio return in the order of cdr times, oldest first + // first put them in one of the above categories + haveRatiolessSuppliers := false + for supCost, sq := range supplierQueues { + ratio := lc.GetSupplierRatio(supCost.Supplier) + if ratio == -1 { + supCost.Cost = -1 + haveRatiolessSuppliers = true + continue + } + cdrCount := len(sq.Cdrs) + if cdrCount < ratio { + supCost.Cost = float64(LOW_PRIORITY_LIMIT + rand.Intn(RAND_LIMIT)) + continue + } + if cdrCount%ratio == 0 { + supCost.Cost = float64(MED_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + (time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() / RAND_LIMIT) + continue + } else { + supCost.Cost = float64(HIGH_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + (time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() / RAND_LIMIT) + continue + } + } + if haveRatiolessSuppliers { + var filteredSupplierCost []*LCRSupplierCost + for _, supCost := range lc.SupplierCosts { + if supCost.Cost != -1 { + filteredSupplierCost = append(filteredSupplierCost, supCost) + } + } + lc.SupplierCosts = filteredSupplierCost + } } // used in load distribution strategy only -// receives a long supplier id and will return the ponder found in strategy params -func (lc *LCRCost) GetSupplierPonder(supplier string) float64 { +// receives a long supplier id and will return the ratio found in strategy params +func (lc *LCRCost) GetSupplierRatio(supplier string) int { // parse strategy params - ponders := make(map[string]float64) + ratios := make(map[string]int) params := strings.Split(lc.Entry.StrategyParams, utils.INFIELD_SEP) for _, param := range params { - ponderSlice := strings.Split(param, utils.CONCATENATED_KEY_SEP) - if len(ponderSlice) != 2 { + ratioSlice := strings.Split(param, utils.CONCATENATED_KEY_SEP) + if len(ratioSlice) != 2 { Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams)) continue } - p, err := strconv.ParseFloat(ponderSlice[1], 64) + p, err := strconv.Atoi(ratioSlice[1]) if err != nil { Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams)) continue } - ponders[ponderSlice[0]] = p + ratios[ratioSlice[0]] = p } parts := strings.Split(supplier, utils.CONCATENATED_KEY_SEP) if len(parts) > 0 { supplierSubject := parts[len(parts)-1] - if ponder, found := ponders[supplierSubject]; found { - return ponder + if ratio, found := ratios[supplierSubject]; found { + return ratio } - if ponder, found := ponders[utils.META_DEFAULT]; found { - return ponder + if ratio, found := ratios[utils.META_DEFAULT]; found { + return ratio } } - - return 1 + if len(ratios) == 0 { + return 1 // use random/last cdr date sorting + } + return -1 // exclude missing suppliers } func (lc *LCRCost) HasErrors() bool { diff --git a/engine/lcr_test.go b/engine/lcr_test.go index 847eefbda..93fb2c6d4 100644 --- a/engine/lcr_test.go +++ b/engine/lcr_test.go @@ -215,11 +215,11 @@ func TestLcrGet(t *testing.T) { func TestLcrRequestAsCallDescriptor(t *testing.T) { sTime := time.Date(2015, 04, 06, 17, 40, 0, 0, time.UTC) callDur := time.Duration(1) * time.Minute - lcrReq := &LcrRequest{Account: "1001", StartTime: sTime.String()} + lcrReq := &LcrRequest{Account: "2001", StartTime: sTime.String()} if _, err := lcrReq.AsCallDescriptor(); err == nil || err != utils.ErrMandatoryIeMissing { t.Error("Unexpected error received: %v", err) } - lcrReq = &LcrRequest{Account: "1001", Destination: "1002", StartTime: sTime.String()} + lcrReq = &LcrRequest{Account: "2001", Destination: "2002", StartTime: sTime.String()} eCd := &CallDescriptor{ Direction: utils.OUT, Tenant: config.CgrConfig().DefaultTenant, @@ -280,6 +280,7 @@ func TestLCRCostSuppliersString(t *testing.T) { } func TestLCRCostSuppliersLoad(t *testing.T) { + setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC) lcrCost := &LCRCost{ Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:10;dan12:3;*default:7", Weight: 10.0}, SupplierCosts: []*LCRSupplierCost{ @@ -287,21 +288,21 @@ func TestLCRCostSuppliersLoad(t *testing.T) { Supplier: "*out:tenant12:call:ivo12", supplierQueues: []*StatsQueue{ &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 3 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 1 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, @@ -313,21 +314,21 @@ func TestLCRCostSuppliersLoad(t *testing.T) { Supplier: "*out:tenant12:call:dan12", supplierQueues: []*StatsQueue{ &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 7 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 7 * time.Minute, @@ -339,28 +340,28 @@ func TestLCRCostSuppliersLoad(t *testing.T) { Supplier: "*out:tenant12:call:rif12", supplierQueues: []*StatsQueue{ &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 7 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 7 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 10 * time.Minute, }, }, &StatsQueue{ - Cdrs: []*QCdr{&QCdr{}, &QCdr{}}, + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, conf: &CdrStats{ QueueLength: 0, TimeWindow: 1 * time.Minute, @@ -371,9 +372,405 @@ func TestLCRCostSuppliersLoad(t *testing.T) { }, } lcrCost.Sort() - if lcrCost.SupplierCosts[0].Supplier != "" || - lcrCost.SupplierCosts[1].Supplier != "" || - lcrCost.SupplierCosts[2].Supplier != "" { - //t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) + if lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:dan12" { + t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) + } +} + +func TestLCRCostSuppliersLoadAllRounded(t *testing.T) { + setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC) + lcrCost := &LCRCost{ + Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:3;dan12:5;*default:2", Weight: 10.0}, + SupplierCosts: []*LCRSupplierCost{ + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:ivo12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 3 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:dan12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:rif12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime.Add(400 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + }, + }, + }, + } + lcrCost.Sort() + if lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:ivo12" || + lcrCost.SupplierCosts[1].Supplier != "*out:tenant12:call:dan12" || + lcrCost.SupplierCosts[2].Supplier != "*out:tenant12:call:rif12" { + t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) + } +} + +func TestLCRCostSuppliersLoadAllOver(t *testing.T) { + setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC) + lcrCost := &LCRCost{ + Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:2;dan12:4;*default:2", Weight: 10.0}, + SupplierCosts: []*LCRSupplierCost{ + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:ivo12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 3 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:dan12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:rif12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(400 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + }, + }, + }, + } + lcrCost.Sort() + if lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:ivo12" || + lcrCost.SupplierCosts[1].Supplier != "*out:tenant12:call:dan12" || + lcrCost.SupplierCosts[2].Supplier != "*out:tenant12:call:rif12" { + t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) + } +} + +func TestLCRCostSuppliersLoadAllOverMisingDefault(t *testing.T) { + setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC) + lcrCost := &LCRCost{ + Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:2;dan12:4", Weight: 10.0}, + SupplierCosts: []*LCRSupplierCost{ + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:ivo12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 3 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:dan12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:rif12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(400 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + }, + }, + }, + } + lcrCost.Sort() + if len(lcrCost.SupplierCosts) != 2 || + lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:ivo12" || + lcrCost.SupplierCosts[1].Supplier != "*out:tenant12:call:dan12" { + t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) + } +} + +func TestLCRCostSuppliersLoadAllOverMisingParams(t *testing.T) { + setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC) + lcrCost := &LCRCost{ + Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "", Weight: 10.0}, + SupplierCosts: []*LCRSupplierCost{ + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:ivo12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 3 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:dan12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(60 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + }, + }, + &LCRSupplierCost{ + Supplier: "*out:tenant12:call:rif12", + supplierQueues: []*StatsQueue{ + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 7 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 10 * time.Minute, + }, + }, + &StatsQueue{ + Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}}, + conf: &CdrStats{ + QueueLength: 0, + TimeWindow: 1 * time.Minute, + }, + }, + }, + }, + }, + } + lcrCost.Sort() + if len(lcrCost.SupplierCosts) != 3 { + t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost)) } } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 6c05d287e..f06851e87 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -71,10 +71,6 @@ type RatingStorage interface { SetAccAlias(string, string) error RemoveAccAliases([]*TenantAccount, bool) error GetAccountAliases(string, string, bool) ([]string, error) - SetUser(*UserProfile) error - GetUser(string) (*UserProfile, error) - GetUsers() ([]*UserProfile, error) - RemoveUser(string) error } type AccountingStorage interface { @@ -87,6 +83,10 @@ type AccountingStorage interface { GetSubscribers() (map[string]*SubscriberData, error) SetSubscriber(string, *SubscriberData) error RemoveSubscriber(string) error + SetUser(*UserProfile) error + GetUser(string) (*UserProfile, error) + GetUsers() ([]*UserProfile, error) + RemoveUser(string) error } type CdrStorage interface { diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 32d53df70..ce737c131 100644 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -1063,7 +1063,7 @@ func (tpr *TpReader) LoadUsersFiltered(filter *TpUser) (bool, error) { for _, tpUser := range tpUsers { user.Profile[tpUser.AttributeName] = tpUser.AttributeValue } - tpr.ratingStorage.SetUser(user) + tpr.accountingStorage.SetUser(user) return len(tpUsers) > 0, err } @@ -1306,7 +1306,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose bool) (err error) { log.Print("Users:") } for _, u := range tpr.users { - err = tpr.ratingStorage.SetUser(u) + err = tpr.accountingStorage.SetUser(u) if err != nil { return err } diff --git a/engine/users.go b/engine/users.go index ab334a9ad..2c64bec62 100644 --- a/engine/users.go +++ b/engine/users.go @@ -1,6 +1,7 @@ package engine import ( + "fmt" "sort" "strings" "sync" @@ -55,46 +56,78 @@ type UserService interface { GetUsers(UserProfile, *UserProfiles) error AddIndex([]string, *string) error GetIndexes(string, *map[string][]string) error + ReloadUsers(string, *string) error } type UserMap struct { - table map[string]map[string]string - index map[string]map[string]bool - indexKeys []string - ratingDb RatingStorage - mu sync.RWMutex + table map[string]map[string]string + index map[string]map[string]bool + indexKeys []string + accountingDb AccountingStorage + mu sync.RWMutex } -func NewUserMap(ratingDb RatingStorage) (*UserMap, error) { - um := newUserMap(ratingDb) - // load from rating db - if ups, err := um.ratingDb.GetUsers(); err == nil { - for _, up := range ups { - um.table[up.GetId()] = up.Profile - } - } else { +func NewUserMap(accountingDb AccountingStorage, indexes []string) (*UserMap, error) { + um := newUserMap(accountingDb, indexes) + var reply string + if err := um.ReloadUsers("", &reply); err != nil { return nil, err } return um, nil } -func newUserMap(ratingDb RatingStorage) *UserMap { +func newUserMap(accountingDb AccountingStorage, indexes []string) *UserMap { return &UserMap{ - table: make(map[string]map[string]string), - index: make(map[string]map[string]bool), - ratingDb: ratingDb, + table: make(map[string]map[string]string), + index: make(map[string]map[string]bool), + indexKeys: indexes, + accountingDb: accountingDb, } } +func (um *UserMap) ReloadUsers(in string, reply *string) error { + um.mu.Lock() + defer um.mu.Unlock() + + // backup old data + oldTable := um.table + oldIndex := um.index + um.table = make(map[string]map[string]string) + um.index = make(map[string]map[string]bool) + + // load from rating db + if ups, err := um.accountingDb.GetUsers(); err == nil { + for _, up := range ups { + um.table[up.GetId()] = up.Profile + } + } else { + // restore old data before return + um.table = oldTable + um.index = oldIndex + + *reply = err.Error() + return err + } + + if len(um.indexKeys) != 0 { + var s string + if err := um.AddIndex(um.indexKeys, &s); err != nil { + Logger.Err(fmt.Sprintf("Error adding %v indexes to user profile service: %v", um.indexKeys, err)) + } + } + *reply = utils.OK + return nil +} + func (um *UserMap) SetUser(up UserProfile, reply *string) error { um.mu.Lock() defer um.mu.Unlock() - if err := um.ratingDb.SetUser(&up); err != nil { + if err := um.accountingDb.SetUser(&up); err != nil { *reply = err.Error() return err } um.table[up.GetId()] = up.Profile - um.addIndex(&up) + um.addIndex(&up, um.indexKeys) *reply = utils.OK return nil } @@ -102,7 +135,7 @@ func (um *UserMap) SetUser(up UserProfile, reply *string) error { func (um *UserMap) RemoveUser(up UserProfile, reply *string) error { um.mu.Lock() defer um.mu.Unlock() - if err := um.ratingDb.RemoveUser(up.GetId()); err != nil { + if err := um.accountingDb.RemoveUser(up.GetId()); err != nil { *reply = err.Error() return err } @@ -140,13 +173,13 @@ func (um *UserMap) UpdateUser(up UserProfile, reply *string) error { UserName: up.UserName, Profile: m, } - if err := um.ratingDb.SetUser(finalUp); err != nil { + if err := um.accountingDb.SetUser(finalUp); err != nil { *reply = err.Error() return err } um.table[up.GetId()] = m um.deleteIndex(oldUp) - um.addIndex(finalUp) + um.addIndex(finalUp, um.indexKeys) *reply = utils.OK return nil } @@ -235,19 +268,19 @@ func (um *UserMap) GetUsers(up UserProfile, results *UserProfiles) error { func (um *UserMap) AddIndex(indexes []string, reply *string) error { um.mu.Lock() defer um.mu.Unlock() - um.indexKeys = indexes + um.indexKeys = append(um.indexKeys, indexes...) for key, values := range um.table { up := &UserProfile{Profile: values} up.SetId(key) - um.addIndex(up) + um.addIndex(up, indexes) } *reply = utils.OK return nil } -func (um *UserMap) addIndex(up *UserProfile) { +func (um *UserMap) addIndex(up *UserProfile, indexes []string) { key := up.GetId() - for _, index := range um.indexKeys { + for _, index := range indexes { if index == "Tenant" { if up.Tenant != "" { indexKey := utils.ConcatenatedKey(index, up.Tenant) @@ -369,6 +402,10 @@ func (ps *ProxyUserService) GetIndexes(in string, reply *map[string][]string) er return ps.Client.Call("UsersV1.AddIndex", in, reply) } +func (ps *ProxyUserService) ReloadUsers(in string, reply *string) error { + return ps.Client.Call("UsersV1.ReloadUsers", in, reply) +} + // extraFields - Field name in the interface containing extraFields information func LoadUserProfile(in interface{}, extraFields string) (interface{}, error) { if userService == nil { // no user service => no fun diff --git a/engine/users_test.go b/engine/users_test.go index 86699a9cb..42fd3d549 100644 --- a/engine/users_test.go +++ b/engine/users_test.go @@ -19,7 +19,7 @@ var testMap = UserMap{ } func TestUsersAdd(t *testing.T) { - tm := newUserMap(ratingStorage) + tm := newUserMap(accountingStorage, nil) var r string up := UserProfile{ Tenant: "test", @@ -40,7 +40,7 @@ func TestUsersAdd(t *testing.T) { } func TestUsersUpdate(t *testing.T) { - tm := newUserMap(ratingStorage) + tm := newUserMap(accountingStorage, nil) var r string up := UserProfile{ Tenant: "test", @@ -71,7 +71,7 @@ func TestUsersUpdate(t *testing.T) { } func TestUsersUpdateNotFound(t *testing.T) { - tm := newUserMap(ratingStorage) + tm := newUserMap(accountingStorage, nil) var r string up := UserProfile{ Tenant: "test", @@ -89,7 +89,7 @@ func TestUsersUpdateNotFound(t *testing.T) { } func TestUsersUpdateInit(t *testing.T) { - tm := newUserMap(ratingStorage) + tm := newUserMap(accountingStorage, nil) var r string up := UserProfile{ Tenant: "test", @@ -115,7 +115,7 @@ func TestUsersUpdateInit(t *testing.T) { } func TestUsersRemove(t *testing.T) { - tm := newUserMap(ratingStorage) + tm := newUserMap(accountingStorage, nil) var r string up := UserProfile{ Tenant: "test", @@ -445,7 +445,7 @@ func TestUsersGetMissingIdTwoINdex(t *testing.T) { } func TestUsersAddUpdateRemoveIndexes(t *testing.T) { - tm := newUserMap(ratingStorage) + tm := newUserMap(accountingStorage, nil) var r string tm.AddIndex([]string{"t"}, &r) if len(tm.index) != 0 {