diff --git a/apier/v1/stats.go b/apier/v1/stats.go index 04cdc5609..838c1b08c 100644 --- a/apier/v1/stats.go +++ b/apier/v1/stats.go @@ -61,45 +61,74 @@ func (apierSv1 *APIerSv1) GetStatQueueProfileIDs(args *utils.PaginatorWithTenant } // SetStatQueueProfile alters/creates a StatQueueProfile -func (apierSv1 *APIerSv1) SetStatQueueProfile(arg *engine.StatQueueWithCache, reply *string) error { +func (apierSv1 *APIerSv1) SetStatQueueProfile(arg *engine.StatQueueWithCache, reply *string) (err error) { if missing := utils.MissingStructFields(arg.StatQueueProfile, []string{"Tenant", "ID"}); len(missing) != 0 { return utils.NewErrMandatoryIeMissing(missing...) } - if err := apierSv1.DataManager.SetStatQueueProfile(arg.StatQueueProfile, true); err != nil { + if err = apierSv1.DataManager.SetStatQueueProfile(arg.StatQueueProfile, true); err != nil { return utils.APIErrorHandler(err) } //generate a loadID for CacheStatQueueProfiles and CacheStatQueues and store it in database //make 1 insert for both StatQueueProfile and StatQueue instead of 2 loadID := time.Now().UnixNano() - if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheStatQueueProfiles: loadID, utils.CacheStatQueues: loadID}); err != nil { + if err = apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheStatQueueProfiles: loadID, utils.CacheStatQueues: loadID}); err != nil { return utils.APIErrorHandler(err) } //handle caching for StatQueueProfile - if err := apierSv1.CallCache(arg.Cache, arg.Tenant, utils.CacheStatQueueProfiles, + if err = apierSv1.CallCache(arg.Cache, arg.Tenant, utils.CacheStatQueueProfiles, arg.TenantID(), &arg.FilterIDs, nil, arg.Opts); err != nil { return utils.APIErrorHandler(err) } - if has, err := apierSv1.DataManager.HasData(utils.StatQueuePrefix, arg.ID, arg.Tenant); err != nil { - return err - } else if !has { - //compose metrics for StatQueue - metrics := make(map[string]engine.StatMetric) + metrics := make(map[string]engine.StatMetric) + var sq *engine.StatQueue + sq, err = apierSv1.DataManager.GetStatQueue(arg.Tenant, arg.ID, true, false, utils.NonTransactional) + if err != nil && err != utils.ErrNotFound { + return + } + if err == utils.ErrNotFound { + // if the statQueue didn't exists simply initiate all the metrics for _, metric := range arg.Metrics { - if stsMetric, err := engine.NewStatMetric(metric.MetricID, arg.MinItems, metric.FilterIDs); err != nil { - return utils.APIErrorHandler(err) - } else { + var stsMetric engine.StatMetric + if stsMetric, err = engine.NewStatMetric(metric.MetricID, + arg.MinItems, + metric.FilterIDs); err != nil { + return + } + metrics[metric.MetricID] = stsMetric + } + sq = &engine.StatQueue{Tenant: arg.Tenant, ID: arg.ID, SQMetrics: metrics} + } else { + for _, metric := range arg.Metrics { + if _, has := sq.SQMetrics[metric.MetricID]; !has { + var stsMetric engine.StatMetric + if stsMetric, err = engine.NewStatMetric(metric.MetricID, + arg.MinItems, + metric.FilterIDs); err != nil { + return + } metrics[metric.MetricID] = stsMetric + } else { + metrics[metric.MetricID] = sq.SQMetrics[metric.MetricID] } } - if err := apierSv1.DataManager.SetStatQueue(&engine.StatQueue{Tenant: arg.Tenant, ID: arg.ID, SQMetrics: metrics}); err != nil { - return utils.APIErrorHandler(err) + sq.SQMetrics = metrics + // if the user define a statQueue with an existing metric check if we need to update it based on queue length + var ttl *time.Duration + if arg.TTL > 0 { + ttl = &arg.TTL } - //handle caching for StatQueues - if err := apierSv1.CallCache(arg.Cache, arg.Tenant, utils.CacheStatQueues, - arg.TenantID(), nil, nil, arg.Opts); err != nil { - return utils.APIErrorHandler(err) + if err = sq.UpdateStatQueue(ttl, arg.QueueLength); err != nil { + return } } + if err = apierSv1.DataManager.SetStatQueue(sq); err != nil { + return err + } + //handle caching for StatQueues + if err := apierSv1.CallCache(arg.Cache, arg.Tenant, utils.CacheStatQueues, + arg.TenantID(), nil, nil, arg.Opts); err != nil { + return utils.APIErrorHandler(err) + } *reply = utils.OK return nil diff --git a/engine/libstats.go b/engine/libstats.go index d316aacd8..79eb6c2e5 100644 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -327,6 +327,24 @@ func (sq *StatQueue) Expand() { sq.SQItems = newSQItems } +//UpdateStatQueue will update the statQueue metrics based on a new ttl and queueLength +func (sq *StatQueue) UpdateStatQueue(ttl *time.Duration, queueLen int) (err error) { + sq.ttl = ttl + if err = sq.remExpired(); err != nil { + return + } + if len(sq.SQItems) > queueLen { + for i := 0; i < queueLen-len(sq.SQItems); i++ { + item := sq.SQItems[0] + if err = sq.remEventWithID(item.EventID); err != nil { + return + } + sq.SQItems = sq.SQItems[1:] + } + } + return +} + // StatQueues is a sortable list of StatQueue type StatQueues []*StatQueue diff --git a/engine/stats.go b/engine/stats.go index a8c5fc44a..94a22c78f 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -197,7 +197,7 @@ func (sS *StatService) matchingStatQueuesForEvent(args *StatsArgsProcessEvent) ( if sqPrfl.Stored && sq.dirty == nil { sq.dirty = utils.BoolPointer(false) } - if sqPrfl.TTL >= 0 { + if sqPrfl.TTL > 0 { sq.ttl = utils.DurationPointer(sqPrfl.TTL) } sq.sqPrfl = sqPrfl diff --git a/engine/tpreader.go b/engine/tpreader.go index 006e2633f..f6d271917 100644 --- a/engine/tpreader.go +++ b/engine/tpreader.go @@ -1120,15 +1120,9 @@ func (tpr *TpReader) LoadStatsFiltered(tag string) (err error) { return } mapSTs[utils.TenantID{Tenant: st.Tenant, ID: st.ID}] = st + tpr.statQueues = append(tpr.statQueues, &utils.TenantID{Tenant: st.Tenant, ID: st.ID}) } tpr.sqProfiles = mapSTs - for tntID := range mapSTs { - if has, err := tpr.dm.HasData(utils.StatQueuePrefix, tntID.ID, tntID.Tenant); err != nil { - return err - } else if !has { - tpr.statQueues = append(tpr.statQueues, &utils.TenantID{Tenant: tntID.Tenant, ID: tntID.ID}) - } - } return nil } @@ -1621,18 +1615,55 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error) } for _, sqTntID := range tpr.statQueues { metrics := make(map[string]StatMetric) - for _, metric := range tpr.sqProfiles[utils.TenantID{Tenant: sqTntID.Tenant, ID: sqTntID.ID}].Metrics { - var stsMetric StatMetric - if stsMetric, err = NewStatMetric(metric.MetricID, - tpr.sqProfiles[utils.TenantID{Tenant: sqTntID.Tenant, ID: sqTntID.ID}].MinItems, - metric.FilterIDs); err != nil { + var sq *StatQueue + sq, err = tpr.dm.GetStatQueue(sqTntID.Tenant, sqTntID.ID, true, false, utils.NonTransactional) + if err != nil && err != utils.ErrNotFound { + return + } + if err == utils.ErrNotFound { + // if the statQueue didn't exists simply initiate all the metrics + for _, metric := range tpr.sqProfiles[*sqTntID].Metrics { + var stsMetric StatMetric + if stsMetric, err = NewStatMetric(metric.MetricID, + tpr.sqProfiles[*sqTntID].MinItems, + metric.FilterIDs); err != nil { + return + } + metrics[metric.MetricID] = stsMetric + } + sq = &StatQueue{Tenant: sqTntID.Tenant, ID: sqTntID.ID, SQMetrics: metrics} + } else { + for _, metric := range tpr.sqProfiles[*sqTntID].Metrics { + if _, has := sq.SQMetrics[metric.MetricID]; !has { + var stsMetric StatMetric + if stsMetric, err = NewStatMetric(metric.MetricID, + tpr.sqProfiles[*sqTntID].MinItems, + metric.FilterIDs); err != nil { + return + } + metrics[metric.MetricID] = stsMetric + } else { + metrics[metric.MetricID] = sq.SQMetrics[metric.MetricID] + } + } + sq.SQMetrics = metrics + var ttl *time.Duration + if tpr.sqProfiles[*sqTntID].TTL != utils.EmptyString { + ttl = new(time.Duration) + if *ttl, err = utils.ParseDurationWithNanosecs(tpr.sqProfiles[*sqTntID].TTL); err != nil { + return + } + if *ttl <= 0 { + ttl = nil + } + } + // if the user define a statQueue with an existing metric check if we need to update it based on queue length + if err = sq.UpdateStatQueue(ttl, tpr.sqProfiles[*sqTntID].QueueLength); err != nil { return } - metrics[metric.MetricID] = stsMetric } - sq := &StatQueue{Tenant: sqTntID.Tenant, ID: sqTntID.ID, SQMetrics: metrics} if err = tpr.dm.SetStatQueue(sq); err != nil { - return + return err } if verbose { log.Print("\t", sqTntID.TenantID())