mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-20 14:48:43 +05:00
move trends to dedicated package
reivse/add comments and order of funcs/definitions
This commit is contained in:
committed by
Dan Christian Bogos
parent
24d886c8e0
commit
47fb25b4ef
@@ -48,8 +48,8 @@ type DataDBMock struct {
|
||||
RemoveResourceProfileDrvF func(ctx *context.Context, tnt, id string) error
|
||||
RemoveResourceDrvF func(ctx *context.Context, tnt, id string) error
|
||||
SetResourceDrvF func(ctx *context.Context, r *Resource) error
|
||||
SetTrendProfileDrvF func(ctx *context.Context, tr *TrendProfile) (err error)
|
||||
GetTrendProfileDrvF func(ctx *context.Context, tenant string, id string) (sq *TrendProfile, err error)
|
||||
SetTrendProfileDrvF func(ctx *context.Context, tr *utils.TrendProfile) (err error)
|
||||
GetTrendProfileDrvF func(ctx *context.Context, tenant string, id string) (sq *utils.TrendProfile, err error)
|
||||
RemTrendProfileDrvF func(ctx *context.Context, tenant string, id string) (err error)
|
||||
SetRankingProfileDrvF func(ctx *context.Context, sq *RankingProfile) (err error)
|
||||
GetRankingProfileDrvF func(ctx *context.Context, tenant string, id string) (sq *RankingProfile, err error)
|
||||
@@ -261,14 +261,14 @@ func (dbM *DataDBMock) RemoveRankingDrv(ctx *context.Context, _ string, _ string
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
func (dbM *DataDBMock) GetTrendProfileDrv(ctx *context.Context, tenant, id string) (sg *TrendProfile, err error) {
|
||||
func (dbM *DataDBMock) GetTrendProfileDrv(ctx *context.Context, tenant, id string) (sg *utils.TrendProfile, err error) {
|
||||
if dbM.GetStatQueueProfileDrvF != nil {
|
||||
return dbM.GetTrendProfileDrvF(ctx, tenant, id)
|
||||
}
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
func (dbM *DataDBMock) SetTrendProfileDrv(ctx *context.Context, trend *TrendProfile) (err error) {
|
||||
func (dbM *DataDBMock) SetTrendProfileDrv(ctx *context.Context, trend *utils.TrendProfile) (err error) {
|
||||
if dbM.SetTrendProfileDrvF(ctx, trend) != nil {
|
||||
return dbM.SetTrendProfileDrvF(ctx, trend)
|
||||
}
|
||||
@@ -451,11 +451,11 @@ func (dbM *DataDBMock) RemoveRateProfileDrv(ctx *context.Context, str1 string, s
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
func (dbM *DataDBMock) GetTrendDrv(ctx *context.Context, tenant, id string) (*Trend, error) {
|
||||
func (dbM *DataDBMock) GetTrendDrv(ctx *context.Context, tenant, id string) (*utils.Trend, error) {
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
func (dbM *DataDBMock) SetTrendDrv(ctx *context.Context, tr *Trend) error {
|
||||
func (dbM *DataDBMock) SetTrendDrv(ctx *context.Context, tr *utils.Trend) error {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
|
||||
@@ -949,7 +949,7 @@ func (dm *DataManager) RemoveStatQueueProfile(ctx *context.Context, tenant, id s
|
||||
|
||||
// GetTrend retrieves a Trend from dataDB
|
||||
func (dm *DataManager) GetTrend(ctx *context.Context, tenant, id string,
|
||||
cacheRead, cacheWrite bool, transactionID string) (tr *Trend, err error) {
|
||||
cacheRead, cacheWrite bool, transactionID string) (tr *utils.Trend, err error) {
|
||||
tntID := utils.ConcatenatedKey(tenant, id)
|
||||
|
||||
if cacheRead {
|
||||
@@ -957,7 +957,7 @@ func (dm *DataManager) GetTrend(ctx *context.Context, tenant, id string,
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*Trend), nil
|
||||
return x.(*utils.Trend), nil
|
||||
}
|
||||
}
|
||||
if dm == nil {
|
||||
@@ -996,7 +996,7 @@ func (dm *DataManager) GetTrend(ctx *context.Context, tenant, id string,
|
||||
return
|
||||
}
|
||||
}
|
||||
if err = tr.uncompress(dm.ms); err != nil {
|
||||
if err = tr.Uncompress(dm.ms); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if cacheWrite {
|
||||
@@ -1010,12 +1010,12 @@ func (dm *DataManager) GetTrend(ctx *context.Context, tenant, id string,
|
||||
}
|
||||
|
||||
// SetTrend stores Trend in dataDB
|
||||
func (dm *DataManager) SetTrend(ctx *context.Context, tr *Trend) (err error) {
|
||||
func (dm *DataManager) SetTrend(ctx *context.Context, tr *utils.Trend) (err error) {
|
||||
if dm == nil {
|
||||
return utils.ErrNoDatabaseConn
|
||||
}
|
||||
if dm.dataDB.GetStorageType() != utils.MetaInternal {
|
||||
if tr, err = tr.compress(dm.ms); err != nil {
|
||||
if tr, err = tr.Compress(dm.ms, dm.cfg.TrendSCfg().StoreUncompressedLimit); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -1027,7 +1027,7 @@ func (dm *DataManager) SetTrend(ctx *context.Context, tr *Trend) (err error) {
|
||||
dm.cfg.DataDbCfg().RplFiltered,
|
||||
utils.TrendPrefix, tr.TenantID(), // this are used to get the host IDs from cache
|
||||
utils.ReplicatorSv1SetTrend,
|
||||
&TrendWithAPIOpts{
|
||||
&utils.TrendWithAPIOpts{
|
||||
Trend: tr,
|
||||
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
|
||||
dm.cfg.DataDbCfg().RplCache, utils.EmptyString)}); err != nil {
|
||||
@@ -1059,14 +1059,14 @@ func (dm *DataManager) RemoveTrend(ctx *context.Context, tenant, id string) (err
|
||||
}
|
||||
|
||||
func (dm *DataManager) GetTrendProfile(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool,
|
||||
transactionID string) (trp *TrendProfile, err error) {
|
||||
transactionID string) (trp *utils.TrendProfile, err error) {
|
||||
tntID := utils.ConcatenatedKey(tenant, id)
|
||||
if cacheRead {
|
||||
if x, ok := Cache.Get(utils.CacheTrendProfiles, tntID); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*TrendProfile), nil
|
||||
return x.(*utils.TrendProfile), nil
|
||||
}
|
||||
}
|
||||
if dm == nil {
|
||||
@@ -1140,7 +1140,7 @@ func (dm *DataManager) GetTrendProfileIDs(ctx *context.Context, tenants []string
|
||||
return
|
||||
}
|
||||
|
||||
func (dm *DataManager) SetTrendProfile(ctx *context.Context, trp *TrendProfile) (err error) {
|
||||
func (dm *DataManager) SetTrendProfile(ctx *context.Context, trp *utils.TrendProfile) (err error) {
|
||||
if dm == nil {
|
||||
return utils.ErrNoDatabaseConn
|
||||
}
|
||||
@@ -1156,7 +1156,7 @@ func (dm *DataManager) SetTrendProfile(ctx *context.Context, trp *TrendProfile)
|
||||
dm.cfg.DataDbCfg().RplFiltered,
|
||||
utils.TrendProfilePrefix, trp.TenantID(),
|
||||
utils.ReplicatorSv1SetTrendProfile,
|
||||
&TrendProfileWithAPIOpts{
|
||||
&utils.TrendProfileWithAPIOpts{
|
||||
TrendProfile: trp,
|
||||
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
|
||||
dm.cfg.DataDbCfg().RplCache, utils.EmptyString)})
|
||||
@@ -1164,7 +1164,7 @@ func (dm *DataManager) SetTrendProfile(ctx *context.Context, trp *TrendProfile)
|
||||
if oldTrd == nil ||
|
||||
oldTrd.QueueLength != trp.QueueLength ||
|
||||
oldTrd.Schedule != trp.Schedule {
|
||||
if err = dm.SetTrend(ctx, NewTrendFromProfile(trp)); err != nil {
|
||||
if err = dm.SetTrend(ctx, utils.NewTrendFromProfile(trp)); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -131,7 +131,7 @@ func (dDP *dynamicDP) fieldAsInterface(fldPath []string) (val any, err error) {
|
||||
|
||||
case utils.MetaTrends:
|
||||
//sample of fieldName : ~*trends.TrendID.Metrics.*acd.Value
|
||||
var trendSum TrendSummary
|
||||
var trendSum utils.TrendSummary
|
||||
if err := connMgr.Call(context.TODO(), dDP.trdConns, utils.TrendSv1GetTrendSummary, &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: dDP.tenant, ID: fldPath[1]}}, &trendSum); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -2533,11 +2533,11 @@ func TestFilterTrends(t *testing.T) {
|
||||
if argGetTrend.ID == "Trend1" && argGetTrend.Tenant == "cgrates.org" {
|
||||
now := time.Now()
|
||||
now2 := now.Add(time.Second)
|
||||
tr := Trend{
|
||||
tr := utils.Trend{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "Trend1",
|
||||
RunTimes: []time.Time{now, now2},
|
||||
Metrics: map[time.Time]map[string]*MetricWithTrend{
|
||||
Metrics: map[time.Time]map[string]*utils.MetricWithTrend{
|
||||
now: {
|
||||
"*acc": {ID: "*acc", Value: 45, TrendGrowth: -1.0, TrendLabel: utils.NotAvailable},
|
||||
"*acd": {ID: "*acd", Value: 50, TrendGrowth: -1.0, TrendLabel: utils.NotAvailable},
|
||||
@@ -2548,8 +2548,8 @@ func TestFilterTrends(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
trS := tr.asTrendSummary()
|
||||
*reply.(*TrendSummary) = *trS
|
||||
trS := tr.AsTrendSummary()
|
||||
*reply.(*utils.TrendSummary) = *trS
|
||||
return nil
|
||||
}
|
||||
return utils.ErrNotFound
|
||||
|
||||
@@ -1,437 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package engine
|
||||
|
||||
import (
|
||||
"math"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// A TrendProfile represents the settings of a Trend
|
||||
type TrendProfile struct {
|
||||
Tenant string
|
||||
ID string
|
||||
Schedule string // Cron expression scheduling gathering of the metrics
|
||||
StatID string
|
||||
Metrics []string
|
||||
TTL time.Duration
|
||||
QueueLength int
|
||||
MinItems int // minimum number of items for building Trends
|
||||
CorrelationType string // *last, *average
|
||||
Tolerance float64 // allow this deviation margin for *constant trend
|
||||
Stored bool // store the Trend in dataDB
|
||||
ThresholdIDs []string
|
||||
}
|
||||
|
||||
// Clone will clone the TrendProfile so it can be used by scheduler safely
|
||||
func (tP *TrendProfile) Clone() (clnTp *TrendProfile) {
|
||||
clnTp = &TrendProfile{
|
||||
Tenant: tP.Tenant,
|
||||
ID: tP.ID,
|
||||
Schedule: tP.Schedule,
|
||||
StatID: tP.StatID,
|
||||
QueueLength: tP.QueueLength,
|
||||
TTL: tP.TTL,
|
||||
MinItems: tP.MinItems,
|
||||
CorrelationType: tP.CorrelationType,
|
||||
Tolerance: tP.Tolerance,
|
||||
Stored: tP.Stored,
|
||||
}
|
||||
if tP.Metrics != nil {
|
||||
clnTp.Metrics = make([]string, len(tP.Metrics))
|
||||
for i, mID := range tP.Metrics {
|
||||
clnTp.Metrics[i] = mID
|
||||
}
|
||||
}
|
||||
if tP.ThresholdIDs != nil {
|
||||
clnTp.ThresholdIDs = make([]string, len(tP.ThresholdIDs))
|
||||
for i, tID := range tP.ThresholdIDs {
|
||||
clnTp.ThresholdIDs[i] = tID
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type TrendProfileWithAPIOpts struct {
|
||||
*TrendProfile
|
||||
APIOpts map[string]any
|
||||
}
|
||||
|
||||
func (srp *TrendProfile) TenantID() string {
|
||||
return utils.ConcatenatedKey(srp.Tenant, srp.ID)
|
||||
}
|
||||
|
||||
type TrendWithAPIOpts struct {
|
||||
*Trend
|
||||
APIOpts map[string]any
|
||||
}
|
||||
|
||||
// NewTrendFromProfile is a constructor for an empty trend out of it's profile
|
||||
func NewTrendFromProfile(tP *TrendProfile) *Trend {
|
||||
return &Trend{
|
||||
Tenant: tP.Tenant,
|
||||
ID: tP.ID,
|
||||
RunTimes: make([]time.Time, 0),
|
||||
Metrics: make(map[time.Time]map[string]*MetricWithTrend),
|
||||
|
||||
tPrfl: tP,
|
||||
}
|
||||
}
|
||||
|
||||
// Trend is the unit matched by filters
|
||||
type Trend struct {
|
||||
tMux sync.RWMutex
|
||||
|
||||
Tenant string
|
||||
ID string
|
||||
RunTimes []time.Time
|
||||
Metrics map[time.Time]map[string]*MetricWithTrend
|
||||
CompressedMetrics []byte // if populated, Metrics and RunTimes will be emty
|
||||
|
||||
// indexes help faster processing
|
||||
mLast map[string]time.Time // last time a metric was present
|
||||
mCounts map[string]int // number of times a metric is present in Metrics
|
||||
mTotals map[string]float64 // cached sum, used for average calculations
|
||||
|
||||
tPrfl *TrendProfile // store here the trend profile so we can have it at hands further
|
||||
|
||||
}
|
||||
|
||||
func (t *Trend) Clone() (tC *Trend) {
|
||||
return
|
||||
}
|
||||
|
||||
// AsTrendSummary transforms the trend into TrendSummary
|
||||
func (t *Trend) asTrendSummary() (ts *TrendSummary) {
|
||||
ts = &TrendSummary{
|
||||
Tenant: t.Tenant,
|
||||
ID: t.ID,
|
||||
Metrics: make(map[string]*MetricWithTrend),
|
||||
}
|
||||
if len(t.RunTimes) != 0 {
|
||||
ts.Time = t.RunTimes[len(t.RunTimes)-1]
|
||||
for mID, mWt := range t.Metrics[ts.Time] {
|
||||
ts.Metrics[mID] = &MetricWithTrend{
|
||||
ID: mWt.ID,
|
||||
Value: mWt.Value,
|
||||
TrendGrowth: mWt.TrendGrowth,
|
||||
TrendLabel: mWt.TrendLabel,
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (t *Trend) compress(ms utils.Marshaler) (tr *Trend, err error) {
|
||||
if config.CgrConfig().TrendSCfg().StoreUncompressedLimit > len(t.RunTimes) {
|
||||
return
|
||||
}
|
||||
tr = &Trend{
|
||||
Tenant: t.Tenant,
|
||||
ID: t.ID,
|
||||
}
|
||||
tr.CompressedMetrics, err = ms.Marshal(tr.Metrics)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return tr, nil
|
||||
}
|
||||
|
||||
func (t *Trend) uncompress(ms utils.Marshaler) (err error) {
|
||||
if t == nil || t.CompressedMetrics == nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = ms.Unmarshal(t.CompressedMetrics, &t.Metrics)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
t.CompressedMetrics = nil
|
||||
t.RunTimes = make([]time.Time, len(t.Metrics))
|
||||
i := 0
|
||||
for key := range t.Metrics {
|
||||
t.RunTimes[i] = key
|
||||
i++
|
||||
}
|
||||
slices.SortFunc(t.RunTimes, func(a, b time.Time) int {
|
||||
return a.Compare(b)
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// Compile is used to initialize or cleanup the Trend
|
||||
//
|
||||
// thread safe since it should be used close to source
|
||||
func (t *Trend) Compile(cleanTtl time.Duration, qLength int) {
|
||||
t.cleanup(cleanTtl, qLength)
|
||||
if len(t.mTotals) == 0 { // indexes were not yet built
|
||||
t.computeIndexes()
|
||||
}
|
||||
}
|
||||
|
||||
// cleanup will clean stale data out of
|
||||
func (t *Trend) cleanup(ttl time.Duration, qLength int) (altered bool) {
|
||||
if ttl >= 0 {
|
||||
expTime := time.Now().Add(-ttl)
|
||||
var expIdx *int
|
||||
for i, rT := range t.RunTimes {
|
||||
if rT.After(expTime) {
|
||||
continue
|
||||
}
|
||||
expIdx = &i
|
||||
delete(t.Metrics, rT)
|
||||
}
|
||||
if expIdx != nil {
|
||||
if len(t.RunTimes)-1 == *expIdx {
|
||||
t.RunTimes = make([]time.Time, 0)
|
||||
} else {
|
||||
t.RunTimes = t.RunTimes[*expIdx+1:]
|
||||
}
|
||||
altered = true
|
||||
}
|
||||
}
|
||||
|
||||
diffLen := len(t.RunTimes) - qLength
|
||||
if qLength > 0 && diffLen > 0 {
|
||||
var rmTms []time.Time
|
||||
rmTms, t.RunTimes = t.RunTimes[:diffLen], t.RunTimes[diffLen:]
|
||||
for _, rmTm := range rmTms {
|
||||
delete(t.Metrics, rmTm)
|
||||
}
|
||||
altered = true
|
||||
}
|
||||
if altered {
|
||||
t.computeIndexes()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// computeIndexes should be called after each retrieval from DB
|
||||
func (t *Trend) computeIndexes() {
|
||||
t.mLast = make(map[string]time.Time)
|
||||
t.mCounts = make(map[string]int)
|
||||
t.mTotals = make(map[string]float64)
|
||||
for _, runTime := range t.RunTimes {
|
||||
for _, mWt := range t.Metrics[runTime] {
|
||||
t.indexesAppendMetric(mWt, runTime)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// indexesAppendMetric appends a single metric to indexes
|
||||
func (t *Trend) indexesAppendMetric(mWt *MetricWithTrend, rTime time.Time) {
|
||||
t.mLast[mWt.ID] = rTime
|
||||
t.mCounts[mWt.ID]++
|
||||
t.mTotals[mWt.ID] += mWt.Value
|
||||
}
|
||||
|
||||
// getTrendGrowth returns the percentage growth for a specific metric
|
||||
//
|
||||
// @correlation parameter will define whether the comparison is against last or average value
|
||||
// errors in case of previous
|
||||
func (t *Trend) getTrendGrowth(mID string, mVal float64, correlation string, roundDec int) (tG float64, err error) {
|
||||
var prevVal float64
|
||||
if _, has := t.mLast[mID]; !has {
|
||||
return -1.0, utils.ErrNotFound
|
||||
}
|
||||
if _, has := t.Metrics[t.mLast[mID]][mID]; !has {
|
||||
return -1.0, utils.ErrNotFound
|
||||
}
|
||||
switch correlation {
|
||||
case utils.MetaLast:
|
||||
prevVal = t.Metrics[t.mLast[mID]][mID].Value
|
||||
case utils.MetaAverage:
|
||||
prevVal = t.mTotals[mID] / float64(t.mCounts[mID])
|
||||
default:
|
||||
return -1.0, utils.ErrCorrelationUndefined
|
||||
}
|
||||
|
||||
diffVal := mVal - prevVal
|
||||
return utils.Round(diffVal*100/prevVal, roundDec, utils.MetaRoundingMiddle), nil
|
||||
}
|
||||
|
||||
// getTrendLabel identifies the trend label for the instant value of the metric
|
||||
//
|
||||
// *positive, *negative, *constant, N/A
|
||||
func (t *Trend) getTrendLabel(tGrowth float64, tolerance float64) (lbl string) {
|
||||
switch {
|
||||
case tGrowth > 0:
|
||||
lbl = utils.MetaPositive
|
||||
case tGrowth < 0:
|
||||
lbl = utils.MetaNegative
|
||||
default:
|
||||
lbl = utils.MetaConstant
|
||||
}
|
||||
if math.Abs(tGrowth) <= tolerance { // percentage value of diff is lower than threshold
|
||||
lbl = utils.MetaConstant
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// MetricWithTrend represents one read from StatS
|
||||
type MetricWithTrend struct {
|
||||
ID string // Metric ID
|
||||
Value float64 // Metric Value
|
||||
TrendGrowth float64 // Difference between last and previous
|
||||
TrendLabel string // *positive, *negative, *constant, N/A
|
||||
}
|
||||
|
||||
func (tr *Trend) TenantID() string {
|
||||
return utils.ConcatenatedKey(tr.Tenant, tr.ID)
|
||||
}
|
||||
|
||||
// TrendSummary represents the last trend computed
|
||||
type TrendSummary struct {
|
||||
Tenant string
|
||||
ID string
|
||||
Time time.Time
|
||||
Metrics map[string]*MetricWithTrend
|
||||
}
|
||||
|
||||
func (tp *TrendProfile) Set(path []string, val any, _ bool) (err error) {
|
||||
if len(path) != 1 {
|
||||
return utils.ErrWrongPath
|
||||
}
|
||||
|
||||
switch path[0] {
|
||||
default:
|
||||
return utils.ErrWrongPath
|
||||
case utils.Tenant:
|
||||
tp.Tenant = utils.IfaceAsString(val)
|
||||
case utils.ID:
|
||||
tp.ID = utils.IfaceAsString(val)
|
||||
case utils.Schedule:
|
||||
tp.Schedule = utils.IfaceAsString(val)
|
||||
case utils.StatID:
|
||||
tp.StatID = utils.IfaceAsString(val)
|
||||
case utils.Metrics:
|
||||
var valA []string
|
||||
valA, err = utils.IfaceAsStringSlice(val)
|
||||
tp.Metrics = append(tp.Metrics, valA...)
|
||||
case utils.TTL:
|
||||
tp.TTL, err = utils.IfaceAsDuration(val)
|
||||
case utils.QueueLength:
|
||||
tp.QueueLength, err = utils.IfaceAsInt(val)
|
||||
case utils.MinItems:
|
||||
tp.MinItems, err = utils.IfaceAsInt(val)
|
||||
case utils.CorrelationType:
|
||||
tp.CorrelationType = utils.IfaceAsString(val)
|
||||
case utils.Tolerance:
|
||||
tp.Tolerance, err = utils.IfaceAsFloat64(val)
|
||||
case utils.Stored:
|
||||
tp.Stored, err = utils.IfaceAsBool(val)
|
||||
case utils.ThresholdIDs:
|
||||
var valA []string
|
||||
valA, err = utils.IfaceAsStringSlice(val)
|
||||
tp.ThresholdIDs = append(tp.ThresholdIDs, valA...)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (tp *TrendProfile) Merge(v2 any) {
|
||||
vi := v2.(*TrendProfile)
|
||||
if len(vi.Tenant) != 0 {
|
||||
tp.Tenant = vi.Tenant
|
||||
}
|
||||
if len(vi.ID) != 0 {
|
||||
tp.ID = vi.ID
|
||||
}
|
||||
if len(vi.Schedule) != 0 {
|
||||
tp.Schedule = vi.Schedule
|
||||
}
|
||||
if len(vi.StatID) != 0 {
|
||||
tp.StatID = vi.StatID
|
||||
}
|
||||
tp.Metrics = append(tp.Metrics, vi.Metrics...)
|
||||
tp.ThresholdIDs = append(tp.ThresholdIDs, vi.ThresholdIDs...)
|
||||
if vi.Stored {
|
||||
tp.Stored = vi.Stored
|
||||
}
|
||||
if vi.TTL != 0 {
|
||||
tp.TTL = vi.TTL
|
||||
}
|
||||
if vi.QueueLength != 0 {
|
||||
tp.QueueLength = vi.QueueLength
|
||||
}
|
||||
if vi.MinItems != 0 {
|
||||
tp.MinItems = vi.MinItems
|
||||
}
|
||||
if len(vi.CorrelationType) != 0 {
|
||||
tp.CorrelationType = vi.CorrelationType
|
||||
}
|
||||
if vi.Tolerance != 0 {
|
||||
tp.Tolerance = vi.Tolerance
|
||||
}
|
||||
}
|
||||
|
||||
func (tp *TrendProfile) String() string { return utils.ToJSON(tp) }
|
||||
func (tp *TrendProfile) FieldAsString(fldPath []string) (_ string, err error) {
|
||||
var val any
|
||||
if val, err = tp.FieldAsInterface(fldPath); err != nil {
|
||||
return
|
||||
}
|
||||
return utils.IfaceAsString(val), nil
|
||||
}
|
||||
func (tp *TrendProfile) FieldAsInterface(fldPath []string) (_ any, err error) {
|
||||
if len(fldPath) != 1 {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
switch fldPath[0] {
|
||||
default:
|
||||
fld, idx := utils.GetPathIndex(fldPath[0])
|
||||
if idx != nil {
|
||||
switch fld {
|
||||
case utils.Metrics:
|
||||
if *idx < len(tp.Metrics) {
|
||||
return tp.Metrics[*idx], nil
|
||||
}
|
||||
case utils.ThresholdIDs:
|
||||
if *idx < len(tp.ThresholdIDs) {
|
||||
return tp.ThresholdIDs[*idx], nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, utils.ErrNotFound
|
||||
case utils.Tenant:
|
||||
return tp.Tenant, nil
|
||||
case utils.ID:
|
||||
return tp.ID, nil
|
||||
case utils.Schedule:
|
||||
return tp.Schedule, nil
|
||||
case utils.StatID:
|
||||
return tp.StatID, nil
|
||||
case utils.TTL:
|
||||
return tp.TTL, nil
|
||||
case utils.QueueLength:
|
||||
return tp.QueueLength, nil
|
||||
case utils.MinItems:
|
||||
return tp.MinItems, nil
|
||||
case utils.CorrelationType:
|
||||
return tp.CorrelationType, nil
|
||||
case utils.Tolerance:
|
||||
return tp.Tolerance, nil
|
||||
case utils.Stored:
|
||||
return tp.Stored, nil
|
||||
}
|
||||
}
|
||||
@@ -1,921 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package engine
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func TestTrendProfileClone(t *testing.T) {
|
||||
|
||||
original := &TrendProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "ID",
|
||||
Schedule: "Schedule",
|
||||
StatID: "StatID",
|
||||
Metrics: []string{"metric1", "metric2"},
|
||||
TTL: 10 * time.Minute,
|
||||
QueueLength: 100,
|
||||
MinItems: 10,
|
||||
CorrelationType: "average",
|
||||
Tolerance: 0.05,
|
||||
Stored: true,
|
||||
ThresholdIDs: []string{"thresh1", "thresh2"},
|
||||
}
|
||||
|
||||
cloned := original.Clone()
|
||||
|
||||
if cloned.Tenant != original.Tenant {
|
||||
t.Errorf("Expected Tenant %s, but got %s", original.Tenant, cloned.Tenant)
|
||||
}
|
||||
if cloned.ID != original.ID {
|
||||
t.Errorf("Expected ID %s, but got %s", original.ID, cloned.ID)
|
||||
}
|
||||
if cloned.Schedule != original.Schedule {
|
||||
t.Errorf("Expected Schedule %s, but got %s", original.Schedule, cloned.Schedule)
|
||||
}
|
||||
if cloned.StatID != original.StatID {
|
||||
t.Errorf("Expected StatID %s, but got %s", original.StatID, cloned.StatID)
|
||||
}
|
||||
if cloned.QueueLength != original.QueueLength {
|
||||
t.Errorf("Expected QueueLength %d, but got %d", original.QueueLength, cloned.QueueLength)
|
||||
}
|
||||
if cloned.TTL != original.TTL {
|
||||
t.Errorf("Expected TTL %v, but got %v", original.TTL, cloned.TTL)
|
||||
}
|
||||
if cloned.MinItems != original.MinItems {
|
||||
t.Errorf("Expected MinItems %d, but got %d", original.MinItems, cloned.MinItems)
|
||||
}
|
||||
if cloned.CorrelationType != original.CorrelationType {
|
||||
t.Errorf("Expected CorrelationType %s, but got %s", original.CorrelationType, cloned.CorrelationType)
|
||||
}
|
||||
if cloned.Tolerance != original.Tolerance {
|
||||
t.Errorf("Expected Tolerance %f, but got %f", original.Tolerance, cloned.Tolerance)
|
||||
}
|
||||
if cloned.Stored != original.Stored {
|
||||
t.Errorf("Expected Stored %v, but got %v", original.Stored, cloned.Stored)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(cloned.Metrics, original.Metrics) {
|
||||
t.Errorf("Expected Metrics %v, but got %v", original.Metrics, cloned.Metrics)
|
||||
}
|
||||
if !reflect.DeepEqual(cloned.ThresholdIDs, original.ThresholdIDs) {
|
||||
t.Errorf("Expected ThresholdIDs %v, but got %v", original.ThresholdIDs, cloned.ThresholdIDs)
|
||||
}
|
||||
|
||||
if len(cloned.Metrics) > 0 && &cloned.Metrics[0] == &original.Metrics[0] {
|
||||
t.Errorf("Metrics slice was not deep copied")
|
||||
}
|
||||
if len(cloned.ThresholdIDs) > 0 && &cloned.ThresholdIDs[0] == &original.ThresholdIDs[0] {
|
||||
t.Errorf("ThresholdIDs slice was not deep copied")
|
||||
}
|
||||
}
|
||||
|
||||
func TestTrendProfileTenantIDAndTrendProfileWithAPIOpts(t *testing.T) {
|
||||
|
||||
tp := &TrendProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "trend1",
|
||||
Schedule: "*/5 * * * *",
|
||||
StatID: "StatID",
|
||||
Metrics: []string{"metric1", "metric2"},
|
||||
TTL: 10 * time.Minute,
|
||||
QueueLength: 100,
|
||||
MinItems: 10,
|
||||
CorrelationType: "average",
|
||||
Tolerance: 0.05,
|
||||
Stored: true,
|
||||
ThresholdIDs: []string{"thresh1", "thresh2"},
|
||||
}
|
||||
|
||||
tenantID := tp.TenantID()
|
||||
|
||||
expectedTenantID := "cgrates.org" + utils.ConcatenatedKeySep + "trend1"
|
||||
if tenantID != expectedTenantID {
|
||||
t.Errorf("Expected TenantID %s, but got %s", expectedTenantID, tenantID)
|
||||
}
|
||||
|
||||
apiOpts := map[string]any{
|
||||
"option1": "value1",
|
||||
"option2": 42,
|
||||
}
|
||||
|
||||
tpWithAPIOpts := &TrendProfileWithAPIOpts{
|
||||
TrendProfile: tp,
|
||||
APIOpts: apiOpts,
|
||||
}
|
||||
|
||||
if tpWithAPIOpts.Tenant != "cgrates.org" {
|
||||
t.Errorf("Expected Tenant %s, but got %s", "cgrates.org", tpWithAPIOpts.Tenant)
|
||||
}
|
||||
if tpWithAPIOpts.ID != "trend1" {
|
||||
t.Errorf("Expected ID %s, but got %s", "trend1", tpWithAPIOpts.ID)
|
||||
}
|
||||
|
||||
expectedAPIOpts := map[string]any{
|
||||
"option1": "value1",
|
||||
"option2": 42,
|
||||
}
|
||||
if !reflect.DeepEqual(tpWithAPIOpts.APIOpts, expectedAPIOpts) {
|
||||
t.Errorf("Expected APIOpts %v, but got %v", expectedAPIOpts, tpWithAPIOpts.APIOpts)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestIndexesAppendMetric(t *testing.T) {
|
||||
|
||||
trend := &Trend{
|
||||
mLast: make(map[string]time.Time),
|
||||
mCounts: make(map[string]int),
|
||||
mTotals: make(map[string]float64),
|
||||
}
|
||||
|
||||
metric1 := &MetricWithTrend{ID: "metric1", Value: 5.0}
|
||||
metric2 := &MetricWithTrend{ID: "metric2", Value: 3.0}
|
||||
|
||||
rTime1 := time.Now()
|
||||
rTime2 := rTime1.Add(10 * time.Minute)
|
||||
|
||||
trend.indexesAppendMetric(metric1, rTime1)
|
||||
trend.indexesAppendMetric(metric2, rTime2)
|
||||
trend.indexesAppendMetric(metric1, rTime2)
|
||||
|
||||
expectedMLast := map[string]time.Time{
|
||||
"metric1": rTime2,
|
||||
"metric2": rTime2,
|
||||
}
|
||||
if !reflect.DeepEqual(trend.mLast, expectedMLast) {
|
||||
t.Errorf("Expected mLast %v, but got %v", expectedMLast, trend.mLast)
|
||||
}
|
||||
|
||||
expectedMCounts := map[string]int{
|
||||
"metric1": 2,
|
||||
"metric2": 1,
|
||||
}
|
||||
if !reflect.DeepEqual(trend.mCounts, expectedMCounts) {
|
||||
t.Errorf("Expected mCounts %v, but got %v", expectedMCounts, trend.mCounts)
|
||||
}
|
||||
|
||||
expectedMTotals := map[string]float64{
|
||||
"metric1": 10.0,
|
||||
"metric2": 3.0,
|
||||
}
|
||||
if !reflect.DeepEqual(trend.mTotals, expectedMTotals) {
|
||||
t.Errorf("Expected mTotals %v, but got %v", expectedMTotals, trend.mTotals)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTrendTenantID(t *testing.T) {
|
||||
trend := &Trend{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "ID",
|
||||
RunTimes: []time.Time{
|
||||
time.Now(),
|
||||
time.Now().Add(-1 * time.Hour),
|
||||
},
|
||||
Metrics: map[time.Time]map[string]*MetricWithTrend{
|
||||
time.Now(): {
|
||||
"metric1": {ID: "metric1", Value: 1.5},
|
||||
"metric2": {ID: "metric2", Value: 2.0},
|
||||
},
|
||||
time.Now().Add(-1 * time.Hour): {
|
||||
"metric1": {ID: "metric1", Value: 1.0},
|
||||
},
|
||||
},
|
||||
CompressedMetrics: []byte{0x00, 0x01},
|
||||
mLast: map[string]time.Time{
|
||||
"metric1": time.Now(),
|
||||
"metric2": time.Now().Add(-1 * time.Hour),
|
||||
},
|
||||
mCounts: map[string]int{
|
||||
"metric1": 2,
|
||||
"metric2": 1,
|
||||
},
|
||||
mTotals: map[string]float64{
|
||||
"metric1": 2.5,
|
||||
"metric2": 2.0,
|
||||
},
|
||||
tPrfl: &TrendProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "trendProfileID",
|
||||
Schedule: "0 * * * *",
|
||||
StatID: "statID1",
|
||||
QueueLength: 10,
|
||||
TTL: 5 * time.Minute,
|
||||
MinItems: 1,
|
||||
CorrelationType: "average",
|
||||
Tolerance: 0.1,
|
||||
Stored: true,
|
||||
ThresholdIDs: []string{"threshold1", "threshold2"},
|
||||
},
|
||||
}
|
||||
|
||||
tenantID := trend.TenantID()
|
||||
|
||||
expectedTenantID := "cgrates.org:ID"
|
||||
if tenantID != expectedTenantID {
|
||||
t.Errorf("Expected TenantID %v, but got %v", expectedTenantID, tenantID)
|
||||
}
|
||||
|
||||
if len(trend.RunTimes) != 2 {
|
||||
t.Errorf("Expected 2 run times, but got %d", len(trend.RunTimes))
|
||||
}
|
||||
|
||||
if len(trend.Metrics) != 2 {
|
||||
t.Errorf("Expected 2 metrics time entries, but got %d", len(trend.Metrics))
|
||||
}
|
||||
|
||||
if trend.tPrfl.QueueLength != 10 {
|
||||
t.Errorf("Expected QueueLength 10, but got %d", trend.tPrfl.QueueLength)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTComputeIndexes(t *testing.T) {
|
||||
runTime1 := time.Now()
|
||||
runTime2 := runTime1.Add(time.Minute)
|
||||
|
||||
trend := &Trend{
|
||||
RunTimes: []time.Time{runTime1, runTime2},
|
||||
Metrics: map[time.Time]map[string]*MetricWithTrend{
|
||||
runTime1: {
|
||||
"metric1": &MetricWithTrend{ID: "metric1", Value: 10.0},
|
||||
"metric2": &MetricWithTrend{ID: "metric2", Value: 20.0},
|
||||
},
|
||||
runTime2: {
|
||||
"metric1": &MetricWithTrend{ID: "metric1", Value: 15.0},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
trend.computeIndexes()
|
||||
|
||||
if trend.mLast["metric1"] != runTime2 {
|
||||
t.Errorf("Expected last time for metric1 to be %v, got %v", runTime2, trend.mLast["metric1"])
|
||||
}
|
||||
|
||||
if trend.mCounts["metric1"] != 2 {
|
||||
t.Errorf("Expected count for metric1 to be 2, got %d", trend.mCounts["metric1"])
|
||||
}
|
||||
|
||||
if trend.mTotals["metric1"] != 25.0 {
|
||||
t.Errorf("Expected total for metric1 to be 25.0, got %f", trend.mTotals["metric1"])
|
||||
}
|
||||
|
||||
if trend.mLast["metric2"] != runTime1 {
|
||||
t.Errorf("Expected last time for metric2 to be %v, got %v", runTime1, trend.mLast["metric2"])
|
||||
}
|
||||
|
||||
if trend.mCounts["metric2"] != 1 {
|
||||
t.Errorf("Expected count for metric2 to be 1, got %d", trend.mCounts["metric2"])
|
||||
}
|
||||
|
||||
if trend.mTotals["metric2"] != 20.0 {
|
||||
t.Errorf("Expected total for metric2 to be 20.0, got %f", trend.mTotals["metric2"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetTrendLabel(t *testing.T) {
|
||||
trend := &Trend{}
|
||||
|
||||
tests := []struct {
|
||||
tGrowth float64
|
||||
tolerance float64
|
||||
expected string
|
||||
}{
|
||||
{1.0, 0.5, utils.MetaPositive},
|
||||
{-1.0, 0.5, utils.MetaNegative},
|
||||
{0.0, 0.5, utils.MetaConstant},
|
||||
{0.3, 0.5, utils.MetaConstant},
|
||||
{-0.3, 0.5, utils.MetaConstant},
|
||||
{0.6, 0.5, utils.MetaPositive},
|
||||
{-0.6, 0.5, utils.MetaNegative},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
result := trend.getTrendLabel(test.tGrowth, test.tolerance)
|
||||
if result != test.expected {
|
||||
t.Errorf("For tGrowth: %f and tolerance: %f, expected %s, got %s", test.tGrowth, test.tolerance, test.expected, result)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetTrendGrowth(t *testing.T) {
|
||||
|
||||
trend := Trend{
|
||||
mLast: map[string]time.Time{},
|
||||
Metrics: map[time.Time]map[string]*MetricWithTrend{},
|
||||
mTotals: map[string]float64{},
|
||||
mCounts: map[string]int{},
|
||||
}
|
||||
|
||||
_, err := trend.getTrendGrowth("unknownID", 100, utils.MetaLast, 2)
|
||||
if !errors.Is(err, utils.ErrNotFound) {
|
||||
t.Errorf("Expected error ErrNotFound, got: %v", err)
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
trend.mLast["metric1"] = now
|
||||
|
||||
_, err = trend.getTrendGrowth("metric1", 100, utils.MetaLast, 2)
|
||||
if !errors.Is(err, utils.ErrNotFound) {
|
||||
t.Errorf("Expected error ErrNotFound, got: %v", err)
|
||||
}
|
||||
|
||||
trend.Metrics = map[time.Time]map[string]*MetricWithTrend{
|
||||
now: {
|
||||
"metric1": {ID: "metric1", Value: 80},
|
||||
},
|
||||
}
|
||||
|
||||
got, err := trend.getTrendGrowth("metric1", 100, utils.MetaLast, 2)
|
||||
if err != nil || got != 25.0 {
|
||||
t.Errorf("Mismatch for MetaLast correlation. Got: %v, expected: %v", got, 25.0)
|
||||
}
|
||||
|
||||
trend.mTotals = map[string]float64{
|
||||
"metric1": 400,
|
||||
}
|
||||
trend.mCounts = map[string]int{
|
||||
"metric1": 4,
|
||||
}
|
||||
|
||||
got, err = trend.getTrendGrowth("metric1", 120, utils.MetaAverage, 2)
|
||||
if err != nil || got != 20.0 {
|
||||
t.Errorf("Mismatch for MetaAverage correlation. Got: %v, expected: %v", got, 20.0)
|
||||
}
|
||||
|
||||
_, err = trend.getTrendGrowth("metric1", 100, "invalidCorrelation", 2)
|
||||
if !errors.Is(err, utils.ErrCorrelationUndefined) {
|
||||
t.Errorf("Expected error ErrCorrelationUndefined, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewTrendFromProfile(t *testing.T) {
|
||||
profile := &TrendProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "trendProfileID",
|
||||
Schedule: "@every 1sec",
|
||||
StatID: "statID1",
|
||||
QueueLength: 10,
|
||||
TTL: 5 * time.Minute,
|
||||
MinItems: 1,
|
||||
CorrelationType: "average",
|
||||
Tolerance: 0.1,
|
||||
Stored: true,
|
||||
ThresholdIDs: []string{"threshold1", "threshold2"},
|
||||
}
|
||||
|
||||
trend := NewTrendFromProfile(profile)
|
||||
|
||||
if trend.Tenant != profile.Tenant {
|
||||
t.Errorf("Expected Tenant %s, got %s", profile.Tenant, trend.Tenant)
|
||||
}
|
||||
if trend.ID != profile.ID {
|
||||
t.Errorf("Expected ID %s, got %s", profile.ID, trend.ID)
|
||||
}
|
||||
if trend.RunTimes == nil {
|
||||
t.Errorf("Expected RunTimes to be initialized, got nil")
|
||||
}
|
||||
if len(trend.RunTimes) != 0 {
|
||||
t.Errorf("Expected RunTimes to be empty, got length %d", len(trend.RunTimes))
|
||||
}
|
||||
if trend.Metrics == nil {
|
||||
t.Errorf("Expected Metrics to be initialized, got nil")
|
||||
}
|
||||
if len(trend.Metrics) != 0 {
|
||||
t.Errorf("Expected Metrics to be empty, got length %d", len(trend.Metrics))
|
||||
}
|
||||
if trend.tPrfl != profile {
|
||||
t.Errorf("Expected tPrfl to point to the original profile, got a different value")
|
||||
}
|
||||
}
|
||||
|
||||
func TestTrendProfileFieldAsString(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
fldPath []string
|
||||
err error
|
||||
val any
|
||||
}{
|
||||
{utils.ID, []string{utils.ID}, nil, "Trend1"},
|
||||
{utils.Tenant, []string{utils.Tenant}, nil, "cgrates.org"},
|
||||
{utils.Schedule, []string{utils.Schedule}, nil, "@every 1m"},
|
||||
{utils.StatID, []string{utils.StatID}, nil, "Stat1"},
|
||||
{utils.Metrics, []string{utils.Metrics + "[0]"}, nil, "*acc"},
|
||||
{utils.Metrics, []string{utils.Metrics + "[1]"}, nil, "*tcd"},
|
||||
{utils.TTL, []string{utils.TTL}, nil, 10 * time.Minute},
|
||||
{utils.QueueLength, []string{utils.QueueLength}, nil, 100},
|
||||
{utils.MinItems, []string{utils.MinItems}, nil, 10},
|
||||
{utils.CorrelationType, []string{utils.CorrelationType}, nil, "*average"},
|
||||
{utils.Tolerance, []string{utils.Tolerance}, nil, 0.05},
|
||||
{utils.Stored, []string{utils.Stored}, nil, true},
|
||||
{utils.ThresholdIDs, []string{utils.ThresholdIDs + "[0]"}, nil, "Thresh1"},
|
||||
{utils.ThresholdIDs, []string{utils.ThresholdIDs + "[1]"}, nil, "Thresh2"},
|
||||
{"NonExistingField", []string{"Field1"}, utils.ErrNotFound, nil},
|
||||
}
|
||||
rp := &TrendProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "Trend1",
|
||||
Schedule: "@every 1m",
|
||||
StatID: "Stat1",
|
||||
Metrics: []string{"*acc", "*tcd"},
|
||||
TTL: 10 * time.Minute,
|
||||
QueueLength: 100,
|
||||
MinItems: 10,
|
||||
CorrelationType: "*average",
|
||||
Tolerance: 0.05,
|
||||
Stored: true,
|
||||
ThresholdIDs: []string{"Thresh1", "Thresh2"},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
val, err := rp.FieldAsInterface(tc.fldPath)
|
||||
if tc.err != nil {
|
||||
if err == nil {
|
||||
t.Error("expect to receive an error")
|
||||
}
|
||||
if tc.err != err {
|
||||
t.Errorf("expected %v,received %v", tc.err, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error %v", err)
|
||||
}
|
||||
if val != tc.val {
|
||||
t.Errorf("expected %v,received %v", tc.val, val)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTrendCleanUp(t *testing.T) {
|
||||
tm := time.Now().Add(-19 * time.Second)
|
||||
tm2 := tm.Add(15 * time.Second)
|
||||
tm3 := time.Now().Add(1)
|
||||
tm4 := time.Now().Add(-5 * time.Second)
|
||||
tm5 := time.Now().Add(-3 * time.Second)
|
||||
tr := &Trend{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "TREND1",
|
||||
RunTimes: []time.Time{
|
||||
tm,
|
||||
tm2,
|
||||
tm3,
|
||||
tm4,
|
||||
tm5,
|
||||
},
|
||||
Metrics: map[time.Time]map[string]*MetricWithTrend{
|
||||
tm: {
|
||||
utils.MetaTCC: {ID: utils.MetaTCC, Value: 13, TrendGrowth: -1, TrendLabel: utils.NotAvailable},
|
||||
utils.MetaACC: {ID: utils.MetaACC, Value: 13, TrendGrowth: -1, TrendLabel: utils.NotAvailable},
|
||||
},
|
||||
tm2: {
|
||||
utils.MetaTCC: {ID: utils.MetaTCC, Value: 30, TrendGrowth: 120, TrendLabel: utils.MetaPositive},
|
||||
utils.MetaACC: {ID: utils.MetaACC, Value: 15, TrendGrowth: 4, TrendLabel: utils.MetaPositive},
|
||||
},
|
||||
tm3: {
|
||||
utils.MetaTCC: {ID: utils.MetaTCC, Value: 30, TrendGrowth: 120, TrendLabel: utils.MetaPositive},
|
||||
utils.MetaACC: {ID: utils.MetaACC, Value: 15, TrendGrowth: 4, TrendLabel: utils.MetaPositive},
|
||||
},
|
||||
tm4: {
|
||||
utils.MetaTCC: {ID: utils.MetaTCC, Value: 30, TrendGrowth: 120, TrendLabel: utils.MetaPositive},
|
||||
utils.MetaACC: {ID: utils.MetaACC, Value: 15, TrendGrowth: 4, TrendLabel: utils.MetaPositive},
|
||||
},
|
||||
tm5: {
|
||||
utils.MetaTCC: {ID: utils.MetaTCC, Value: 30, TrendGrowth: 120, TrendLabel: utils.MetaPositive},
|
||||
utils.MetaACC: {ID: utils.MetaACC, Value: 15, TrendGrowth: 4, TrendLabel: utils.MetaPositive},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
ttl time.Duration
|
||||
tr *Trend
|
||||
qLength int
|
||||
altered bool
|
||||
runtimesmetriclens int
|
||||
}{
|
||||
{"TTLlgThan0", 10 * time.Second, tr, 3, true, 3},
|
||||
{"QueueLenLg0", -1, tr, 2, true, 2},
|
||||
{"TLLExpiredAll", 2 * time.Second, tr, 0, true, 0},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
altered := tc.tr.cleanup(tc.ttl, tc.qLength)
|
||||
if tc.altered {
|
||||
if !altered {
|
||||
t.Errorf("expected trend to be altered")
|
||||
}
|
||||
if len(tc.tr.RunTimes) != tc.runtimesmetriclens || len(tc.tr.Metrics) != tc.runtimesmetriclens {
|
||||
t.Errorf("expected len to be %d,got %d metrics,%d runtimes", tc.runtimesmetriclens, len(tc.tr.Metrics), len(tc.tr.RunTimes))
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
if altered {
|
||||
t.Error("expected trend to not be altered")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTrendProfileString(t *testing.T) {
|
||||
profile := &TrendProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "Trend1",
|
||||
Schedule: "@every 1m",
|
||||
StatID: "Stat1",
|
||||
Metrics: []string{"*acc", "*tcd"},
|
||||
TTL: 10 * time.Minute,
|
||||
QueueLength: 100,
|
||||
MinItems: 10,
|
||||
CorrelationType: "*average",
|
||||
Tolerance: 0.05,
|
||||
Stored: true,
|
||||
ThresholdIDs: []string{"Thresh1", "Thresh2"},
|
||||
}
|
||||
|
||||
jsonStr := profile.String()
|
||||
if jsonStr == "" {
|
||||
t.Errorf("Expected non-empty JSON string representation of TrendProfile")
|
||||
}
|
||||
|
||||
expectedFields := []string{
|
||||
`"cgrates.org"`, `"Trend1"`, `"@every 1m"`, `"Stat1"`,
|
||||
`"*acc"`, `"*tcd"`, `"*average"`, `true`, `"Thresh1"`, `"Thresh2"`,
|
||||
}
|
||||
|
||||
for _, field := range expectedFields {
|
||||
if !strings.Contains(jsonStr, field) {
|
||||
t.Errorf("Expected JSON output to contain field: %s", field)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestTrendProfileFieldAssString(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
fldPath []string
|
||||
expected string
|
||||
hasError bool
|
||||
}{
|
||||
{
|
||||
name: "Valid Tenant Field",
|
||||
fldPath: []string{"Tenant"},
|
||||
expected: "cgrates.org",
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "Valid ID Field",
|
||||
fldPath: []string{"ID"},
|
||||
expected: "Trend1",
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "Valid Schedule Field",
|
||||
fldPath: []string{"Schedule"},
|
||||
expected: "@every 1m",
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "Invalid Field Path",
|
||||
fldPath: []string{"NonExistentField"},
|
||||
expected: "",
|
||||
hasError: true,
|
||||
},
|
||||
}
|
||||
|
||||
tp := &TrendProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "Trend1",
|
||||
Schedule: "@every 1m",
|
||||
StatID: "Stat1",
|
||||
Metrics: []string{"*acc", "*tcd"},
|
||||
TTL: 10 * time.Minute,
|
||||
QueueLength: 100,
|
||||
MinItems: 10,
|
||||
CorrelationType: "*average",
|
||||
Tolerance: 0.05,
|
||||
Stored: true,
|
||||
ThresholdIDs: []string{"Thresh1", "Thresh2"},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result, err := tp.FieldAsString(tt.fldPath)
|
||||
|
||||
if tt.hasError && err == nil {
|
||||
t.Errorf("Expected error for fldPath %v, but got none", tt.fldPath)
|
||||
} else if !tt.hasError && err != nil {
|
||||
t.Errorf("Expected no error for fldPath %v, but got: %v", tt.fldPath, err)
|
||||
}
|
||||
|
||||
if result != tt.expected {
|
||||
t.Errorf("For fldPath %v, expected %v, but got %v", tt.fldPath, tt.expected, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTrendProfileSet(t *testing.T) {
|
||||
tp := &TrendProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "Trend1",
|
||||
Schedule: "@every 1m",
|
||||
StatID: "Stat1",
|
||||
Metrics: []string{"*acc", "*tcd"},
|
||||
TTL: 10 * time.Minute,
|
||||
QueueLength: 100,
|
||||
MinItems: 10,
|
||||
CorrelationType: "*average",
|
||||
Tolerance: 0.05,
|
||||
Stored: true,
|
||||
ThresholdIDs: []string{"Thresh1", "Thresh2"},
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
path []string
|
||||
val any
|
||||
expected any
|
||||
hasError bool
|
||||
}{
|
||||
{
|
||||
name: "Set Tenant",
|
||||
path: []string{utils.Tenant},
|
||||
val: "newTenant",
|
||||
expected: "newTenant",
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "Set ID",
|
||||
path: []string{utils.ID},
|
||||
val: "newID",
|
||||
expected: "newID",
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "Set Schedule",
|
||||
path: []string{utils.Schedule},
|
||||
val: "@every 2m",
|
||||
expected: "@every 2m",
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "Set StatID",
|
||||
path: []string{utils.StatID},
|
||||
val: "newStatID",
|
||||
expected: "newStatID",
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "Set Metrics",
|
||||
path: []string{utils.Metrics},
|
||||
val: []string{"*newMetric"},
|
||||
expected: []string{"*acc", "*tcd", "*newMetric"},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "Set TTL",
|
||||
path: []string{utils.TTL},
|
||||
val: "15m",
|
||||
expected: 15 * time.Minute,
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "Set QueueLength",
|
||||
path: []string{utils.QueueLength},
|
||||
val: 50,
|
||||
expected: 50,
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "Set MinItems",
|
||||
path: []string{utils.MinItems},
|
||||
val: 20,
|
||||
expected: 20,
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "Set CorrelationType",
|
||||
path: []string{utils.CorrelationType},
|
||||
val: "*sum",
|
||||
expected: "*sum",
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "Set Tolerance",
|
||||
path: []string{utils.Tolerance},
|
||||
val: 0.1,
|
||||
expected: 0.1,
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "Set Stored",
|
||||
path: []string{utils.Stored},
|
||||
val: false,
|
||||
expected: false,
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "Set ThresholdIDs",
|
||||
path: []string{utils.ThresholdIDs},
|
||||
val: []string{"Thresh3", "Thresh4"},
|
||||
expected: []string{"Thresh1", "Thresh2", "Thresh3", "Thresh4"},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "Set Invalid Path",
|
||||
path: []string{"InvalidPath"},
|
||||
val: "invalid",
|
||||
expected: nil,
|
||||
hasError: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := tp.Set(tt.path, tt.val, false)
|
||||
|
||||
if tt.hasError && err == nil {
|
||||
t.Errorf("Expected error for path %v, but got none", tt.path)
|
||||
} else if !tt.hasError && err != nil {
|
||||
t.Errorf("Expected no error for path %v, but got: %v", tt.path, err)
|
||||
}
|
||||
|
||||
switch tt.path[0] {
|
||||
case utils.Tenant:
|
||||
if tp.Tenant != tt.expected {
|
||||
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.Tenant)
|
||||
}
|
||||
case utils.ID:
|
||||
if tp.ID != tt.expected {
|
||||
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.ID)
|
||||
}
|
||||
case utils.Schedule:
|
||||
if tp.Schedule != tt.expected {
|
||||
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.Schedule)
|
||||
}
|
||||
case utils.StatID:
|
||||
if tp.StatID != tt.expected {
|
||||
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.StatID)
|
||||
}
|
||||
case utils.Metrics:
|
||||
if len(tp.Metrics) != len(tt.expected.([]string)) {
|
||||
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.Metrics)
|
||||
} else {
|
||||
for i := range tp.Metrics {
|
||||
if tp.Metrics[i] != tt.expected.([]string)[i] {
|
||||
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.Metrics)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
case utils.TTL:
|
||||
if tp.TTL != tt.expected {
|
||||
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.TTL)
|
||||
}
|
||||
case utils.QueueLength:
|
||||
if tp.QueueLength != tt.expected {
|
||||
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.QueueLength)
|
||||
}
|
||||
case utils.MinItems:
|
||||
if tp.MinItems != tt.expected {
|
||||
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.MinItems)
|
||||
}
|
||||
case utils.CorrelationType:
|
||||
if tp.CorrelationType != tt.expected {
|
||||
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.CorrelationType)
|
||||
}
|
||||
case utils.Tolerance:
|
||||
if tp.Tolerance != tt.expected {
|
||||
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.Tolerance)
|
||||
}
|
||||
case utils.Stored:
|
||||
if tp.Stored != tt.expected {
|
||||
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.Stored)
|
||||
}
|
||||
case utils.ThresholdIDs:
|
||||
if len(tp.ThresholdIDs) != len(tt.expected.([]string)) {
|
||||
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.ThresholdIDs)
|
||||
} else {
|
||||
for i := range tp.ThresholdIDs {
|
||||
if tp.ThresholdIDs[i] != tt.expected.([]string)[i] {
|
||||
t.Errorf("For path %v, expected %v, but got %v", tt.path, tt.expected, tp.ThresholdIDs)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTrendProfileMergeV2(t *testing.T) {
|
||||
tp1 := &TrendProfile{
|
||||
Tenant: "tenant1",
|
||||
ID: "id1",
|
||||
Schedule: "schedule1",
|
||||
StatID: "stat1",
|
||||
Metrics: []string{"metric1", "metric2"},
|
||||
ThresholdIDs: []string{"threshold1"},
|
||||
Stored: true,
|
||||
TTL: 10,
|
||||
QueueLength: 100,
|
||||
MinItems: 5,
|
||||
CorrelationType: "type1",
|
||||
Tolerance: 0.5,
|
||||
}
|
||||
|
||||
tp2 := &TrendProfile{
|
||||
Tenant: "tenant2",
|
||||
ID: "id2",
|
||||
Schedule: "schedule2",
|
||||
StatID: "stat2",
|
||||
Metrics: []string{"metric3", "metric4"},
|
||||
ThresholdIDs: []string{"threshold2"},
|
||||
Stored: true,
|
||||
TTL: 20,
|
||||
QueueLength: 200,
|
||||
MinItems: 10,
|
||||
CorrelationType: "type2",
|
||||
Tolerance: 1.5,
|
||||
}
|
||||
|
||||
tp1.Merge(tp2)
|
||||
|
||||
if tp1.Tenant != "tenant2" {
|
||||
t.Errorf("Expected Tenant to be 'tenant2', but got: %s", tp1.Tenant)
|
||||
}
|
||||
|
||||
if tp1.ID != "id2" {
|
||||
t.Errorf("Expected ID to be 'id2', but got: %s", tp1.ID)
|
||||
}
|
||||
|
||||
if tp1.Schedule != "schedule2" {
|
||||
t.Errorf("Expected Schedule to be 'schedule2', but got: %s", tp1.Schedule)
|
||||
}
|
||||
|
||||
if tp1.StatID != "stat2" {
|
||||
t.Errorf("Expected StatID to be 'stat2', but got: %s", tp1.StatID)
|
||||
}
|
||||
|
||||
expectedMetrics := []string{"metric1", "metric2", "metric3", "metric4"}
|
||||
if len(tp1.Metrics) != len(expectedMetrics) {
|
||||
t.Errorf("Expected Metrics to be %v, but got: %v", expectedMetrics, tp1.Metrics)
|
||||
}
|
||||
|
||||
expectedThresholdIDs := []string{"threshold1", "threshold2"}
|
||||
if len(tp1.ThresholdIDs) != len(expectedThresholdIDs) {
|
||||
t.Errorf("Expected ThresholdIDs to be %v, but got: %v", expectedThresholdIDs, tp1.ThresholdIDs)
|
||||
}
|
||||
|
||||
if tp1.Stored != true {
|
||||
t.Errorf("Expected Stored to be 'true', but got: %v", tp1.Stored)
|
||||
}
|
||||
|
||||
if tp1.TTL != 20 {
|
||||
t.Errorf("Expected TTL to be 20, but got: %d", tp1.TTL)
|
||||
}
|
||||
|
||||
if tp1.QueueLength != 200 {
|
||||
t.Errorf("Expected QueueLength to be 200, but got: %d", tp1.QueueLength)
|
||||
}
|
||||
|
||||
if tp1.MinItems != 10 {
|
||||
t.Errorf("Expected MinItems to be 10, but got: %d", tp1.MinItems)
|
||||
}
|
||||
|
||||
if tp1.CorrelationType != "type2" {
|
||||
t.Errorf("Expected CorrelationType to be 'type2', but got: %s", tp1.CorrelationType)
|
||||
}
|
||||
|
||||
if tp1.Tolerance != 1.5 {
|
||||
t.Errorf("Expected Tolerance to be 1.5, but got: %f", tp1.Tolerance)
|
||||
}
|
||||
}
|
||||
@@ -820,8 +820,8 @@ func APItoModelTrends(tr *utils.TPTrendsProfile) (mdls TrendMdls) {
|
||||
return
|
||||
}
|
||||
|
||||
func APItoTrends(tpTR *utils.TPTrendsProfile) (tr *TrendProfile, err error) {
|
||||
tr = &TrendProfile{
|
||||
func APItoTrends(tpTR *utils.TPTrendsProfile) (tr *utils.TrendProfile, err error) {
|
||||
tr = &utils.TrendProfile{
|
||||
Tenant: tpTR.Tenant,
|
||||
ID: tpTR.ID,
|
||||
StatID: tpTR.StatID,
|
||||
@@ -844,7 +844,7 @@ func APItoTrends(tpTR *utils.TPTrendsProfile) (tr *TrendProfile, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func TrendProfileToAPI(tr *TrendProfile) (tpTR *utils.TPTrendsProfile) {
|
||||
func TrendProfileToAPI(tr *utils.TrendProfile) (tpTR *utils.TPTrendsProfile) {
|
||||
tpTR = &utils.TPTrendsProfile{
|
||||
Tenant: tr.Tenant,
|
||||
ID: tr.ID,
|
||||
|
||||
@@ -70,11 +70,11 @@ type DataDB interface {
|
||||
SetRankingDrv(ctx *context.Context, rn *Ranking) (err error)
|
||||
GetRankingDrv(ctx *context.Context, tenant string, id string) (sq *Ranking, err error)
|
||||
RemoveRankingDrv(ctx *context.Context, tenant string, id string) (err error)
|
||||
SetTrendProfileDrv(ctx *context.Context, sq *TrendProfile) (err error)
|
||||
GetTrendProfileDrv(ctx *context.Context, tenant string, id string) (sq *TrendProfile, err error)
|
||||
SetTrendProfileDrv(ctx *context.Context, sq *utils.TrendProfile) (err error)
|
||||
GetTrendProfileDrv(ctx *context.Context, tenant string, id string) (sq *utils.TrendProfile, err error)
|
||||
RemTrendProfileDrv(ctx *context.Context, tenant string, id string) (err error)
|
||||
GetTrendDrv(ctx *context.Context, tenant string, id string) (*Trend, error)
|
||||
SetTrendDrv(ctx *context.Context, tr *Trend) error
|
||||
GetTrendDrv(ctx *context.Context, tenant string, id string) (*utils.Trend, error)
|
||||
SetTrendDrv(ctx *context.Context, tr *utils.Trend) error
|
||||
RemoveTrendDrv(ctx *context.Context, tenant string, id string) error
|
||||
GetFilterDrv(ctx *context.Context, tnt string, id string) (*Filter, error)
|
||||
SetFilterDrv(ctx *context.Context, f *Filter) error
|
||||
|
||||
@@ -326,7 +326,7 @@ func (iDB *InternalDB) GetThresholdProfileDrv(_ *context.Context, tenant, id str
|
||||
return x.(*ThresholdProfile), nil
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) SetTrendProfileDrv(_ *context.Context, srp *TrendProfile) (err error) {
|
||||
func (iDB *InternalDB) SetTrendProfileDrv(_ *context.Context, srp *utils.TrendProfile) (err error) {
|
||||
iDB.db.Set(utils.CacheTrendProfiles, srp.TenantID(), srp, nil, true, utils.NonTransactional)
|
||||
return nil
|
||||
}
|
||||
@@ -336,23 +336,23 @@ func (iDB *InternalDB) RemTrendProfileDrv(_ *context.Context, tenant, id string)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) GetTrendProfileDrv(_ *context.Context, tenant, id string) (sg *TrendProfile, err error) {
|
||||
func (iDB *InternalDB) GetTrendProfileDrv(_ *context.Context, tenant, id string) (sg *utils.TrendProfile, err error) {
|
||||
x, ok := iDB.db.Get(utils.CacheTrendProfiles, utils.ConcatenatedKey(tenant, id))
|
||||
if !ok || x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*TrendProfile), nil
|
||||
return x.(*utils.TrendProfile), nil
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) GetTrendDrv(_ *context.Context, tenant, id string) (th *Trend, err error) {
|
||||
func (iDB *InternalDB) GetTrendDrv(_ *context.Context, tenant, id string) (th *utils.Trend, err error) {
|
||||
x, ok := iDB.db.Get(utils.CacheTrends, utils.ConcatenatedKey(tenant, id))
|
||||
if !ok || x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*Trend), nil
|
||||
return x.(*utils.Trend), nil
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) SetTrendDrv(_ *context.Context, tr *Trend) (err error) {
|
||||
func (iDB *InternalDB) SetTrendDrv(_ *context.Context, tr *utils.Trend) (err error) {
|
||||
iDB.db.Set(utils.CacheTrends, tr.TenantID(), tr, nil,
|
||||
true, utils.NonTransactional)
|
||||
return
|
||||
|
||||
@@ -804,8 +804,8 @@ func (ms *MongoStorage) RemoveRankingDrv(ctx *context.Context, tenant, id string
|
||||
})
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetTrendProfileDrv(ctx *context.Context, tenant, id string) (*TrendProfile, error) {
|
||||
srProfile := new(TrendProfile)
|
||||
func (ms *MongoStorage) GetTrendProfileDrv(ctx *context.Context, tenant, id string) (*utils.TrendProfile, error) {
|
||||
srProfile := new(utils.TrendProfile)
|
||||
err := ms.query(ctx, func(sctx mongo.SessionContext) error {
|
||||
sr := ms.getCol(ColTrs).FindOne(sctx, bson.M{"tenant": tenant, "id": id})
|
||||
decodeErr := sr.Decode(srProfile)
|
||||
@@ -817,7 +817,7 @@ func (ms *MongoStorage) GetTrendProfileDrv(ctx *context.Context, tenant, id stri
|
||||
return srProfile, err
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetTrendProfileDrv(ctx *context.Context, srp *TrendProfile) (err error) {
|
||||
func (ms *MongoStorage) SetTrendProfileDrv(ctx *context.Context, srp *utils.TrendProfile) (err error) {
|
||||
return ms.query(ctx, func(sctx mongo.SessionContext) error {
|
||||
_, err := ms.getCol(ColTrs).UpdateOne(sctx, bson.M{"tenant": srp.Tenant, "id": srp.ID},
|
||||
bson.M{"$set": srp},
|
||||
@@ -836,8 +836,8 @@ func (ms *MongoStorage) RemTrendProfileDrv(ctx *context.Context, tenant, id stri
|
||||
})
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (*Trend, error) {
|
||||
tr := new(Trend)
|
||||
func (ms *MongoStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (*utils.Trend, error) {
|
||||
tr := new(utils.Trend)
|
||||
err := ms.query(ctx, func(sctx mongo.SessionContext) error {
|
||||
sr := ms.getCol(ColTrd).FindOne(sctx, bson.M{"tenant": tenant, "id": id})
|
||||
decodeErr := sr.Decode(tr)
|
||||
@@ -849,7 +849,7 @@ func (ms *MongoStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (*T
|
||||
return tr, err
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetTrendDrv(ctx *context.Context, tr *Trend) error {
|
||||
func (ms *MongoStorage) SetTrendDrv(ctx *context.Context, tr *utils.Trend) error {
|
||||
return ms.query(ctx, func(sctx mongo.SessionContext) error {
|
||||
_, err := ms.getCol(ColTrd).UpdateOne(sctx, bson.M{"tenant": tr.Tenant, "id": tr.ID},
|
||||
bson.M{"$set": tr},
|
||||
|
||||
@@ -529,7 +529,7 @@ func (rs *RedisStorage) RemStatQueueDrv(ctx *context.Context, tenant, id string)
|
||||
return rs.Cmd(nil, redisDEL, utils.StatQueuePrefix+utils.ConcatenatedKey(tenant, id))
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetTrendProfileDrv(ctx *context.Context, sg *TrendProfile) (err error) {
|
||||
func (rs *RedisStorage) SetTrendProfileDrv(ctx *context.Context, sg *utils.TrendProfile) (err error) {
|
||||
var result []byte
|
||||
if result, err = rs.ms.Marshal(sg); err != nil {
|
||||
return
|
||||
@@ -537,7 +537,7 @@ func (rs *RedisStorage) SetTrendProfileDrv(ctx *context.Context, sg *TrendProfil
|
||||
return rs.Cmd(nil, redisSET, utils.TrendProfilePrefix+utils.ConcatenatedKey(sg.Tenant, sg.ID), string(result))
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetTrendProfileDrv(ctx *context.Context, tenant string, id string) (sg *TrendProfile, err error) {
|
||||
func (rs *RedisStorage) GetTrendProfileDrv(ctx *context.Context, tenant string, id string) (sg *utils.TrendProfile, err error) {
|
||||
var values []byte
|
||||
if err = rs.Cmd(&values, redisGET, utils.TrendProfilePrefix+utils.ConcatenatedKey(tenant, id)); err != nil {
|
||||
return
|
||||
@@ -553,7 +553,7 @@ func (rs *RedisStorage) RemTrendProfileDrv(ctx *context.Context, tenant string,
|
||||
return rs.Cmd(nil, redisDEL, utils.TrendProfilePrefix+utils.ConcatenatedKey(tenant, id))
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (r *Trend, err error) {
|
||||
func (rs *RedisStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (r *utils.Trend, err error) {
|
||||
var values []byte
|
||||
if err = rs.Cmd(&values, redisGET, utils.TrendPrefix+utils.ConcatenatedKey(tenant, id)); err != nil {
|
||||
return
|
||||
@@ -565,7 +565,7 @@ func (rs *RedisStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (r
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetTrendDrv(ctx *context.Context, r *Trend) (err error) {
|
||||
func (rs *RedisStorage) SetTrendDrv(ctx *context.Context, r *utils.Trend) (err error) {
|
||||
var result []byte
|
||||
if result, err = rs.ms.Marshal(r); err != nil {
|
||||
return
|
||||
|
||||
@@ -447,7 +447,7 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) {
|
||||
log.Print("TrendProfiles:")
|
||||
}
|
||||
for _, tpTR := range tpr.trProfiles {
|
||||
var tr *TrendProfile
|
||||
var tr *utils.TrendProfile
|
||||
if tr, err = APItoTrends(tpTR); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
570
engine/trends.go
570
engine/trends.go
@@ -1,570 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package engine
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"runtime"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/cron"
|
||||
)
|
||||
|
||||
func NewTrendService(dm *DataManager,
|
||||
cgrcfg *config.CGRConfig, filterS *FilterS, connMgr *ConnManager) (tS *TrendS) {
|
||||
return &TrendS{
|
||||
dm: dm,
|
||||
cfg: cgrcfg,
|
||||
fltrS: filterS,
|
||||
connMgr: connMgr,
|
||||
loopStopped: make(chan struct{}),
|
||||
crn: cron.New(),
|
||||
crnTQs: make(map[string]map[string]cron.EntryID),
|
||||
crnTQsMux: new(sync.RWMutex),
|
||||
storedTrends: make(utils.StringSet),
|
||||
storingStopped: make(chan struct{}),
|
||||
trendStop: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// TrendS is responsible of implementing the logic of TrendService
|
||||
type TrendS struct {
|
||||
dm *DataManager
|
||||
cfg *config.CGRConfig
|
||||
fltrS *FilterS
|
||||
connMgr *ConnManager
|
||||
|
||||
crn *cron.Cron // cron refernce
|
||||
|
||||
crnTQsMux *sync.RWMutex // protects the crnTQs
|
||||
crnTQs map[string]map[string]cron.EntryID // save the EntryIDs for TrendQueries so we can reschedule them when needed
|
||||
|
||||
storedTrends utils.StringSet // keep a record of trends which need saving, map[trendTenantID]bool
|
||||
sTrndsMux sync.RWMutex // protects storedTrends
|
||||
storingStopped chan struct{} // signal back that the operations were stopped
|
||||
|
||||
loopStopped chan struct{}
|
||||
trendStop chan struct{} // signal to stop all operations
|
||||
}
|
||||
|
||||
// computeTrend will query a stat and build the Trend for it
|
||||
//
|
||||
// it is to be called by Cron service
|
||||
func (tS *TrendS) computeTrend(ctx *context.Context, tP *TrendProfile) {
|
||||
var floatMetrics map[string]float64
|
||||
if err := tS.connMgr.Call(context.Background(), tS.cfg.TrendSCfg().StatSConns,
|
||||
utils.StatSv1GetQueueFloatMetrics,
|
||||
&utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: tP.Tenant, ID: tP.StatID}},
|
||||
&floatMetrics); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> computing trend with id: <%s:%s> for stats <%s> error: <%s>",
|
||||
utils.TrendS, tP.Tenant, tP.ID, tP.StatID, err.Error()))
|
||||
return
|
||||
}
|
||||
trnd, err := tS.dm.GetTrend(ctx, tP.Tenant, tP.ID, true, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> querying trend with id: <%s:%s> dm error: <%s>",
|
||||
utils.TrendS, tP.Tenant, tP.ID, err.Error()))
|
||||
return
|
||||
}
|
||||
trnd.tMux.Lock()
|
||||
defer trnd.tMux.Unlock()
|
||||
if trnd.tPrfl == nil {
|
||||
trnd.tPrfl = tP
|
||||
}
|
||||
trnd.Compile(tP.TTL, tP.QueueLength)
|
||||
now := time.Now()
|
||||
var metrics []string
|
||||
if len(tP.Metrics) != 0 {
|
||||
metrics = tP.Metrics // read only
|
||||
}
|
||||
if len(metrics) == 0 { // unlimited metrics in trend
|
||||
for mID := range floatMetrics {
|
||||
metrics = append(metrics, mID)
|
||||
}
|
||||
}
|
||||
if len(metrics) == 0 {
|
||||
return // nothing to compute
|
||||
}
|
||||
trnd.RunTimes = append(trnd.RunTimes, now)
|
||||
if trnd.Metrics == nil {
|
||||
trnd.Metrics = make(map[time.Time]map[string]*MetricWithTrend)
|
||||
}
|
||||
trnd.Metrics[now] = make(map[string]*MetricWithTrend)
|
||||
for _, mID := range metrics {
|
||||
mWt := &MetricWithTrend{ID: mID}
|
||||
var has bool
|
||||
if mWt.Value, has = floatMetrics[mID]; !has { // no stats computed for metric
|
||||
mWt.Value = -1.0
|
||||
mWt.TrendLabel = utils.NotAvailable
|
||||
continue
|
||||
}
|
||||
if mWt.TrendGrowth, err = trnd.getTrendGrowth(mID, mWt.Value, tP.CorrelationType,
|
||||
tS.cfg.GeneralCfg().RoundingDecimals); err != nil {
|
||||
mWt.TrendLabel = utils.NotAvailable
|
||||
} else {
|
||||
mWt.TrendLabel = trnd.getTrendLabel(mWt.TrendGrowth, tP.Tolerance)
|
||||
}
|
||||
trnd.Metrics[now][mWt.ID] = mWt
|
||||
trnd.indexesAppendMetric(mWt, now)
|
||||
}
|
||||
if err = tS.storeTrend(ctx, trnd); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> setting Trend with id: <%s:%s> DM error: <%s>",
|
||||
utils.TrendS, tP.Tenant, tP.ID, err.Error()))
|
||||
return
|
||||
}
|
||||
if err = tS.processThresholds(trnd); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> Trend with id <%s:%s> error: <%s> with ThresholdS",
|
||||
utils.TrendS, tP.Tenant, tP.ID, err.Error()))
|
||||
}
|
||||
if err = tS.processEEs(trnd); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> Trend with id <%s:%s> error: <%s> with EEs",
|
||||
utils.TrendS, tP.Tenant, tP.ID, err.Error()))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// processThresholds will pass the Trend event to ThresholdS
|
||||
func (tS *TrendS) processThresholds(trnd *Trend) (err error) {
|
||||
if len(trnd.RunTimes) == 0 ||
|
||||
len(trnd.RunTimes) < trnd.tPrfl.MinItems {
|
||||
return
|
||||
}
|
||||
if len(tS.cfg.TrendSCfg().ThresholdSConns) == 0 {
|
||||
return
|
||||
}
|
||||
opts := map[string]any{
|
||||
utils.MetaEventType: utils.TrendUpdate,
|
||||
}
|
||||
var thIDs []string
|
||||
if len(trnd.tPrfl.ThresholdIDs) != 0 {
|
||||
if len(trnd.tPrfl.ThresholdIDs) == 1 &&
|
||||
trnd.tPrfl.ThresholdIDs[0] == utils.MetaNone {
|
||||
return
|
||||
}
|
||||
thIDs = make([]string, len(trnd.tPrfl.ThresholdIDs))
|
||||
copy(thIDs, trnd.tPrfl.ThresholdIDs)
|
||||
}
|
||||
opts[utils.OptsThresholdsProfileIDs] = thIDs
|
||||
ts := trnd.asTrendSummary()
|
||||
trndEv := &utils.CGREvent{
|
||||
Tenant: trnd.Tenant,
|
||||
ID: utils.GenUUID(),
|
||||
APIOpts: opts,
|
||||
Event: map[string]any{
|
||||
utils.TrendID: trnd.ID,
|
||||
utils.Time: ts.Time,
|
||||
utils.Metrics: ts.Metrics,
|
||||
},
|
||||
}
|
||||
var withErrs bool
|
||||
var tIDs []string
|
||||
if err := tS.connMgr.Call(context.TODO(), tS.cfg.TrendSCfg().ThresholdSConns,
|
||||
utils.ThresholdSv1ProcessEvent, trndEv, &tIDs); err != nil &&
|
||||
(len(thIDs) != 0 || err.Error() != utils.ErrNotFound.Error()) {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.", utils.TrendS, err.Error(), trndEv))
|
||||
withErrs = true
|
||||
}
|
||||
if withErrs {
|
||||
err = utils.ErrPartiallyExecuted
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// processEEs will pass the Trend event to EEs
|
||||
func (tS *TrendS) processEEs(trnd *Trend) (err error) {
|
||||
if len(trnd.RunTimes) == 0 ||
|
||||
len(trnd.RunTimes) < trnd.tPrfl.MinItems {
|
||||
return
|
||||
}
|
||||
if len(tS.cfg.TrendSCfg().EEsConns) == 0 {
|
||||
return
|
||||
}
|
||||
opts := map[string]any{
|
||||
utils.MetaEventType: utils.TrendUpdate,
|
||||
}
|
||||
ts := trnd.asTrendSummary()
|
||||
trndEv := &utils.CGREventWithEeIDs{
|
||||
CGREvent: &utils.CGREvent{
|
||||
Tenant: trnd.Tenant,
|
||||
ID: utils.GenUUID(),
|
||||
APIOpts: opts,
|
||||
Event: map[string]any{
|
||||
utils.TrendID: trnd.ID,
|
||||
utils.Time: ts.Time,
|
||||
utils.Metrics: ts.Metrics,
|
||||
},
|
||||
},
|
||||
EeIDs: tS.cfg.TrendSCfg().EEsExporterIDs,
|
||||
}
|
||||
var withErrs bool
|
||||
var reply map[string]map[string]any
|
||||
if err := tS.connMgr.Call(context.TODO(), tS.cfg.TrendSCfg().EEsConns,
|
||||
utils.EeSv1ProcessEvent, trndEv, &reply); err != nil &&
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %q processing event %+v with EEs.", utils.TrendS, err.Error(), trndEv))
|
||||
withErrs = true
|
||||
}
|
||||
if withErrs {
|
||||
err = utils.ErrPartiallyExecuted
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// storeTrend will store or schedule the trend based on settings
|
||||
func (tS *TrendS) storeTrend(ctx *context.Context, trnd *Trend) (err error) {
|
||||
if tS.cfg.TrendSCfg().StoreInterval == 0 {
|
||||
return
|
||||
}
|
||||
if tS.cfg.TrendSCfg().StoreInterval == -1 {
|
||||
return tS.dm.SetTrend(ctx, trnd)
|
||||
}
|
||||
|
||||
// schedule the asynchronous save, relies for Trend to be in cache
|
||||
tS.sTrndsMux.Lock()
|
||||
tS.storedTrends.Add(trnd.TenantID())
|
||||
tS.sTrndsMux.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// storeTrends will do one round for saving modified trends
|
||||
//
|
||||
// from cache to dataDB
|
||||
// designed to run asynchronously
|
||||
func (tS *TrendS) storeTrends(ctx *context.Context) {
|
||||
var failedTrndIDs []string
|
||||
for {
|
||||
tS.sTrndsMux.Lock()
|
||||
trndID := tS.storedTrends.GetOne()
|
||||
if trndID != utils.EmptyString {
|
||||
tS.storedTrends.Remove(trndID)
|
||||
}
|
||||
tS.sTrndsMux.Unlock()
|
||||
if trndID == utils.EmptyString {
|
||||
break // no more keys, backup completed
|
||||
}
|
||||
trndIf, ok := Cache.Get(utils.CacheTrends, trndID)
|
||||
if !ok || trndIf == nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> failed retrieving from cache Trend with ID: %q",
|
||||
utils.TrendS, trndID))
|
||||
failedTrndIDs = append(failedTrndIDs, trndID) // record failure so we can schedule it for next backup
|
||||
continue
|
||||
}
|
||||
trnd := trndIf.(*Trend)
|
||||
trnd.tMux.Lock()
|
||||
if err := tS.dm.SetTrend(ctx, trnd); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> failed storing Trend with ID: %q, err: %q",
|
||||
utils.TrendS, trndID, err))
|
||||
failedTrndIDs = append(failedTrndIDs, trndID) // record failure so we can schedule it for next backup
|
||||
}
|
||||
trnd.tMux.Unlock()
|
||||
// randomize the CPU load and give up thread control
|
||||
runtime.Gosched()
|
||||
}
|
||||
if len(failedTrndIDs) != 0 { // there were errors on save, schedule the keys for next backup
|
||||
tS.sTrndsMux.Lock()
|
||||
tS.storedTrends.AddSlice(failedTrndIDs)
|
||||
tS.sTrndsMux.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// asyncStoreTrends runs as a backround process, calling storeTrends based on storeInterval
|
||||
func (tS *TrendS) asyncStoreTrends(ctx *context.Context) {
|
||||
storeInterval := tS.cfg.TrendSCfg().StoreInterval
|
||||
if storeInterval <= 0 {
|
||||
close(tS.storingStopped)
|
||||
return
|
||||
}
|
||||
for {
|
||||
tS.storeTrends(ctx)
|
||||
select {
|
||||
case <-tS.trendStop:
|
||||
close(tS.storingStopped)
|
||||
return
|
||||
case <-time.After(storeInterval): // continue to another storing loop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// StartCron will activates the Cron, together with all scheduled Trend queries
|
||||
func (tS *TrendS) StartTrendS(ctx *context.Context) error {
|
||||
if err := tS.scheduleAutomaticQueries(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
tS.crn.Start()
|
||||
go tS.asyncStoreTrends(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
// StopCron will shutdown the Cron tasks
|
||||
func (tS *TrendS) StopTrendS() {
|
||||
timeEnd := time.Now().Add(tS.cfg.CoreSCfg().ShutdownTimeout)
|
||||
|
||||
crnctx := tS.crn.Stop()
|
||||
close(tS.trendStop)
|
||||
|
||||
// Wait for cron
|
||||
select {
|
||||
case <-crnctx.Done():
|
||||
case <-time.After(time.Until(timeEnd)):
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> timeout waiting for Cron to finish",
|
||||
utils.TrendS))
|
||||
return
|
||||
}
|
||||
// Wait for backup and other operations
|
||||
select {
|
||||
case <-tS.storingStopped:
|
||||
case <-time.After(time.Until(timeEnd)):
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> timeout waiting for TrendS to finish",
|
||||
utils.TrendS))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (tS *TrendS) Reload(ctx *context.Context) {
|
||||
crnctx := tS.crn.Stop()
|
||||
close(tS.trendStop)
|
||||
<-crnctx.Done()
|
||||
<-tS.storingStopped
|
||||
tS.trendStop = make(chan struct{})
|
||||
tS.storingStopped = make(chan struct{})
|
||||
tS.crn.Start()
|
||||
go tS.asyncStoreTrends(ctx)
|
||||
}
|
||||
|
||||
// scheduleAutomaticQueries will schedule the queries at start/reload based on configured
|
||||
func (tS *TrendS) scheduleAutomaticQueries(ctx *context.Context) error {
|
||||
schedData := make(map[string][]string)
|
||||
for k, v := range tS.cfg.TrendSCfg().ScheduledIDs {
|
||||
schedData[k] = v
|
||||
}
|
||||
var tnts []string
|
||||
if len(schedData) == 0 {
|
||||
tnts = make([]string, 0)
|
||||
}
|
||||
for tnt, tIDs := range schedData {
|
||||
if len(tIDs) == 0 {
|
||||
tnts = append(tnts, tnt)
|
||||
}
|
||||
}
|
||||
if tnts != nil {
|
||||
qrydData, err := tS.dm.GetTrendProfileIDs(ctx, tnts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for tnt, ids := range qrydData {
|
||||
schedData[tnt] = ids
|
||||
}
|
||||
}
|
||||
for tnt, tIDs := range schedData {
|
||||
if _, err := tS.scheduleTrendQueries(ctx, tnt, tIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// scheduleTrendQueries will schedule/re-schedule specific trend queries
|
||||
func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs []string) (scheduled int, err error) {
|
||||
var partial bool
|
||||
tS.crnTQsMux.Lock()
|
||||
if _, has := tS.crnTQs[tnt]; !has {
|
||||
tS.crnTQs[tnt] = make(map[string]cron.EntryID)
|
||||
}
|
||||
tS.crnTQsMux.Unlock()
|
||||
for _, tID := range tIDs {
|
||||
tS.crnTQsMux.RLock()
|
||||
if entryID, has := tS.crnTQs[tnt][tID]; has {
|
||||
tS.crn.Remove(entryID) // deschedule the query
|
||||
}
|
||||
tS.crnTQsMux.RUnlock()
|
||||
if tP, err := tS.dm.GetTrendProfile(ctx, tnt, tID, true, true, utils.NonTransactional); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> failed retrieving TrendProfile with id: <%s:%s> for scheduling, error: <%s>",
|
||||
utils.TrendS, tnt, tID, err.Error()))
|
||||
partial = true
|
||||
} else if entryID, err := tS.crn.AddFunc(tP.Schedule,
|
||||
func() { tS.computeTrend(ctx, tP.Clone()) }); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> scheduling TrendProfile <%s:%s>, error: <%s>",
|
||||
utils.TrendS, tnt, tID, err.Error()))
|
||||
partial = true
|
||||
} else { // log the entry ID for debugging
|
||||
tS.crnTQsMux.Lock()
|
||||
tS.crnTQs[tP.Tenant][tP.ID] = entryID
|
||||
tS.crnTQsMux.Unlock()
|
||||
scheduled++
|
||||
}
|
||||
}
|
||||
if partial {
|
||||
return 0, utils.ErrPartiallyExecuted
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// V1ScheduleQueries is the query for manually re-/scheduling Trend Queries
|
||||
func (tS *TrendS) V1ScheduleQueries(ctx *context.Context, args *utils.ArgScheduleTrendQueries, scheduled *int) (err error) {
|
||||
if sched, errSched := tS.scheduleTrendQueries(ctx, args.Tenant, args.TrendIDs); errSched != nil {
|
||||
return errSched
|
||||
} else {
|
||||
*scheduled = sched
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// V1GetTrend is the API to return the trend Metrics
|
||||
// The number of runTimes can be filtered based on indexes and times provided as arguments
|
||||
//
|
||||
// in this way being possible to work with paginators
|
||||
func (tS *TrendS) V1GetTrend(ctx *context.Context, arg *utils.ArgGetTrend, retTrend *Trend) (err error) {
|
||||
if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
}
|
||||
var trnd *Trend
|
||||
if trnd, err = tS.dm.GetTrend(ctx, arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
|
||||
return
|
||||
}
|
||||
trnd.tMux.RLock()
|
||||
defer trnd.tMux.RUnlock()
|
||||
retTrend.Tenant = trnd.Tenant // avoid vet complaining for mutex copying
|
||||
retTrend.ID = trnd.ID
|
||||
startIdx := arg.RunIndexStart
|
||||
if startIdx > len(trnd.RunTimes) {
|
||||
startIdx = len(trnd.RunTimes)
|
||||
}
|
||||
endIdx := arg.RunIndexEnd
|
||||
if endIdx > len(trnd.RunTimes) ||
|
||||
endIdx < startIdx ||
|
||||
endIdx == 0 {
|
||||
endIdx = len(trnd.RunTimes)
|
||||
}
|
||||
runTimes := trnd.RunTimes[startIdx:endIdx]
|
||||
if len(runTimes) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
var tStart, tEnd time.Time
|
||||
if arg.RunTimeStart == utils.EmptyString {
|
||||
tStart = runTimes[0]
|
||||
} else if tStart, err = utils.ParseTimeDetectLayout(arg.RunTimeStart, tS.cfg.GeneralCfg().DefaultTimezone); err != nil {
|
||||
return
|
||||
}
|
||||
if arg.RunTimeEnd == utils.EmptyString {
|
||||
tEnd = runTimes[len(runTimes)-1].Add(time.Duration(1))
|
||||
} else if tEnd, err = utils.ParseTimeDetectLayout(arg.RunTimeEnd, tS.cfg.GeneralCfg().DefaultTimezone); err != nil {
|
||||
return
|
||||
}
|
||||
retTrend.RunTimes = make([]time.Time, 0, len(runTimes))
|
||||
for _, runTime := range runTimes {
|
||||
if !runTime.Before(tStart) && runTime.Before(tEnd) {
|
||||
retTrend.RunTimes = append(retTrend.RunTimes, runTime)
|
||||
}
|
||||
}
|
||||
if len(retTrend.RunTimes) == 0 { // filtered out all
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
retTrend.Metrics = make(map[time.Time]map[string]*MetricWithTrend)
|
||||
for _, runTime := range retTrend.RunTimes {
|
||||
retTrend.Metrics[runTime] = trnd.Metrics[runTime]
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (tS *TrendS) V1GetScheduledTrends(ctx *context.Context, args *utils.ArgScheduledTrends, schedTrends *[]utils.ScheduledTrend) (err error) {
|
||||
tnt := args.Tenant
|
||||
if tnt == utils.EmptyString {
|
||||
tnt = tS.cfg.GeneralCfg().DefaultTenant
|
||||
}
|
||||
tS.crnTQsMux.RLock()
|
||||
defer tS.crnTQsMux.RUnlock()
|
||||
trendIDsMp, has := tS.crnTQs[tnt]
|
||||
if !has {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
var scheduledTrends []utils.ScheduledTrend
|
||||
var entryIds map[string]cron.EntryID
|
||||
if len(args.TrendIDPrefixes) == 0 {
|
||||
entryIds = trendIDsMp
|
||||
} else {
|
||||
entryIds = make(map[string]cron.EntryID)
|
||||
for _, tID := range args.TrendIDPrefixes {
|
||||
for key, entryID := range trendIDsMp {
|
||||
if strings.HasPrefix(key, tID) {
|
||||
entryIds[key] = entryID
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(entryIds) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
var entry cron.Entry
|
||||
for id, entryID := range entryIds {
|
||||
entry = tS.crn.Entry(entryID)
|
||||
if entry.ID == 0 {
|
||||
continue
|
||||
}
|
||||
scheduledTrends = append(scheduledTrends, utils.ScheduledTrend{
|
||||
TrendID: id,
|
||||
Next: entry.Next,
|
||||
Previous: entry.Prev,
|
||||
})
|
||||
}
|
||||
slices.SortFunc(scheduledTrends, func(a, b utils.ScheduledTrend) int {
|
||||
return a.Next.Compare(b.Next)
|
||||
})
|
||||
*schedTrends = scheduledTrends
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tS *TrendS) V1GetTrendSummary(ctx *context.Context, arg utils.TenantIDWithAPIOpts, reply *TrendSummary) (err error) {
|
||||
var trnd *Trend
|
||||
if trnd, err = tS.dm.GetTrend(ctx, arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
|
||||
return
|
||||
}
|
||||
trnd.tMux.RLock()
|
||||
trndS := trnd.asTrendSummary()
|
||||
trnd.tMux.RUnlock()
|
||||
*reply = *trndS
|
||||
return
|
||||
}
|
||||
@@ -1,66 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package engine
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
)
|
||||
|
||||
func TestNewTrendService(t *testing.T) {
|
||||
dm := &DataManager{}
|
||||
cfg := &config.CGRConfig{}
|
||||
filterS := &FilterS{}
|
||||
connMgr := &ConnManager{}
|
||||
|
||||
trendService := NewTrendService(dm, cfg, filterS, connMgr)
|
||||
|
||||
if trendService == nil {
|
||||
t.Errorf("Expected non-nil TrendS, got nil")
|
||||
}
|
||||
|
||||
if trendService.dm != dm {
|
||||
t.Errorf("Expected dm to be %v, got %v", dm, trendService.dm)
|
||||
}
|
||||
|
||||
if trendService.cfg != cfg {
|
||||
t.Errorf("Expected cfg to be %v, got %v", cfg, trendService.cfg)
|
||||
}
|
||||
|
||||
if trendService.fltrS != filterS {
|
||||
t.Errorf("Expected filterS to be %v, got %v", filterS, trendService.fltrS)
|
||||
}
|
||||
|
||||
if trendService.connMgr != connMgr {
|
||||
t.Errorf("Expected connMgr to be %v, got %v", connMgr, trendService.connMgr)
|
||||
}
|
||||
|
||||
if trendService.crnTQs == nil {
|
||||
t.Errorf("Expected crnTQs to be non-nil, got nil")
|
||||
}
|
||||
|
||||
if trendService.crnTQsMux == nil {
|
||||
t.Errorf("Expected crnTQsMux to be non-nil, got nil")
|
||||
}
|
||||
|
||||
if trendService.loopStopped == nil {
|
||||
t.Errorf("Expected loopStopped to be non-nil, got nil")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user