diff --git a/engine/datamanager.go b/engine/datamanager.go index adba745e8..4575c676e 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -112,3 +112,37 @@ func (dm *DataManager) RemoveFilter(tenant, id, transactionID string) (err error cacheCommit(transactionID), transactionID) return } + +func (dm *DataManager) GetThreshold(tenant, id string, skipCache bool, transactionID string) (th *Threshold, err error) { + key := utils.ThresholdPrefix + utils.ConcatenatedKey(tenant, id) + if !skipCache { + if x, ok := cache.Get(key); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*Threshold), nil + } + } + th, err = dm.dataDB.GetThresholdDrv(tenant, id) + if err != nil { + if err == utils.ErrNotFound { + cache.Set(key, nil, cacheCommit(transactionID), transactionID) + } + return nil, err + } + cache.Set(key, th, cacheCommit(transactionID), transactionID) + return +} + +func (dm *DataManager) SetThreshold(th *Threshold) (err error) { + return dm.DataDB().SetThresholdDrv(th) +} + +func (dm *DataManager) RemoveThreshold(tenant, id, transactionID string) (err error) { + if err = dm.DataDB().RemoveThresholdDrv(tenant, id); err != nil { + return + } + cache.RemKey(utils.ThresholdPrefix+utils.ConcatenatedKey(tenant, id), + cacheCommit(transactionID), transactionID) + return +} diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index a705959eb..9a42187a1 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -2096,26 +2096,26 @@ func testOnStorITCRUDThreshold(t *testing.T) { ID: "TH1", Snooze: time.Date(2016, 10, 1, 0, 0, 0, 0, time.UTC).Local(), } - if _, rcvErr := onStor.DataDB().GetThreshold("cgrates.org", "TH1", true, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound { + if _, rcvErr := onStor.GetThreshold("cgrates.org", "TH1", true, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound { t.Error(rcvErr) } - if err := onStor.DataDB().SetThreshold(res); err != nil { + if err := onStor.SetThreshold(res); err != nil { t.Error(err) } - if rcv, err := onStor.DataDB().GetThreshold("cgrates.org", "TH1", true, utils.NonTransactional); err != nil { + if rcv, err := onStor.GetThreshold("cgrates.org", "TH1", true, utils.NonTransactional); err != nil { t.Error(err) } else if !(reflect.DeepEqual(res, rcv)) { t.Errorf("Expecting: %v, received: %v", res, rcv) } - if rcv, err := onStor.DataDB().GetThreshold("cgrates.org", "TH1", false, utils.NonTransactional); err != nil { + if rcv, err := onStor.GetThreshold("cgrates.org", "TH1", false, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(res, rcv) { t.Errorf("Expecting: %v, received: %v", res, rcv) } - if err := onStor.DataDB().RemoveThreshold(res.Tenant, res.ID, utils.NonTransactional); err != nil { + if err := onStor.RemoveThreshold(res.Tenant, res.ID, utils.NonTransactional); err != nil { t.Error(err) } - if _, rcvErr := onStor.DataDB().GetThreshold(res.Tenant, res.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { + if _, rcvErr := onStor.GetThreshold(res.Tenant, res.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 4357739ec..675c95fd7 100755 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -123,9 +123,9 @@ type DataDB interface { GetThresholdProfile(tenant string, ID string, skipCache bool, transID string) (tp *ThresholdProfile, err error) SetThresholdProfile(tp *ThresholdProfile) (err error) RemThresholdProfile(tenant, id, transactionID string) (err error) - GetThreshold(string, string, bool, string) (*Threshold, error) - SetThreshold(*Threshold) error - RemoveThreshold(string, string, string) error + GetThresholdDrv(string, string) (*Threshold, error) + SetThresholdDrv(*Threshold) error + RemoveThresholdDrv(string, string) error GetFilterDrv(string, string) (*Filter, error) SetFilterDrv(*Filter) error RemoveFilterDrv(string, string) error diff --git a/engine/storage_map.go b/engine/storage_map.go index a82009232..fc318124d 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -1624,32 +1624,22 @@ func (ms *MapStorage) RemThresholdProfile(tenant, id, transactionID string) (err return } -func (ms *MapStorage) GetThreshold(tenant, id string, skipCache bool, transactionID string) (r *Threshold, err error) { +func (ms *MapStorage) GetThresholdDrv(tenant, id string) (r *Threshold, err error) { ms.mu.RLock() defer ms.mu.RUnlock() key := utils.ThresholdPrefix + utils.ConcatenatedKey(tenant, id) - if !skipCache { - if x, ok := cache.Get(key); ok { - if x != nil { - return x.(*Threshold), nil - } - return nil, utils.ErrNotFound - } - } values, ok := ms.dict[key] if !ok { - cache.Set(key, nil, cacheCommit(transactionID), transactionID) return nil, utils.ErrNotFound } err = ms.ms.Unmarshal(values, r) if err != nil { return nil, err } - cache.Set(key, r, cacheCommit(transactionID), transactionID) return } -func (ms *MapStorage) SetThreshold(r *Threshold) (err error) { +func (ms *MapStorage) SetThresholdDrv(r *Threshold) (err error) { ms.mu.Lock() defer ms.mu.Unlock() result, err := ms.ms.Marshal(r) @@ -1660,12 +1650,11 @@ func (ms *MapStorage) SetThreshold(r *Threshold) (err error) { return } -func (ms *MapStorage) RemoveThreshold(tenant, id string, transactionID string) (err error) { +func (ms *MapStorage) RemoveThresholdDrv(tenant, id string) (err error) { ms.mu.Lock() defer ms.mu.Unlock() key := utils.ThresholdPrefix + utils.ConcatenatedKey(tenant, id) delete(ms.dict, key) - cache.RemKey(key, cacheCommit(transactionID), transactionID) return } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 0b91975e5..119dfc6f7 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -543,7 +543,7 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached _, err = ms.GetThresholdProfile(tntID.Tenant, tntID.ID, true, utils.NonTransactional) case utils.ThresholdPrefix: tntID := utils.NewTenantID(dataID) - _, err = ms.GetThreshold(tntID.Tenant, tntID.ID, true, utils.NonTransactional) + _, err = ms.GetThresholdDrv(tntID.Tenant, tntID.ID) case utils.FilterPrefix: tntID := utils.NewTenantID(dataID) _, err = ms.GetFilterDrv(tntID.Tenant, tntID.ID) @@ -2211,45 +2211,31 @@ func (ms *MongoStorage) RemThresholdProfile(tenant, id, transactionID string) (e return } -func (ms *MongoStorage) GetThreshold(tenant, id string, skipCache bool, transactionID string) (r *Threshold, err error) { - key := utils.ThresholdPrefix + utils.ConcatenatedKey(tenant, id) - if !skipCache { - if x, ok := cache.Get(key); ok { - if x == nil { - return nil, utils.ErrNotFound - } - return x.(*Threshold), nil - } - } +func (ms *MongoStorage) GetThresholdDrv(tenant, id string) (r *Threshold, err error) { session, col := ms.conn(colThs) defer session.Close() - r = new(Threshold) - if err = col.Find(bson.M{"tenant": tenant, "id": id}).One(r); err != nil { + if err = col.Find(bson.M{"tenant": tenant, "id": id}).One(&r); err != nil { if err == mgo.ErrNotFound { err = utils.ErrNotFound - cache.Set(key, nil, cacheCommit(transactionID), transactionID) } return nil, err } - cache.Set(key, r, cacheCommit(transactionID), transactionID) return } -func (ms *MongoStorage) SetThreshold(r *Threshold) (err error) { +func (ms *MongoStorage) SetThresholdDrv(r *Threshold) (err error) { session, col := ms.conn(colThs) defer session.Close() _, err = col.Upsert(bson.M{"tenant": r.Tenant, "id": r.ID}, r) return } -func (ms *MongoStorage) RemoveThreshold(tenant, id string, transactionID string) (err error) { +func (ms *MongoStorage) RemoveThresholdDrv(tenant, id string) (err error) { session, col := ms.conn(colThs) defer session.Close() if err = col.Remove(bson.M{"tenant": tenant, "id": id}); err != nil { return } - cache.RemKey(utils.ThresholdPrefix+utils.ConcatenatedKey(tenant, id), - cacheCommit(transactionID), transactionID) return nil } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index dd7f4c934..2b3216c65 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1734,20 +1734,11 @@ func (rs *RedisStorage) RemThresholdProfile(tenant, id, transactionID string) (e return } -func (rs *RedisStorage) GetThreshold(tenant, id string, skipCache bool, transactionID string) (r *Threshold, err error) { +func (rs *RedisStorage) GetThresholdDrv(tenant, id string) (r *Threshold, err error) { key := utils.ThresholdPrefix + utils.ConcatenatedKey(tenant, id) - if !skipCache { - if x, ok := cache.Get(key); ok { - if x == nil { - return nil, utils.ErrNotFound - } - return x.(*Threshold), nil - } - } var values []byte if values, err = rs.Cmd("GET", key).Bytes(); err != nil { if err == redis.ErrRespNil { // did not find the destination - cache.Set(key, nil, cacheCommit(transactionID), transactionID) err = utils.ErrNotFound } return @@ -1755,11 +1746,10 @@ func (rs *RedisStorage) GetThreshold(tenant, id string, skipCache bool, transact if err = rs.ms.Unmarshal(values, &r); err != nil { return } - cache.Set(key, r, cacheCommit(transactionID), transactionID) return } -func (rs *RedisStorage) SetThreshold(r *Threshold) (err error) { +func (rs *RedisStorage) SetThresholdDrv(r *Threshold) (err error) { result, err := rs.ms.Marshal(r) if err != nil { return err @@ -1767,12 +1757,11 @@ func (rs *RedisStorage) SetThreshold(r *Threshold) (err error) { return rs.Cmd("SET", utils.ThresholdPrefix+utils.ConcatenatedKey(r.Tenant, r.ID), result).Err } -func (rs *RedisStorage) RemoveThreshold(tenant, id string, transactionID string) (err error) { +func (rs *RedisStorage) RemoveThresholdDrv(tenant, id string) (err error) { key := utils.ThresholdPrefix + utils.ConcatenatedKey(tenant, id) if err = rs.Cmd("DEL", key).Err; err != nil { return } - cache.RemKey(key, cacheCommit(transactionID), transactionID) return } diff --git a/engine/thresholds.go b/engine/thresholds.go index 6d05a2f2d..a270d9a4d 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -238,7 +238,7 @@ func (tS *ThresholdService) StoreThreshold(t *Threshold) (err error) { } guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, utils.ThresholdPrefix+t.TenantID()) defer guardian.Guardian.UnguardIDs(utils.ThresholdPrefix + t.TenantID()) - if err = tS.dm.DataDB().SetThreshold(t); err != nil { + if err = tS.dm.SetThreshold(t); err != nil { utils.Logger.Warning( fmt.Sprintf(" failed saving Threshold with tenant: %s and ID: %s, error: %s", t.Tenant, t.ID, err.Error())) @@ -285,7 +285,7 @@ func (tS *ThresholdService) matchingThresholdsForEvent(ev *ThresholdEvent) (ts T } lockThreshold := utils.ThresholdPrefix + tPrfl.TenantID() guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockThreshold) - t, err := tS.dm.DataDB().GetThreshold(tPrfl.Tenant, tPrfl.ID, false, "") + t, err := tS.dm.GetThreshold(tPrfl.Tenant, tPrfl.ID, false, "") if err != nil { guardian.Guardian.UnguardIDs(lockThreshold) return nil, err @@ -334,7 +334,7 @@ func (tS *ThresholdService) processEvent(ev *ThresholdEvent) (hits int, err erro if t.dirty == nil { // one time threshold lockThreshold := utils.ThresholdPrefix + t.TenantID() guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockThreshold) - if err = tS.dm.DataDB().RemoveThreshold(t.Tenant, t.ID, utils.NonTransactional); err != nil { + if err = tS.dm.RemoveThreshold(t.Tenant, t.ID, utils.NonTransactional); err != nil { utils.Logger.Warning( fmt.Sprintf(" failed removing non-recurrent threshold: %s, error: %s", t.TenantID(), err.Error())) @@ -403,7 +403,7 @@ func (tS *ThresholdService) V1GetThresholdIDs(tenant string, tIDs *[]string) (er // V1GetThreshold retrieves a Threshold func (tS *ThresholdService) V1GetThreshold(tntID *utils.TenantID, t *Threshold) (err error) { - if thd, err := tS.dm.DataDB().GetThreshold(tntID.Tenant, tntID.ID, false, ""); err != nil { + if thd, err := tS.dm.GetThreshold(tntID.Tenant, tntID.ID, false, ""); err != nil { return err } else { *t = *thd diff --git a/engine/tp_reader.go b/engine/tp_reader.go index eb669ed94..cca681fe1 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -2098,7 +2098,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("Thresholds:") } for _, thd := range tpr.thresholds { - if err = tpr.dm.DataDB().SetThreshold(&Threshold{Tenant: thd.Tenant, ID: thd.ID}); err != nil { + if err = tpr.dm.SetThreshold(&Threshold{Tenant: thd.Tenant, ID: thd.ID}); err != nil { return err } if verbose {