revise tests for trends storeinterval && file readers

This commit is contained in:
gezimbll
2024-11-26 11:22:02 +01:00
committed by Dan Christian Bogos
parent f3a80014a3
commit cf44898b98
8 changed files with 215 additions and 145 deletions

View File

@@ -1302,7 +1302,6 @@ func (dm *DataManager) RemoveStatQueueProfile(tenant, id string, withIndex bool)
func (dm *DataManager) GetTrend(tenant, id string,
cacheRead, cacheWrite bool, transactionID string) (tr *Trend, err error) {
tntID := utils.ConcatenatedKey(tenant, id)
if cacheRead {
if x, ok := Cache.Get(utils.CacheTrends, tntID); ok {
if x == nil {
@@ -1367,7 +1366,7 @@ func (dm *DataManager) SetTrend(tr *Trend) (err error) {
return utils.ErrNoDatabaseConn
}
if dm.dataDB.GetStorageType() != utils.MetaInternal {
if err = tr.compress(dm.ms); err != nil {
if tr, err = tr.compress(dm.ms); err != nil {
return
}
}

View File

@@ -119,6 +119,25 @@ type Trend struct {
}
func (t *Trend) Clone() (tC *Trend) {
tC = &Trend{
Tenant: t.Tenant,
ID: t.ID,
}
if t.RunTimes != nil {
tC.RunTimes = make([]time.Time, len(t.RunTimes))
copy(tC.RunTimes, t.RunTimes)
}
if t.CompressedMetrics != nil {
tC.CompressedMetrics = make([]byte, len(t.CompressedMetrics))
copy(tC.CompressedMetrics, t.CompressedMetrics)
}
if t.Metrics != nil {
tC.Metrics = make(map[time.Time]map[string]*MetricWithTrend)
for key, val := range t.Metrics {
tC.Metrics[key] = val
}
}
return
}
@@ -143,17 +162,19 @@ func (t *Trend) asTrendSummary() (ts *TrendSummary) {
return
}
func (t *Trend) compress(ms Marshaler) (err error) {
func (t *Trend) compress(ms Marshaler) (tr *Trend, err error) {
if config.CgrConfig().TrendSCfg().StoreUncompressedLimit > len(t.RunTimes) {
return
}
t.CompressedMetrics, err = ms.Marshal(t.Metrics)
if err != nil {
return
tr = &Trend{
Tenant: t.Tenant,
ID: t.ID,
}
t.Metrics = nil
t.RunTimes = nil
return nil
tr.CompressedMetrics, err = ms.Marshal(t.Metrics)
if err != nil {
return nil, err
}
return tr, nil
}
func (t *Trend) uncompress(ms Marshaler) (err error) {
@@ -164,7 +185,7 @@ func (t *Trend) uncompress(ms Marshaler) (err error) {
if err != nil {
return
}
t.CompressedMetrics = []byte{}
t.CompressedMetrics = nil
t.RunTimes = make([]time.Time, len(t.Metrics))
i := 0
for key := range t.Metrics {

View File

@@ -442,20 +442,20 @@ func TestTrendCompressSuccess(t *testing.T) {
RunTimes: []time.Time{time.Now()},
}
err := trend.compress(marshaler)
tr, err := trend.compress(marshaler)
if err != nil {
t.Errorf("Expected no error, got <%+v>", err)
}
if trend.CompressedMetrics == nil {
t.Errorf("Expected CompressedMetrics to be populated, got: %+v", trend.CompressedMetrics)
if tr.CompressedMetrics == nil {
t.Errorf("Expected CompressedMetrics to be populated, got: %+v", tr.CompressedMetrics)
}
if trend.Metrics != nil {
t.Errorf("Expected Metrics to be nil after compression, got: %+v", trend.Metrics)
if tr.Metrics != nil {
t.Errorf("Expected Metrics to be nil after compression, got: %+v", tr.Metrics)
}
if trend.RunTimes != nil {
t.Errorf("Expected RunTimes to be nil after compression, got: %+v", trend.RunTimes)
if tr.RunTimes != nil {
t.Errorf("Expected RunTimes to be nil after compression, got: %+v", tr.RunTimes)
}
}

View File

@@ -287,14 +287,14 @@ func (tS *TrendS) storeTrends() {
continue
}
trnd := trndIf.(*Trend)
trnd.tMux.RLock()
trnd.tMux.Lock()
if err := tS.dm.SetTrend(trnd); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> failed storing Trend with ID: %q, err: %q",
utils.TrendS, trndID, err))
failedTrndIDs = append(failedTrndIDs, trndID) // record failure so we can schedule it for next backup
}
trnd.tMux.RUnlock()
trnd.tMux.Unlock()
// randomize the CPU load and give up thread control
runtime.Gosched()
}
@@ -467,8 +467,11 @@ func (tS *TrendS) V1GetTrend(ctx *context.Context, arg *utils.ArgGetTrend, retTr
if trnd, err = tS.dm.GetTrend(arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
return
}
trnd.tMux.RLock()
defer trnd.tMux.RUnlock()
retTrend.Tenant = trnd.Tenant // avoid vet complaining for mutex copying
retTrend.ID = trnd.ID
startIdx := arg.RunIndexStart
if startIdx > len(trnd.RunTimes) {
startIdx = len(trnd.RunTimes)