mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 10:06:24 +05:00
TrendS scheduleAutomaticQueries function
This commit is contained in:
@@ -143,8 +143,12 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) {
|
||||
}
|
||||
|
||||
// StartCron will activates the Cron, together with all scheduled Trend queries
|
||||
func (tS *TrendS) StartCron() {
|
||||
func (tS *TrendS) StartCron() error {
|
||||
if err := tS.scheduleAutomaticQueries(); err != nil {
|
||||
return err
|
||||
}
|
||||
tS.crn.Start()
|
||||
return nil
|
||||
}
|
||||
|
||||
// StopCron will shutdown the Cron tasks
|
||||
@@ -152,14 +156,44 @@ func (tS *TrendS) StopCron() {
|
||||
ctx := tS.crn.Stop()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(tS.cgrcfg.CoreSCfg().ShutdownTimeout):
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> timeout waiting for Cron to finish",
|
||||
utils.TrendS))
|
||||
}
|
||||
}
|
||||
|
||||
// scheduleAutomaticQueries will schedule the queries at start/reload based on configured
|
||||
func (tS *TrendS) scheduleAutomaticQueries() error {
|
||||
schedData := make(map[string][]string)
|
||||
for k, v := range tS.cgrcfg.TrendSCfg().ScheduledIDs {
|
||||
schedData[k] = v
|
||||
}
|
||||
var tnts []string
|
||||
if len(schedData) == 0 {
|
||||
tnts = make([]string, 0)
|
||||
}
|
||||
for tnt, tIDs := range schedData {
|
||||
if len(tIDs) == 0 {
|
||||
tnts = append(tnts, tnt)
|
||||
}
|
||||
}
|
||||
if tnts != nil {
|
||||
qrydData, err := tS.dm.GetTrendProfileIDs(tnts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for tnt, ids := range qrydData {
|
||||
schedData[tnt] = ids
|
||||
}
|
||||
}
|
||||
for tnt, tIDs := range schedData {
|
||||
if _, err := tS.scheduleTrendQueries(context.TODO(), tnt, tIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// scheduleTrendQueries will schedule/re-schedule specific trend queries
|
||||
|
||||
@@ -83,7 +83,9 @@ func (trs *TrendService) Start() error {
|
||||
trs.Lock()
|
||||
defer trs.Unlock()
|
||||
trs.trs = engine.NewTrendS(dm, trs.connMgr, filterS, trs.cfg)
|
||||
trs.trs.StartCron()
|
||||
if err := trs.trs.StartCron(); err != nil {
|
||||
return err
|
||||
}
|
||||
srv, err := engine.NewService(v1.NewTrendSv1(trs.trs))
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
Reference in New Issue
Block a user