From 3c896d6181396cd3bfccc4b355d040dea947c166 Mon Sep 17 00:00:00 2001 From: TeoV Date: Tue, 17 Oct 2017 17:47:21 +0300 Subject: [PATCH] Add Get/Set/Remove Resource in DataManager --- engine/datamanager.go | 36 +++++++++++++++++++++++++++++++++- engine/onstor_it_test.go | 14 ++++++------- engine/resources.go | 6 +++--- engine/storage_interface.go | 6 +++--- engine/storage_map.go | 19 ++++-------------- engine/storage_mongo_datadb.go | 22 ++++----------------- engine/storage_redis.go | 17 +++------------- engine/tp_reader.go | 2 +- 8 files changed, 60 insertions(+), 62 deletions(-) diff --git a/engine/datamanager.go b/engine/datamanager.go index ecfe428a5..efbb5c352 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -191,7 +191,7 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b _, err = dm.DataDB().GetResourceProfile(tntID.Tenant, tntID.ID, true, utils.NonTransactional) case utils.ResourcesPrefix: tntID := utils.NewTenantID(dataID) - _, err = dm.DataDB().GetResource(tntID.Tenant, tntID.ID, true, utils.NonTransactional) + _, err = dm.GetResource(tntID.Tenant, tntID.ID, true, utils.NonTransactional) case utils.TimingsPrefix: _, err = dm.GetTiming(dataID, true, utils.NonTransactional) case utils.ThresholdProfilePrefix: @@ -426,3 +426,37 @@ func (dm *DataManager) RemoveTiming(id, transactionID string) (err error) { cache.RemKey(utils.TimingsPrefix+id, cacheCommit(transactionID), transactionID) return } + +func (dm *DataManager) GetResource(tenant, id string, skipCache bool, transactionID string) (rs *Resource, err error) { + key := utils.ResourcesPrefix + utils.ConcatenatedKey(tenant, id) + if !skipCache { + if x, ok := cache.Get(key); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*Resource), nil + } + } + rs, err = dm.dataDB.GetResourceDrv(tenant, id) + if err != nil { + if err == utils.ErrNotFound { + cache.Set(key, nil, cacheCommit(transactionID), transactionID) + } + return nil, err + } + cache.Set(key, rs, cacheCommit(transactionID), transactionID) + return +} + +func (dm *DataManager) SetResource(rs *Resource) (err error) { + return dm.DataDB().SetResourceDrv(rs) +} + +func (dm *DataManager) RemoveResource(tenant, id, transactionID string) (err error) { + if err = dm.DataDB().RemoveResourceDrv(tenant, id); err != nil { + return + } + cache.RemKey(utils.ResourcesPrefix+utils.ConcatenatedKey(tenant, id), + cacheCommit(transactionID), transactionID) + return +} diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index 8a3545881..5649b06dd 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -877,7 +877,7 @@ func testOnStorITCacheResource(t *testing.T) { }, TTLIdx: []string{"RU1"}, } - if err := onStor.DataDB().SetResource(res); err != nil { + if err := onStor.SetResource(res); err != nil { t.Error(err) } expectedT := []string{"res_cgrates.org:RL1"} @@ -1860,26 +1860,26 @@ func testOnStorITCRUDResource(t *testing.T) { }, TTLIdx: []string{"RU1"}, } - if _, rcvErr := onStor.DataDB().GetResource("cgrates.org", "RL1", true, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound { + if _, rcvErr := onStor.GetResource("cgrates.org", "RL1", true, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound { t.Error(rcvErr) } - if err := onStor.DataDB().SetResource(res); err != nil { + if err := onStor.SetResource(res); err != nil { t.Error(err) } - if rcv, err := onStor.DataDB().GetResource("cgrates.org", "RL1", true, utils.NonTransactional); err != nil { + if rcv, err := onStor.GetResource("cgrates.org", "RL1", true, utils.NonTransactional); err != nil { t.Error(err) } else if !(reflect.DeepEqual(res, rcv)) { t.Errorf("Expecting: %v, received: %v", res, rcv) } - if rcv, err := onStor.DataDB().GetResource("cgrates.org", "RL1", false, utils.NonTransactional); err != nil { + if rcv, err := onStor.GetResource("cgrates.org", "RL1", false, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(res, rcv) { t.Errorf("Expecting: %v, received: %v", res, rcv) } - if err := onStor.DataDB().RemoveResource(res.Tenant, res.ID, utils.NonTransactional); err != nil { + if err := onStor.RemoveResource(res.Tenant, res.ID, utils.NonTransactional); err != nil { t.Error(err) } - if _, rcvErr := onStor.DataDB().GetResource(res.Tenant, res.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { + if _, rcvErr := onStor.GetResource(res.Tenant, res.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } } diff --git a/engine/resources.go b/engine/resources.go index e0b577bc4..d19bfddc8 100755 --- a/engine/resources.go +++ b/engine/resources.go @@ -320,7 +320,7 @@ func (rS *ResourceService) StoreResource(r *Resource) (err error) { if r.dirty == nil || !*r.dirty { return } - if err = rS.dm.DataDB().SetResource(r); err != nil { + if err = rS.dm.SetResource(r); err != nil { utils.Logger.Warning( fmt.Sprintf(" failed saving Resource with ID: %s, error: %s", r.ID, err.Error())) @@ -404,7 +404,7 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources) guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...) defer guardian.Guardian.UnguardIDs(lockIDs...) for i, rTid := range rIDs { - if r, err := rS.dm.DataDB().GetResource(rTid.Tenant, rTid.ID, false, ""); err != nil { + if r, err := rS.dm.GetResource(rTid.Tenant, rTid.ID, false, ""); err != nil { utils.Logger.Warning( fmt.Sprintf(" force-uncaching resources for evUUID: <%s>, error: <%s>", evUUID, err.Error())) @@ -458,7 +458,7 @@ func (rS *ResourceService) matchingResourcesForEvent(tenant string, ev map[strin if !passAllFilters { continue } - r, err := rS.dm.DataDB().GetResource(rPrf.Tenant, rPrf.ID, false, "") + r, err := rS.dm.GetResource(rPrf.Tenant, rPrf.ID, false, "") if err != nil { return nil, err } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 99552b5b4..5c6a3d66c 100755 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -102,9 +102,9 @@ type DataDB interface { GetResourceProfile(string, string, bool, string) (*ResourceProfile, error) SetResourceProfile(*ResourceProfile) error RemoveResourceProfile(string, string, string) error - GetResource(string, string, bool, string) (*Resource, error) - SetResource(*Resource) error - RemoveResource(string, string, string) error + GetResourceDrv(string, string) (*Resource, error) + SetResourceDrv(*Resource) error + RemoveResourceDrv(string, string) error GetTimingDrv(string) (*utils.TPTiming, error) SetTimingDrv(*utils.TPTiming) error RemoveTimingDrv(string) error diff --git a/engine/storage_map.go b/engine/storage_map.go index 071168b14..757b1d846 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -1189,32 +1189,22 @@ func (ms *MapStorage) RemoveResourceProfile(tenant, id string, transactionID str return nil } -func (ms *MapStorage) GetResource(tenant, id string, skipCache bool, transactionID string) (r *Resource, err error) { +func (ms *MapStorage) GetResourceDrv(tenant, id string) (r *Resource, err error) { ms.mu.RLock() defer ms.mu.RUnlock() key := utils.ResourcesPrefix + utils.ConcatenatedKey(tenant, id) - if !skipCache { - if x, ok := cache.Get(key); ok { - if x != nil { - return x.(*Resource), nil - } - return nil, utils.ErrNotFound - } - } values, ok := ms.dict[key] if !ok { - cache.Set(key, nil, cacheCommit(transactionID), transactionID) return nil, utils.ErrNotFound } - err = ms.ms.Unmarshal(values, r) + err = ms.ms.Unmarshal(values, &r) if err != nil { return nil, err } - cache.Set(key, r, cacheCommit(transactionID), transactionID) return } -func (ms *MapStorage) SetResource(r *Resource) (err error) { +func (ms *MapStorage) SetResourceDrv(r *Resource) (err error) { ms.mu.Lock() defer ms.mu.Unlock() result, err := ms.ms.Marshal(r) @@ -1225,12 +1215,11 @@ func (ms *MapStorage) SetResource(r *Resource) (err error) { return } -func (ms *MapStorage) RemoveResource(tenant, id string, transactionID string) (err error) { +func (ms *MapStorage) RemoveResourceDrv(tenant, id string) (err error) { ms.mu.Lock() defer ms.mu.Unlock() key := utils.ResourcesPrefix + utils.ConcatenatedKey(tenant, id) delete(ms.dict, key) - cache.RemKey(key, cacheCommit(transactionID), transactionID) return } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index bde9f24c0..7f3c6fe9f 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -1800,45 +1800,31 @@ func (ms *MongoStorage) RemoveResourceProfile(tenant, id string, transactionID s return nil } -func (ms *MongoStorage) GetResource(tenant, id string, skipCache bool, transactionID string) (r *Resource, err error) { - key := utils.ResourcesPrefix + utils.ConcatenatedKey(tenant, id) - if !skipCache { - if x, ok := cache.Get(key); ok { - if x == nil { - return nil, utils.ErrNotFound - } - return x.(*Resource), nil - } - } +func (ms *MongoStorage) GetResourceDrv(tenant, id string) (r *Resource, err error) { session, col := ms.conn(colRes) defer session.Close() - r = new(Resource) - if err = col.Find(bson.M{"tenant": tenant, "id": id}).One(r); err != nil { + if err = col.Find(bson.M{"tenant": tenant, "id": id}).One(&r); err != nil { if err == mgo.ErrNotFound { err = utils.ErrNotFound - cache.Set(key, nil, cacheCommit(transactionID), transactionID) } return nil, err } - cache.Set(key, r, cacheCommit(transactionID), transactionID) return } -func (ms *MongoStorage) SetResource(r *Resource) (err error) { +func (ms *MongoStorage) SetResourceDrv(r *Resource) (err error) { session, col := ms.conn(colRes) defer session.Close() _, err = col.Upsert(bson.M{"tenant": r.Tenant, "id": r.ID}, r) return } -func (ms *MongoStorage) RemoveResource(tenant, id string, transactionID string) (err error) { +func (ms *MongoStorage) RemoveResourceDrv(tenant, id string) (err error) { session, col := ms.conn(colRes) defer session.Close() if err = col.Remove(bson.M{"tenant": tenant, "id": id}); err != nil { return } - cache.RemKey(utils.ResourcesPrefix+utils.ConcatenatedKey(tenant, id), - cacheCommit(transactionID), transactionID) return nil } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 3c8134c37..6f6a4f8b7 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1284,20 +1284,11 @@ func (rs *RedisStorage) RemoveResourceProfile(tenant, id string, transactionID s return } -func (rs *RedisStorage) GetResource(tenant, id string, skipCache bool, transactionID string) (r *Resource, err error) { +func (rs *RedisStorage) GetResourceDrv(tenant, id string) (r *Resource, err error) { key := utils.ResourcesPrefix + utils.ConcatenatedKey(tenant, id) - if !skipCache { - if x, ok := cache.Get(key); ok { - if x == nil { - return nil, utils.ErrNotFound - } - return x.(*Resource), nil - } - } var values []byte if values, err = rs.Cmd("GET", key).Bytes(); err != nil { if err == redis.ErrRespNil { // did not find the destination - cache.Set(key, nil, cacheCommit(transactionID), transactionID) err = utils.ErrNotFound } return @@ -1305,11 +1296,10 @@ func (rs *RedisStorage) GetResource(tenant, id string, skipCache bool, transacti if err = rs.ms.Unmarshal(values, &r); err != nil { return } - cache.Set(key, r, cacheCommit(transactionID), transactionID) return } -func (rs *RedisStorage) SetResource(r *Resource) (err error) { +func (rs *RedisStorage) SetResourceDrv(r *Resource) (err error) { result, err := rs.ms.Marshal(r) if err != nil { return err @@ -1317,12 +1307,11 @@ func (rs *RedisStorage) SetResource(r *Resource) (err error) { return rs.Cmd("SET", utils.ResourcesPrefix+r.TenantID(), result).Err } -func (rs *RedisStorage) RemoveResource(tenant, id string, transactionID string) (err error) { +func (rs *RedisStorage) RemoveResourceDrv(tenant, id string) (err error) { key := utils.ResourcesPrefix + utils.ConcatenatedKey(tenant, id) if err = rs.Cmd("DEL", key).Err; err != nil { return } - cache.RemKey(key, cacheCommit(transactionID), transactionID) return } diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 552c8c767..5c73d40aa 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -2033,7 +2033,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("Resources:") } for _, rTid := range tpr.resources { - if err = tpr.dm.DataDB().SetResource(&Resource{Tenant: rTid.Tenant, ID: rTid.ID, Usages: make(map[string]*ResourceUsage)}); err != nil { + if err = tpr.dm.SetResource(&Resource{Tenant: rTid.Tenant, ID: rTid.ID, Usages: make(map[string]*ResourceUsage)}); err != nil { return } if verbose {