mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-24 16:48:45 +05:00
TPReader creating StatQueue based on StatQueueProfile, adding necessary methods in dataDB, tests
This commit is contained in:
@@ -1273,7 +1273,7 @@ func TestApierResetDataAfterLoadFromFolder(t *testing.T) {
|
||||
rcvStats.Aliases != 1 ||
|
||||
rcvStats.ReverseAliases != 2 ||
|
||||
rcvStats.ResourceProfiles != 3 ||
|
||||
rcvStats.Resources != 0 {
|
||||
rcvStats.Resources != 3 {
|
||||
t.Errorf("Expecting: %+v, received: %+v", expStats, rcvStats)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -102,7 +102,7 @@ func TestSMGV1CacheStats(t *testing.T) {
|
||||
var rcvStats *utils.CacheStats
|
||||
expectedStats := &utils.CacheStats{Destinations: 5, ReverseDestinations: 7, RatingPlans: 4, RatingProfiles: 9,
|
||||
Actions: 8, ActionPlans: 4, AccountActionPlans: 5, SharedGroups: 1, DerivedChargers: 1,
|
||||
LcrProfiles: 5, CdrStats: 6, Users: 3, Aliases: 1, ReverseAliases: 2, ResourceProfiles: 3, Resources: 0}
|
||||
LcrProfiles: 5, CdrStats: 6, Users: 3, Aliases: 1, ReverseAliases: 2, ResourceProfiles: 3, Resources: 3}
|
||||
var args utils.AttrCacheStats
|
||||
if err := smgV1Rpc.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil {
|
||||
t.Error("Got error on ApierV1.GetCacheStats: ", err.Error())
|
||||
|
||||
@@ -1453,10 +1453,10 @@ func TestLoadStats(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
if len(csvr.stats) != len(eStats) {
|
||||
t.Error("Failed to load stats: ", len(csvr.stats))
|
||||
} else if !reflect.DeepEqual(eStats["Stats1"], csvr.stats["Stats1"]) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", eStats["Stats1"], csvr.stats["Stats1"])
|
||||
if len(csvr.sqProfiles) != len(eStats) {
|
||||
t.Error("Failed to load stats: ", len(csvr.sqProfiles))
|
||||
} else if !reflect.DeepEqual(eStats["Stats1"], csvr.sqProfiles["Stats1"]) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", eStats["Stats1"], csvr.sqProfiles["Stats1"])
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -323,7 +323,7 @@ func TestLoaderITWriteToDatabase(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
for k, st := range loader.stats {
|
||||
for k, st := range loader.sqProfiles {
|
||||
rcv, err := loader.dataStorage.GetStatQueueProfile(k)
|
||||
if err != nil {
|
||||
t.Error("Failed GetStatsQueue: ", err.Error())
|
||||
|
||||
@@ -306,7 +306,9 @@ func (ms *MapStorage) HasData(categ, subject string) (bool, error) {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
switch categ {
|
||||
case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX, utils.ResourcesPrefix:
|
||||
case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX,
|
||||
utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX,
|
||||
utils.ResourcesPrefix, utils.StatQueuePrefix:
|
||||
_, exists := ms.dict[categ+subject]
|
||||
return exists, nil
|
||||
}
|
||||
|
||||
@@ -21,7 +21,6 @@ package engine
|
||||
import (
|
||||
"bytes"
|
||||
"compress/zlib"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"strings"
|
||||
@@ -327,7 +326,7 @@ func (ms *MongoStorage) getColNameForPrefix(prefix string) (name string, ok bool
|
||||
utils.LOADINST_KEY: colLht,
|
||||
utils.VERSION_PREFIX: colVer,
|
||||
utils.ResourceProfilesPrefix: colRsP,
|
||||
utils.StatsPrefix: colStq,
|
||||
utils.StatQueuePrefix: colStq,
|
||||
utils.TimingsPrefix: colTmg,
|
||||
utils.ResourcesPrefix: colRes,
|
||||
}
|
||||
@@ -632,10 +631,10 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err er
|
||||
for iter.Next(&idResult) {
|
||||
result = append(result, utils.ResourcesPrefix+idResult.Id)
|
||||
}
|
||||
case utils.StatsPrefix:
|
||||
case utils.StatQueuePrefix:
|
||||
iter := db.C(colStq).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter()
|
||||
for iter.Next(&idResult) {
|
||||
result = append(result, utils.StatsPrefix+idResult.Id)
|
||||
result = append(result, utils.StatQueuePrefix+idResult.Id)
|
||||
}
|
||||
case utils.StatQueueProfilePrefix:
|
||||
iter := db.C(colSqp).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter()
|
||||
@@ -658,34 +657,40 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err er
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) HasData(category, subject string) (bool, error) {
|
||||
func (ms *MongoStorage) HasData(category, subject string) (has bool, err error) {
|
||||
session := ms.session.Copy()
|
||||
defer session.Close()
|
||||
db := session.DB(ms.db)
|
||||
var count int
|
||||
switch category {
|
||||
case utils.DESTINATION_PREFIX:
|
||||
count, err := db.C(colDst).Find(bson.M{"key": subject}).Count()
|
||||
return count > 0, err
|
||||
count, err = db.C(colDst).Find(bson.M{"key": subject}).Count()
|
||||
has = count > 0
|
||||
case utils.RATING_PLAN_PREFIX:
|
||||
count, err := db.C(colRpl).Find(bson.M{"key": subject}).Count()
|
||||
return count > 0, err
|
||||
count, err = db.C(colRpl).Find(bson.M{"key": subject}).Count()
|
||||
has = count > 0
|
||||
case utils.RATING_PROFILE_PREFIX:
|
||||
count, err := db.C(colRpf).Find(bson.M{"id": subject}).Count()
|
||||
return count > 0, err
|
||||
count, err = db.C(colRpf).Find(bson.M{"id": subject}).Count()
|
||||
has = count > 0
|
||||
case utils.ACTION_PREFIX:
|
||||
count, err := db.C(colAct).Find(bson.M{"key": subject}).Count()
|
||||
return count > 0, err
|
||||
count, err = db.C(colAct).Find(bson.M{"key": subject}).Count()
|
||||
has = count > 0
|
||||
case utils.ACTION_PLAN_PREFIX:
|
||||
count, err := db.C(colApl).Find(bson.M{"key": subject}).Count()
|
||||
return count > 0, err
|
||||
count, err = db.C(colApl).Find(bson.M{"key": subject}).Count()
|
||||
has = count > 0
|
||||
case utils.ACCOUNT_PREFIX:
|
||||
count, err := db.C(colAcc).Find(bson.M{"id": subject}).Count()
|
||||
return count > 0, err
|
||||
count, err = db.C(colAcc).Find(bson.M{"id": subject}).Count()
|
||||
has = count > 0
|
||||
case utils.ResourcesPrefix:
|
||||
count, err := db.C(colRes).Find(bson.M{"id": subject}).Count()
|
||||
return count > 0, err
|
||||
count, err = db.C(colRes).Find(bson.M{"id": subject}).Count()
|
||||
has = count > 0
|
||||
case utils.StatQueuePrefix:
|
||||
count, err = db.C(colRes).Find(bson.M{"id": subject}).Count()
|
||||
has = count > 0
|
||||
default:
|
||||
err = fmt.Errorf("unsupported category in HasData: %s", category)
|
||||
}
|
||||
return false, errors.New("unsupported category in HasData")
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetRatingPlan(key string, skipCache bool, transactionID string) (rp *RatingPlan, err error) {
|
||||
|
||||
@@ -319,7 +319,9 @@ func (rs *RedisStorage) GetKeysForPrefix(prefix string) ([]string, error) {
|
||||
// Used to check if specific subject is stored using prefix key attached to entity
|
||||
func (rs *RedisStorage) HasData(category, subject string) (bool, error) {
|
||||
switch category {
|
||||
case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX, utils.ResourcesPrefix:
|
||||
case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX,
|
||||
utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX,
|
||||
utils.ResourcesPrefix, utils.StatQueuePrefix:
|
||||
i, err := rs.Cmd("EXISTS", category+subject).Int()
|
||||
return i == 1, err
|
||||
}
|
||||
|
||||
@@ -54,9 +54,10 @@ type TpReader struct {
|
||||
users map[string]*UserProfile
|
||||
aliases map[string]*Alias
|
||||
resProfiles map[string]*utils.TPResource
|
||||
stats map[string]*utils.TPStats
|
||||
sqProfiles map[string]*utils.TPStats
|
||||
thresholds map[string]*utils.TPThreshold
|
||||
resources []string // IDs of resources which need creation based on resourceConfigs
|
||||
resources []string // IDs of resources which need creation based on resourceProfiles
|
||||
statQueues []string // IDs of statQueues which need creation based on statQueueProfiles
|
||||
|
||||
revDests,
|
||||
revAliases,
|
||||
@@ -129,7 +130,7 @@ func (tpr *TpReader) Init() {
|
||||
tpr.aliases = make(map[string]*Alias)
|
||||
tpr.derivedChargers = make(map[string]*utils.DerivedChargers)
|
||||
tpr.resProfiles = make(map[string]*utils.TPResource)
|
||||
tpr.stats = make(map[string]*utils.TPStats)
|
||||
tpr.sqProfiles = make(map[string]*utils.TPStats)
|
||||
tpr.thresholds = make(map[string]*utils.TPThreshold)
|
||||
tpr.revDests = make(map[string][]string)
|
||||
tpr.revAliases = make(map[string][]string)
|
||||
@@ -1626,7 +1627,14 @@ func (tpr *TpReader) LoadStatsFiltered(tag string) error {
|
||||
for _, st := range tps {
|
||||
mapSTs[st.ID] = st
|
||||
}
|
||||
tpr.stats = mapSTs
|
||||
tpr.sqProfiles = mapSTs
|
||||
for sqID := range mapSTs {
|
||||
if has, err := tpr.dataStorage.HasData(utils.StatQueuePrefix, sqID); err != nil {
|
||||
return err
|
||||
} else if !has {
|
||||
tpr.statQueues = append(tpr.statQueues, sqID)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1970,7 +1978,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
|
||||
if verbose {
|
||||
log.Print("StatQueueProfiles:")
|
||||
}
|
||||
for _, tpST := range tpr.stats {
|
||||
for _, tpST := range tpr.sqProfiles {
|
||||
st, err := APItoStats(tpST, tpr.timezone)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -1982,6 +1990,18 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
|
||||
log.Print("\t", st.ID)
|
||||
}
|
||||
}
|
||||
if verbose {
|
||||
log.Print("StatQueues:")
|
||||
}
|
||||
for _, sqID := range tpr.statQueues {
|
||||
if err = tpr.dataStorage.SetStoredStatQueue(&StoredStatQueue{Tenant: "", ID: sqID,
|
||||
SQMetrics: make(map[string][]byte)}); err != nil {
|
||||
return
|
||||
}
|
||||
if verbose {
|
||||
log.Print("\t", sqID)
|
||||
}
|
||||
}
|
||||
if verbose {
|
||||
log.Print("Thresholds:")
|
||||
}
|
||||
@@ -2055,7 +2075,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
|
||||
return err
|
||||
}
|
||||
}
|
||||
if len(tpr.stats) > 0 {
|
||||
if len(tpr.sqProfiles) > 0 {
|
||||
if verbose {
|
||||
log.Print("Indexing stats")
|
||||
}
|
||||
@@ -2063,7 +2083,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, tpST := range tpr.stats {
|
||||
for _, tpST := range tpr.sqProfiles {
|
||||
if st, err := APItoStats(tpST, tpr.timezone); err != nil {
|
||||
return err
|
||||
} else {
|
||||
@@ -2163,7 +2183,7 @@ func (tpr *TpReader) ShowStatistics() {
|
||||
// resource limits
|
||||
log.Print("ResourceProfiles: ", len(tpr.resProfiles))
|
||||
// stats
|
||||
log.Print("Stats: ", len(tpr.stats))
|
||||
log.Print("Stats: ", len(tpr.sqProfiles))
|
||||
}
|
||||
|
||||
// Returns the identities loaded for a specific category, useful for cache reloads
|
||||
@@ -2297,10 +2317,10 @@ func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) {
|
||||
i++
|
||||
}
|
||||
return keys, nil
|
||||
case utils.StatsPrefix:
|
||||
keys := make([]string, len(tpr.stats))
|
||||
case utils.StatQueueProfilePrefix:
|
||||
keys := make([]string, len(tpr.sqProfiles))
|
||||
i := 0
|
||||
for k := range tpr.stats {
|
||||
for k := range tpr.sqProfiles {
|
||||
keys[i] = k
|
||||
i++
|
||||
}
|
||||
|
||||
@@ -100,7 +100,7 @@ func TestTutSMGCacheStats(t *testing.T) {
|
||||
var rcvStats *utils.CacheStats
|
||||
expectedStats := &utils.CacheStats{Destinations: 5, ReverseDestinations: 7, RatingPlans: 4, RatingProfiles: 9,
|
||||
Actions: 8, ActionPlans: 4, AccountActionPlans: 5, SharedGroups: 1, DerivedChargers: 1, LcrProfiles: 5,
|
||||
CdrStats: 6, Users: 3, Aliases: 1, ReverseAliases: 2, ResourceProfiles: 3, Resources: 0}
|
||||
CdrStats: 6, Users: 3, Aliases: 1, ReverseAliases: 2, ResourceProfiles: 3, Resources: 3}
|
||||
var args utils.AttrCacheStats
|
||||
if err := tutSMGRpc.Call("ApierV2.GetCacheStats", args, &rcvStats); err != nil {
|
||||
t.Error("Got error on ApierV2.GetCacheStats: ", err.Error())
|
||||
|
||||
@@ -104,7 +104,7 @@ func TestTutITCacheStats(t *testing.T) {
|
||||
var rcvStats *utils.CacheStats
|
||||
expectedStats := &utils.CacheStats{Destinations: 5, ReverseDestinations: 7, RatingPlans: 4, RatingProfiles: 9,
|
||||
Actions: 8, ActionPlans: 4, AccountActionPlans: 5, SharedGroups: 1, DerivedChargers: 1, LcrProfiles: 5,
|
||||
CdrStats: 6, Users: 3, Aliases: 1, ReverseAliases: 2, ResourceProfiles: 3, Resources: 0}
|
||||
CdrStats: 6, Users: 3, Aliases: 1, ReverseAliases: 2, ResourceProfiles: 3, Resources: 3}
|
||||
var args utils.AttrCacheStats
|
||||
if err := tutLocalRpc.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil {
|
||||
t.Error("Got error on ApierV1.GetCacheStats: ", err.Error())
|
||||
|
||||
@@ -243,7 +243,6 @@ const (
|
||||
ResourcesPrefix = "res_"
|
||||
ResourceProfilesIndex = "rsi_"
|
||||
ResourceProfilesPrefix = "rsp_"
|
||||
StatsPrefix = "sts_"
|
||||
StatsIndex = "sti_"
|
||||
ThresholdsPrefix = "ths_"
|
||||
ThresholdsIndex = "thi_"
|
||||
|
||||
Reference in New Issue
Block a user