diff --git a/config/config_defaults.go b/config/config_defaults.go index 6a9c0ebec..28dc78d7e 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -565,6 +565,7 @@ const CGRATES_CFG_JSON = ` ], "resources_conns": [], // connections to ResourceS for *res sorting, empty to disable functionality: <""|*internal|x.y.z.y:1234> "stats_conns": [], // connections to StatS for *stats sorting, empty to disable stats functionality: <""|*internal|x.y.z.y:1234> + "default_ratio":1 // default ratio used in case of *load strategy }, diff --git a/config/config_json_test.go b/config/config_json_test.go index e27fd3e34..8d4a5a657 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -862,6 +862,7 @@ func TestDfSupplierSJsonCfg(t *testing.T) { }, Resources_conns: &[]*RemoteHostJson{}, Stats_conns: &[]*RemoteHostJson{}, + Default_ratio: utils.IntPointer(1), } if cfg, err := dfCgrJsonCfg.SupplierSJsonCfg(); err != nil { t.Error(err) diff --git a/config/config_test.go b/config/config_test.go index a99187f3b..8b4761353 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -861,6 +861,7 @@ func TestCgrCfgJSONDefaultSupplierSCfg(t *testing.T) { }, ResourceSConns: []*RemoteHost{}, StatSConns: []*RemoteHost{}, + DefaultRatio: 1, } if !reflect.DeepEqual(eSupplSCfg, cgrCfg.supplierSCfg) { t.Errorf("received: %+v, expecting: %+v", eSupplSCfg, cgrCfg.supplierSCfg) diff --git a/config/libconfig_json.go b/config/libconfig_json.go index b8c10199d..c4d9a8447 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -434,6 +434,7 @@ type SupplierSJsonCfg struct { Rals_conns *[]*RemoteHostJson Resources_conns *[]*RemoteHostJson Stats_conns *[]*RemoteHostJson + Default_ratio *int } type LoaderJsonDataType struct { diff --git a/config/supplierscfg.go b/config/supplierscfg.go index c85d6ac86..8375d8e5b 100644 --- a/config/supplierscfg.go +++ b/config/supplierscfg.go @@ -28,6 +28,7 @@ type SupplierSCfg struct { RALsConns []*RemoteHost ResourceSConns []*RemoteHost StatSConns []*RemoteHost + DefaultRatio int } func (spl *SupplierSCfg) loadFromJsonCfg(jsnCfg *SupplierSJsonCfg) (err error) { @@ -82,5 +83,8 @@ func (spl *SupplierSCfg) loadFromJsonCfg(jsnCfg *SupplierSJsonCfg) (err error) { spl.StatSConns[idx].loadFromJsonCfg(jsnHaCfg) } } + if jsnCfg.Default_ratio != nil { + spl.DefaultRatio = *jsnCfg.Default_ratio + } return nil } diff --git a/config/supplierscfg_test.go b/config/supplierscfg_test.go index 7075d7779..df65806cf 100644 --- a/config/supplierscfg_test.go +++ b/config/supplierscfg_test.go @@ -46,6 +46,7 @@ func TestSupplierSCfgloadFromJsonCfg(t *testing.T) { ], "resources_conns": [], // address where to reach the Resource service, empty to disable functionality: <""|*internal|x.y.z.y:1234> "stats_conns": [], // address where to reach the Stat service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234> + "default_ratio":1, }, }` expected = SupplierSCfg{ @@ -54,6 +55,7 @@ func TestSupplierSCfgloadFromJsonCfg(t *testing.T) { RALsConns: []*RemoteHost{{Address: "*internal"}}, ResourceSConns: []*RemoteHost{}, StatSConns: []*RemoteHost{}, + DefaultRatio: 1, } if jsnCfg, err := NewCgrJsonCfgFromReader(strings.NewReader(cfgJSONStr)); err != nil { t.Error(err) diff --git a/data/tariffplans/testit/Suppliers.csv b/data/tariffplans/testit/Suppliers.csv index 6ae67c6cd..9b27b6888 100644 --- a/data/tariffplans/testit/Suppliers.csv +++ b/data/tariffplans/testit/Suppliers.csv @@ -27,4 +27,4 @@ cgrates.org,SPL_QOS_FILTRED2,FLTR_SPP_QOS_2,2017-11-27T00:00:00Z,*qos,*acd;*tcd; cgrates.org,SPL_QOS_FILTRED2,,,,,supplier2,FLTR_QOS_SP2_2,,RP_RETAIL1,,Stat_2,20,,, cgrates.org,SPL_QOS_FILTRED2,,,,,supplier3,,,,,Stat_3,35,,, cgrates.org,SPL_LCR,FLTR_TEST,2017-11-27T00:00:00Z,*lc,,supplier_1,,,RP_TEST_1,,,10,,,50 -cgrates.org,SPL_LCR,,,,,supplier_2,,,RP_TEST_2,,,,,, +cgrates.org,SPL_LCR,,,,,supplier_2,,,RP_TEST_2,,,,,, \ No newline at end of file diff --git a/engine/datamanager.go b/engine/datamanager.go index 4a32c24b2..8a6b7ff47 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -1006,6 +1006,10 @@ func (dm *DataManager) GetSupplierProfile(tenant, id string, cacheRead, cacheWri } return nil, err } + // populate cache will compute specific config parameters + if err = supp.Compile(); err != nil { + return nil, err + } if cacheWrite { Cache.Set(utils.CacheSupplierProfiles, tntID, supp, nil, cacheCommit(transactionID), transactionID) diff --git a/engine/libsuppliers.go b/engine/libsuppliers.go index 859eb0e81..a92119de0 100644 --- a/engine/libsuppliers.go +++ b/engine/libsuppliers.go @@ -143,6 +143,19 @@ func (sSpls *SortedSuppliers) SortResourceDescendent() { }) } +// SortLoadDistribution is part of sort interface, +// sort based on the following formula (float64(ratio + metricVal) / float64(ratio)) -1 with fallback on Weight +func (sSpls *SortedSuppliers) SortLoadDistribution() { + sort.Slice(sSpls.SortedSuppliers, func(i, j int) bool { + splIVal := ((sSpls.SortedSuppliers[i].SortingData[utils.Ratio].(float64)+sSpls.SortedSuppliers[i].SortingData[utils.LoadValue].(float64))/sSpls.SortedSuppliers[i].SortingData[utils.Ratio].(float64) - 1.0) + splJVal := ((sSpls.SortedSuppliers[j].SortingData[utils.Ratio].(float64)+sSpls.SortedSuppliers[j].SortingData[utils.LoadValue].(float64))/sSpls.SortedSuppliers[j].SortingData[utils.Ratio].(float64) - 1.0) + if splIVal == splJVal { + return sSpls.SortedSuppliers[i].SortingData[utils.Weight].(float64) > sSpls.SortedSuppliers[j].SortingData[utils.Weight].(float64) + } + return splIVal < splJVal + }) +} + // Digest returns list of supplierIDs + parameters for easier outside access // format suppl1:suppl1params,suppl2:suppl2params func (sSpls *SortedSuppliers) Digest() string { @@ -186,6 +199,7 @@ func NewSupplierSortDispatcher(lcrS *SupplierService) (ssd SupplierSortDispatche ssd[utils.MetaQOS] = NewQOSSupplierSorter(lcrS) ssd[utils.MetaReas] = NewResourceAscendetSorter(lcrS) ssd[utils.MetaReds] = NewResourceDescendentSorter(lcrS) + ssd[utils.MetaLoad] = NewLoadDistributionSorter(lcrS) return } diff --git a/engine/libsuppliers_test.go b/engine/libsuppliers_test.go index 666cfb123..53b9c4159 100644 --- a/engine/libsuppliers_test.go +++ b/engine/libsuppliers_test.go @@ -630,3 +630,44 @@ func TestLibSuppliersSortQOS8(t *testing.T) { eIds, rcv) } } + +func TestLibSuppliersSortLoadDistribution(t *testing.T) { + sSpls := &SortedSuppliers{ + SortedSuppliers: []*SortedSupplier{ + &SortedSupplier{ + SupplierID: "supplier1", + SortingData: map[string]interface{}{ + utils.Weight: 25.0, + utils.Ratio: 4.0, + utils.LoadValue: 3.0, + }, + }, + &SortedSupplier{ + SupplierID: "supplier2", + SortingData: map[string]interface{}{ + utils.Weight: 15.0, + utils.Ratio: 10.0, + utils.LoadValue: 5.0, + }, + }, + &SortedSupplier{ + SupplierID: "supplier3", + SortingData: map[string]interface{}{ + utils.Weight: 25.0, + utils.Ratio: 1.0, + utils.LoadValue: 1.0, + }, + }, + }, + } + sSpls.SortLoadDistribution() + rcv := make([]string, len(sSpls.SortedSuppliers)) + eIds := []string{"supplier2", "supplier1", "supplier3"} + for i, spl := range sSpls.SortedSuppliers { + rcv[i] = spl.SupplierID + } + if !reflect.DeepEqual(eIds, rcv) { + t.Errorf("Expecting: %+v, \n received: %+v", + eIds, rcv) + } +} diff --git a/engine/spls_load_distribution.go b/engine/spls_load_distribution.go new file mode 100644 index 000000000..2f0bd80d4 --- /dev/null +++ b/engine/spls_load_distribution.go @@ -0,0 +1,61 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "fmt" + + "github.com/cgrates/cgrates/utils" +) + +func NewLoadDistributionSorter(spS *SupplierService) *LoadDistributionSorter { + return &LoadDistributionSorter{spS: spS, + sorting: utils.MetaLD} +} + +// ResourceAscendentSorter orders suppliers based on their Resource Usage +type LoadDistributionSorter struct { + sorting string + spS *SupplierService +} + +func (ws *LoadDistributionSorter) SortSuppliers(prflID string, + suppls []*Supplier, suplEv *utils.CGREvent, extraOpts *optsGetSuppliers) (sortedSuppls *SortedSuppliers, err error) { + sortedSuppls = &SortedSuppliers{ProfileID: prflID, + Sorting: ws.sorting, + SortedSuppliers: make([]*SortedSupplier, 0)} + for _, s := range suppls { + // we should have at least 1 statID defined for counting CDR (a.k.a *sum:1) + if len(s.StatIDs) == 0 { + utils.Logger.Warning( + fmt.Sprintf("<%s> supplier: <%s> - empty StatIDs", + utils.SupplierS, s.ID)) + return nil, utils.NewErrMandatoryIeMissing("StatIDs") + } + if srtSpl, pass, err := ws.spS.populateSortingData(suplEv, s, extraOpts); err != nil { + return nil, err + } else if pass && srtSpl != nil { + // Add the ratio in SortingData so we can used it later in SortLoadDistribution + srtSpl.SortingData[utils.Ratio] = s.cacheSupplier[utils.MetaRatio].(float64) + sortedSuppls.SortedSuppliers = append(sortedSuppls.SortedSuppliers, srtSpl) + } + } + sortedSuppls.SortLoadDistribution() + return +} diff --git a/engine/suppliers.go b/engine/suppliers.go index e0ee7469a..8db27abad 100644 --- a/engine/suppliers.go +++ b/engine/suppliers.go @@ -23,6 +23,7 @@ import ( "reflect" "sort" "strconv" + "strings" "time" "github.com/cgrates/cgrates/config" @@ -41,6 +42,8 @@ type Supplier struct { Weight float64 Blocker bool // do not process further supplier after this one SupplierParameters string + + cacheSupplier map[string]interface{} // cache["*ratio"]=ratio } // SupplierProfile represents the configuration of a Supplier profile @@ -53,6 +56,43 @@ type SupplierProfile struct { SortingParameters []string Suppliers []*Supplier Weight float64 + + cache map[string]interface{} +} + +func (sp *SupplierProfile) compileCacheParameters() error { + if sp.Sorting == utils.MetaLoad { + // construct the map for ratio + ratioMap := make(map[string]int) + // []string{"supplierID:Ratio"} + for _, splIDWithRatio := range sp.SortingParameters { + splitted := strings.Split(splIDWithRatio, utils.CONCATENATED_KEY_SEP) + ratioVal, err := strconv.Atoi(splitted[1]) + if err != nil { + return err + } + ratioMap[splitted[0]] = ratioVal + } + // add the ratio for each supplier + for _, supplier := range sp.Suppliers { + supplier.cacheSupplier = make(map[string]interface{}) + if ratioSupplier, has := ratioMap[supplier.ID]; !has { // in case that ratio isn't defined for specific suppliers check for default + if ratioDefault, has := ratioMap[utils.MetaDefault]; !has { // in case that *default ratio isn't defined take it from config + supplier.cacheSupplier[utils.MetaRatio] = config.CgrConfig().SupplierSCfg().DefaultRatio + } else { + supplier.cacheSupplier[utils.MetaRatio] = ratioDefault + } + } else { + supplier.cacheSupplier[utils.MetaRatio] = ratioSupplier + } + } + } + return nil +} + +// Compile is a wrapper for convenience setting up the SupplierProfile +func (sp *SupplierProfile) Compile() error { + return sp.compileCacheParameters() } // TenantID returns unique identifier of the LCRProfile in a multi-tenant environment @@ -272,6 +312,7 @@ func (spS *SupplierService) statMetrics(statIDs []string, tenant string) (stsMet provStsMetrics := make(map[string][]float64) if spS.statS != nil { for _, statID := range statIDs { + // check if we get an ID in the following form (StatID:MetricID) var metrics map[string]float64 if err = spS.statS.Call(utils.StatSv1GetQueueFloatMetrics, &utils.TenantIDWithArgDispatcher{TenantID: &utils.TenantID{Tenant: tenant, ID: statID}}, &metrics); err != nil && @@ -295,6 +336,46 @@ func (spS *SupplierService) statMetrics(statIDs []string, tenant string) (stsMet return } +// statMetricsForLoadDistribution will query a list of statIDs and return the sum of metrics +// first metric found is always returned +func (spS *SupplierService) statMetricsForLoadDistribution(statIDs []string, tenant string) (result float64, err error) { + provStsMetrics := make(map[string][]float64) + if spS.statS != nil { + for _, statID := range statIDs { + // check if we get an ID in the following form (StatID:MetricID) + statWithMetric := strings.Split(statID, utils.InInFieldSep) + var metrics map[string]float64 + if err = spS.statS.Call(utils.StatSv1GetQueueFloatMetrics, + &utils.TenantIDWithArgDispatcher{TenantID: &utils.TenantID{Tenant: tenant, ID: statWithMetric[0]}}, &metrics); err != nil && + err.Error() != utils.ErrNotFound.Error() { + utils.Logger.Warning( + fmt.Sprintf(" error: %s getting statMetrics for stat : %s", err.Error(), statWithMetric[0])) + } + if len(statWithMetric) == 2 { // in case we have MetricID defined with StatID we consider only that metric + // check if statQueue have metric defined + if metricVal, has := metrics[statWithMetric[1]]; !has { + return 0, fmt.Errorf("<%s> error: %s metric %s for statID: %s", utils.SupplierS, utils.ErrNotFound, statWithMetric[1], statWithMetric[0]) + } else { + provStsMetrics[statWithMetric[1]] = append(provStsMetrics[statWithMetric[1]], metricVal) + } + } else { // otherwise we consider all metrics + for key, val := range metrics { + //add value of metric in a slice in case that we get the same metric from different stat + provStsMetrics[key] = append(provStsMetrics[key], val) + } + } + } + for _, slice := range provStsMetrics { + sum := 0.0 + for _, val := range slice { + sum += val + } + result += sum + } + } + return +} + // resourceUsage returns sum of all resource usages out of list func (spS *SupplierService) resourceUsage(resIDs []string, tenant string) (tUsage float64, err error) { if spS.resourceS != nil { @@ -349,32 +430,48 @@ func (spS *SupplierService) populateSortingData(ev *utils.CGREvent, spl *Supplie } } //calculate metrics + //in case we have *load strategy we use statMetricsForLoadDistribution function to calculate the result if len(spl.StatIDs) != 0 { - metricSupp, err := spS.statMetrics(spl.StatIDs, ev.Tenant) //create metric map for suppier - if err != nil { - if extraOpts.ignoreErrors { - utils.Logger.Warning( - fmt.Sprintf("<%s> ignoring supplier with ID: %s, err: %s", - utils.SupplierS, spl.ID, err.Error())) - return nil, false, nil - } else { - return nil, false, err + if extraOpts.sortingStragety == utils.MetaLoad { + metricSum, err := spS.statMetricsForLoadDistribution(spl.StatIDs, ev.Tenant) //create metric map for suppier + if err != nil { + if extraOpts.ignoreErrors { + utils.Logger.Warning( + fmt.Sprintf("<%s> ignoring supplier with ID: %s, err: %s", + utils.SupplierS, spl.ID, err.Error())) + return nil, false, nil + } else { + return nil, false, err + } } - } - //add metrics from statIDs in SortingData - for key, val := range metricSupp { - sortedSpl.SortingData[key] = val - } - //check if the supplier have the metric from sortingParameters - //in case that the metric don't exist - //we use 10000000 for *pdd and -1 for others - for _, metric := range extraOpts.sortingParameters { - if _, hasMetric := metricSupp[metric]; !hasMetric { - switch metric { - default: - sortedSpl.SortingData[metric] = -1.0 - case utils.MetaPDD: - sortedSpl.SortingData[metric] = 10000000.0 + sortedSpl.SortingData[utils.LoadValue] = metricSum + } else { + metricSupp, err := spS.statMetrics(spl.StatIDs, ev.Tenant) //create metric map for suppier + if err != nil { + if extraOpts.ignoreErrors { + utils.Logger.Warning( + fmt.Sprintf("<%s> ignoring supplier with ID: %s, err: %s", + utils.SupplierS, spl.ID, err.Error())) + return nil, false, nil + } else { + return nil, false, err + } + } + //add metrics from statIDs in SortingData + for key, val := range metricSupp { + sortedSpl.SortingData[key] = val + } + //check if the supplier have the metric from sortingParameters + //in case that the metric don't exist + //we use 10000000 for *pdd and -1 for others + for _, metric := range extraOpts.sortingParameters { + if _, hasMetric := metricSupp[metric]; !hasMetric { + switch metric { + default: + sortedSpl.SortingData[metric] = -1.0 + case utils.MetaPDD: + sortedSpl.SortingData[metric] = 10000000.0 + } } } } @@ -427,6 +524,7 @@ func (spS *SupplierService) sortedSuppliersForEvent(args *ArgsGetSuppliers) (sor return nil, err } extraOpts.sortingParameters = splPrfl.SortingParameters // populate sortingParameters in extraOpts + extraOpts.sortingStragety = splPrfl.Sorting // populate sortinStrategy in extraOpts sortedSuppliers, err := spS.sorter.SortSuppliers(splPrfl.ID, splPrfl.Sorting, splPrfl.Suppliers, args.CGREvent, extraOpts) if err != nil { @@ -484,6 +582,7 @@ type optsGetSuppliers struct { ignoreErrors bool maxCost float64 sortingParameters []string //used for QOS strategy + sortingStragety string } // V1GetSupplierProfilesForEvent returns the list of valid supplier IDs diff --git a/utils/consts.go b/utils/consts.go index 266ebf216..18e2e1aca 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -465,6 +465,7 @@ const ( MetaQOS = "*qos" MetaReas = "*reas" MetaReds = "*reds" + MetaLD = "*ld" Weight = "Weight" Cost = "Cost" RatingPlanID = "RatingPlanID" @@ -564,6 +565,8 @@ const ( RatingPlanIDs = "RatingPlanIDs" MetaAccount = "*account" ERs = "ERs" + Ratio = "Ratio" + LoadValue = "LoadValue" ) // Migrator Action