From 68a5a76b9318e871640b5237c277a0a30d8b6896 Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 24 Sep 2024 20:17:11 +0200 Subject: [PATCH] Adding TrendS.V1ScheduleTrendQueries API --- engine/libtrends.go | 12 ++++++++++++ engine/trends.go | 30 ++++++++++++++++++++++++------ utils/apitpdata.go | 5 +++++ 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/engine/libtrends.go b/engine/libtrends.go index f355fb805..5fec45b44 100644 --- a/engine/libtrends.go +++ b/engine/libtrends.go @@ -107,6 +107,18 @@ type Trend struct { } +// Compile is used to initialize or cleanup the Trend +// +// thread safe since it should be used close to source +func (t *Trend) Compile(cleanTtl time.Duration, qLength int) { + t.tMux.Lock() + defer t.tMux.Unlock() + t.cleanup(cleanTtl, qLength) + if t.mTotals == nil { // indexes were not yet built + t.computeIndexes() + } +} + // cleanup will clean stale data out of func (t *Trend) cleanup(ttl time.Duration, qLength int) (altered bool) { expTime := time.Now().Add(-ttl) diff --git a/engine/trends.go b/engine/trends.go index f4e6b71fc..a2c7679b3 100644 --- a/engine/trends.go +++ b/engine/trends.go @@ -62,7 +62,7 @@ type TrendS struct { // computeTrend will query a stat and build the Trend for it // -// it is be called by Cron service +// it is to be called by Cron service func (tS *TrendS) computeTrend(tP *TrendProfile) { var floatMetrics map[string]float64 if err := tS.connMgr.Call(context.Background(), tS.cgrcfg.TrendSCfg().StatSConns, @@ -94,6 +94,11 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) { trend.tMux.Lock() defer trend.tMux.Unlock() + trend.cleanup(tP.TTL, tP.QueueLength) + if trend.mTotals == nil { // indexes were not yet built + trend.computeIndexes() + } + now := time.Now() var metrics []string if len(tP.Metrics) != 0 { @@ -135,8 +140,8 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) { } // scheduleTrendQueries will schedule/re-schedule specific trend queries -func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs []string) (complete bool) { - complete = true +func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs []string) (scheduled int, err error) { + var partial bool for _, tID := range tIDs { tS.crnTQsMux.RLock() if entryID, has := tS.crnTQs[tnt][tID]; has { @@ -148,20 +153,33 @@ func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs [] fmt.Sprintf( "<%s> failed retrieving TrendProfile with id: <%s:%s> for scheduling, error: <%s>", utils.TrendS, tnt, tID, err.Error())) - complete = false + partial = true } else if entryID, err := tS.crn.AddFunc(tP.Schedule, func() { tS.computeTrend(tP.Clone()) }); err != nil { utils.Logger.Warning( fmt.Sprintf( "<%s> scheduling TrendProfile <%s:%s>, error: <%s>", utils.TrendS, tnt, tID, err.Error())) - + partial = true } else { tS.crnTQsMux.Lock() tS.crnTQs[tP.Tenant][tP.ID] = entryID tS.crnTQsMux.Unlock() } - + scheduled += 1 + } + if partial { + return 0, utils.ErrPartiallyExecuted + } + return +} + +// V1ScheduleTrendQueries is the query for manually re-/scheduling Trend Queries +func (tS *TrendS) V1ScheduleTrendQueries(ctx *context.Context, args *utils.ArgScheduleTrendQueries, scheduled *int) (err error) { + if sched, errSched := tS.scheduleTrendQueries(ctx, args.Tenant, args.TrendIDs); errSched != nil { + return errSched + } else { + *scheduled = sched } return } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index f98acddce..fba8a43ca 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1653,3 +1653,8 @@ type ArgExportCDRs struct { Verbose bool // verbose is used to inform the user about the positive and negative exported cdrs RPCCDRsFilter } + +type ArgScheduleTrendQueries struct { + TenantIDWithAPIOpts + TrendIDs []string +}