diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index 3d6c3c6ea..4a69c302b 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -93,6 +93,7 @@ var sTestsOnStorIT = []func(t *testing.T){ testOnStorITCRUDStatQueueProfile, testOnStorITCRUDStoredStatQueue, testOnStorITCRUDThresholdProfile, + testOnStorITCRUDThreshold, } func TestOnStorITRedisConnect(t *testing.T) { @@ -2088,3 +2089,34 @@ func testOnStorITCRUDThresholdProfile(t *testing.T) { t.Error(rcvErr) } } + +func testOnStorITCRUDThreshold(t *testing.T) { + res := &Threshold{ + Tenant: "cgrates.org", + ID: "TH1", + LastExecuted: time.Date(2016, 10, 1, 0, 0, 0, 0, time.UTC).Local(), + WakeupTime: time.Date(2016, 10, 1, 0, 0, 0, 0, time.UTC).Local(), + } + if _, rcvErr := onStor.GetThreshold("cgrates.org", "TH1", true, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound { + t.Error(rcvErr) + } + if err := onStor.SetThreshold(res); err != nil { + t.Error(err) + } + if rcv, err := onStor.GetThreshold("cgrates.org", "TH1", true, utils.NonTransactional); err != nil { + t.Error(err) + } else if !(reflect.DeepEqual(res, rcv)) { + t.Errorf("Expecting: %v, received: %v", res, rcv) + } + if rcv, err := onStor.GetThreshold("cgrates.org", "TH1", false, utils.NonTransactional); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(res, rcv) { + t.Errorf("Expecting: %v, received: %v", res, rcv) + } + if err := onStor.RemoveThreshold(res.Tenant, res.ID, utils.NonTransactional); err != nil { + t.Error(err) + } + if _, rcvErr := onStor.GetThreshold(res.Tenant, res.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { + t.Error(rcvErr) + } +} diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 4c4fc8028..4dc6ed658 100755 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -123,6 +123,9 @@ type DataDB interface { GetThresholdProfile(tenant string, ID string, skipCache bool, transID string) (tp *ThresholdProfile, err error) SetThresholdProfile(tp *ThresholdProfile) (err error) RemThresholdProfile(tenant, id, transactionID string) (err error) + GetThreshold(string, string, bool, string) (*Threshold, error) + SetThreshold(*Threshold) error + RemoveThreshold(string, string, string) error // CacheDataFromDB loads data to cache, prefix represents the cache prefix, IDs should be nil if all available data should be loaded CacheDataFromDB(prefix string, IDs []string, mustBeCached bool) error // ToDo: Move this to dataManager } diff --git a/engine/storage_map.go b/engine/storage_map.go index 92e30e05e..a97a70357 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -1624,6 +1624,51 @@ func (ms *MapStorage) RemThresholdProfile(tenant, id, transactionID string) (err return } +func (ms *MapStorage) GetThreshold(tenant, id string, skipCache bool, transactionID string) (r *Threshold, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() + key := utils.ThresholdsPrefix + utils.ConcatenatedKey(tenant, id) + if !skipCache { + if x, ok := cache.Get(key); ok { + if x != nil { + return x.(*Threshold), nil + } + return nil, utils.ErrNotFound + } + } + values, ok := ms.dict[key] + if !ok { + cache.Set(key, nil, cacheCommit(transactionID), transactionID) + return nil, utils.ErrNotFound + } + err = ms.ms.Unmarshal(values, r) + if err != nil { + return nil, err + } + cache.Set(key, r, cacheCommit(transactionID), transactionID) + return +} + +func (ms *MapStorage) SetThreshold(r *Threshold) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() + result, err := ms.ms.Marshal(r) + if err != nil { + return err + } + ms.dict[utils.ThresholdsPrefix+utils.ConcatenatedKey(r.Tenant, r.ID)] = result + return +} + +func (ms *MapStorage) RemoveThreshold(tenant, id string, transactionID string) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() + key := utils.ThresholdsPrefix + utils.ConcatenatedKey(tenant, id) + delete(ms.dict, key) + cache.RemKey(key, cacheCommit(transactionID), transactionID) + return +} + func (ms *MapStorage) GetVersions(itm string) (vrs Versions, err error) { ms.mu.Lock() defer ms.mu.Unlock() diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index bcdca5bcc..2bc2a1ed4 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -61,7 +61,8 @@ const ( colRes = "resources" colSqs = "statqueues" colSqp = "statqueue_profiles" - colTlds = "thresholds" + colTlds = "threshold_profiles" + colThs = "thresholds" ) var ( @@ -330,6 +331,7 @@ func (ms *MongoStorage) getColNameForPrefix(prefix string) (name string, ok bool utils.ResourcesPrefix: colRes, utils.ResourceProfilesPrefix: colRsP, utils.ThresholdProfilePrefix: colTlds, + utils.ThresholdsPrefix: colThs, } name, ok = colMap[prefix] return @@ -537,6 +539,9 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached case utils.ThresholdProfilePrefix: tntID := utils.NewTenantID(dataID) _, err = ms.GetThresholdProfile(tntID.Tenant, tntID.ID, true, utils.NonTransactional) + case utils.ThresholdsPrefix: + tntID := utils.NewTenantID(dataID) + _, err = ms.GetThreshold(tntID.Tenant, tntID.ID, true, utils.NonTransactional) } if err != nil { return utils.NewCGRError(utils.MONGO, @@ -713,8 +718,8 @@ func (ms *MongoStorage) HasData(category, subject string) (has bool, err error) case utils.StatQueuePrefix: count, err = db.C(colRes).Find(bson.M{"id": subject}).Count() has = count > 0 - case utils.ThresholdProfilePrefix: - count, err = db.C(colTlds).Find(bson.M{"id": subject}).Count() + case utils.ThresholdsPrefix: + count, err = db.C(colThs).Find(bson.M{"id": subject}).Count() has = count > 0 default: err = fmt.Errorf("unsupported category in HasData: %s", category) @@ -2181,3 +2186,45 @@ func (ms *MongoStorage) RemThresholdProfile(tenant, id, transactionID string) (e cacheCommit(transactionID), transactionID) return } + +func (ms *MongoStorage) GetThreshold(tenant, id string, skipCache bool, transactionID string) (r *Threshold, err error) { + key := utils.ThresholdsPrefix + utils.ConcatenatedKey(tenant, id) + if !skipCache { + if x, ok := cache.Get(key); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*Threshold), nil + } + } + session, col := ms.conn(colThs) + defer session.Close() + r = new(Threshold) + if err = col.Find(bson.M{"tenant": tenant, "id": id}).One(r); err != nil { + if err == mgo.ErrNotFound { + err = utils.ErrNotFound + cache.Set(key, nil, cacheCommit(transactionID), transactionID) + } + return nil, err + } + cache.Set(key, r, cacheCommit(transactionID), transactionID) + return +} + +func (ms *MongoStorage) SetThreshold(r *Threshold) (err error) { + session, col := ms.conn(colThs) + defer session.Close() + _, err = col.Upsert(bson.M{"tenant": r.Tenant, "id": r.ID}, r) + return +} + +func (ms *MongoStorage) RemoveThreshold(tenant, id string, transactionID string) (err error) { + session, col := ms.conn(colThs) + defer session.Close() + if err = col.Remove(bson.M{"tenant": tenant, "id": id}); err != nil { + return + } + cache.RemKey(utils.ThresholdsPrefix+utils.ConcatenatedKey(tenant, id), + cacheCommit(transactionID), transactionID) + return nil +} diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 450d6f940..710fe5442 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1734,6 +1734,48 @@ func (rs *RedisStorage) RemThresholdProfile(tenant, id, transactionID string) (e return } +func (rs *RedisStorage) GetThreshold(tenant, id string, skipCache bool, transactionID string) (r *Threshold, err error) { + key := utils.ThresholdsPrefix + utils.ConcatenatedKey(tenant, id) + if !skipCache { + if x, ok := cache.Get(key); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*Threshold), nil + } + } + var values []byte + if values, err = rs.Cmd("GET", key).Bytes(); err != nil { + if err == redis.ErrRespNil { // did not find the destination + cache.Set(key, nil, cacheCommit(transactionID), transactionID) + err = utils.ErrNotFound + } + return + } + if err = rs.ms.Unmarshal(values, &r); err != nil { + return + } + cache.Set(key, r, cacheCommit(transactionID), transactionID) + return +} + +func (rs *RedisStorage) SetThreshold(r *Threshold) (err error) { + result, err := rs.ms.Marshal(r) + if err != nil { + return err + } + return rs.Cmd("SET", utils.ThresholdsPrefix+utils.ConcatenatedKey(r.Tenant, r.ID), result).Err +} + +func (rs *RedisStorage) RemoveThreshold(tenant, id string, transactionID string) (err error) { + key := utils.ThresholdsPrefix + utils.ConcatenatedKey(tenant, id) + if err = rs.Cmd("DEL", key).Err; err != nil { + return + } + cache.RemKey(key, cacheCommit(transactionID), transactionID) + return +} + func (rs *RedisStorage) GetStorageType() string { return utils.REDIS }