Adding TrendS.V1ScheduleTrendQueries API

This commit is contained in:
DanB
2024-09-24 20:17:11 +02:00
parent 52523d0a73
commit 68a5a76b93
3 changed files with 41 additions and 6 deletions

View File

@@ -107,6 +107,18 @@ type Trend struct {
}
// Compile is used to initialize or cleanup the Trend
//
// thread safe since it should be used close to source
func (t *Trend) Compile(cleanTtl time.Duration, qLength int) {
t.tMux.Lock()
defer t.tMux.Unlock()
t.cleanup(cleanTtl, qLength)
if t.mTotals == nil { // indexes were not yet built
t.computeIndexes()
}
}
// cleanup will clean stale data out of
func (t *Trend) cleanup(ttl time.Duration, qLength int) (altered bool) {
expTime := time.Now().Add(-ttl)

View File

@@ -62,7 +62,7 @@ type TrendS struct {
// computeTrend will query a stat and build the Trend for it
//
// it is be called by Cron service
// it is to be called by Cron service
func (tS *TrendS) computeTrend(tP *TrendProfile) {
var floatMetrics map[string]float64
if err := tS.connMgr.Call(context.Background(), tS.cgrcfg.TrendSCfg().StatSConns,
@@ -94,6 +94,11 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) {
trend.tMux.Lock()
defer trend.tMux.Unlock()
trend.cleanup(tP.TTL, tP.QueueLength)
if trend.mTotals == nil { // indexes were not yet built
trend.computeIndexes()
}
now := time.Now()
var metrics []string
if len(tP.Metrics) != 0 {
@@ -135,8 +140,8 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) {
}
// scheduleTrendQueries will schedule/re-schedule specific trend queries
func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs []string) (complete bool) {
complete = true
func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs []string) (scheduled int, err error) {
var partial bool
for _, tID := range tIDs {
tS.crnTQsMux.RLock()
if entryID, has := tS.crnTQs[tnt][tID]; has {
@@ -148,20 +153,33 @@ func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs []
fmt.Sprintf(
"<%s> failed retrieving TrendProfile with id: <%s:%s> for scheduling, error: <%s>",
utils.TrendS, tnt, tID, err.Error()))
complete = false
partial = true
} else if entryID, err := tS.crn.AddFunc(tP.Schedule,
func() { tS.computeTrend(tP.Clone()) }); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> scheduling TrendProfile <%s:%s>, error: <%s>",
utils.TrendS, tnt, tID, err.Error()))
partial = true
} else {
tS.crnTQsMux.Lock()
tS.crnTQs[tP.Tenant][tP.ID] = entryID
tS.crnTQsMux.Unlock()
}
scheduled += 1
}
if partial {
return 0, utils.ErrPartiallyExecuted
}
return
}
// V1ScheduleTrendQueries is the query for manually re-/scheduling Trend Queries
func (tS *TrendS) V1ScheduleTrendQueries(ctx *context.Context, args *utils.ArgScheduleTrendQueries, scheduled *int) (err error) {
if sched, errSched := tS.scheduleTrendQueries(ctx, args.Tenant, args.TrendIDs); errSched != nil {
return errSched
} else {
*scheduled = sched
}
return
}

View File

@@ -1653,3 +1653,8 @@ type ArgExportCDRs struct {
Verbose bool // verbose is used to inform the user about the positive and negative exported cdrs
RPCCDRsFilter
}
type ArgScheduleTrendQueries struct {
TenantIDWithAPIOpts
TrendIDs []string
}