mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-25 09:08:45 +05:00
Merge branch 'master' of github.com:cgrates/cgrates
This commit is contained in:
@@ -112,3 +112,37 @@ func (dm *DataManager) RemoveFilter(tenant, id, transactionID string) (err error
|
||||
cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (dm *DataManager) GetThreshold(tenant, id string, skipCache bool, transactionID string) (th *Threshold, err error) {
|
||||
key := utils.ThresholdPrefix + utils.ConcatenatedKey(tenant, id)
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*Threshold), nil
|
||||
}
|
||||
}
|
||||
th, err = dm.dataDB.GetThresholdDrv(tenant, id)
|
||||
if err != nil {
|
||||
if err == utils.ErrNotFound {
|
||||
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
cache.Set(key, th, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (dm *DataManager) SetThreshold(th *Threshold) (err error) {
|
||||
return dm.DataDB().SetThresholdDrv(th)
|
||||
}
|
||||
|
||||
func (dm *DataManager) RemoveThreshold(tenant, id, transactionID string) (err error) {
|
||||
if err = dm.DataDB().RemoveThresholdDrv(tenant, id); err != nil {
|
||||
return
|
||||
}
|
||||
cache.RemKey(utils.ThresholdPrefix+utils.ConcatenatedKey(tenant, id),
|
||||
cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -2096,26 +2096,26 @@ func testOnStorITCRUDThreshold(t *testing.T) {
|
||||
ID: "TH1",
|
||||
Snooze: time.Date(2016, 10, 1, 0, 0, 0, 0, time.UTC).Local(),
|
||||
}
|
||||
if _, rcvErr := onStor.DataDB().GetThreshold("cgrates.org", "TH1", true, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound {
|
||||
if _, rcvErr := onStor.GetThreshold("cgrates.org", "TH1", true, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound {
|
||||
t.Error(rcvErr)
|
||||
}
|
||||
if err := onStor.DataDB().SetThreshold(res); err != nil {
|
||||
if err := onStor.SetThreshold(res); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if rcv, err := onStor.DataDB().GetThreshold("cgrates.org", "TH1", true, utils.NonTransactional); err != nil {
|
||||
if rcv, err := onStor.GetThreshold("cgrates.org", "TH1", true, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
} else if !(reflect.DeepEqual(res, rcv)) {
|
||||
t.Errorf("Expecting: %v, received: %v", res, rcv)
|
||||
}
|
||||
if rcv, err := onStor.DataDB().GetThreshold("cgrates.org", "TH1", false, utils.NonTransactional); err != nil {
|
||||
if rcv, err := onStor.GetThreshold("cgrates.org", "TH1", false, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(res, rcv) {
|
||||
t.Errorf("Expecting: %v, received: %v", res, rcv)
|
||||
}
|
||||
if err := onStor.DataDB().RemoveThreshold(res.Tenant, res.ID, utils.NonTransactional); err != nil {
|
||||
if err := onStor.RemoveThreshold(res.Tenant, res.ID, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if _, rcvErr := onStor.DataDB().GetThreshold(res.Tenant, res.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound {
|
||||
if _, rcvErr := onStor.GetThreshold(res.Tenant, res.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound {
|
||||
t.Error(rcvErr)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,9 +123,9 @@ type DataDB interface {
|
||||
GetThresholdProfile(tenant string, ID string, skipCache bool, transID string) (tp *ThresholdProfile, err error)
|
||||
SetThresholdProfile(tp *ThresholdProfile) (err error)
|
||||
RemThresholdProfile(tenant, id, transactionID string) (err error)
|
||||
GetThreshold(string, string, bool, string) (*Threshold, error)
|
||||
SetThreshold(*Threshold) error
|
||||
RemoveThreshold(string, string, string) error
|
||||
GetThresholdDrv(string, string) (*Threshold, error)
|
||||
SetThresholdDrv(*Threshold) error
|
||||
RemoveThresholdDrv(string, string) error
|
||||
GetFilterDrv(string, string) (*Filter, error)
|
||||
SetFilterDrv(*Filter) error
|
||||
RemoveFilterDrv(string, string) error
|
||||
|
||||
@@ -1624,32 +1624,22 @@ func (ms *MapStorage) RemThresholdProfile(tenant, id, transactionID string) (err
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetThreshold(tenant, id string, skipCache bool, transactionID string) (r *Threshold, err error) {
|
||||
func (ms *MapStorage) GetThresholdDrv(tenant, id string) (r *Threshold, err error) {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
key := utils.ThresholdPrefix + utils.ConcatenatedKey(tenant, id)
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x != nil {
|
||||
return x.(*Threshold), nil
|
||||
}
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
}
|
||||
values, ok := ms.dict[key]
|
||||
if !ok {
|
||||
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
err = ms.ms.Unmarshal(values, r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cache.Set(key, r, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) SetThreshold(r *Threshold) (err error) {
|
||||
func (ms *MapStorage) SetThresholdDrv(r *Threshold) (err error) {
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
result, err := ms.ms.Marshal(r)
|
||||
@@ -1660,12 +1650,11 @@ func (ms *MapStorage) SetThreshold(r *Threshold) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) RemoveThreshold(tenant, id string, transactionID string) (err error) {
|
||||
func (ms *MapStorage) RemoveThresholdDrv(tenant, id string) (err error) {
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
key := utils.ThresholdPrefix + utils.ConcatenatedKey(tenant, id)
|
||||
delete(ms.dict, key)
|
||||
cache.RemKey(key, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -543,7 +543,7 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
|
||||
_, err = ms.GetThresholdProfile(tntID.Tenant, tntID.ID, true, utils.NonTransactional)
|
||||
case utils.ThresholdPrefix:
|
||||
tntID := utils.NewTenantID(dataID)
|
||||
_, err = ms.GetThreshold(tntID.Tenant, tntID.ID, true, utils.NonTransactional)
|
||||
_, err = ms.GetThresholdDrv(tntID.Tenant, tntID.ID)
|
||||
case utils.FilterPrefix:
|
||||
tntID := utils.NewTenantID(dataID)
|
||||
_, err = ms.GetFilterDrv(tntID.Tenant, tntID.ID)
|
||||
@@ -2211,45 +2211,31 @@ func (ms *MongoStorage) RemThresholdProfile(tenant, id, transactionID string) (e
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetThreshold(tenant, id string, skipCache bool, transactionID string) (r *Threshold, err error) {
|
||||
key := utils.ThresholdPrefix + utils.ConcatenatedKey(tenant, id)
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*Threshold), nil
|
||||
}
|
||||
}
|
||||
func (ms *MongoStorage) GetThresholdDrv(tenant, id string) (r *Threshold, err error) {
|
||||
session, col := ms.conn(colThs)
|
||||
defer session.Close()
|
||||
r = new(Threshold)
|
||||
if err = col.Find(bson.M{"tenant": tenant, "id": id}).One(r); err != nil {
|
||||
if err = col.Find(bson.M{"tenant": tenant, "id": id}).One(&r); err != nil {
|
||||
if err == mgo.ErrNotFound {
|
||||
err = utils.ErrNotFound
|
||||
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
cache.Set(key, r, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetThreshold(r *Threshold) (err error) {
|
||||
func (ms *MongoStorage) SetThresholdDrv(r *Threshold) (err error) {
|
||||
session, col := ms.conn(colThs)
|
||||
defer session.Close()
|
||||
_, err = col.Upsert(bson.M{"tenant": r.Tenant, "id": r.ID}, r)
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) RemoveThreshold(tenant, id string, transactionID string) (err error) {
|
||||
func (ms *MongoStorage) RemoveThresholdDrv(tenant, id string) (err error) {
|
||||
session, col := ms.conn(colThs)
|
||||
defer session.Close()
|
||||
if err = col.Remove(bson.M{"tenant": tenant, "id": id}); err != nil {
|
||||
return
|
||||
}
|
||||
cache.RemKey(utils.ThresholdPrefix+utils.ConcatenatedKey(tenant, id),
|
||||
cacheCommit(transactionID), transactionID)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -1734,20 +1734,11 @@ func (rs *RedisStorage) RemThresholdProfile(tenant, id, transactionID string) (e
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetThreshold(tenant, id string, skipCache bool, transactionID string) (r *Threshold, err error) {
|
||||
func (rs *RedisStorage) GetThresholdDrv(tenant, id string) (r *Threshold, err error) {
|
||||
key := utils.ThresholdPrefix + utils.ConcatenatedKey(tenant, id)
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*Threshold), nil
|
||||
}
|
||||
}
|
||||
var values []byte
|
||||
if values, err = rs.Cmd("GET", key).Bytes(); err != nil {
|
||||
if err == redis.ErrRespNil { // did not find the destination
|
||||
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
|
||||
err = utils.ErrNotFound
|
||||
}
|
||||
return
|
||||
@@ -1755,11 +1746,10 @@ func (rs *RedisStorage) GetThreshold(tenant, id string, skipCache bool, transact
|
||||
if err = rs.ms.Unmarshal(values, &r); err != nil {
|
||||
return
|
||||
}
|
||||
cache.Set(key, r, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetThreshold(r *Threshold) (err error) {
|
||||
func (rs *RedisStorage) SetThresholdDrv(r *Threshold) (err error) {
|
||||
result, err := rs.ms.Marshal(r)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -1767,12 +1757,11 @@ func (rs *RedisStorage) SetThreshold(r *Threshold) (err error) {
|
||||
return rs.Cmd("SET", utils.ThresholdPrefix+utils.ConcatenatedKey(r.Tenant, r.ID), result).Err
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) RemoveThreshold(tenant, id string, transactionID string) (err error) {
|
||||
func (rs *RedisStorage) RemoveThresholdDrv(tenant, id string) (err error) {
|
||||
key := utils.ThresholdPrefix + utils.ConcatenatedKey(tenant, id)
|
||||
if err = rs.Cmd("DEL", key).Err; err != nil {
|
||||
return
|
||||
}
|
||||
cache.RemKey(key, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -238,7 +238,7 @@ func (tS *ThresholdService) StoreThreshold(t *Threshold) (err error) {
|
||||
}
|
||||
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, utils.ThresholdPrefix+t.TenantID())
|
||||
defer guardian.Guardian.UnguardIDs(utils.ThresholdPrefix + t.TenantID())
|
||||
if err = tS.dm.DataDB().SetThreshold(t); err != nil {
|
||||
if err = tS.dm.SetThreshold(t); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<ThresholdS> failed saving Threshold with tenant: %s and ID: %s, error: %s",
|
||||
t.Tenant, t.ID, err.Error()))
|
||||
@@ -285,7 +285,7 @@ func (tS *ThresholdService) matchingThresholdsForEvent(ev *ThresholdEvent) (ts T
|
||||
}
|
||||
lockThreshold := utils.ThresholdPrefix + tPrfl.TenantID()
|
||||
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockThreshold)
|
||||
t, err := tS.dm.DataDB().GetThreshold(tPrfl.Tenant, tPrfl.ID, false, "")
|
||||
t, err := tS.dm.GetThreshold(tPrfl.Tenant, tPrfl.ID, false, "")
|
||||
if err != nil {
|
||||
guardian.Guardian.UnguardIDs(lockThreshold)
|
||||
return nil, err
|
||||
@@ -334,7 +334,7 @@ func (tS *ThresholdService) processEvent(ev *ThresholdEvent) (hits int, err erro
|
||||
if t.dirty == nil { // one time threshold
|
||||
lockThreshold := utils.ThresholdPrefix + t.TenantID()
|
||||
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockThreshold)
|
||||
if err = tS.dm.DataDB().RemoveThreshold(t.Tenant, t.ID, utils.NonTransactional); err != nil {
|
||||
if err = tS.dm.RemoveThreshold(t.Tenant, t.ID, utils.NonTransactional); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<ThresholdService> failed removing non-recurrent threshold: %s, error: %s",
|
||||
t.TenantID(), err.Error()))
|
||||
@@ -403,7 +403,7 @@ func (tS *ThresholdService) V1GetThresholdIDs(tenant string, tIDs *[]string) (er
|
||||
|
||||
// V1GetThreshold retrieves a Threshold
|
||||
func (tS *ThresholdService) V1GetThreshold(tntID *utils.TenantID, t *Threshold) (err error) {
|
||||
if thd, err := tS.dm.DataDB().GetThreshold(tntID.Tenant, tntID.ID, false, ""); err != nil {
|
||||
if thd, err := tS.dm.GetThreshold(tntID.Tenant, tntID.ID, false, ""); err != nil {
|
||||
return err
|
||||
} else {
|
||||
*t = *thd
|
||||
|
||||
@@ -2098,7 +2098,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
|
||||
log.Print("Thresholds:")
|
||||
}
|
||||
for _, thd := range tpr.thresholds {
|
||||
if err = tpr.dm.DataDB().SetThreshold(&Threshold{Tenant: thd.Tenant, ID: thd.ID}); err != nil {
|
||||
if err = tpr.dm.SetThreshold(&Threshold{Tenant: thd.Tenant, ID: thd.ID}); err != nil {
|
||||
return err
|
||||
}
|
||||
if verbose {
|
||||
|
||||
Reference in New Issue
Block a user