Merge pull request #691 from Edwardro22/master

StoredSQ Added implementations for redis and mongo/dataDB and test
This commit is contained in:
Dan Christian Bogos
2017-07-18 15:28:16 +02:00
committed by GitHub
3 changed files with 170 additions and 7 deletions

View File

@@ -87,6 +87,8 @@ var sTestsOnStorIT = []func(t *testing.T){
testOnStorITCRUDTiming,
testOnStorITCRUDHistory,
testOnStorITCRUDStructVersion,
testOnStorITCRUDSQStoredMetrics,
testOnStorITCRUDStatsQueue,
}
func TestOnStorITRedisConnect(t *testing.T) {
@@ -1810,3 +1812,68 @@ func testOnStorITCRUDStructVersion(t *testing.T) {
t.Errorf("Expecting: %v, received: %v", cv, rcv)
}
}
func testOnStorITCRUDSQStoredMetrics(t *testing.T) {
sqm := &SQStoredMetrics{
SqID: "test",
SEvents: map[string]StatsEvent{},
SQItems: []*SQItem{},
SQMetrics: map[string][]byte{},
}
if _, rcvErr := onStor.GetSQStoredMetrics(sqm.SqID); rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
}
if err := onStor.SetSQStoredMetrics(sqm); err != nil {
t.Error(err)
}
if rcv, err := onStor.GetSQStoredMetrics(sqm.SqID); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(sqm, rcv) {
t.Errorf("Expecting: %v, received: %v", sqm, rcv)
}
if err := onStor.RemSQStoredMetrics(sqm.SqID); err != nil {
t.Error(err)
}
if _, rcvErr := onStor.GetSQStoredMetrics(sqm.SqID); rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
}
}
func testOnStorITCRUDStatsQueue(t *testing.T) {
timeTTL := time.Duration(0 * time.Second)
sq := &StatsQueue{
ID: "test",
ActivationInterval: &utils.ActivationInterval{},
Filters: []*RequestFilter{},
QueueLength: 2,
TTL: &timeTTL,
Metrics: []string{},
Store: true,
Thresholds: []string{},
}
if _, rcvErr := onStor.GetStatsQueue(sq.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
}
if err := onStor.SetStatsQueue(sq); err != nil {
t.Error(err)
}
if rcv, err := onStor.GetStatsQueue(sq.ID, true, utils.NonTransactional); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(sq, rcv) {
t.Errorf("Expecting: %v, received: %v", sq, rcv)
}
if rcv, err := onStor.GetStatsQueue(sq.ID, false, utils.NonTransactional); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(sq, rcv) {
t.Errorf("Expecting: %v, received: %v", sq, rcv)
}
if err := onStor.RemStatsQueue(sq.ID, utils.NonTransactional); err != nil {
t.Error(err)
}
if _, rcvErr := onStor.GetStatsQueue(sq.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
}
if _, rcvErr := onStor.GetStatsQueue(sq.ID, false, utils.NonTransactional); rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
}
}

View File

@@ -2008,30 +2008,77 @@ func (ms *MongoStorage) MatchReqFilterIndex(dbKey, fldName, fldVal string) (item
// GetStatsQueue retrieves a StatsQueue from dataDB/cache
func (ms *MongoStorage) GetStatsQueue(sqID string, skipCache bool, transactionID string) (sq *StatsQueue, err error) {
var rez *StatsQueue
cacheKey := utils.StatsQueuePrefix + sqID
if !skipCache {
if x, ok := cache.Get(cacheKey); ok {
if x == nil {
return nil, utils.ErrNotFound
}
return x.(*StatsQueue), nil
}
}
session, col := ms.conn(utils.StatsQueuePrefix)
defer session.Close()
cCommit := cacheCommit(transactionID)
if err = col.Find(bson.M{"id": sqID}).One(&rez); err != nil {
if err == mgo.ErrNotFound {
cache.Set(cacheKey, nil, cCommit, transactionID)
err = utils.ErrNotFound
}
return nil, err
}
sq = rez
cache.Set(cacheKey, sq, cCommit, transactionID)
return
}
// SetStatsQueue stores a StatsQueue into DataDB
func (ms *MongoStorage) SetStatsQueue(sq *StatsQueue) (err error) {
session, col := ms.conn(utils.StatsQueuePrefix)
defer session.Close()
_, err = col.UpsertId(bson.M{"id": sq.ID}, sq)
return
}
// RemStatsQueue removes a StatsQueue from dataDB/cache
func (ms *MongoStorage) RemStatsQueue(sqID string, transactionID string) (err error) {
session, col := ms.conn(utils.StatsQueuePrefix)
key := utils.StatsQueuePrefix + sqID
err = col.Remove(bson.M{"id": sqID})
if err != nil {
return err
}
cache.RemKey(key, cacheCommit(transactionID), transactionID)
session.Close()
return
}
// GetSQStoredMetrics retrieves the stored metrics for a StatsQueue
func (ms *MongoStorage) GetSQStoredMetrics(sqID string) (sqSM *SQStoredMetrics, err error) {
func (ms *MongoStorage) GetSQStoredMetrics(sqmID string) (sqSM *SQStoredMetrics, err error) {
session, col := ms.conn(utils.SQStoredMetricsPrefix)
defer session.Close()
if err = col.Find(bson.M{"sqid": sqmID}).One(&sqSM); err != nil {
if err == mgo.ErrNotFound {
err = utils.ErrNotFound
}
return nil, err
}
return
}
// SetStoredSQ stores the metrics for a StatsQueue
func (ms *MongoStorage) SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error) {
session, col := ms.conn(utils.SQStoredMetricsPrefix)
defer session.Close()
_, err = col.UpsertId(bson.M{"sqid": sqSM.SqID}, sqSM)
return
}
// RemSQStoredMetrics removes stored metrics for a StatsQueue
func (ms *MongoStorage) RemSQStoredMetrics(sqID string) (err error) {
return
func (ms *MongoStorage) RemSQStoredMetrics(sqmID string) (err error) {
session, col := ms.conn(utils.SQStoredMetricsPrefix)
defer session.Close()
err = col.Remove(bson.M{"sqid": sqmID})
return err
}

View File

@@ -1570,30 +1570,79 @@ func (rs *RedisStorage) RemoveVersions(vrs Versions) (err error) {
// GetStatsQueue retrieves a StatsQueue from dataDB/cache
func (rs *RedisStorage) GetStatsQueue(sqID string, skipCache bool, transactionID string) (sq *StatsQueue, err error) {
key := utils.StatsQueuePrefix + sqID
if !skipCache {
if x, ok := cache.Get(key); ok {
if x == nil {
return nil, utils.ErrNotFound
}
return x.(*StatsQueue), nil
}
}
var values []byte
if values, err = rs.Cmd("GET", key).Bytes(); err != nil {
if err == redis.ErrRespNil {
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
err = utils.ErrNotFound
}
return
}
if err = rs.ms.Unmarshal(values, &sq); err != nil {
return
}
cache.Set(key, sq, cacheCommit(transactionID), transactionID)
return
}
// SetStatsQueue stores a StatsQueue into DataDB
func (rs *RedisStorage) SetStatsQueue(sq *StatsQueue) (err error) {
return
var result []byte
result, err = rs.ms.Marshal(sq)
if err != nil {
return
}
return rs.Cmd("SET", utils.StatsQueuePrefix+sq.ID, result).Err
}
// RemStatsQueue removes a StatsQueue from dataDB/cache
func (rs *RedisStorage) RemStatsQueue(sqID string, transactionID string) (err error) {
key := utils.StatsQueuePrefix + sqID
err = rs.Cmd("DEL", key).Err
cache.RemKey(key, cacheCommit(transactionID), transactionID)
return
}
// GetSQStoredMetrics retrieves the stored metrics for a StatsQueue
func (rs *RedisStorage) GetSQStoredMetrics(sqID string) (sqSM *SQStoredMetrics, err error) {
func (rs *RedisStorage) GetSQStoredMetrics(sqmID string) (sqSM *SQStoredMetrics, err error) {
key := utils.SQStoredMetricsPrefix + sqmID
var values []byte
if values, err = rs.Cmd("GET", key).Bytes(); err != nil {
if err == redis.ErrRespNil {
err = utils.ErrNotFound
}
return
}
if err = rs.ms.Unmarshal(values, &sqSM); err != nil {
return
}
return
}
// SetStoredSQ stores the metrics for a StatsQueue
func (rs *RedisStorage) SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error) {
return
var result []byte
result, err = rs.ms.Marshal(sqSM)
if err != nil {
return
}
return rs.Cmd("SET", utils.SQStoredMetricsPrefix+sqSM.SqID, result).Err
}
// RemSQStoredMetrics removes stored metrics for a StatsQueue
func (rs *RedisStorage) RemSQStoredMetrics(sqID string) (err error) {
func (rs *RedisStorage) RemSQStoredMetrics(sqmID string) (err error) {
key := utils.SQStoredMetricsPrefix + sqmID
if err = rs.Cmd("DEL", key).Err; err != nil {
return
}
return
}