mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Add methods for Threshold (get/set/remove) in Mongo Redis and StorageMap (DataDB) + test for them (onstor_it_test.go)
This commit is contained in:
committed by
Dan Christian Bogos
parent
03c665d299
commit
2951286563
@@ -93,6 +93,7 @@ var sTestsOnStorIT = []func(t *testing.T){
|
||||
testOnStorITCRUDStatQueueProfile,
|
||||
testOnStorITCRUDStoredStatQueue,
|
||||
testOnStorITCRUDThresholdProfile,
|
||||
testOnStorITCRUDThreshold,
|
||||
}
|
||||
|
||||
func TestOnStorITRedisConnect(t *testing.T) {
|
||||
@@ -2088,3 +2089,34 @@ func testOnStorITCRUDThresholdProfile(t *testing.T) {
|
||||
t.Error(rcvErr)
|
||||
}
|
||||
}
|
||||
|
||||
func testOnStorITCRUDThreshold(t *testing.T) {
|
||||
res := &Threshold{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "TH1",
|
||||
LastExecuted: time.Date(2016, 10, 1, 0, 0, 0, 0, time.UTC).Local(),
|
||||
WakeupTime: time.Date(2016, 10, 1, 0, 0, 0, 0, time.UTC).Local(),
|
||||
}
|
||||
if _, rcvErr := onStor.GetThreshold("cgrates.org", "TH1", true, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound {
|
||||
t.Error(rcvErr)
|
||||
}
|
||||
if err := onStor.SetThreshold(res); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if rcv, err := onStor.GetThreshold("cgrates.org", "TH1", true, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
} else if !(reflect.DeepEqual(res, rcv)) {
|
||||
t.Errorf("Expecting: %v, received: %v", res, rcv)
|
||||
}
|
||||
if rcv, err := onStor.GetThreshold("cgrates.org", "TH1", false, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(res, rcv) {
|
||||
t.Errorf("Expecting: %v, received: %v", res, rcv)
|
||||
}
|
||||
if err := onStor.RemoveThreshold(res.Tenant, res.ID, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if _, rcvErr := onStor.GetThreshold(res.Tenant, res.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound {
|
||||
t.Error(rcvErr)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,6 +123,9 @@ type DataDB interface {
|
||||
GetThresholdProfile(tenant string, ID string, skipCache bool, transID string) (tp *ThresholdProfile, err error)
|
||||
SetThresholdProfile(tp *ThresholdProfile) (err error)
|
||||
RemThresholdProfile(tenant, id, transactionID string) (err error)
|
||||
GetThreshold(string, string, bool, string) (*Threshold, error)
|
||||
SetThreshold(*Threshold) error
|
||||
RemoveThreshold(string, string, string) error
|
||||
// CacheDataFromDB loads data to cache, prefix represents the cache prefix, IDs should be nil if all available data should be loaded
|
||||
CacheDataFromDB(prefix string, IDs []string, mustBeCached bool) error // ToDo: Move this to dataManager
|
||||
}
|
||||
|
||||
@@ -1624,6 +1624,51 @@ func (ms *MapStorage) RemThresholdProfile(tenant, id, transactionID string) (err
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetThreshold(tenant, id string, skipCache bool, transactionID string) (r *Threshold, err error) {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
key := utils.ThresholdsPrefix + utils.ConcatenatedKey(tenant, id)
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x != nil {
|
||||
return x.(*Threshold), nil
|
||||
}
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
}
|
||||
values, ok := ms.dict[key]
|
||||
if !ok {
|
||||
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
err = ms.ms.Unmarshal(values, r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cache.Set(key, r, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) SetThreshold(r *Threshold) (err error) {
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
result, err := ms.ms.Marshal(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ms.dict[utils.ThresholdsPrefix+utils.ConcatenatedKey(r.Tenant, r.ID)] = result
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) RemoveThreshold(tenant, id string, transactionID string) (err error) {
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
key := utils.ThresholdsPrefix + utils.ConcatenatedKey(tenant, id)
|
||||
delete(ms.dict, key)
|
||||
cache.RemKey(key, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetVersions(itm string) (vrs Versions, err error) {
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
|
||||
@@ -61,7 +61,8 @@ const (
|
||||
colRes = "resources"
|
||||
colSqs = "statqueues"
|
||||
colSqp = "statqueue_profiles"
|
||||
colTlds = "thresholds"
|
||||
colTlds = "threshold_profiles"
|
||||
colThs = "thresholds"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -330,6 +331,7 @@ func (ms *MongoStorage) getColNameForPrefix(prefix string) (name string, ok bool
|
||||
utils.ResourcesPrefix: colRes,
|
||||
utils.ResourceProfilesPrefix: colRsP,
|
||||
utils.ThresholdProfilePrefix: colTlds,
|
||||
utils.ThresholdsPrefix: colThs,
|
||||
}
|
||||
name, ok = colMap[prefix]
|
||||
return
|
||||
@@ -537,6 +539,9 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
|
||||
case utils.ThresholdProfilePrefix:
|
||||
tntID := utils.NewTenantID(dataID)
|
||||
_, err = ms.GetThresholdProfile(tntID.Tenant, tntID.ID, true, utils.NonTransactional)
|
||||
case utils.ThresholdsPrefix:
|
||||
tntID := utils.NewTenantID(dataID)
|
||||
_, err = ms.GetThreshold(tntID.Tenant, tntID.ID, true, utils.NonTransactional)
|
||||
}
|
||||
if err != nil {
|
||||
return utils.NewCGRError(utils.MONGO,
|
||||
@@ -713,8 +718,8 @@ func (ms *MongoStorage) HasData(category, subject string) (has bool, err error)
|
||||
case utils.StatQueuePrefix:
|
||||
count, err = db.C(colRes).Find(bson.M{"id": subject}).Count()
|
||||
has = count > 0
|
||||
case utils.ThresholdProfilePrefix:
|
||||
count, err = db.C(colTlds).Find(bson.M{"id": subject}).Count()
|
||||
case utils.ThresholdsPrefix:
|
||||
count, err = db.C(colThs).Find(bson.M{"id": subject}).Count()
|
||||
has = count > 0
|
||||
default:
|
||||
err = fmt.Errorf("unsupported category in HasData: %s", category)
|
||||
@@ -2181,3 +2186,45 @@ func (ms *MongoStorage) RemThresholdProfile(tenant, id, transactionID string) (e
|
||||
cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetThreshold(tenant, id string, skipCache bool, transactionID string) (r *Threshold, err error) {
|
||||
key := utils.ThresholdsPrefix + utils.ConcatenatedKey(tenant, id)
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*Threshold), nil
|
||||
}
|
||||
}
|
||||
session, col := ms.conn(colThs)
|
||||
defer session.Close()
|
||||
r = new(Threshold)
|
||||
if err = col.Find(bson.M{"tenant": tenant, "id": id}).One(r); err != nil {
|
||||
if err == mgo.ErrNotFound {
|
||||
err = utils.ErrNotFound
|
||||
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
cache.Set(key, r, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetThreshold(r *Threshold) (err error) {
|
||||
session, col := ms.conn(colThs)
|
||||
defer session.Close()
|
||||
_, err = col.Upsert(bson.M{"tenant": r.Tenant, "id": r.ID}, r)
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) RemoveThreshold(tenant, id string, transactionID string) (err error) {
|
||||
session, col := ms.conn(colThs)
|
||||
defer session.Close()
|
||||
if err = col.Remove(bson.M{"tenant": tenant, "id": id}); err != nil {
|
||||
return
|
||||
}
|
||||
cache.RemKey(utils.ThresholdsPrefix+utils.ConcatenatedKey(tenant, id),
|
||||
cacheCommit(transactionID), transactionID)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1734,6 +1734,48 @@ func (rs *RedisStorage) RemThresholdProfile(tenant, id, transactionID string) (e
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetThreshold(tenant, id string, skipCache bool, transactionID string) (r *Threshold, err error) {
|
||||
key := utils.ThresholdsPrefix + utils.ConcatenatedKey(tenant, id)
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*Threshold), nil
|
||||
}
|
||||
}
|
||||
var values []byte
|
||||
if values, err = rs.Cmd("GET", key).Bytes(); err != nil {
|
||||
if err == redis.ErrRespNil { // did not find the destination
|
||||
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
|
||||
err = utils.ErrNotFound
|
||||
}
|
||||
return
|
||||
}
|
||||
if err = rs.ms.Unmarshal(values, &r); err != nil {
|
||||
return
|
||||
}
|
||||
cache.Set(key, r, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetThreshold(r *Threshold) (err error) {
|
||||
result, err := rs.ms.Marshal(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return rs.Cmd("SET", utils.ThresholdsPrefix+utils.ConcatenatedKey(r.Tenant, r.ID), result).Err
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) RemoveThreshold(tenant, id string, transactionID string) (err error) {
|
||||
key := utils.ThresholdsPrefix + utils.ConcatenatedKey(tenant, id)
|
||||
if err = rs.Cmd("DEL", key).Err; err != nil {
|
||||
return
|
||||
}
|
||||
cache.RemKey(key, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetStorageType() string {
|
||||
return utils.REDIS
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user