mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Add GetFilter in Datamanager
This commit is contained in:
committed by
Dan Christian Bogos
parent
073a353049
commit
7304b80fb9
@@ -40,7 +40,7 @@ func (self *ApierV1) GetFilter(arg utils.TenantID, reply *engine.Filter) error {
|
||||
if missing := utils.MissingStructFields(&arg, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
}
|
||||
if fltr, err := self.DataManager.DataDB().GetFilter(arg.Tenant, arg.ID, true, utils.NonTransactional); err != nil {
|
||||
if fltr, err := self.DataManager.GetFilter(arg.Tenant, arg.ID, true, utils.NonTransactional); err != nil {
|
||||
if err.Error() != utils.ErrNotFound.Error() {
|
||||
err = utils.NewErrServerError(err)
|
||||
}
|
||||
|
||||
@@ -78,3 +78,24 @@ func (dm *DataManager) RemStatQueue(tenant, id string, transactionID string) (er
|
||||
cache.RemKey(utils.StatQueuePrefix+utils.ConcatenatedKey(tenant, id), cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (dm *DataManager) GetFilter(tenant, id string, skipCache bool, transactionID string) (fltr *Filter, err error) {
|
||||
key := utils.FilterPrefix + utils.ConcatenatedKey(tenant, id)
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*Filter), nil
|
||||
}
|
||||
}
|
||||
fltr, err = dm.dataDB.GetFilterDrv(tenant, id)
|
||||
if err != nil {
|
||||
if err == utils.ErrNotFound {
|
||||
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
cache.Set(key, fltr, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -126,7 +126,7 @@ type DataDB interface {
|
||||
GetThreshold(string, string, bool, string) (*Threshold, error)
|
||||
SetThreshold(*Threshold) error
|
||||
RemoveThreshold(string, string, string) error
|
||||
GetFilter(string, string, bool, string) (*Filter, error)
|
||||
GetFilterDrv(string, string) (*Filter, error)
|
||||
SetFilter(*Filter) error
|
||||
RemoveFilter(string, string, string) error
|
||||
// CacheDataFromDB loads data to cache, prefix represents the cache prefix, IDs should be nil if all available data should be loaded
|
||||
|
||||
@@ -1669,28 +1669,14 @@ func (ms *MapStorage) RemoveThreshold(tenant, id string, transactionID string) (
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetFilter(tenant, id string, skipCache bool, transactionID string) (r *Filter, err error) {
|
||||
func (ms *MapStorage) GetFilterDrv(tenant, id string) (r *Filter, err error) {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
key := utils.FilterPrefix + utils.ConcatenatedKey(tenant, id)
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x != nil {
|
||||
return x.(*Filter), nil
|
||||
}
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
}
|
||||
values, ok := ms.dict[key]
|
||||
values, ok := ms.dict[utils.FilterPrefix+utils.ConcatenatedKey(tenant, id)]
|
||||
if !ok {
|
||||
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
err = ms.ms.Unmarshal(values, r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cache.Set(key, r, cacheCommit(transactionID), transactionID)
|
||||
err = ms.ms.Unmarshal(values, &r)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -546,7 +546,7 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
|
||||
_, err = ms.GetThreshold(tntID.Tenant, tntID.ID, true, utils.NonTransactional)
|
||||
case utils.FilterPrefix:
|
||||
tntID := utils.NewTenantID(dataID)
|
||||
_, err = ms.GetFilter(tntID.Tenant, tntID.ID, true, utils.NonTransactional)
|
||||
_, err = ms.GetFilterDrv(tntID.Tenant, tntID.ID)
|
||||
}
|
||||
if err != nil {
|
||||
return utils.NewCGRError(utils.MONGO,
|
||||
@@ -2253,27 +2253,15 @@ func (ms *MongoStorage) RemoveThreshold(tenant, id string, transactionID string)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetFilter(tenant, id string, skipCache bool, transactionID string) (r *Filter, err error) {
|
||||
key := utils.FilterPrefix + utils.ConcatenatedKey(tenant, id)
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*Filter), nil
|
||||
}
|
||||
}
|
||||
func (ms *MongoStorage) GetFilterDrv(tenant, id string) (r *Filter, err error) {
|
||||
session, col := ms.conn(colFlt)
|
||||
defer session.Close()
|
||||
r = new(Filter)
|
||||
if err = col.Find(bson.M{"tenant": tenant, "id": id}).One(r); err != nil {
|
||||
if err = col.Find(bson.M{"tenant": tenant, "id": id}).One(&r); err != nil {
|
||||
if err == mgo.ErrNotFound {
|
||||
err = utils.ErrNotFound
|
||||
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
cache.Set(key, r, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -1776,20 +1776,11 @@ func (rs *RedisStorage) RemoveThreshold(tenant, id string, transactionID string)
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetFilter(tenant, id string, skipCache bool, transactionID string) (r *Filter, err error) {
|
||||
func (rs *RedisStorage) GetFilterDrv(tenant, id string) (r *Filter, err error) {
|
||||
key := utils.FilterPrefix + utils.ConcatenatedKey(tenant, id)
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*Filter), nil
|
||||
}
|
||||
}
|
||||
var values []byte
|
||||
if values, err = rs.Cmd("GET", key).Bytes(); err != nil {
|
||||
if err == redis.ErrRespNil { // did not find the destination
|
||||
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
|
||||
err = utils.ErrNotFound
|
||||
}
|
||||
return
|
||||
@@ -1797,7 +1788,6 @@ func (rs *RedisStorage) GetFilter(tenant, id string, skipCache bool, transaction
|
||||
if err = rs.ms.Unmarshal(values, &r); err != nil {
|
||||
return
|
||||
}
|
||||
cache.Set(key, r, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -2247,7 +2247,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
|
||||
log.Print("Indexing Filters")
|
||||
}
|
||||
for tenant, mpID := range tpr.flProfiles {
|
||||
stIdxr, err := NewReqFilterIndexer(tpr.dm, utils.ThresholdsIndex+tenant)
|
||||
stIdxr, err := NewReqFilterIndexer(tpr.dm, utils.FilterIndex+tenant)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user