From 9ea6bf488d6558a171712ce98ca8f62883a5b180 Mon Sep 17 00:00:00 2001 From: TeoV Date: Fri, 13 Oct 2017 16:20:04 +0300 Subject: [PATCH 1/3] Add GetThreshold to DataManager --- engine/datamanager.go | 21 +++++++++++++++++++++ engine/onstor_it_test.go | 8 ++++---- engine/storage_interface.go | 2 +- engine/storage_map.go | 12 +----------- engine/storage_mongo_datadb.go | 18 +++--------------- engine/storage_redis.go | 12 +----------- engine/thresholds.go | 4 ++-- 7 files changed, 33 insertions(+), 44 deletions(-) diff --git a/engine/datamanager.go b/engine/datamanager.go index adba745e8..d05a31feb 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -112,3 +112,24 @@ func (dm *DataManager) RemoveFilter(tenant, id, transactionID string) (err error cacheCommit(transactionID), transactionID) return } + +func (dm *DataManager) GetThreshold(tenant, id string, skipCache bool, transactionID string) (th *Threshold, err error) { + key := utils.ThresholdPrefix + utils.ConcatenatedKey(tenant, id) + if !skipCache { + if x, ok := cache.Get(key); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*Threshold), nil + } + } + th, err = dm.dataDB.GetThresholdDrv(tenant, id) + if err != nil { + if err == utils.ErrNotFound { + cache.Set(key, nil, cacheCommit(transactionID), transactionID) + } + return nil, err + } + cache.Set(key, th, cacheCommit(transactionID), transactionID) + return +} diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index a705959eb..546049fac 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -2096,18 +2096,18 @@ func testOnStorITCRUDThreshold(t *testing.T) { ID: "TH1", Snooze: time.Date(2016, 10, 1, 0, 0, 0, 0, time.UTC).Local(), } - if _, rcvErr := onStor.DataDB().GetThreshold("cgrates.org", "TH1", true, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound { + if _, rcvErr := onStor.GetThreshold("cgrates.org", "TH1", true, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound { t.Error(rcvErr) } if err := onStor.DataDB().SetThreshold(res); err != nil { t.Error(err) } - if rcv, err := onStor.DataDB().GetThreshold("cgrates.org", "TH1", true, utils.NonTransactional); err != nil { + if rcv, err := onStor.GetThreshold("cgrates.org", "TH1", true, utils.NonTransactional); err != nil { t.Error(err) } else if !(reflect.DeepEqual(res, rcv)) { t.Errorf("Expecting: %v, received: %v", res, rcv) } - if rcv, err := onStor.DataDB().GetThreshold("cgrates.org", "TH1", false, utils.NonTransactional); err != nil { + if rcv, err := onStor.GetThreshold("cgrates.org", "TH1", false, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(res, rcv) { t.Errorf("Expecting: %v, received: %v", res, rcv) @@ -2115,7 +2115,7 @@ func testOnStorITCRUDThreshold(t *testing.T) { if err := onStor.DataDB().RemoveThreshold(res.Tenant, res.ID, utils.NonTransactional); err != nil { t.Error(err) } - if _, rcvErr := onStor.DataDB().GetThreshold(res.Tenant, res.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { + if _, rcvErr := onStor.GetThreshold(res.Tenant, res.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 4357739ec..deca6201e 100755 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -123,7 +123,7 @@ type DataDB interface { GetThresholdProfile(tenant string, ID string, skipCache bool, transID string) (tp *ThresholdProfile, err error) SetThresholdProfile(tp *ThresholdProfile) (err error) RemThresholdProfile(tenant, id, transactionID string) (err error) - GetThreshold(string, string, bool, string) (*Threshold, error) + GetThresholdDrv(string, string) (*Threshold, error) SetThreshold(*Threshold) error RemoveThreshold(string, string, string) error GetFilterDrv(string, string) (*Filter, error) diff --git a/engine/storage_map.go b/engine/storage_map.go index a82009232..d135c16f3 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -1624,28 +1624,18 @@ func (ms *MapStorage) RemThresholdProfile(tenant, id, transactionID string) (err return } -func (ms *MapStorage) GetThreshold(tenant, id string, skipCache bool, transactionID string) (r *Threshold, err error) { +func (ms *MapStorage) GetThresholdDrv(tenant, id string) (r *Threshold, err error) { ms.mu.RLock() defer ms.mu.RUnlock() key := utils.ThresholdPrefix + utils.ConcatenatedKey(tenant, id) - if !skipCache { - if x, ok := cache.Get(key); ok { - if x != nil { - return x.(*Threshold), nil - } - return nil, utils.ErrNotFound - } - } values, ok := ms.dict[key] if !ok { - cache.Set(key, nil, cacheCommit(transactionID), transactionID) return nil, utils.ErrNotFound } err = ms.ms.Unmarshal(values, r) if err != nil { return nil, err } - cache.Set(key, r, cacheCommit(transactionID), transactionID) return } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 0b91975e5..d713356c0 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -543,7 +543,7 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached _, err = ms.GetThresholdProfile(tntID.Tenant, tntID.ID, true, utils.NonTransactional) case utils.ThresholdPrefix: tntID := utils.NewTenantID(dataID) - _, err = ms.GetThreshold(tntID.Tenant, tntID.ID, true, utils.NonTransactional) + _, err = ms.GetThresholdDrv(tntID.Tenant, tntID.ID) case utils.FilterPrefix: tntID := utils.NewTenantID(dataID) _, err = ms.GetFilterDrv(tntID.Tenant, tntID.ID) @@ -2211,27 +2211,15 @@ func (ms *MongoStorage) RemThresholdProfile(tenant, id, transactionID string) (e return } -func (ms *MongoStorage) GetThreshold(tenant, id string, skipCache bool, transactionID string) (r *Threshold, err error) { - key := utils.ThresholdPrefix + utils.ConcatenatedKey(tenant, id) - if !skipCache { - if x, ok := cache.Get(key); ok { - if x == nil { - return nil, utils.ErrNotFound - } - return x.(*Threshold), nil - } - } +func (ms *MongoStorage) GetThresholdDrv(tenant, id string) (r *Threshold, err error) { session, col := ms.conn(colThs) defer session.Close() - r = new(Threshold) - if err = col.Find(bson.M{"tenant": tenant, "id": id}).One(r); err != nil { + if err = col.Find(bson.M{"tenant": tenant, "id": id}).One(&r); err != nil { if err == mgo.ErrNotFound { err = utils.ErrNotFound - cache.Set(key, nil, cacheCommit(transactionID), transactionID) } return nil, err } - cache.Set(key, r, cacheCommit(transactionID), transactionID) return } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index dd7f4c934..f1c223582 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1734,20 +1734,11 @@ func (rs *RedisStorage) RemThresholdProfile(tenant, id, transactionID string) (e return } -func (rs *RedisStorage) GetThreshold(tenant, id string, skipCache bool, transactionID string) (r *Threshold, err error) { +func (rs *RedisStorage) GetThresholdDrv(tenant, id string) (r *Threshold, err error) { key := utils.ThresholdPrefix + utils.ConcatenatedKey(tenant, id) - if !skipCache { - if x, ok := cache.Get(key); ok { - if x == nil { - return nil, utils.ErrNotFound - } - return x.(*Threshold), nil - } - } var values []byte if values, err = rs.Cmd("GET", key).Bytes(); err != nil { if err == redis.ErrRespNil { // did not find the destination - cache.Set(key, nil, cacheCommit(transactionID), transactionID) err = utils.ErrNotFound } return @@ -1755,7 +1746,6 @@ func (rs *RedisStorage) GetThreshold(tenant, id string, skipCache bool, transact if err = rs.ms.Unmarshal(values, &r); err != nil { return } - cache.Set(key, r, cacheCommit(transactionID), transactionID) return } diff --git a/engine/thresholds.go b/engine/thresholds.go index 42654bd45..aa53b6ca4 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -258,7 +258,7 @@ func (tS *ThresholdService) matchingThresholdsForEvent(ev *ThresholdEvent) (ts T } lockThreshold := utils.ThresholdPrefix + tPrfl.TenantID() guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockThreshold) - t, err := tS.dm.DataDB().GetThreshold(tPrfl.Tenant, tPrfl.ID, false, "") + t, err := tS.dm.GetThreshold(tPrfl.Tenant, tPrfl.ID, false, "") if err != nil { guardian.Guardian.UnguardIDs(lockThreshold) return nil, err @@ -376,7 +376,7 @@ func (tS *ThresholdService) V1GetThresholdIDs(tenant string, tIDs *[]string) (er // V1GetThreshold retrieves a Threshold func (tS *ThresholdService) V1GetThreshold(tntID *utils.TenantID, t *Threshold) (err error) { - if thd, err := tS.dm.DataDB().GetThreshold(tntID.Tenant, tntID.ID, false, ""); err != nil { + if thd, err := tS.dm.GetThreshold(tntID.Tenant, tntID.ID, false, ""); err != nil { return err } else { *t = *thd From 47f3f09af92c1d17202049c32a86b15fd3c8fda2 Mon Sep 17 00:00:00 2001 From: TeoV Date: Fri, 13 Oct 2017 16:28:12 +0300 Subject: [PATCH 2/3] Add SetThreshold in DataManager --- engine/datamanager.go | 4 ++++ engine/onstor_it_test.go | 2 +- engine/storage_interface.go | 2 +- engine/storage_map.go | 2 +- engine/storage_mongo_datadb.go | 2 +- engine/storage_redis.go | 2 +- engine/thresholds.go | 2 +- engine/tp_reader.go | 2 +- 8 files changed, 11 insertions(+), 7 deletions(-) diff --git a/engine/datamanager.go b/engine/datamanager.go index d05a31feb..1501fea27 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -133,3 +133,7 @@ func (dm *DataManager) GetThreshold(tenant, id string, skipCache bool, transacti cache.Set(key, th, cacheCommit(transactionID), transactionID) return } + +func (dm *DataManager) SetThreshold(th *Threshold) (err error) { + return dm.DataDB().SetThresholdDrv(th) +} diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index 546049fac..bafa8879c 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -2099,7 +2099,7 @@ func testOnStorITCRUDThreshold(t *testing.T) { if _, rcvErr := onStor.GetThreshold("cgrates.org", "TH1", true, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound { t.Error(rcvErr) } - if err := onStor.DataDB().SetThreshold(res); err != nil { + if err := onStor.SetThreshold(res); err != nil { t.Error(err) } if rcv, err := onStor.GetThreshold("cgrates.org", "TH1", true, utils.NonTransactional); err != nil { diff --git a/engine/storage_interface.go b/engine/storage_interface.go index deca6201e..8568ae6c0 100755 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -124,7 +124,7 @@ type DataDB interface { SetThresholdProfile(tp *ThresholdProfile) (err error) RemThresholdProfile(tenant, id, transactionID string) (err error) GetThresholdDrv(string, string) (*Threshold, error) - SetThreshold(*Threshold) error + SetThresholdDrv(*Threshold) error RemoveThreshold(string, string, string) error GetFilterDrv(string, string) (*Filter, error) SetFilterDrv(*Filter) error diff --git a/engine/storage_map.go b/engine/storage_map.go index d135c16f3..e577c8102 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -1639,7 +1639,7 @@ func (ms *MapStorage) GetThresholdDrv(tenant, id string) (r *Threshold, err erro return } -func (ms *MapStorage) SetThreshold(r *Threshold) (err error) { +func (ms *MapStorage) SetThresholdDrv(r *Threshold) (err error) { ms.mu.Lock() defer ms.mu.Unlock() result, err := ms.ms.Marshal(r) diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index d713356c0..e09fea169 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -2223,7 +2223,7 @@ func (ms *MongoStorage) GetThresholdDrv(tenant, id string) (r *Threshold, err er return } -func (ms *MongoStorage) SetThreshold(r *Threshold) (err error) { +func (ms *MongoStorage) SetThresholdDrv(r *Threshold) (err error) { session, col := ms.conn(colThs) defer session.Close() _, err = col.Upsert(bson.M{"tenant": r.Tenant, "id": r.ID}, r) diff --git a/engine/storage_redis.go b/engine/storage_redis.go index f1c223582..0aeebd6b1 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1749,7 +1749,7 @@ func (rs *RedisStorage) GetThresholdDrv(tenant, id string) (r *Threshold, err er return } -func (rs *RedisStorage) SetThreshold(r *Threshold) (err error) { +func (rs *RedisStorage) SetThresholdDrv(r *Threshold) (err error) { result, err := rs.ms.Marshal(r) if err != nil { return err diff --git a/engine/thresholds.go b/engine/thresholds.go index aa53b6ca4..6737160b1 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -211,7 +211,7 @@ func (tS *ThresholdService) StoreThreshold(t *Threshold) (err error) { } guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, utils.ThresholdPrefix+t.TenantID()) defer guardian.Guardian.UnguardIDs(utils.ThresholdPrefix + t.TenantID()) - if err = tS.dm.DataDB().SetThreshold(t); err != nil { + if err = tS.dm.SetThreshold(t); err != nil { utils.Logger.Warning( fmt.Sprintf(" failed saving Threshold with tenant: %s and ID: %s, error: %s", t.Tenant, t.ID, err.Error())) diff --git a/engine/tp_reader.go b/engine/tp_reader.go index eb669ed94..cca681fe1 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -2098,7 +2098,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("Thresholds:") } for _, thd := range tpr.thresholds { - if err = tpr.dm.DataDB().SetThreshold(&Threshold{Tenant: thd.Tenant, ID: thd.ID}); err != nil { + if err = tpr.dm.SetThreshold(&Threshold{Tenant: thd.Tenant, ID: thd.ID}); err != nil { return err } if verbose { From ff2f7fad66f0996cd1f195a1497c8458c47f8220 Mon Sep 17 00:00:00 2001 From: TeoV Date: Fri, 13 Oct 2017 16:36:18 +0300 Subject: [PATCH 3/3] Add RemoveThreshold in DataManager --- engine/datamanager.go | 9 +++++++++ engine/onstor_it_test.go | 2 +- engine/storage_interface.go | 2 +- engine/storage_map.go | 3 +-- engine/storage_mongo_datadb.go | 4 +--- engine/storage_redis.go | 3 +-- engine/thresholds.go | 2 +- 7 files changed, 15 insertions(+), 10 deletions(-) diff --git a/engine/datamanager.go b/engine/datamanager.go index 1501fea27..4575c676e 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -137,3 +137,12 @@ func (dm *DataManager) GetThreshold(tenant, id string, skipCache bool, transacti func (dm *DataManager) SetThreshold(th *Threshold) (err error) { return dm.DataDB().SetThresholdDrv(th) } + +func (dm *DataManager) RemoveThreshold(tenant, id, transactionID string) (err error) { + if err = dm.DataDB().RemoveThresholdDrv(tenant, id); err != nil { + return + } + cache.RemKey(utils.ThresholdPrefix+utils.ConcatenatedKey(tenant, id), + cacheCommit(transactionID), transactionID) + return +} diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index bafa8879c..9a42187a1 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -2112,7 +2112,7 @@ func testOnStorITCRUDThreshold(t *testing.T) { } else if !reflect.DeepEqual(res, rcv) { t.Errorf("Expecting: %v, received: %v", res, rcv) } - if err := onStor.DataDB().RemoveThreshold(res.Tenant, res.ID, utils.NonTransactional); err != nil { + if err := onStor.RemoveThreshold(res.Tenant, res.ID, utils.NonTransactional); err != nil { t.Error(err) } if _, rcvErr := onStor.GetThreshold(res.Tenant, res.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 8568ae6c0..675c95fd7 100755 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -125,7 +125,7 @@ type DataDB interface { RemThresholdProfile(tenant, id, transactionID string) (err error) GetThresholdDrv(string, string) (*Threshold, error) SetThresholdDrv(*Threshold) error - RemoveThreshold(string, string, string) error + RemoveThresholdDrv(string, string) error GetFilterDrv(string, string) (*Filter, error) SetFilterDrv(*Filter) error RemoveFilterDrv(string, string) error diff --git a/engine/storage_map.go b/engine/storage_map.go index e577c8102..fc318124d 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -1650,12 +1650,11 @@ func (ms *MapStorage) SetThresholdDrv(r *Threshold) (err error) { return } -func (ms *MapStorage) RemoveThreshold(tenant, id string, transactionID string) (err error) { +func (ms *MapStorage) RemoveThresholdDrv(tenant, id string) (err error) { ms.mu.Lock() defer ms.mu.Unlock() key := utils.ThresholdPrefix + utils.ConcatenatedKey(tenant, id) delete(ms.dict, key) - cache.RemKey(key, cacheCommit(transactionID), transactionID) return } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index e09fea169..119dfc6f7 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -2230,14 +2230,12 @@ func (ms *MongoStorage) SetThresholdDrv(r *Threshold) (err error) { return } -func (ms *MongoStorage) RemoveThreshold(tenant, id string, transactionID string) (err error) { +func (ms *MongoStorage) RemoveThresholdDrv(tenant, id string) (err error) { session, col := ms.conn(colThs) defer session.Close() if err = col.Remove(bson.M{"tenant": tenant, "id": id}); err != nil { return } - cache.RemKey(utils.ThresholdPrefix+utils.ConcatenatedKey(tenant, id), - cacheCommit(transactionID), transactionID) return nil } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 0aeebd6b1..2b3216c65 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1757,12 +1757,11 @@ func (rs *RedisStorage) SetThresholdDrv(r *Threshold) (err error) { return rs.Cmd("SET", utils.ThresholdPrefix+utils.ConcatenatedKey(r.Tenant, r.ID), result).Err } -func (rs *RedisStorage) RemoveThreshold(tenant, id string, transactionID string) (err error) { +func (rs *RedisStorage) RemoveThresholdDrv(tenant, id string) (err error) { key := utils.ThresholdPrefix + utils.ConcatenatedKey(tenant, id) if err = rs.Cmd("DEL", key).Err; err != nil { return } - cache.RemKey(key, cacheCommit(transactionID), transactionID) return } diff --git a/engine/thresholds.go b/engine/thresholds.go index 6737160b1..3a25b7c01 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -307,7 +307,7 @@ func (tS *ThresholdService) processEvent(ev *ThresholdEvent) (hits int, err erro if t.dirty == nil { // one time threshold lockThreshold := utils.ThresholdPrefix + t.TenantID() guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockThreshold) - if err = tS.dm.DataDB().RemoveThreshold(t.Tenant, t.ID, utils.NonTransactional); err != nil { + if err = tS.dm.RemoveThreshold(t.Tenant, t.ID, utils.NonTransactional); err != nil { utils.Logger.Warning( fmt.Sprintf(" failed removing non-recurrent threshold: %s, error: %s", t.TenantID(), err.Error()))