Updated StatQueue handling in datamanager

This commit is contained in:
Trial97
2021-05-21 13:15:19 +03:00
committed by Dan Christian Bogos
parent 30b6cc2c70
commit 8cd952dbe9
16 changed files with 284 additions and 508 deletions

View File

@@ -43,6 +43,9 @@ type DataDBMock struct {
GetResourceProfileDrvF func(ctx *context.Context, tnt, id string) (*ResourceProfile, error)
SetResourceProfileDrvF func(ctx *context.Context, rp *ResourceProfile) error
RemoveResourceProfileDrvF func(ctx *context.Context, tnt, id string) error
GetStatQueueProfileDrvF func(ctx *context.Context, tenant, id string) (sq *StatQueueProfile, err error)
SetStatQueueProfileDrvF func(ctx *context.Context, sq *StatQueueProfile) (err error)
RemStatQueueProfileDrvF func(ctx *context.Context, tenant, id string) (err error)
}
//Storage methods
@@ -148,15 +151,24 @@ func (dbM *DataDBMock) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) (err
return utils.ErrNotImplemented
}
func (dbM *DataDBMock) GetStatQueueProfileDrv(ctx *context.Context, tenant string, ID string) (sq *StatQueueProfile, err error) {
func (dbM *DataDBMock) GetStatQueueProfileDrv(ctx *context.Context, tenant, id string) (sq *StatQueueProfile, err error) {
if dbM.GetStatQueueProfileDrvF != nil {
return dbM.GetStatQueueProfileDrvF(ctx, tenant, id)
}
return nil, utils.ErrNotImplemented
}
func (dbM *DataDBMock) SetStatQueueProfileDrv(ctx *context.Context, sq *StatQueueProfile) (err error) {
if dbM.SetStatQueueProfileDrvF != nil {
return dbM.SetStatQueueProfileDrvF(ctx, sq)
}
return utils.ErrNotImplemented
}
func (dbM *DataDBMock) RemStatQueueProfileDrv(ctx *context.Context, tenant, id string) (err error) {
if dbM.RemStatQueueProfileDrvF != nil {
return dbM.RemStatQueueProfileDrvF(ctx, tenant, id)
}
return utils.ErrNotImplemented
}

View File

@@ -18,7 +18,6 @@ package engine
import (
"fmt"
"strings"
"time"
"github.com/cgrates/baningo"
"github.com/cgrates/birpc/context"
@@ -278,171 +277,6 @@ func (dm *DataManager) CacheDataFromDB(ctx *context.Context, prfx string, ids []
return
}
// GetStatQueue retrieves a StatQueue from dataDB
// handles caching and deserialization of metrics
func (dm *DataManager) GetStatQueue(ctx *context.Context, tenant, id string,
cacheRead, cacheWrite bool, transactionID string) (sq *StatQueue, err error) {
tntID := utils.ConcatenatedKey(tenant, id)
if cacheRead {
if x, ok := Cache.Get(utils.CacheStatQueues, tntID); ok {
if x == nil {
return nil, utils.ErrNotFound
}
return x.(*StatQueue), nil
}
}
if dm == nil {
err = utils.ErrNoDatabaseConn
return
}
sq, err = dm.dataDB.GetStatQueueDrv(ctx, tenant, id)
if err != nil {
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; err == utils.ErrNotFound && itm.Remote {
if err = dm.connMgr.Call(ctx, config.CgrConfig().DataDbCfg().RmtConns, utils.ReplicatorSv1GetStatQueue,
&utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{Tenant: tenant, ID: id},
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, utils.EmptyString,
utils.FirstNonEmpty(config.CgrConfig().DataDbCfg().RmtConnID,
config.CgrConfig().GeneralCfg().NodeID)),
}, &sq); err == nil {
var ssq *StoredStatQueue
if dm.dataDB.GetStorageType() != utils.MetaInternal {
// in case of internal we don't marshal
if ssq, err = NewStoredStatQueue(sq, dm.ms); err != nil {
return nil, err
}
}
err = dm.dataDB.SetStatQueueDrv(ctx, ssq, sq)
}
}
if err != nil {
if err = utils.CastRPCErr(err); err == utils.ErrNotFound && cacheWrite && dm.dataDB.GetStorageType() != utils.Internal {
if errCh := Cache.Set(ctx, utils.CacheStatQueues, tntID, nil, nil,
cacheCommit(transactionID), transactionID); errCh != nil {
return nil, errCh
}
}
return nil, err
}
}
if cacheWrite {
if errCh := Cache.Set(ctx, utils.CacheStatQueues, tntID, sq, nil,
cacheCommit(transactionID), transactionID); errCh != nil {
return nil, errCh
}
}
return
}
// SetStatQueue converts to StoredStatQueue and stores the result in dataDB
func (dm *DataManager) SetStatQueue(ctx *context.Context, sq *StatQueue, metrics []*MetricWithFilters,
minItems int, ttl *time.Duration, queueLength int, simpleSet bool) (err error) {
if dm == nil {
return utils.ErrNoDatabaseConn
}
if !simpleSet {
tnt := sq.Tenant // save the tenant
id := sq.ID // save the ID from the initial StatQueue
// handle metrics for statsQueue
sq, err = dm.GetStatQueue(ctx, tnt, id, true, false, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
return
}
if err == utils.ErrNotFound {
// if the statQueue didn't exists simply initiate all the metrics
if sq, err = NewStatQueue(tnt, id, metrics, minItems); err != nil {
return
}
} else {
for sqMetricID := range sq.SQMetrics {
// we consider that the metric needs to be removed
needsRemove := true
for _, metric := range metrics {
// in case we found the metric in the metrics define by the user we leave it
if sqMetricID == metric.MetricID {
needsRemove = false
break
}
}
if needsRemove {
delete(sq.SQMetrics, sqMetricID)
}
}
for _, metric := range metrics {
if _, has := sq.SQMetrics[metric.MetricID]; !has {
var stsMetric StatMetric
if stsMetric, err = NewStatMetric(metric.MetricID,
minItems,
metric.FilterIDs); err != nil {
return
}
sq.SQMetrics[metric.MetricID] = stsMetric
}
}
// if the user define a statQueue with an existing metric check if we need to update it based on queue length
sq.ttl = ttl
if _, err = sq.remExpired(); err != nil {
return
}
if len(sq.SQItems) > queueLength {
for i := 0; i < queueLength-len(sq.SQItems); i++ {
item := sq.SQItems[0]
if err = sq.remEventWithID(item.EventID); err != nil {
return
}
sq.SQItems = sq.SQItems[1:]
}
}
}
}
var ssq *StoredStatQueue
if dm.dataDB.GetStorageType() != utils.MetaInternal {
// in case of internal we don't marshal
if ssq, err = NewStoredStatQueue(sq, dm.ms); err != nil {
return
}
}
if err = dm.dataDB.SetStatQueueDrv(ctx, ssq, sq); err != nil {
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; itm.Replicate {
err = replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.StatQueuePrefix, sq.TenantID(), // this are used to get the host IDs from cache
utils.ReplicatorSv1SetStatQueue,
&StatQueueWithAPIOpts{
StatQueue: sq,
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)})
}
return
}
// RemoveStatQueue removes the StoredStatQueue
func (dm *DataManager) RemoveStatQueue(ctx *context.Context, tenant, id string, transactionID string) (err error) {
if dm == nil {
return utils.ErrNoDatabaseConn
}
if err = dm.dataDB.RemStatQueueDrv(ctx, tenant, id); err != nil {
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; itm.Replicate {
replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.StatQueuePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveStatQueue,
&utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{Tenant: tenant, ID: id},
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)})
}
return
}
// GetFilter returns a filter based on the given ID
func (dm *DataManager) GetFilter(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool,
transactionID string) (fltr *Filter, err error) {
@@ -760,7 +594,6 @@ func (dm *DataManager) SetThresholdProfile(ctx *context.Context, th *ThresholdPr
Tenant: th.Tenant,
ID: th.ID,
Hits: 0,
tPrfl: th,
})
} else if _, errTh := dm.GetThreshold(ctx, th.Tenant, th.ID, // do not try to get the threshold if the configuration changed
true, false, utils.NonTransactional); errTh == utils.ErrNotFound { // the threshold does not exist
@@ -768,7 +601,6 @@ func (dm *DataManager) SetThresholdProfile(ctx *context.Context, th *ThresholdPr
Tenant: th.Tenant,
ID: th.ID,
Hits: 0,
tPrfl: th,
})
}
return
@@ -811,6 +643,112 @@ func (dm *DataManager) RemoveThresholdProfile(ctx *context.Context, tenant, id,
return dm.RemoveThreshold(ctx, tenant, id, transactionID) // remove the thrshold
}
// GetStatQueue retrieves a StatQueue from dataDB
// handles caching and deserialization of metrics
func (dm *DataManager) GetStatQueue(ctx *context.Context, tenant, id string,
cacheRead, cacheWrite bool, transactionID string) (sq *StatQueue, err error) {
tntID := utils.ConcatenatedKey(tenant, id)
if cacheRead {
if x, ok := Cache.Get(utils.CacheStatQueues, tntID); ok {
if x == nil {
return nil, utils.ErrNotFound
}
return x.(*StatQueue), nil
}
}
if dm == nil {
err = utils.ErrNoDatabaseConn
return
}
sq, err = dm.dataDB.GetStatQueueDrv(ctx, tenant, id)
if err != nil {
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; err == utils.ErrNotFound && itm.Remote {
if err = dm.connMgr.Call(ctx, config.CgrConfig().DataDbCfg().RmtConns, utils.ReplicatorSv1GetStatQueue,
&utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{Tenant: tenant, ID: id},
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, utils.EmptyString,
utils.FirstNonEmpty(config.CgrConfig().DataDbCfg().RmtConnID,
config.CgrConfig().GeneralCfg().NodeID)),
}, &sq); err == nil {
var ssq *StoredStatQueue
if dm.dataDB.GetStorageType() != utils.MetaInternal {
// in case of internal we don't marshal
if ssq, err = NewStoredStatQueue(sq, dm.ms); err != nil {
return nil, err
}
}
err = dm.dataDB.SetStatQueueDrv(ctx, ssq, sq)
}
}
if err != nil {
if err = utils.CastRPCErr(err); err == utils.ErrNotFound && cacheWrite && dm.dataDB.GetStorageType() != utils.Internal {
if errCh := Cache.Set(ctx, utils.CacheStatQueues, tntID, nil, nil,
cacheCommit(transactionID), transactionID); errCh != nil {
return nil, errCh
}
}
return nil, err
}
}
if cacheWrite {
if errCh := Cache.Set(ctx, utils.CacheStatQueues, tntID, sq, nil,
cacheCommit(transactionID), transactionID); errCh != nil {
return nil, errCh
}
}
return
}
// SetStatQueue converts to StoredStatQueue and stores the result in dataDB
func (dm *DataManager) SetStatQueue(ctx *context.Context, sq *StatQueue) (err error) {
if dm == nil {
return utils.ErrNoDatabaseConn
}
var ssq *StoredStatQueue
if dm.dataDB.GetStorageType() != utils.MetaInternal {
// in case of internal we don't marshal
if ssq, err = NewStoredStatQueue(sq, dm.ms); err != nil {
return
}
}
if err = dm.dataDB.SetStatQueueDrv(ctx, ssq, sq); err != nil {
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; itm.Replicate {
err = replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.StatQueuePrefix, sq.TenantID(), // this are used to get the host IDs from cache
utils.ReplicatorSv1SetStatQueue,
&StatQueueWithAPIOpts{
StatQueue: sq,
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)})
}
return
}
// RemoveStatQueue removes the StoredStatQueue
func (dm *DataManager) RemoveStatQueue(ctx *context.Context, tenant, id string, transactionID string) (err error) {
if dm == nil {
return utils.ErrNoDatabaseConn
}
if err = dm.dataDB.RemStatQueueDrv(ctx, tenant, id); err != nil {
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; itm.Replicate {
replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.StatQueuePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveStatQueue,
&utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{Tenant: tenant, ID: id},
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)})
}
return
}
func (dm *DataManager) GetStatQueueProfile(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool,
transactionID string) (sqp *StatQueueProfile, err error) {
tntID := utils.ConcatenatedKey(tenant, id)
@@ -890,14 +828,53 @@ func (dm *DataManager) SetStatQueueProfile(ctx *context.Context, sqp *StatQueueP
}
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueueProfiles]; itm.Replicate {
err = replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
if err = replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.StatQueueProfilePrefix, sqp.TenantID(), // this are used to get the host IDs from cache
utils.ReplicatorSv1SetStatQueueProfile,
&StatQueueProfileWithAPIOpts{
StatQueueProfile: sqp,
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)})
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}); err != nil {
return
}
}
if oldSts == nil || // create the stats queue if it didn't exist before
oldSts.QueueLength != sqp.QueueLength ||
oldSts.TTL != sqp.TTL ||
oldSts.MinItems != sqp.MinItems ||
(oldSts.Stored != sqp.Stored && oldSts.Stored) { // reset the stats queue if the profile changed this fields
var sq *StatQueue
if sq, err = NewStatQueue(sqp.Tenant, sqp.ID, sqp.Metrics,
sqp.MinItems); err != nil {
return
}
err = dm.SetStatQueue(ctx, sq)
} else if oSq, errRs := dm.GetStatQueue(ctx, sqp.Tenant, sqp.ID, // do not try to get the stats queue if the configuration changed
true, false, utils.NonTransactional); errRs == utils.ErrNotFound { // the stats queue does not exist
var sq *StatQueue
if sq, err = NewStatQueue(sqp.Tenant, sqp.ID, sqp.Metrics,
sqp.MinItems); err != nil {
return
}
err = dm.SetStatQueue(ctx, sq)
} else { // update the metrics if needed
cMetricIDs := utils.StringSet{}
for _, metric := range sqp.Metrics { // add missing metrics and recreate the old metrics that changed
cMetricIDs.Add(metric.MetricID)
if oSqMetric, has := oSq.SQMetrics[metric.MetricID]; !has ||
!utils.SliceStringEqual(oSqMetric.GetFilterIDs(), metric.FilterIDs) { // recreate it if the filter changed
if oSq.SQMetrics[metric.MetricID], err = NewStatMetric(metric.MetricID,
sqp.MinItems, metric.FilterIDs); err != nil {
return
}
}
}
for sqMetricID := range oSq.SQMetrics { // remove the old metrics
if !cMetricIDs.Has(sqMetricID) {
delete(oSq.SQMetrics, sqMetricID)
}
}
}
return
}
@@ -936,7 +913,7 @@ func (dm *DataManager) RemoveStatQueueProfile(ctx *context.Context, tenant, id,
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)})
}
return
return dm.RemoveStatQueue(ctx, tenant, id, transactionID)
}
func (dm *DataManager) GetResource(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool,
@@ -1122,30 +1099,18 @@ func (dm *DataManager) SetResourceProfile(ctx *context.Context, rp *ResourceProf
if oldRes == nil || // create the resource if it didn't exist before
oldRes.UsageTTL != rp.UsageTTL ||
oldRes.Limit != rp.Limit ||
oldRes.Stored != rp.Stored { // reset the resource if the profile changed this fields
var ttl *time.Duration
if rp.UsageTTL > 0 {
ttl = &rp.UsageTTL
}
(oldRes.Stored != rp.Stored && oldRes.Stored) { // reset the resource if the profile changed this fields
err = dm.SetResource(ctx, &Resource{
Tenant: rp.Tenant,
ID: rp.ID,
Usages: make(map[string]*ResourceUsage),
ttl: ttl,
rPrf: rp,
})
} else if _, errRs := dm.GetResource(ctx, rp.Tenant, rp.ID, // do not try to get the resource if the configuration changed
true, false, utils.NonTransactional); errRs == utils.ErrNotFound { // the resource does not exist
var ttl *time.Duration
if rp.UsageTTL > 0 {
ttl = &rp.UsageTTL
}
err = dm.SetResource(ctx, &Resource{
Tenant: rp.Tenant,
ID: rp.ID,
Usages: make(map[string]*ResourceUsage),
ttl: ttl,
rPrf: rp,
})
}
return

View File

@@ -677,16 +677,6 @@ func TestLoadDispatcherHosts(t *testing.T) {
}
}
func TestLoadstatQueues(t *testing.T) {
eStatQueues := []*utils.TenantID{
{Tenant: "cgrates.org", ID: "TestStats"},
{Tenant: "cgrates.org", ID: "TestStats2"},
}
if len(csvr.statQueues) != len(eStatQueues) {
t.Errorf("Failed to load statQueues: %s", utils.ToIJSON(csvr.statQueues))
}
}
func TestLoadAccount(t *testing.T) {
expected := &utils.TPAccount{
TPid: testTPID,

View File

@@ -121,7 +121,7 @@ func (sS *StatService) StoreStatQueue(ctx *context.Context, sq *StatQueue) (err
if sq.dirty == nil || !*sq.dirty {
return
}
if err = sS.dm.SetStatQueue(ctx, sq, nil, 0, nil, 0, true); err != nil {
if err = sS.dm.SetStatQueue(ctx, sq); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<StatS> failed saving StatQueue with ID: %s, error: %s",
sq.TenantID(), err.Error()))

View File

@@ -110,9 +110,9 @@ func TestMatchingStatQueuesForEvent(t *testing.T) {
},
}
stqs := []*StatQueue{
{Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0]},
{Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1]},
{Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2]},
{Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0], SQMetrics: make(map[string]StatMetric)},
{Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1], SQMetrics: make(map[string]StatMetric)},
{Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2], SQMetrics: make(map[string]StatMetric)},
}
statsEvs := []*StatsArgsProcessEvent{
{
@@ -235,7 +235,7 @@ func TestMatchingStatQueuesForEvent(t *testing.T) {
dmSTS.SetStatQueueProfile(context.TODO(), statQueueProfile, true)
}
for _, statQueue := range stqs {
dmSTS.SetStatQueue(context.TODO(), statQueue, nil, 0, nil, 0, true)
dmSTS.SetStatQueue(context.TODO(),statQueue)
}
//Test each statQueueProfile from cache
for _, sqp := range sqps {
@@ -341,9 +341,9 @@ func TestStatQueuesProcessEvent(t *testing.T) {
},
}
stqs := []*StatQueue{
{Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0]},
{Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1]},
{Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2]},
{Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0], SQMetrics: make(map[string]StatMetric)},
{Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1], SQMetrics: make(map[string]StatMetric)},
{Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2], SQMetrics: make(map[string]StatMetric)},
}
statsEvs := []*StatsArgsProcessEvent{
{
@@ -466,7 +466,7 @@ func TestStatQueuesProcessEvent(t *testing.T) {
dmSTS.SetStatQueueProfile(context.TODO(), statQueueProfile, true)
}
for _, statQueue := range stqs {
dmSTS.SetStatQueue(context.TODO(), statQueue, nil, 0, nil, 0, true)
dmSTS.SetStatQueue(context.TODO(),statQueue)
}
//Test each statQueueProfile from cache
for _, sqp := range sqps {
@@ -573,9 +573,9 @@ func TestStatQueuesMatchWithIndexFalse(t *testing.T) {
},
}
stqs := []*StatQueue{
{Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0]},
{Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1]},
{Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2]},
{Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0], SQMetrics: make(map[string]StatMetric)},
{Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1], SQMetrics: make(map[string]StatMetric)},
{Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2], SQMetrics: make(map[string]StatMetric)},
}
statsEvs := []*StatsArgsProcessEvent{
{
@@ -698,7 +698,7 @@ func TestStatQueuesMatchWithIndexFalse(t *testing.T) {
dmSTS.SetStatQueueProfile(context.TODO(), statQueueProfile, true)
}
for _, statQueue := range stqs {
dmSTS.SetStatQueue(context.TODO(), statQueue, nil, 0, nil, 0, true)
dmSTS.SetStatQueue(context.TODO(),statQueue)
}
//Test each statQueueProfile from cache
for _, sqp := range sqps {
@@ -805,9 +805,9 @@ func TestStatQueuesV1ProcessEvent(t *testing.T) {
},
}
stqs := []*StatQueue{
{Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0]},
{Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1]},
{Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2]},
{Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0], SQMetrics: make(map[string]StatMetric)},
{Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1], SQMetrics: make(map[string]StatMetric)},
{Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2], SQMetrics: make(map[string]StatMetric)},
}
statsEvs := []*StatsArgsProcessEvent{
{
@@ -930,7 +930,7 @@ func TestStatQueuesV1ProcessEvent(t *testing.T) {
dmSTS.SetStatQueueProfile(context.TODO(), statQueueProfile, true)
}
for _, statQueue := range stqs {
dmSTS.SetStatQueue(context.TODO(), statQueue, nil, 0, nil, 0, true)
dmSTS.SetStatQueue(context.TODO(),statQueue)
}
//Test each statQueueProfile from cache
for _, sqp := range sqps {
@@ -961,7 +961,7 @@ func TestStatQueuesV1ProcessEvent(t *testing.T) {
if err := dmSTS.SetStatQueueProfile(context.TODO(), sqPrf, true); err != nil {
t.Error(err)
}
if err := dmSTS.SetStatQueue(context.TODO(), sq, nil, 0, nil, 0, true); err != nil {
if err := dmSTS.SetStatQueue(context.TODO(),sq); err != nil {
t.Error(err)
}
if tempStat, err := dmSTS.GetStatQueueProfile(context.TODO(), sqPrf.Tenant,

View File

@@ -46,11 +46,7 @@ type TpReader struct {
rateProfiles map[utils.TenantID]*utils.TPRateProfile
actionProfiles map[utils.TenantID]*utils.TPActionProfile
accounts map[utils.TenantID]*utils.TPAccount
// resources []*utils.TenantID // IDs of resources which need creation based on resourceProfiles
statQueues []*utils.TenantID // IDs of statQueues which need creation based on statQueueProfiles
// thresholds []*utils.TenantID // IDs of thresholds which need creation based on thresholdProfiles
// acntActionPlans map[string][]string
cacheConns []string
cacheConns []string
//schedulerConns []string
isInternalDB bool // do not reload cache if we use internalDB
}
@@ -118,7 +114,6 @@ func (tpr *TpReader) LoadStatsFiltered(tag string) (err error) {
return
}
mapSTs[utils.TenantID{Tenant: st.Tenant, ID: st.ID}] = st
tpr.statQueues = append(tpr.statQueues, &utils.TenantID{Tenant: st.Tenant, ID: st.ID})
}
tpr.sqProfiles = mapSTs
return nil
@@ -429,52 +424,8 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) {
}
}
if len(tpr.sqProfiles) != 0 {
loadIDs[utils.CacheStatQueueProfiles] = loadID
}
if verbose {
log.Print("StatQueues:")
}
for _, sqTntID := range tpr.statQueues {
var ttl *time.Duration
if tpr.sqProfiles[*sqTntID].TTL != utils.EmptyString {
ttl = new(time.Duration)
if *ttl, err = utils.ParseDurationWithNanosecs(tpr.sqProfiles[*sqTntID].TTL); err != nil {
return
}
if *ttl <= 0 {
ttl = nil
}
}
metrics := make([]*MetricWithFilters, len(tpr.sqProfiles[*sqTntID].Metrics))
for i, metric := range tpr.sqProfiles[*sqTntID].Metrics {
metrics[i] = &MetricWithFilters{
MetricID: metric.MetricID,
FilterIDs: metric.FilterIDs,
}
}
sq := &StatQueue{
Tenant: sqTntID.Tenant,
ID: sqTntID.ID,
}
if !tpr.sqProfiles[*sqTntID].Stored { //for not stored queues create the metrics
if sq, err = NewStatQueue(sqTntID.Tenant, sqTntID.ID, metrics,
tpr.sqProfiles[*sqTntID].MinItems); err != nil {
return
}
}
// for non stored we do not save the metrics
if err = tpr.dm.SetStatQueue(context.TODO(), sq, metrics,
tpr.sqProfiles[*sqTntID].MinItems,
ttl, tpr.sqProfiles[*sqTntID].QueueLength,
!tpr.sqProfiles[*sqTntID].Stored); err != nil {
return err
}
if verbose {
log.Print("\t", sqTntID.TenantID())
}
}
if len(tpr.statQueues) != 0 {
loadIDs[utils.CacheStatQueues] = loadID
loadIDs[utils.CacheStatQueueProfiles] = loadID
}
if verbose {
log.Print("ThresholdProfiles:")
@@ -671,12 +622,6 @@ func (tpr *TpReader) ShowStatistics() {
// GetLoadedIds returns the identities loaded for a specific category, useful for cache reloads
func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) {
switch categ {
case utils.StatQueuePrefix:
keys := make([]string, len(tpr.statQueues))
for i, k := range tpr.statQueues {
keys[i] = k.TenantID()
}
return keys, nil
case utils.ResourceProfilesPrefix:
keys := make([]string, len(tpr.resProfiles))
i := 0
@@ -797,17 +742,6 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error
log.Print("\t", utils.ConcatenatedKey(tpST.Tenant, tpST.ID))
}
}
if verbose {
log.Print("StatQueues:")
}
for _, sqTntID := range tpr.statQueues {
if err = tpr.dm.RemoveStatQueue(context.TODO(), sqTntID.Tenant, sqTntID.ID, utils.NonTransactional); err != nil {
return
}
if verbose {
log.Print("\t", sqTntID.TenantID())
}
}
if verbose {
log.Print("ThresholdProfiles:")
}
@@ -943,8 +877,6 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error
}
if len(tpr.sqProfiles) != 0 {
loadIDs[utils.CacheStatQueueProfiles] = loadID
}
if len(tpr.statQueues) != 0 {
loadIDs[utils.CacheStatQueues] = loadID
}
if len(tpr.thProfiles) != 0 {
@@ -988,7 +920,6 @@ func (tpr *TpReader) ReloadCache(ctx *context.Context, caching string, verbose b
}
// take IDs for each type
rspIDs, _ := tpr.GetLoadedIds(utils.ResourceProfilesPrefix)
stqIDs, _ := tpr.GetLoadedIds(utils.StatQueuePrefix)
stqpIDs, _ := tpr.GetLoadedIds(utils.StatQueueProfilePrefix)
trspfIDs, _ := tpr.GetLoadedIds(utils.ThresholdProfilePrefix)
flrIDs, _ := tpr.GetLoadedIds(utils.FilterPrefix)
@@ -1005,7 +936,7 @@ func (tpr *TpReader) ReloadCache(ctx *context.Context, caching string, verbose b
cacheArgs := map[string][]string{
utils.ResourceProfileIDs: rspIDs,
utils.ResourceIDs: rspIDs,
utils.StatsQueueIDs: stqIDs,
utils.StatsQueueIDs: stqpIDs,
utils.StatsQueueProfileIDs: stqpIDs,
utils.ThresholdIDs: trspfIDs,
utils.ThresholdProfileIDs: trspfIDs,

View File

@@ -411,29 +411,6 @@ func TestCallCacheClear(t *testing.T) {
}
}
func TestGetLoadedIdsStatQueues(t *testing.T) {
tpr := &TpReader{
statQueues: []*utils.TenantID{
{
Tenant: "cgrates.org",
ID: "statQueueID",
},
{
Tenant: "tenant.com",
ID: "mytenantID",
},
},
}
rcv, err := tpr.GetLoadedIds(utils.StatQueuePrefix)
if err != nil {
t.Error(err)
}
expRcv := []string{"cgrates.org:statQueueID", "tenant.com:mytenantID"}
if !reflect.DeepEqual(expRcv, rcv) {
t.Errorf("\nExpected %v but received \n%v", expRcv, rcv)
}
}
func TestGetLoadedIdsResourceProfiles(t *testing.T) {
tpr := &TpReader{
resProfiles: map[utils.TenantID]*utils.TPResourceProfile{
@@ -681,7 +658,7 @@ func TestReloadCache(t *testing.T) {
"DispatcherProfileIDs": {"cgrates.org:dispatcherProfilesID"},
"DispatcherHostIDs": {"cgrates.org:dispatcherHostsID"},
"ResourceIDs": {"cgrates.org:resourceProfilesID"},
"StatsQueueIDs": {"cgrates.org:statQueuesID"},
"StatsQueueIDs": {"cgrates.org:statProfilesID"},
"ThresholdIDs": {"cgrates.org:thresholdProfilesID"},
},
}
@@ -731,12 +708,6 @@ func TestReloadCache(t *testing.T) {
dispatcherHosts: map[utils.TenantID]*utils.TPDispatcherHost{
{Tenant: "cgrates.org", ID: "dispatcherHostsID"}: {},
},
statQueues: []*utils.TenantID{
{
Tenant: "cgrates.org",
ID: "statQueuesID",
},
},
dm: NewDataManager(data, config.CgrConfig().CacheCfg(), cnMgr),
}
tpr.cacheConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)}

View File

@@ -118,7 +118,7 @@ func testDMitCRUDStatQueue(t *testing.T) {
if _, ok := Cache.Get(utils.CacheStatQueues, sq.TenantID()); ok != false {
t.Error("should not be in cache")
}
if err := dm2.SetStatQueue(context.TODO(), sq, nil, 0, nil, 0, true); err != nil {
if err := dm2.SetStatQueue(context.TODO(), sq); err != nil {
t.Error(err)
}
if _, ok := Cache.Get(utils.CacheStatQueues, sq.TenantID()); ok != false {

View File

@@ -399,7 +399,7 @@ func testOnStorITStatQueue(t *testing.T) {
true, false, utils.NonTransactional); rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
}
if err := onStor.SetStatQueue(context.TODO(), sq, nil, 0, nil, 0, true); err != nil {
if err := onStor.SetStatQueue(context.TODO(), sq); err != nil {
t.Error(err)
}
//get from database
@@ -427,7 +427,7 @@ func testOnStorITStatQueue(t *testing.T) {
},
},
}
if err := onStor.SetStatQueue(context.TODO(), sq, nil, 0, nil, 0, true); err != nil {
if err := onStor.SetStatQueue(context.TODO(), sq); err != nil {
t.Error(err)
}