diff --git a/apier/v1/trends.go b/apier/v1/trends.go index 1410b38d0..31b123d0a 100644 --- a/apier/v1/trends.go +++ b/apier/v1/trends.go @@ -105,10 +105,10 @@ func (apierSv1 *APIerSv1) RemoveTrendProfile(ctx *context.Context, args *utils.T } // NewTrendSv1 initializes TrendSV1 -func NewTrendSv1(trs *engine.TrendService) *TrendSv1 { +func NewTrendSv1(trs *engine.TrendS) *TrendSv1 { return &TrendSv1{} } type TrendSv1 struct { - trS *engine.TrendService + trS *engine.TrendS } diff --git a/engine/libtrends.go b/engine/libtrends.go new file mode 100644 index 000000000..76459973e --- /dev/null +++ b/engine/libtrends.go @@ -0,0 +1,82 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "time" + + "github.com/cgrates/cgrates/utils" +) + +type TrendProfile struct { + Tenant string + ID string + Schedule string // Cron expression scheduling gathering of the metrics + StatID string + Metrics []MetricWithSettings + QueueLength int + TTL time.Duration + TrendType string // *last, *average + ThresholdIDs []string +} + +// MetricWithSettings adds specific settings to the Metric +type MetricWithSettings struct { + MetricID string + TrendSwingMargin float64 // allow this margin for *neutral trend +} + +type TrendProfileWithAPIOpts struct { + *TrendProfile + APIOpts map[string]any +} + +type TrendProfilesAPI struct { + Tenant string + TpIDs []string +} + +func (srp *TrendProfile) TenantID() string { + return utils.ConcatenatedKey(srp.Tenant, srp.ID) +} + +type TrendWithAPIOpts struct { + *Trend + APIOpts map[string]any +} + +// Trend is the unit matched by filters +type Trend struct { + Tenant string + ID string + RunTimes []time.Time + Metrics map[time.Time]map[string]MetricWithTrend + totals map[string]float64 // cached sum, used for average calculations +} + +// MetricWithTrend represents one read from StatS +type MetricWithTrend struct { + ID string // Metric ID + Value float64 // Metric Value + Trend string // *positive, *negative, *neutral +} + +func (tr *Trend) TenantID() string { + return utils.ConcatenatedKey(tr.Tenant, tr.ID) +} diff --git a/engine/trends.go b/engine/trends.go index 60a6f148d..3e9773d02 100644 --- a/engine/trends.go +++ b/engine/trends.go @@ -19,80 +19,80 @@ along with this program. If not, see package engine import ( - "time" + "fmt" + "sync" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/cron" ) -type TrendProfile struct { - Tenant string - ID string - Schedule string // Cron expression scheduling gathering of the metrics - StatID string - Metrics []MetricWithSettings - QueueLength int - TTL time.Duration - TrendType string // *last, *average - ThresholdIDs []string -} - -// MetricWithSettings adds specific settings to the Metric -type MetricWithSettings struct { - MetricID string - TrendSwingMargin float64 // allow this margin for *neutral trend -} - -type TrendProfileWithAPIOpts struct { - *TrendProfile - APIOpts map[string]any -} - -type TrendProfilesAPI struct { - Tenant string - TpIDs []string -} - -func (srp *TrendProfile) TenantID() string { - return utils.ConcatenatedKey(srp.Tenant, srp.ID) -} - -type TrendWithAPIOpts struct { - *Trend - APIOpts map[string]any -} - -// Trend is the unit matched by filters -type Trend struct { - Tenant string - ID string - RunTimes []time.Time - Metrics map[time.Time]map[string]MetricWithTrend - totals map[string]float64 // cached sum, used for average calculations -} - -// MetricWithTrend represents one read from StatS -type MetricWithTrend struct { - ID string // Metric ID - Value float64 // Metric Value - Trend string // *positive, *negative, *neutral -} - -func (tr *Trend) TenantID() string { - return utils.ConcatenatedKey(tr.Tenant, tr.ID) -} - -// NewTrendService the constructor for TrendS service -func NewTrendService(dm *DataManager, cgrcfg *config.CGRConfig, filterS *FilterS) *TrendService { - return &TrendService{ - dm: dm, - cgrcfg: cgrcfg, - filterS: filterS, +// NewTrendS is the constructor for TrendS +func NewTrendS(dm *DataManager, + connMgr *ConnManager, + filterS *FilterS, + cgrcfg *config.CGRConfig) *TrendS { + return &TrendS{ + dm: dm, + connMgr: connMgr, + filterS: filterS, + cgrcfg: cgrcfg, + loopStopped: make(chan struct{}), + crnTQsMux: new(sync.RWMutex), + crnTQs: make(map[string]map[string]cron.EntryID), } } -type TrendService struct { +// TrendS is responsible of implementing the logic of TrendService +type TrendS struct { dm *DataManager - cgrcfg *config.CGRConfig + connMgr *ConnManager filterS *FilterS + cgrcfg *config.CGRConfig + + crn *cron.Cron // cron reference + + crnTQsMux *sync.RWMutex // protects the crnTQs + crnTQs map[string]map[string]cron.EntryID // save the EntryIDs for TrendQueries so we can reschedule them when needed + + loopStopped chan struct{} +} + +// computeTrend will query a stat and build the Trend for it +// +// it is be called by Cron service +func (tS *TrendS) computeTrend(tP *TrendProfile) (err error) { + return +} + +// scheduleTrendQueries will schedule/re-schedule specific trend queries +func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs []string) (complete bool) { + complete = true + for _, tID := range tIDs { + tS.crnTQsMux.RLock() + if entryID, has := tS.crnTQs[tnt][tID]; has { + tS.crn.Remove(entryID) // deschedule the query + } + tS.crnTQsMux.RUnlock() + if tP, err := tS.dm.GetTrendProfile(tnt, tID, true, true, utils.NonTransactional); err != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> failed retrieving TrendProfile with id: <%s:%s> for scheduling, error: <%s>", + utils.TrendS, tnt, tID, err.Error())) + complete = false + } else if entryID, err := tS.crn.AddFunc(tP.Schedule, + func() { tS.computeTrend(tP) }); err != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> scheduling TrendProfile <%s:%s>, error: <%s>", + utils.TrendS, tnt, tID, err.Error())) + + } else { + tS.crnTQsMux.Lock() + tS.crnTQs[tP.Tenant][tP.ID] = entryID + } + + } + return } diff --git a/engine/trends_test.go b/engine/trends_test.go index c90f6223c..ed378ad9b 100644 --- a/engine/trends_test.go +++ b/engine/trends_test.go @@ -20,8 +20,6 @@ package engine import ( "testing" - - "github.com/cgrates/cgrates/config" ) func TestTrendProfileTenantID(t *testing.T) { @@ -47,19 +45,3 @@ func TestTrendTenantID(t *testing.T) { t.Errorf("TenantID() = %v; want %v", result, expected) } } - -func TestNewTrendService(t *testing.T) { - dm := &DataManager{} - cgrcfg := &config.CGRConfig{} - filterS := &FilterS{} - result := NewTrendService(dm, cgrcfg, filterS) - if result.dm != dm { - t.Errorf("Expected dm to be %v, got %v", dm, result.dm) - } - if result.cgrcfg != cgrcfg { - t.Errorf("Expected cgrcfg to be %v, got %v", cgrcfg, result.cgrcfg) - } - if result.filterS != filterS { - t.Errorf("Expected filterS to be %v, got %v", filterS, result.filterS) - } -} diff --git a/go.mod b/go.mod index d435a851c..68fae4298 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/cgrates/aringo v0.0.0-20220525160735-b5990313d99e github.com/cgrates/baningo v0.0.0-20210413080722-004ffd5e429f github.com/cgrates/birpc v1.3.1-0.20211117095917-5b0ff29f3084 + github.com/cgrates/cron v0.0.0-20201129173550-63ea3d835706 github.com/cgrates/fsock v0.0.0-20240522220429-b6cc1d96fd2b github.com/cgrates/janusgo v0.0.0-20240503152118-188a408d7e73 github.com/cgrates/kamevapi v0.0.0-20240307160311-26273f03eedf diff --git a/go.sum b/go.sum index 12de92444..ad9216b0a 100644 --- a/go.sum +++ b/go.sum @@ -75,6 +75,8 @@ github.com/cgrates/baningo v0.0.0-20210413080722-004ffd5e429f h1:dCp5BflGB8I8wlh github.com/cgrates/baningo v0.0.0-20210413080722-004ffd5e429f/go.mod h1:3SwVROaS1Iml5lqEhj0gRhDRtmbBgypZpKcEkVTSleU= github.com/cgrates/birpc v1.3.1-0.20211117095917-5b0ff29f3084 h1:YIEepjEOjeHaFrewWaar/JkXYiDgO7gRw/R1zWITxEw= github.com/cgrates/birpc v1.3.1-0.20211117095917-5b0ff29f3084/go.mod h1:z/PmNnDPqSQALedKJv5T8+eXIq6XHa9J0St1YsvAVns= +github.com/cgrates/cron v0.0.0-20201129173550-63ea3d835706 h1:5HOoV63Xcbnx9q6yknmSzaMoChgTnSrncPsCfrWDRow= +github.com/cgrates/cron v0.0.0-20201129173550-63ea3d835706/go.mod h1:I9cUDn/uzkakr0hmYTjXkQqf6wagg44L2p01gSYRRz0= github.com/cgrates/fsock v0.0.0-20240522220429-b6cc1d96fd2b h1:PQzDye+0GcgJ3cKG5NcAOjdRyX0v76ZFkolu3X70fbs= github.com/cgrates/fsock v0.0.0-20240522220429-b6cc1d96fd2b/go.mod h1:bKByLko2HF33K+PbiiToAgevrrbr96C+7Pp3HGS6oag= github.com/cgrates/janusgo v0.0.0-20240503152118-188a408d7e73 h1:7AYhvpegrSkY9tLGCQsZgNl8yTjL5CaQOTr3/kYlPek= diff --git a/services/trends.go b/services/trends.go index 097f2f2e3..e24c13891 100644 --- a/services/trends.go +++ b/services/trends.go @@ -59,7 +59,7 @@ type TrendService struct { server *cores.Server connMgr *engine.ConnManager - trs *engine.TrendService + trs *engine.TrendS connChan chan birpc.ClientConnector anz *AnalyzerService srvDep map[string]*sync.WaitGroup @@ -76,14 +76,14 @@ func (trs *TrendService) Start() error { filterS := <-trs.filterSChan trs.filterSChan <- filterS dbchan := trs.dm.GetDMChan() - datadb := <-dbchan - dbchan <- datadb + dm := <-dbchan + dbchan <- dm utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.TrendS)) trs.Lock() defer trs.Unlock() - trs.trs = engine.NewTrendService(datadb, trs.cfg, filterS) + trs.trs = engine.NewTrendS(dm, trs.connMgr, filterS, trs.cfg) srv, err := engine.NewService(v1.NewTrendSv1(trs.trs)) if err != nil { return err @@ -91,7 +91,7 @@ func (trs *TrendService) Start() error { if !trs.cfg.DispatcherSCfg().Enabled { trs.server.RpcRegister(srv) } - trs.connChan <- trs.anz.GetInternalCodec(srv, utils.StatS) + trs.connChan <- trs.anz.GetInternalCodec(srv, utils.TrendS) return nil }