Add *load strategy for suppliers

This commit is contained in:
TeoV
2019-08-18 17:25:02 +03:00
committed by Dan Christian Bogos
parent 1e9f214a27
commit 58bf3cd64c
13 changed files with 257 additions and 25 deletions

View File

@@ -565,6 +565,7 @@ const CGRATES_CFG_JSON = `
],
"resources_conns": [], // connections to ResourceS for *res sorting, empty to disable functionality: <""|*internal|x.y.z.y:1234>
"stats_conns": [], // connections to StatS for *stats sorting, empty to disable stats functionality: <""|*internal|x.y.z.y:1234>
"default_ratio":1 // default ratio used in case of *load strategy
},

View File

@@ -862,6 +862,7 @@ func TestDfSupplierSJsonCfg(t *testing.T) {
},
Resources_conns: &[]*RemoteHostJson{},
Stats_conns: &[]*RemoteHostJson{},
Default_ratio: utils.IntPointer(1),
}
if cfg, err := dfCgrJsonCfg.SupplierSJsonCfg(); err != nil {
t.Error(err)

View File

@@ -861,6 +861,7 @@ func TestCgrCfgJSONDefaultSupplierSCfg(t *testing.T) {
},
ResourceSConns: []*RemoteHost{},
StatSConns: []*RemoteHost{},
DefaultRatio: 1,
}
if !reflect.DeepEqual(eSupplSCfg, cgrCfg.supplierSCfg) {
t.Errorf("received: %+v, expecting: %+v", eSupplSCfg, cgrCfg.supplierSCfg)

View File

@@ -434,6 +434,7 @@ type SupplierSJsonCfg struct {
Rals_conns *[]*RemoteHostJson
Resources_conns *[]*RemoteHostJson
Stats_conns *[]*RemoteHostJson
Default_ratio *int
}
type LoaderJsonDataType struct {

View File

@@ -28,6 +28,7 @@ type SupplierSCfg struct {
RALsConns []*RemoteHost
ResourceSConns []*RemoteHost
StatSConns []*RemoteHost
DefaultRatio int
}
func (spl *SupplierSCfg) loadFromJsonCfg(jsnCfg *SupplierSJsonCfg) (err error) {
@@ -82,5 +83,8 @@ func (spl *SupplierSCfg) loadFromJsonCfg(jsnCfg *SupplierSJsonCfg) (err error) {
spl.StatSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Default_ratio != nil {
spl.DefaultRatio = *jsnCfg.Default_ratio
}
return nil
}

View File

@@ -46,6 +46,7 @@ func TestSupplierSCfgloadFromJsonCfg(t *testing.T) {
],
"resources_conns": [], // address where to reach the Resource service, empty to disable functionality: <""|*internal|x.y.z.y:1234>
"stats_conns": [], // address where to reach the Stat service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234>
"default_ratio":1,
},
}`
expected = SupplierSCfg{
@@ -54,6 +55,7 @@ func TestSupplierSCfgloadFromJsonCfg(t *testing.T) {
RALsConns: []*RemoteHost{{Address: "*internal"}},
ResourceSConns: []*RemoteHost{},
StatSConns: []*RemoteHost{},
DefaultRatio: 1,
}
if jsnCfg, err := NewCgrJsonCfgFromReader(strings.NewReader(cfgJSONStr)); err != nil {
t.Error(err)

View File

@@ -27,4 +27,4 @@ cgrates.org,SPL_QOS_FILTRED2,FLTR_SPP_QOS_2,2017-11-27T00:00:00Z,*qos,*acd;*tcd;
cgrates.org,SPL_QOS_FILTRED2,,,,,supplier2,FLTR_QOS_SP2_2,,RP_RETAIL1,,Stat_2,20,,,
cgrates.org,SPL_QOS_FILTRED2,,,,,supplier3,,,,,Stat_3,35,,,
cgrates.org,SPL_LCR,FLTR_TEST,2017-11-27T00:00:00Z,*lc,,supplier_1,,,RP_TEST_1,,,10,,,50
cgrates.org,SPL_LCR,,,,,supplier_2,,,RP_TEST_2,,,,,,
cgrates.org,SPL_LCR,,,,,supplier_2,,,RP_TEST_2,,,,,,
1 #Tenant ID FilterIDs ActivationInterval Sorting SortingParameters SupplierID SupplierFilterIDs SupplierAccountIDs SupplierRatingPlanIDs SupplierResourceIDs SupplierStatIDs SupplierWeight SupplierBlocker SupplierParameters Weight
27 cgrates.org SPL_QOS_FILTRED2 supplier2 FLTR_QOS_SP2_2 RP_RETAIL1 Stat_2 20
28 cgrates.org SPL_QOS_FILTRED2 supplier3 Stat_3 35
29 cgrates.org SPL_LCR FLTR_TEST 2017-11-27T00:00:00Z *lc supplier_1 RP_TEST_1 10 50
30 cgrates.org SPL_LCR supplier_2 RP_TEST_2

View File

@@ -1006,6 +1006,10 @@ func (dm *DataManager) GetSupplierProfile(tenant, id string, cacheRead, cacheWri
}
return nil, err
}
// populate cache will compute specific config parameters
if err = supp.Compile(); err != nil {
return nil, err
}
if cacheWrite {
Cache.Set(utils.CacheSupplierProfiles, tntID, supp, nil,
cacheCommit(transactionID), transactionID)

View File

@@ -143,6 +143,19 @@ func (sSpls *SortedSuppliers) SortResourceDescendent() {
})
}
// SortLoadDistribution is part of sort interface,
// sort based on the following formula (float64(ratio + metricVal) / float64(ratio)) -1 with fallback on Weight
func (sSpls *SortedSuppliers) SortLoadDistribution() {
sort.Slice(sSpls.SortedSuppliers, func(i, j int) bool {
splIVal := ((sSpls.SortedSuppliers[i].SortingData[utils.Ratio].(float64)+sSpls.SortedSuppliers[i].SortingData[utils.LoadValue].(float64))/sSpls.SortedSuppliers[i].SortingData[utils.Ratio].(float64) - 1.0)
splJVal := ((sSpls.SortedSuppliers[j].SortingData[utils.Ratio].(float64)+sSpls.SortedSuppliers[j].SortingData[utils.LoadValue].(float64))/sSpls.SortedSuppliers[j].SortingData[utils.Ratio].(float64) - 1.0)
if splIVal == splJVal {
return sSpls.SortedSuppliers[i].SortingData[utils.Weight].(float64) > sSpls.SortedSuppliers[j].SortingData[utils.Weight].(float64)
}
return splIVal < splJVal
})
}
// Digest returns list of supplierIDs + parameters for easier outside access
// format suppl1:suppl1params,suppl2:suppl2params
func (sSpls *SortedSuppliers) Digest() string {
@@ -186,6 +199,7 @@ func NewSupplierSortDispatcher(lcrS *SupplierService) (ssd SupplierSortDispatche
ssd[utils.MetaQOS] = NewQOSSupplierSorter(lcrS)
ssd[utils.MetaReas] = NewResourceAscendetSorter(lcrS)
ssd[utils.MetaReds] = NewResourceDescendentSorter(lcrS)
ssd[utils.MetaLoad] = NewLoadDistributionSorter(lcrS)
return
}

View File

@@ -630,3 +630,44 @@ func TestLibSuppliersSortQOS8(t *testing.T) {
eIds, rcv)
}
}
func TestLibSuppliersSortLoadDistribution(t *testing.T) {
sSpls := &SortedSuppliers{
SortedSuppliers: []*SortedSupplier{
&SortedSupplier{
SupplierID: "supplier1",
SortingData: map[string]interface{}{
utils.Weight: 25.0,
utils.Ratio: 4.0,
utils.LoadValue: 3.0,
},
},
&SortedSupplier{
SupplierID: "supplier2",
SortingData: map[string]interface{}{
utils.Weight: 15.0,
utils.Ratio: 10.0,
utils.LoadValue: 5.0,
},
},
&SortedSupplier{
SupplierID: "supplier3",
SortingData: map[string]interface{}{
utils.Weight: 25.0,
utils.Ratio: 1.0,
utils.LoadValue: 1.0,
},
},
},
}
sSpls.SortLoadDistribution()
rcv := make([]string, len(sSpls.SortedSuppliers))
eIds := []string{"supplier2", "supplier1", "supplier3"}
for i, spl := range sSpls.SortedSuppliers {
rcv[i] = spl.SupplierID
}
if !reflect.DeepEqual(eIds, rcv) {
t.Errorf("Expecting: %+v, \n received: %+v",
eIds, rcv)
}
}

View File

@@ -0,0 +1,61 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"fmt"
"github.com/cgrates/cgrates/utils"
)
func NewLoadDistributionSorter(spS *SupplierService) *LoadDistributionSorter {
return &LoadDistributionSorter{spS: spS,
sorting: utils.MetaLD}
}
// ResourceAscendentSorter orders suppliers based on their Resource Usage
type LoadDistributionSorter struct {
sorting string
spS *SupplierService
}
func (ws *LoadDistributionSorter) SortSuppliers(prflID string,
suppls []*Supplier, suplEv *utils.CGREvent, extraOpts *optsGetSuppliers) (sortedSuppls *SortedSuppliers, err error) {
sortedSuppls = &SortedSuppliers{ProfileID: prflID,
Sorting: ws.sorting,
SortedSuppliers: make([]*SortedSupplier, 0)}
for _, s := range suppls {
// we should have at least 1 statID defined for counting CDR (a.k.a *sum:1)
if len(s.StatIDs) == 0 {
utils.Logger.Warning(
fmt.Sprintf("<%s> supplier: <%s> - empty StatIDs",
utils.SupplierS, s.ID))
return nil, utils.NewErrMandatoryIeMissing("StatIDs")
}
if srtSpl, pass, err := ws.spS.populateSortingData(suplEv, s, extraOpts); err != nil {
return nil, err
} else if pass && srtSpl != nil {
// Add the ratio in SortingData so we can used it later in SortLoadDistribution
srtSpl.SortingData[utils.Ratio] = s.cacheSupplier[utils.MetaRatio].(float64)
sortedSuppls.SortedSuppliers = append(sortedSuppls.SortedSuppliers, srtSpl)
}
}
sortedSuppls.SortLoadDistribution()
return
}

View File

@@ -23,6 +23,7 @@ import (
"reflect"
"sort"
"strconv"
"strings"
"time"
"github.com/cgrates/cgrates/config"
@@ -41,6 +42,8 @@ type Supplier struct {
Weight float64
Blocker bool // do not process further supplier after this one
SupplierParameters string
cacheSupplier map[string]interface{} // cache["*ratio"]=ratio
}
// SupplierProfile represents the configuration of a Supplier profile
@@ -53,6 +56,43 @@ type SupplierProfile struct {
SortingParameters []string
Suppliers []*Supplier
Weight float64
cache map[string]interface{}
}
func (sp *SupplierProfile) compileCacheParameters() error {
if sp.Sorting == utils.MetaLoad {
// construct the map for ratio
ratioMap := make(map[string]int)
// []string{"supplierID:Ratio"}
for _, splIDWithRatio := range sp.SortingParameters {
splitted := strings.Split(splIDWithRatio, utils.CONCATENATED_KEY_SEP)
ratioVal, err := strconv.Atoi(splitted[1])
if err != nil {
return err
}
ratioMap[splitted[0]] = ratioVal
}
// add the ratio for each supplier
for _, supplier := range sp.Suppliers {
supplier.cacheSupplier = make(map[string]interface{})
if ratioSupplier, has := ratioMap[supplier.ID]; !has { // in case that ratio isn't defined for specific suppliers check for default
if ratioDefault, has := ratioMap[utils.MetaDefault]; !has { // in case that *default ratio isn't defined take it from config
supplier.cacheSupplier[utils.MetaRatio] = config.CgrConfig().SupplierSCfg().DefaultRatio
} else {
supplier.cacheSupplier[utils.MetaRatio] = ratioDefault
}
} else {
supplier.cacheSupplier[utils.MetaRatio] = ratioSupplier
}
}
}
return nil
}
// Compile is a wrapper for convenience setting up the SupplierProfile
func (sp *SupplierProfile) Compile() error {
return sp.compileCacheParameters()
}
// TenantID returns unique identifier of the LCRProfile in a multi-tenant environment
@@ -272,6 +312,7 @@ func (spS *SupplierService) statMetrics(statIDs []string, tenant string) (stsMet
provStsMetrics := make(map[string][]float64)
if spS.statS != nil {
for _, statID := range statIDs {
// check if we get an ID in the following form (StatID:MetricID)
var metrics map[string]float64
if err = spS.statS.Call(utils.StatSv1GetQueueFloatMetrics,
&utils.TenantIDWithArgDispatcher{TenantID: &utils.TenantID{Tenant: tenant, ID: statID}}, &metrics); err != nil &&
@@ -295,6 +336,46 @@ func (spS *SupplierService) statMetrics(statIDs []string, tenant string) (stsMet
return
}
// statMetricsForLoadDistribution will query a list of statIDs and return the sum of metrics
// first metric found is always returned
func (spS *SupplierService) statMetricsForLoadDistribution(statIDs []string, tenant string) (result float64, err error) {
provStsMetrics := make(map[string][]float64)
if spS.statS != nil {
for _, statID := range statIDs {
// check if we get an ID in the following form (StatID:MetricID)
statWithMetric := strings.Split(statID, utils.InInFieldSep)
var metrics map[string]float64
if err = spS.statS.Call(utils.StatSv1GetQueueFloatMetrics,
&utils.TenantIDWithArgDispatcher{TenantID: &utils.TenantID{Tenant: tenant, ID: statWithMetric[0]}}, &metrics); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<SupplierS> error: %s getting statMetrics for stat : %s", err.Error(), statWithMetric[0]))
}
if len(statWithMetric) == 2 { // in case we have MetricID defined with StatID we consider only that metric
// check if statQueue have metric defined
if metricVal, has := metrics[statWithMetric[1]]; !has {
return 0, fmt.Errorf("<%s> error: %s metric %s for statID: %s", utils.SupplierS, utils.ErrNotFound, statWithMetric[1], statWithMetric[0])
} else {
provStsMetrics[statWithMetric[1]] = append(provStsMetrics[statWithMetric[1]], metricVal)
}
} else { // otherwise we consider all metrics
for key, val := range metrics {
//add value of metric in a slice in case that we get the same metric from different stat
provStsMetrics[key] = append(provStsMetrics[key], val)
}
}
}
for _, slice := range provStsMetrics {
sum := 0.0
for _, val := range slice {
sum += val
}
result += sum
}
}
return
}
// resourceUsage returns sum of all resource usages out of list
func (spS *SupplierService) resourceUsage(resIDs []string, tenant string) (tUsage float64, err error) {
if spS.resourceS != nil {
@@ -349,32 +430,48 @@ func (spS *SupplierService) populateSortingData(ev *utils.CGREvent, spl *Supplie
}
}
//calculate metrics
//in case we have *load strategy we use statMetricsForLoadDistribution function to calculate the result
if len(spl.StatIDs) != 0 {
metricSupp, err := spS.statMetrics(spl.StatIDs, ev.Tenant) //create metric map for suppier
if err != nil {
if extraOpts.ignoreErrors {
utils.Logger.Warning(
fmt.Sprintf("<%s> ignoring supplier with ID: %s, err: %s",
utils.SupplierS, spl.ID, err.Error()))
return nil, false, nil
} else {
return nil, false, err
if extraOpts.sortingStragety == utils.MetaLoad {
metricSum, err := spS.statMetricsForLoadDistribution(spl.StatIDs, ev.Tenant) //create metric map for suppier
if err != nil {
if extraOpts.ignoreErrors {
utils.Logger.Warning(
fmt.Sprintf("<%s> ignoring supplier with ID: %s, err: %s",
utils.SupplierS, spl.ID, err.Error()))
return nil, false, nil
} else {
return nil, false, err
}
}
}
//add metrics from statIDs in SortingData
for key, val := range metricSupp {
sortedSpl.SortingData[key] = val
}
//check if the supplier have the metric from sortingParameters
//in case that the metric don't exist
//we use 10000000 for *pdd and -1 for others
for _, metric := range extraOpts.sortingParameters {
if _, hasMetric := metricSupp[metric]; !hasMetric {
switch metric {
default:
sortedSpl.SortingData[metric] = -1.0
case utils.MetaPDD:
sortedSpl.SortingData[metric] = 10000000.0
sortedSpl.SortingData[utils.LoadValue] = metricSum
} else {
metricSupp, err := spS.statMetrics(spl.StatIDs, ev.Tenant) //create metric map for suppier
if err != nil {
if extraOpts.ignoreErrors {
utils.Logger.Warning(
fmt.Sprintf("<%s> ignoring supplier with ID: %s, err: %s",
utils.SupplierS, spl.ID, err.Error()))
return nil, false, nil
} else {
return nil, false, err
}
}
//add metrics from statIDs in SortingData
for key, val := range metricSupp {
sortedSpl.SortingData[key] = val
}
//check if the supplier have the metric from sortingParameters
//in case that the metric don't exist
//we use 10000000 for *pdd and -1 for others
for _, metric := range extraOpts.sortingParameters {
if _, hasMetric := metricSupp[metric]; !hasMetric {
switch metric {
default:
sortedSpl.SortingData[metric] = -1.0
case utils.MetaPDD:
sortedSpl.SortingData[metric] = 10000000.0
}
}
}
}
@@ -427,6 +524,7 @@ func (spS *SupplierService) sortedSuppliersForEvent(args *ArgsGetSuppliers) (sor
return nil, err
}
extraOpts.sortingParameters = splPrfl.SortingParameters // populate sortingParameters in extraOpts
extraOpts.sortingStragety = splPrfl.Sorting // populate sortinStrategy in extraOpts
sortedSuppliers, err := spS.sorter.SortSuppliers(splPrfl.ID, splPrfl.Sorting,
splPrfl.Suppliers, args.CGREvent, extraOpts)
if err != nil {
@@ -484,6 +582,7 @@ type optsGetSuppliers struct {
ignoreErrors bool
maxCost float64
sortingParameters []string //used for QOS strategy
sortingStragety string
}
// V1GetSupplierProfilesForEvent returns the list of valid supplier IDs

View File

@@ -465,6 +465,7 @@ const (
MetaQOS = "*qos"
MetaReas = "*reas"
MetaReds = "*reds"
MetaLD = "*ld"
Weight = "Weight"
Cost = "Cost"
RatingPlanID = "RatingPlanID"
@@ -564,6 +565,8 @@ const (
RatingPlanIDs = "RatingPlanIDs"
MetaAccount = "*account"
ERs = "ERs"
Ratio = "Ratio"
LoadValue = "LoadValue"
)
// Migrator Action