Updated StatS update when set

This commit is contained in:
Trial97
2020-12-10 12:19:35 +02:00
committed by Dan Christian Bogos
parent e362ea84e9
commit fb205ec396
6 changed files with 92 additions and 29 deletions

View File

@@ -137,8 +137,12 @@ func (apierSv1 *APIerSv1) SetResourceProfile(arg *ResourceWithCache, reply *stri
if arg.UsageTTL > 0 {
ttl = &arg.UsageTTL
}
if err = apierSv1.DataManager.SetResource(&engine.Resource{Tenant: arg.Tenant, ID: arg.ID},
ttl, arg.Limit, false); err != nil {
// for non stored we do not save the metrics
if err = apierSv1.DataManager.SetResource(&engine.Resource{
Tenant: arg.Tenant,
ID: arg.ID,
Usages: make(map[string]*engine.ResourceUsage),
}, ttl, arg.Limit, !arg.Stored); err != nil {
return
}
//handle caching for Resource

View File

@@ -91,8 +91,20 @@ func (apierSv1 *APIerSv1) SetStatQueueProfile(arg *engine.StatQueueWithCache, re
if arg.TTL > 0 {
ttl = &arg.TTL
}
if err = apierSv1.DataManager.SetStatQueue(&engine.StatQueue{Tenant: arg.Tenant, ID: arg.ID}, arg.Metrics,
arg.MinItems, ttl, arg.QueueLength, false); err != nil {
sq := &engine.StatQueue{
Tenant: arg.Tenant,
ID: arg.ID,
}
if !arg.Stored { // for not stored queues create the metrics
if sq, err = engine.NewStatQueue(arg.Tenant, arg.ID, arg.Metrics,
arg.MinItems); err != nil {
return err
}
}
// for non stored we do not save the metrics
if err = apierSv1.DataManager.SetStatQueue(sq,
arg.Metrics, arg.MinItems, ttl, arg.QueueLength,
!arg.Stored); err != nil {
return err
}
//handle caching for StatQueues

View File

@@ -743,16 +743,9 @@ func (dm *DataManager) SetStatQueue(sq *StatQueue, metrics []*MetricWithFilters,
return
}
if err == utils.ErrNotFound {
sq = &StatQueue{Tenant: tnt, ID: id, SQMetrics: make(map[string]StatMetric)}
// if the statQueue didn't exists simply initiate all the metrics
for _, metric := range metrics {
var stsMetric StatMetric
if stsMetric, err = NewStatMetric(metric.MetricID,
minItems,
metric.FilterIDs); err != nil {
return
}
sq.SQMetrics[metric.MetricID] = stsMetric
if sq, err = NewStatQueue(tnt, id, metrics, minItems); err != nil {
return
}
} else {
for sqMetricID := range sq.SQMetrics {
@@ -764,20 +757,24 @@ func (dm *DataManager) SetStatQueue(sq *StatQueue, metrics []*MetricWithFilters,
needsRemove = false
break
}
if _, has := sq.SQMetrics[metric.MetricID]; !has {
var stsMetric StatMetric
if stsMetric, err = NewStatMetric(metric.MetricID,
minItems,
metric.FilterIDs); err != nil {
return
}
sq.SQMetrics[metric.MetricID] = stsMetric
}
}
if needsRemove {
delete(sq.SQMetrics, sqMetricID)
}
}
for _, metric := range metrics {
if _, has := sq.SQMetrics[metric.MetricID]; !has {
var stsMetric StatMetric
if stsMetric, err = NewStatMetric(metric.MetricID,
minItems,
metric.FilterIDs); err != nil {
return
}
sq.SQMetrics[metric.MetricID] = stsMetric
}
}
// if the user define a statQueue with an existing metric check if we need to update it based on queue length
sq.ttl = ttl
if _, err = sq.remExpired(); err != nil {

View File

@@ -141,6 +141,22 @@ type SQItem struct {
ExpiryTime *time.Time // Used to auto-expire events
}
func NewStatQueue(tnt, id string, metrics []*MetricWithFilters, minItems int) (sq *StatQueue, err error) {
sq = &StatQueue{
Tenant: tnt,
ID: id,
SQMetrics: make(map[string]StatMetric),
}
for _, metric := range metrics {
if sq.SQMetrics[metric.MetricID], err = NewStatMetric(metric.MetricID,
minItems, metric.FilterIDs); err != nil {
return
}
}
return
}
// StatQueue represents an individual stats instance
type StatQueue struct {
lk sync.RWMutex // protect the elements from within

View File

@@ -1615,8 +1615,13 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) {
return
}
}
// for non stored we do not save the resource
if err = tpr.dm.SetResource(
&Resource{Tenant: rTid.Tenant, ID: rTid.ID}, ttl, limit, false); err != nil {
&Resource{
Tenant: rTid.Tenant,
ID: rTid.ID,
Usages: make(map[string]*ResourceUsage),
}, ttl, limit, !tpr.resProfiles[*rTid].Stored); err != nil {
return
}
if verbose {
@@ -1665,8 +1670,21 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) {
FilterIDs: metric.FilterIDs,
}
}
if err = tpr.dm.SetStatQueue(&StatQueue{Tenant: sqTntID.Tenant, ID: sqTntID.ID}, metrics,
tpr.sqProfiles[*sqTntID].MinItems, ttl, tpr.sqProfiles[*sqTntID].QueueLength, false); err != nil {
sq := &StatQueue{
Tenant: sqTntID.Tenant,
ID: sqTntID.ID,
}
if !tpr.sqProfiles[*sqTntID].Stored { //for not stored queues create the metrics
if sq, err = NewStatQueue(sqTntID.Tenant, sqTntID.ID, metrics,
tpr.sqProfiles[*sqTntID].MinItems); err != nil {
return
}
}
// for non stored we do not save the metrics
if err = tpr.dm.SetStatQueue(sq, metrics,
tpr.sqProfiles[*sqTntID].MinItems,
ttl, tpr.sqProfiles[*sqTntID].QueueLength,
!tpr.sqProfiles[*sqTntID].Stored); err != nil {
return err
}
if verbose {

View File

@@ -340,9 +340,13 @@ func (ldr *Loader) storeLoadedData(loaderType string,
if res.UsageTTL > 0 {
ttl = &res.UsageTTL
}
// for non stored we do not save the resource
if err := ldr.dm.SetResource(
&engine.Resource{Tenant: res.Tenant,
ID: res.ID}, ttl, res.Limit, false); err != nil {
&engine.Resource{
Tenant: res.Tenant,
ID: res.ID,
Usages: make(map[string]*engine.ResourceUsage),
}, ttl, res.Limit, !res.Stored); err != nil {
return err
}
cacheArgs[utils.ResourceProfileIDs] = ids
@@ -416,8 +420,20 @@ func (ldr *Loader) storeLoadedData(loaderType string,
if stsPrf.TTL > 0 {
ttl = &stsPrf.TTL
}
if err := ldr.dm.SetStatQueue(&engine.StatQueue{Tenant: stsPrf.Tenant, ID: stsPrf.ID}, stsPrf.Metrics,
stsPrf.MinItems, ttl, stsPrf.QueueLength, false); err != nil {
sq := &engine.StatQueue{
Tenant: stsPrf.Tenant,
ID: stsPrf.ID,
}
if !stsPrf.Stored { // for not stored queues create the metrics
if sq, err = engine.NewStatQueue(stsPrf.Tenant, stsPrf.ID, stsPrf.Metrics,
stsPrf.MinItems); err != nil {
return err
}
}
// for non stored we do not save the metrics
if err := ldr.dm.SetStatQueue(sq, stsPrf.Metrics,
stsPrf.MinItems, ttl, stsPrf.QueueLength,
!stsPrf.Stored); err != nil {
return err
}
cacheArgs[utils.StatsQueueProfileIDs] = ids