From d33f2af42cb3f7a74d96d192e9435ccc285affed Mon Sep 17 00:00:00 2001 From: Edwardro22 Date: Mon, 17 Jul 2017 22:25:21 +0300 Subject: [PATCH 1/2] Added methods and tests --- engine/onstor_it_test.go | 58 ++++++++++++++++++++++++++++++++++ engine/storage_mongo_datadb.go | 58 ++++++++++++++++++++++++++++++++-- engine/storage_redis.go | 57 ++++++++++++++++++++++++++++++--- 3 files changed, 166 insertions(+), 7 deletions(-) diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index abf4e1e4c..a78466db8 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -87,6 +87,7 @@ var sTestsOnStorIT = []func(t *testing.T){ testOnStorITCRUDTiming, testOnStorITCRUDHistory, testOnStorITCRUDStructVersion, + testOnStorITCRUDSQStoredMetrics, } func TestOnStorITRedisConnect(t *testing.T) { @@ -1812,3 +1813,60 @@ func testOnStorITCRUDStructVersion(t *testing.T) { t.Errorf("Expecting: %v, received: %v", cv, rcv) } } + +func testOnStorITCRUDSQStoredMetrics(t *testing.T) { + sqm := &SQStoredMetrics{ + SqID: "test", + SEvents: map[string]StatsEvent{}, + SQItems: []*SQItem{}, + SQMetrics: map[string][]byte{}, + } + if _, rcvErr := onStor.GetSQStoredMetrics(sqm.SqID); rcvErr != utils.ErrNotFound { + t.Error(rcvErr) + } + if err := onStor.SetSQStoredMetrics(sqm); err != nil { + t.Error(err) + } + if rcv, err := onStor.GetSQStoredMetrics(sqm.SqID); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(sqm, rcv) { + t.Errorf("Expecting: %v, received: %v", sqm, rcv) + } + if err := onStor.RemSQStoredMetrics(sqm.SqID); err != nil { + t.Error(err) + } + if _, rcvErr := onStor.GetSQStoredMetrics(sqm.SqID); rcvErr != utils.ErrNotFound { + t.Error(rcvErr) + } +} + +func testOnStorITCRUDStatsQueue(t *testing.T) { + timeTTL := time.Duration(0 * time.Second) + sq := &StatsQueue{ + ID: "test", + ActivationInterval: &utils.ActivationInterval{ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}, + Filters: []*RequestFilter{}, + QueueLength: 2, + TTL: &timeTTL, + Metrics: []string{}, + Store: true, + Thresholds: []string{}, + } + if _, rcvErr := onStor.GetStatsQueue(sq.ID, false, utils.NonTransactional); rcvErr != utils.ErrNotFound { + t.Error(rcvErr) + } + if err := onStor.SetStatsQueue(sq); err != nil { + t.Error(err) + } + if rcv, err := onStor.GetStatsQueue(sq.ID, false, utils.NonTransactional); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(sq, rcv) { + t.Errorf("Expecting: %v, received: %v", sq, rcv) + } + if err := onStor.RemStatsQueue(sq.ID, utils.NonTransactional); err != nil { + t.Error(err) + } + if _, rcvErr := onStor.GetStatsQueue(sq.ID, false, utils.NonTransactional); rcvErr != utils.ErrNotFound { + t.Error(rcvErr) + } +} diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 882637ef5..f7b58549b 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -2011,30 +2011,82 @@ func (ms *MongoStorage) MatchReqFilterIndex(dbKey, fieldValKey string) (itemIDs // GetStatsQueue retrieves a StatsQueue from dataDB/cache func (ms *MongoStorage) GetStatsQueue(sqID string, skipCache bool, transactionID string) (sq *StatsQueue, err error) { + cacheKey := utils.StatsQueuePrefix + sqID + if !skipCache { + if x, ok := cache.Get(cacheKey); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*StatsQueue), nil + } + } + session, col := ms.conn(utils.StatsQueuePrefix) + defer session.Close() + cCommit := cacheCommit(transactionID) + if err = col.Find(bson.M{"id": sqID}).One(&sq); err != nil { + if err == mgo.ErrNotFound { + cache.Set(cacheKey, nil, cCommit, transactionID) + err = utils.ErrNotFound + } + return nil, err + } + cache.Set(cacheKey, sq, cCommit, transactionID) return } // SetStatsQueue stores a StatsQueue into DataDB func (ms *MongoStorage) SetStatsQueue(sq *StatsQueue) (err error) { + session, col := ms.conn(utils.StatsQueuePrefix) + defer session.Close() + _, err = col.UpsertId(bson.M{"id": sq.ID}, sq) return } // RemStatsQueue removes a StatsQueue from dataDB/cache func (ms *MongoStorage) RemStatsQueue(sqID string, transactionID string) (err error) { + session, col := ms.conn(utils.SQStoredMetricsPrefix) + key := utils.StatsQueuePrefix + sqID + _, err = ms.GetStatsQueue(sqID, false, transactionID) + if err != nil { + if err == mgo.ErrNotFound { + err = nil + } + return + } + err = col.Remove(bson.M{"id": sqID}) + if err != nil { + return err + } + cache.RemKey(key, cacheCommit(transactionID), transactionID) + session.Close() return } // GetSQStoredMetrics retrieves the stored metrics for a StatsQueue -func (ms *MongoStorage) GetSQStoredMetrics(sqID string) (sqSM *SQStoredMetrics, err error) { +func (ms *MongoStorage) GetSQStoredMetrics(sqmID string) (sqSM *SQStoredMetrics, err error) { + session, col := ms.conn(utils.SQStoredMetricsPrefix) + defer session.Close() + if err = col.Find(bson.M{"sqid": sqmID}).One(&sqSM); err != nil { + if err == mgo.ErrNotFound { + err = utils.ErrNotFound + } + return nil, err + } return } // SetStoredSQ stores the metrics for a StatsQueue func (ms *MongoStorage) SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error) { + session, col := ms.conn(utils.SQStoredMetricsPrefix) + defer session.Close() + _, err = col.UpsertId(bson.M{"sqid": sqSM.SqID}, sqSM) return } // RemSQStoredMetrics removes stored metrics for a StatsQueue -func (ms *MongoStorage) RemSQStoredMetrics(sqID string) (err error) { - return +func (ms *MongoStorage) RemSQStoredMetrics(sqmID string) (err error) { + session, col := ms.conn(utils.SQStoredMetricsPrefix) + defer session.Close() + err = col.Remove(bson.M{"sqid": sqmID}) + return err } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 6893a0140..f89ae3f7f 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1569,30 +1569,79 @@ func (rs *RedisStorage) RemoveVersions(vrs Versions) (err error) { // GetStatsQueue retrieves a StatsQueue from dataDB/cache func (rs *RedisStorage) GetStatsQueue(sqID string, skipCache bool, transactionID string) (sq *StatsQueue, err error) { + key := utils.StatsQueuePrefix + sqID + if !skipCache { + if x, ok := cache.Get(key); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*StatsQueue), nil + } + } + var values []byte + if values, err = rs.Cmd("GET", key).Bytes(); err != nil { + if err == redis.ErrRespNil { + cache.Set(key, nil, cacheCommit(transactionID), transactionID) + err = utils.ErrNotFound + } + return + } + if err = rs.ms.Unmarshal(values, &sq); err != nil { + return + } + cache.Set(key, &sq, cacheCommit(transactionID), transactionID) return } // SetStatsQueue stores a StatsQueue into DataDB func (rs *RedisStorage) SetStatsQueue(sq *StatsQueue) (err error) { - return + var result []byte + result, err = rs.ms.Marshal(sq) + if err != nil { + return + } + return rs.Cmd("SET", utils.StatsQueuePrefix+sq.ID, result).Err } // RemStatsQueue removes a StatsQueue from dataDB/cache func (rs *RedisStorage) RemStatsQueue(sqID string, transactionID string) (err error) { + key := utils.StatsQueuePrefix + sqID + err = rs.Cmd("DEL", key).Err + cache.RemKey(key, cacheCommit(transactionID), transactionID) return } // GetSQStoredMetrics retrieves the stored metrics for a StatsQueue -func (rs *RedisStorage) GetSQStoredMetrics(sqID string) (sqSM *SQStoredMetrics, err error) { +func (rs *RedisStorage) GetSQStoredMetrics(sqmID string) (sqSM *SQStoredMetrics, err error) { + key := utils.SQStoredMetricsPrefix + sqmID + var values []byte + if values, err = rs.Cmd("GET", key).Bytes(); err != nil { + if err == redis.ErrRespNil { + err = utils.ErrNotFound + } + return + } + if err = rs.ms.Unmarshal(values, &sqSM); err != nil { + return + } return } // SetStoredSQ stores the metrics for a StatsQueue func (rs *RedisStorage) SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error) { - return + var result []byte + result, err = rs.ms.Marshal(sqSM) + if err != nil { + return + } + return rs.Cmd("SET", utils.SQStoredMetricsPrefix+sqSM.SqID, result).Err } // RemSQStoredMetrics removes stored metrics for a StatsQueue -func (rs *RedisStorage) RemSQStoredMetrics(sqID string) (err error) { +func (rs *RedisStorage) RemSQStoredMetrics(sqmID string) (err error) { + key := utils.SQStoredMetricsPrefix + sqmID + if err = rs.Cmd("DEL", key).Err; err != nil { + return + } return } From d9fb74778a55befbbbdc1a33876881a6d6c58bec Mon Sep 17 00:00:00 2001 From: Edwardro22 Date: Mon, 17 Jul 2017 22:55:48 +0300 Subject: [PATCH 2/2] small fix --- engine/onstor_it_test.go | 13 +++++++++++-- engine/storage_mongo_datadb.go | 13 ++++--------- engine/storage_redis.go | 2 +- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index a78466db8..82f9c05e3 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -88,6 +88,7 @@ var sTestsOnStorIT = []func(t *testing.T){ testOnStorITCRUDHistory, testOnStorITCRUDStructVersion, testOnStorITCRUDSQStoredMetrics, + testOnStorITCRUDStatsQueue, } func TestOnStorITRedisConnect(t *testing.T) { @@ -1844,7 +1845,7 @@ func testOnStorITCRUDStatsQueue(t *testing.T) { timeTTL := time.Duration(0 * time.Second) sq := &StatsQueue{ ID: "test", - ActivationInterval: &utils.ActivationInterval{ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}, + ActivationInterval: &utils.ActivationInterval{}, Filters: []*RequestFilter{}, QueueLength: 2, TTL: &timeTTL, @@ -1852,12 +1853,17 @@ func testOnStorITCRUDStatsQueue(t *testing.T) { Store: true, Thresholds: []string{}, } - if _, rcvErr := onStor.GetStatsQueue(sq.ID, false, utils.NonTransactional); rcvErr != utils.ErrNotFound { + if _, rcvErr := onStor.GetStatsQueue(sq.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } if err := onStor.SetStatsQueue(sq); err != nil { t.Error(err) } + if rcv, err := onStor.GetStatsQueue(sq.ID, true, utils.NonTransactional); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(sq, rcv) { + t.Errorf("Expecting: %v, received: %v", sq, rcv) + } if rcv, err := onStor.GetStatsQueue(sq.ID, false, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(sq, rcv) { @@ -1866,6 +1872,9 @@ func testOnStorITCRUDStatsQueue(t *testing.T) { if err := onStor.RemStatsQueue(sq.ID, utils.NonTransactional); err != nil { t.Error(err) } + if _, rcvErr := onStor.GetStatsQueue(sq.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { + t.Error(rcvErr) + } if _, rcvErr := onStor.GetStatsQueue(sq.ID, false, utils.NonTransactional); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index f7b58549b..69c4fd2e5 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -2011,6 +2011,7 @@ func (ms *MongoStorage) MatchReqFilterIndex(dbKey, fieldValKey string) (itemIDs // GetStatsQueue retrieves a StatsQueue from dataDB/cache func (ms *MongoStorage) GetStatsQueue(sqID string, skipCache bool, transactionID string) (sq *StatsQueue, err error) { + var rez *StatsQueue cacheKey := utils.StatsQueuePrefix + sqID if !skipCache { if x, ok := cache.Get(cacheKey); ok { @@ -2023,13 +2024,14 @@ func (ms *MongoStorage) GetStatsQueue(sqID string, skipCache bool, transactionID session, col := ms.conn(utils.StatsQueuePrefix) defer session.Close() cCommit := cacheCommit(transactionID) - if err = col.Find(bson.M{"id": sqID}).One(&sq); err != nil { + if err = col.Find(bson.M{"id": sqID}).One(&rez); err != nil { if err == mgo.ErrNotFound { cache.Set(cacheKey, nil, cCommit, transactionID) err = utils.ErrNotFound } return nil, err } + sq = rez cache.Set(cacheKey, sq, cCommit, transactionID) return } @@ -2044,15 +2046,8 @@ func (ms *MongoStorage) SetStatsQueue(sq *StatsQueue) (err error) { // RemStatsQueue removes a StatsQueue from dataDB/cache func (ms *MongoStorage) RemStatsQueue(sqID string, transactionID string) (err error) { - session, col := ms.conn(utils.SQStoredMetricsPrefix) + session, col := ms.conn(utils.StatsQueuePrefix) key := utils.StatsQueuePrefix + sqID - _, err = ms.GetStatsQueue(sqID, false, transactionID) - if err != nil { - if err == mgo.ErrNotFound { - err = nil - } - return - } err = col.Remove(bson.M{"id": sqID}) if err != nil { return err diff --git a/engine/storage_redis.go b/engine/storage_redis.go index f89ae3f7f..660aa5faf 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1589,7 +1589,7 @@ func (rs *RedisStorage) GetStatsQueue(sqID string, skipCache bool, transactionID if err = rs.ms.Unmarshal(values, &sq); err != nil { return } - cache.Set(key, &sq, cacheCommit(transactionID), transactionID) + cache.Set(key, sq, cacheCommit(transactionID), transactionID) return }