RankingSummary structure, LastUpdate timestamp inside Ranking struct, Ranking documentation

This commit is contained in:
DanB
2024-10-18 18:53:50 +02:00
parent 2e192dde78
commit 14292a77ea
6 changed files with 253 additions and 57 deletions

View File

@@ -152,7 +152,7 @@ func (rplSv1 *ReplicatorSv1) GetRanking(ctx *context.Context, tntID *utils.Tenan
reply.ID = rcv.ID
reply.Tenant = rcv.Tenant
reply.Sorting = rcv.Sorting
reply.StatMetrics = rcv.StatMetrics
reply.Metrics = rcv.Metrics
reply.SortedStatIDs = rcv.SortedStatIDs
reply.SortingParameters = rcv.SortingParameters
return nil

166
docs/rankings.rst Normal file
View File

@@ -0,0 +1,166 @@
.. _RankingS:
RankingS
========
**RankingS** is a standalone subsystem part of the **CGRateS** infrastructure, designed to work as an extension of the :ref:`StatS`, by regularly querying it for a list of predefined StatProfiles and ordering them based on their metrics.
Complete interaction with **RankingS** is possible via `CGRateS RPC APIs <https://pkg.go.dev/github.com/cgrates/cgrates/apier@master/>`_.
Due it's real-time nature, **RankingS** are designed towards high throughput being able to process thousands of queries per second. This is doable since each *Ranking* is a very light object, held in memory and eventually backed up in :ref:`DataDB`.
Processing logic
----------------
In order for **RankingS** to start querying the :ref:`StatS`, it will need to be *scheduled* to do that. Scheduling is being done using `Cron Expressions <https://en.wikipedia.org/wiki/Cron>`_.
Once *Cron Expressions* are defined within a *RankingProfile*, internal **Cron Scheduler** needs to be triggered. This can happen in two different ways:
Automatic Query Scheduling
The profiles needing querying will be inserted into **RankingS** :ref:`JSON configuration <configuration>`. By leaving any part of *ranking_id* or *tenat* empty, it will be interpreted as catch-all filter.
API Scheduling
The profiles needing querying will be sent inside arguments to the `RankingSv1.ScheduleQueries API call <https://pkg.go.dev/github.com/cgrates/cgrates/apier@master/>`_.
Offline storage
---------------
Offline storage is optionally possible, by enabling profile *Stored* flag and configuring the *store_interval* inside :ref:`JSON configuration <configuration>`.
Ranking querying
----------------
In order to query a **Ranking** (ie: to be displayed in a web interface), one should use the `RankingSv1.GetRanking API call <https://pkg.go.dev/github.com/cgrates/cgrates/apier@master/>`_ or `RankingSv1.GetRankingSummary API call <https://pkg.go.dev/github.com/cgrates/cgrates/apier@master/>`.
Ranking exporting
---------------
On each **Ranking** change, it will be possible to send a specially crafted *RankingSummary* event to one of the following subsystems:
**ThresholdS**
Sending the **RankingUpdate** Event gives the administrator the possiblity to react to *Ranking* changes, including escalation strategies offered by the **TresholdS** paramters.
Fine tuning parameters (ie. selecting only specific ThresholdProfiles to increase speed) are available directly within the **TrendProfile**.
**EEs**
**EEs** makes it possible to export the **RankingUpdate** to all the availabe outside interfaces of **CGRateS**.
Both exporting options are enabled within :ref:`JSON configuration <configuration>`.
Parameters
----------
RankingS
^^^^^^
**RankingS** is the **CGRateS** service component responsible of generating the **Ranking** queries.
It is configured within **RankingS** section from :ref:`JSON configuration <configuration>` via the following parameters:
enabled
Will enable starting of the service. Possible values: <true|false>.
store_interval
Time interval for backing up the RankingS into *DataDB*. 0 To completely disable the functionality, -1 to enable synchronous backup. Anything higher than 0 will give the interval for asynchronous backups.
stats_conns
List of connections where we will query the stats.
scheduled_ids
Limit the RankingProfiles generating queries towards **StatS**. Empty to enable all available RankingProfiles or just tenants for all the available profiles on a tenant.
thresholds_conns
Connection IDs towards *ThresholdS* component. If not defined, there will be no notifications sent to *ThresholdS* on *Trend* changes.
ees_conns
Connection IDs towards the *EEs* component. If left empty, no exports will be performed on *Trend* changes.
ees_exporter_ids
Limit the exporter profiles executed on *Ranking* changes.
RankingProfile
^^^^^^^^^^^^^^
Ís made of the following fields:
Tenant
The tenant on the platform (one can see the tenant as partition ID).
ID
Identifier for the *RankingProfile*, unique within a *Tenant*.
Schedule
Cron expression scheduling gathering of the metrics.
StatIDs
List of **StatS** instances to query.
MetricIDs
Limit the list of metrics from the stats instance queried.
Sorting
Sorting strategy for the StatIDs. Possible values:
\*asc
Sort the StatIDs ascendent based on list of MetricIDs provided in SortParameters. One or more MetricIDs can be specified in hte SortingParameters for the cases when one level sort is not enough to differentiate them. If all metrics will be equal, a random sort will be applied.
\*desc
Sort the StatIDs descendat based on list of MetricIDs provided in SortParameters. One or more MetricIDs can be specified in hte SortingParameters for the cases when one level sort is not enough to differentiate them. If all metrics will be equal, a random sort will be applied.
SortingParameters
List of sorting parameters. For the current sorting strategies (\*asc/\*desc) there will be one or more MetricIDs defined.
Metric can be defined in compressed mode (ie. ["Metric1","Metric2"]) or extended mode (ie: ["Metric1:true", "Metric2:false"]) where *false* will reverse the sorting logic for that particular metric (ie: ["\*tcc:true","\*pdd:false"] with \*desc sorting strategy).
Stored
Enable storing of this *Ranking* intance for persistence.
ThresholdIDs
Limit *TresholdProfiles* processing the *RankingUpdate* for this *RankingProfile*.
Ranking
^^^^^^^
instance is made out of the following fields:
Tenant
The tenant on the platform (one can see the tenant as partition ID).
ID
Unique *Ranking* identifier on a *Tenant*.
LastUpdate
Time of the last Metrics update.
Metrics
Stat Metrics and their values at the query time.
Sorting
Archived sorting strategy from the profile.
SortingParameters
Archived list of sorted parameters from the profile.
SortedStatIDs
List of queried stats, sorted based on sorting strategy and parameters.
Use cases
---------
* Ranking computation for commercial and monitoring applications.
* Revenue assurance applications.
* Fraud detection by ranking specific billing metrics during sensitive time intervals (\*acc, \*tcc, \*tcd).
* Building call patterns.
* Building statistical information to train systems capable of artificial intelligence.
* Building quality metrics used in traffic routing.

View File

@@ -22,6 +22,7 @@ import (
"sort"
"strings"
"sync"
"time"
"github.com/cgrates/cgrates/utils"
)
@@ -31,16 +32,17 @@ type RankingProfileWithAPIOpts struct {
APIOpts map[string]any
}
// RankingProfile represents one profile querying StatS and sorting them.
type RankingProfile struct {
Tenant string
ID string
Schedule string
StatIDs []string
MetricIDs []string
Sorting string
SortingParameters []string
Stored bool
ThresholdIDs []string
Tenant string // Tenant this profile belongs to
ID string // Profile identification
Schedule string // Cron schedule this profile should run at
StatIDs []string // List of stat instances to query
MetricIDs []string // Filter out only specific metrics in reply for sorting
Sorting string // Sorting strategy. Possible values: <*asc|*desc>
SortingParameters []string // Sorting parameters: depending on sorting type, list of metric ids for now with optional true or false in case of reverse logic is desired
Stored bool // Offline storage activation for this profile
ThresholdIDs []string // List of threshold IDs to limit this Ranking to. *none to disable threshold processing for it.
}
func (rkp *RankingProfile) TenantID() string {
@@ -73,10 +75,10 @@ func (rkP *RankingProfile) Clone() (cln *RankingProfile) {
// NewRankingFromProfile is a constructor for an empty ranking out of it's profile
func NewRankingFromProfile(rkP *RankingProfile) (rk *Ranking) {
rk = &Ranking{
Tenant: rkP.Tenant,
ID: rkP.ID,
Sorting: rkP.Sorting,
StatMetrics: make(map[string]map[string]float64),
Tenant: rkP.Tenant,
ID: rkP.ID,
Sorting: rkP.Sorting,
Metrics: make(map[string]map[string]float64),
rkPrfl: rkP,
metricIDs: utils.NewStringSet(rkP.MetricIDs),
@@ -98,7 +100,8 @@ type Ranking struct {
Tenant string
ID string
StatMetrics map[string]map[string]float64 // map[statID]map[metricID]metricValue
LastUpdate time.Time
Metrics map[string]map[string]float64 // map[statID]map[metricID]metricValue
Sorting string
SortingParameters []string
@@ -113,15 +116,26 @@ func (r *Ranking) TenantID() string {
return utils.ConcatenatedKey(r.Tenant, r.ID)
}
// asRankingSummary converts the Ranking instance into a RankingSummary one
func (rk *Ranking) asRankingSummary() (rkSm *RankingSummary) {
rkSm = &RankingSummary{
Tenant: rk.Tenant,
ID: rk.ID,
LastUpdate: rk.LastUpdate,
}
copy(rkSm.SortedStatIDs, rk.SortedStatIDs)
return
}
type rankingSorter interface {
sortStatIDs() []string // sortStatIDs returns the sorted list of statIDs
}
// rankingSortStats will return the list of sorted statIDs out of the sortingData map
func rankingSortStats(sortingType string, sortingParams []string,
statMetrics map[string]map[string]float64) (sortedStatIDs []string, err error) {
Metrics map[string]map[string]float64) (sortedStatIDs []string, err error) {
var rnkSrtr rankingSorter
if rnkSrtr, err = newRankingSorter(sortingType, sortingParams, statMetrics); err != nil {
if rnkSrtr, err = newRankingSorter(sortingType, sortingParams, Metrics); err != nil {
return
}
return rnkSrtr.sortStatIDs(), nil
@@ -131,21 +145,21 @@ func rankingSortStats(sortingType string, sortingParams []string,
//
// returns error if the sortingType is not implemented
func newRankingSorter(sortingType string, sortingParams []string,
statMetrics map[string]map[string]float64) (rkStr rankingSorter, err error) {
Metrics map[string]map[string]float64) (rkStr rankingSorter, err error) {
switch sortingType {
default:
err = utils.ErrPrefixNotErrNotImplemented(sortingType)
return
case utils.MetaDesc:
return newRankingDescSorter(sortingParams, statMetrics), nil
return newRankingDescSorter(sortingParams, Metrics), nil
case utils.MetaAsc:
return newRankingAscSorter(sortingParams, statMetrics), nil
return newRankingAscSorter(sortingParams, Metrics), nil
}
}
// newRankingDescSorter is a constructor for rankingDescSorter
func newRankingDescSorter(sortingParams []string,
statMetrics map[string]map[string]float64) (rkDsrtr *rankingDescSorter) {
Metrics map[string]map[string]float64) (rkDsrtr *rankingDescSorter) {
clnSp := make([]string, len(sortingParams))
sPReversed := make(utils.StringSet)
for i, sP := range sortingParams { // clean the sortingParams, out of param:false or param:true definitions
@@ -158,9 +172,9 @@ func newRankingDescSorter(sortingParams []string,
rkDsrtr = &rankingDescSorter{
clnSp,
sPReversed,
statMetrics,
make([]string, 0, len(statMetrics))}
for statID := range rkDsrtr.statMetrics {
Metrics,
make([]string, 0, len(Metrics))}
for statID := range rkDsrtr.Metrics {
rkDsrtr.statIDs = append(rkDsrtr.statIDs, statID)
}
return
@@ -168,11 +182,11 @@ func newRankingDescSorter(sortingParams []string,
// rankingDescSorter will sort data descendent for metrics in sortingParams or random if all equal
type rankingDescSorter struct {
sMetricIDs []string
sMetricRev utils.StringSet // list of exceptios for sortingParams, reverting the sorting logic
statMetrics map[string]map[string]float64
sMetricIDs []string
sMetricRev utils.StringSet // list of exceptios for sortingParams, reverting the sorting logic
Metrics map[string]map[string]float64
statIDs []string // list of keys of the statMetrics
statIDs []string // list of keys of the Metrics
}
// sortStatIDs implements rankingSorter interface
@@ -182,11 +196,11 @@ func (rkDsrtr *rankingDescSorter) sortStatIDs() []string {
}
sort.Slice(rkDsrtr.statIDs, func(i, j int) bool {
for _, metricID := range rkDsrtr.sMetricIDs {
val1, hasMetric1 := rkDsrtr.statMetrics[rkDsrtr.statIDs[i]][metricID]
val1, hasMetric1 := rkDsrtr.Metrics[rkDsrtr.statIDs[i]][metricID]
if !hasMetric1 {
return false
}
val2, hasMetric2 := rkDsrtr.statMetrics[rkDsrtr.statIDs[j]][metricID]
val2, hasMetric2 := rkDsrtr.Metrics[rkDsrtr.statIDs[j]][metricID]
if !hasMetric2 {
return true
}
@@ -208,7 +222,7 @@ func (rkDsrtr *rankingDescSorter) sortStatIDs() []string {
// newRankingAscSorter is a constructor for rankingAscSorter
func newRankingAscSorter(sortingParams []string,
statMetrics map[string]map[string]float64) (rkASrtr *rankingAscSorter) {
Metrics map[string]map[string]float64) (rkASrtr *rankingAscSorter) {
clnSp := make([]string, len(sortingParams))
sPReversed := make(utils.StringSet)
for i, sP := range sortingParams { // clean the sortingParams, out of param:false or param:true definitions
@@ -221,9 +235,9 @@ func newRankingAscSorter(sortingParams []string,
rkASrtr = &rankingAscSorter{
clnSp,
sPReversed,
statMetrics,
make([]string, 0, len(statMetrics))}
for statID := range rkASrtr.statMetrics {
Metrics,
make([]string, 0, len(Metrics))}
for statID := range rkASrtr.Metrics {
rkASrtr.statIDs = append(rkASrtr.statIDs, statID)
}
return
@@ -231,11 +245,11 @@ func newRankingAscSorter(sortingParams []string,
// rankingAscSorter will sort data ascendent for metrics in sortingParams or randomly if all equal
type rankingAscSorter struct {
sMetricIDs []string
sMetricRev utils.StringSet // list of exceptios for sortingParams, reverting the sorting logic
statMetrics map[string]map[string]float64
sMetricIDs []string
sMetricRev utils.StringSet // list of exceptios for sortingParams, reverting the sorting logic
Metrics map[string]map[string]float64
statIDs []string // list of keys of the statMetrics
statIDs []string // list of keys of the Metrics
}
// sortStatIDs implements rankingSorter interface
@@ -245,11 +259,11 @@ func (rkASrtr *rankingAscSorter) sortStatIDs() []string {
}
sort.Slice(rkASrtr.statIDs, func(i, j int) bool {
for _, metricID := range rkASrtr.sMetricIDs {
val1, hasMetric1 := rkASrtr.statMetrics[rkASrtr.statIDs[i]][metricID]
val1, hasMetric1 := rkASrtr.Metrics[rkASrtr.statIDs[i]][metricID]
if !hasMetric1 {
return false
}
val2, hasMetric2 := rkASrtr.statMetrics[rkASrtr.statIDs[j]][metricID]
val2, hasMetric2 := rkASrtr.Metrics[rkASrtr.statIDs[j]][metricID]
if !hasMetric2 {
return true
}
@@ -268,3 +282,11 @@ func (rkASrtr *rankingAscSorter) sortStatIDs() []string {
})
return rkASrtr.statIDs
}
// RankingSummary is the event sent to TrendS and EEs
type RankingSummary struct {
Tenant string
ID string
LastUpdate time.Time
SortedStatIDs []string
}

View File

@@ -24,32 +24,32 @@ import (
)
func TestRankingDescSorterSortStatIDs(t *testing.T) {
statMetrics := map[string]map[string]float64{
Metrics := map[string]map[string]float64{
"STATS1": {"*acc": 12.1, "*tcc": 24.2},
"STATS2": {"*acc": 12.1, "*tcc": 24.3},
"STATS3": {"*acc": 10.1, "*tcc": 25.3},
"STATS4": {"*tcc": 26.3},
}
sortMetrics := []string{"*acc", "*tcc"}
rdscSrtr := newRankingDescSorter(sortMetrics, statMetrics)
rdscSrtr := newRankingDescSorter(sortMetrics, Metrics)
eStatIDs := []string{"STATS2", "STATS1", "STATS3", "STATS4"}
if statIDs := rdscSrtr.sortStatIDs(); !reflect.DeepEqual(eStatIDs, statIDs) {
t.Errorf("Expecting: %v, received %v", eStatIDs, statIDs)
}
sortMetrics = []string{"*acc:false", "*tcc"} // changed the order of checks, stats4 should come first
rdscSrtr = newRankingDescSorter(sortMetrics, statMetrics)
rdscSrtr = newRankingDescSorter(sortMetrics, Metrics)
eStatIDs = []string{"STATS3", "STATS2", "STATS1", "STATS4"}
if statIDs := rdscSrtr.sortStatIDs(); !reflect.DeepEqual(eStatIDs, statIDs) {
t.Errorf("Expecting: %v, received %v", eStatIDs, statIDs)
}
sortMetrics = []string{"*tcc", "*acc:true"} // changed the order of checks, stats4 should come first
rdscSrtr = newRankingDescSorter(sortMetrics, statMetrics)
rdscSrtr = newRankingDescSorter(sortMetrics, Metrics)
eStatIDs = []string{"STATS4", "STATS3", "STATS2", "STATS1"}
if statIDs := rdscSrtr.sortStatIDs(); !reflect.DeepEqual(eStatIDs, statIDs) {
t.Errorf("Expecting: %v, received %v", eStatIDs, statIDs)
}
sortMetrics = []string{"*tcc:false", "*acc"} // reversed *tcc which should consider ascendent instead of descendent
rdscSrtr = newRankingDescSorter(sortMetrics, statMetrics)
rdscSrtr = newRankingDescSorter(sortMetrics, Metrics)
eStatIDs = []string{"STATS1", "STATS2", "STATS3", "STATS4"}
if statIDs := rdscSrtr.sortStatIDs(); !reflect.DeepEqual(eStatIDs, statIDs) {
t.Errorf("Expecting: %v, received %v", eStatIDs, statIDs)
@@ -57,32 +57,32 @@ func TestRankingDescSorterSortStatIDs(t *testing.T) {
}
func TestRankingAscSorterSortStatIDs(t *testing.T) {
statMetrics := map[string]map[string]float64{
Metrics := map[string]map[string]float64{
"STATS1": {"*acc": 12.1, "*tcc": 24.2},
"STATS2": {"*acc": 12.1, "*tcc": 24.3},
"STATS3": {"*acc": 10.1, "*tcc": 25.3},
"STATS4": {"*tcc": 26.3},
}
sortMetrics := []string{"*acc", "*tcc"}
rtAscSrtr := newRankingAscSorter(sortMetrics, statMetrics)
rtAscSrtr := newRankingAscSorter(sortMetrics, Metrics)
eStatIDs := []string{"STATS3", "STATS1", "STATS2", "STATS4"}
if statIDs := rtAscSrtr.sortStatIDs(); !reflect.DeepEqual(eStatIDs, statIDs) {
t.Errorf("Expecting: %v, received %v", eStatIDs, statIDs)
}
sortMetrics = []string{"*acc:false", "*tcc"}
rtAscSrtr = newRankingAscSorter(sortMetrics, statMetrics)
rtAscSrtr = newRankingAscSorter(sortMetrics, Metrics)
eStatIDs = []string{"STATS1", "STATS2", "STATS3", "STATS4"}
if statIDs := rtAscSrtr.sortStatIDs(); !reflect.DeepEqual(eStatIDs, statIDs) {
t.Errorf("Expecting: %v, received %v", eStatIDs, statIDs)
}
sortMetrics = []string{"*tcc", "*acc:true"}
rtAscSrtr = newRankingAscSorter(sortMetrics, statMetrics)
rtAscSrtr = newRankingAscSorter(sortMetrics, Metrics)
eStatIDs = []string{"STATS1", "STATS2", "STATS3", "STATS4"}
if statIDs := rtAscSrtr.sortStatIDs(); !reflect.DeepEqual(eStatIDs, statIDs) {
t.Errorf("Expecting: %v, received %v", eStatIDs, statIDs)
}
sortMetrics = []string{"*tcc:false", "*acc"}
rtAscSrtr = newRankingAscSorter(sortMetrics, statMetrics)
rtAscSrtr = newRankingAscSorter(sortMetrics, Metrics)
eStatIDs = []string{"STATS4", "STATS3", "STATS2", "STATS1"}
if statIDs := rtAscSrtr.sortStatIDs(); !reflect.DeepEqual(eStatIDs, statIDs) {
t.Errorf("Expecting: %v, received %v", eStatIDs, statIDs)

View File

@@ -88,6 +88,9 @@ func (rkS *RankingS) computeRanking(rkP *RankingProfile) {
if rk.rkPrfl == nil {
rk.rkPrfl = rkP
}
rk.LastUpdate = time.Now()
rk.Metrics = make(map[string]map[string]float64) // reset previous values
rk.SortedStatIDs = make([]string, 0)
for _, statID := range rkP.StatIDs {
var floatMetrics map[string]float64
if err := rkS.connMgr.Call(context.Background(), rkS.cgrcfg.RankingSCfg().StatSConns,
@@ -107,15 +110,17 @@ func (rkS *RankingS) computeRanking(rkP *RankingProfile) {
}
}
}
if len(floatMetrics) != 0 {
rk.StatMetrics[statID] = make(map[string]float64)
rk.Metrics[statID] = make(map[string]float64)
}
for metricID, val := range floatMetrics {
rk.StatMetrics[statID][metricID] = val
rk.Metrics[statID][metricID] = val
}
}
if rk.SortedStatIDs, err = rankingSortStats(rkP.Sorting,
rkP.SortingParameters, rk.StatMetrics); err != nil {
rkP.SortingParameters, rk.Metrics); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> sorting stats for Ranking with ID: <%s:%s> error: <%s>",
@@ -170,6 +175,7 @@ func (rkS *RankingS) processThresholds(rk *Ranking) (err error) {
APIOpts: opts,
Event: map[string]any{
utils.RankingID: rk.ID,
utils.LastUpdate: rk.LastUpdate,
utils.SortedStatIDs: copy([]string{}, rk.SortedStatIDs),
},
}
@@ -205,6 +211,7 @@ func (rkS *RankingS) processEEs(rk *Ranking) (err error) {
APIOpts: opts,
Event: map[string]any{
utils.RankingID: rk.ID,
utils.LastUpdate: rk.LastUpdate,
utils.SortedStatIDs: copy([]string{}, rk.SortedStatIDs),
},
}
@@ -445,11 +452,11 @@ func (rkS *RankingS) V1GetRanking(ctx *context.Context, arg *utils.TenantIDWithA
defer rk.rMux.RUnlock()
retRanking.Tenant = rk.Tenant // avoid vet complaining for mutex copying
retRanking.ID = rk.ID
retRanking.StatMetrics = make(map[string]map[string]float64)
for statID, metrics := range rk.StatMetrics {
retRanking.StatMetrics[statID] = make(map[string]float64)
retRanking.Metrics = make(map[string]map[string]float64)
for statID, metrics := range rk.Metrics {
retRanking.Metrics[statID] = make(map[string]float64)
for metricID, val := range metrics {
retRanking.StatMetrics[statID][metricID] = val
retRanking.Metrics[statID][metricID] = val
}
}
retRanking.Sorting = rk.Sorting

View File

@@ -514,6 +514,7 @@ const (
StatID = "StatID"
StatIDs = "StatIDs"
SortedStatIDs = "SortedStatIDs"
LastUpdate = "LastUpdate"
TrendID = "TrendID"
RankingID = "RankingID"
BalanceType = "BalanceType"