mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Add GetThresholdProfile to DataManager
This commit is contained in:
committed by
Dan Christian Bogos
parent
d6bb574e65
commit
05b8001857
@@ -64,7 +64,7 @@ func (apierV1 *ApierV1) GetThresholdProfile(arg *utils.TenantID, reply *engine.T
|
||||
if missing := utils.MissingStructFields(arg, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
}
|
||||
if th, err := apierV1.DataManager.DataDB().GetThresholdProfile(arg.Tenant, arg.ID, false, utils.NonTransactional); err != nil {
|
||||
if th, err := apierV1.DataManager.GetThresholdProfile(arg.Tenant, arg.ID, false, utils.NonTransactional); err != nil {
|
||||
return utils.APIErrorHandler(err)
|
||||
} else {
|
||||
*reply = *th
|
||||
|
||||
@@ -146,3 +146,25 @@ func (dm *DataManager) RemoveThreshold(tenant, id, transactionID string) (err er
|
||||
cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (dm *DataManager) GetThresholdProfile(tenant, id string, skipCache bool, transactionID string) (th *ThresholdProfile, err error) {
|
||||
key := utils.ThresholdProfilePrefix + utils.ConcatenatedKey(tenant, id)
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*ThresholdProfile), nil
|
||||
}
|
||||
}
|
||||
th, err = dm.dataDB.GetThresholdProfileDrv(tenant, id)
|
||||
if err != nil {
|
||||
if err == utils.ErrNotFound {
|
||||
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
cache.Set(key, th, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
|
||||
}
|
||||
|
||||
@@ -346,7 +346,7 @@ func TestLoaderITWriteToDatabase(t *testing.T) {
|
||||
|
||||
for _, mpIDs := range loader.thProfiles {
|
||||
for _, th := range mpIDs {
|
||||
rcv, err := loader.dataStorage.GetThresholdProfile(th.Tenant, th.ID, true, utils.NonTransactional)
|
||||
rcv, err := loader.dm.GetThresholdProfile(th.Tenant, th.ID, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
t.Errorf("Failed GetThresholdProfile, tenant: %s, id: %s, error: %s ", th.Tenant, th.ID, err.Error())
|
||||
}
|
||||
|
||||
@@ -2062,19 +2062,19 @@ func testOnStorITCRUDThresholdProfile(t *testing.T) {
|
||||
Weight: 1.4,
|
||||
ActionIDs: []string{},
|
||||
}
|
||||
if _, rcvErr := onStor.DataDB().GetThresholdProfile(th.Tenant, th.ID,
|
||||
if _, rcvErr := onStor.GetThresholdProfile(th.Tenant, th.ID,
|
||||
false, utils.NonTransactional); rcvErr != utils.ErrNotFound {
|
||||
t.Error(rcvErr)
|
||||
}
|
||||
if err := onStor.DataDB().SetThresholdProfile(th); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if rcv, err := onStor.DataDB().GetThresholdProfile(th.Tenant, th.ID, true, utils.NonTransactional); err != nil {
|
||||
if rcv, err := onStor.GetThresholdProfile(th.Tenant, th.ID, true, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(th, rcv) {
|
||||
t.Errorf("Expecting: %v, received: %v", th, rcv)
|
||||
}
|
||||
if rcv, err := onStor.DataDB().GetThresholdProfile(th.Tenant, th.ID, false, utils.NonTransactional); err != nil {
|
||||
if rcv, err := onStor.GetThresholdProfile(th.Tenant, th.ID, false, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(th, rcv) {
|
||||
t.Errorf("Expecting: %v, received: %v", th, rcv)
|
||||
@@ -2082,10 +2082,10 @@ func testOnStorITCRUDThresholdProfile(t *testing.T) {
|
||||
if err := onStor.DataDB().RemThresholdProfile(th.Tenant, th.ID, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if _, rcvErr := onStor.DataDB().GetThresholdProfile(th.Tenant, th.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound {
|
||||
if _, rcvErr := onStor.GetThresholdProfile(th.Tenant, th.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound {
|
||||
t.Error(rcvErr)
|
||||
}
|
||||
if _, rcvErr := onStor.DataDB().GetThresholdProfile(th.Tenant, th.ID, false, utils.NonTransactional); rcvErr != utils.ErrNotFound {
|
||||
if _, rcvErr := onStor.GetThresholdProfile(th.Tenant, th.ID, false, utils.NonTransactional); rcvErr != utils.ErrNotFound {
|
||||
t.Error(rcvErr)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -120,7 +120,7 @@ type DataDB interface {
|
||||
GetStoredStatQueue(tenant, id string) (sq *StoredStatQueue, err error)
|
||||
SetStoredStatQueue(sq *StoredStatQueue) (err error)
|
||||
RemStoredStatQueue(tenant, id string) (err error)
|
||||
GetThresholdProfile(tenant string, ID string, skipCache bool, transID string) (tp *ThresholdProfile, err error)
|
||||
GetThresholdProfileDrv(tenant string, ID string) (tp *ThresholdProfile, err error)
|
||||
SetThresholdProfile(tp *ThresholdProfile) (err error)
|
||||
RemThresholdProfile(tenant, id, transactionID string) (err error)
|
||||
GetThresholdDrv(string, string) (*Threshold, error)
|
||||
|
||||
@@ -1570,23 +1570,13 @@ func (ms *MapStorage) RemStoredStatQueue(tenant, id string) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// GetThresholdProfile retrieves a ThresholdProfile from dataDB/cache
|
||||
func (ms *MapStorage) GetThresholdProfile(tenant, ID string,
|
||||
skipCache bool, transactionID string) (tp *ThresholdProfile, err error) {
|
||||
// GetThresholdProfileDrv retrieves a ThresholdProfile from dataDB
|
||||
func (ms *MapStorage) GetThresholdProfileDrv(tenant, ID string) (tp *ThresholdProfile, err error) {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
key := utils.ThresholdProfilePrefix + utils.ConcatenatedKey(tenant, ID)
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*ThresholdProfile), nil
|
||||
}
|
||||
}
|
||||
values, ok := ms.dict[key]
|
||||
if !ok {
|
||||
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
err = ms.ms.Unmarshal(values, &tp)
|
||||
@@ -1598,7 +1588,6 @@ func (ms *MapStorage) GetThresholdProfile(tenant, ID string,
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
cache.Set(key, tp, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -540,7 +540,7 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
|
||||
_, err = ms.GetTiming(dataID, true, utils.NonTransactional)
|
||||
case utils.ThresholdProfilePrefix:
|
||||
tntID := utils.NewTenantID(dataID)
|
||||
_, err = ms.GetThresholdProfile(tntID.Tenant, tntID.ID, true, utils.NonTransactional)
|
||||
_, err = ms.GetThresholdProfileDrv(tntID.Tenant, tntID.ID)
|
||||
case utils.ThresholdPrefix:
|
||||
tntID := utils.NewTenantID(dataID)
|
||||
_, err = ms.GetThresholdDrv(tntID.Tenant, tntID.ID)
|
||||
@@ -2158,25 +2158,12 @@ func (ms *MongoStorage) RemStoredStatQueue(tenant, id string) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
// GetThresholdProfile retrieves a ThresholdProfile from dataDB/cache
|
||||
func (ms *MongoStorage) GetThresholdProfile(tenant, ID string,
|
||||
skipCache bool, transactionID string) (tp *ThresholdProfile, err error) {
|
||||
cacheKey := utils.ThresholdProfilePrefix + utils.ConcatenatedKey(tenant, ID)
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(cacheKey); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*ThresholdProfile), nil
|
||||
}
|
||||
}
|
||||
// GetThresholdProfileDrv retrieves a ThresholdProfile from dataDB
|
||||
func (ms *MongoStorage) GetThresholdProfileDrv(tenant, ID string) (tp *ThresholdProfile, err error) {
|
||||
session, col := ms.conn(colTps)
|
||||
defer session.Close()
|
||||
tp = new(ThresholdProfile)
|
||||
cCommit := cacheCommit(transactionID)
|
||||
if err = col.Find(bson.M{"tenant": tenant, "id": ID}).One(&tp); err != nil {
|
||||
if err == mgo.ErrNotFound {
|
||||
cache.Set(cacheKey, nil, cCommit, transactionID)
|
||||
err = utils.ErrNotFound
|
||||
}
|
||||
return nil, err
|
||||
@@ -2186,7 +2173,6 @@ func (ms *MongoStorage) GetThresholdProfile(tenant, ID string,
|
||||
return
|
||||
}
|
||||
}
|
||||
cache.Set(cacheKey, tp, cCommit, transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -1684,22 +1684,12 @@ func (rs *RedisStorage) RemStoredStatQueue(tenant, id string) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// GetThresholdProfile retrieves a ThresholdProfile from dataDB/cache
|
||||
func (rs *RedisStorage) GetThresholdProfile(tenant, ID string,
|
||||
skipCache bool, transactionID string) (tp *ThresholdProfile, err error) {
|
||||
// GetThresholdProfile retrieves a ThresholdProfile from dataDB
|
||||
func (rs *RedisStorage) GetThresholdProfileDrv(tenant, ID string) (tp *ThresholdProfile, err error) {
|
||||
key := utils.ThresholdProfilePrefix + utils.ConcatenatedKey(tenant, ID)
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*ThresholdProfile), nil
|
||||
}
|
||||
}
|
||||
var values []byte
|
||||
if values, err = rs.Cmd("GET", key).Bytes(); err != nil {
|
||||
if err == redis.ErrRespNil {
|
||||
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
|
||||
err = utils.ErrNotFound
|
||||
}
|
||||
return
|
||||
@@ -1712,7 +1702,6 @@ func (rs *RedisStorage) GetThresholdProfile(tenant, ID string,
|
||||
return
|
||||
}
|
||||
}
|
||||
cache.Set(key, tp, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -258,7 +258,7 @@ func (tS *ThresholdService) matchingThresholdsForEvent(ev *ThresholdEvent) (ts T
|
||||
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...)
|
||||
defer guardian.Guardian.UnguardIDs(lockIDs...)
|
||||
for tID := range tIDs {
|
||||
tPrfl, err := tS.dm.DataDB().GetThresholdProfile(ev.Tenant, tID, false, utils.NonTransactional)
|
||||
tPrfl, err := tS.dm.GetThresholdProfile(ev.Tenant, tID, false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
if err == utils.ErrNotFound {
|
||||
continue
|
||||
|
||||
Reference in New Issue
Block a user