From bb6f9ca5ae5225961f87ff216001ce6f80b29b4b Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 3 Oct 2024 18:41:11 +0200 Subject: [PATCH] TrendS scheduleAutomaticQueries function --- engine/trends.go | 38 ++++++++++++++++++++++++++++++++++++-- services/trends.go | 4 +++- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/engine/trends.go b/engine/trends.go index 8c28a1acf..c43820086 100644 --- a/engine/trends.go +++ b/engine/trends.go @@ -143,8 +143,12 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) { } // StartCron will activates the Cron, together with all scheduled Trend queries -func (tS *TrendS) StartCron() { +func (tS *TrendS) StartCron() error { + if err := tS.scheduleAutomaticQueries(); err != nil { + return err + } tS.crn.Start() + return nil } // StopCron will shutdown the Cron tasks @@ -152,14 +156,44 @@ func (tS *TrendS) StopCron() { ctx := tS.crn.Stop() select { case <-ctx.Done(): - return case <-time.After(tS.cgrcfg.CoreSCfg().ShutdownTimeout): utils.Logger.Warning( fmt.Sprintf( "<%s> timeout waiting for Cron to finish", utils.TrendS)) } +} +// scheduleAutomaticQueries will schedule the queries at start/reload based on configured +func (tS *TrendS) scheduleAutomaticQueries() error { + schedData := make(map[string][]string) + for k, v := range tS.cgrcfg.TrendSCfg().ScheduledIDs { + schedData[k] = v + } + var tnts []string + if len(schedData) == 0 { + tnts = make([]string, 0) + } + for tnt, tIDs := range schedData { + if len(tIDs) == 0 { + tnts = append(tnts, tnt) + } + } + if tnts != nil { + qrydData, err := tS.dm.GetTrendProfileIDs(tnts) + if err != nil { + return err + } + for tnt, ids := range qrydData { + schedData[tnt] = ids + } + } + for tnt, tIDs := range schedData { + if _, err := tS.scheduleTrendQueries(context.TODO(), tnt, tIDs); err != nil { + return err + } + } + return nil } // scheduleTrendQueries will schedule/re-schedule specific trend queries diff --git a/services/trends.go b/services/trends.go index b2ec3bd5b..2fc7be6f5 100644 --- a/services/trends.go +++ b/services/trends.go @@ -83,7 +83,9 @@ func (trs *TrendService) Start() error { trs.Lock() defer trs.Unlock() trs.trs = engine.NewTrendS(dm, trs.connMgr, filterS, trs.cfg) - trs.trs.StartCron() + if err := trs.trs.StartCron(); err != nil { + return err + } srv, err := engine.NewService(v1.NewTrendSv1(trs.trs)) if err != nil { return err