Alpha code for TrendS

This commit is contained in:
DanB
2024-10-05 20:30:22 +02:00
parent bb6f9ca5ae
commit 86ed9e932b
9 changed files with 414 additions and 72 deletions

View File

@@ -74,6 +74,7 @@ The components from the diagram can be found documented in the links below:
resources
routes
stats
trends
thresholds
filters
dispatchers

66
docs/trends.rst Normal file
View File

@@ -0,0 +1,66 @@
.. _trends:
TrendS
======
**TrendS** is a standalone subsystem part of the **CGRateS** infrastructure, designed to store *StatS* in a time-series-like database and calculate trend percentages based on their evolution.
Complete interaction with **TrendS** is possible via `CGRateS RPC APIs <https://pkg.go.dev/github.com/cgrates/cgrates/apier@master/>`_.
Due to its real-time nature, **StatS** is designed for high throughput, being able to process thousands of *Events* per second. This is doable since each *StatQueue* is a very light object, held in memory and eventually backed up in *DataDB*.
Processing logic
----------------
Parameters
----------
TrendS
^^^^^^
**TrendS** is the **CGRateS** component responsible for handling the *Trend* queries.
It is configured within **trends** section from :ref:`JSON configuration <configuration>` via the following parameters:
enabled
Will enable starting of the service. Possible values: <true|false>.
TrendProfile
^^^^^^^^^^^^
Is made of the following fields:
Tenant
The tenant on the platform (one can see the tenant as partition ID).
ID
Identifier for the *TrendProfile*, unique within a *Tenant*.
FilterIDs
List of *FilterProfileIDs* which should match in order to consider the profile matching the event.
Trend
^^^^^
Use cases
---------
* Aggregate various traffic metrics for traffic transparency.
* Revenue assurance applications.
* Fraud detection by aggregating specific billing metrics during sensitive time intervals (\*acc, \*tcc, \*tcd).
* Building call patterns.
* Building statistical information to train systems capable of artificial intelligence.
* Building quality metrics used in traffic routing.

View File

@@ -1292,6 +1292,7 @@ func (dm *DataManager) RemoveStatQueueProfile(tenant, id string, withIndex bool)
func (dm *DataManager) GetTrend(tenant, id string,
cacheRead, cacheWrite bool, transactionID string) (tr *Trend, err error) {
tntID := utils.ConcatenatedKey(tenant, id)
if cacheRead {
if x, ok := Cache.Get(utils.CacheTrends, tntID); ok {
if x == nil {
@@ -1300,40 +1301,53 @@ func (dm *DataManager) GetTrend(tenant, id string,
return x.(*Trend), nil
}
}
if dm == nil {
err = utils.ErrNoDatabaseConn
return
}
tr, err = dm.dataDB.GetTrendDrv(tenant, id)
if err != nil {
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrends]; err == utils.ErrNotFound && itm.Remote {
if tr, err = dm.dataDB.GetTrendDrv(tenant, id); err != nil {
if err != utils.ErrNotFound { // database error
return
}
// ErrNotFound
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrends]; itm.Remote {
if err = dm.connMgr.Call(context.TODO(), config.CgrConfig().DataDbCfg().RmtConns,
utils.ReplicatorSv1GetTrend, &utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{Tenant: tenant, ID: id},
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, utils.EmptyString,
utils.FirstNonEmpty(config.CgrConfig().DataDbCfg().RmtConnID,
config.CgrConfig().GeneralCfg().NodeID)),
}, &tr); err == nil {
err = dm.dataDB.SetTrendDrv(tr)
}, &tr); err != nil {
err = utils.CastRPCErr(err)
if err != utils.ErrNotFound { // RPC error
return
}
} else if err = dm.dataDB.SetTrendDrv(tr); err != nil { // Save the Trend received from remote
return
}
}
if err != nil {
err = utils.CastRPCErr(err)
if err == utils.ErrNotFound && cacheWrite {
if errCh := Cache.Set(utils.CacheTrends, tntID, nil, nil,
cacheCommit(transactionID), transactionID); errCh != nil {
return nil, errCh
// have Trend or ErrNotFound
if err == utils.ErrNotFound {
if cacheWrite {
if errCache := Cache.Set(utils.CacheTrends, tntID, nil, nil,
cacheCommit(transactionID), transactionID); errCache != nil {
return nil, errCache
}
}
return nil, err
return
}
}
if cacheWrite {
if errCh := Cache.Set(utils.CacheTrends, tntID, tr, nil,
cacheCommit(transactionID), transactionID); errCh != nil {
return nil, errCh
}
}
return
}
@@ -1346,14 +1360,16 @@ func (dm *DataManager) SetTrend(tr *Trend) (err error) {
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrends]; itm.Replicate {
err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
if err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.TrendPrefix, tr.TenantID(), // this are used to get the host IDs from cache
utils.ReplicatorSv1SetTrend,
&TrendWithAPIOpts{
Trend: tr,
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)})
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}); err != nil {
return
}
}
return
}
@@ -1489,16 +1505,9 @@ func (dm *DataManager) SetTrendProfile(trp *TrendProfile) (err error) {
if oldTrd == nil ||
oldTrd.QueueLength != trp.QueueLength ||
oldTrd.Schedule != trp.Schedule {
err = dm.SetTrend(&Trend{
Tenant: trp.Tenant,
ID: trp.ID,
})
} else if _, errTr := dm.GetTrend(trp.Tenant, trp.ID,
true, false, utils.NonTransactional); errTr == utils.ErrNotFound {
err = dm.SetTrend(&Trend{
Tenant: trp.Tenant,
ID: trp.ID,
})
if err = dm.SetTrend(NewTrendFromProfile(trp)); err != nil {
return
}
}
return
}

View File

@@ -85,21 +85,37 @@ type TrendWithAPIOpts struct {
APIOpts map[string]any
}
// NewTrendFromProfile is a constructor building an empty Trend out of its
// profile, keeping a reference to the profile for later processing.
func NewTrendFromProfile(tP *TrendProfile) *Trend {
	trnd := &Trend{
		Tenant: tP.Tenant,
		ID:     tP.ID,
		tPrfl:  tP,
	}
	trnd.RunTimes = make([]time.Time, 0)
	trnd.Metrics = make(map[time.Time]map[string]*MetricWithTrend)
	return trnd
}
// Trend is the unit matched by filters
type Trend struct {
tMux sync.RWMutex
Tenant string
ID string
RunTimes []time.Time
Metrics map[time.Time]map[string]*MetricWithTrend
CompressedMetrics []byte // if populated, Metrics will be emty
Tenant string
ID string
RunTimes []time.Time
Metrics map[time.Time]map[string]*MetricWithTrend
// indexes help faster processing
mLast map[string]time.Time // last time a metric was present
mCounts map[string]int // number of times a metric is present in Metrics
mTotals map[string]float64 // cached sum, used for average calculations
tPrfl *TrendProfile // store here the trend profile so we can have it at hands further
}
// Clone returns a copy of the Trend.
//
// NOTE(review): currently a stub — it always returns nil (the named result
// tC is never assigned). Confirm whether this is intentional alpha-stage
// behavior or implement the field-by-field deep copy before callers rely
// on it. TODO confirm.
func (t *Trend) Clone() (tC *Trend) {
	return
}
// Compile is used to initialize or cleanup the Trend

View File

@@ -20,6 +20,7 @@ package engine
import (
"fmt"
"runtime"
"sync"
"time"
@@ -35,14 +36,16 @@ func NewTrendS(dm *DataManager,
filterS *FilterS,
cgrcfg *config.CGRConfig) *TrendS {
return &TrendS{
dm: dm,
connMgr: connMgr,
filterS: filterS,
cgrcfg: cgrcfg,
crn: cron.New(),
loopStopped: make(chan struct{}),
crnTQsMux: new(sync.RWMutex),
crnTQs: make(map[string]map[string]cron.EntryID),
dm: dm,
connMgr: connMgr,
filterS: filterS,
cgrcfg: cgrcfg,
crn: cron.New(),
crnTQsMux: new(sync.RWMutex),
crnTQs: make(map[string]map[string]cron.EntryID),
storedTrends: make(utils.StringSet),
storingStopped: make(chan struct{}),
trendStop: make(chan struct{}),
}
}
@@ -58,7 +61,12 @@ type TrendS struct {
crnTQsMux *sync.RWMutex // protects the crnTQs
crnTQs map[string]map[string]cron.EntryID // save the EntryIDs for TrendQueries so we can reschedule them when needed
loopStopped chan struct{}
storedTrends utils.StringSet // keep a record of trends which need saving, map[trendTenantID]bool
sTrndsMux sync.RWMutex // protects storedTrends
storingStopped chan struct{} // signal back that the operations were stopped
trendStop chan struct{} // signal to stop all operations
}
// computeTrend will query a stat and build the Trend for it
@@ -76,27 +84,23 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) {
utils.TrendS, tP.Tenant, tP.ID, tP.StatID, err.Error()))
return
}
trend, err := tS.dm.GetTrend(tP.Tenant, tP.ID, true, true, utils.NonTransactional)
if err == utils.ErrNotFound {
trend = &Trend{
Tenant: tP.Tenant,
ID: tP.ID,
RunTimes: make([]time.Time, 0),
Metrics: make(map[time.Time]map[string]*MetricWithTrend),
}
} else if err != nil {
trnd, err := tS.dm.GetTrend(tP.Tenant, tP.ID, true, true, utils.NonTransactional)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> querying trend with id: <%s:%s> dm error: <%s>",
utils.TrendS, tP.Tenant, tP.ID, err.Error()))
return
}
trend.tMux.Lock()
defer trend.tMux.Unlock()
trend.cleanup(tP.TTL, tP.QueueLength)
trnd.tMux.Lock()
defer trnd.tMux.Unlock()
if trnd.tPrfl == nil {
trnd.tPrfl = tP
}
trnd.cleanup(tP.TTL, tP.QueueLength)
if len(trend.mTotals) == 0 { // indexes were not yet built
trend.computeIndexes()
if len(trnd.mTotals) == 0 { // indexes were not yet built
trnd.computeIndexes()
}
now := time.Now()
var metrics []string
@@ -111,11 +115,11 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) {
if len(metrics) == 0 {
return // nothing to compute
}
trend.RunTimes = append(trend.RunTimes, now)
if trend.Metrics == nil {
trend.Metrics = make(map[time.Time]map[string]*MetricWithTrend)
trnd.RunTimes = append(trnd.RunTimes, now)
if trnd.Metrics == nil {
trnd.Metrics = make(map[time.Time]map[string]*MetricWithTrend)
}
trend.Metrics[now] = make(map[string]*MetricWithTrend)
trnd.Metrics[now] = make(map[string]*MetricWithTrend)
for _, mID := range metrics {
mWt := &MetricWithTrend{ID: mID}
var has bool
@@ -124,26 +128,205 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) {
mWt.TrendLabel = utils.NotAvailable
continue
}
if mWt.TrendGrowth, err = trend.getTrendGrowth(mID, mWt.Value, tP.CorrelationType,
if mWt.TrendGrowth, err = trnd.getTrendGrowth(mID, mWt.Value, tP.CorrelationType,
tS.cgrcfg.GeneralCfg().RoundingDecimals); err != nil {
mWt.TrendLabel = utils.NotAvailable
} else {
mWt.TrendLabel = trend.getTrendLabel(mWt.TrendGrowth, tP.Tolerance)
mWt.TrendLabel = trnd.getTrendLabel(mWt.TrendGrowth, tP.Tolerance)
}
trend.Metrics[now][mWt.ID] = mWt
trnd.Metrics[now][mWt.ID] = mWt
}
if err := tS.dm.SetTrend(trend); err != nil {
if err = tS.storeTrend(trnd); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> setting trend with id: <%s:%s> dm error: <%s>",
"<%s> setting Trend with id: <%s:%s> DM error: <%s>",
utils.TrendS, tP.Tenant, tP.ID, err.Error()))
return
}
}
// processThresholds passes the Trend's last computed run to ThresholdS as a
// *TrendUpdate event, so thresholds can react to trend changes.
//
// It is a no-op when the Trend holds fewer runs than the profile's MinItems
// or when no ThresholdS connections are configured. Returns
// utils.ErrPartiallyExecuted when the ThresholdS call fails.
func (tS *TrendS) processThresholds(trnd *Trend) (err error) {
	if len(trnd.RunTimes) == 0 ||
		len(trnd.RunTimes) < trnd.tPrfl.MinItems {
		return
	}
	if len(tS.cgrcfg.TrendSCfg().ThresholdSConns) == 0 {
		return
	}
	opts := map[string]any{
		utils.MetaEventType: utils.TrendUpdate,
	}
	var thIDs []string
	if len(trnd.tPrfl.ThresholdIDs) != 0 {
		if len(trnd.tPrfl.ThresholdIDs) == 1 &&
			trnd.tPrfl.ThresholdIDs[0] == utils.MetaNone {
			return // *none disables threshold processing for this profile
		}
		// FIX: allocate before copying — copying into a nil slice copies
		// nothing, silently dropping the profile's ThresholdIDs
		thIDs = make([]string, len(trnd.tPrfl.ThresholdIDs))
		copy(thIDs, trnd.tPrfl.ThresholdIDs)
	}
	opts[utils.OptsThresholdsProfileIDs] = thIDs
	// clone the metrics of the most recent run so ThresholdS cannot mutate
	// the Trend's own storage
	mtrx := make(map[string]*MetricWithTrend)
	for mtID, mtWT := range trnd.Metrics[trnd.RunTimes[len(trnd.RunTimes)-1]] {
		mtrx[mtID] = &MetricWithTrend{
			ID:          mtWT.ID,
			Value:       mtWT.Value,
			TrendGrowth: mtWT.TrendGrowth,
			TrendLabel:  mtWT.TrendLabel,
		}
	}
	trndEv := &utils.CGREvent{
		Tenant:  trnd.Tenant,
		ID:      utils.GenUUID(),
		APIOpts: opts,
		Event: map[string]any{
			utils.TrendID: trnd.ID,
			utils.Metrics: mtrx,
		},
	}
	var withErrs bool
	var tIDs []string
	if err := tS.connMgr.Call(context.TODO(), tS.cgrcfg.TrendSCfg().ThresholdSConns,
		utils.ThresholdSv1ProcessEvent, trndEv, &tIDs); err != nil &&
		(len(thIDs) != 0 || err.Error() != utils.ErrNotFound.Error()) {
		utils.Logger.Warning(
			fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.", utils.TrendS, err.Error(), trndEv))
		withErrs = true
	}
	if withErrs {
		err = utils.ErrPartiallyExecuted
	}
	return
}
// processEEs exports the Trend's last computed run through the EventExporter
// service (EEs) as a *TrendUpdate event.
//
// It is a no-op when the Trend holds fewer runs than the profile's MinItems
// or when no EEs connections are configured. Returns
// utils.ErrPartiallyExecuted when the export call fails.
//
// NOTE(review): connections and exporter IDs are read from StatSCfg(), not
// TrendSCfg() — confirm this is intended for TrendS.
func (tS *TrendS) processEEs(trnd *Trend) (err error) {
	nrRuns := len(trnd.RunTimes)
	if nrRuns == 0 || nrRuns < trnd.tPrfl.MinItems {
		return
	}
	eesConns := tS.cgrcfg.StatSCfg().EEsConns
	if len(eesConns) == 0 {
		return
	}
	// detach the metrics of the most recent run so EEs cannot mutate the
	// Trend's own storage
	lastRun := trnd.RunTimes[nrRuns-1]
	mtrx := make(map[string]*MetricWithTrend, len(trnd.Metrics[lastRun]))
	for mID, m := range trnd.Metrics[lastRun] {
		cpy := *m
		mtrx[mID] = &cpy
	}
	trndEv := &CGREventWithEeIDs{
		EeIDs: tS.cgrcfg.StatSCfg().EEsExporterIDs,
		CGREvent: &utils.CGREvent{
			Tenant: trnd.Tenant,
			ID:     utils.GenUUID(),
			APIOpts: map[string]any{
				utils.MetaEventType: utils.TrendUpdate,
			},
			Event: map[string]any{
				utils.TrendID: trnd.ID,
				utils.Metrics: mtrx,
			},
		},
	}
	var reply map[string]map[string]any
	if callErr := tS.connMgr.Call(context.TODO(), eesConns,
		utils.EeSv1ProcessEvent, trndEv, &reply); callErr != nil &&
		callErr.Error() != utils.ErrNotFound.Error() {
		utils.Logger.Warning(
			fmt.Sprintf("<%s> error: %q processing event %+v with EEs.", utils.TrendS, callErr.Error(), trndEv))
		err = utils.ErrPartiallyExecuted
	}
	return
}
// storeTrend persists the Trend according to the configured StoreInterval:
// 0 disables storing, -1 writes through synchronously, and any other value
// defers the write to the asynchronous backup loop (relying on the Trend
// being present in cache).
//
// NOTE(review): the interval is read from StatSCfg() — confirm whether
// TrendSCfg() should own this setting.
func (tS *TrendS) storeTrend(trnd *Trend) (err error) {
	switch tS.cgrcfg.StatSCfg().StoreInterval {
	case 0: // storing disabled
		return
	case -1: // write-through, store immediately
		return tS.dm.SetTrend(trnd)
	}
	// mark the Trend for the next asynchronous backup round
	tS.sTrndsMux.Lock()
	tS.storedTrends.Add(trnd.TenantID())
	tS.sTrndsMux.Unlock()
	return
}
// storeTrends does one round of saving modified Trends from cache to dataDB.
//
// Designed to run asynchronously (one round per tick of asyncStoreTrends).
// IDs whose save fails — or which can no longer be found in cache — are
// re-queued so the next backup round retries them.
func (tS *TrendS) storeTrends() {
	var failedTrndIDs []string
	for {
		// pop one key under lock; empty string means the set is drained
		tS.sTrndsMux.Lock()
		trndID := tS.storedTrends.GetOne()
		if trndID != utils.EmptyString {
			tS.storedTrends.Remove(trndID)
		}
		tS.sTrndsMux.Unlock()
		if trndID == utils.EmptyString {
			break // no more keys, backup completed
		}
		trndIf, ok := Cache.Get(utils.CacheTrends, trndID)
		if !ok || trndIf == nil {
			utils.Logger.Warning(
				fmt.Sprintf("<%s> failed retrieving from cache Trend with ID: %q",
					utils.TrendS, trndID))
			failedTrndIDs = append(failedTrndIDs, trndID) // record failure so we can schedule it for next backup
			continue
		}
		trnd := trndIf.(*Trend)
		// read-lock the Trend while persisting so concurrent computeTrend
		// runs cannot mutate it mid-write
		trnd.tMux.RLock()
		if err := tS.dm.SetTrend(trnd); err != nil {
			utils.Logger.Warning(
				fmt.Sprintf("<%s> failed storing Trend with ID: %q, err: %q",
					utils.TrendS, trndID, err))
			failedTrndIDs = append(failedTrndIDs, trndID) // record failure so we can schedule it for next backup
		}
		trnd.tMux.RUnlock()
		// randomize the CPU load and give up thread control
		runtime.Gosched()
	}
	if len(failedTrndIDs) != 0 { // there were errors on save, schedule the keys for next backup
		tS.sTrndsMux.Lock()
		tS.storedTrends.AddSlice(failedTrndIDs)
		tS.sTrndsMux.Unlock()
	}
}
// asyncStoreTrends runs as a background process, triggering one storeTrends
// round every StoreInterval until trendStop is closed; it signals its own
// completion by closing storingStopped.
func (tS *TrendS) asyncStoreTrends() {
	interval := tS.cgrcfg.StatSCfg().StoreInterval
	if interval <= 0 { // nothing to store asynchronously
		close(tS.storingStopped)
		return
	}
	for {
		tS.storeTrends()
		select {
		case <-tS.trendStop: // shutdown requested
			close(tS.storingStopped)
			return
		case <-time.After(interval): // continue to another storing loop
		}
	}
}
// StartCron will activates the Cron, together with all scheduled Trend queries
func (tS *TrendS) StartCron() error {
func (tS *TrendS) StartTrendS() error {
if err := tS.scheduleAutomaticQueries(); err != nil {
return err
}
@@ -152,15 +335,31 @@ func (tS *TrendS) StartCron() error {
}
// StopCron will shutdown the Cron tasks
func (tS *TrendS) StopCron() {
// StopTrendS shuts down the Cron scheduler and the asynchronous storing
// operations, waiting for each within one overall ShutdownTimeout budget
// (measured from entry) before giving up with a warning.
func (tS *TrendS) StopTrendS() {
	timeEnd := time.Now().Add(tS.cgrcfg.CoreSCfg().ShutdownTimeout)
	ctx := tS.crn.Stop()
	close(tS.trendStop)
	// Wait for cron
	select {
	case <-ctx.Done():
	case <-time.After(time.Until(timeEnd)): // time.Until over timeEnd.Sub(time.Now()) (gocritic)
		utils.Logger.Warning(
			fmt.Sprintf(
				"<%s> timeout waiting for Cron to finish",
				utils.TrendS))
		return
	}
	// Wait for backup and other operations
	select {
	case <-tS.storingStopped:
	case <-time.After(time.Until(timeEnd)):
		utils.Logger.Warning(
			fmt.Sprintf(
				"<%s> timeout waiting for TrendS to finish",
				utils.TrendS))
		return
	}
}
@@ -242,9 +441,54 @@ func (tS *TrendS) V1ScheduleQueries(ctx *context.Context, args *utils.ArgSchedul
return
}
func (tS *TrendS) V1GetTrend(ctx *context.Context, arg *utils.ArgGetTrend, trend *Trend) (err error) {
var tr *Trend
tr, err = tS.dm.GetTrend(arg.Tenant, arg.ID, true, true, utils.NonTransactional)
*trend = *tr
// V1GetTrend is the API to return the trend Metrics.
// The number of runTimes can be filtered based on indexes and times provided
// as arguments, in this way being possible to work with paginators.
//
// Returns utils.ErrNotFound when the requested window contains no runs.
func (tS *TrendS) V1GetTrend(ctx *context.Context, arg *utils.ArgGetTrend, retTrend *Trend) (err error) {
	var trnd *Trend
	if trnd, err = tS.dm.GetTrend(arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
		return
	}
	retTrend.Tenant = trnd.Tenant // avoid vet complaining for mutex copying
	retTrend.ID = trnd.ID
	startIdx := arg.RunIndexStart
	if startIdx < 0 { // FIX: guard against slice-bounds panic on negative input
		startIdx = 0
	}
	if startIdx > len(trnd.RunTimes) {
		startIdx = len(trnd.RunTimes)
	}
	endIdx := arg.RunIndexEnd
	if endIdx > len(trnd.RunTimes) ||
		endIdx < startIdx ||
		endIdx == 0 {
		endIdx = len(trnd.RunTimes)
	}
	runTimes := trnd.RunTimes[startIdx:endIdx]
	if len(runTimes) == 0 {
		return utils.ErrNotFound
	}
	var tStart, tEnd time.Time
	if arg.RunTimeStart == utils.EmptyString {
		tStart = runTimes[0]
	} else if tStart, err = utils.ParseTimeDetectLayout(arg.RunTimeStart, tS.cgrcfg.GeneralCfg().DefaultTimezone); err != nil {
		return
	}
	if arg.RunTimeEnd == utils.EmptyString {
		// one nanosecond past the last run so the half-open interval includes it
		tEnd = runTimes[len(runTimes)-1].Add(time.Duration(1))
	} else if tEnd, err = utils.ParseTimeDetectLayout(arg.RunTimeEnd, tS.cgrcfg.GeneralCfg().DefaultTimezone); err != nil {
		// FIX: was parsing arg.RunTimeStart here, silently ignoring RunTimeEnd
		return
	}
	retTrend.RunTimes = make([]time.Time, 0, len(runTimes))
	for _, runTime := range runTimes {
		if !runTime.Before(tStart) && runTime.Before(tEnd) {
			retTrend.RunTimes = append(retTrend.RunTimes, runTime)
		}
	}
	if len(retTrend.RunTimes) == 0 { // FIX: check the filtered slice; len(runTimes) was already known non-zero
		return utils.ErrNotFound
	}
	retTrend.Metrics = make(map[time.Time]map[string]*MetricWithTrend)
	for _, runTime := range retTrend.RunTimes {
		retTrend.Metrics[runTime] = trnd.Metrics[runTime]
	}
	return
}

View File

@@ -72,7 +72,7 @@ func TestNewTrendS(t *testing.T) {
t.Errorf("Expected CGRConfig to be set correctly, got %v, want %v", trendS.cgrcfg, cgrcfg)
}
if trendS.loopStopped == nil {
if trendS.trendStop == nil {
t.Errorf("Expected loopStopped to be initialized, but got nil")
}
if trendS.crnTQsMux == nil {

View File

@@ -83,7 +83,7 @@ func (trs *TrendService) Start() error {
trs.Lock()
defer trs.Unlock()
trs.trs = engine.NewTrendS(dm, trs.connMgr, filterS, trs.cfg)
if err := trs.trs.StartCron(); err != nil {
if err := trs.trs.StartTrendS(); err != nil {
return err
}
srv, err := engine.NewService(v1.NewTrendSv1(trs.trs))
@@ -107,7 +107,7 @@ func (tr *TrendService) Shutdown() (err error) {
defer tr.srvDep[utils.DataDB].Done()
tr.Lock()
defer tr.Unlock()
tr.trs.StopCron()
tr.trs.StopTrendS()
<-tr.connChan
return
}

View File

@@ -1661,5 +1661,9 @@ type ArgScheduleTrendQueries struct {
type ArgGetTrend struct {
TenantWithAPIOpts
ID string
ID string
RunIndexStart int
RunIndexEnd int
RunTimeStart string
RunTimeEnd string
}

View File

@@ -512,6 +512,7 @@ const (
TotalUsage = "TotalUsage"
StatID = "StatID"
StatIDs = "StatIDs"
TrendID = "TrendID"
BalanceType = "BalanceType"
BalanceID = "BalanceID"
BalanceDestinationIds = "BalanceDestinationIds"
@@ -527,6 +528,7 @@ const (
Units = "Units"
AccountUpdate = "AccountUpdate"
StatUpdate = "StatUpdate"
TrendUpdate = "TrendUpdate"
ResourceUpdate = "ResourceUpdate"
CDR = "CDR"
CDRs = "CDRs"