Basic structure of TrendS using cron as scheduler

This commit is contained in:
DanB
2024-09-09 20:53:51 +02:00
parent 932e86aa53
commit 569ddd1956
7 changed files with 158 additions and 91 deletions

View File

@@ -105,10 +105,10 @@ func (apierSv1 *APIerSv1) RemoveTrendProfile(ctx *context.Context, args *utils.T
}
// NewTrendSv1 initializes TrendSV1
func NewTrendSv1(trs *engine.TrendService) *TrendSv1 {
func NewTrendSv1(trs *engine.TrendS) *TrendSv1 {
return &TrendSv1{}
}
type TrendSv1 struct {
trS *engine.TrendService
trS *engine.TrendS
}

82
engine/libtrends.go Normal file
View File

@@ -0,0 +1,82 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"time"
"github.com/cgrates/cgrates/utils"
)
type TrendProfile struct {
Tenant string
ID string
Schedule string // Cron expression scheduling gathering of the metrics
StatID string
Metrics []MetricWithSettings
QueueLength int
TTL time.Duration
TrendType string // *last, *average
ThresholdIDs []string
}
// MetricWithSettings adds specific settings to the Metric
type MetricWithSettings struct {
MetricID string
TrendSwingMargin float64 // allow this margin for *neutral trend
}
type TrendProfileWithAPIOpts struct {
*TrendProfile
APIOpts map[string]any
}
type TrendProfilesAPI struct {
Tenant string
TpIDs []string
}
func (srp *TrendProfile) TenantID() string {
return utils.ConcatenatedKey(srp.Tenant, srp.ID)
}
type TrendWithAPIOpts struct {
*Trend
APIOpts map[string]any
}
// Trend is the unit matched by filters
type Trend struct {
Tenant string
ID string
RunTimes []time.Time
Metrics map[time.Time]map[string]MetricWithTrend
totals map[string]float64 // cached sum, used for average calculations
}
// MetricWithTrend represents one read from StatS
type MetricWithTrend struct {
ID string // Metric ID
Value float64 // Metric Value
Trend string // *positive, *negative, *neutral
}
func (tr *Trend) TenantID() string {
return utils.ConcatenatedKey(tr.Tenant, tr.ID)
}

View File

@@ -19,80 +19,80 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"time"
"fmt"
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/cron"
)
type TrendProfile struct {
Tenant string
ID string
Schedule string // Cron expression scheduling gathering of the metrics
StatID string
Metrics []MetricWithSettings
QueueLength int
TTL time.Duration
TrendType string // *last, *average
ThresholdIDs []string
}
// MetricWithSettings adds specific settings to the Metric
type MetricWithSettings struct {
MetricID string
TrendSwingMargin float64 // allow this margin for *neutral trend
}
type TrendProfileWithAPIOpts struct {
*TrendProfile
APIOpts map[string]any
}
type TrendProfilesAPI struct {
Tenant string
TpIDs []string
}
func (srp *TrendProfile) TenantID() string {
return utils.ConcatenatedKey(srp.Tenant, srp.ID)
}
type TrendWithAPIOpts struct {
*Trend
APIOpts map[string]any
}
// Trend is the unit matched by filters
type Trend struct {
Tenant string
ID string
RunTimes []time.Time
Metrics map[time.Time]map[string]MetricWithTrend
totals map[string]float64 // cached sum, used for average calculations
}
// MetricWithTrend represents one read from StatS
type MetricWithTrend struct {
ID string // Metric ID
Value float64 // Metric Value
Trend string // *positive, *negative, *neutral
}
func (tr *Trend) TenantID() string {
return utils.ConcatenatedKey(tr.Tenant, tr.ID)
}
// NewTrendService the constructor for TrendS service
func NewTrendService(dm *DataManager, cgrcfg *config.CGRConfig, filterS *FilterS) *TrendService {
return &TrendService{
dm: dm,
cgrcfg: cgrcfg,
filterS: filterS,
// NewTrendS is the constructor for TrendS
func NewTrendS(dm *DataManager,
connMgr *ConnManager,
filterS *FilterS,
cgrcfg *config.CGRConfig) *TrendS {
return &TrendS{
dm: dm,
connMgr: connMgr,
filterS: filterS,
cgrcfg: cgrcfg,
loopStopped: make(chan struct{}),
crnTQsMux: new(sync.RWMutex),
crnTQs: make(map[string]map[string]cron.EntryID),
}
}
type TrendService struct {
// TrendS is responsible of implementing the logic of TrendService
type TrendS struct {
dm *DataManager
cgrcfg *config.CGRConfig
connMgr *ConnManager
filterS *FilterS
cgrcfg *config.CGRConfig
crn *cron.Cron // cron reference
crnTQsMux *sync.RWMutex // protects the crnTQs
crnTQs map[string]map[string]cron.EntryID // save the EntryIDs for TrendQueries so we can reschedule them when needed
loopStopped chan struct{}
}
// computeTrend will query a stat and build the Trend for it
//
// it is be called by Cron service
func (tS *TrendS) computeTrend(tP *TrendProfile) (err error) {
return
}
// scheduleTrendQueries will schedule/re-schedule specific trend queries
func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs []string) (complete bool) {
complete = true
for _, tID := range tIDs {
tS.crnTQsMux.RLock()
if entryID, has := tS.crnTQs[tnt][tID]; has {
tS.crn.Remove(entryID) // deschedule the query
}
tS.crnTQsMux.RUnlock()
if tP, err := tS.dm.GetTrendProfile(tnt, tID, true, true, utils.NonTransactional); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> failed retrieving TrendProfile with id: <%s:%s> for scheduling, error: <%s>",
utils.TrendS, tnt, tID, err.Error()))
complete = false
} else if entryID, err := tS.crn.AddFunc(tP.Schedule,
func() { tS.computeTrend(tP) }); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> scheduling TrendProfile <%s:%s>, error: <%s>",
utils.TrendS, tnt, tID, err.Error()))
} else {
tS.crnTQsMux.Lock()
tS.crnTQs[tP.Tenant][tP.ID] = entryID
}
}
return
}

View File

@@ -20,8 +20,6 @@ package engine
import (
"testing"
"github.com/cgrates/cgrates/config"
)
func TestTrendProfileTenantID(t *testing.T) {
@@ -47,19 +45,3 @@ func TestTrendTenantID(t *testing.T) {
t.Errorf("TenantID() = %v; want %v", result, expected)
}
}
func TestNewTrendService(t *testing.T) {
dm := &DataManager{}
cgrcfg := &config.CGRConfig{}
filterS := &FilterS{}
result := NewTrendService(dm, cgrcfg, filterS)
if result.dm != dm {
t.Errorf("Expected dm to be %v, got %v", dm, result.dm)
}
if result.cgrcfg != cgrcfg {
t.Errorf("Expected cgrcfg to be %v, got %v", cgrcfg, result.cgrcfg)
}
if result.filterS != filterS {
t.Errorf("Expected filterS to be %v, got %v", filterS, result.filterS)
}
}

1
go.mod
View File

@@ -20,6 +20,7 @@ require (
github.com/cgrates/aringo v0.0.0-20220525160735-b5990313d99e
github.com/cgrates/baningo v0.0.0-20210413080722-004ffd5e429f
github.com/cgrates/birpc v1.3.1-0.20211117095917-5b0ff29f3084
github.com/cgrates/cron v0.0.0-20201129173550-63ea3d835706
github.com/cgrates/fsock v0.0.0-20240522220429-b6cc1d96fd2b
github.com/cgrates/janusgo v0.0.0-20240503152118-188a408d7e73
github.com/cgrates/kamevapi v0.0.0-20240307160311-26273f03eedf

2
go.sum
View File

@@ -75,6 +75,8 @@ github.com/cgrates/baningo v0.0.0-20210413080722-004ffd5e429f h1:dCp5BflGB8I8wlh
github.com/cgrates/baningo v0.0.0-20210413080722-004ffd5e429f/go.mod h1:3SwVROaS1Iml5lqEhj0gRhDRtmbBgypZpKcEkVTSleU=
github.com/cgrates/birpc v1.3.1-0.20211117095917-5b0ff29f3084 h1:YIEepjEOjeHaFrewWaar/JkXYiDgO7gRw/R1zWITxEw=
github.com/cgrates/birpc v1.3.1-0.20211117095917-5b0ff29f3084/go.mod h1:z/PmNnDPqSQALedKJv5T8+eXIq6XHa9J0St1YsvAVns=
github.com/cgrates/cron v0.0.0-20201129173550-63ea3d835706 h1:5HOoV63Xcbnx9q6yknmSzaMoChgTnSrncPsCfrWDRow=
github.com/cgrates/cron v0.0.0-20201129173550-63ea3d835706/go.mod h1:I9cUDn/uzkakr0hmYTjXkQqf6wagg44L2p01gSYRRz0=
github.com/cgrates/fsock v0.0.0-20240522220429-b6cc1d96fd2b h1:PQzDye+0GcgJ3cKG5NcAOjdRyX0v76ZFkolu3X70fbs=
github.com/cgrates/fsock v0.0.0-20240522220429-b6cc1d96fd2b/go.mod h1:bKByLko2HF33K+PbiiToAgevrrbr96C+7Pp3HGS6oag=
github.com/cgrates/janusgo v0.0.0-20240503152118-188a408d7e73 h1:7AYhvpegrSkY9tLGCQsZgNl8yTjL5CaQOTr3/kYlPek=

View File

@@ -59,7 +59,7 @@ type TrendService struct {
server *cores.Server
connMgr *engine.ConnManager
trs *engine.TrendService
trs *engine.TrendS
connChan chan birpc.ClientConnector
anz *AnalyzerService
srvDep map[string]*sync.WaitGroup
@@ -76,14 +76,14 @@ func (trs *TrendService) Start() error {
filterS := <-trs.filterSChan
trs.filterSChan <- filterS
dbchan := trs.dm.GetDMChan()
datadb := <-dbchan
dbchan <- datadb
dm := <-dbchan
dbchan <- dm
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem",
utils.CoreS, utils.TrendS))
trs.Lock()
defer trs.Unlock()
trs.trs = engine.NewTrendService(datadb, trs.cfg, filterS)
trs.trs = engine.NewTrendS(dm, trs.connMgr, filterS, trs.cfg)
srv, err := engine.NewService(v1.NewTrendSv1(trs.trs))
if err != nil {
return err
@@ -91,7 +91,7 @@ func (trs *TrendService) Start() error {
if !trs.cfg.DispatcherSCfg().Enabled {
trs.server.RpcRegister(srv)
}
trs.connChan <- trs.anz.GetInternalCodec(srv, utils.StatS)
trs.connChan <- trs.anz.GetInternalCodec(srv, utils.TrendS)
return nil
}