From 959ffdcc75944524344a8d779f6f70a91b0966f6 Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 13 Sep 2017 18:54:05 +0200 Subject: [PATCH] TPReader creating StatQueue based on StatQueueProfile, adding necessary methods in dataDB, tests --- apier/v1/apier_it_test.go | 2 +- apier/v1/smgenericv1_it_test.go | 2 +- engine/loader_csv_test.go | 8 ++--- engine/loader_it_test.go | 2 +- engine/storage_map.go | 4 ++- engine/storage_mongo_datadb.go | 45 ++++++++++++++------------ engine/storage_redis.go | 4 ++- engine/tp_reader.go | 42 +++++++++++++++++------- general_tests/tut_smgeneric_it_test.go | 2 +- general_tests/tutorial_it_test.go | 2 +- utils/consts.go | 1 - 11 files changed, 71 insertions(+), 43 deletions(-) diff --git a/apier/v1/apier_it_test.go b/apier/v1/apier_it_test.go index c6a1312e8..a492833af 100644 --- a/apier/v1/apier_it_test.go +++ b/apier/v1/apier_it_test.go @@ -1273,7 +1273,7 @@ func TestApierResetDataAfterLoadFromFolder(t *testing.T) { rcvStats.Aliases != 1 || rcvStats.ReverseAliases != 2 || rcvStats.ResourceProfiles != 3 || - rcvStats.Resources != 0 { + rcvStats.Resources != 3 { t.Errorf("Expecting: %+v, received: %+v", expStats, rcvStats) } } diff --git a/apier/v1/smgenericv1_it_test.go b/apier/v1/smgenericv1_it_test.go index ba5a9d5af..3af5ae6e1 100644 --- a/apier/v1/smgenericv1_it_test.go +++ b/apier/v1/smgenericv1_it_test.go @@ -102,7 +102,7 @@ func TestSMGV1CacheStats(t *testing.T) { var rcvStats *utils.CacheStats expectedStats := &utils.CacheStats{Destinations: 5, ReverseDestinations: 7, RatingPlans: 4, RatingProfiles: 9, Actions: 8, ActionPlans: 4, AccountActionPlans: 5, SharedGroups: 1, DerivedChargers: 1, - LcrProfiles: 5, CdrStats: 6, Users: 3, Aliases: 1, ReverseAliases: 2, ResourceProfiles: 3, Resources: 0} + LcrProfiles: 5, CdrStats: 6, Users: 3, Aliases: 1, ReverseAliases: 2, ResourceProfiles: 3, Resources: 3} var args utils.AttrCacheStats if err := smgV1Rpc.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil { t.Error("Got error on ApierV1.GetCacheStats: ", err.Error()) diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index 6a79b3b9e..a6a25116f 100755 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -1453,10 +1453,10 @@ func TestLoadStats(t *testing.T) { }, } - if len(csvr.stats) != len(eStats) { - t.Error("Failed to load stats: ", len(csvr.stats)) - } else if !reflect.DeepEqual(eStats["Stats1"], csvr.stats["Stats1"]) { - t.Errorf("Expecting: %+v, received: %+v", eStats["Stats1"], csvr.stats["Stats1"]) + if len(csvr.sqProfiles) != len(eStats) { + t.Error("Failed to load stats: ", len(csvr.sqProfiles)) + } else if !reflect.DeepEqual(eStats["Stats1"], csvr.sqProfiles["Stats1"]) { + t.Errorf("Expecting: %+v, received: %+v", eStats["Stats1"], csvr.sqProfiles["Stats1"]) } } diff --git a/engine/loader_it_test.go b/engine/loader_it_test.go index 168541ef5..476480f2c 100755 --- a/engine/loader_it_test.go +++ b/engine/loader_it_test.go @@ -323,7 +323,7 @@ func TestLoaderITWriteToDatabase(t *testing.T) { } } - for k, st := range loader.stats { + for k, st := range loader.sqProfiles { rcv, err := loader.dataStorage.GetStatQueueProfile(k) if err != nil { t.Error("Failed GetStatsQueue: ", err.Error()) diff --git a/engine/storage_map.go b/engine/storage_map.go index 22a790f87..bc7d7780b 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -306,7 +306,9 @@ func (ms *MapStorage) HasData(categ, subject string) (bool, error) { ms.mu.RLock() defer ms.mu.RUnlock() switch categ { - case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX, utils.ResourcesPrefix: + case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, + utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX, + utils.ResourcesPrefix, utils.StatQueuePrefix: _, exists := ms.dict[categ+subject] return exists, nil } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index c375fc31f..66f6fb984 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -21,7 +21,6 @@ package engine import ( "bytes" "compress/zlib" - "errors" "fmt" "io/ioutil" "strings" @@ -327,7 +326,7 @@ func (ms *MongoStorage) getColNameForPrefix(prefix string) (name string, ok bool utils.LOADINST_KEY: colLht, utils.VERSION_PREFIX: colVer, utils.ResourceProfilesPrefix: colRsP, - utils.StatsPrefix: colStq, + utils.StatQueuePrefix: colStq, utils.TimingsPrefix: colTmg, utils.ResourcesPrefix: colRes, } @@ -632,10 +631,10 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err er for iter.Next(&idResult) { result = append(result, utils.ResourcesPrefix+idResult.Id) } - case utils.StatsPrefix: + case utils.StatQueuePrefix: iter := db.C(colStq).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter() for iter.Next(&idResult) { - result = append(result, utils.StatsPrefix+idResult.Id) + result = append(result, utils.StatQueuePrefix+idResult.Id) } case utils.StatQueueProfilePrefix: iter := db.C(colSqp).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter() @@ -658,34 +657,40 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err er return } -func (ms *MongoStorage) HasData(category, subject string) (bool, error) { +func (ms *MongoStorage) HasData(category, subject string) (has bool, err error) { session := ms.session.Copy() defer session.Close() db := session.DB(ms.db) + var count int switch category { case utils.DESTINATION_PREFIX: - count, err := db.C(colDst).Find(bson.M{"key": subject}).Count() - return count > 0, err + count, err = db.C(colDst).Find(bson.M{"key": subject}).Count() + has = count > 0 case utils.RATING_PLAN_PREFIX: - count, err := db.C(colRpl).Find(bson.M{"key": subject}).Count() - return count > 0, err + count, err = db.C(colRpl).Find(bson.M{"key": subject}).Count() + has = count > 0 case utils.RATING_PROFILE_PREFIX: - count, err := db.C(colRpf).Find(bson.M{"id": subject}).Count() - return count > 0, err + count, err = db.C(colRpf).Find(bson.M{"id": subject}).Count() + has = count > 0 case utils.ACTION_PREFIX: - count, err := db.C(colAct).Find(bson.M{"key": subject}).Count() - return count > 0, err + count, err = db.C(colAct).Find(bson.M{"key": subject}).Count() + has = count > 0 case utils.ACTION_PLAN_PREFIX: - count, err := db.C(colApl).Find(bson.M{"key": subject}).Count() - return count > 0, err + count, err = db.C(colApl).Find(bson.M{"key": subject}).Count() + has = count > 0 case utils.ACCOUNT_PREFIX: - count, err := db.C(colAcc).Find(bson.M{"id": subject}).Count() - return count > 0, err + count, err = db.C(colAcc).Find(bson.M{"id": subject}).Count() + has = count > 0 case utils.ResourcesPrefix: - count, err := db.C(colRes).Find(bson.M{"id": subject}).Count() - return count > 0, err + count, err = db.C(colRes).Find(bson.M{"id": subject}).Count() + has = count > 0 + case utils.StatQueuePrefix: + count, err = db.C(colRes).Find(bson.M{"id": subject}).Count() + has = count > 0 + default: + err = fmt.Errorf("unsupported category in HasData: %s", category) } - return false, errors.New("unsupported category in HasData") + return } func (ms *MongoStorage) GetRatingPlan(key string, skipCache bool, transactionID string) (rp *RatingPlan, err error) { diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 2651604f9..db1ae044d 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -319,7 +319,9 @@ func (rs *RedisStorage) GetKeysForPrefix(prefix string) ([]string, error) { // Used to check if specific subject is stored using prefix key attached to entity func (rs *RedisStorage) HasData(category, subject string) (bool, error) { switch category { - case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX, utils.ResourcesPrefix: + case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, + utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX, + utils.ResourcesPrefix, utils.StatQueuePrefix: i, err := rs.Cmd("EXISTS", category+subject).Int() return i == 1, err } diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 565d6cebf..d1959c0de 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -54,9 +54,10 @@ type TpReader struct { users map[string]*UserProfile aliases map[string]*Alias resProfiles map[string]*utils.TPResource - stats map[string]*utils.TPStats + sqProfiles map[string]*utils.TPStats thresholds map[string]*utils.TPThreshold - resources []string // IDs of resources which need creation based on resourceConfigs + resources []string // IDs of resources which need creation based on resourceProfiles + statQueues []string // IDs of statQueues which need creation based on statQueueProfiles revDests, revAliases, @@ -129,7 +130,7 @@ func (tpr *TpReader) Init() { tpr.aliases = make(map[string]*Alias) tpr.derivedChargers = make(map[string]*utils.DerivedChargers) tpr.resProfiles = make(map[string]*utils.TPResource) - tpr.stats = make(map[string]*utils.TPStats) + tpr.sqProfiles = make(map[string]*utils.TPStats) tpr.thresholds = make(map[string]*utils.TPThreshold) tpr.revDests = make(map[string][]string) tpr.revAliases = make(map[string][]string) @@ -1626,7 +1627,14 @@ func (tpr *TpReader) LoadStatsFiltered(tag string) error { for _, st := range tps { mapSTs[st.ID] = st } - tpr.stats = mapSTs + tpr.sqProfiles = mapSTs + for sqID := range mapSTs { + if has, err := tpr.dataStorage.HasData(utils.StatQueuePrefix, sqID); err != nil { + return err + } else if !has { + tpr.statQueues = append(tpr.statQueues, sqID) + } + } return nil } @@ -1970,7 +1978,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err if verbose { log.Print("StatQueueProfiles:") } - for _, tpST := range tpr.stats { + for _, tpST := range tpr.sqProfiles { st, err := APItoStats(tpST, tpr.timezone) if err != nil { return err @@ -1982,6 +1990,18 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("\t", st.ID) } } + if verbose { + log.Print("StatQueues:") + } + for _, sqID := range tpr.statQueues { + if err = tpr.dataStorage.SetStoredStatQueue(&StoredStatQueue{Tenant: "", ID: sqID, + SQMetrics: make(map[string][]byte)}); err != nil { + return + } + if verbose { + log.Print("\t", sqID) + } + } if verbose { log.Print("Thresholds:") } @@ -2055,7 +2075,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err return err } } - if len(tpr.stats) > 0 { + if len(tpr.sqProfiles) > 0 { if verbose { log.Print("Indexing stats") } @@ -2063,7 +2083,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err if err != nil { return err } - for _, tpST := range tpr.stats { + for _, tpST := range tpr.sqProfiles { if st, err := APItoStats(tpST, tpr.timezone); err != nil { return err } else { @@ -2163,7 +2183,7 @@ func (tpr *TpReader) ShowStatistics() { // resource limits log.Print("ResourceProfiles: ", len(tpr.resProfiles)) // stats - log.Print("Stats: ", len(tpr.stats)) + log.Print("Stats: ", len(tpr.sqProfiles)) } // Returns the identities loaded for a specific category, useful for cache reloads @@ -2297,10 +2317,10 @@ func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) { i++ } return keys, nil - case utils.StatsPrefix: - keys := make([]string, len(tpr.stats)) + case utils.StatQueueProfilePrefix: + keys := make([]string, len(tpr.sqProfiles)) i := 0 - for k := range tpr.stats { + for k := range tpr.sqProfiles { keys[i] = k i++ } diff --git a/general_tests/tut_smgeneric_it_test.go b/general_tests/tut_smgeneric_it_test.go index 133c92035..d0af8c9e7 100644 --- a/general_tests/tut_smgeneric_it_test.go +++ b/general_tests/tut_smgeneric_it_test.go @@ -100,7 +100,7 @@ func TestTutSMGCacheStats(t *testing.T) { var rcvStats *utils.CacheStats expectedStats := &utils.CacheStats{Destinations: 5, ReverseDestinations: 7, RatingPlans: 4, RatingProfiles: 9, Actions: 8, ActionPlans: 4, AccountActionPlans: 5, SharedGroups: 1, DerivedChargers: 1, LcrProfiles: 5, - CdrStats: 6, Users: 3, Aliases: 1, ReverseAliases: 2, ResourceProfiles: 3, Resources: 0} + CdrStats: 6, Users: 3, Aliases: 1, ReverseAliases: 2, ResourceProfiles: 3, Resources: 3} var args utils.AttrCacheStats if err := tutSMGRpc.Call("ApierV2.GetCacheStats", args, &rcvStats); err != nil { t.Error("Got error on ApierV2.GetCacheStats: ", err.Error()) diff --git a/general_tests/tutorial_it_test.go b/general_tests/tutorial_it_test.go index 1c21a5503..ebc98c4e4 100644 --- a/general_tests/tutorial_it_test.go +++ b/general_tests/tutorial_it_test.go @@ -104,7 +104,7 @@ func TestTutITCacheStats(t *testing.T) { var rcvStats *utils.CacheStats expectedStats := &utils.CacheStats{Destinations: 5, ReverseDestinations: 7, RatingPlans: 4, RatingProfiles: 9, Actions: 8, ActionPlans: 4, AccountActionPlans: 5, SharedGroups: 1, DerivedChargers: 1, LcrProfiles: 5, - CdrStats: 6, Users: 3, Aliases: 1, ReverseAliases: 2, ResourceProfiles: 3, Resources: 0} + CdrStats: 6, Users: 3, Aliases: 1, ReverseAliases: 2, ResourceProfiles: 3, Resources: 3} var args utils.AttrCacheStats if err := tutLocalRpc.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil { t.Error("Got error on ApierV1.GetCacheStats: ", err.Error()) diff --git a/utils/consts.go b/utils/consts.go index 69ca64ba9..70bd03e16 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -243,7 +243,6 @@ const ( ResourcesPrefix = "res_" ResourceProfilesIndex = "rsi_" ResourceProfilesPrefix = "rsp_" - StatsPrefix = "sts_" StatsIndex = "sti_" ThresholdsPrefix = "ths_" ThresholdsIndex = "thi_"