revised trend compress method and added store_interval tests for trends&rankings

This commit is contained in:
gezimbll
2024-12-12 16:24:13 +01:00
committed by Dan Christian Bogos
parent da2052e7b3
commit cd3159d8ea
6 changed files with 602 additions and 16 deletions

View File

@@ -1032,7 +1032,7 @@ func (dm *DataManager) SetTrend(ctx *context.Context, tr *Trend) (err error) {
return utils.ErrNoDatabaseConn
}
if dm.dataDB.GetStorageType() != utils.MetaInternal {
if err = tr.compress(dm.ms); err != nil {
if tr, err = tr.compress(dm.ms); err != nil {
return
}
}
@@ -1391,10 +1391,10 @@ func (dm *DataManager) GetRanking(ctx *context.Context, tenant, id string, cache
}
return nil, err
}
if cacheWrite {
if errCh := Cache.Set(ctx, utils.CacheRankings, tntID, rn, nil, cacheCommit(transactionID), transactionID); errCh != nil {
return nil, errCh
}
}
if cacheWrite {
if errCh := Cache.Set(ctx, utils.CacheRankings, tntID, rn, nil, cacheCommit(transactionID), transactionID); errCh != nil {
return nil, errCh
}
}
return

View File

@@ -143,17 +143,19 @@ func (t *Trend) asTrendSummary() (ts *TrendSummary) {
return
}
func (t *Trend) compress(ms utils.Marshaler) (err error) {
func (t *Trend) compress(ms utils.Marshaler) (tr *Trend, err error) {
if config.CgrConfig().TrendSCfg().StoreUncompressedLimit > len(t.RunTimes) {
return
}
t.CompressedMetrics, err = ms.Marshal(t.Metrics)
tr = &Trend{
Tenant: t.Tenant,
ID: t.ID,
}
tr.CompressedMetrics, err = ms.Marshal(tr.Metrics)
if err != nil {
return
}
t.Metrics = nil
t.RunTimes = nil
return nil
return tr, nil
}
func (t *Trend) uncompress(ms utils.Marshaler) (err error) {
@@ -165,7 +167,7 @@ func (t *Trend) uncompress(ms utils.Marshaler) (err error) {
if err != nil {
return
}
t.CompressedMetrics = []byte{}
t.CompressedMetrics = nil
t.RunTimes = make([]time.Time, len(t.Metrics))
i := 0
for key := range t.Metrics {

View File

@@ -285,14 +285,14 @@ func (tS *TrendS) storeTrends(ctx *context.Context) {
continue
}
trnd := trndIf.(*Trend)
trnd.tMux.RLock()
trnd.tMux.Lock()
if err := tS.dm.SetTrend(ctx, trnd); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> failed storing Trend with ID: %q, err: %q",
utils.TrendS, trndID, err))
failedTrndIDs = append(failedTrndIDs, trndID) // record failure so we can schedule it for next backup
}
trnd.tMux.RUnlock()
trnd.tMux.Unlock()
// randomize the CPU load and give up thread control
runtime.Gosched()
}
@@ -465,6 +465,8 @@ func (tS *TrendS) V1GetTrend(ctx *context.Context, arg *utils.ArgGetTrend, retTr
if trnd, err = tS.dm.GetTrend(ctx, arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
return
}
trnd.tMux.RLock()
defer trnd.tMux.RUnlock()
retTrend.Tenant = trnd.Tenant // avoid vet complaining for mutex copying
retTrend.ID = trnd.ID
startIdx := arg.RunIndexStart