diff --git a/apier/v1/resourcesv1.go b/apier/v1/resourcesv1.go index a1bfedbd6..2b3fd7970 100644 --- a/apier/v1/resourcesv1.go +++ b/apier/v1/resourcesv1.go @@ -137,8 +137,12 @@ func (apierSv1 *APIerSv1) SetResourceProfile(arg *ResourceWithCache, reply *stri if arg.UsageTTL > 0 { ttl = &arg.UsageTTL } - if err = apierSv1.DataManager.SetResource(&engine.Resource{Tenant: arg.Tenant, ID: arg.ID}, - ttl, arg.Limit, false); err != nil { + // for non stored we do not save the metrics + if err = apierSv1.DataManager.SetResource(&engine.Resource{ + Tenant: arg.Tenant, + ID: arg.ID, + Usages: make(map[string]*engine.ResourceUsage), + }, ttl, arg.Limit, !arg.Stored); err != nil { return } //handle caching for Resource diff --git a/apier/v1/stats.go b/apier/v1/stats.go index 012e9ad98..95245b3a7 100644 --- a/apier/v1/stats.go +++ b/apier/v1/stats.go @@ -91,8 +91,20 @@ func (apierSv1 *APIerSv1) SetStatQueueProfile(arg *engine.StatQueueWithCache, re if arg.TTL > 0 { ttl = &arg.TTL } - if err = apierSv1.DataManager.SetStatQueue(&engine.StatQueue{Tenant: arg.Tenant, ID: arg.ID}, arg.Metrics, - arg.MinItems, ttl, arg.QueueLength, false); err != nil { + sq := &engine.StatQueue{ + Tenant: arg.Tenant, + ID: arg.ID, + } + if !arg.Stored { // for not stored queues create the metrics + if sq, err = engine.NewStatQueue(arg.Tenant, arg.ID, arg.Metrics, + arg.MinItems); err != nil { + return err + } + } + // for non stored we do not save the metrics + if err = apierSv1.DataManager.SetStatQueue(sq, + arg.Metrics, arg.MinItems, ttl, arg.QueueLength, + !arg.Stored); err != nil { return err } //handle caching for StatQueues diff --git a/engine/datamanager.go b/engine/datamanager.go index 3d1a1d99c..9db64c130 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -743,16 +743,9 @@ func (dm *DataManager) SetStatQueue(sq *StatQueue, metrics []*MetricWithFilters, return } if err == utils.ErrNotFound { - sq = &StatQueue{Tenant: tnt, ID: id, SQMetrics: make(map[string]StatMetric)} // if the statQueue didn't exists simply initiate all the metrics - for _, metric := range metrics { - var stsMetric StatMetric - if stsMetric, err = NewStatMetric(metric.MetricID, - minItems, - metric.FilterIDs); err != nil { - return - } - sq.SQMetrics[metric.MetricID] = stsMetric + if sq, err = NewStatQueue(tnt, id, metrics, minItems); err != nil { + return } } else { for sqMetricID := range sq.SQMetrics { @@ -764,20 +757,24 @@ func (dm *DataManager) SetStatQueue(sq *StatQueue, metrics []*MetricWithFilters, needsRemove = false break } - if _, has := sq.SQMetrics[metric.MetricID]; !has { - var stsMetric StatMetric - if stsMetric, err = NewStatMetric(metric.MetricID, - minItems, - metric.FilterIDs); err != nil { - return - } - sq.SQMetrics[metric.MetricID] = stsMetric - } } if needsRemove { delete(sq.SQMetrics, sqMetricID) } } + + for _, metric := range metrics { + if _, has := sq.SQMetrics[metric.MetricID]; !has { + var stsMetric StatMetric + if stsMetric, err = NewStatMetric(metric.MetricID, + minItems, + metric.FilterIDs); err != nil { + return + } + sq.SQMetrics[metric.MetricID] = stsMetric + } + } + // if the user define a statQueue with an existing metric check if we need to update it based on queue length sq.ttl = ttl if _, err = sq.remExpired(); err != nil { diff --git a/engine/libstats.go b/engine/libstats.go index 453bfe23d..9d4c68388 100644 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -141,6 +141,22 @@ type SQItem struct { ExpiryTime *time.Time // Used to auto-expire events } +func NewStatQueue(tnt, id string, metrics []*MetricWithFilters, minItems int) (sq *StatQueue, err error) { + sq = &StatQueue{ + Tenant: tnt, + ID: id, + SQMetrics: make(map[string]StatMetric), + } + + for _, metric := range metrics { + if sq.SQMetrics[metric.MetricID], err = NewStatMetric(metric.MetricID, + minItems, metric.FilterIDs); err != nil { + return + } + } + return +} + // StatQueue represents an individual stats instance type StatQueue struct { lk sync.RWMutex // protect the elements from within diff --git a/engine/tpreader.go b/engine/tpreader.go index bc86f6834..aa651ba59 100644 --- a/engine/tpreader.go +++ b/engine/tpreader.go @@ -1615,8 +1615,13 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) { return } } + // for non stored we do not save the resource if err = tpr.dm.SetResource( - &Resource{Tenant: rTid.Tenant, ID: rTid.ID}, ttl, limit, false); err != nil { + &Resource{ + Tenant: rTid.Tenant, + ID: rTid.ID, + Usages: make(map[string]*ResourceUsage), + }, ttl, limit, !tpr.resProfiles[*rTid].Stored); err != nil { return } if verbose { @@ -1665,8 +1670,21 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) { FilterIDs: metric.FilterIDs, } } - if err = tpr.dm.SetStatQueue(&StatQueue{Tenant: sqTntID.Tenant, ID: sqTntID.ID}, metrics, - tpr.sqProfiles[*sqTntID].MinItems, ttl, tpr.sqProfiles[*sqTntID].QueueLength, false); err != nil { + sq := &StatQueue{ + Tenant: sqTntID.Tenant, + ID: sqTntID.ID, + } + if !tpr.sqProfiles[*sqTntID].Stored { //for not stored queues create the metrics + if sq, err = NewStatQueue(sqTntID.Tenant, sqTntID.ID, metrics, + tpr.sqProfiles[*sqTntID].MinItems); err != nil { + return + } + } + // for non stored we do not save the metrics + if err = tpr.dm.SetStatQueue(sq, metrics, + tpr.sqProfiles[*sqTntID].MinItems, + ttl, tpr.sqProfiles[*sqTntID].QueueLength, + !tpr.sqProfiles[*sqTntID].Stored); err != nil { return err } if verbose { diff --git a/loaders/loader.go b/loaders/loader.go index c0f3803f5..f51ea81d6 100644 --- a/loaders/loader.go +++ b/loaders/loader.go @@ -340,9 +340,13 @@ func (ldr *Loader) storeLoadedData(loaderType string, if res.UsageTTL > 0 { ttl = &res.UsageTTL } + // for non stored we do not save the resource if err := ldr.dm.SetResource( - &engine.Resource{Tenant: res.Tenant, - ID: res.ID}, ttl, res.Limit, false); err != nil { + &engine.Resource{ + Tenant: res.Tenant, + ID: res.ID, + Usages: make(map[string]*engine.ResourceUsage), + }, ttl, res.Limit, !res.Stored); err != nil { return err } cacheArgs[utils.ResourceProfileIDs] = ids @@ -416,8 +420,20 @@ func (ldr *Loader) storeLoadedData(loaderType string, if stsPrf.TTL > 0 { ttl = &stsPrf.TTL } - if err := ldr.dm.SetStatQueue(&engine.StatQueue{Tenant: stsPrf.Tenant, ID: stsPrf.ID}, stsPrf.Metrics, - stsPrf.MinItems, ttl, stsPrf.QueueLength, false); err != nil { + sq := &engine.StatQueue{ + Tenant: stsPrf.Tenant, + ID: stsPrf.ID, + } + if !stsPrf.Stored { // for not stored queues create the metrics + if sq, err = engine.NewStatQueue(stsPrf.Tenant, stsPrf.ID, stsPrf.Metrics, + stsPrf.MinItems); err != nil { + return err + } + } + // for non stored we do not save the metrics + if err := ldr.dm.SetStatQueue(sq, stsPrf.Metrics, + stsPrf.MinItems, ttl, stsPrf.QueueLength, + !stsPrf.Stored); err != nil { return err } cacheArgs[utils.StatsQueueProfileIDs] = ids