added compressing for metrics on trends

This commit is contained in:
gezimbll
2024-10-14 08:39:11 +02:00
committed by Dan Christian Bogos
parent 69f4f08770
commit 3dccb44511
10 changed files with 106 additions and 37 deletions

View File

@@ -1323,11 +1323,17 @@ func (dm *DataManager) GetTrend(tenant, id string,
if err != utils.ErrNotFound { // RPC error
return
}
} else if err = dm.dataDB.SetTrendDrv(tr); err != nil { // Save the Trend received from remote
return
} else {
if dm.dataDB.GetStorageType() != utils.MetaInternal {
if err = tr.compress(dm.ms); err != nil {
return
}
}
if err = dm.dataDB.SetTrendDrv(tr); err != nil {
return
}
}
}
// have Trend or ErrNotFound
if err == utils.ErrNotFound {
if cacheWrite {
@@ -1354,6 +1360,11 @@ func (dm *DataManager) SetTrend(tr *Trend) (err error) {
if dm == nil {
return utils.ErrNoDatabaseConn
}
if dm.dataDB.GetStorageType() != utils.MetaInternal {
if err = tr.compress(dm.ms); err != nil {
return
}
}
if err = dm.DataDB().SetTrendDrv(tr); err != nil {
return
}

View File

@@ -20,9 +20,11 @@ package engine
import (
"math"
"slices"
"sync"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
@@ -120,6 +122,42 @@ func (t *Trend) Clone() (tC *Trend) {
return
}
func (t *Trend) compress(ms Marshaler) (err error) {
maxRL := config.CgrConfig().TrendSCfg().StoreUncompressedLimit
if maxRL > 0 && maxRL > len(t.RunTimes) {
return
}
t.CompressedMetrics, err = ms.Marshal(t.Metrics)
if err != nil {
return
}
t.Metrics = nil
t.RunTimes = nil
return nil
}
func (t *Trend) uncompress(ms Marshaler) (err error) {
maxL := config.CgrConfig().TrendSCfg().StoreUncompressedLimit
if t == nil || (maxL > 0 && t.RunTimes != nil) {
return
}
err = ms.Unmarshal(t.CompressedMetrics, &t.Metrics)
if err != nil {
return
}
t.CompressedMetrics = []byte{}
t.RunTimes = make([]time.Time, len(t.Metrics))
i := 0
for key := range t.Metrics {
t.RunTimes[i] = key
i++
}
slices.SortFunc(t.RunTimes, func(a, b time.Time) int {
return a.Compare(b)
})
return
}
// Compile is used to initialize or cleanup the Trend
//
// thread safe since it should be used close to source

View File

@@ -1612,7 +1612,13 @@ func (ms *MongoStorage) GetTrendDrv(tenant, id string) (*Trend, error) {
}
return decodeErr
})
return tr, err
if err != nil {
return nil, err
}
if err = tr.uncompress(ms.ms); err != nil {
return nil, err
}
return tr, nil
}
func (ms *MongoStorage) SetTrendDrv(tr *Trend) error {

View File

@@ -963,7 +963,7 @@ func (rs *RedisStorage) RemTrendProfileDrv(tenant string, id string) (err error)
return rs.Cmd(nil, redis_DEL, utils.TrendsProfilePrefix+utils.ConcatenatedKey(tenant, id))
}
func (rs *RedisStorage) GetTrendDrv(tenant, id string) (r *Trend, err error) {
func (rs *RedisStorage) GetTrendDrv(tenant, id string) (tr *Trend, err error) {
var values []byte
if err = rs.Cmd(&values, redis_GET, utils.TrendPrefix+utils.ConcatenatedKey(tenant, id)); err != nil {
return
@@ -971,8 +971,13 @@ func (rs *RedisStorage) GetTrendDrv(tenant, id string) (r *Trend, err error) {
err = utils.ErrNotFound
return
}
err = rs.ms.Unmarshal(values, &r)
return
if err = rs.ms.Unmarshal(values, &tr); err != nil {
return
}
if err = tr.uncompress(rs.ms); err != nil {
return nil, err
}
return tr, nil
}
func (rs *RedisStorage) SetTrendDrv(r *Trend) (err error) {