This commit is contained in:
DanB
2015-07-31 16:30:07 +02:00
9 changed files with 583 additions and 83 deletions

View File

@@ -483,18 +483,12 @@ func main() {
server.RpcRegisterName("ScribeV1", scribeServer)
}
if cfg.UserServerEnabled {
userServer, err = engine.NewUserMap(ratingDb)
userServer, err = engine.NewUserMap(accountDb, cfg.UserServerIndexes)
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<UsersService> Could not start, error: %s", err.Error()))
exitChan <- true
}
server.RpcRegisterName("UsersV1", userServer)
if len(cfg.UserServerIndexes) != 0 {
var s string
if err := userServer.AddIndex(cfg.UserServerIndexes, &s); err != nil {
engine.Logger.Err(fmt.Sprintf("Error adding %v indexes to user profile service: %v", cfg.UserServerIndexes, err))
}
}
}
// Register session manager service // FixMe: make sure this is thread safe

View File

@@ -70,6 +70,7 @@ var (
historyServer = flag.String("history_server", cgrConfig.RPCGOBListen, "The history server address:port, empty to disable automaticautomatic history archiving")
raterAddress = flag.String("rater_address", cgrConfig.RPCGOBListen, "Rater service to contact for cache reloads, empty to disable automatic cache reloads")
cdrstatsAddress = flag.String("cdrstats_address", cgrConfig.RPCGOBListen, "CDRStats service to contact for data reloads, empty to disable automatic data reloads")
usersAddress = flag.String("users_address", cgrConfig.RPCGOBListen, "Users service to contact for data reloads, empty to disable automatic data reloads")
runId = flag.String("runid", "", "Uniquely identify an import/load, postpended to some automatic fields")
)
@@ -83,7 +84,7 @@ func main() {
var ratingDb engine.RatingStorage
var accountDb engine.AccountingStorage
var storDb engine.LoadStorage
var rater, cdrstats *rpc.Client
var rater, cdrstats, users *rpc.Client
var loader engine.LoadReader
// Init necessary db connections, only if not already
if !*dryRun { // make sure we do not need db connections on dry run, also not importing into any stordb
@@ -206,6 +207,19 @@ func main() {
} else {
log.Print("WARNING: CDRStats automatic data reload is disabled!")
}
if *usersAddress != "" { // Init connection to rater so we can reload it's data
if *usersAddress == *raterAddress {
users = rater
} else {
users, err = rpc.Dial("tcp", *usersAddress)
if err != nil {
log.Fatalf("Could not connect to Users API: %s", err.Error())
return
}
}
} else {
log.Print("WARNING: Users automatic data reload is disabled!")
}
// write maps to database
if err := tpReader.WriteToDatabase(*flush, *verbose); err != nil {
@@ -272,4 +286,14 @@ func main() {
}
}
}
if users != nil {
if *verbose {
log.Print("Reloading Users data")
}
var reply string
if err := cdrstats.Call("UsersV1.ReloadUsers", "", &reply); err != nil {
log.Printf("WARNING: Failed reloading users data, error: %s\n", err.Error())
}
}
}

View File

@@ -18,7 +18,7 @@ Strategy indicates supplier selection algorithm and StrategyParams will be speci
\*static (filter)
Will use the suppliers provided as params.
StrategyParams: suppier1;supplier2;etc
\*lowest_cost (sorting)
Matching suppliers will be sorted by ascending cost.
StrategyParams: None
@@ -35,6 +35,13 @@ Strategy indicates supplier selection algorithm and StrategyParams will be speci
The system will sort by metrics in the order of appearance.
StrategyParams: metric1;metric2;etc
\*load_distribution (sorting/filter)
The system will sort the suppliers in order to achieve the specified load distribution.
- if all have less than ratio return random order
- if some have a cdr count not divisible by ratio return them first and all ordered by cdr times, oldest first
- if all have a multiple of ratio return in the order of cdr times, oldest first
StrategyParams: supplier1:ratio;supplier2:ratio;*default:ratio
ActivationTime is the date/time when the LCR entry starts to be active.
Weight is used to sort the rules with the same activation time.
@@ -43,7 +50,7 @@ Example
+++++++
::
*in, cgrates.org,call,*any,*any,EU_LANDLINE,LCR_STANDARD,*static,ivo;dan;rif,2012-01-01T00:00:00Z,10
Code implementation
@@ -63,7 +70,7 @@ For the QOS strategies the suppliers are searched using call descriptor paramete
For the lowest/highest cost strategies the matched suppliers are sorted ascending/descending on cost.
::
{
"Entry": {
"DestinationId": "*any",

View File

@@ -20,6 +20,7 @@ package engine
import (
"fmt"
"math/rand"
"sort"
"strconv"
"strings"
@@ -37,6 +38,12 @@ const (
LCR_STRATEGY_QOS_THRESHOLD = "*qos_threshold"
LCR_STRATEGY_QOS = "*qos"
LCR_STRATEGY_LOAD = "*load_distribution"
// used for load distribution sorting
RAND_LIMIT = 99
LOW_PRIORITY_LIMIT = 100
MED_PRIORITY_LIMIT = 200
HIGH_PRIORITY_LIMIT = 300
)
// A request for LCR, used in APIer and SM where we need to expose it
@@ -295,6 +302,7 @@ func (lc *LCRCost) Sort() {
sort.Sort(QOSSorter(lc.SupplierCosts))
case LCR_STRATEGY_LOAD:
lc.SortLoadDistribution()
sort.Sort(HighestSupplierCostSorter(lc.SupplierCosts))
}
}
@@ -317,11 +325,11 @@ func (lc *LCRCost) SortLoadDistribution() {
}
}
}
supplierQueues := make(map[string]*StatsQueue)
supplierQueues := make(map[*LCRSupplierCost]*StatsQueue)
for _, supCost := range lc.SupplierCosts {
for _, sq := range supCost.supplierQueues {
if sq.conf.TimeWindow == winnerTimeWindow {
supplierQueues[supCost.Supplier] = sq
supplierQueues[supCost] = sq
break
}
}
@@ -329,43 +337,76 @@ func (lc *LCRCost) SortLoadDistribution() {
/*for supplier, sq := range supplierQueues {
log.Printf("Useful supplier qeues: %s %v", supplier, sq.conf.TimeWindow)
}*/
// if all have less than ponder return random order
// if some have a cdr count not divisible by ponder return them first and all ordered by cdr times, oldest first
// if all have a multiple of ponder return in the order of cdr times, oldest first
// if all have less than ratio return random order
// if some have a cdr count not divisible by ratio return them first and all ordered by cdr times, oldest first
// if all have a multiple of ratio return in the order of cdr times, oldest first
// first put them in one of the above categories
haveRatiolessSuppliers := false
for supCost, sq := range supplierQueues {
ratio := lc.GetSupplierRatio(supCost.Supplier)
if ratio == -1 {
supCost.Cost = -1
haveRatiolessSuppliers = true
continue
}
cdrCount := len(sq.Cdrs)
if cdrCount < ratio {
supCost.Cost = float64(LOW_PRIORITY_LIMIT + rand.Intn(RAND_LIMIT))
continue
}
if cdrCount%ratio == 0 {
supCost.Cost = float64(MED_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + (time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() / RAND_LIMIT)
continue
} else {
supCost.Cost = float64(HIGH_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + (time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() / RAND_LIMIT)
continue
}
}
if haveRatiolessSuppliers {
var filteredSupplierCost []*LCRSupplierCost
for _, supCost := range lc.SupplierCosts {
if supCost.Cost != -1 {
filteredSupplierCost = append(filteredSupplierCost, supCost)
}
}
lc.SupplierCosts = filteredSupplierCost
}
}
// used in load distribution strategy only
// receives a long supplier id and will return the ponder found in strategy params
func (lc *LCRCost) GetSupplierPonder(supplier string) float64 {
// receives a long supplier id and will return the ratio found in strategy params
func (lc *LCRCost) GetSupplierRatio(supplier string) int {
// parse strategy params
ponders := make(map[string]float64)
ratios := make(map[string]int)
params := strings.Split(lc.Entry.StrategyParams, utils.INFIELD_SEP)
for _, param := range params {
ponderSlice := strings.Split(param, utils.CONCATENATED_KEY_SEP)
if len(ponderSlice) != 2 {
ratioSlice := strings.Split(param, utils.CONCATENATED_KEY_SEP)
if len(ratioSlice) != 2 {
Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams))
continue
}
p, err := strconv.ParseFloat(ponderSlice[1], 64)
p, err := strconv.Atoi(ratioSlice[1])
if err != nil {
Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams))
continue
}
ponders[ponderSlice[0]] = p
ratios[ratioSlice[0]] = p
}
parts := strings.Split(supplier, utils.CONCATENATED_KEY_SEP)
if len(parts) > 0 {
supplierSubject := parts[len(parts)-1]
if ponder, found := ponders[supplierSubject]; found {
return ponder
if ratio, found := ratios[supplierSubject]; found {
return ratio
}
if ponder, found := ponders[utils.META_DEFAULT]; found {
return ponder
if ratio, found := ratios[utils.META_DEFAULT]; found {
return ratio
}
}
return 1
if len(ratios) == 0 {
return 1 // use random/last cdr date sorting
}
return -1 // exclude missing suppliers
}
func (lc *LCRCost) HasErrors() bool {

View File

@@ -215,11 +215,11 @@ func TestLcrGet(t *testing.T) {
func TestLcrRequestAsCallDescriptor(t *testing.T) {
sTime := time.Date(2015, 04, 06, 17, 40, 0, 0, time.UTC)
callDur := time.Duration(1) * time.Minute
lcrReq := &LcrRequest{Account: "1001", StartTime: sTime.String()}
lcrReq := &LcrRequest{Account: "2001", StartTime: sTime.String()}
if _, err := lcrReq.AsCallDescriptor(); err == nil || err != utils.ErrMandatoryIeMissing {
t.Error("Unexpected error received: %v", err)
}
lcrReq = &LcrRequest{Account: "1001", Destination: "1002", StartTime: sTime.String()}
lcrReq = &LcrRequest{Account: "2001", Destination: "2002", StartTime: sTime.String()}
eCd := &CallDescriptor{
Direction: utils.OUT,
Tenant: config.CgrConfig().DefaultTenant,
@@ -280,6 +280,7 @@ func TestLCRCostSuppliersString(t *testing.T) {
}
func TestLCRCostSuppliersLoad(t *testing.T) {
setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC)
lcrCost := &LCRCost{
Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:10;dan12:3;*default:7", Weight: 10.0},
SupplierCosts: []*LCRSupplierCost{
@@ -287,21 +288,21 @@ func TestLCRCostSuppliersLoad(t *testing.T) {
Supplier: "*out:tenant12:call:ivo12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}},
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 3 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}},
Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 1 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}},
Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
@@ -313,21 +314,21 @@ func TestLCRCostSuppliersLoad(t *testing.T) {
Supplier: "*out:tenant12:call:dan12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}},
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}},
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}},
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
@@ -339,28 +340,28 @@ func TestLCRCostSuppliersLoad(t *testing.T) {
Supplier: "*out:tenant12:call:rif12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}},
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}},
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}},
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}},
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 1 * time.Minute,
@@ -371,9 +372,405 @@ func TestLCRCostSuppliersLoad(t *testing.T) {
},
}
lcrCost.Sort()
if lcrCost.SupplierCosts[0].Supplier != "" ||
lcrCost.SupplierCosts[1].Supplier != "" ||
lcrCost.SupplierCosts[2].Supplier != "" {
//t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost))
if lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:dan12" {
t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost))
}
}
func TestLCRCostSuppliersLoadAllRounded(t *testing.T) {
setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC)
lcrCost := &LCRCost{
Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:3;dan12:5;*default:2", Weight: 10.0},
SupplierCosts: []*LCRSupplierCost{
&LCRSupplierCost{
Supplier: "*out:tenant12:call:ivo12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 3 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 1 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
},
},
&LCRSupplierCost{
Supplier: "*out:tenant12:call:dan12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
},
},
&LCRSupplierCost{
Supplier: "*out:tenant12:call:rif12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime.Add(400 * time.Minute)}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 1 * time.Minute,
},
},
},
},
},
}
lcrCost.Sort()
if lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:ivo12" ||
lcrCost.SupplierCosts[1].Supplier != "*out:tenant12:call:dan12" ||
lcrCost.SupplierCosts[2].Supplier != "*out:tenant12:call:rif12" {
t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost))
}
}
func TestLCRCostSuppliersLoadAllOver(t *testing.T) {
setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC)
lcrCost := &LCRCost{
Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:2;dan12:4;*default:2", Weight: 10.0},
SupplierCosts: []*LCRSupplierCost{
&LCRSupplierCost{
Supplier: "*out:tenant12:call:ivo12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 3 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 1 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
},
},
&LCRSupplierCost{
Supplier: "*out:tenant12:call:dan12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
},
},
&LCRSupplierCost{
Supplier: "*out:tenant12:call:rif12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(400 * time.Minute)}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 1 * time.Minute,
},
},
},
},
},
}
lcrCost.Sort()
if lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:ivo12" ||
lcrCost.SupplierCosts[1].Supplier != "*out:tenant12:call:dan12" ||
lcrCost.SupplierCosts[2].Supplier != "*out:tenant12:call:rif12" {
t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost))
}
}
func TestLCRCostSuppliersLoadAllOverMisingDefault(t *testing.T) {
setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC)
lcrCost := &LCRCost{
Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:2;dan12:4", Weight: 10.0},
SupplierCosts: []*LCRSupplierCost{
&LCRSupplierCost{
Supplier: "*out:tenant12:call:ivo12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 3 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 1 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
},
},
&LCRSupplierCost{
Supplier: "*out:tenant12:call:dan12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
},
},
&LCRSupplierCost{
Supplier: "*out:tenant12:call:rif12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(400 * time.Minute)}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 1 * time.Minute,
},
},
},
},
},
}
lcrCost.Sort()
if len(lcrCost.SupplierCosts) != 2 ||
lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:ivo12" ||
lcrCost.SupplierCosts[1].Supplier != "*out:tenant12:call:dan12" {
t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost))
}
}
func TestLCRCostSuppliersLoadAllOverMisingParams(t *testing.T) {
setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC)
lcrCost := &LCRCost{
Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "", Weight: 10.0},
SupplierCosts: []*LCRSupplierCost{
&LCRSupplierCost{
Supplier: "*out:tenant12:call:ivo12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 3 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 1 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
},
},
&LCRSupplierCost{
Supplier: "*out:tenant12:call:dan12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(60 * time.Minute)}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
},
},
&LCRSupplierCost{
Supplier: "*out:tenant12:call:rif12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 1 * time.Minute,
},
},
},
},
},
}
lcrCost.Sort()
if len(lcrCost.SupplierCosts) != 3 {
t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost))
}
}

View File

@@ -71,10 +71,6 @@ type RatingStorage interface {
SetAccAlias(string, string) error
RemoveAccAliases([]*TenantAccount, bool) error
GetAccountAliases(string, string, bool) ([]string, error)
SetUser(*UserProfile) error
GetUser(string) (*UserProfile, error)
GetUsers() ([]*UserProfile, error)
RemoveUser(string) error
}
type AccountingStorage interface {
@@ -87,6 +83,10 @@ type AccountingStorage interface {
GetSubscribers() (map[string]*SubscriberData, error)
SetSubscriber(string, *SubscriberData) error
RemoveSubscriber(string) error
SetUser(*UserProfile) error
GetUser(string) (*UserProfile, error)
GetUsers() ([]*UserProfile, error)
RemoveUser(string) error
}
type CdrStorage interface {

View File

@@ -1063,7 +1063,7 @@ func (tpr *TpReader) LoadUsersFiltered(filter *TpUser) (bool, error) {
for _, tpUser := range tpUsers {
user.Profile[tpUser.AttributeName] = tpUser.AttributeValue
}
tpr.ratingStorage.SetUser(user)
tpr.accountingStorage.SetUser(user)
return len(tpUsers) > 0, err
}
@@ -1306,7 +1306,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose bool) (err error) {
log.Print("Users:")
}
for _, u := range tpr.users {
err = tpr.ratingStorage.SetUser(u)
err = tpr.accountingStorage.SetUser(u)
if err != nil {
return err
}

View File

@@ -1,6 +1,7 @@
package engine
import (
"fmt"
"sort"
"strings"
"sync"
@@ -55,46 +56,78 @@ type UserService interface {
GetUsers(UserProfile, *UserProfiles) error
AddIndex([]string, *string) error
GetIndexes(string, *map[string][]string) error
ReloadUsers(string, *string) error
}
type UserMap struct {
table map[string]map[string]string
index map[string]map[string]bool
indexKeys []string
ratingDb RatingStorage
mu sync.RWMutex
table map[string]map[string]string
index map[string]map[string]bool
indexKeys []string
accountingDb AccountingStorage
mu sync.RWMutex
}
func NewUserMap(ratingDb RatingStorage) (*UserMap, error) {
um := newUserMap(ratingDb)
// load from rating db
if ups, err := um.ratingDb.GetUsers(); err == nil {
for _, up := range ups {
um.table[up.GetId()] = up.Profile
}
} else {
func NewUserMap(accountingDb AccountingStorage, indexes []string) (*UserMap, error) {
um := newUserMap(accountingDb, indexes)
var reply string
if err := um.ReloadUsers("", &reply); err != nil {
return nil, err
}
return um, nil
}
func newUserMap(ratingDb RatingStorage) *UserMap {
func newUserMap(accountingDb AccountingStorage, indexes []string) *UserMap {
return &UserMap{
table: make(map[string]map[string]string),
index: make(map[string]map[string]bool),
ratingDb: ratingDb,
table: make(map[string]map[string]string),
index: make(map[string]map[string]bool),
indexKeys: indexes,
accountingDb: accountingDb,
}
}
func (um *UserMap) ReloadUsers(in string, reply *string) error {
um.mu.Lock()
defer um.mu.Unlock()
// backup old data
oldTable := um.table
oldIndex := um.index
um.table = make(map[string]map[string]string)
um.index = make(map[string]map[string]bool)
// load from rating db
if ups, err := um.accountingDb.GetUsers(); err == nil {
for _, up := range ups {
um.table[up.GetId()] = up.Profile
}
} else {
// restore old data before return
um.table = oldTable
um.index = oldIndex
*reply = err.Error()
return err
}
if len(um.indexKeys) != 0 {
var s string
if err := um.AddIndex(um.indexKeys, &s); err != nil {
Logger.Err(fmt.Sprintf("Error adding %v indexes to user profile service: %v", um.indexKeys, err))
}
}
*reply = utils.OK
return nil
}
func (um *UserMap) SetUser(up UserProfile, reply *string) error {
um.mu.Lock()
defer um.mu.Unlock()
if err := um.ratingDb.SetUser(&up); err != nil {
if err := um.accountingDb.SetUser(&up); err != nil {
*reply = err.Error()
return err
}
um.table[up.GetId()] = up.Profile
um.addIndex(&up)
um.addIndex(&up, um.indexKeys)
*reply = utils.OK
return nil
}
@@ -102,7 +135,7 @@ func (um *UserMap) SetUser(up UserProfile, reply *string) error {
func (um *UserMap) RemoveUser(up UserProfile, reply *string) error {
um.mu.Lock()
defer um.mu.Unlock()
if err := um.ratingDb.RemoveUser(up.GetId()); err != nil {
if err := um.accountingDb.RemoveUser(up.GetId()); err != nil {
*reply = err.Error()
return err
}
@@ -140,13 +173,13 @@ func (um *UserMap) UpdateUser(up UserProfile, reply *string) error {
UserName: up.UserName,
Profile: m,
}
if err := um.ratingDb.SetUser(finalUp); err != nil {
if err := um.accountingDb.SetUser(finalUp); err != nil {
*reply = err.Error()
return err
}
um.table[up.GetId()] = m
um.deleteIndex(oldUp)
um.addIndex(finalUp)
um.addIndex(finalUp, um.indexKeys)
*reply = utils.OK
return nil
}
@@ -235,19 +268,19 @@ func (um *UserMap) GetUsers(up UserProfile, results *UserProfiles) error {
func (um *UserMap) AddIndex(indexes []string, reply *string) error {
um.mu.Lock()
defer um.mu.Unlock()
um.indexKeys = indexes
um.indexKeys = append(um.indexKeys, indexes...)
for key, values := range um.table {
up := &UserProfile{Profile: values}
up.SetId(key)
um.addIndex(up)
um.addIndex(up, indexes)
}
*reply = utils.OK
return nil
}
func (um *UserMap) addIndex(up *UserProfile) {
func (um *UserMap) addIndex(up *UserProfile, indexes []string) {
key := up.GetId()
for _, index := range um.indexKeys {
for _, index := range indexes {
if index == "Tenant" {
if up.Tenant != "" {
indexKey := utils.ConcatenatedKey(index, up.Tenant)
@@ -369,6 +402,10 @@ func (ps *ProxyUserService) GetIndexes(in string, reply *map[string][]string) er
return ps.Client.Call("UsersV1.AddIndex", in, reply)
}
func (ps *ProxyUserService) ReloadUsers(in string, reply *string) error {
return ps.Client.Call("UsersV1.ReloadUsers", in, reply)
}
// extraFields - Field name in the interface containing extraFields information
func LoadUserProfile(in interface{}, extraFields string) (interface{}, error) {
if userService == nil { // no user service => no fun

View File

@@ -19,7 +19,7 @@ var testMap = UserMap{
}
func TestUsersAdd(t *testing.T) {
tm := newUserMap(ratingStorage)
tm := newUserMap(accountingStorage, nil)
var r string
up := UserProfile{
Tenant: "test",
@@ -40,7 +40,7 @@ func TestUsersAdd(t *testing.T) {
}
func TestUsersUpdate(t *testing.T) {
tm := newUserMap(ratingStorage)
tm := newUserMap(accountingStorage, nil)
var r string
up := UserProfile{
Tenant: "test",
@@ -71,7 +71,7 @@ func TestUsersUpdate(t *testing.T) {
}
func TestUsersUpdateNotFound(t *testing.T) {
tm := newUserMap(ratingStorage)
tm := newUserMap(accountingStorage, nil)
var r string
up := UserProfile{
Tenant: "test",
@@ -89,7 +89,7 @@ func TestUsersUpdateNotFound(t *testing.T) {
}
func TestUsersUpdateInit(t *testing.T) {
tm := newUserMap(ratingStorage)
tm := newUserMap(accountingStorage, nil)
var r string
up := UserProfile{
Tenant: "test",
@@ -115,7 +115,7 @@ func TestUsersUpdateInit(t *testing.T) {
}
func TestUsersRemove(t *testing.T) {
tm := newUserMap(ratingStorage)
tm := newUserMap(accountingStorage, nil)
var r string
up := UserProfile{
Tenant: "test",
@@ -445,7 +445,7 @@ func TestUsersGetMissingIdTwoINdex(t *testing.T) {
}
func TestUsersAddUpdateRemoveIndexes(t *testing.T) {
tm := newUserMap(ratingStorage)
tm := newUserMap(accountingStorage, nil)
var r string
tm.AddIndex([]string{"t"}, &r)
if len(tm.index) != 0 {