Update statQueue when update StatQueueProfile

This commit is contained in:
TeoV
2020-09-21 16:48:32 +03:00
committed by Dan Christian Bogos
parent 8f004fd512
commit 4e5af5ea39
4 changed files with 112 additions and 34 deletions

View File

@@ -61,45 +61,74 @@ func (apierSv1 *APIerSv1) GetStatQueueProfileIDs(args *utils.PaginatorWithTenant
}
// SetStatQueueProfile alters/creates a StatQueueProfile
func (apierSv1 *APIerSv1) SetStatQueueProfile(arg *engine.StatQueueWithCache, reply *string) error {
func (apierSv1 *APIerSv1) SetStatQueueProfile(arg *engine.StatQueueWithCache, reply *string) (err error) {
if missing := utils.MissingStructFields(arg.StatQueueProfile, []string{"Tenant", "ID"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
if err := apierSv1.DataManager.SetStatQueueProfile(arg.StatQueueProfile, true); err != nil {
if err = apierSv1.DataManager.SetStatQueueProfile(arg.StatQueueProfile, true); err != nil {
return utils.APIErrorHandler(err)
}
//generate a loadID for CacheStatQueueProfiles and CacheStatQueues and store it in database
//make 1 insert for both StatQueueProfile and StatQueue instead of 2
loadID := time.Now().UnixNano()
if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheStatQueueProfiles: loadID, utils.CacheStatQueues: loadID}); err != nil {
if err = apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheStatQueueProfiles: loadID, utils.CacheStatQueues: loadID}); err != nil {
return utils.APIErrorHandler(err)
}
//handle caching for StatQueueProfile
if err := apierSv1.CallCache(arg.Cache, arg.Tenant, utils.CacheStatQueueProfiles,
if err = apierSv1.CallCache(arg.Cache, arg.Tenant, utils.CacheStatQueueProfiles,
arg.TenantID(), &arg.FilterIDs, nil, arg.Opts); err != nil {
return utils.APIErrorHandler(err)
}
if has, err := apierSv1.DataManager.HasData(utils.StatQueuePrefix, arg.ID, arg.Tenant); err != nil {
return err
} else if !has {
//compose metrics for StatQueue
metrics := make(map[string]engine.StatMetric)
metrics := make(map[string]engine.StatMetric)
var sq *engine.StatQueue
sq, err = apierSv1.DataManager.GetStatQueue(arg.Tenant, arg.ID, true, false, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
return
}
if err == utils.ErrNotFound {
// if the statQueue didn't exists simply initiate all the metrics
for _, metric := range arg.Metrics {
if stsMetric, err := engine.NewStatMetric(metric.MetricID, arg.MinItems, metric.FilterIDs); err != nil {
return utils.APIErrorHandler(err)
} else {
var stsMetric engine.StatMetric
if stsMetric, err = engine.NewStatMetric(metric.MetricID,
arg.MinItems,
metric.FilterIDs); err != nil {
return
}
metrics[metric.MetricID] = stsMetric
}
sq = &engine.StatQueue{Tenant: arg.Tenant, ID: arg.ID, SQMetrics: metrics}
} else {
for _, metric := range arg.Metrics {
if _, has := sq.SQMetrics[metric.MetricID]; !has {
var stsMetric engine.StatMetric
if stsMetric, err = engine.NewStatMetric(metric.MetricID,
arg.MinItems,
metric.FilterIDs); err != nil {
return
}
metrics[metric.MetricID] = stsMetric
} else {
metrics[metric.MetricID] = sq.SQMetrics[metric.MetricID]
}
}
if err := apierSv1.DataManager.SetStatQueue(&engine.StatQueue{Tenant: arg.Tenant, ID: arg.ID, SQMetrics: metrics}); err != nil {
return utils.APIErrorHandler(err)
sq.SQMetrics = metrics
// if the user define a statQueue with an existing metric check if we need to update it based on queue length
var ttl *time.Duration
if arg.TTL > 0 {
ttl = &arg.TTL
}
//handle caching for StatQueues
if err := apierSv1.CallCache(arg.Cache, arg.Tenant, utils.CacheStatQueues,
arg.TenantID(), nil, nil, arg.Opts); err != nil {
return utils.APIErrorHandler(err)
if err = sq.UpdateStatQueue(ttl, arg.QueueLength); err != nil {
return
}
}
if err = apierSv1.DataManager.SetStatQueue(sq); err != nil {
return err
}
//handle caching for StatQueues
if err := apierSv1.CallCache(arg.Cache, arg.Tenant, utils.CacheStatQueues,
arg.TenantID(), nil, nil, arg.Opts); err != nil {
return utils.APIErrorHandler(err)
}
*reply = utils.OK
return nil

View File

@@ -327,6 +327,24 @@ func (sq *StatQueue) Expand() {
sq.SQItems = newSQItems
}
//UpdateStatQueue will update the statQueue metrics based on a new ttl and queueLength
func (sq *StatQueue) UpdateStatQueue(ttl *time.Duration, queueLen int) (err error) {
sq.ttl = ttl
if err = sq.remExpired(); err != nil {
return
}
if len(sq.SQItems) > queueLen {
for i := 0; i < queueLen-len(sq.SQItems); i++ {
item := sq.SQItems[0]
if err = sq.remEventWithID(item.EventID); err != nil {
return
}
sq.SQItems = sq.SQItems[1:]
}
}
return
}
// StatQueues is a sortable list of StatQueue
type StatQueues []*StatQueue

View File

@@ -197,7 +197,7 @@ func (sS *StatService) matchingStatQueuesForEvent(args *StatsArgsProcessEvent) (
if sqPrfl.Stored && sq.dirty == nil {
sq.dirty = utils.BoolPointer(false)
}
if sqPrfl.TTL >= 0 {
if sqPrfl.TTL > 0 {
sq.ttl = utils.DurationPointer(sqPrfl.TTL)
}
sq.sqPrfl = sqPrfl

View File

@@ -1120,15 +1120,9 @@ func (tpr *TpReader) LoadStatsFiltered(tag string) (err error) {
return
}
mapSTs[utils.TenantID{Tenant: st.Tenant, ID: st.ID}] = st
tpr.statQueues = append(tpr.statQueues, &utils.TenantID{Tenant: st.Tenant, ID: st.ID})
}
tpr.sqProfiles = mapSTs
for tntID := range mapSTs {
if has, err := tpr.dm.HasData(utils.StatQueuePrefix, tntID.ID, tntID.Tenant); err != nil {
return err
} else if !has {
tpr.statQueues = append(tpr.statQueues, &utils.TenantID{Tenant: tntID.Tenant, ID: tntID.ID})
}
}
return nil
}
@@ -1621,18 +1615,55 @@ func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error)
}
for _, sqTntID := range tpr.statQueues {
metrics := make(map[string]StatMetric)
for _, metric := range tpr.sqProfiles[utils.TenantID{Tenant: sqTntID.Tenant, ID: sqTntID.ID}].Metrics {
var stsMetric StatMetric
if stsMetric, err = NewStatMetric(metric.MetricID,
tpr.sqProfiles[utils.TenantID{Tenant: sqTntID.Tenant, ID: sqTntID.ID}].MinItems,
metric.FilterIDs); err != nil {
var sq *StatQueue
sq, err = tpr.dm.GetStatQueue(sqTntID.Tenant, sqTntID.ID, true, false, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
return
}
if err == utils.ErrNotFound {
// if the statQueue didn't exists simply initiate all the metrics
for _, metric := range tpr.sqProfiles[*sqTntID].Metrics {
var stsMetric StatMetric
if stsMetric, err = NewStatMetric(metric.MetricID,
tpr.sqProfiles[*sqTntID].MinItems,
metric.FilterIDs); err != nil {
return
}
metrics[metric.MetricID] = stsMetric
}
sq = &StatQueue{Tenant: sqTntID.Tenant, ID: sqTntID.ID, SQMetrics: metrics}
} else {
for _, metric := range tpr.sqProfiles[*sqTntID].Metrics {
if _, has := sq.SQMetrics[metric.MetricID]; !has {
var stsMetric StatMetric
if stsMetric, err = NewStatMetric(metric.MetricID,
tpr.sqProfiles[*sqTntID].MinItems,
metric.FilterIDs); err != nil {
return
}
metrics[metric.MetricID] = stsMetric
} else {
metrics[metric.MetricID] = sq.SQMetrics[metric.MetricID]
}
}
sq.SQMetrics = metrics
var ttl *time.Duration
if tpr.sqProfiles[*sqTntID].TTL != utils.EmptyString {
ttl = new(time.Duration)
if *ttl, err = utils.ParseDurationWithNanosecs(tpr.sqProfiles[*sqTntID].TTL); err != nil {
return
}
if *ttl <= 0 {
ttl = nil
}
}
// if the user define a statQueue with an existing metric check if we need to update it based on queue length
if err = sq.UpdateStatQueue(ttl, tpr.sqProfiles[*sqTntID].QueueLength); err != nil {
return
}
metrics[metric.MetricID] = stsMetric
}
sq := &StatQueue{Tenant: sqTntID.Tenant, ID: sqTntID.ID, SQMetrics: metrics}
if err = tpr.dm.SetStatQueue(sq); err != nil {
return
return err
}
if verbose {
log.Print("\t", sqTntID.TenantID())