diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index 245266f6d..af9bc7aa6 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -87,6 +87,8 @@ var sTestsOnStorIT = []func(t *testing.T){ testOnStorITCRUDTiming, testOnStorITCRUDHistory, testOnStorITCRUDStructVersion, + testOnStorITCRUDSQStoredMetrics, + testOnStorITCRUDStatsQueue, } func TestOnStorITRedisConnect(t *testing.T) { @@ -1810,3 +1812,68 @@ func testOnStorITCRUDStructVersion(t *testing.T) { t.Errorf("Expecting: %v, received: %v", cv, rcv) } } + +func testOnStorITCRUDSQStoredMetrics(t *testing.T) { + sqm := &SQStoredMetrics{ + SqID: "test", + SEvents: map[string]StatsEvent{}, + SQItems: []*SQItem{}, + SQMetrics: map[string][]byte{}, + } + if _, rcvErr := onStor.GetSQStoredMetrics(sqm.SqID); rcvErr != utils.ErrNotFound { + t.Error(rcvErr) + } + if err := onStor.SetSQStoredMetrics(sqm); err != nil { + t.Error(err) + } + if rcv, err := onStor.GetSQStoredMetrics(sqm.SqID); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(sqm, rcv) { + t.Errorf("Expecting: %v, received: %v", sqm, rcv) + } + if err := onStor.RemSQStoredMetrics(sqm.SqID); err != nil { + t.Error(err) + } + if _, rcvErr := onStor.GetSQStoredMetrics(sqm.SqID); rcvErr != utils.ErrNotFound { + t.Error(rcvErr) + } +} + +func testOnStorITCRUDStatsQueue(t *testing.T) { + timeTTL := time.Duration(0 * time.Second) + sq := &StatsQueue{ + ID: "test", + ActivationInterval: &utils.ActivationInterval{}, + Filters: []*RequestFilter{}, + QueueLength: 2, + TTL: &timeTTL, + Metrics: []string{}, + Store: true, + Thresholds: []string{}, + } + if _, rcvErr := onStor.GetStatsQueue(sq.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { + t.Error(rcvErr) + } + if err := onStor.SetStatsQueue(sq); err != nil { + t.Error(err) + } + if rcv, err := onStor.GetStatsQueue(sq.ID, true, utils.NonTransactional); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(sq, rcv) { + t.Errorf("Expecting: %v, received: %v", sq, rcv) + } + if rcv, err := onStor.GetStatsQueue(sq.ID, false, utils.NonTransactional); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(sq, rcv) { + t.Errorf("Expecting: %v, received: %v", sq, rcv) + } + if err := onStor.RemStatsQueue(sq.ID, utils.NonTransactional); err != nil { + t.Error(err) + } + if _, rcvErr := onStor.GetStatsQueue(sq.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { + t.Error(rcvErr) + } + if _, rcvErr := onStor.GetStatsQueue(sq.ID, false, utils.NonTransactional); rcvErr != utils.ErrNotFound { + t.Error(rcvErr) + } +} diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 10a458fd1..db8715028 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -2008,30 +2008,77 @@ func (ms *MongoStorage) MatchReqFilterIndex(dbKey, fldName, fldVal string) (item // GetStatsQueue retrieves a StatsQueue from dataDB/cache func (ms *MongoStorage) GetStatsQueue(sqID string, skipCache bool, transactionID string) (sq *StatsQueue, err error) { + var rez *StatsQueue + cacheKey := utils.StatsQueuePrefix + sqID + if !skipCache { + if x, ok := cache.Get(cacheKey); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*StatsQueue), nil + } + } + session, col := ms.conn(utils.StatsQueuePrefix) + defer session.Close() + cCommit := cacheCommit(transactionID) + if err = col.Find(bson.M{"id": sqID}).One(&rez); err != nil { + if err == mgo.ErrNotFound { + cache.Set(cacheKey, nil, cCommit, transactionID) + err = utils.ErrNotFound + } + return nil, err + } + sq = rez + cache.Set(cacheKey, sq, cCommit, transactionID) return } // SetStatsQueue stores a StatsQueue into DataDB func (ms *MongoStorage) SetStatsQueue(sq *StatsQueue) (err error) { + session, col := ms.conn(utils.StatsQueuePrefix) + defer session.Close() + _, err = col.UpsertId(bson.M{"id": sq.ID}, sq) return } // RemStatsQueue removes a StatsQueue from dataDB/cache func (ms *MongoStorage) RemStatsQueue(sqID string, transactionID string) (err error) { + session, col := ms.conn(utils.StatsQueuePrefix) + key := utils.StatsQueuePrefix + sqID + err = col.Remove(bson.M{"id": sqID}) + if err != nil { + return err + } + cache.RemKey(key, cacheCommit(transactionID), transactionID) + session.Close() return } // GetSQStoredMetrics retrieves the stored metrics for a StatsQueue -func (ms *MongoStorage) GetSQStoredMetrics(sqID string) (sqSM *SQStoredMetrics, err error) { +func (ms *MongoStorage) GetSQStoredMetrics(sqmID string) (sqSM *SQStoredMetrics, err error) { + session, col := ms.conn(utils.SQStoredMetricsPrefix) + defer session.Close() + if err = col.Find(bson.M{"sqid": sqmID}).One(&sqSM); err != nil { + if err == mgo.ErrNotFound { + err = utils.ErrNotFound + } + return nil, err + } return } // SetStoredSQ stores the metrics for a StatsQueue func (ms *MongoStorage) SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error) { + session, col := ms.conn(utils.SQStoredMetricsPrefix) + defer session.Close() + _, err = col.UpsertId(bson.M{"sqid": sqSM.SqID}, sqSM) return } // RemSQStoredMetrics removes stored metrics for a StatsQueue -func (ms *MongoStorage) RemSQStoredMetrics(sqID string) (err error) { - return +func (ms *MongoStorage) RemSQStoredMetrics(sqmID string) (err error) { + session, col := ms.conn(utils.SQStoredMetricsPrefix) + defer session.Close() + err = col.Remove(bson.M{"sqid": sqmID}) + return err } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index d07f1c8b1..e4cb767d4 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1570,30 +1570,79 @@ func (rs *RedisStorage) RemoveVersions(vrs Versions) (err error) { // GetStatsQueue retrieves a StatsQueue from dataDB/cache func (rs *RedisStorage) GetStatsQueue(sqID string, skipCache bool, transactionID string) (sq *StatsQueue, err error) { + key := utils.StatsQueuePrefix + sqID + if !skipCache { + if x, ok := cache.Get(key); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*StatsQueue), nil + } + } + var values []byte + if values, err = rs.Cmd("GET", key).Bytes(); err != nil { + if err == redis.ErrRespNil { + cache.Set(key, nil, cacheCommit(transactionID), transactionID) + err = utils.ErrNotFound + } + return + } + if err = rs.ms.Unmarshal(values, &sq); err != nil { + return + } + cache.Set(key, sq, cacheCommit(transactionID), transactionID) return } // SetStatsQueue stores a StatsQueue into DataDB func (rs *RedisStorage) SetStatsQueue(sq *StatsQueue) (err error) { - return + var result []byte + result, err = rs.ms.Marshal(sq) + if err != nil { + return + } + return rs.Cmd("SET", utils.StatsQueuePrefix+sq.ID, result).Err } // RemStatsQueue removes a StatsQueue from dataDB/cache func (rs *RedisStorage) RemStatsQueue(sqID string, transactionID string) (err error) { + key := utils.StatsQueuePrefix + sqID + err = rs.Cmd("DEL", key).Err + cache.RemKey(key, cacheCommit(transactionID), transactionID) return } // GetSQStoredMetrics retrieves the stored metrics for a StatsQueue -func (rs *RedisStorage) GetSQStoredMetrics(sqID string) (sqSM *SQStoredMetrics, err error) { +func (rs *RedisStorage) GetSQStoredMetrics(sqmID string) (sqSM *SQStoredMetrics, err error) { + key := utils.SQStoredMetricsPrefix + sqmID + var values []byte + if values, err = rs.Cmd("GET", key).Bytes(); err != nil { + if err == redis.ErrRespNil { + err = utils.ErrNotFound + } + return + } + if err = rs.ms.Unmarshal(values, &sqSM); err != nil { + return + } return } // SetStoredSQ stores the metrics for a StatsQueue func (rs *RedisStorage) SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error) { - return + var result []byte + result, err = rs.ms.Marshal(sqSM) + if err != nil { + return + } + return rs.Cmd("SET", utils.SQStoredMetricsPrefix+sqSM.SqID, result).Err } // RemSQStoredMetrics removes stored metrics for a StatsQueue -func (rs *RedisStorage) RemSQStoredMetrics(sqID string) (err error) { +func (rs *RedisStorage) RemSQStoredMetrics(sqmID string) (err error) { + key := utils.SQStoredMetricsPrefix + sqmID + if err = rs.Cmd("DEL", key).Err; err != nil { + return + } return }