diff --git a/engine/libtrends_test.go b/engine/libtrends_test.go index cf7238689..ba95556bd 100644 --- a/engine/libtrends_test.go +++ b/engine/libtrends_test.go @@ -138,7 +138,6 @@ func TestTrendCleanUp(t *testing.T) { t4 := now.Add(2 * time.Second) t5 := now.Add(time.Minute) trend := &Trend{ - tMux: new(sync.RWMutex), Tenant: "cgrates.org", ID: "TestTrendCleanUp", diff --git a/engine/trends.go b/engine/trends.go index 951f9ded4..8c28a1acf 100644 --- a/engine/trends.go +++ b/engine/trends.go @@ -124,7 +124,8 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) { mWt.TrendLabel = utils.NotAvailable continue } - if mWt.TrendGrowth, err = trend.getTrendGrowth(mID, mWt.Value, tP.CorrelationType, tS.cgrcfg.GeneralCfg().RoundingDecimals); err != nil { + if mWt.TrendGrowth, err = trend.getTrendGrowth(mID, mWt.Value, tP.CorrelationType, + tS.cgrcfg.GeneralCfg().RoundingDecimals); err != nil { mWt.TrendLabel = utils.NotAvailable } else { mWt.TrendLabel = trend.getTrendLabel(mWt.TrendGrowth, tP.Tolerance) @@ -141,13 +142,24 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) { } -func (tS *TrendS) StartScheduling() { +// StartCron will activates the Cron, together with all scheduled Trend queries +func (tS *TrendS) StartCron() { tS.crn.Start() } -func (tS *TrendS) StopScheduling() { +// StopCron will shutdown the Cron tasks +func (tS *TrendS) StopCron() { ctx := tS.crn.Stop() - <-ctx.Done() + select { + case <-ctx.Done(): + return + case <-time.After(tS.cgrcfg.CoreSCfg().ShutdownTimeout): + utils.Logger.Warning( + fmt.Sprintf( + "<%s> timeout waiting for Cron to finish", + utils.TrendS)) + } + } // scheduleTrendQueries will schedule/re-schedule specific trend queries diff --git a/services/trends.go b/services/trends.go index 755f234c4..b2ec3bd5b 100644 --- a/services/trends.go +++ b/services/trends.go @@ -83,7 +83,7 @@ func (trs *TrendService) Start() error { trs.Lock() defer trs.Unlock() trs.trs = engine.NewTrendS(dm, trs.connMgr, filterS, trs.cfg) - trs.trs.StartScheduling() + trs.trs.StartCron() srv, err := engine.NewService(v1.NewTrendSv1(trs.trs)) if err != nil { return err @@ -105,7 +105,7 @@ func (tr *TrendService) Shutdown() (err error) { defer tr.srvDep[utils.DataDB].Done() tr.Lock() defer tr.Unlock() - tr.trs.StopScheduling() + tr.trs.StopCron() <-tr.connChan return }