Merge branch 'master' into hapool

This commit is contained in:
Radu Ioan Fericean
2015-07-31 12:49:25 +03:00
28 changed files with 844 additions and 561 deletions

View File

@@ -132,7 +132,8 @@ type CallDescriptor struct {
FallbackSubject string // the subject to check for destination if not found on primary subject
RatingInfos RatingInfos
Increments Increments
TOR string // used unit balances selector
TOR string // used unit balances selector
ExtraFields map[string]string // Extra fields, mostly used for user profile matching
// session limits
MaxRate float64
MaxRateUnit time.Duration
@@ -795,6 +796,7 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) {
if lcrCost.Entry == nil {
return lcrCost, nil
}
//log.Printf("Entry: %+v", lcrCost.Entry)
if lcrCost.Entry.Strategy == LCR_STRATEGY_STATIC {
for _, supplier := range lcrCost.Entry.GetParams() {
@@ -865,7 +867,7 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) {
accNeverConsidered := true
tccNeverConsidered := true
ddcNeverConsidered := true
if utils.IsSliceMember([]string{LCR_STRATEGY_QOS, LCR_STRATEGY_QOS_THRESHOLD}, lcrCost.Entry.Strategy) {
if utils.IsSliceMember([]string{LCR_STRATEGY_QOS, LCR_STRATEGY_QOS_THRESHOLD, LCR_STRATEGY_LOAD}, lcrCost.Entry.Strategy) {
if stats == nil {
lcrCost.SupplierCosts = append(lcrCost.SupplierCosts, &LCRSupplierCost{
Supplier: supplier,
@@ -891,60 +893,84 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) {
}
}
}
statsErr := false
var supplierQueues []*StatsQueue
for _, qId := range cdrStatsQueueIds {
statValues := make(map[string]float64)
if err := stats.GetValues(qId, &statValues); err != nil {
lcrCost.SupplierCosts = append(lcrCost.SupplierCosts, &LCRSupplierCost{
Supplier: supplier,
Error: fmt.Sprintf("Get stats values for queue id %s, error %s", qId, err.Error()),
})
statsErr = true
break
}
if asr, exists := statValues[ASR]; exists {
if asr > STATS_NA {
asrValues = append(asrValues, asr)
if lcrCost.Entry.Strategy == LCR_STRATEGY_LOAD {
for _, qId := range cdrStatsQueueIds {
sq := &StatsQueue{}
if err := stats.GetQueue(qId, sq); err == nil {
if sq.conf.QueueLength == 0 { //only add qeues that don't have fixed length
supplierQueues = append(supplierQueues, sq)
}
}
}
asrNeverConsidered = false
}
if pdd, exists := statValues[PDD]; exists {
if pdd > STATS_NA {
pddValues = append(pddValues, pdd)
} else {
statValues := make(map[string]float64)
if err := stats.GetValues(qId, &statValues); err != nil {
lcrCost.SupplierCosts = append(lcrCost.SupplierCosts, &LCRSupplierCost{
Supplier: supplier,
Error: fmt.Sprintf("Get stats values for queue id %s, error %s", qId, err.Error()),
})
statsErr = true
break
}
pddNeverConsidered = false
}
if acd, exists := statValues[ACD]; exists {
if acd > STATS_NA {
acdValues = append(acdValues, acd)
if asr, exists := statValues[ASR]; exists {
if asr > STATS_NA {
asrValues = append(asrValues, asr)
}
asrNeverConsidered = false
}
acdNeverConsidered = false
}
if tcd, exists := statValues[TCD]; exists {
if tcd > STATS_NA {
tcdValues = append(tcdValues, tcd)
if pdd, exists := statValues[PDD]; exists {
if pdd > STATS_NA {
pddValues = append(pddValues, pdd)
}
pddNeverConsidered = false
}
tcdNeverConsidered = false
}
if acc, exists := statValues[ACC]; exists {
if acc > STATS_NA {
accValues = append(accValues, acc)
if acd, exists := statValues[ACD]; exists {
if acd > STATS_NA {
acdValues = append(acdValues, acd)
}
acdNeverConsidered = false
}
accNeverConsidered = false
}
if tcc, exists := statValues[TCC]; exists {
if tcc > STATS_NA {
tccValues = append(tccValues, tcc)
if tcd, exists := statValues[TCD]; exists {
if tcd > STATS_NA {
tcdValues = append(tcdValues, tcd)
}
tcdNeverConsidered = false
}
tccNeverConsidered = false
}
if ddc, exists := statValues[TCC]; exists {
if ddc > STATS_NA {
ddcValues = append(ddcValues, ddc)
if acc, exists := statValues[ACC]; exists {
if acc > STATS_NA {
accValues = append(accValues, acc)
}
accNeverConsidered = false
}
ddcNeverConsidered = false
if tcc, exists := statValues[TCC]; exists {
if tcc > STATS_NA {
tccValues = append(tccValues, tcc)
}
tccNeverConsidered = false
}
if ddc, exists := statValues[TCC]; exists {
if ddc > STATS_NA {
ddcValues = append(ddcValues, ddc)
}
ddcNeverConsidered = false
}
}
}
if lcrCost.Entry.Strategy == LCR_STRATEGY_LOAD {
if len(supplierQueues) > 0 {
lcrCost.SupplierCosts = append(lcrCost.SupplierCosts, &LCRSupplierCost{
Supplier: supplier,
supplierQueues: supplierQueues,
})
}
continue // next supplier
}
if statsErr { // Stats error in loop, to go next supplier
continue
}

View File

@@ -147,6 +147,12 @@ func (self *CdrServer) RateCdrs(cgrIds, runIds, tors, cdrHosts, cdrSources, reqT
// Returns error if not able to properly store the CDR, mediation is async since we can always recover offline
func (self *CdrServer) processCdr(storedCdr *StoredCdr) (err error) {
if upData, err := LoadUserProfile(storedCdr, "ExtraFields"); err != nil {
return err
} else {
cdrRcv := upData.(*StoredCdr)
*storedCdr = *cdrRcv
}
if storedCdr.ReqType == utils.META_NONE {
return nil
}

View File

@@ -20,6 +20,7 @@ package engine
import (
"fmt"
"math/rand"
"sort"
"strconv"
"strings"
@@ -36,6 +37,13 @@ const (
LCR_STRATEGY_HIGHEST = "*highest_cost"
LCR_STRATEGY_QOS_THRESHOLD = "*qos_threshold"
LCR_STRATEGY_QOS = "*qos"
LCR_STRATEGY_LOAD = "*load_distribution"
// used for load distribution sorting
RAND_LIMIT = 99
LOW_PRIORITY_LIMIT = 100
MED_PRIORITY_LIMIT = 200
HIGH_PRIORITY_LIMIT = 300
)
// A request for LCR, used in APIer and SM where we need to expose it
@@ -134,12 +142,13 @@ type LCRCost struct {
}
type LCRSupplierCost struct {
Supplier string
Cost float64
Duration time.Duration
Error string // Not error due to JSON automatic serialization into struct
QOS map[string]float64
qosSortParams []string
Supplier string
Cost float64
Duration time.Duration
Error string // Not error due to JSON automatic serialization into struct
QOS map[string]float64
qosSortParams []string
supplierQueues []*StatsQueue // used for load distribution
}
func (lcr *LCR) GetId() string {
@@ -291,11 +300,101 @@ func (lc *LCRCost) Sort() {
sort.Sort(HighestSupplierCostSorter(lc.SupplierCosts))
case LCR_STRATEGY_QOS:
sort.Sort(QOSSorter(lc.SupplierCosts))
case LCR_STRATEGY_LOAD:
lc.SortLoadDistribution()
sort.Sort(HighestSupplierCostSorter(lc.SupplierCosts))
}
}
func (lc *LCRCost) SortLoadDistribution() {
// find the time window that is common to all qeues
scoreBoard := make(map[time.Duration]int) // register TimeWindow across suppliers
var winnerTimeWindow time.Duration
maxScore := 0
for _, supCost := range lc.SupplierCosts {
timeWindowFlag := make(map[time.Duration]bool) // flags appearance in same supplier
for _, sq := range supCost.supplierQueues {
if !timeWindowFlag[sq.conf.TimeWindow] {
timeWindowFlag[sq.conf.TimeWindow] = true
scoreBoard[sq.conf.TimeWindow]++
}
if scoreBoard[sq.conf.TimeWindow] > maxScore {
maxScore = scoreBoard[sq.conf.TimeWindow]
winnerTimeWindow = sq.conf.TimeWindow
}
}
}
supplierQueues := make(map[*LCRSupplierCost]*StatsQueue)
for _, supCost := range lc.SupplierCosts {
for _, sq := range supCost.supplierQueues {
if sq.conf.TimeWindow == winnerTimeWindow {
supplierQueues[supCost] = sq
break
}
}
}
/*for supplier, sq := range supplierQueues {
log.Printf("Useful supplier qeues: %s %v", supplier, sq.conf.TimeWindow)
}*/
// if all have less than ponder return random order
// if some have a cdr count not divisible by ponder return them first and all ordered by cdr times, oldest first
// if all have a multiple of ponder return in the order of cdr times, oldest first
// first put them in one of the above categories
for supCost, sq := range supplierQueues {
ponder := lc.GetSupplierPonder(supCost.Supplier)
cdrCount := len(sq.Cdrs)
if cdrCount < ponder {
supCost.Cost = float64(LOW_PRIORITY_LIMIT + rand.Intn(RAND_LIMIT))
continue
}
if cdrCount%ponder == 0 {
supCost.Cost = float64(MED_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + (time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() / RAND_LIMIT)
continue
} else {
supCost.Cost = float64(HIGH_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + (time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() / RAND_LIMIT)
continue
}
}
}
// used in load distribution strategy only
// receives a long supplier id and will return the ponder found in strategy params
func (lc *LCRCost) GetSupplierPonder(supplier string) int {
// parse strategy params
ponders := make(map[string]int)
params := strings.Split(lc.Entry.StrategyParams, utils.INFIELD_SEP)
for _, param := range params {
ponderSlice := strings.Split(param, utils.CONCATENATED_KEY_SEP)
if len(ponderSlice) != 2 {
Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams))
continue
}
p, err := strconv.Atoi(ponderSlice[1])
if err != nil {
Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams))
continue
}
ponders[ponderSlice[0]] = p
}
parts := strings.Split(supplier, utils.CONCATENATED_KEY_SEP)
if len(parts) > 0 {
supplierSubject := parts[len(parts)-1]
if ponder, found := ponders[supplierSubject]; found {
return ponder
}
if ponder, found := ponders[utils.META_DEFAULT]; found {
return ponder
}
}
return 1
}
func (lc *LCRCost) HasErrors() bool {
for _, supplCost := range lc.SupplierCosts {
if len(supplCost.Error) != 0 {
return true
}

View File

@@ -278,3 +278,301 @@ func TestLCRCostSuppliersString(t *testing.T) {
t.Errorf("Expecting: %s, received: %s", eSupplStr, supplStr)
}
}
func TestLCRCostSuppliersLoad(t *testing.T) {
setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC)
lcrCost := &LCRCost{
Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:10;dan12:3;*default:7", Weight: 10.0},
SupplierCosts: []*LCRSupplierCost{
&LCRSupplierCost{
Supplier: "*out:tenant12:call:ivo12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 3 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 1 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
},
},
&LCRSupplierCost{
Supplier: "*out:tenant12:call:dan12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
},
},
&LCRSupplierCost{
Supplier: "*out:tenant12:call:rif12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 1 * time.Minute,
},
},
},
},
},
}
lcrCost.Sort()
if lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:dan12" {
t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost))
}
}
func TestLCRCostSuppliersLoadAllRounded(t *testing.T) {
setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC)
lcrCost := &LCRCost{
Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:3;dan12:5;*default:2", Weight: 10.0},
SupplierCosts: []*LCRSupplierCost{
&LCRSupplierCost{
Supplier: "*out:tenant12:call:ivo12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 3 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 1 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
},
},
&LCRSupplierCost{
Supplier: "*out:tenant12:call:dan12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(60 * time.Minute)}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
},
},
&LCRSupplierCost{
Supplier: "*out:tenant12:call:rif12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 1 * time.Minute,
},
},
},
},
},
}
lcrCost.Sort()
if lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:ivo12" ||
lcrCost.SupplierCosts[1].Supplier != "*out:tenant12:call:dan12" ||
lcrCost.SupplierCosts[2].Supplier != "*out:tenant12:call:rif12" {
t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost))
}
}
func TestLCRCostSuppliersLoadAllOver(t *testing.T) {
setupTime := time.Date(2015, 7, 31, 6, 43, 0, 0, time.UTC)
lcrCost := &LCRCost{
Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:2;dan12:4;*default:2", Weight: 10.0},
SupplierCosts: []*LCRSupplierCost{
&LCRSupplierCost{
Supplier: "*out:tenant12:call:ivo12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 3 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 1 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
},
},
&LCRSupplierCost{
Supplier: "*out:tenant12:call:dan12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(60 * time.Minute)}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
},
},
&LCRSupplierCost{
Supplier: "*out:tenant12:call:rif12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 7 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 1 * time.Minute,
},
},
},
},
},
}
lcrCost.Sort()
if lcrCost.SupplierCosts[0].Supplier != "*out:tenant12:call:ivo12" ||
lcrCost.SupplierCosts[1].Supplier != "*out:tenant12:call:dan12" ||
lcrCost.SupplierCosts[2].Supplier != "*out:tenant12:call:rif12" {
t.Error("Error soring on load distribution: ", utils.ToIJSON(lcrCost))
}
}

View File

@@ -73,6 +73,12 @@ func (rs *Responder) GetCost(arg *CallDescriptor, reply *CallCost) (err error) {
if arg.Subject == "" {
arg.Subject = arg.Account
}
if upData, err := LoadUserProfile(arg, "ExtraFields"); err != nil {
return err
} else {
udRcv := upData.(*CallDescriptor)
*arg = *udRcv
}
if rs.Bal != nil {
r, e := rs.getCallCost(arg, "Responder.GetCost")
*reply, err = *r, e
@@ -93,6 +99,12 @@ func (rs *Responder) Debit(arg *CallDescriptor, reply *CallCost) (err error) {
if arg.Subject == "" {
arg.Subject = arg.Account
}
if upData, err := LoadUserProfile(arg, "ExtraFields"); err != nil {
return err
} else {
udRcv := upData.(*CallDescriptor)
*arg = *udRcv
}
if rs.Bal != nil {
r, e := rs.getCallCost(arg, "Responder.Debit")
*reply, err = *r, e
@@ -115,6 +127,12 @@ func (rs *Responder) MaxDebit(arg *CallDescriptor, reply *CallCost) (err error)
if arg.Subject == "" {
arg.Subject = arg.Account
}
if upData, err := LoadUserProfile(arg, "ExtraFields"); err != nil {
return err
} else {
udRcv := upData.(*CallDescriptor)
*arg = *udRcv
}
if rs.Bal != nil {
r, e := rs.getCallCost(arg, "Responder.MaxDebit")
*reply, err = *r, e
@@ -144,6 +162,12 @@ func (rs *Responder) RefundIncrements(arg *CallDescriptor, reply *float64) (err
if arg.Subject == "" {
arg.Subject = arg.Account
}
if upData, err := LoadUserProfile(arg, "ExtraFields"); err != nil {
return err
} else {
udRcv := upData.(*CallDescriptor)
*arg = *udRcv
}
if rs.Bal != nil {
*reply, err = rs.callMethod(arg, "Responder.RefundIncrements")
} else {
@@ -163,6 +187,12 @@ func (rs *Responder) GetMaxSessionTime(arg *CallDescriptor, reply *float64) (err
if arg.Subject == "" {
arg.Subject = arg.Account
}
if upData, err := LoadUserProfile(arg, "ExtraFields"); err != nil {
return err
} else {
udRcv := upData.(*CallDescriptor)
*arg = *udRcv
}
if rs.Bal != nil {
*reply, err = rs.callMethod(arg, "Responder.GetMaxSessionTime")
} else {
@@ -174,11 +204,17 @@ func (rs *Responder) GetMaxSessionTime(arg *CallDescriptor, reply *float64) (err
// Returns MaxSessionTime for an event received in SessionManager, considering DerivedCharging for it
func (rs *Responder) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) error {
if rs.Bal != nil {
return errors.New("unsupported method on the balancer")
}
if ev.Subject == "" {
ev.Subject = ev.Account
}
if rs.Bal != nil {
return errors.New("unsupported method on the balancer")
if upData, err := LoadUserProfile(ev, "ExtraFields"); err != nil {
return err
} else {
udRcv := upData.(*StoredCdr)
*ev = *udRcv
}
maxCallDuration := -1.0
attrsDC := &utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT),
@@ -243,15 +279,17 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) err
// Used by SM to get all the prepaid CallDescriptors attached to a session
func (rs *Responder) GetSessionRuns(ev *StoredCdr, sRuns *[]*SessionRun) error {
if item, err := rs.getCache().Get(utils.GET_SESS_RUNS_CACHE_PREFIX + ev.CgrId); err == nil && item != nil {
*sRuns = *(item.Value.(*[]*SessionRun))
return item.Err
if rs.Bal != nil {
return errors.New("Unsupported method on the balancer")
}
if ev.Subject == "" {
ev.Subject = ev.Account
}
if rs.Bal != nil {
return errors.New("Unsupported method on the balancer")
if upData, err := LoadUserProfile(ev, "ExtraFields"); err != nil {
return err
} else {
udRcv := upData.(*StoredCdr)
*ev = *udRcv
}
attrsDC := &utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT),
Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)}
@@ -308,6 +346,12 @@ func (rs *Responder) ProcessCdr(cdr *StoredCdr, reply *string) error {
if rs.CdrSrv == nil {
return errors.New("CDR_SERVER_NOT_RUNNING")
}
if upData, err := LoadUserProfile(cdr, "ExtraFields"); err != nil {
return err
} else {
udRcv := upData.(*StoredCdr)
*cdr = *udRcv
}
if err := rs.CdrSrv.ProcessCdr(cdr); err != nil {
return err
}
@@ -344,6 +388,12 @@ func (rs *Responder) GetLCR(cd *CallDescriptor, reply *LCRCost) error {
if cd.Subject == "" {
cd.Subject = cd.Account
}
if upData, err := LoadUserProfile(cd, "ExtraFields"); err != nil {
return err
} else {
udRcv := upData.(*CallDescriptor)
*cd = *udRcv
}
lcrCost, err := cd.GetLCR(rs.Stats)
if err != nil {
return err

View File

@@ -354,7 +354,16 @@ func TestGetLCR(t *testing.T) {
},
},
}
for _, lcr := range []*LCR{lcrStatic, lcrLowestCost, lcrQosThreshold, lcrQos} {
lcrLoad := &LCR{Direction: utils.OUT, Tenant: "tenant12", Category: "call_load", Account: utils.ANY, Subject: utils.ANY,
Activations: []*LCRActivation{
&LCRActivation{
ActivationTime: time.Date(2015, 01, 01, 8, 0, 0, 0, time.UTC),
Entries: []*LCREntry{
&LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_LOAD, StrategyParams: "ivo12:10;dan12:3", Weight: 10.0}},
},
},
}
for _, lcr := range []*LCR{lcrStatic, lcrLowestCost, lcrQosThreshold, lcrQos, lcrLoad} {
if err := ratingStorage.SetLCR(lcr); err != nil {
t.Error(err)
}

View File

@@ -23,6 +23,7 @@ import (
"compress/zlib"
"errors"
"fmt"
"log"
"strings"
"github.com/cgrates/cgrates/cache2go"
@@ -430,10 +431,10 @@ func (rs *RedisStorage) RemoveRpAliases(tenantRtSubjects []*TenantRatingSubject,
if tntRSubj.Subject != alias {
continue
}
cache2go.RemKey(key)
if _, err = rs.db.Del(key); err != nil {
return err
}
cache2go.RemKey(key)
break
}
}
@@ -553,10 +554,12 @@ func (rs *RedisStorage) RemoveAccAliases(tenantAccounts []*TenantAccount, skipCa
if tntAcnt.Account != alias {
continue
}
cache2go.RemKey(key)
if _, err = rs.db.Del(key); err != nil {
log.Print("")
return err
}
cache2go.RemKey(key)
break
}
}
@@ -580,6 +583,7 @@ func (rs *RedisStorage) RemoveAccAliases(tenantAccounts []*TenantAccount, skipCa
return err
}
cache2go.RemKey(utils.ACC_ALIAS_PREFIX + key)
break
}
}
}

View File

@@ -33,7 +33,7 @@ func NewStoredCdrFromExternalCdr(extCdr *ExternalCdr) (*StoredCdr, error) {
var err error
storedCdr := &StoredCdr{CgrId: extCdr.CgrId, OrderId: extCdr.OrderId, TOR: extCdr.TOR, AccId: extCdr.AccId, CdrHost: extCdr.CdrHost, CdrSource: extCdr.CdrSource,
ReqType: extCdr.ReqType, Direction: extCdr.Direction, Tenant: extCdr.Tenant, Category: extCdr.Category, Account: extCdr.Account, Subject: extCdr.Subject,
Destination: extCdr.Destination, Supplier: extCdr.Supplier, DisconnectCause: extCdr.DisconnectCause, ExtraFields: extCdr.ExtraFields,
Destination: extCdr.Destination, Supplier: extCdr.Supplier, DisconnectCause: extCdr.DisconnectCause,
MediationRunId: extCdr.MediationRunId, RatedAccount: extCdr.RatedAccount, RatedSubject: extCdr.RatedSubject, Cost: extCdr.Cost, Rated: extCdr.Rated}
if storedCdr.SetupTime, err = utils.ParseTimeDetectLayout(extCdr.SetupTime); err != nil {
return nil, err
@@ -55,6 +55,12 @@ func NewStoredCdrFromExternalCdr(extCdr *ExternalCdr) (*StoredCdr, error) {
return nil, err
}
}
if extCdr.ExtraFields != nil {
storedCdr.ExtraFields = make(map[string]string)
}
for k, v := range extCdr.ExtraFields {
storedCdr.ExtraFields[k] = v
}
return storedCdr, nil
}
@@ -651,6 +657,7 @@ type UsageRecord struct {
SetupTime string
AnswerTime string
Usage string
ExtraFields map[string]string
}
func (self *UsageRecord) AsStoredCdr() (*StoredCdr, error) {
@@ -666,24 +673,18 @@ func (self *UsageRecord) AsStoredCdr() (*StoredCdr, error) {
if storedCdr.Usage, err = utils.ParseDurationWithSecs(self.Usage); err != nil {
return nil, err
}
if self.ExtraFields != nil {
storedCdr.ExtraFields = make(map[string]string)
}
for k, v := range self.ExtraFields {
storedCdr.ExtraFields[k] = v
}
return storedCdr, nil
}
func (self *UsageRecord) AsCallDescriptor() (*CallDescriptor, error) {
var err error
timeStr := self.AnswerTime
if len(timeStr) == 0 { // In case of auth, answer time will not be defined, so take it out of setup one
timeStr = self.SetupTime
}
startTime, err := utils.ParseTimeDetectLayout(timeStr)
if err != nil {
return nil, err
}
usage, err := utils.ParseDurationWithSecs(self.Usage)
if err != nil {
return nil, err
}
return &CallDescriptor{
cd := &CallDescriptor{
TOR: self.TOR,
Direction: self.Direction,
Tenant: self.Tenant,
@@ -691,7 +692,24 @@ func (self *UsageRecord) AsCallDescriptor() (*CallDescriptor, error) {
Subject: self.Subject,
Account: self.Account,
Destination: self.Destination,
TimeStart: startTime,
TimeEnd: startTime.Add(usage),
}, nil
}
timeStr := self.AnswerTime
if len(timeStr) == 0 { // In case of auth, answer time will not be defined, so take it out of setup one
timeStr = self.SetupTime
}
if cd.TimeStart, err = utils.ParseTimeDetectLayout(timeStr); err != nil {
return nil, err
}
if usage, err := utils.ParseDurationWithSecs(self.Usage); err != nil {
return nil, err
} else {
cd.TimeEnd = cd.TimeStart.Add(usage)
}
if self.ExtraFields != nil {
cd.ExtraFields = make(map[string]string)
}
for k, v := range self.ExtraFields {
cd.ExtraFields[k] = v
}
return cd, nil
}

View File

@@ -3,6 +3,7 @@ package engine
import (
"sort"
"strings"
"sync"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
@@ -61,6 +62,7 @@ type UserMap struct {
index map[string]map[string]bool
indexKeys []string
ratingDb RatingStorage
mu sync.RWMutex
}
func NewUserMap(ratingDb RatingStorage) (*UserMap, error) {
@@ -85,6 +87,8 @@ func newUserMap(ratingDb RatingStorage) *UserMap {
}
func (um *UserMap) SetUser(up UserProfile, reply *string) error {
um.mu.Lock()
defer um.mu.Unlock()
if err := um.ratingDb.SetUser(&up); err != nil {
*reply = err.Error()
return err
@@ -96,6 +100,8 @@ func (um *UserMap) SetUser(up UserProfile, reply *string) error {
}
func (um *UserMap) RemoveUser(up UserProfile, reply *string) error {
um.mu.Lock()
defer um.mu.Unlock()
if err := um.ratingDb.RemoveUser(up.GetId()); err != nil {
*reply = err.Error()
return err
@@ -107,6 +113,8 @@ func (um *UserMap) RemoveUser(up UserProfile, reply *string) error {
}
func (um *UserMap) UpdateUser(up UserProfile, reply *string) error {
um.mu.Lock()
defer um.mu.Unlock()
m, found := um.table[up.GetId()]
if !found {
*reply = utils.ErrNotFound.Error()
@@ -144,6 +152,8 @@ func (um *UserMap) UpdateUser(up UserProfile, reply *string) error {
}
func (um *UserMap) GetUsers(up UserProfile, results *UserProfiles) error {
um.mu.RLock()
defer um.mu.RUnlock()
table := um.table // no index
indexUnionKeys := make(map[string]bool)
@@ -223,6 +233,8 @@ func (um *UserMap) GetUsers(up UserProfile, results *UserProfiles) error {
}
func (um *UserMap) AddIndex(indexes []string, reply *string) error {
um.mu.Lock()
defer um.mu.Unlock()
um.indexKeys = indexes
for key, values := range um.table {
up := &UserProfile{Profile: values}
@@ -305,6 +317,8 @@ func (um *UserMap) deleteIndex(up *UserProfile) {
}
func (um *UserMap) GetIndexes(in string, reply *map[string][]string) error {
um.mu.RLock()
defer um.mu.RUnlock()
indexes := make(map[string][]string)
for key, values := range um.index {
var vs []string
@@ -355,12 +369,22 @@ func (ps *ProxyUserService) GetIndexes(in string, reply *map[string][]string) er
return ps.Client.Call("UsersV1.AddIndex", in, reply)
}
// extraFields - Field name in the interface containing extraFields information
func LoadUserProfile(in interface{}, extraFields string) (interface{}, error) {
if userService == nil { // no user service => no fun
return in, nil
}
m := utils.ToMapStringString(in)
var needsUsers bool
for _, val := range m {
if val == utils.USERS {
needsUsers = true
break
}
}
if !needsUsers { // Do not process further if user profile is not needed
return in, nil
}
up := &UserProfile{
Profile: make(map[string]string),
}

View File

@@ -3,6 +3,7 @@ package engine
import (
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/utils"
)
@@ -520,7 +521,7 @@ func TestUsersAddUpdateRemoveIndexes(t *testing.T) {
}
}
func TestUsersStoredCDRGetLoadUserProfile(t *testing.T) {
func TestUsersUsageRecordGetLoadUserProfile(t *testing.T) {
userService = &UserMap{
table: map[string]map[string]string{
"test:user": map[string]string{"TOR": "01", "ReqType": "1", "Direction": "*out", "Category": "c1", "Account": "dan", "Subject": "0723", "Destination": "+401", "SetupTime": "s1", "AnswerTime": "t1", "Usage": "10"},
@@ -568,7 +569,7 @@ func TestUsersStoredCDRGetLoadUserProfile(t *testing.T) {
}
}
func TestUsersStoredCDRGetLoadUserProfileExtraFields(t *testing.T) {
func TestUsersExternalCdrGetLoadUserProfileExtraFields(t *testing.T) {
userService = &UserMap{
table: map[string]map[string]string{
"test:user": map[string]string{"TOR": "01", "ReqType": "1", "Direction": "*out", "Category": "c1", "Account": "dan", "Subject": "0723", "Destination": "+401", "SetupTime": "s1", "AnswerTime": "t1", "Usage": "10"},
@@ -622,7 +623,7 @@ func TestUsersStoredCDRGetLoadUserProfileExtraFields(t *testing.T) {
}
}
func TestUsersStoredCDRGetLoadUserProfileExtraFieldsNotFound(t *testing.T) {
func TestUsersExternalCdrGetLoadUserProfileExtraFieldsNotFound(t *testing.T) {
userService = &UserMap{
table: map[string]map[string]string{
"test:user": map[string]string{"TOR": "01", "ReqType": "1", "Direction": "*out", "Category": "c1", "Account": "dan", "Subject": "0723", "Destination": "+401", "SetupTime": "s1", "AnswerTime": "t1", "Usage": "10"},
@@ -656,7 +657,7 @@ func TestUsersStoredCDRGetLoadUserProfileExtraFieldsNotFound(t *testing.T) {
}
}
func TestUsersStoredCDRGetLoadUserProfileExtraFieldsSet(t *testing.T) {
func TestUsersExternalCdrGetLoadUserProfileExtraFieldsSet(t *testing.T) {
userService = &UserMap{
table: map[string]map[string]string{
"test:user": map[string]string{"TOR": "01", "ReqType": "1", "Direction": "*out", "Category": "c1", "Account": "dan", "Subject": "0723", "Destination": "+401", "SetupTime": "s1", "AnswerTime": "t1", "Usage": "10"},
@@ -711,3 +712,91 @@ func TestUsersStoredCDRGetLoadUserProfileExtraFieldsSet(t *testing.T) {
t.Errorf("Expected: %+v got: %+v", expected, ur)
}
}
func TestUsersCallDescLoadUserProfile(t *testing.T) {
userService = &UserMap{
table: map[string]map[string]string{
"cgrates.org:dan": map[string]string{"ReqType": "*prepaid", "Category": "call1", "Account": "dan", "Subject": "dan", "Cli": "+4986517174963"},
"cgrates.org:danvoice": map[string]string{"TOR": "*voice", "ReqType": "*prepaid", "Category": "call1", "Account": "dan", "Subject": "0723"},
"cgrates:rif": map[string]string{"ReqType": "*postpaid", "Direction": "*out", "Category": "call", "Account": "rif", "Subject": "0726"},
},
index: make(map[string]map[string]bool),
}
startTime := time.Now()
cd := &CallDescriptor{
TOR: "*sms",
Tenant: utils.USERS,
Category: utils.USERS,
Subject: utils.USERS,
Account: utils.USERS,
Destination: "+4986517174963",
TimeStart: startTime,
TimeEnd: startTime.Add(time.Duration(1) * time.Minute),
ExtraFields: map[string]string{"Cli": "+4986517174963"},
}
expected := &CallDescriptor{
TOR: "*sms",
Tenant: "cgrates.org",
Category: "call1",
Account: "dan",
Subject: "dan",
Destination: "+4986517174963",
TimeStart: startTime,
TimeEnd: startTime.Add(time.Duration(1) * time.Minute),
ExtraFields: map[string]string{"Cli": "+4986517174963"},
}
out, err := LoadUserProfile(cd, "ExtraFields")
if err != nil {
t.Error("Error loading user profile: ", err)
}
cdRcv := out.(*CallDescriptor)
if !reflect.DeepEqual(expected, cdRcv) {
t.Errorf("Expected: %+v got: %+v", expected, cdRcv)
}
}
func TestUsersStoredCdrLoadUserProfile(t *testing.T) {
userService = &UserMap{
table: map[string]map[string]string{
"cgrates.org:dan": map[string]string{"ReqType": "*prepaid", "Category": "call1", "Account": "dan", "Subject": "dan", "Cli": "+4986517174963"},
"cgrates.org:danvoice": map[string]string{"TOR": "*voice", "ReqType": "*prepaid", "Category": "call1", "Account": "dan", "Subject": "0723"},
"cgrates:rif": map[string]string{"ReqType": "*postpaid", "Direction": "*out", "Category": "call", "Account": "rif", "Subject": "0726"},
},
index: make(map[string]map[string]bool),
}
startTime := time.Now()
cdr := &StoredCdr{
TOR: "*sms",
ReqType: utils.USERS,
Tenant: utils.USERS,
Category: utils.USERS,
Account: utils.USERS,
Subject: utils.USERS,
Destination: "+4986517174963",
SetupTime: startTime,
AnswerTime: startTime,
Usage: time.Duration(1) * time.Minute,
ExtraFields: map[string]string{"Cli": "+4986517174963"},
}
expected := &StoredCdr{
TOR: "*sms",
ReqType: "*prepaid",
Tenant: "cgrates.org",
Category: "call1",
Account: "dan",
Subject: "dan",
Destination: "+4986517174963",
SetupTime: startTime,
AnswerTime: startTime,
Usage: time.Duration(1) * time.Minute,
ExtraFields: map[string]string{"Cli": "+4986517174963"},
}
out, err := LoadUserProfile(cdr, "ExtraFields")
if err != nil {
t.Error("Error loading user profile: ", err)
}
cdRcv := out.(*StoredCdr)
if !reflect.DeepEqual(expected, cdRcv) {
t.Errorf("Expected: %+v got: %+v", expected, cdRcv)
}
}