diff --git a/apier/v1/stats.go b/apier/v1/stats.go index 280132934..afeeaa74e 100644 --- a/apier/v1/stats.go +++ b/apier/v1/stats.go @@ -87,32 +87,6 @@ func (apierSv1 *APIerSv1) SetStatQueueProfile(arg *engine.StatQueueProfileWithAP arg.TenantID(), &arg.FilterIDs, nil, arg.APIOpts); err != nil { return utils.APIErrorHandler(err) } - var ttl *time.Duration - if arg.TTL > 0 { - ttl = &arg.TTL - } - sq := &engine.StatQueue{ - Tenant: arg.Tenant, - ID: arg.ID, - } - if !arg.Stored { // for not stored queues create the metrics - if sq, err = engine.NewStatQueue(arg.Tenant, arg.ID, arg.Metrics, - arg.MinItems); err != nil { - return err - } - } - // for non stored we do not save the metrics - if err = apierSv1.DataManager.SetStatQueue(sq, - arg.Metrics, arg.MinItems, ttl, arg.QueueLength, - !arg.Stored); err != nil { - return err - } - //handle caching for StatQueues - if err := apierSv1.CallCache(utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), arg.Tenant, utils.CacheStatQueues, - arg.TenantID(), nil, nil, arg.APIOpts); err != nil { - return utils.APIErrorHandler(err) - } - *reply = utils.OK return nil } @@ -134,20 +108,12 @@ func (apierSv1 *APIerSv1) RemoveStatQueueProfile(args *utils.TenantIDWithAPIOpts utils.ConcatenatedKey(tnt, args.ID), nil, nil, args.APIOpts); err != nil { return utils.APIErrorHandler(err) } - if err := apierSv1.DataManager.RemoveStatQueue(tnt, args.ID, utils.NonTransactional); err != nil { - return utils.APIErrorHandler(err) - } //generate a loadID for CacheStatQueueProfiles and CacheStatQueues and store it in database //make 1 insert for both StatQueueProfile and StatQueue instead of 2 loadID := time.Now().UnixNano() if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheStatQueueProfiles: loadID, utils.CacheStatQueues: loadID}); err != nil { return utils.APIErrorHandler(err) } - //handle caching for StatQueues - if err := apierSv1.CallCache(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), tnt, utils.CacheStatQueues, - utils.ConcatenatedKey(tnt, args.ID), nil, nil, args.APIOpts); err != nil { - return utils.APIErrorHandler(err) - } *reply = utils.OK return nil } diff --git a/engine/datadbmock.go b/engine/datadbmock.go index 1f21cc59f..e43306ec0 100644 --- a/engine/datadbmock.go +++ b/engine/datadbmock.go @@ -34,6 +34,9 @@ type DataDBMock struct { GetResourceProfileDrvF func(tnt, id string) (*ResourceProfile, error) SetResourceProfileDrvF func(rp *ResourceProfile) error RemoveResourceProfileDrvF func(tnt, id string) error + GetStatQueueProfileDrvF func(tenant, id string) (sq *StatQueueProfile, err error) + SetStatQueueProfileDrvF func(sq *StatQueueProfile) (err error) + RemStatQueueProfileDrvF func(tenant, id string) (err error) } //Storage methods @@ -277,15 +280,24 @@ func (dbM *DataDBMock) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) (err return utils.ErrNotImplemented } -func (dbM *DataDBMock) GetStatQueueProfileDrv(tenant string, ID string) (sq *StatQueueProfile, err error) { +func (dbM *DataDBMock) GetStatQueueProfileDrv(tenant, id string) (sq *StatQueueProfile, err error) { + if dbM.GetStatQueueProfileDrvF != nil { + return dbM.GetStatQueueProfileDrvF(tenant, id) + } return nil, utils.ErrNotImplemented } func (dbM *DataDBMock) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) { + if dbM.SetStatQueueProfileDrvF != nil { + return dbM.SetStatQueueProfileDrvF(sq) + } return utils.ErrNotImplemented } func (dbM *DataDBMock) RemStatQueueProfileDrv(tenant, id string) (err error) { + if dbM.RemStatQueueProfileDrvF != nil { + return 
dbM.RemStatQueueProfileDrvF(tenant, id) + } return utils.ErrNotImplemented } diff --git a/engine/datamanager.go b/engine/datamanager.go index 8cb3b9693..b1b77cedc 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -18,7 +18,6 @@ package engine import ( "fmt" "strings" - "time" "github.com/cgrates/baningo" "github.com/cgrates/cgrates/config" @@ -604,171 +603,6 @@ func (dm *DataManager) RemoveAccount(id string) (err error) { return } -// GetStatQueue retrieves a StatQueue from dataDB -// handles caching and deserialization of metrics -func (dm *DataManager) GetStatQueue(tenant, id string, - cacheRead, cacheWrite bool, transactionID string) (sq *StatQueue, err error) { - tntID := utils.ConcatenatedKey(tenant, id) - if cacheRead { - if x, ok := Cache.Get(utils.CacheStatQueues, tntID); ok { - if x == nil { - return nil, utils.ErrNotFound - } - return x.(*StatQueue), nil - } - } - if dm == nil { - err = utils.ErrNoDatabaseConn - return - } - sq, err = dm.dataDB.GetStatQueueDrv(tenant, id) - if err != nil { - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; err == utils.ErrNotFound && itm.Remote { - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RmtConns, nil, utils.ReplicatorSv1GetStatQueue, - &utils.TenantIDWithAPIOpts{ - TenantID: &utils.TenantID{Tenant: tenant, ID: id}, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, utils.EmptyString, - utils.FirstNonEmpty(config.CgrConfig().DataDbCfg().RmtConnID, - config.CgrConfig().GeneralCfg().NodeID)), - }, &sq); err == nil { - var ssq *StoredStatQueue - if dm.dataDB.GetStorageType() != utils.MetaInternal { - // in case of internal we don't marshal - if ssq, err = NewStoredStatQueue(sq, dm.ms); err != nil { - return nil, err - } - } - err = dm.dataDB.SetStatQueueDrv(ssq, sq) - } - } - if err != nil { - if err = utils.CastRPCErr(err); err == utils.ErrNotFound && cacheWrite { - if errCh := Cache.Set(utils.CacheStatQueues, tntID, nil, nil, - cacheCommit(transactionID), transactionID); errCh != nil { - return nil, errCh - } - - } - return nil, err - } - } - if cacheWrite { - if errCh := Cache.Set(utils.CacheStatQueues, tntID, sq, nil, - cacheCommit(transactionID), transactionID); errCh != nil { - return nil, errCh - } - } - return -} - -// SetStatQueue converts to StoredStatQueue and stores the result in dataDB -func (dm *DataManager) SetStatQueue(sq *StatQueue, metrics []*MetricWithFilters, - minItems int, ttl *time.Duration, queueLength int, simpleSet bool) (err error) { - if dm == nil { - return utils.ErrNoDatabaseConn - } - if !simpleSet { - tnt := sq.Tenant // save the tenant - id := sq.ID // save the ID from the initial StatQueue - // handle metrics for statsQueue - sq, err = dm.GetStatQueue(tnt, id, true, false, utils.NonTransactional) - if err != nil && err != utils.ErrNotFound { - return - } - if err == utils.ErrNotFound { - // if the statQueue didn't exists simply initiate all the metrics - if sq, err = NewStatQueue(tnt, id, metrics, minItems); err != nil { - return - } - } else { - for sqMetricID := range sq.SQMetrics { - // we consider that the metric needs to be removed - needsRemove := true - for _, metric := range metrics { - // in case we found the metric in the metrics define by the user we leave it - if sqMetricID == metric.MetricID { - needsRemove = false - break - } - } - if needsRemove { - delete(sq.SQMetrics, sqMetricID) - } - } - - for _, metric := range metrics { - if _, has := sq.SQMetrics[metric.MetricID]; !has { - var stsMetric StatMetric - if stsMetric, err = 
NewStatMetric(metric.MetricID, - minItems, - metric.FilterIDs); err != nil { - return - } - sq.SQMetrics[metric.MetricID] = stsMetric - } - } - - // if the user define a statQueue with an existing metric check if we need to update it based on queue length - sq.ttl = ttl - if _, err = sq.remExpired(); err != nil { - return - } - if len(sq.SQItems) > queueLength { - for i := 0; i < queueLength-len(sq.SQItems); i++ { - item := sq.SQItems[0] - if err = sq.remEventWithID(item.EventID); err != nil { - return - } - sq.SQItems = sq.SQItems[1:] - } - } - } - } - - var ssq *StoredStatQueue - if dm.dataDB.GetStorageType() != utils.MetaInternal { - // in case of internal we don't marshal - if ssq, err = NewStoredStatQueue(sq, dm.ms); err != nil { - return - } - } - if err = dm.dataDB.SetStatQueueDrv(ssq, sq); err != nil { - return - } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.StatQueuePrefix, sq.TenantID(), // this are used to get the host IDs from cache - utils.ReplicatorSv1SetStatQueue, - &StatQueueWithAPIOpts{ - StatQueue: sq, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return -} - -// RemoveStatQueue removes the StoredStatQueue -func (dm *DataManager) RemoveStatQueue(tenant, id string, transactionID string) (err error) { - if dm == nil { - return utils.ErrNoDatabaseConn - } - if err = dm.dataDB.RemStatQueueDrv(tenant, id); err != nil { - return - } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.StatQueuePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveStatQueue, - &utils.TenantIDWithAPIOpts{ - TenantID: &utils.TenantID{Tenant: tenant, ID: id}, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return -} - // GetFilter returns a filter based on the given ID func (dm *DataManager) GetFilter(tenant, id string, cacheRead, cacheWrite bool, transactionID string) (fltr *Filter, err error) { @@ -1089,7 +923,6 @@ func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool) Tenant: th.Tenant, ID: th.ID, Hits: 0, - tPrfl: th, }) } else if _, errTh := dm.GetThreshold(th.Tenant, th.ID, // do not try to get the threshold if the configuration changed true, false, utils.NonTransactional); errTh == utils.ErrNotFound { // the threshold does not exist @@ -1097,7 +930,6 @@ func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool) Tenant: th.Tenant, ID: th.ID, Hits: 0, - tPrfl: th, }) } return @@ -1140,6 +972,112 @@ func (dm *DataManager) RemoveThresholdProfile(tenant, id, return dm.RemoveThreshold(tenant, id, transactionID) // remove the thrshold } +// GetStatQueue retrieves a StatQueue from dataDB +// handles caching and deserialization of metrics +func (dm *DataManager) GetStatQueue(tenant, id string, + cacheRead, cacheWrite bool, transactionID string) (sq *StatQueue, err error) { + tntID := utils.ConcatenatedKey(tenant, id) + if cacheRead { + if x, ok := Cache.Get(utils.CacheStatQueues, tntID); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*StatQueue), nil + } + } + if dm == nil { + err = 
utils.ErrNoDatabaseConn + return + } + sq, err = dm.dataDB.GetStatQueueDrv(tenant, id) + if err != nil { + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; err == utils.ErrNotFound && itm.Remote { + if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RmtConns, nil, utils.ReplicatorSv1GetStatQueue, + &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{Tenant: tenant, ID: id}, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, utils.EmptyString, + utils.FirstNonEmpty(config.CgrConfig().DataDbCfg().RmtConnID, + config.CgrConfig().GeneralCfg().NodeID)), + }, &sq); err == nil { + var ssq *StoredStatQueue + if dm.dataDB.GetStorageType() != utils.MetaInternal { + // in case of internal we don't marshal + if ssq, err = NewStoredStatQueue(sq, dm.ms); err != nil { + return nil, err + } + } + err = dm.dataDB.SetStatQueueDrv(ssq, sq) + } + } + if err != nil { + if err = utils.CastRPCErr(err); err == utils.ErrNotFound && cacheWrite { + if errCh := Cache.Set(utils.CacheStatQueues, tntID, nil, nil, + cacheCommit(transactionID), transactionID); errCh != nil { + return nil, errCh + } + + } + return nil, err + } + } + if cacheWrite { + if errCh := Cache.Set(utils.CacheStatQueues, tntID, sq, nil, + cacheCommit(transactionID), transactionID); errCh != nil { + return nil, errCh + } + } + return +} + +// SetStatQueue converts to StoredStatQueue and stores the result in dataDB +func (dm *DataManager) SetStatQueue(sq *StatQueue) (err error) { + if dm == nil { + return utils.ErrNoDatabaseConn + } + var ssq *StoredStatQueue + if dm.dataDB.GetStorageType() != utils.MetaInternal { + // in case of internal we don't marshal + if ssq, err = NewStoredStatQueue(sq, dm.ms); err != nil { + return + } + } + if err = dm.dataDB.SetStatQueueDrv(ssq, sq); err != nil { + return + } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; itm.Replicate { + err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.StatQueuePrefix, sq.TenantID(), // this are used to get the host IDs from cache + utils.ReplicatorSv1SetStatQueue, + &StatQueueWithAPIOpts{ + StatQueue: sq, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) + } + return +} + +// RemoveStatQueue removes the StoredStatQueue +func (dm *DataManager) RemoveStatQueue(tenant, id string, transactionID string) (err error) { + if dm == nil { + return utils.ErrNoDatabaseConn + } + if err = dm.dataDB.RemStatQueueDrv(tenant, id); err != nil { + return + } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; itm.Replicate { + replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.StatQueuePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache + utils.ReplicatorSv1RemoveStatQueue, + &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{Tenant: tenant, ID: id}, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) + } + return +} + func (dm *DataManager) GetStatQueueProfile(tenant, id string, cacheRead, cacheWrite bool, transactionID string) (sqp *StatQueueProfile, err error) { tntID := utils.ConcatenatedKey(tenant, id) @@ -1219,14 +1157,53 @@ func (dm *DataManager) SetStatQueueProfile(sqp *StatQueueProfile, withIndex bool } } if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueueProfiles]; 
itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + if err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, config.CgrConfig().DataDbCfg().RplFiltered, utils.StatQueueProfilePrefix, sqp.TenantID(), // this are used to get the host IDs from cache utils.ReplicatorSv1SetStatQueueProfile, &StatQueueProfileWithAPIOpts{ StatQueueProfile: sqp, APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}); err != nil { + return + } + } + if oldSts == nil || // create the stats queue if it didn't exist before + oldSts.QueueLength != sqp.QueueLength || + oldSts.TTL != sqp.TTL || + oldSts.MinItems != sqp.MinItems || + (oldSts.Stored != sqp.Stored && oldSts.Stored) { // reset the stats queue if the profile changed this fields + var sq *StatQueue + if sq, err = NewStatQueue(sqp.Tenant, sqp.ID, sqp.Metrics, + sqp.MinItems); err != nil { + return + } + err = dm.SetStatQueue(sq) + } else if oSq, errRs := dm.GetStatQueue(sqp.Tenant, sqp.ID, // do not try to get the stats queue if the configuration changed + true, false, utils.NonTransactional); errRs == utils.ErrNotFound { // the stats queue does not exist + var sq *StatQueue + if sq, err = NewStatQueue(sqp.Tenant, sqp.ID, sqp.Metrics, + sqp.MinItems); err != nil { + return + } + err = dm.SetStatQueue(sq) + } else { // update the metrics if needed + cMetricIDs := utils.StringSet{} + for _, metric := range sqp.Metrics { // add missing metrics and recreate the old metrics that changed + cMetricIDs.Add(metric.MetricID) + if oSqMetric, has := oSq.SQMetrics[metric.MetricID]; !has || + !utils.SliceStringEqual(oSqMetric.GetFilterIDs(), metric.FilterIDs) { // recreate it if the filter changed + if oSq.SQMetrics[metric.MetricID], err = NewStatMetric(metric.MetricID, + sqp.MinItems, metric.FilterIDs); err != nil { + return + } + } + } + for sqMetricID := range oSq.SQMetrics { // remove the old metrics + if !cMetricIDs.Has(sqMetricID) { + delete(oSq.SQMetrics, sqMetricID) + } + } } return } @@ -1265,7 +1242,7 @@ func (dm *DataManager) RemoveStatQueueProfile(tenant, id, APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) } - return + return dm.RemoveStatQueue(tenant, id, transactionID) } func (dm *DataManager) GetTiming(id string, skipCache bool, @@ -1542,30 +1519,18 @@ func (dm *DataManager) SetResourceProfile(rp *ResourceProfile, withIndex bool) ( if oldRes == nil || // create the resource if it didn't exist before oldRes.UsageTTL != rp.UsageTTL || oldRes.Limit != rp.Limit || - oldRes.Stored != rp.Stored { // reset the resource if the profile changed this fields - var ttl *time.Duration - if rp.UsageTTL > 0 { - ttl = &rp.UsageTTL - } + (oldRes.Stored != rp.Stored && oldRes.Stored) { // reset the resource if the profile changed this fields err = dm.SetResource(&Resource{ Tenant: rp.Tenant, ID: rp.ID, Usages: make(map[string]*ResourceUsage), - ttl: ttl, - rPrf: rp, }) } else if _, errRs := dm.GetResource(rp.Tenant, rp.ID, // do not try to get the resource if the configuration changed true, false, utils.NonTransactional); errRs == utils.ErrNotFound { // the resource does not exist - var ttl *time.Duration - if rp.UsageTTL > 0 { - ttl = &rp.UsageTTL - } err = dm.SetResource(&Resource{ Tenant: rp.Tenant, ID: rp.ID, Usages: make(map[string]*ResourceUsage), - ttl: ttl, - rPrf: rp, }) } return diff --git a/engine/loader_csv_test.go 
b/engine/loader_csv_test.go index 035622027..961b4d41c 100644 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -1412,13 +1412,3 @@ func TestLoadDispatcherHosts(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eDispatcherHosts), utils.ToJSON(csvr.dispatcherHosts[dphKey])) } } - -func TestLoadstatQueues(t *testing.T) { - eStatQueues := []*utils.TenantID{ - {Tenant: "cgrates.org", ID: "TestStats"}, - {Tenant: "cgrates.org", ID: "TestStats2"}, - } - if len(csvr.statQueues) != len(eStatQueues) { - t.Errorf("Failed to load statQueues: %s", utils.ToIJSON(csvr.statQueues)) - } -} diff --git a/engine/stats.go b/engine/stats.go index 7def4b3ae..c88c06d4d 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -120,7 +120,7 @@ func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) { if sq.dirty == nil || !*sq.dirty { return } - if err = sS.dm.SetStatQueue(sq, nil, 0, nil, 0, true); err != nil { + if err = sS.dm.SetStatQueue(sq); err != nil { utils.Logger.Warning( fmt.Sprintf(" failed saving StatQueue with ID: %s, error: %s", sq.TenantID(), err.Error())) diff --git a/engine/stats_test.go b/engine/stats_test.go index fabe64c82..112bf1c2c 100644 --- a/engine/stats_test.go +++ b/engine/stats_test.go @@ -118,9 +118,9 @@ func TestMatchingStatQueuesForEvent(t *testing.T) { }, } stqs := []*StatQueue{ - {Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0]}, - {Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1]}, - {Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2]}, + {Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0], SQMetrics: make(map[string]StatMetric)}, + {Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1], SQMetrics: make(map[string]StatMetric)}, + {Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2], SQMetrics: make(map[string]StatMetric)}, } statsEvs := []*StatsArgsProcessEvent{ { @@ -243,7 +243,7 @@ func TestMatchingStatQueuesForEvent(t *testing.T) { dmSTS.SetStatQueueProfile(statQueueProfile, true) } for _, statQueue := range stqs { - dmSTS.SetStatQueue(statQueue, nil, 0, nil, 0, true) + dmSTS.SetStatQueue(statQueue) } //Test each statQueueProfile from cache for _, sqp := range sqps { @@ -358,9 +358,9 @@ func TestStatQueuesProcessEvent(t *testing.T) { }, } stqs := []*StatQueue{ - {Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0]}, - {Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1]}, - {Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2]}, + {Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0], SQMetrics: make(map[string]StatMetric)}, + {Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1], SQMetrics: make(map[string]StatMetric)}, + {Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2], SQMetrics: make(map[string]StatMetric)}, } statsEvs := []*StatsArgsProcessEvent{ { @@ -483,7 +483,7 @@ func TestStatQueuesProcessEvent(t *testing.T) { dmSTS.SetStatQueueProfile(statQueueProfile, true) } for _, statQueue := range stqs { - dmSTS.SetStatQueue(statQueue, nil, 0, nil, 0, true) + dmSTS.SetStatQueue(statQueue) } //Test each statQueueProfile from cache for _, sqp := range sqps { @@ -599,9 +599,9 @@ func TestStatQueuesMatchWithIndexFalse(t *testing.T) { }, } stqs := []*StatQueue{ - {Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0]}, - {Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1]}, - {Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2]}, + 
{Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0], SQMetrics: make(map[string]StatMetric)}, + {Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1], SQMetrics: make(map[string]StatMetric)}, + {Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2], SQMetrics: make(map[string]StatMetric)}, } statsEvs := []*StatsArgsProcessEvent{ { @@ -724,7 +724,7 @@ func TestStatQueuesMatchWithIndexFalse(t *testing.T) { dmSTS.SetStatQueueProfile(statQueueProfile, true) } for _, statQueue := range stqs { - dmSTS.SetStatQueue(statQueue, nil, 0, nil, 0, true) + dmSTS.SetStatQueue(statQueue) } //Test each statQueueProfile from cache for _, sqp := range sqps { @@ -840,9 +840,9 @@ func TestStatQueuesV1ProcessEvent(t *testing.T) { }, } stqs := []*StatQueue{ - {Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0]}, - {Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1]}, - {Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2]}, + {Tenant: "cgrates.org", ID: "StatQueueProfile1", sqPrfl: sqps[0], SQMetrics: make(map[string]StatMetric)}, + {Tenant: "cgrates.org", ID: "StatQueueProfile2", sqPrfl: sqps[1], SQMetrics: make(map[string]StatMetric)}, + {Tenant: "cgrates.org", ID: "StatQueueProfilePrefix", sqPrfl: sqps[2], SQMetrics: make(map[string]StatMetric)}, } statsEvs := []*StatsArgsProcessEvent{ { @@ -965,7 +965,7 @@ func TestStatQueuesV1ProcessEvent(t *testing.T) { dmSTS.SetStatQueueProfile(statQueueProfile, true) } for _, statQueue := range stqs { - dmSTS.SetStatQueue(statQueue, nil, 0, nil, 0, true) + dmSTS.SetStatQueue(statQueue) } //Test each statQueueProfile from cache for _, sqp := range sqps { @@ -999,7 +999,7 @@ func TestStatQueuesV1ProcessEvent(t *testing.T) { if err := dmSTS.SetStatQueueProfile(sqPrf, true); err != nil { t.Error(err) } - if err := dmSTS.SetStatQueue(sq, nil, 0, nil, 0, true); err != nil { + if err := dmSTS.SetStatQueue(sq); err != nil { t.Error(err) } if tempStat, err := dmSTS.GetStatQueueProfile(sqPrf.Tenant, diff --git a/engine/tpreader.go b/engine/tpreader.go index 00878df81..b93e24a40 100644 --- a/engine/tpreader.go +++ b/engine/tpreader.go @@ -56,13 +56,10 @@ type TpReader struct { chargerProfiles map[utils.TenantID]*utils.TPChargerProfile dispatcherProfiles map[utils.TenantID]*utils.TPDispatcherProfile dispatcherHosts map[utils.TenantID]*utils.TPDispatcherHost - // resources []*utils.TenantID // IDs of resources which need creation based on resourceProfiles - statQueues []*utils.TenantID // IDs of statQueues which need creation based on statQueueProfiles - // thresholds []*utils.TenantID // IDs of thresholds which need creation based on thresholdProfiles - acntActionPlans map[string][]string - cacheConns []string - schedulerConns []string - isInternalDB bool // do not reload cache if we use intarnalDB + acntActionPlans map[string][]string + cacheConns []string + schedulerConns []string + isInternalDB bool // do not reload cache if we use intarnalDB } func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string, @@ -1130,7 +1127,6 @@ func (tpr *TpReader) LoadStatsFiltered(tag string) (err error) { return } mapSTs[utils.TenantID{Tenant: st.Tenant, ID: st.ID}] = st - tpr.statQueues = append(tpr.statQueues, &utils.TenantID{Tenant: st.Tenant, ID: st.ID}) } tpr.sqProfiles = mapSTs return nil @@ -1575,52 +1571,8 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) { } } if len(tpr.sqProfiles) != 0 { - loadIDs[utils.CacheStatQueueProfiles] = loadID - } - if verbose { - 
log.Print("StatQueues:") - } - for _, sqTntID := range tpr.statQueues { - var ttl *time.Duration - if tpr.sqProfiles[*sqTntID].TTL != utils.EmptyString { - ttl = new(time.Duration) - if *ttl, err = utils.ParseDurationWithNanosecs(tpr.sqProfiles[*sqTntID].TTL); err != nil { - return - } - if *ttl <= 0 { - ttl = nil - } - } - metrics := make([]*MetricWithFilters, len(tpr.sqProfiles[*sqTntID].Metrics)) - for i, metric := range tpr.sqProfiles[*sqTntID].Metrics { - metrics[i] = &MetricWithFilters{ - MetricID: metric.MetricID, - FilterIDs: metric.FilterIDs, - } - } - sq := &StatQueue{ - Tenant: sqTntID.Tenant, - ID: sqTntID.ID, - } - if !tpr.sqProfiles[*sqTntID].Stored { //for not stored queues create the metrics - if sq, err = NewStatQueue(sqTntID.Tenant, sqTntID.ID, metrics, - tpr.sqProfiles[*sqTntID].MinItems); err != nil { - return - } - } - // for non stored we do not save the metrics - if err = tpr.dm.SetStatQueue(sq, metrics, - tpr.sqProfiles[*sqTntID].MinItems, - ttl, tpr.sqProfiles[*sqTntID].QueueLength, - !tpr.sqProfiles[*sqTntID].Stored); err != nil { - return err - } - if verbose { - log.Print("\t", sqTntID.TenantID()) - } - } - if len(tpr.statQueues) != 0 { loadIDs[utils.CacheStatQueues] = loadID + loadIDs[utils.CacheStatQueueProfiles] = loadID } if verbose { log.Print("ThresholdProfiles:") @@ -1830,12 +1782,6 @@ func (tpr *TpReader) ShowStatistics() { // GetLoadedIds returns the identities loaded for a specific category, useful for cache reloads func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) { switch categ { - case utils.StatQueuePrefix: - keys := make([]string, len(tpr.statQueues)) - for i, k := range tpr.statQueues { - keys[i] = k.TenantID() - } - return keys, nil case utils.DestinationPrefix: keys := make([]string, len(tpr.destinations)) i := 0 @@ -2104,17 +2050,6 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error log.Print("\t", utils.ConcatenatedKey(tpST.Tenant, tpST.ID)) } } - if verbose { - log.Print("StatQueues:") - } - for _, sqTntID := range tpr.statQueues { - if err = tpr.dm.RemoveStatQueue(sqTntID.Tenant, sqTntID.ID, utils.NonTransactional); err != nil { - return - } - if verbose { - log.Print("\t", sqTntID.TenantID()) - } - } if verbose { log.Print("ThresholdProfiles:") } @@ -2265,8 +2200,6 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error } if len(tpr.sqProfiles) != 0 { loadIDs[utils.CacheStatQueueProfiles] = loadID - } - if len(tpr.statQueues) != 0 { loadIDs[utils.CacheStatQueues] = loadID } if len(tpr.thProfiles) != 0 { @@ -2313,7 +2246,6 @@ func (tpr *TpReader) ReloadCache(caching string, verbose bool, opts map[string]i shgIds, _ := tpr.GetLoadedIds(utils.SharedGroupPrefix) rspIDs, _ := tpr.GetLoadedIds(utils.ResourceProfilesPrefix) aatIDs, _ := tpr.GetLoadedIds(utils.ActionTriggerPrefix) - stqIDs, _ := tpr.GetLoadedIds(utils.StatQueuePrefix) stqpIDs, _ := tpr.GetLoadedIds(utils.StatQueueProfilePrefix) trspfIDs, _ := tpr.GetLoadedIds(utils.ThresholdProfilePrefix) flrIDs, _ := tpr.GetLoadedIds(utils.FilterPrefix) @@ -2338,7 +2270,7 @@ func (tpr *TpReader) ReloadCache(caching string, verbose bool, opts map[string]i utils.ResourceProfileIDs: rspIDs, utils.ResourceIDs: rspIDs, utils.ActionTriggerIDs: aatIDs, - utils.StatsQueueIDs: stqIDs, + utils.StatsQueueIDs: stqpIDs, utils.StatsQueueProfileIDs: stqpIDs, utils.ThresholdIDs: trspfIDs, utils.ThresholdProfileIDs: trspfIDs, diff --git a/engine/tpreader_test.go b/engine/tpreader_test.go index 9a996112b..fa7f94669 100644 --- 
a/engine/tpreader_test.go +++ b/engine/tpreader_test.go @@ -406,29 +406,6 @@ func TestCallCacheClear(t *testing.T) { } } -func TestGetLoadedIdsStatQueues(t *testing.T) { - tpr := &TpReader{ - statQueues: []*utils.TenantID{ - { - Tenant: "cgrates.org", - ID: "statQueueID", - }, - { - Tenant: "tenant.com", - ID: "mytenantID", - }, - }, - } - rcv, err := tpr.GetLoadedIds(utils.StatQueuePrefix) - if err != nil { - t.Error(err) - } - expRcv := []string{"cgrates.org:statQueueID", "tenant.com:mytenantID"} - if !reflect.DeepEqual(expRcv, rcv) { - t.Errorf("\nExpected %v but received \n%v", expRcv, rcv) - } -} - func TestGetLoadedIdsDestinations(t *testing.T) { tpr := &TpReader{ destinations: map[string]*Destination{ @@ -984,7 +961,7 @@ func TestReloadCache(t *testing.T) { "DispatcherProfileIDs": {"cgrates.org:dispatcherProfilesID"}, "DispatcherHostIDs": {"cgrates.org:dispatcherHostsID"}, "ResourceIDs": {"cgrates.org:resourceProfilesID"}, - "StatsQueueIDs": {"cgrates.org:statQueuesID"}, + "StatsQueueIDs": {"cgrates.org:statProfilesID"}, "ThresholdIDs": {"cgrates.org:thresholdProfilesID"}, "AccountActionPlanIDs": {"AccountActionPlansID"}, }, @@ -1059,12 +1036,6 @@ func TestReloadCache(t *testing.T) { dispatcherHosts: map[utils.TenantID]*utils.TPDispatcherHost{ {Tenant: "cgrates.org", ID: "dispatcherHostsID"}: {}, }, - statQueues: []*utils.TenantID{ - { - Tenant: "cgrates.org", - ID: "statQueuesID", - }, - }, acntActionPlans: map[string][]string{ "AccountActionPlansID": {}, }, diff --git a/engine/z_datamanager_it_test.go b/engine/z_datamanager_it_test.go index 150b5f868..722e5fb09 100644 --- a/engine/z_datamanager_it_test.go +++ b/engine/z_datamanager_it_test.go @@ -117,7 +117,7 @@ func testDMitCRUDStatQueue(t *testing.T) { if _, ok := Cache.Get(utils.CacheStatQueues, sq.TenantID()); ok != false { t.Error("should not be in cache") } - if err := dm2.SetStatQueue(sq, nil, 0, nil, 0, true); err != nil { + if err := dm2.SetStatQueue(sq); err != nil { t.Error(err) } if _, ok := Cache.Get(utils.CacheStatQueues, sq.TenantID()); ok != false { diff --git a/engine/z_onstor_it_test.go b/engine/z_onstor_it_test.go index 1efccea38..67e8c26d1 100644 --- a/engine/z_onstor_it_test.go +++ b/engine/z_onstor_it_test.go @@ -1518,7 +1518,7 @@ func testOnStorITStatQueue(t *testing.T) { true, false, utils.NonTransactional); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } - if err := onStor.SetStatQueue(sq, nil, 0, nil, 0, true); err != nil { + if err := onStor.SetStatQueue(sq); err != nil { t.Error(err) } //get from database @@ -1546,7 +1546,7 @@ func testOnStorITStatQueue(t *testing.T) { }, }, } - if err := onStor.SetStatQueue(sq, nil, 0, nil, 0, true); err != nil { + if err := onStor.SetStatQueue(sq); err != nil { t.Error(err) } diff --git a/loaders/lib_test.go b/loaders/lib_test.go index 1b9ac245e..0a60ec5f1 100644 --- a/loaders/lib_test.go +++ b/loaders/lib_test.go @@ -739,45 +739,3 @@ cgrates.org,MOCK_RELOAD_3 t.Errorf("Expected %+v, received %+v", expected, err) } } - -type dataDBMockError struct { - *engine.DataDBMock -} - -func (dbM *dataDBMockError) SetIndexesDrv(idxItmType, tntCtx string, - indexes map[string]utils.StringSet, commit bool, transactionID string) (err error) { - return -} - -func (dbM *dataDBMockError) HasDataDrv(string, string, string) (bool, error) { - return false, nil -} - -//For StatQueue -func (dbM *dataDBMockError) GetStatQueueProfileDrv(tenant string, ID string) (sq *engine.StatQueueProfile, err error) { - return nil, nil -} - -func (dbM *dataDBMockError) RemStatQueueProfileDrv(tenant, id 
string) (err error) { - return nil -} - -func (dbM *dataDBMockError) RemStatQueueDrv(tenant, id string) (err error) { - return utils.ErrNoDatabaseConn -} - -func (dbM *dataDBMockError) GetStatQueueDrv(tenant, id string) (sq *engine.StatQueue, err error) { - return nil, utils.ErrNoDatabaseConn -} - -func (dbM *dataDBMockError) SetStatQueueDrv(ssq *engine.StoredStatQueue, sq *engine.StatQueue) (err error) { - return utils.ErrNoDatabaseConn -} - -func (dbM *dataDBMockError) SetStatQueueProfileDrv(sq *engine.StatQueueProfile) (err error) { - return nil -} - -func (dbM *dataDBMockError) GetIndexesDrv(idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) { - return nil, nil -} diff --git a/loaders/loader.go b/loaders/loader.go index 8b18f8102..2c528f383 100644 --- a/loaders/loader.go +++ b/loaders/loader.go @@ -389,22 +389,6 @@ func (ldr *Loader) storeLoadedData(loaderType string, if err := ldr.dm.SetStatQueueProfile(stsPrf, true); err != nil { return err } - var sq *engine.StatQueue - if sq, err = engine.NewStatQueue(stsPrf.Tenant, stsPrf.ID, stsPrf.Metrics, - stsPrf.MinItems); err != nil { - return utils.APIErrorHandler(err) - } - var ttl *time.Duration - if stsPrf.TTL > 0 { - ttl = &stsPrf.TTL - } - - // for non stored we do not save the metrics - if err := ldr.dm.SetStatQueue(sq, stsPrf.Metrics, - stsPrf.MinItems, ttl, stsPrf.QueueLength, - !stsPrf.Stored); err != nil { - return err - } cacheArgs[utils.StatsQueueProfileIDs] = ids cacheArgs[utils.StatsQueueIDs] = ids } @@ -706,9 +690,6 @@ func (ldr *Loader) removeLoadedData(loaderType string, lds map[string][]LoaderDa tntIDStruct.ID, utils.NonTransactional, true); err != nil { return err } - if err := ldr.dm.RemoveStatQueue(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil { - return err - } cacheArgs[utils.StatsQueueProfileIDs] = ids cacheArgs[utils.StatsQueueIDs] = ids } diff --git a/loaders/loader_test.go b/loaders/loader_test.go index f31b89fc0..efb2aa013 100644 --- a/loaders/loader_test.go +++ b/loaders/loader_test.go @@ -769,24 +769,21 @@ func TestLoaderProcessStatsWrongMetrics(t *testing.T) { bufLoaderData: make(map[string][]LoaderData), dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil), timezone: "UTC", - } - ldr.dataTpls = map[string][]*config.FCTemplate{ - utils.MetaStats: { - {Tag: "MetricIDs", - Path: "MetricIDs", - Type: utils.MetaComposed, - Value: config.NewRSRParsersMustCompile("~*req.0", utils.InfieldSep)}, - {Tag: "Stored", - Path: "Stored", - Type: utils.MetaComposed, - Value: config.NewRSRParsersMustCompile("~*req.1", utils.InfieldSep)}, + dataTpls: map[string][]*config.FCTemplate{ + utils.MetaStats: { + {Tag: "MetricIDs", + Path: "MetricIDs", + Type: utils.MetaComposed, + Value: config.NewRSRParsersMustCompile("~*req.0", utils.InfieldSep)}, + {Tag: "Stored", + Path: "Stored", + Type: utils.MetaComposed, + Value: config.NewRSRParsersMustCompile("~*req.1", utils.InfieldSep)}, + }, }, } - statsCsv := ` -#Metrics[0],Stored[1] -not_a_valid_metric_type,true, -` - rdr := io.NopCloser(strings.NewReader(statsCsv)) + rdr := io.NopCloser(strings.NewReader(`#Metrics[0],Stored[1] +not_a_valid_metric_type,true,`)) csvRdr := csv.NewReader(rdr) csvRdr.Comment = '#' ldr.rdrs = map[string]map[string]*openedCSVFile{ @@ -794,7 +791,7 @@ not_a_valid_metric_type,true, utils.StatsCsv: &openedCSVFile{fileName: utils.StatsCsv, rdr: rdr, csvRdr: csvRdr}}, } - expected := "SERVER_ERROR: unsupported metric type " + expected := "unsupported metric type " if err := 
ldr.processContent(utils.MetaStats, utils.EmptyString); err == nil || err.Error() != expected { t.Errorf("Expected %+v, received %+v", expected, err) } @@ -803,11 +800,8 @@ not_a_valid_metric_type,true, } //initialize again but with a valid metric and false stored field - statsCsv = ` -#Metrics[0],Stored[1] -*sum#~*req.Value,false -` - rdr = io.NopCloser(strings.NewReader(statsCsv)) + rdr = io.NopCloser(strings.NewReader(`#Metrics[0],Stored[1] +*sum#~*req.Value,false`)) csvRdr = csv.NewReader(rdr) csvRdr.Comment = '#' ldr.rdrs = map[string]map[string]*openedCSVFile{ @@ -3054,26 +3048,23 @@ func TestRemoveStatQueueMockError(t *testing.T) { bufLoaderData: make(map[string][]LoaderData), dm: engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil), timezone: "UTC", - } - ldr.dataTpls = map[string][]*config.FCTemplate{ - utils.MetaStats: { - {Tag: "TenantID", - Path: "Tenant", - Type: utils.MetaComposed, - Value: config.NewRSRParsersMustCompile("~*req.0", utils.InfieldSep), - Mandatory: true}, - {Tag: "ProfileID", - Path: "ID", - Type: utils.MetaComposed, - Value: config.NewRSRParsersMustCompile("~*req.1", utils.InfieldSep), - Mandatory: true}, + dataTpls: map[string][]*config.FCTemplate{ + utils.MetaStats: { + {Tag: "TenantID", + Path: "Tenant", + Type: utils.MetaComposed, + Value: config.NewRSRParsersMustCompile("~*req.0", utils.InfieldSep), + Mandatory: true}, + {Tag: "ProfileID", + Path: "ID", + Type: utils.MetaComposed, + Value: config.NewRSRParsersMustCompile("~*req.1", utils.InfieldSep), + Mandatory: true}, + }, }, } - statsCsv := ` -#Tenant[0],ProfileID[1] -cgrates.org,REM_STATS_1 -` - rdr := io.NopCloser(strings.NewReader(statsCsv)) + rdr := io.NopCloser(strings.NewReader(`#Tenant[0],ProfileID[1] +cgrates.org,REM_STATS_1`)) csvRdr := csv.NewReader(rdr) csvRdr.Comment = '#' ldr.rdrs = map[string]map[string]*openedCSVFile{ @@ -3085,18 +3076,13 @@ cgrates.org,REM_STATS_1 }, }, } - expStats := &engine.StatQueueProfile{ - Tenant: "cgrates.org", - ID: "REM_STATS_1", - } - if err := ldr.dm.SetStatQueueProfile(expStats, true); err != nil { - t.Error(err) - } - - newData := &dataDBMockError{} - ldr.dm = engine.NewDataManager(newData, config.CgrConfig().CacheCfg(), nil) expected := utils.ErrNoDatabaseConn + ldr.dm = engine.NewDataManager(&engine.DataDBMock{ + GetStatQueueProfileDrvF: func(tenant, id string) (sq *engine.StatQueueProfile, err error) { return nil, nil }, + SetStatQueueProfileDrvF: func(sq *engine.StatQueueProfile) (err error) { return expected }, + RemStatQueueProfileDrvF: func(tenant, id string) (err error) { return expected }, + }, config.CgrConfig().CacheCfg(), nil) if err := ldr.removeContent(utils.MetaStats, utils.EmptyString); err == nil || err != expected { t.Errorf("Expected %+v, received %+v", expected, err) diff --git a/migrator/stats.go b/migrator/stats.go index 032a4131c..00919e6dc 100644 --- a/migrator/stats.go +++ b/migrator/stats.go @@ -58,7 +58,7 @@ type v1Stat struct { Triggers engine.ActionTriggers } -func (m *Migrator) moveStatQueueProfile() (err error) { +func (m *Migrator) migrateCurrentStats() (err error) { //StatQueueProfile var ids []string if ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.StatQueueProfilePrefix); err != nil { @@ -69,52 +69,32 @@ func (m *Migrator) moveStatQueueProfile() (err error) { if len(tntID) < 2 { return fmt.Errorf("Invalid key <%s> when migrating stat queue profiles", id) } - sgs, err := m.dmIN.DataManager().GetStatQueueProfile(tntID[0], tntID[1], false, false, utils.NonTransactional) + sqp, err := 
m.dmIN.DataManager().GetStatQueueProfile(tntID[0], tntID[1], false, false, utils.NonTransactional) if err != nil { return err } - if sgs == nil || m.dryRun { - continue - } - if err := m.dmOut.DataManager().SetStatQueueProfile(sgs, true); err != nil { - return err - } - if err := m.dmIN.DataManager().RemoveStatQueueProfile(tntID[0], tntID[1], utils.NonTransactional, false); err != nil { - return err - } - } - return -} - -func (m *Migrator) migrateCurrentStats() (err error) { - var ids []string - //StatQueue - if ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.StatQueuePrefix); err != nil { - return err - } - for _, id := range ids { - tntID := strings.SplitN(strings.TrimPrefix(id, utils.StatQueuePrefix), utils.InInFieldSep, 2) - if len(tntID) < 2 { - return fmt.Errorf("Invalid key <%s> when migrating stat queues", id) - } sgs, err := m.dmIN.DataManager().GetStatQueue(tntID[0], tntID[1], false, false, utils.NonTransactional) if err != nil { return err } - if sgs == nil || m.dryRun { + if sqp == nil || m.dryRun { continue } - if err := m.dmOut.DataManager().SetStatQueue(sgs, nil, 0, nil, 0, true); err != nil { + if err := m.dmOut.DataManager().SetStatQueueProfile(sqp, true); err != nil { return err } - if err := m.dmIN.DataManager().RemoveStatQueue(tntID[0], tntID[1], utils.NonTransactional); err != nil { + if sgs != nil { + if err := m.dmOut.DataManager().SetStatQueue(sgs); err != nil { + return err + } + } + if err := m.dmIN.DataManager().RemoveStatQueueProfile(tntID[0], tntID[1], utils.NonTransactional, false); err != nil { return err } m.stats[utils.StatS]++ } - - return m.moveStatQueueProfile() + return } func (m *Migrator) migrateV1Stats() (filter *engine.Filter, v2Stats *engine.StatQueue, sts *engine.StatQueueProfile, err error) { @@ -232,8 +212,10 @@ func (m *Migrator) migrateStats() (err error) { if err = m.dmOut.DataManager().SetStatQueueProfile(v4sts, true); err != nil { return } - if err = m.dmOut.DataManager().SetStatQueue(v3Stats, nil, 0, nil, 0, true); err != nil { - return + if v3Stats != nil { + if err = m.dmOut.DataManager().SetStatQueue(v3Stats); err != nil { + return + } } } m.stats[utils.StatS]++ diff --git a/migrator/stats_it_test.go b/migrator/stats_it_test.go index 5c82d40ca..e7732f18b 100644 --- a/migrator/stats_it_test.go +++ b/migrator/stats_it_test.go @@ -294,7 +294,7 @@ func testStsITMigrateAndMove(t *testing.T) { if err := stsMigrator.dmIN.DataManager().SetStatQueueProfile(sqp, false); err != nil { t.Error("Error when setting Stats ", err.Error()) } - if err := stsMigrator.dmIN.DataManager().SetStatQueue(sq, nil, 0, nil, 0, true); err != nil { + if err := stsMigrator.dmIN.DataManager().SetStatQueue(sq); err != nil { t.Error("Error when setting Stats ", err.Error()) } if err := stsMigrator.dmOut.DataManager().SetFilter(filter, true); err != nil { diff --git a/migrator/thresholds.go b/migrator/thresholds.go index 2abbe0b34..4ccac40af 100644 --- a/migrator/thresholds.go +++ b/migrator/thresholds.go @@ -80,11 +80,10 @@ func (m *Migrator) migrateCurrentThresholds() (err error) { return err } // update the threshold in the new DB - if ths == nil { - continue - } - if err := m.dmOut.DataManager().SetThreshold(ths); err != nil { - return err + if ths != nil { + if err := m.dmOut.DataManager().SetThreshold(ths); err != nil { + return err + } } if err := m.dmIN.DataManager().RemoveThresholdProfile(tntID[0], tntID[1], utils.NonTransactional, false); err != nil { return err diff --git a/utils/slice.go b/utils/slice.go index 2260ac3a3..808213068 100644 --- 
a/utils/slice.go
+++ b/utils/slice.go
@@ -76,3 +76,15 @@ func CloneStringSlice(in []string) (cl []string) {
 	copy(cl, in)
 	return
 }
+
+func SliceStringEqual(v1, v2 []string) bool {
+	if len(v1) != len(v2) {
+		return false
+	}
+	for i := range v1 {
+		if v1[i] != v2[i] {
+			return false
+		}
+	}
+	return true
+}
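
Note (not part of the patch): the sketch below is a minimal, self-contained Go illustration of the metric-reconciliation path that the reworked SetStatQueueProfile now takes when only a profile's Metrics definition changes — recreate metrics whose filter lists differ, add missing ones, drop metrics no longer referenced, comparing filters with the new utils.SliceStringEqual. The types and the reconcileMetrics helper are simplified stand-ins, not the real engine.StatQueueProfile / engine.StatQueue API.

// Simplified stand-ins for engine.StatQueueProfile / engine.StatQueue; only the
// fields needed for the metric-reconciliation example are modeled.
package main

import "fmt"

type metricWithFilters struct {
	MetricID  string
	FilterIDs []string
}

type statQueueProfile struct {
	Metrics []*metricWithFilters
}

type statMetric struct {
	FilterIDs []string
}

type statQueue struct {
	SQMetrics map[string]*statMetric
}

// sliceStringEqual mirrors the utils.SliceStringEqual helper added in this diff.
func sliceStringEqual(v1, v2 []string) bool {
	if len(v1) != len(v2) {
		return false
	}
	for i := range v1 {
		if v1[i] != v2[i] {
			return false
		}
	}
	return true
}

// reconcileMetrics keeps the queue's metrics in sync with the profile:
// metrics whose filters changed (or that are new) are recreated, and metrics
// no longer referenced by the profile are dropped.
func reconcileMetrics(sqp *statQueueProfile, sq *statQueue) {
	wanted := make(map[string]struct{}, len(sqp.Metrics))
	for _, m := range sqp.Metrics {
		wanted[m.MetricID] = struct{}{}
		if old, has := sq.SQMetrics[m.MetricID]; !has ||
			!sliceStringEqual(old.FilterIDs, m.FilterIDs) {
			sq.SQMetrics[m.MetricID] = &statMetric{FilterIDs: m.FilterIDs}
		}
	}
	for id := range sq.SQMetrics {
		if _, keep := wanted[id]; !keep {
			delete(sq.SQMetrics, id)
		}
	}
}

func main() {
	sq := &statQueue{SQMetrics: map[string]*statMetric{
		"*acd": {},                                                  // unchanged: kept
		"*tcd": {FilterIDs: []string{"*string:~*req.Account:1001"}}, // filters differ: recreated
		"*asr": {},                                                  // missing from profile: removed
	}}
	sqp := &statQueueProfile{Metrics: []*metricWithFilters{
		{MetricID: "*acd"},
		{MetricID: "*tcd"},
		{MetricID: "*sum#~*req.Usage"}, // new metric: added
	}}
	reconcileMetrics(sqp, sq)
	for id, m := range sq.SQMetrics {
		fmt.Println(id, m.FilterIDs)
	}
}

When the profile's QueueLength, TTL, MinItems or Stored flag changes instead, the patched SetStatQueueProfile rebuilds the whole queue via NewStatQueue rather than patching metrics in place.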