diff --git a/engine/libtrends.go b/engine/libtrends.go index 76459973e..ff0510419 100644 --- a/engine/libtrends.go +++ b/engine/libtrends.go @@ -19,6 +19,8 @@ along with this program. If not, see package engine import ( + "math" + "sync" "time" "github.com/cgrates/cgrates/utils" @@ -29,13 +31,40 @@ type TrendProfile struct { ID string Schedule string // Cron expression scheduling gathering of the metrics StatID string - Metrics []MetricWithSettings + Metrics []*MetricWithSettings QueueLength int TTL time.Duration TrendType string // *last, *average ThresholdIDs []string } +// Clone will clone the TrendProfile so it can be used by scheduler safely +func (tP *TrendProfile) Clone() (clnTp *TrendProfile) { + clnTp = &TrendProfile{ + Tenant: tP.Tenant, + ID: tP.ID, + Schedule: tP.Schedule, + StatID: tP.StatID, + QueueLength: tP.QueueLength, + TTL: tP.TTL, + TrendType: tP.TrendType, + } + if tP.Metrics != nil { + clnTp.Metrics = make([]*MetricWithSettings, len(tP.Metrics)) + for i, m := range tP.Metrics { + clnTp.Metrics[i] = &MetricWithSettings{MetricID: m.MetricID, + TrendSwingMargin: m.TrendSwingMargin} + } + } + if tP.ThresholdIDs != nil { + clnTp.ThresholdIDs = make([]string, len(tP.ThresholdIDs)) + for i, tID := range tP.ThresholdIDs { + clnTp.ThresholdIDs[i] = tID + } + } + return +} + // MetricWithSettings adds specific settings to the Metric type MetricWithSettings struct { MetricID string @@ -63,18 +92,66 @@ type TrendWithAPIOpts struct { // Trend is the unit matched by filters type Trend struct { + sync.RWMutex + Tenant string ID string RunTimes []time.Time - Metrics map[time.Time]map[string]MetricWithTrend - totals map[string]float64 // cached sum, used for average calculations + Metrics map[time.Time]map[string]*MetricWithTrend + + // indexes help faster processing + mLast map[string]time.Time // last time a metric was present + mCounts map[string]int // number of times a metric is present in Metrics + mTotals map[string]float64 // cached sum, used for average calculations +} + +// computeIndexes should be called after each retrieval from DB +func (t *Trend) computeIndexes() { + for _, runTime := range t.RunTimes { + for _, mWt := range t.Metrics[runTime] { + t.indexesAppendMetric(mWt, runTime) + } + } +} + +// indexesAppendMetric appends a single metric to indexes +func (t *Trend) indexesAppendMetric(mWt *MetricWithTrend, rTime time.Time) { + t.mLast[mWt.ID] = rTime + t.mCounts[mWt.ID] += 1 + t.mTotals[mWt.ID] += mWt.Value +} + +// getTrendLabel identifies the trend label for the instant value of the metric +// +// *positive, *negative, *constant, N/A +func (t *Trend) getTrendLabel(mID string, mVal float64, swingMargin float64) (lbl string) { + var prevVal *float64 + if _, has := t.mLast[mID]; has { + prevVal = &t.Metrics[t.mLast[mID]][mID].Value + } + if prevVal == nil { + return utils.NotAvailable + } + diffVal := mVal - *prevVal + switch { + case diffVal > 0: + lbl = utils.MetaPositive + case diffVal < 0: + lbl = utils.MetaNegative + default: + lbl = utils.MetaConstant + } + if math.Abs(diffVal*100/(*prevVal)) <= swingMargin { // percentage value of diff is lower than threshold + lbl = utils.MetaConstant + } + return } // MetricWithTrend represents one read from StatS type MetricWithTrend struct { ID string // Metric ID Value float64 // Metric Value - Trend string // *positive, *negative, *neutral + Trend string // *positive, *negative, *constant, N/A } func (tr *Trend) TenantID() string { diff --git a/engine/model_helpers.go b/engine/model_helpers.go index cdb476a4f..2b5df1779 100644 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -1761,7 +1761,7 @@ func APItoTrends(tr *utils.TPTrendsProfile) (sr *TrendProfile, err error) { StatID: tr.StatID, Schedule: tr.Schedule, QueueLength: tr.QueueLength, - Metrics: make([]MetricWithSettings, len(tr.Metrics)), + Metrics: make([]*MetricWithSettings, len(tr.Metrics)), TrendType: tr.TrendType, ThresholdIDs: make([]string, len(tr.ThresholdIDs)), } diff --git a/engine/trends.go b/engine/trends.go index 3e9773d02..78f03a0b4 100644 --- a/engine/trends.go +++ b/engine/trends.go @@ -21,6 +21,7 @@ package engine import ( "fmt" "sync" + "time" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" @@ -62,7 +63,59 @@ type TrendS struct { // computeTrend will query a stat and build the Trend for it // // it is be called by Cron service -func (tS *TrendS) computeTrend(tP *TrendProfile) (err error) { +func (tS *TrendS) computeTrend(tP *TrendProfile) { + var floatMetrics map[string]float64 + if err := tS.connMgr.Call(context.Background(), tS.cgrcfg.TrendSCfg().StatSConns, + utils.StatSv1GetQueueFloatMetrics, + &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: tP.Tenant, ID: tP.StatID}}, + floatMetrics); err != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> computing trend for with id: <%s:%s> stats <%s> error: <%s>", + utils.TrendS, tP.Tenant, tP.ID, tP.StatID, err.Error())) + return + } + trend, err := tS.dm.GetTrend(tP.Tenant, tP.ID, true, true, utils.NonTransactional) + if err == utils.ErrNotFound { + trend = &Trend{ + Tenant: tP.Tenant, + ID: tP.ID, + RunTimes: make([]time.Time, 0), + Metrics: make(map[time.Time]map[string]*MetricWithTrend), + } + } else if err != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> querying trend for with id: <%s:%s> dm error: <%s>", + utils.TrendS, tP.Tenant, tP.ID, err.Error())) + return + } + + trend.Lock() + defer trend.Unlock() + + now := time.Now() + var metricWithSettings []*MetricWithSettings + if len(tP.Metrics) != 0 { + metricWithSettings = tP.Metrics // read only + } + if len(metricWithSettings) == 0 { // unlimited metrics in trend + for mID := range floatMetrics { + metricWithSettings = append(metricWithSettings, &MetricWithSettings{MetricID: mID}) + } + } + trend.RunTimes = append(trend.RunTimes, now) + for _, mWS := range metricWithSettings { + mWt := &MetricWithTrend{ID: mWS.MetricID} + var has bool + if mWt.Value, has = floatMetrics[mWS.MetricID]; !has { // no stats computed for metric + mWt.Value = -1.0 + mWt.Trend = utils.NotAvailable + continue + } + mWt.Trend = trend.getTrendLabel(mWt.ID, mWt.Value, mWS.TrendSwingMargin) + } + return } @@ -82,7 +135,7 @@ func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs [] utils.TrendS, tnt, tID, err.Error())) complete = false } else if entryID, err := tS.crn.AddFunc(tP.Schedule, - func() { tS.computeTrend(tP) }); err != nil { + func() { tS.computeTrend(tP.Clone()) }); err != nil { utils.Logger.Warning( fmt.Sprintf( "<%s> scheduling TrendProfile <%s:%s>, error: <%s>", @@ -91,6 +144,7 @@ func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs [] } else { tS.crnTQsMux.Lock() tS.crnTQs[tP.Tenant][tP.ID] = entryID + tS.crnTQsMux.Unlock() } } diff --git a/utils/consts.go b/utils/consts.go index c8e128bc1..29fbfd474 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -317,6 +317,8 @@ const ( CreateTariffPlanTablesSQL = "create_tariffplan_tables.sql" TestSQL = "TEST_SQL" MetaConstant = "*constant" + MetaPositive = "*positive" + MetaNegative = "*negative" MetaFiller = "*filler" MetaHTTPPost = "*http_post" MetaHTTPjsonMap = "*http_json_map"