diff --git a/docs/cgr-engine.rst b/docs/cgr-engine.rst
index 93da786e8..5150ee5f5 100644
--- a/docs/cgr-engine.rst
+++ b/docs/cgr-engine.rst
@@ -74,6 +74,7 @@ The components from the diagram can be found documented in the links bellow:
resources
routes
stats
+ trends
thresholds
filters
dispatchers
diff --git a/docs/trends.rst b/docs/trends.rst
new file mode 100644
index 000000000..40d6a2dc4
--- /dev/null
+++ b/docs/trends.rst
@@ -0,0 +1,66 @@
+.. _trends:
+
+TrendS
+=====
+
+
+**TrendS** is a standalone subsystem part of the **CGRateS** infrastructure, designed to store *StatS* in a time-series-like database and calculate trend percentages based on their evolution.
+
+Complete interaction with **TrendS** is possible via `CGRateS RPC APIs `_.
+
+Due it's real-time nature, **StatS** are designed towards high throughput being able to process thousands of *Events* per second. This is doable since each *StatQueue* is a very light object, held in memory and eventually backed up in *DataDB*.
+
+
+Processing logic
+----------------
+
+
+
+
+Parameters
+----------
+
+
+TrendS
+^^^^^^
+
+**TrendS** is the **CGRateS** component responsible of handling the *Trend* queries.
+
+It is configured within **trends** section from :ref:`JSON configuration ` via the following parameters:
+
+enabled
+ Will enable starting of the service. Possible values: .
+
+
+
+TrendProfile
+^^^^^^^^^^^^
+
+Ís made of the following fields:
+
+Tenant
+ The tenant on the platform (one can see the tenant as partition ID).
+
+ID
+ Identifier for the *TrendProfile*, unique within a *Tenant*.
+
+FilterIDs
+ List of *FilterProfileIDs* which should match in order to consider the profile matching the event.
+
+
+
+
+Trend
+^^^^^
+
+
+
+Use cases
+---------
+
+* Aggregate various traffic metrics for traffic transparency.
+* Revenue assurance applications.
+* Fraud detection by aggregating specific billing metrics during sensitive time intervals (\*acc, \*tcc, \*tcd).
+* Building call patterns.
+* Building statistical information to train systems capable of artificial intelligence.
+* Building quality metrics used in traffic routing.
diff --git a/engine/datamanager.go b/engine/datamanager.go
index 65cf28a32..e58a19e71 100644
--- a/engine/datamanager.go
+++ b/engine/datamanager.go
@@ -1292,6 +1292,7 @@ func (dm *DataManager) RemoveStatQueueProfile(tenant, id string, withIndex bool)
func (dm *DataManager) GetTrend(tenant, id string,
cacheRead, cacheWrite bool, transactionID string) (tr *Trend, err error) {
tntID := utils.ConcatenatedKey(tenant, id)
+
if cacheRead {
if x, ok := Cache.Get(utils.CacheTrends, tntID); ok {
if x == nil {
@@ -1300,40 +1301,53 @@ func (dm *DataManager) GetTrend(tenant, id string,
return x.(*Trend), nil
}
}
+
if dm == nil {
err = utils.ErrNoDatabaseConn
return
}
- tr, err = dm.dataDB.GetTrendDrv(tenant, id)
- if err != nil {
- if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrends]; err == utils.ErrNotFound && itm.Remote {
+
+ if tr, err = dm.dataDB.GetTrendDrv(tenant, id); err != nil {
+ if err != utils.ErrNotFound { // database error
+ return
+ }
+ // ErrNotFound
+ if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrends]; itm.Remote {
if err = dm.connMgr.Call(context.TODO(), config.CgrConfig().DataDbCfg().RmtConns,
utils.ReplicatorSv1GetTrend, &utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{Tenant: tenant, ID: id},
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, utils.EmptyString,
utils.FirstNonEmpty(config.CgrConfig().DataDbCfg().RmtConnID,
config.CgrConfig().GeneralCfg().NodeID)),
- }, &tr); err == nil {
- err = dm.dataDB.SetTrendDrv(tr)
+ }, &tr); err != nil {
+ err = utils.CastRPCErr(err)
+ if err != utils.ErrNotFound { // RPC error
+ return
+ }
+ } else if err = dm.dataDB.SetTrendDrv(tr); err != nil { // Save the Trend received from remote
+ return
}
}
- if err != nil {
- err = utils.CastRPCErr(err)
- if err == utils.ErrNotFound && cacheWrite {
- if errCh := Cache.Set(utils.CacheTrends, tntID, nil, nil,
- cacheCommit(transactionID), transactionID); errCh != nil {
- return nil, errCh
+
+ // have Trend or ErrNotFound
+ if err == utils.ErrNotFound {
+ if cacheWrite {
+ if errCache := Cache.Set(utils.CacheTrends, tntID, nil, nil,
+ cacheCommit(transactionID), transactionID); errCache != nil {
+ return nil, errCache
}
}
- return nil, err
+ return
}
}
+
if cacheWrite {
if errCh := Cache.Set(utils.CacheTrends, tntID, tr, nil,
cacheCommit(transactionID), transactionID); errCh != nil {
return nil, errCh
}
}
+
return
}
@@ -1346,14 +1360,16 @@ func (dm *DataManager) SetTrend(tr *Trend) (err error) {
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrends]; itm.Replicate {
- err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
+ if err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.TrendPrefix, tr.TenantID(), // this are used to get the host IDs from cache
utils.ReplicatorSv1SetTrend,
&TrendWithAPIOpts{
Trend: tr,
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
- config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)})
+ config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}); err != nil {
+ return
+ }
}
return
}
@@ -1489,16 +1505,9 @@ func (dm *DataManager) SetTrendProfile(trp *TrendProfile) (err error) {
if oldTrd == nil ||
oldTrd.QueueLength != trp.QueueLength ||
oldTrd.Schedule != trp.Schedule {
- err = dm.SetTrend(&Trend{
- Tenant: trp.Tenant,
- ID: trp.ID,
- })
- } else if _, errTr := dm.GetTrend(trp.Tenant, trp.ID,
- true, false, utils.NonTransactional); errTr == utils.ErrNotFound {
- err = dm.SetTrend(&Trend{
- Tenant: trp.Tenant,
- ID: trp.ID,
- })
+ if err = dm.SetTrend(NewTrendFromProfile(trp)); err != nil {
+ return
+ }
}
return
}
diff --git a/engine/libtrends.go b/engine/libtrends.go
index d0735f350..c581fb641 100644
--- a/engine/libtrends.go
+++ b/engine/libtrends.go
@@ -85,21 +85,37 @@ type TrendWithAPIOpts struct {
APIOpts map[string]any
}
+// NewTrendFromProfile is a constructor for an empty trend out of it's profile
+func NewTrendFromProfile(tP *TrendProfile) *Trend {
+ return &Trend{
+ Tenant: tP.Tenant,
+ ID: tP.ID,
+ RunTimes: make([]time.Time, 0),
+ Metrics: make(map[time.Time]map[string]*MetricWithTrend),
+ tPrfl: tP,
+ }
+}
+
// Trend is the unit matched by filters
type Trend struct {
tMux sync.RWMutex
- Tenant string
- ID string
- RunTimes []time.Time
- Metrics map[time.Time]map[string]*MetricWithTrend
- CompressedMetrics []byte // if populated, Metrics will be emty
+ Tenant string
+ ID string
+ RunTimes []time.Time
+ Metrics map[time.Time]map[string]*MetricWithTrend
// indexes help faster processing
mLast map[string]time.Time // last time a metric was present
mCounts map[string]int // number of times a metric is present in Metrics
mTotals map[string]float64 // cached sum, used for average calculations
+ tPrfl *TrendProfile // store here the trend profile so we can have it at hands further
+
+}
+
+func (t *Trend) Clone() (tC *Trend) {
+ return
}
// Compile is used to initialize or cleanup the Trend
diff --git a/engine/trends.go b/engine/trends.go
index c43820086..18d35d059 100644
--- a/engine/trends.go
+++ b/engine/trends.go
@@ -20,6 +20,7 @@ package engine
import (
"fmt"
+ "runtime"
"sync"
"time"
@@ -35,14 +36,16 @@ func NewTrendS(dm *DataManager,
filterS *FilterS,
cgrcfg *config.CGRConfig) *TrendS {
return &TrendS{
- dm: dm,
- connMgr: connMgr,
- filterS: filterS,
- cgrcfg: cgrcfg,
- crn: cron.New(),
- loopStopped: make(chan struct{}),
- crnTQsMux: new(sync.RWMutex),
- crnTQs: make(map[string]map[string]cron.EntryID),
+ dm: dm,
+ connMgr: connMgr,
+ filterS: filterS,
+ cgrcfg: cgrcfg,
+ crn: cron.New(),
+ crnTQsMux: new(sync.RWMutex),
+ crnTQs: make(map[string]map[string]cron.EntryID),
+ storedTrends: make(utils.StringSet),
+ storingStopped: make(chan struct{}),
+ trendStop: make(chan struct{}),
}
}
@@ -58,7 +61,12 @@ type TrendS struct {
crnTQsMux *sync.RWMutex // protects the crnTQs
crnTQs map[string]map[string]cron.EntryID // save the EntryIDs for TrendQueries so we can reschedule them when needed
- loopStopped chan struct{}
+ storedTrends utils.StringSet // keep a record of trends which need saving, map[trendTenantID]bool
+ sTrndsMux sync.RWMutex // protects storedTrends
+ storingStopped chan struct{} // signal back that the operations were stopped
+
+ trendStop chan struct{} // signal to stop all operations
+
}
// computeTrend will query a stat and build the Trend for it
@@ -76,27 +84,23 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) {
utils.TrendS, tP.Tenant, tP.ID, tP.StatID, err.Error()))
return
}
- trend, err := tS.dm.GetTrend(tP.Tenant, tP.ID, true, true, utils.NonTransactional)
- if err == utils.ErrNotFound {
- trend = &Trend{
- Tenant: tP.Tenant,
- ID: tP.ID,
- RunTimes: make([]time.Time, 0),
- Metrics: make(map[time.Time]map[string]*MetricWithTrend),
- }
- } else if err != nil {
+ trnd, err := tS.dm.GetTrend(tP.Tenant, tP.ID, true, true, utils.NonTransactional)
+ if err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> querying trend with id: <%s:%s> dm error: <%s>",
utils.TrendS, tP.Tenant, tP.ID, err.Error()))
return
}
- trend.tMux.Lock()
- defer trend.tMux.Unlock()
- trend.cleanup(tP.TTL, tP.QueueLength)
+ trnd.tMux.Lock()
+ defer trnd.tMux.Unlock()
+ if trnd.tPrfl == nil {
+ trnd.tPrfl = tP
+ }
+ trnd.cleanup(tP.TTL, tP.QueueLength)
- if len(trend.mTotals) == 0 { // indexes were not yet built
- trend.computeIndexes()
+ if len(trnd.mTotals) == 0 { // indexes were not yet built
+ trnd.computeIndexes()
}
now := time.Now()
var metrics []string
@@ -111,11 +115,11 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) {
if len(metrics) == 0 {
return // nothing to compute
}
- trend.RunTimes = append(trend.RunTimes, now)
- if trend.Metrics == nil {
- trend.Metrics = make(map[time.Time]map[string]*MetricWithTrend)
+ trnd.RunTimes = append(trnd.RunTimes, now)
+ if trnd.Metrics == nil {
+ trnd.Metrics = make(map[time.Time]map[string]*MetricWithTrend)
}
- trend.Metrics[now] = make(map[string]*MetricWithTrend)
+ trnd.Metrics[now] = make(map[string]*MetricWithTrend)
for _, mID := range metrics {
mWt := &MetricWithTrend{ID: mID}
var has bool
@@ -124,26 +128,205 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) {
mWt.TrendLabel = utils.NotAvailable
continue
}
- if mWt.TrendGrowth, err = trend.getTrendGrowth(mID, mWt.Value, tP.CorrelationType,
+ if mWt.TrendGrowth, err = trnd.getTrendGrowth(mID, mWt.Value, tP.CorrelationType,
tS.cgrcfg.GeneralCfg().RoundingDecimals); err != nil {
mWt.TrendLabel = utils.NotAvailable
} else {
- mWt.TrendLabel = trend.getTrendLabel(mWt.TrendGrowth, tP.Tolerance)
+ mWt.TrendLabel = trnd.getTrendLabel(mWt.TrendGrowth, tP.Tolerance)
}
- trend.Metrics[now][mWt.ID] = mWt
+ trnd.Metrics[now][mWt.ID] = mWt
}
- if err := tS.dm.SetTrend(trend); err != nil {
+ if err = tS.storeTrend(trnd); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
- "<%s> setting trend with id: <%s:%s> dm error: <%s>",
+ "<%s> setting Trend with id: <%s:%s> DM error: <%s>",
utils.TrendS, tP.Tenant, tP.ID, err.Error()))
return
}
}
+// processThresholds will pass the Trend event to ThresholdS
+func (tS *TrendS) processThresholds(trnd *Trend) (err error) {
+ if len(trnd.RunTimes) == 0 ||
+ len(trnd.RunTimes) < trnd.tPrfl.MinItems {
+ return
+ }
+ if len(tS.cgrcfg.TrendSCfg().ThresholdSConns) == 0 {
+ return
+ }
+ opts := map[string]any{
+ utils.MetaEventType: utils.TrendUpdate,
+ }
+ var thIDs []string
+ if len(trnd.tPrfl.ThresholdIDs) != 0 {
+ if len(trnd.tPrfl.ThresholdIDs) == 1 &&
+ trnd.tPrfl.ThresholdIDs[0] == utils.MetaNone {
+ return
+ }
+ copy(thIDs, trnd.tPrfl.ThresholdIDs)
+ }
+ opts[utils.OptsThresholdsProfileIDs] = thIDs
+ mtrx := make(map[string]*MetricWithTrend)
+ for mtID, mtWT := range trnd.Metrics[trnd.RunTimes[len(trnd.RunTimes)-1]] {
+ mtrx[mtID] = &MetricWithTrend{
+ ID: mtWT.ID,
+ Value: mtWT.Value,
+ TrendGrowth: mtWT.TrendGrowth,
+ TrendLabel: mtWT.TrendLabel,
+ }
+ }
+ trndEv := &utils.CGREvent{
+ Tenant: trnd.Tenant,
+ ID: utils.GenUUID(),
+ APIOpts: opts,
+ Event: map[string]any{
+ utils.TrendID: trnd.ID,
+ utils.Metrics: mtrx,
+ },
+ }
+ var withErrs bool
+ var tIDs []string
+ if err := tS.connMgr.Call(context.TODO(), tS.cgrcfg.TrendSCfg().ThresholdSConns,
+ utils.ThresholdSv1ProcessEvent, trndEv, &tIDs); err != nil &&
+ (len(thIDs) != 0 || err.Error() != utils.ErrNotFound.Error()) {
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.", utils.TrendS, err.Error(), trndEv))
+ withErrs = true
+ }
+ if withErrs {
+ err = utils.ErrPartiallyExecuted
+ }
+ return
+}
+
+// processEEs will pass the Trend event to EEs
+func (tS *TrendS) processEEs(trnd *Trend) (err error) {
+ if len(trnd.RunTimes) == 0 ||
+ len(trnd.RunTimes) < trnd.tPrfl.MinItems {
+ return
+ }
+ if len(tS.cgrcfg.StatSCfg().EEsConns) == 0 {
+ return
+ }
+ opts := map[string]any{
+ utils.MetaEventType: utils.TrendUpdate,
+ }
+ mtrx := make(map[string]*MetricWithTrend)
+ for mtID, mtWT := range trnd.Metrics[trnd.RunTimes[len(trnd.RunTimes)-1]] {
+ mtrx[mtID] = &MetricWithTrend{
+ ID: mtWT.ID,
+ Value: mtWT.Value,
+ TrendGrowth: mtWT.TrendGrowth,
+ TrendLabel: mtWT.TrendLabel,
+ }
+ }
+ trndEv := &CGREventWithEeIDs{
+ CGREvent: &utils.CGREvent{
+ Tenant: trnd.Tenant,
+ ID: utils.GenUUID(),
+ APIOpts: opts,
+ Event: map[string]any{
+ utils.TrendID: trnd.ID,
+ utils.Metrics: mtrx,
+ },
+ },
+ EeIDs: tS.cgrcfg.StatSCfg().EEsExporterIDs,
+ }
+ var withErrs bool
+ var reply map[string]map[string]any
+ if err := tS.connMgr.Call(context.TODO(), tS.cgrcfg.StatSCfg().EEsConns,
+ utils.EeSv1ProcessEvent, trndEv, &reply); err != nil &&
+ err.Error() != utils.ErrNotFound.Error() {
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> error: %q processing event %+v with EEs.", utils.TrendS, err.Error(), trndEv))
+ withErrs = true
+ }
+ if withErrs {
+ err = utils.ErrPartiallyExecuted
+ }
+ return
+}
+
+// storeTrend will store or schedule the trend based on settings
+func (tS *TrendS) storeTrend(trnd *Trend) (err error) {
+ if tS.cgrcfg.StatSCfg().StoreInterval == 0 {
+ return
+ }
+ if tS.cgrcfg.StatSCfg().StoreInterval == -1 {
+ return tS.dm.SetTrend(trnd)
+ }
+
+ // schedule the asynchronous save, relies for Trend to be in cache
+ tS.sTrndsMux.Lock()
+ tS.storedTrends.Add(trnd.TenantID())
+ tS.sTrndsMux.Unlock()
+ return
+}
+
+// storeTrends will do one round for saving modified trends
+//
+// from cache to dataDB
+// designed to run asynchronously
+func (tS *TrendS) storeTrends() {
+ var failedTrndIDs []string
+ for {
+ tS.sTrndsMux.Lock()
+ trndID := tS.storedTrends.GetOne()
+ if trndID != utils.EmptyString {
+ tS.storedTrends.Remove(trndID)
+ }
+ tS.sTrndsMux.Unlock()
+ if trndID == utils.EmptyString {
+ break // no more keys, backup completed
+ }
+ trndIf, ok := Cache.Get(utils.CacheTrends, trndID)
+ if !ok || trndIf == nil {
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> failed retrieving from cache Trend with ID: %q",
+ utils.TrendS, trndID))
+ failedTrndIDs = append(failedTrndIDs, trndID) // record failure so we can schedule it for next backup
+ continue
+ }
+ trnd := trndIf.(*Trend)
+ trnd.tMux.RLock()
+ if err := tS.dm.SetTrend(trnd); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> failed storing Trend with ID: %q, err: %q",
+ utils.TrendS, trndID, err))
+ failedTrndIDs = append(failedTrndIDs, trndID) // record failure so we can schedule it for next backup
+ }
+ trnd.tMux.RUnlock()
+ // randomize the CPU load and give up thread control
+ runtime.Gosched()
+ }
+ if len(failedTrndIDs) != 0 { // there were errors on save, schedule the keys for next backup
+ tS.sTrndsMux.Lock()
+ tS.storedTrends.AddSlice(failedTrndIDs)
+ tS.sTrndsMux.Unlock()
+ }
+}
+
+// asyncStoreTrends runs as a backround process, calling storeTrends based on storeInterval
+func (tS *TrendS) asyncStoreTrends() {
+ storeInterval := tS.cgrcfg.StatSCfg().StoreInterval
+ if storeInterval <= 0 {
+ close(tS.storingStopped)
+ return
+ }
+ for {
+ tS.storeTrends()
+ select {
+ case <-tS.trendStop:
+ close(tS.storingStopped)
+ return
+ case <-time.After(storeInterval): // continue to another storing loop
+ }
+ }
+}
+
// StartCron will activates the Cron, together with all scheduled Trend queries
-func (tS *TrendS) StartCron() error {
+func (tS *TrendS) StartTrendS() error {
if err := tS.scheduleAutomaticQueries(); err != nil {
return err
}
@@ -152,15 +335,31 @@ func (tS *TrendS) StartCron() error {
}
// StopCron will shutdown the Cron tasks
-func (tS *TrendS) StopCron() {
+func (tS *TrendS) StopTrendS() {
+ timeEnd := time.Now().Add(tS.cgrcfg.CoreSCfg().ShutdownTimeout)
+
ctx := tS.crn.Stop()
+ close(tS.trendStop)
+
+ // Wait for cron
select {
case <-ctx.Done():
- case <-time.After(tS.cgrcfg.CoreSCfg().ShutdownTimeout):
+ case <-time.After(timeEnd.Sub(time.Now())):
utils.Logger.Warning(
fmt.Sprintf(
"<%s> timeout waiting for Cron to finish",
utils.TrendS))
+ return
+ }
+ // Wait for backup and other operations
+ select {
+ case <-tS.storingStopped:
+ case <-time.After(timeEnd.Sub(time.Now())):
+ utils.Logger.Warning(
+ fmt.Sprintf(
+ "<%s> timeout waiting for TrendS to finish",
+ utils.TrendS))
+ return
}
}
@@ -242,9 +441,54 @@ func (tS *TrendS) V1ScheduleQueries(ctx *context.Context, args *utils.ArgSchedul
return
}
-func (tS *TrendS) V1GetTrend(ctx *context.Context, arg *utils.ArgGetTrend, trend *Trend) (err error) {
- var tr *Trend
- tr, err = tS.dm.GetTrend(arg.Tenant, arg.ID, true, true, utils.NonTransactional)
- *trend = *tr
+// V1GetTrend is the API to return the trend Metrics
+// The number of runTimes can be filtered based on indexes and times provided as arguments
+//
+// in this way being possible to work with paginators
+func (tS *TrendS) V1GetTrend(ctx *context.Context, arg *utils.ArgGetTrend, retTrend *Trend) (err error) {
+ var trnd *Trend
+ if trnd, err = tS.dm.GetTrend(arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
+ return
+ }
+ retTrend.Tenant = trnd.Tenant // avoid vet complaining for mutex copying
+ retTrend.ID = trnd.ID
+ startIdx := arg.RunIndexStart
+ if startIdx > len(trnd.RunTimes) {
+ startIdx = len(trnd.RunTimes)
+ }
+ endIdx := arg.RunIndexEnd
+ if endIdx > len(trnd.RunTimes) ||
+ endIdx < startIdx ||
+ endIdx == 0 {
+ endIdx = len(trnd.RunTimes)
+ }
+ runTimes := trnd.RunTimes[startIdx:endIdx]
+ if len(runTimes) == 0 {
+ return utils.ErrNotFound
+ }
+ var tStart, tEnd time.Time
+ if arg.RunTimeStart == utils.EmptyString {
+ tStart = runTimes[0]
+ } else if tStart, err = utils.ParseTimeDetectLayout(arg.RunTimeStart, tS.cgrcfg.GeneralCfg().DefaultTimezone); err != nil {
+ return
+ }
+ if arg.RunTimeEnd == utils.EmptyString {
+ tEnd = runTimes[len(runTimes)-1].Add(time.Duration(1))
+ } else if tEnd, err = utils.ParseTimeDetectLayout(arg.RunTimeStart, tS.cgrcfg.GeneralCfg().DefaultTimezone); err != nil {
+ return
+ }
+ retTrend.RunTimes = make([]time.Time, 0, len(runTimes))
+ for _, runTime := range runTimes {
+ if !runTime.Before(tStart) && runTime.Before(tEnd) {
+ retTrend.RunTimes = append(retTrend.RunTimes, runTime)
+ }
+ }
+ if len(runTimes) == 0 { // filtered out all
+ return utils.ErrNotFound
+ }
+ retTrend.Metrics = make(map[time.Time]map[string]*MetricWithTrend)
+ for _, runTime := range retTrend.RunTimes {
+ retTrend.Metrics[runTime] = trnd.Metrics[runTime]
+ }
return
}
diff --git a/engine/trends_test.go b/engine/trends_test.go
index 4138a3e78..8c123cb3f 100644
--- a/engine/trends_test.go
+++ b/engine/trends_test.go
@@ -72,7 +72,7 @@ func TestNewTrendS(t *testing.T) {
t.Errorf("Expected CGRConfig to be set correctly, got %v, want %v", trendS.cgrcfg, cgrcfg)
}
- if trendS.loopStopped == nil {
+ if trendS.trendStop == nil {
t.Errorf("Expected loopStopped to be initialized, but got nil")
}
if trendS.crnTQsMux == nil {
diff --git a/services/trends.go b/services/trends.go
index 2fc7be6f5..43bbdf786 100644
--- a/services/trends.go
+++ b/services/trends.go
@@ -83,7 +83,7 @@ func (trs *TrendService) Start() error {
trs.Lock()
defer trs.Unlock()
trs.trs = engine.NewTrendS(dm, trs.connMgr, filterS, trs.cfg)
- if err := trs.trs.StartCron(); err != nil {
+ if err := trs.trs.StartTrendS(); err != nil {
return err
}
srv, err := engine.NewService(v1.NewTrendSv1(trs.trs))
@@ -107,7 +107,7 @@ func (tr *TrendService) Shutdown() (err error) {
defer tr.srvDep[utils.DataDB].Done()
tr.Lock()
defer tr.Unlock()
- tr.trs.StopCron()
+ tr.trs.StopTrendS()
<-tr.connChan
return
}
diff --git a/utils/apitpdata.go b/utils/apitpdata.go
index aeba0fadd..fdc81284b 100644
--- a/utils/apitpdata.go
+++ b/utils/apitpdata.go
@@ -1661,5 +1661,9 @@ type ArgScheduleTrendQueries struct {
type ArgGetTrend struct {
TenantWithAPIOpts
- ID string
+ ID string
+ RunIndexStart int
+ RunIndexEnd int
+ RunTimeStart string
+ RunTimeEnd string
}
diff --git a/utils/consts.go b/utils/consts.go
index 6737b0a06..8c194a509 100644
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -512,6 +512,7 @@ const (
TotalUsage = "TotalUsage"
StatID = "StatID"
StatIDs = "StatIDs"
+ TrendID = "TrendID"
BalanceType = "BalanceType"
BalanceID = "BalanceID"
BalanceDestinationIds = "BalanceDestinationIds"
@@ -527,6 +528,7 @@ const (
Units = "Units"
AccountUpdate = "AccountUpdate"
StatUpdate = "StatUpdate"
+ TrendUpdate = "TrendUpdate"
ResourceUpdate = "ResourceUpdate"
CDR = "CDR"
CDRs = "CDRs"