From 8cd952dbe9753b2439c80a4a55266eebbd55f889 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Fri, 21 May 2021 13:15:19 +0300 Subject: [PATCH] Updated StatQueue handling in datamanager --- engine/datadbmock.go | 14 +- engine/datamanager.go | 333 ++++++++++++++------------------ engine/loader_csv_test.go | 10 - engine/stats.go | 2 +- engine/stats_test.go | 34 ++-- engine/tpreader.go | 75 +------ engine/tpreader_test.go | 31 +-- engine/z_datamanager_it_test.go | 2 +- engine/z_onstor_it_test.go | 4 +- loaders/lib_test.go | 42 ---- loaders/loader.go | 19 -- loaders/loader_test.go | 88 ++++----- migrator/resource_it_test.go | 9 +- migrator/stats.go | 50 ++--- migrator/stats_it_test.go | 70 +++---- migrator/thresholds.go | 9 +- 16 files changed, 284 insertions(+), 508 deletions(-) diff --git a/engine/datadbmock.go b/engine/datadbmock.go index 7b7fc1c2a..5a3b53a8f 100644 --- a/engine/datadbmock.go +++ b/engine/datadbmock.go @@ -43,6 +43,9 @@ type DataDBMock struct { GetResourceProfileDrvF func(ctx *context.Context, tnt, id string) (*ResourceProfile, error) SetResourceProfileDrvF func(ctx *context.Context, rp *ResourceProfile) error RemoveResourceProfileDrvF func(ctx *context.Context, tnt, id string) error + GetStatQueueProfileDrvF func(ctx *context.Context, tenant, id string) (sq *StatQueueProfile, err error) + SetStatQueueProfileDrvF func(ctx *context.Context, sq *StatQueueProfile) (err error) + RemStatQueueProfileDrvF func(ctx *context.Context, tenant, id string) (err error) } //Storage methods @@ -148,15 +151,24 @@ func (dbM *DataDBMock) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) (err return utils.ErrNotImplemented } -func (dbM *DataDBMock) GetStatQueueProfileDrv(ctx *context.Context, tenant string, ID string) (sq *StatQueueProfile, err error) { +func (dbM *DataDBMock) GetStatQueueProfileDrv(ctx *context.Context, tenant, id string) (sq *StatQueueProfile, err error) { + if dbM.GetStatQueueProfileDrvF != nil { + return dbM.GetStatQueueProfileDrvF(ctx, tenant, id) + } return nil, utils.ErrNotImplemented } func (dbM *DataDBMock) SetStatQueueProfileDrv(ctx *context.Context, sq *StatQueueProfile) (err error) { + if dbM.SetStatQueueProfileDrvF != nil { + return dbM.SetStatQueueProfileDrvF(ctx, sq) + } return utils.ErrNotImplemented } func (dbM *DataDBMock) RemStatQueueProfileDrv(ctx *context.Context, tenant, id string) (err error) { + if dbM.RemStatQueueProfileDrvF != nil { + return dbM.RemStatQueueProfileDrvF(ctx, tenant, id) + } return utils.ErrNotImplemented } diff --git a/engine/datamanager.go b/engine/datamanager.go index ecba8ccef..6a900da7b 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -18,7 +18,6 @@ package engine import ( "fmt" "strings" - "time" "github.com/cgrates/baningo" "github.com/cgrates/birpc/context" @@ -278,171 +277,6 @@ func (dm *DataManager) CacheDataFromDB(ctx *context.Context, prfx string, ids [] return } -// GetStatQueue retrieves a StatQueue from dataDB -// handles caching and deserialization of metrics -func (dm *DataManager) GetStatQueue(ctx *context.Context, tenant, id string, - cacheRead, cacheWrite bool, transactionID string) (sq *StatQueue, err error) { - tntID := utils.ConcatenatedKey(tenant, id) - if cacheRead { - if x, ok := Cache.Get(utils.CacheStatQueues, tntID); ok { - if x == nil { - return nil, utils.ErrNotFound - } - return x.(*StatQueue), nil - } - } - if dm == nil { - err = utils.ErrNoDatabaseConn - return - } - sq, err = dm.dataDB.GetStatQueueDrv(ctx, tenant, id) - if err != nil { - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; err == utils.ErrNotFound && itm.Remote { - if err = dm.connMgr.Call(ctx, config.CgrConfig().DataDbCfg().RmtConns, utils.ReplicatorSv1GetStatQueue, - &utils.TenantIDWithAPIOpts{ - TenantID: &utils.TenantID{Tenant: tenant, ID: id}, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, utils.EmptyString, - utils.FirstNonEmpty(config.CgrConfig().DataDbCfg().RmtConnID, - config.CgrConfig().GeneralCfg().NodeID)), - }, &sq); err == nil { - var ssq *StoredStatQueue - if dm.dataDB.GetStorageType() != utils.MetaInternal { - // in case of internal we don't marshal - if ssq, err = NewStoredStatQueue(sq, dm.ms); err != nil { - return nil, err - } - } - err = dm.dataDB.SetStatQueueDrv(ctx, ssq, sq) - } - } - if err != nil { - if err = utils.CastRPCErr(err); err == utils.ErrNotFound && cacheWrite && dm.dataDB.GetStorageType() != utils.Internal { - if errCh := Cache.Set(ctx, utils.CacheStatQueues, tntID, nil, nil, - cacheCommit(transactionID), transactionID); errCh != nil { - return nil, errCh - } - - } - return nil, err - } - } - if cacheWrite { - if errCh := Cache.Set(ctx, utils.CacheStatQueues, tntID, sq, nil, - cacheCommit(transactionID), transactionID); errCh != nil { - return nil, errCh - } - } - return -} - -// SetStatQueue converts to StoredStatQueue and stores the result in dataDB -func (dm *DataManager) SetStatQueue(ctx *context.Context, sq *StatQueue, metrics []*MetricWithFilters, - minItems int, ttl *time.Duration, queueLength int, simpleSet bool) (err error) { - if dm == nil { - return utils.ErrNoDatabaseConn - } - if !simpleSet { - tnt := sq.Tenant // save the tenant - id := sq.ID // save the ID from the initial StatQueue - // handle metrics for statsQueue - sq, err = dm.GetStatQueue(ctx, tnt, id, true, false, utils.NonTransactional) - if err != nil && err != utils.ErrNotFound { - return - } - if err == utils.ErrNotFound { - // if the statQueue didn't exists simply initiate all the metrics - if sq, err = NewStatQueue(tnt, id, metrics, minItems); err != nil { - return - } - } else { - for sqMetricID := range sq.SQMetrics { - // we consider that the metric needs to be removed - needsRemove := true - for _, metric := range metrics { - // in case we found the metric in the metrics define by the user we leave it - if sqMetricID == metric.MetricID { - needsRemove = false - break - } - } - if needsRemove { - delete(sq.SQMetrics, sqMetricID) - } - } - - for _, metric := range metrics { - if _, has := sq.SQMetrics[metric.MetricID]; !has { - var stsMetric StatMetric - if stsMetric, err = NewStatMetric(metric.MetricID, - minItems, - metric.FilterIDs); err != nil { - return - } - sq.SQMetrics[metric.MetricID] = stsMetric - } - } - - // if the user define a statQueue with an existing metric check if we need to update it based on queue length - sq.ttl = ttl - if _, err = sq.remExpired(); err != nil { - return - } - if len(sq.SQItems) > queueLength { - for i := 0; i < queueLength-len(sq.SQItems); i++ { - item := sq.SQItems[0] - if err = sq.remEventWithID(item.EventID); err != nil { - return - } - sq.SQItems = sq.SQItems[1:] - } - } - } - } - - var ssq *StoredStatQueue - if dm.dataDB.GetStorageType() != utils.MetaInternal { - // in case of internal we don't marshal - if ssq, err = NewStoredStatQueue(sq, dm.ms); err != nil { - return - } - } - if err = dm.dataDB.SetStatQueueDrv(ctx, ssq, sq); err != nil { - return - } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; itm.Replicate { - err = replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.StatQueuePrefix, sq.TenantID(), // this are used to get the host IDs from cache - utils.ReplicatorSv1SetStatQueue, - &StatQueueWithAPIOpts{ - StatQueue: sq, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return -} - -// RemoveStatQueue removes the StoredStatQueue -func (dm *DataManager) RemoveStatQueue(ctx *context.Context, tenant, id string, transactionID string) (err error) { - if dm == nil { - return utils.ErrNoDatabaseConn - } - if err = dm.dataDB.RemStatQueueDrv(ctx, tenant, id); err != nil { - return - } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; itm.Replicate { - replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.StatQueuePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveStatQueue, - &utils.TenantIDWithAPIOpts{ - TenantID: &utils.TenantID{Tenant: tenant, ID: id}, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return -} - // GetFilter returns a filter based on the given ID func (dm *DataManager) GetFilter(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool, transactionID string) (fltr *Filter, err error) { @@ -760,7 +594,6 @@ func (dm *DataManager) SetThresholdProfile(ctx *context.Context, th *ThresholdPr Tenant: th.Tenant, ID: th.ID, Hits: 0, - tPrfl: th, }) } else if _, errTh := dm.GetThreshold(ctx, th.Tenant, th.ID, // do not try to get the threshold if the configuration changed true, false, utils.NonTransactional); errTh == utils.ErrNotFound { // the threshold does not exist @@ -768,7 +601,6 @@ func (dm *DataManager) SetThresholdProfile(ctx *context.Context, th *ThresholdPr Tenant: th.Tenant, ID: th.ID, Hits: 0, - tPrfl: th, }) } return @@ -811,6 +643,112 @@ func (dm *DataManager) RemoveThresholdProfile(ctx *context.Context, tenant, id, return dm.RemoveThreshold(ctx, tenant, id, transactionID) // remove the thrshold } +// GetStatQueue retrieves a StatQueue from dataDB +// handles caching and deserialization of metrics +func (dm *DataManager) GetStatQueue(ctx *context.Context, tenant, id string, + cacheRead, cacheWrite bool, transactionID string) (sq *StatQueue, err error) { + tntID := utils.ConcatenatedKey(tenant, id) + if cacheRead { + if x, ok := Cache.Get(utils.CacheStatQueues, tntID); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*StatQueue), nil + } + } + if dm == nil { + err = utils.ErrNoDatabaseConn + return + } + sq, err = dm.dataDB.GetStatQueueDrv(ctx, tenant, id) + if err != nil { + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; err == utils.ErrNotFound && itm.Remote { + if err = dm.connMgr.Call(ctx, config.CgrConfig().DataDbCfg().RmtConns, utils.ReplicatorSv1GetStatQueue, + &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{Tenant: tenant, ID: id}, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, utils.EmptyString, + utils.FirstNonEmpty(config.CgrConfig().DataDbCfg().RmtConnID, + config.CgrConfig().GeneralCfg().NodeID)), + }, &sq); err == nil { + var ssq *StoredStatQueue + if dm.dataDB.GetStorageType() != utils.MetaInternal { + // in case of internal we don't marshal + if ssq, err = NewStoredStatQueue(sq, dm.ms); err != nil { + return nil, err + } + } + err = dm.dataDB.SetStatQueueDrv(ctx, ssq, sq) + } + } + if err != nil { + if err = utils.CastRPCErr(err); err == utils.ErrNotFound && cacheWrite && dm.dataDB.GetStorageType() != utils.Internal { + if errCh := Cache.Set(ctx, utils.CacheStatQueues, tntID, nil, nil, + cacheCommit(transactionID), transactionID); errCh != nil { + return nil, errCh + } + + } + return nil, err + } + } + if cacheWrite { + if errCh := Cache.Set(ctx, utils.CacheStatQueues, tntID, sq, nil, + cacheCommit(transactionID), transactionID); errCh != nil { + return nil, errCh + } + } + return +} + +// SetStatQueue converts to StoredStatQueue and stores the result in dataDB +func (dm *DataManager) SetStatQueue(ctx *context.Context, sq *StatQueue) (err error) { + if dm == nil { + return utils.ErrNoDatabaseConn + } + var ssq *StoredStatQueue + if dm.dataDB.GetStorageType() != utils.MetaInternal { + // in case of internal we don't marshal + if ssq, err = NewStoredStatQueue(sq, dm.ms); err != nil { + return + } + } + if err = dm.dataDB.SetStatQueueDrv(ctx, ssq, sq); err != nil { + return + } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; itm.Replicate { + err = replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.StatQueuePrefix, sq.TenantID(), // this are used to get the host IDs from cache + utils.ReplicatorSv1SetStatQueue, + &StatQueueWithAPIOpts{ + StatQueue: sq, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) + } + return +} + +// RemoveStatQueue removes the StoredStatQueue +func (dm *DataManager) RemoveStatQueue(ctx *context.Context, tenant, id string, transactionID string) (err error) { + if dm == nil { + return utils.ErrNoDatabaseConn + } + if err = dm.dataDB.RemStatQueueDrv(ctx, tenant, id); err != nil { + return + } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; itm.Replicate { + replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.StatQueuePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache + utils.ReplicatorSv1RemoveStatQueue, + &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{Tenant: tenant, ID: id}, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) + } + return +} + func (dm *DataManager) GetStatQueueProfile(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool, transactionID string) (sqp *StatQueueProfile, err error) { tntID := utils.ConcatenatedKey(tenant, id) @@ -890,14 +828,53 @@ func (dm *DataManager) SetStatQueueProfile(ctx *context.Context, sqp *StatQueueP } } if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueueProfiles]; itm.Replicate { - err = replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + if err = replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, config.CgrConfig().DataDbCfg().RplFiltered, utils.StatQueueProfilePrefix, sqp.TenantID(), // this are used to get the host IDs from cache utils.ReplicatorSv1SetStatQueueProfile, &StatQueueProfileWithAPIOpts{ StatQueueProfile: sqp, APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}); err != nil { + return + } + } + if oldSts == nil || // create the stats queue if it didn't exist before + oldSts.QueueLength != sqp.QueueLength || + oldSts.TTL != sqp.TTL || + oldSts.MinItems != sqp.MinItems || + (oldSts.Stored != sqp.Stored && oldSts.Stored) { // reset the stats queue if the profile changed this fields + var sq *StatQueue + if sq, err = NewStatQueue(sqp.Tenant, sqp.ID, sqp.Metrics, + sqp.MinItems); err != nil { + return + } + err = dm.SetStatQueue(ctx, sq) + } else if oSq, errRs := dm.GetStatQueue(ctx, sqp.Tenant, sqp.ID, // do not try to get the stats queue if the configuration changed + true, false, utils.NonTransactional); errRs == utils.ErrNotFound { // the stats queue does not exist + var sq *StatQueue + if sq, err = NewStatQueue(sqp.Tenant, sqp.ID, sqp.Metrics, + sqp.MinItems); err != nil { + return + } + err = dm.SetStatQueue(ctx, sq) + } else { // update the metrics if needed + cMetricIDs := utils.StringSet{} + for _, metric := range sqp.Metrics { // add missing metrics and recreate the old metrics that changed + cMetricIDs.Add(metric.MetricID) + if oSqMetric, has := oSq.SQMetrics[metric.MetricID]; !has || + !utils.SliceStringEqual(oSqMetric.GetFilterIDs(), metric.FilterIDs) { // recreate it if the filter changed + if oSq.SQMetrics[metric.MetricID], err = NewStatMetric(metric.MetricID, + sqp.MinItems, metric.FilterIDs); err != nil { + return + } + } + } + for sqMetricID := range oSq.SQMetrics { // remove the old metrics + if !cMetricIDs.Has(sqMetricID) { + delete(oSq.SQMetrics, sqMetricID) + } + } } return } @@ -936,7 +913,7 @@ func (dm *DataManager) RemoveStatQueueProfile(ctx *context.Context, tenant, id, APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) } - return + return dm.RemoveStatQueue(ctx, tenant, id, transactionID) } func (dm *DataManager) GetResource(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool, @@ -1122,30 +1099,18 @@ func (dm *DataManager) SetResourceProfile(ctx *context.Context, rp *ResourceProf if oldRes == nil || // create the resource if it didn't exist before oldRes.UsageTTL != rp.UsageTTL || oldRes.Limit != rp.Limit || - oldRes.Stored != rp.Stored { // reset the resource if the profile changed this fields - var ttl *time.Duration - if rp.UsageTTL > 0 { - ttl = &rp.UsageTTL - } + (oldRes.Stored != rp.Stored && oldRes.Stored) { // reset the resource if the profile changed this fields err = dm.SetResource(ctx, &Resource{ Tenant: rp.Tenant, ID: rp.ID, Usages: make(map[string]*ResourceUsage), - ttl: ttl, - rPrf: rp, }) } else if _, errRs := dm.GetResource(ctx, rp.Tenant, rp.ID, // do not try to get the resource if the configuration changed true, false, utils.NonTransactional); errRs == utils.ErrNotFound { // the resource does not exist - var ttl *time.Duration - if rp.UsageTTL > 0 { - ttl = &rp.UsageTTL - } err = dm.SetResource(ctx, &Resource{ Tenant: rp.Tenant, ID: rp.ID, Usages: make(map[string]*ResourceUsage), - ttl: ttl, - rPrf: rp, }) } return diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index b42ebb08c..032a456e5 100644 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -677,16 +677,6 @@ func TestLoadDispatcherHosts(t *testing.T) { } } -func TestLoadstatQueues(t *testing.T) { - eStatQueues := []*utils.TenantID{ - {Tenant: "cgrates.org", ID: "TestStats"}, - {Tenant: "cgrates.org", ID: "TestStats2"}, - } - if len(csvr.statQueues) != len(eStatQueues) { - t.Errorf("Failed to load statQueues: %s", utils.ToIJSON(csvr.statQueues)) - } -} - func TestLoadAccount(t *testing.T) { expected := &utils.TPAccount{ TPid: testTPID, diff --git a/engine/stats.go b/engine/stats.go index 1ea59d435..7e725ce6a 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -121,7 +121,7 @@ func (sS *StatService) StoreStatQueue(ctx *context.Context, sq *StatQueue) (err if sq.dirty == nil || !*sq.dirty { return } - if err = sS.dm.SetStatQueue(ctx, sq, nil, 0, nil, 0, true); err != nil { + if err = sS.dm.SetStatQueue(ctx, sq); err != nil { utils.Logger.Warning( fmt.Sprintf(" failed saving StatQueue with ID: %s, error: %s", sq.TenantID(), err.Error())) diff --git a/engine/stats_test.go b/engine/stats_test.go index 08ab950fe..0f83df94f 100644 --- a/engine/stats_test.go +++ b/engine/stats_test.go @@ -110,9 +110,9 @@ func TestMatchingStatQueuesForEvent(t *testing.T) { }, } stqs := []*StatQueue{ - {Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0]}, - {Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1]}, - {Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2]}, + {Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0], SQMetrics: make(map[string]StatMetric)}, + {Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1], SQMetrics: make(map[string]StatMetric)}, + {Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2], SQMetrics: make(map[string]StatMetric)}, } statsEvs := []*StatsArgsProcessEvent{ { @@ -235,7 +235,7 @@ func TestMatchingStatQueuesForEvent(t *testing.T) { dmSTS.SetStatQueueProfile(context.TODO(), statQueueProfile, true) } for _, statQueue := range stqs { - dmSTS.SetStatQueue(context.TODO(), statQueue, nil, 0, nil, 0, true) + dmSTS.SetStatQueue(context.TODO(),statQueue) } //Test each statQueueProfile from cache for _, sqp := range sqps { @@ -341,9 +341,9 @@ func TestStatQueuesProcessEvent(t *testing.T) { }, } stqs := []*StatQueue{ - {Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0]}, - {Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1]}, - {Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2]}, + {Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0], SQMetrics: make(map[string]StatMetric)}, + {Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1], SQMetrics: make(map[string]StatMetric)}, + {Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2], SQMetrics: make(map[string]StatMetric)}, } statsEvs := []*StatsArgsProcessEvent{ { @@ -466,7 +466,7 @@ func TestStatQueuesProcessEvent(t *testing.T) { dmSTS.SetStatQueueProfile(context.TODO(), statQueueProfile, true) } for _, statQueue := range stqs { - dmSTS.SetStatQueue(context.TODO(), statQueue, nil, 0, nil, 0, true) + dmSTS.SetStatQueue(context.TODO(),statQueue) } //Test each statQueueProfile from cache for _, sqp := range sqps { @@ -573,9 +573,9 @@ func TestStatQueuesMatchWithIndexFalse(t *testing.T) { }, } stqs := []*StatQueue{ - {Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0]}, - {Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1]}, - {Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2]}, + {Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0], SQMetrics: make(map[string]StatMetric)}, + {Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1], SQMetrics: make(map[string]StatMetric)}, + {Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2], SQMetrics: make(map[string]StatMetric)}, } statsEvs := []*StatsArgsProcessEvent{ { @@ -698,7 +698,7 @@ func TestStatQueuesMatchWithIndexFalse(t *testing.T) { dmSTS.SetStatQueueProfile(context.TODO(), statQueueProfile, true) } for _, statQueue := range stqs { - dmSTS.SetStatQueue(context.TODO(), statQueue, nil, 0, nil, 0, true) + dmSTS.SetStatQueue(context.TODO(),statQueue) } //Test each statQueueProfile from cache for _, sqp := range sqps { @@ -805,9 +805,9 @@ func TestStatQueuesV1ProcessEvent(t *testing.T) { }, } stqs := []*StatQueue{ - {Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0]}, - {Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1]}, - {Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2]}, + {Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0], SQMetrics: make(map[string]StatMetric)}, + {Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1], SQMetrics: make(map[string]StatMetric)}, + {Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2], SQMetrics: make(map[string]StatMetric)}, } statsEvs := []*StatsArgsProcessEvent{ { @@ -930,7 +930,7 @@ func TestStatQueuesV1ProcessEvent(t *testing.T) { dmSTS.SetStatQueueProfile(context.TODO(), statQueueProfile, true) } for _, statQueue := range stqs { - dmSTS.SetStatQueue(context.TODO(), statQueue, nil, 0, nil, 0, true) + dmSTS.SetStatQueue(context.TODO(),statQueue) } //Test each statQueueProfile from cache for _, sqp := range sqps { @@ -961,7 +961,7 @@ func TestStatQueuesV1ProcessEvent(t *testing.T) { if err := dmSTS.SetStatQueueProfile(context.TODO(), sqPrf, true); err != nil { t.Error(err) } - if err := dmSTS.SetStatQueue(context.TODO(), sq, nil, 0, nil, 0, true); err != nil { + if err := dmSTS.SetStatQueue(context.TODO(),sq); err != nil { t.Error(err) } if tempStat, err := dmSTS.GetStatQueueProfile(context.TODO(), sqPrf.Tenant, diff --git a/engine/tpreader.go b/engine/tpreader.go index 69a1da567..dffd498c8 100644 --- a/engine/tpreader.go +++ b/engine/tpreader.go @@ -46,11 +46,7 @@ type TpReader struct { rateProfiles map[utils.TenantID]*utils.TPRateProfile actionProfiles map[utils.TenantID]*utils.TPActionProfile accounts map[utils.TenantID]*utils.TPAccount - // resources []*utils.TenantID // IDs of resources which need creation based on resourceProfiles - statQueues []*utils.TenantID // IDs of statQueues which need creation based on statQueueProfiles - // thresholds []*utils.TenantID // IDs of thresholds which need creation based on thresholdProfiles - // acntActionPlans map[string][]string - cacheConns []string + cacheConns []string //schedulerConns []string isInternalDB bool // do not reload cache if we use internalDB } @@ -118,7 +114,6 @@ func (tpr *TpReader) LoadStatsFiltered(tag string) (err error) { return } mapSTs[utils.TenantID{Tenant: st.Tenant, ID: st.ID}] = st - tpr.statQueues = append(tpr.statQueues, &utils.TenantID{Tenant: st.Tenant, ID: st.ID}) } tpr.sqProfiles = mapSTs return nil @@ -429,52 +424,8 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) { } } if len(tpr.sqProfiles) != 0 { - loadIDs[utils.CacheStatQueueProfiles] = loadID - } - if verbose { - log.Print("StatQueues:") - } - for _, sqTntID := range tpr.statQueues { - var ttl *time.Duration - if tpr.sqProfiles[*sqTntID].TTL != utils.EmptyString { - ttl = new(time.Duration) - if *ttl, err = utils.ParseDurationWithNanosecs(tpr.sqProfiles[*sqTntID].TTL); err != nil { - return - } - if *ttl <= 0 { - ttl = nil - } - } - metrics := make([]*MetricWithFilters, len(tpr.sqProfiles[*sqTntID].Metrics)) - for i, metric := range tpr.sqProfiles[*sqTntID].Metrics { - metrics[i] = &MetricWithFilters{ - MetricID: metric.MetricID, - FilterIDs: metric.FilterIDs, - } - } - sq := &StatQueue{ - Tenant: sqTntID.Tenant, - ID: sqTntID.ID, - } - if !tpr.sqProfiles[*sqTntID].Stored { //for not stored queues create the metrics - if sq, err = NewStatQueue(sqTntID.Tenant, sqTntID.ID, metrics, - tpr.sqProfiles[*sqTntID].MinItems); err != nil { - return - } - } - // for non stored we do not save the metrics - if err = tpr.dm.SetStatQueue(context.TODO(), sq, metrics, - tpr.sqProfiles[*sqTntID].MinItems, - ttl, tpr.sqProfiles[*sqTntID].QueueLength, - !tpr.sqProfiles[*sqTntID].Stored); err != nil { - return err - } - if verbose { - log.Print("\t", sqTntID.TenantID()) - } - } - if len(tpr.statQueues) != 0 { loadIDs[utils.CacheStatQueues] = loadID + loadIDs[utils.CacheStatQueueProfiles] = loadID } if verbose { log.Print("ThresholdProfiles:") @@ -671,12 +622,6 @@ func (tpr *TpReader) ShowStatistics() { // GetLoadedIds returns the identities loaded for a specific category, useful for cache reloads func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) { switch categ { - case utils.StatQueuePrefix: - keys := make([]string, len(tpr.statQueues)) - for i, k := range tpr.statQueues { - keys[i] = k.TenantID() - } - return keys, nil case utils.ResourceProfilesPrefix: keys := make([]string, len(tpr.resProfiles)) i := 0 @@ -797,17 +742,6 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error log.Print("\t", utils.ConcatenatedKey(tpST.Tenant, tpST.ID)) } } - if verbose { - log.Print("StatQueues:") - } - for _, sqTntID := range tpr.statQueues { - if err = tpr.dm.RemoveStatQueue(context.TODO(), sqTntID.Tenant, sqTntID.ID, utils.NonTransactional); err != nil { - return - } - if verbose { - log.Print("\t", sqTntID.TenantID()) - } - } if verbose { log.Print("ThresholdProfiles:") } @@ -943,8 +877,6 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error } if len(tpr.sqProfiles) != 0 { loadIDs[utils.CacheStatQueueProfiles] = loadID - } - if len(tpr.statQueues) != 0 { loadIDs[utils.CacheStatQueues] = loadID } if len(tpr.thProfiles) != 0 { @@ -988,7 +920,6 @@ func (tpr *TpReader) ReloadCache(ctx *context.Context, caching string, verbose b } // take IDs for each type rspIDs, _ := tpr.GetLoadedIds(utils.ResourceProfilesPrefix) - stqIDs, _ := tpr.GetLoadedIds(utils.StatQueuePrefix) stqpIDs, _ := tpr.GetLoadedIds(utils.StatQueueProfilePrefix) trspfIDs, _ := tpr.GetLoadedIds(utils.ThresholdProfilePrefix) flrIDs, _ := tpr.GetLoadedIds(utils.FilterPrefix) @@ -1005,7 +936,7 @@ func (tpr *TpReader) ReloadCache(ctx *context.Context, caching string, verbose b cacheArgs := map[string][]string{ utils.ResourceProfileIDs: rspIDs, utils.ResourceIDs: rspIDs, - utils.StatsQueueIDs: stqIDs, + utils.StatsQueueIDs: stqpIDs, utils.StatsQueueProfileIDs: stqpIDs, utils.ThresholdIDs: trspfIDs, utils.ThresholdProfileIDs: trspfIDs, diff --git a/engine/tpreader_test.go b/engine/tpreader_test.go index 8b067fb26..769a8a386 100644 --- a/engine/tpreader_test.go +++ b/engine/tpreader_test.go @@ -411,29 +411,6 @@ func TestCallCacheClear(t *testing.T) { } } -func TestGetLoadedIdsStatQueues(t *testing.T) { - tpr := &TpReader{ - statQueues: []*utils.TenantID{ - { - Tenant: "cgrates.org", - ID: "statQueueID", - }, - { - Tenant: "tenant.com", - ID: "mytenantID", - }, - }, - } - rcv, err := tpr.GetLoadedIds(utils.StatQueuePrefix) - if err != nil { - t.Error(err) - } - expRcv := []string{"cgrates.org:statQueueID", "tenant.com:mytenantID"} - if !reflect.DeepEqual(expRcv, rcv) { - t.Errorf("\nExpected %v but received \n%v", expRcv, rcv) - } -} - func TestGetLoadedIdsResourceProfiles(t *testing.T) { tpr := &TpReader{ resProfiles: map[utils.TenantID]*utils.TPResourceProfile{ @@ -681,7 +658,7 @@ func TestReloadCache(t *testing.T) { "DispatcherProfileIDs": {"cgrates.org:dispatcherProfilesID"}, "DispatcherHostIDs": {"cgrates.org:dispatcherHostsID"}, "ResourceIDs": {"cgrates.org:resourceProfilesID"}, - "StatsQueueIDs": {"cgrates.org:statQueuesID"}, + "StatsQueueIDs": {"cgrates.org:statProfilesID"}, "ThresholdIDs": {"cgrates.org:thresholdProfilesID"}, }, } @@ -731,12 +708,6 @@ func TestReloadCache(t *testing.T) { dispatcherHosts: map[utils.TenantID]*utils.TPDispatcherHost{ {Tenant: "cgrates.org", ID: "dispatcherHostsID"}: {}, }, - statQueues: []*utils.TenantID{ - { - Tenant: "cgrates.org", - ID: "statQueuesID", - }, - }, dm: NewDataManager(data, config.CgrConfig().CacheCfg(), cnMgr), } tpr.cacheConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)} diff --git a/engine/z_datamanager_it_test.go b/engine/z_datamanager_it_test.go index a151933ad..10246d85b 100644 --- a/engine/z_datamanager_it_test.go +++ b/engine/z_datamanager_it_test.go @@ -118,7 +118,7 @@ func testDMitCRUDStatQueue(t *testing.T) { if _, ok := Cache.Get(utils.CacheStatQueues, sq.TenantID()); ok != false { t.Error("should not be in cache") } - if err := dm2.SetStatQueue(context.TODO(), sq, nil, 0, nil, 0, true); err != nil { + if err := dm2.SetStatQueue(context.TODO(), sq); err != nil { t.Error(err) } if _, ok := Cache.Get(utils.CacheStatQueues, sq.TenantID()); ok != false { diff --git a/engine/z_onstor_it_test.go b/engine/z_onstor_it_test.go index 8d087af89..fb436491b 100644 --- a/engine/z_onstor_it_test.go +++ b/engine/z_onstor_it_test.go @@ -399,7 +399,7 @@ func testOnStorITStatQueue(t *testing.T) { true, false, utils.NonTransactional); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } - if err := onStor.SetStatQueue(context.TODO(), sq, nil, 0, nil, 0, true); err != nil { + if err := onStor.SetStatQueue(context.TODO(), sq); err != nil { t.Error(err) } //get from database @@ -427,7 +427,7 @@ func testOnStorITStatQueue(t *testing.T) { }, }, } - if err := onStor.SetStatQueue(context.TODO(), sq, nil, 0, nil, 0, true); err != nil { + if err := onStor.SetStatQueue(context.TODO(), sq); err != nil { t.Error(err) } diff --git a/loaders/lib_test.go b/loaders/lib_test.go index d3ac1e6c2..46224e26a 100644 --- a/loaders/lib_test.go +++ b/loaders/lib_test.go @@ -932,45 +932,3 @@ cgrates.org,MOCK_RELOAD_3 t.Errorf("Expected %+v, received %+v", expected, err) } } - -type dataDBMockError struct { - *engine.DataDBMock -} - -func (dbM *dataDBMockError) SetIndexesDrv(ctx *context.Context, idxItmType, tntCtx string, - indexes map[string]utils.StringSet, commit bool, transactionID string) (err error) { - return -} - -func (dbM *dataDBMockError) HasDataDrv(*context.Context, string, string, string) (bool, error) { - return false, nil -} - -//For StatQueue -func (dbM *dataDBMockError) GetStatQueueProfileDrv(ctx *context.Context, tenant string, ID string) (sq *engine.StatQueueProfile, err error) { - return nil, nil -} - -func (dbM *dataDBMockError) RemStatQueueProfileDrv(ctx *context.Context, tenant, id string) (err error) { - return nil -} - -func (dbM *dataDBMockError) RemStatQueueDrv(ctx *context.Context, tenant, id string) (err error) { - return utils.ErrNoDatabaseConn -} - -func (dbM *dataDBMockError) GetStatQueueDrv(ctx *context.Context, tenant, id string) (sq *engine.StatQueue, err error) { - return nil, utils.ErrNoDatabaseConn -} - -func (dbM *dataDBMockError) SetStatQueueDrv(ctx *context.Context, ssq *engine.StoredStatQueue, sq *engine.StatQueue) (err error) { - return utils.ErrNoDatabaseConn -} - -func (dbM *dataDBMockError) SetStatQueueProfileDrv(ctx *context.Context, sq *engine.StatQueueProfile) (err error) { - return nil -} - -func (dbM *dataDBMockError) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) { - return nil, nil -} diff --git a/loaders/loader.go b/loaders/loader.go index 250f03c0a..98da9e5fe 100644 --- a/loaders/loader.go +++ b/loaders/loader.go @@ -392,22 +392,6 @@ func (ldr *Loader) storeLoadedData(ctx *context.Context, loaderType string, if err := ldr.dm.SetStatQueueProfile(ctx, stsPrf, true); err != nil { return err } - var sq *engine.StatQueue - if sq, err = engine.NewStatQueue(stsPrf.Tenant, stsPrf.ID, stsPrf.Metrics, - stsPrf.MinItems); err != nil { - return utils.APIErrorHandler(err) - } - var ttl *time.Duration - if stsPrf.TTL > 0 { - ttl = &stsPrf.TTL - } - - // for non stored we do not save the metrics - if err := ldr.dm.SetStatQueue(ctx, sq, stsPrf.Metrics, - stsPrf.MinItems, ttl, stsPrf.QueueLength, - !stsPrf.Stored); err != nil { - return err - } cacheArgs[utils.StatsQueueProfileIDs] = ids cacheArgs[utils.StatsQueueIDs] = ids } @@ -805,9 +789,6 @@ func (ldr *Loader) removeLoadedData(ctx *context.Context, loaderType string, lds tntIDStruct.ID, utils.NonTransactional, true); err != nil { return err } - if err := ldr.dm.RemoveStatQueue(ctx, tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil { - return err - } cacheArgs[utils.StatsQueueProfileIDs] = ids cacheArgs[utils.StatsQueueIDs] = ids } diff --git a/loaders/loader_test.go b/loaders/loader_test.go index 714f7b720..b709c6646 100644 --- a/loaders/loader_test.go +++ b/loaders/loader_test.go @@ -721,24 +721,21 @@ func TestLoaderProcessStatsWrongMetrics(t *testing.T) { bufLoaderData: make(map[string][]LoaderData), dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil), timezone: "UTC", - } - ldr.dataTpls = map[string][]*config.FCTemplate{ - utils.MetaStats: { - {Tag: "MetricIDs", - Path: "MetricIDs", - Type: utils.MetaComposed, - Value: config.NewRSRParsersMustCompile("~*req.0", utils.InfieldSep)}, - {Tag: "Stored", - Path: "Stored", - Type: utils.MetaComposed, - Value: config.NewRSRParsersMustCompile("~*req.1", utils.InfieldSep)}, + dataTpls: map[string][]*config.FCTemplate{ + utils.MetaStats: { + {Tag: "MetricIDs", + Path: "MetricIDs", + Type: utils.MetaComposed, + Value: config.NewRSRParsersMustCompile("~*req.0", utils.InfieldSep)}, + {Tag: "Stored", + Path: "Stored", + Type: utils.MetaComposed, + Value: config.NewRSRParsersMustCompile("~*req.1", utils.InfieldSep)}, + }, }, } - statsCsv := ` -#Metrics[0],Stored[1] -not_a_valid_metric_type,true, -` - rdr := io.NopCloser(strings.NewReader(statsCsv)) + rdr := io.NopCloser(strings.NewReader(`#Metrics[0],Stored[1] +not_a_valid_metric_type,true,`)) csvRdr := csv.NewReader(rdr) csvRdr.Comment = '#' ldr.rdrs = map[string]map[string]*openedCSVFile{ @@ -746,7 +743,7 @@ not_a_valid_metric_type,true, utils.StatsCsv: &openedCSVFile{fileName: utils.StatsCsv, rdr: rdr, csvRdr: csvRdr}}, } - expected := "SERVER_ERROR: unsupported metric type " + expected := "unsupported metric type " if err := ldr.processContent(context.Background(), utils.MetaStats, utils.EmptyString); err == nil || err.Error() != expected { t.Errorf("Expected %+v, received %+v", expected, err) } @@ -755,11 +752,8 @@ not_a_valid_metric_type,true, } //initialize again but with a valid metric and false stored field - statsCsv = ` -#Metrics[0],Stored[1] -*sum#~*req.Value,false -` - rdr = io.NopCloser(strings.NewReader(statsCsv)) + rdr = io.NopCloser(strings.NewReader(`#Metrics[0],Stored[1] +*sum#~*req.Value,false`)) csvRdr = csv.NewReader(rdr) csvRdr.Comment = '#' ldr.rdrs = map[string]map[string]*openedCSVFile{ @@ -4554,26 +4548,23 @@ func TestRemoveStatQueueMockError(t *testing.T) { bufLoaderData: make(map[string][]LoaderData), dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil), timezone: "UTC", - } - ldr.dataTpls = map[string][]*config.FCTemplate{ - utils.MetaStats: { - {Tag: "TenantID", - Path: "Tenant", - Type: utils.MetaComposed, - Value: config.NewRSRParsersMustCompile("~*req.0", utils.InfieldSep), - Mandatory: true}, - {Tag: "ProfileID", - Path: "ID", - Type: utils.MetaComposed, - Value: config.NewRSRParsersMustCompile("~*req.1", utils.InfieldSep), - Mandatory: true}, + dataTpls: map[string][]*config.FCTemplate{ + utils.MetaStats: { + {Tag: "TenantID", + Path: "Tenant", + Type: utils.MetaComposed, + Value: config.NewRSRParsersMustCompile("~*req.0", utils.InfieldSep), + Mandatory: true}, + {Tag: "ProfileID", + Path: "ID", + Type: utils.MetaComposed, + Value: config.NewRSRParsersMustCompile("~*req.1", utils.InfieldSep), + Mandatory: true}, + }, }, } - statsCsv := ` -#Tenant[0],ProfileID[1] -cgrates.org,REM_STATS_1 -` - rdr := io.NopCloser(strings.NewReader(statsCsv)) + rdr := io.NopCloser(strings.NewReader(`#Tenant[0],ProfileID[1] +cgrates.org,REM_STATS_1`)) csvRdr := csv.NewReader(rdr) csvRdr.Comment = '#' ldr.rdrs = map[string]map[string]*openedCSVFile{ @@ -4585,18 +4576,15 @@ cgrates.org,REM_STATS_1 }, }, } - expStats := &engine.StatQueueProfile{ - Tenant: "cgrates.org", - ID: "REM_STATS_1", - } - if err := ldr.dm.SetStatQueueProfile(context.TODO(), expStats, true); err != nil { - t.Error(err) - } - - newData := &dataDBMockError{} - ldr.dm = engine.NewDataManager(newData, config.CgrConfig().CacheCfg(), nil) expected := utils.ErrNoDatabaseConn + ldr.dm = engine.NewDataManager(&engine.DataDBMock{ + GetStatQueueProfileDrvF: func(ctx *context.Context, tenant, id string) (sq *engine.StatQueueProfile, err error) { + return nil, nil + }, + SetStatQueueProfileDrvF: func(ctx *context.Context, sq *engine.StatQueueProfile) (err error) { return expected }, + RemStatQueueProfileDrvF: func(ctx *context.Context, tenant, id string) (err error) { return expected }, + }, config.CgrConfig().CacheCfg(), nil) if err := ldr.removeContent(context.Background(), utils.MetaStats, utils.EmptyString); err == nil || err != expected { t.Errorf("Expected %+v, received %+v", expected, err) diff --git a/migrator/resource_it_test.go b/migrator/resource_it_test.go index 9d76a338b..c5d34001d 100644 --- a/migrator/resource_it_test.go +++ b/migrator/resource_it_test.go @@ -27,6 +27,7 @@ import ( "testing" "time" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -190,7 +191,7 @@ func testResITMigrateAndMove(t *testing.T) { switch resAction { case utils.Migrate: // for the momment only one version of rating plans exists case utils.Move: - if err := resMigrator.dmIN.DataManager().SetResourceProfile(resPrfl, true); err != nil { + if err := resMigrator.dmIN.DataManager().SetResourceProfile(context.TODO(), resPrfl, true); err != nil { t.Error(err) } currentVersion := engine.CurrentDataDBVersions() @@ -199,7 +200,7 @@ func testResITMigrateAndMove(t *testing.T) { t.Error("Error when setting version for Resource ", err.Error()) } - _, err = resMigrator.dmOut.DataManager().GetResourceProfile("cgrates.org", "RES1", false, false, utils.NonTransactional) + _, err = resMigrator.dmOut.DataManager().GetResourceProfile(context.TODO(), "cgrates.org", "RES1", false, false, utils.NonTransactional) if err != utils.ErrNotFound { t.Error(err) } @@ -208,14 +209,14 @@ func testResITMigrateAndMove(t *testing.T) { if err != nil { t.Error("Error when migrating Resource ", err.Error()) } - result, err := resMigrator.dmOut.DataManager().GetResourceProfile("cgrates.org", "RES1", false, false, utils.NonTransactional) + result, err := resMigrator.dmOut.DataManager().GetResourceProfile(context.TODO(), "cgrates.org", "RES1", false, false, utils.NonTransactional) if err != nil { t.Fatal(err) } if !reflect.DeepEqual(result, resPrfl) { t.Errorf("Expecting: %+v, received: %+v", resPrfl, result) } - result, err = resMigrator.dmIN.DataManager().GetResourceProfile("cgrates.org", "RES1", false, false, utils.NonTransactional) + result, err = resMigrator.dmIN.DataManager().GetResourceProfile(context.TODO(), "cgrates.org", "RES1", false, false, utils.NonTransactional) if err != utils.ErrNotFound { t.Error(err) } else if resMigrator.stats[utils.Resource] != 1 { diff --git a/migrator/stats.go b/migrator/stats.go index 7f7c98a32..74c6a177f 100644 --- a/migrator/stats.go +++ b/migrator/stats.go @@ -58,9 +58,7 @@ type v1Stat struct { CostInterval []float64 // CDRFieldFilter on CostInterval, 2 or less items, (>=Cost, when migrating stat queue profiles", id) } - sgs, err := m.dmIN.DataManager().GetStatQueueProfile(context.TODO(), tntID[0], tntID[1], false, false, utils.NonTransactional) + sqp, err := m.dmIN.DataManager().GetStatQueueProfile(context.TODO(), tntID[0], tntID[1], false, false, utils.NonTransactional) if err != nil { return err } - if sgs == nil || m.dryRun { - continue - } - if err := m.dmOut.DataManager().SetStatQueueProfile(context.TODO(), sgs, true); err != nil { - return err - } - if err := m.dmIN.DataManager().RemoveStatQueueProfile(context.TODO(), tntID[0], tntID[1], utils.NonTransactional, false); err != nil { - return err - } - } - return -} - -func (m *Migrator) migrateCurrentStats() (err error) { - var ids []string - //StatQueue - if ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(context.TODO(), utils.StatQueuePrefix); err != nil { - return err - } - for _, id := range ids { - tntID := strings.SplitN(strings.TrimPrefix(id, utils.StatQueuePrefix), utils.InInFieldSep, 2) - if len(tntID) < 2 { - return fmt.Errorf("Invalid key <%s> when migrating stat queues", id) - } sgs, err := m.dmIN.DataManager().GetStatQueue(context.TODO(), tntID[0], tntID[1], false, false, utils.NonTransactional) if err != nil { return err } - if sgs == nil || m.dryRun { + if sqp == nil || m.dryRun { continue } - if err := m.dmOut.DataManager().SetStatQueue(context.TODO(), sgs, nil, 0, nil, 0, true); err != nil { + if err := m.dmOut.DataManager().SetStatQueueProfile(context.TODO(), sqp, true); err != nil { return err } - if err := m.dmIN.DataManager().RemoveStatQueue(context.TODO(), tntID[0], tntID[1], utils.NonTransactional); err != nil { + if sgs != nil { + if err := m.dmOut.DataManager().SetStatQueue(context.TODO(), sgs); err != nil { + return err + } + } + if err := m.dmIN.DataManager().RemoveStatQueueProfile(context.TODO(), tntID[0], tntID[1], utils.NonTransactional, false); err != nil { return err } m.stats[utils.StatS]++ } - - return m.moveStatQueueProfile() + return } func (m *Migrator) migrateV1Stats() (filter *engine.Filter, v2Stats *engine.StatQueue, sts *engine.StatQueueProfile, err error) { @@ -226,8 +204,10 @@ func (m *Migrator) migrateStats() (err error) { if err = m.dmOut.DataManager().SetStatQueueProfile(context.TODO(), v4sts, true); err != nil { return } - if err = m.dmOut.DataManager().SetStatQueue(context.TODO(), v3Stats, nil, 0, nil, 0, true); err != nil { - return + if v3Stats != nil { + if err = m.dmOut.DataManager().SetStatQueue(context.TODO(), v3Stats); err != nil { + return + } } } m.stats[utils.StatS]++ diff --git a/migrator/stats_it_test.go b/migrator/stats_it_test.go index a314bc53c..34a44545c 100644 --- a/migrator/stats_it_test.go +++ b/migrator/stats_it_test.go @@ -287,40 +287,40 @@ func testStsITFlush(t *testing.T) { // t.Errorf("Expecting: %+v, received: %+v", sq.ID, result2.ID) // } -// case utils.Move: -// if err := stsMigrator.dmIN.DataManager().SetStatQueueProfile(sqp, false); err != nil { -// t.Error("Error when setting Stats ", err.Error()) -// } -// if err := stsMigrator.dmIN.DataManager().SetStatQueue(sq, nil, 0, nil, 0, true); err != nil { -// t.Error("Error when setting Stats ", err.Error()) -// } -// if err := stsMigrator.dmOut.DataManager().SetFilter(filter, true); err != nil { -// t.Error("Error when setting Filter ", err.Error()) -// } -// currentVersion := engine.CurrentDataDBVersions() -// err := stsMigrator.dmIN.DataManager().DataDB().SetVersions(currentVersion, false) -// if err != nil { -// t.Error("Error when setting version for stats ", err.Error()) -// } -// err, _ = stsMigrator.Migrate([]string{utils.MetaStats}) -// if err != nil { -// t.Error("Error when migrating Stats ", err.Error()) -// } -// result, err := stsMigrator.dmOut.DataManager().DataDB().GetStatQueueProfileDrv(sqp.Tenant, sqp.ID) -// if err != nil { -// t.Error("Error when getting Stats ", err.Error()) -// } -// result1, err := stsMigrator.dmOut.DataManager().GetStatQueue(sq.Tenant, sq.ID, false, false, utils.NonTransactional) -// if err != nil { -// t.Error("Error when getting Stats ", err.Error()) -// } -// if !reflect.DeepEqual(sqp, result) { -// t.Errorf("Expecting: %+v, received: %+v", sqp, result) -// } -// if !reflect.DeepEqual(sq.ID, result1.ID) { -// t.Errorf("Expecting: %+v, received: %+v", sq.ID, result1.ID) -// } +// case utils.Move: +// if err := stsMigrator.dmIN.DataManager().SetStatQueueProfile(sqp, false); err != nil { +// t.Error("Error when setting Stats ", err.Error()) // } +// if err := stsMigrator.dmIN.DataManager().SetStatQueue(sq); err != nil { +// t.Error("Error when setting Stats ", err.Error()) +// } +// if err := stsMigrator.dmOut.DataManager().SetFilter(filter, true); err != nil { +// t.Error("Error when setting Filter ", err.Error()) +// } +// currentVersion := engine.CurrentDataDBVersions() +// err := stsMigrator.dmIN.DataManager().DataDB().SetVersions(currentVersion, false) +// if err != nil { +// t.Error("Error when setting version for stats ", err.Error()) +// } +// err, _ = stsMigrator.Migrate([]string{utils.MetaStats}) +// if err != nil { +// t.Error("Error when migrating Stats ", err.Error()) +// } +// result, err := stsMigrator.dmOut.DataManager().DataDB().GetStatQueueProfileDrv(sqp.Tenant, sqp.ID) +// if err != nil { +// t.Error("Error when getting Stats ", err.Error()) +// } +// result1, err := stsMigrator.dmOut.DataManager().GetStatQueue(sq.Tenant, sq.ID, false, false, utils.NonTransactional) +// if err != nil { +// t.Error("Error when getting Stats ", err.Error()) +// } +// if !reflect.DeepEqual(sqp, result) { +// t.Errorf("Expecting: %+v, received: %+v", sqp, result) +// } +// if !reflect.DeepEqual(sq.ID, result1.ID) { +// t.Errorf("Expecting: %+v, received: %+v", sq.ID, result1.ID) +// } +// } // } @@ -430,7 +430,7 @@ func testStsITMigrateFromv1(t *testing.T) { MetricID: "*acc", }, } - if statQueueProfile, err := stsMigrator.dmOut.DataManager().GetStatQueueProfile("cgrates.org", "test", false, false, utils.NonTransactional); err != nil { + if statQueueProfile, err := stsMigrator.dmOut.DataManager().GetStatQueueProfile(context.TODO(), "cgrates.org", "test", false, false, utils.NonTransactional); err != nil { t.Error(err) } else if statQueueProfile.ThresholdIDs[0] != "Test" { t.Errorf("Expecting: 'Test', received: %+v", statQueueProfile.ThresholdIDs[0]) @@ -456,7 +456,7 @@ func testStsITMigrateFromv1(t *testing.T) { //from V2 to V3 var statQueue *engine.StatQueue - if statQueue, err = stsMigrator.dmOut.DataManager().GetStatQueue("cgrates.org", "test", false, false, utils.NonTransactional); err != nil { + if statQueue, err = stsMigrator.dmOut.DataManager().GetStatQueue(context.TODO(), "cgrates.org", "test", false, false, utils.NonTransactional); err != nil { t.Error(err) } else if statQueue.ID != "test" { t.Errorf("Expecting: 'test', received: %+v", statQueue.ID) diff --git a/migrator/thresholds.go b/migrator/thresholds.go index 3283357d5..77fd1b81c 100644 --- a/migrator/thresholds.go +++ b/migrator/thresholds.go @@ -55,11 +55,10 @@ func (m *Migrator) migrateCurrentThresholds() (err error) { return err } // update the threshold in the new DB - if ths == nil { - continue - } - if err := m.dmOut.DataManager().SetThreshold(context.TODO(), ths); err != nil { - return err + if ths != nil { + if err := m.dmOut.DataManager().SetThreshold(context.TODO(), ths); err != nil { + return err + } } if err := m.dmIN.DataManager().RemoveThresholdProfile(context.TODO(), tntID[0], tntID[1], utils.NonTransactional, false); err != nil { return err