mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
TrendS.computeTrend implementation
This commit is contained in:
@@ -19,6 +19,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package engine
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -29,13 +31,40 @@ type TrendProfile struct {
|
||||
ID string
|
||||
Schedule string // Cron expression scheduling gathering of the metrics
|
||||
StatID string
|
||||
Metrics []MetricWithSettings
|
||||
Metrics []*MetricWithSettings
|
||||
QueueLength int
|
||||
TTL time.Duration
|
||||
TrendType string // *last, *average
|
||||
ThresholdIDs []string
|
||||
}
|
||||
|
||||
// Clone will clone the TrendProfile so it can be used by scheduler safely
|
||||
func (tP *TrendProfile) Clone() (clnTp *TrendProfile) {
|
||||
clnTp = &TrendProfile{
|
||||
Tenant: tP.Tenant,
|
||||
ID: tP.ID,
|
||||
Schedule: tP.Schedule,
|
||||
StatID: tP.StatID,
|
||||
QueueLength: tP.QueueLength,
|
||||
TTL: tP.TTL,
|
||||
TrendType: tP.TrendType,
|
||||
}
|
||||
if tP.Metrics != nil {
|
||||
clnTp.Metrics = make([]*MetricWithSettings, len(tP.Metrics))
|
||||
for i, m := range tP.Metrics {
|
||||
clnTp.Metrics[i] = &MetricWithSettings{MetricID: m.MetricID,
|
||||
TrendSwingMargin: m.TrendSwingMargin}
|
||||
}
|
||||
}
|
||||
if tP.ThresholdIDs != nil {
|
||||
clnTp.ThresholdIDs = make([]string, len(tP.ThresholdIDs))
|
||||
for i, tID := range tP.ThresholdIDs {
|
||||
clnTp.ThresholdIDs[i] = tID
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// MetricWithSettings adds specific settings to the Metric
|
||||
type MetricWithSettings struct {
|
||||
MetricID string
|
||||
@@ -63,18 +92,66 @@ type TrendWithAPIOpts struct {
|
||||
|
||||
// Trend is the unit matched by filters
|
||||
type Trend struct {
|
||||
sync.RWMutex
|
||||
|
||||
Tenant string
|
||||
ID string
|
||||
RunTimes []time.Time
|
||||
Metrics map[time.Time]map[string]MetricWithTrend
|
||||
totals map[string]float64 // cached sum, used for average calculations
|
||||
Metrics map[time.Time]map[string]*MetricWithTrend
|
||||
|
||||
// indexes help faster processing
|
||||
mLast map[string]time.Time // last time a metric was present
|
||||
mCounts map[string]int // number of times a metric is present in Metrics
|
||||
mTotals map[string]float64 // cached sum, used for average calculations
|
||||
}
|
||||
|
||||
// computeIndexes should be called after each retrieval from DB
|
||||
func (t *Trend) computeIndexes() {
|
||||
for _, runTime := range t.RunTimes {
|
||||
for _, mWt := range t.Metrics[runTime] {
|
||||
t.indexesAppendMetric(mWt, runTime)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// indexesAppendMetric appends a single metric to indexes
|
||||
func (t *Trend) indexesAppendMetric(mWt *MetricWithTrend, rTime time.Time) {
|
||||
t.mLast[mWt.ID] = rTime
|
||||
t.mCounts[mWt.ID] += 1
|
||||
t.mTotals[mWt.ID] += mWt.Value
|
||||
}
|
||||
|
||||
// getTrendLabel identifies the trend label for the instant value of the metric
|
||||
//
|
||||
// *positive, *negative, *constant, N/A
|
||||
func (t *Trend) getTrendLabel(mID string, mVal float64, swingMargin float64) (lbl string) {
|
||||
var prevVal *float64
|
||||
if _, has := t.mLast[mID]; has {
|
||||
prevVal = &t.Metrics[t.mLast[mID]][mID].Value
|
||||
}
|
||||
if prevVal == nil {
|
||||
return utils.NotAvailable
|
||||
}
|
||||
diffVal := mVal - *prevVal
|
||||
switch {
|
||||
case diffVal > 0:
|
||||
lbl = utils.MetaPositive
|
||||
case diffVal < 0:
|
||||
lbl = utils.MetaNegative
|
||||
default:
|
||||
lbl = utils.MetaConstant
|
||||
}
|
||||
if math.Abs(diffVal*100/(*prevVal)) <= swingMargin { // percentage value of diff is lower than threshold
|
||||
lbl = utils.MetaConstant
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// MetricWithTrend represents one read from StatS
|
||||
type MetricWithTrend struct {
|
||||
ID string // Metric ID
|
||||
Value float64 // Metric Value
|
||||
Trend string // *positive, *negative, *neutral
|
||||
Trend string // *positive, *negative, *constant, N/A
|
||||
}
|
||||
|
||||
func (tr *Trend) TenantID() string {
|
||||
|
||||
@@ -1761,7 +1761,7 @@ func APItoTrends(tr *utils.TPTrendsProfile) (sr *TrendProfile, err error) {
|
||||
StatID: tr.StatID,
|
||||
Schedule: tr.Schedule,
|
||||
QueueLength: tr.QueueLength,
|
||||
Metrics: make([]MetricWithSettings, len(tr.Metrics)),
|
||||
Metrics: make([]*MetricWithSettings, len(tr.Metrics)),
|
||||
TrendType: tr.TrendType,
|
||||
ThresholdIDs: make([]string, len(tr.ThresholdIDs)),
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ package engine
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
@@ -62,7 +63,59 @@ type TrendS struct {
|
||||
// computeTrend will query a stat and build the Trend for it
|
||||
//
|
||||
// it is be called by Cron service
|
||||
func (tS *TrendS) computeTrend(tP *TrendProfile) (err error) {
|
||||
func (tS *TrendS) computeTrend(tP *TrendProfile) {
|
||||
var floatMetrics map[string]float64
|
||||
if err := tS.connMgr.Call(context.Background(), tS.cgrcfg.TrendSCfg().StatSConns,
|
||||
utils.StatSv1GetQueueFloatMetrics,
|
||||
&utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: tP.Tenant, ID: tP.StatID}},
|
||||
floatMetrics); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> computing trend for with id: <%s:%s> stats <%s> error: <%s>",
|
||||
utils.TrendS, tP.Tenant, tP.ID, tP.StatID, err.Error()))
|
||||
return
|
||||
}
|
||||
trend, err := tS.dm.GetTrend(tP.Tenant, tP.ID, true, true, utils.NonTransactional)
|
||||
if err == utils.ErrNotFound {
|
||||
trend = &Trend{
|
||||
Tenant: tP.Tenant,
|
||||
ID: tP.ID,
|
||||
RunTimes: make([]time.Time, 0),
|
||||
Metrics: make(map[time.Time]map[string]*MetricWithTrend),
|
||||
}
|
||||
} else if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> querying trend for with id: <%s:%s> dm error: <%s>",
|
||||
utils.TrendS, tP.Tenant, tP.ID, err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
trend.Lock()
|
||||
defer trend.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
var metricWithSettings []*MetricWithSettings
|
||||
if len(tP.Metrics) != 0 {
|
||||
metricWithSettings = tP.Metrics // read only
|
||||
}
|
||||
if len(metricWithSettings) == 0 { // unlimited metrics in trend
|
||||
for mID := range floatMetrics {
|
||||
metricWithSettings = append(metricWithSettings, &MetricWithSettings{MetricID: mID})
|
||||
}
|
||||
}
|
||||
trend.RunTimes = append(trend.RunTimes, now)
|
||||
for _, mWS := range metricWithSettings {
|
||||
mWt := &MetricWithTrend{ID: mWS.MetricID}
|
||||
var has bool
|
||||
if mWt.Value, has = floatMetrics[mWS.MetricID]; !has { // no stats computed for metric
|
||||
mWt.Value = -1.0
|
||||
mWt.Trend = utils.NotAvailable
|
||||
continue
|
||||
}
|
||||
mWt.Trend = trend.getTrendLabel(mWt.ID, mWt.Value, mWS.TrendSwingMargin)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -82,7 +135,7 @@ func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs []
|
||||
utils.TrendS, tnt, tID, err.Error()))
|
||||
complete = false
|
||||
} else if entryID, err := tS.crn.AddFunc(tP.Schedule,
|
||||
func() { tS.computeTrend(tP) }); err != nil {
|
||||
func() { tS.computeTrend(tP.Clone()) }); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> scheduling TrendProfile <%s:%s>, error: <%s>",
|
||||
@@ -91,6 +144,7 @@ func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs []
|
||||
} else {
|
||||
tS.crnTQsMux.Lock()
|
||||
tS.crnTQs[tP.Tenant][tP.ID] = entryID
|
||||
tS.crnTQsMux.Unlock()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -317,6 +317,8 @@ const (
|
||||
CreateTariffPlanTablesSQL = "create_tariffplan_tables.sql"
|
||||
TestSQL = "TEST_SQL"
|
||||
MetaConstant = "*constant"
|
||||
MetaPositive = "*positive"
|
||||
MetaNegative = "*negative"
|
||||
MetaFiller = "*filler"
|
||||
MetaHTTPPost = "*http_post"
|
||||
MetaHTTPjsonMap = "*http_json_map"
|
||||
|
||||
Reference in New Issue
Block a user