diff --git a/engine/reqfilterindexer.go b/engine/reqfilterindexer.go
index 36bcdb8fe..7b295c467 100644
--- a/engine/reqfilterindexer.go
+++ b/engine/reqfilterindexer.go
@@ -21,7 +21,7 @@ import (
 	"github.com/cgrates/cgrates/utils"
 )
 
-func NewReqFilterIndexer(dataDB AccountingStorage, dbKey string, overwriteDB bool) (*ReqFilterIndexer, error) {
+func NewReqFilterIndexer(dataDB AccountingStorage, dbKey string) (*ReqFilterIndexer, error) {
 	indexes, err := dataDB.GetReqFilterIndexes(dbKey)
 	if err != nil && err != utils.ErrNotFound {
 		return nil, err
@@ -29,7 +29,7 @@ func NewReqFilterIndexer(dataDB AccountingStorage, dbKey string, overwriteDB boo
 	if indexes == nil {
 		indexes = make(map[string]map[string]utils.StringMap)
 	}
-	return &ReqFilterIndexer{dataDB: dataDB, indexes: indexes, chngdIndxKeys: make(utils.StringMap)}, nil
+	return &ReqFilterIndexer{dataDB: dataDB, dbKey: dbKey, indexes: indexes, chngdIndxKeys: make(utils.StringMap)}, nil
 }
 
 // ReqFilterIndexer is a centralized indexer for all data sources using RequestFilter
diff --git a/engine/reslimiter.go b/engine/reslimiter.go
index afdf9dbd9..1426c7f29 100644
--- a/engine/reslimiter.go
+++ b/engine/reslimiter.go
@@ -91,126 +91,42 @@ func NewResourceLimiterService(cfg *config.CGRConfig, dataDB AccountingStorage,
 	if cdrStatS != nil && reflect.ValueOf(cdrStatS).IsNil() {
 		cdrStatS = nil
 	}
-	rls := &ResourceLimiterService{stringIndexes: make(map[string]map[string]utils.StringMap), dataDB: dataDB, cdrStatS: cdrStatS}
+	rls := &ResourceLimiterService{dataDB: dataDB, cdrStatS: cdrStatS}
 	return rls, nil
 }
 
 // ResourcesLimiter is the service handling channel limits
 type ResourceLimiterService struct {
 	sync.RWMutex
-	stringIndexes map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue]utils.StringMap[resourceID]
-	dataDB        AccountingStorage                      // So we can load the data in cache and index it
-	cdrStatS      rpcclient.RpcClientConnection
-}
-
-// Index cached ResourceLimits with MetaString filter types
-func (rls *ResourceLimiterService) indexStringFilters(rlIDs []string) error {
-	utils.Logger.Info("<RLs> Start indexing string filters")
-	newStringIndexes := make(map[string]map[string]utils.StringMap) // Index it transactional
-	var cacheIDsToIndex []string                                    // Cache keys of RLs to be indexed
-	if rlIDs == nil {
-		cacheIDsToIndex = cache.GetEntryKeys(utils.ResourceLimitsPrefix)
-	} else {
-		for _, rlID := range rlIDs {
-			cacheIDsToIndex = append(cacheIDsToIndex, utils.ResourceLimitsPrefix+rlID)
-		}
-	}
-	for _, cacheKey := range cacheIDsToIndex {
-		x, ok := cache.Get(cacheKey)
-		if !ok {
-			return utils.ErrNotFound
-		}
-		rl := x.(*ResourceLimit)
-		var hasMetaString bool
-		for _, fltr := range rl.Filters {
-			if fltr.Type != MetaString {
-				continue
-			}
-			hasMetaString = true // Mark that we found at least one metatring so we don't need to index globally
-			if _, hastIt := newStringIndexes[fltr.FieldName]; !hastIt {
-				newStringIndexes[fltr.FieldName] = make(map[string]utils.StringMap)
-			}
-			for _, fldVal := range fltr.Values {
-				if _, hasIt := newStringIndexes[fltr.FieldName][fldVal]; !hasIt {
-					newStringIndexes[fltr.FieldName][fldVal] = make(utils.StringMap)
-				}
-				newStringIndexes[fltr.FieldName][fldVal][rl.ID] = true
-			}
-		}
-		if !hasMetaString {
-			if _, hasIt := newStringIndexes[utils.NOT_AVAILABLE]; !hasIt {
-				newStringIndexes[utils.NOT_AVAILABLE] = make(map[string]utils.StringMap)
-			}
-			if _, hasIt := newStringIndexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE]; !hasIt {
-				newStringIndexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE] = make(utils.StringMap)
-			}
-			newStringIndexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE][rl.ID] = true // Fields without real field index will be located in map[NOT_AVAILABLE][NOT_AVAILABLE][rl.ID]
-		}
-	}
-	rls.Lock()
-	defer rls.Unlock()
-	if rlIDs == nil { // We have rebuilt complete index
-		rls.stringIndexes = newStringIndexes
-		return nil
-	}
-	// Merge the indexes since we have only performed limited indexing
-	for fldNameKey, mpFldName := range newStringIndexes {
-		if _, hasIt := rls.stringIndexes[fldNameKey]; !hasIt {
-			rls.stringIndexes[fldNameKey] = mpFldName
-		} else {
-			for fldValKey, strMap := range newStringIndexes[fldNameKey] {
-				if _, hasIt := rls.stringIndexes[fldNameKey][fldValKey]; !hasIt {
-					rls.stringIndexes[fldNameKey][fldValKey] = strMap
-				} else {
-					for resIDKey := range newStringIndexes[fldNameKey][fldValKey] {
-						rls.stringIndexes[fldNameKey][fldValKey][resIDKey] = true
-					}
-				}
-			}
-		}
-	}
-	utils.Logger.Info("<RLs> Done indexing string filters")
-	return nil
-}
-
-// Called when cache/re-caching is necessary
-func (rls *ResourceLimiterService) cacheResourceLimits(loadID string, rlIDs []string) error {
-	if rlIDs == nil {
-		utils.Logger.Info("<RLs> Start caching all resource limits")
-	} else if len(rlIDs) == 0 {
-		return nil
-	} else {
-		utils.Logger.Info(fmt.Sprintf("<RLs> Start caching resource limits with ids: %+v", rlIDs))
-	}
-	if err := rls.dataDB.PreloadCacheForPrefix(utils.ResourceLimitsPrefix); err != nil {
-		return err
-	}
-	utils.Logger.Info("<RLs> Done caching resource limits")
-	return rls.indexStringFilters(rlIDs)
+	dataDB   AccountingStorage // So we can load the data in cache and index it
+	cdrStatS rpcclient.RpcClientConnection
 }
 
 func (rls *ResourceLimiterService) matchingResourceLimitsForEvent(ev map[string]interface{}) (map[string]*ResourceLimit, error) {
 	matchingResources := make(map[string]*ResourceLimit)
 	for fldName, fieldValIf := range ev {
-		if _, hasIt := rls.stringIndexes[fldName]; !hasIt {
-			continue
-		}
-		strVal, canCast := utils.CastFieldIfToString(fieldValIf)
+		fldVal, canCast := utils.CastFieldIfToString(fieldValIf)
 		if !canCast {
 			return nil, fmt.Errorf("Cannot cast field: %s into string", fldName)
 		}
-		if _, hasIt := rls.stringIndexes[fldName][strVal]; !hasIt {
-			continue
+		rlIDs, err := rls.dataDB.MatchReqFilterIndex(utils.ResourceLimitsIndex, utils.ConcatenatedKey(fldName, fldVal))
+		if err != nil {
+			if err == utils.ErrNotFound {
+				continue
+			}
+			return nil, err
 		}
-		for resName := range rls.stringIndexes[fldName][strVal] {
+		for resName := range rlIDs {
 			if _, hasIt := matchingResources[resName]; hasIt { // Already checked this RL
 				continue
 			}
-			x, ok := cache.Get(utils.ResourceLimitsPrefix + resName)
-			if !ok {
-				return nil, utils.ErrNotFound
+			rl, err := rls.dataDB.GetResourceLimit(resName, false, utils.NonTransactional)
+			if err != nil {
+				if err == utils.ErrNotFound {
+					continue
+				}
+				return nil, err
 			}
-			rl := x.(*ResourceLimit)
 			now := time.Now()
 			if rl.ActivationTime.After(now) || (!rl.ExpiryTime.IsZero() && rl.ExpiryTime.Before(now)) { // not active
 				continue
@@ -230,15 +146,21 @@ func (rls *ResourceLimiterService) matchingResourceLimitsForEvent(ev map[string]
 		}
 	}
 	// Check un-indexed resources
-	for resName := range rls.stringIndexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE] {
+	uIdxRLIDs, err := rls.dataDB.MatchReqFilterIndex(utils.ResourceLimitsIndex, utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE))
+	if err != nil && err != utils.ErrNotFound {
+		return nil, err
+	}
+	for resName := range uIdxRLIDs {
 		if _, hasIt := matchingResources[resName]; hasIt { // Already checked this RL
 			continue
 		}
-		x, ok := cache.Get(utils.ResourceLimitsPrefix + resName)
-		if !ok {
-			return nil, utils.ErrNotFound
+		rl, err := rls.dataDB.GetResourceLimit(resName, false, utils.NonTransactional)
+		if err != nil {
+			if err == utils.ErrNotFound {
+				continue
+			}
+			return nil, err
 		}
-		rl := x.(*ResourceLimit)
 		now := time.Now()
 		if rl.ActivationTime.After(now) || (!rl.ExpiryTime.IsZero() && rl.ExpiryTime.Before(now)) { // not active
 			continue
@@ -257,9 +179,6 @@ func (rls *ResourceLimiterService) matchingResourceLimitsForEvent(ev map[string]
 
 // Called to start the service
 func (rls *ResourceLimiterService) ListenAndServe() error {
-	if err := rls.cacheResourceLimits("ResourceLimiterServiceStart", nil); err != nil {
-		return err
-	}
 	return nil
 }
 
@@ -270,20 +189,6 @@ func (rls *ResourceLimiterService) ServiceShutdown() error {
 
 // RPC Methods
 
-// Cache/Re-cache
-func (rls *ResourceLimiterService) V1CacheResourceLimits(attrs *utils.AttrRLsCache, reply *string) error {
-	if err := rls.cacheResourceLimits(attrs.LoadID, attrs.ResourceLimitIDs); err != nil {
-		return err
-	}
-	*reply = utils.OK
-	return nil
-}
-
-// Alias API for external usage
-func (rls *ResourceLimiterService) CacheResourceLimits(attrs *utils.AttrRLsCache, reply *string) error {
-	return rls.V1CacheResourceLimits(attrs, reply)
-}
-
 func (rls *ResourceLimiterService) V1ResourceLimitsForEvent(ev map[string]interface{}, reply *[]*ResourceLimit) error {
 	rls.Lock() // Unknown number of RLs updated
 	defer rls.Unlock()
diff --git a/engine/reslimiter_test.go b/engine/reslimiter_test.go
index 78625e67f..c1f810a9f 100644
--- a/engine/reslimiter_test.go
+++ b/engine/reslimiter_test.go
@@ -18,16 +18,15 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
 package engine
 
 import (
-	"reflect"
 	"testing"
 	"time"
 
-	"github.com/cgrates/cgrates/cache"
 	"github.com/cgrates/cgrates/utils"
 )
 
 var rLS *ResourceLimiterService
 
+/*
 func TestRLsIndexStringFilters(t *testing.T) {
 	rls := []*ResourceLimit{
 		&ResourceLimit{
@@ -171,8 +170,95 @@ func TestRLsIndexStringFilters(t *testing.T) {
 		t.Errorf("Expecting: %+v, received: %+v", eIndexes, rLS.stringIndexes)
 	}
 }
+*/
+
+func TestRLsLoadRLs(t *testing.T) {
+	rls := []*ResourceLimit{
+		&ResourceLimit{
+			ID:     "RL1",
+			Weight: 20,
+			Filters: []*RequestFilter{
+				&RequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"1001", "1002"}},
+				&RequestFilter{Type: MetaRSRFields, Values: []string{"Subject(~^1.*1$)", "Destination(1002)"},
+					rsrFields: utils.ParseRSRFieldsMustCompile("Subject(~^1.*1$);Destination(1002)", utils.INFIELD_SEP),
+				}},
+			ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
+			Limit:          2,
+			Usage:          make(map[string]*ResourceUsage),
+		},
+		&ResourceLimit{
+			ID:     "RL2",
+			Weight: 10,
+			Filters: []*RequestFilter{
+				&RequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"dan", "1002"}},
+				&RequestFilter{Type: MetaString, FieldName: "Subject", Values: []string{"dan"}},
+			},
+			ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
+			Limit:          1,
+			UsageTTL:       time.Duration(1 * time.Millisecond),
+			Usage:          make(map[string]*ResourceUsage),
+		},
+		&ResourceLimit{
+			ID:     "RL3",
+			Weight: 10,
+			Filters: []*RequestFilter{
+				&RequestFilter{Type: MetaString, FieldName: "Subject", Values: []string{"dan"}},
+				&RequestFilter{Type: MetaString, FieldName: "Subject", Values: []string{"1003"}},
+			},
+			ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
+			Limit:          1,
+			Usage:          make(map[string]*ResourceUsage),
+		},
+		&ResourceLimit{
+			ID:     "RL4",
+			Weight: 10,
+			Filters: []*RequestFilter{
+				&RequestFilter{Type: MetaStringPrefix, FieldName: "Destination", Values: []string{"+49"}},
+			},
+			ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
+			Limit:          1,
+			Usage:          make(map[string]*ResourceUsage),
+		},
+		&ResourceLimit{
+			ID:     "RL5",
+			Weight: 10,
+			Filters: []*RequestFilter{
+				&RequestFilter{Type: MetaStringPrefix, FieldName: "Destination", Values: []string{"+40"}},
+			},
+			ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
+			Limit:          1,
+			UsageTTL:       time.Duration(10 * time.Millisecond),
+			Usage:          make(map[string]*ResourceUsage),
+		},
+		&ResourceLimit{ // Add it so we can test expiryTime
+			ID:     "RL6",
+			Weight: 10,
+			Filters: []*RequestFilter{
+				&RequestFilter{Type: MetaString, FieldName: "Subject", Values: []string{"dan"}},
+			},
+			ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
+			ExpiryTime:     time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
+			Limit:          1,
+			Usage:          make(map[string]*ResourceUsage),
+		},
+	}
+	rlIdxr, err := NewReqFilterIndexer(accountingStorage, utils.ResourceLimitsIndex)
+	if err != nil {
+		t.Error(err)
+	}
+	for _, rl := range rls {
+		if err := accountingStorage.SetResourceLimit(rl, utils.NonTransactional); err != nil {
+			t.Error(err)
+		}
+		rlIdxr.IndexFilters(rl.ID, rl.Filters)
+	}
+	if err := rlIdxr.StoreIndexes(); err != nil {
+		t.Error(err)
+	}
+}
 
 func TestRLsMatchingResourceLimitsForEvent(t *testing.T) {
+	rLS = &ResourceLimiterService{dataDB: accountingStorage, cdrStatS: nil}
 	eResLimits := map[string]*ResourceLimit{
 		"RL1": &ResourceLimit{
 			ID: "RL1",
diff --git a/engine/storage_map.go b/engine/storage_map.go
index b725a2949..22cb58c55 100644
--- a/engine/storage_map.go
+++ b/engine/storage_map.go
@@ -1153,23 +1153,101 @@ func (ms *MapStorage) GetStructVersion() (rsv *StructVersion, err error) {
 	return
 }
 
-func (ms *MapStorage) GetResourceLimit(id string, skipCache bool, transactionID string) (*ResourceLimit, error) {
-	return nil, nil
+func (ms *MapStorage) GetResourceLimit(id string, skipCache bool, transactionID string) (rl *ResourceLimit, err error) {
+	ms.mu.RLock()
+	defer ms.mu.RUnlock()
+	key := utils.ResourceLimitsPrefix + id
+	if !skipCache {
+		if x, ok := cache.Get(key); ok {
+			if x != nil {
+				return x.(*ResourceLimit), nil
+			}
+			return nil, utils.ErrNotFound
+		}
+	}
+	values, ok := ms.dict[key]
+	if !ok {
+		cache.Set(key, nil, cacheCommit(transactionID), transactionID)
+		return nil, utils.ErrNotFound
+	}
+	err = ms.ms.Unmarshal(values, &rl)
+	if err != nil {
+		return nil, err
+	}
+	for _, fltr := range rl.Filters {
+		if err := fltr.CompileValues(); err != nil {
+			return nil, err
+		}
+	}
+	cache.Set(key, rl, cacheCommit(transactionID), transactionID)
+	return
 }
 
 func (ms *MapStorage) SetResourceLimit(rl *ResourceLimit, transactionID string) error {
+	ms.mu.Lock()
+	defer ms.mu.Unlock()
+	result, err := ms.ms.Marshal(rl)
+	if err != nil {
+		return err
+	}
+	ms.dict[utils.ResourceLimitsPrefix+rl.ID] = result
 	return nil
 }
 
 func (ms *MapStorage) RemoveResourceLimit(id string, transactionID string) error {
+	ms.mu.Lock()
+	defer ms.mu.Unlock()
+	key := utils.ResourceLimitsPrefix + id
+	delete(ms.dict, key)
+	cache.RemKey(key, cacheCommit(transactionID), transactionID)
 	return nil
 }
 
 func (ms *MapStorage) GetReqFilterIndexes(dbKey string) (indexes map[string]map[string]utils.StringMap, err error) {
+	ms.mu.RLock()
+	defer ms.mu.RUnlock()
+	values, ok := ms.dict[dbKey]
+	if !ok {
+		return nil, utils.ErrNotFound
+	}
+	err = ms.ms.Unmarshal(values, &indexes)
+	if err != nil {
+		return nil, err
+	}
 	return
 }
 
 func (ms *MapStorage) SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) {
+	ms.mu.Lock()
+	defer ms.mu.Unlock()
+	result, err := ms.ms.Marshal(indexes)
+	if err != nil {
+		return err
+	}
+	ms.dict[dbKey] = result
 	return
 }
 
 func (ms *MapStorage) MatchReqFilterIndex(dbKey, fieldValKey string) (itemIDs utils.StringMap, err error) {
+	ms.mu.RLock()
+	defer ms.mu.RUnlock()
+	if x, ok := cache.Get(dbKey + fieldValKey); ok { // Attempt to find in cache first
+		if x != nil {
+			return x.(utils.StringMap), nil
+		}
+		return nil, utils.ErrNotFound
+	}
+	// Not found in cache, check in DB
+	values, ok := ms.dict[dbKey]
+	if !ok {
+		cache.Set(dbKey+fieldValKey, nil, true, utils.NonTransactional)
+		return nil, utils.ErrNotFound
+	}
+	var indexes map[string]map[string]utils.StringMap
+	if err = ms.ms.Unmarshal(values, &indexes); err != nil {
+		return nil, err
+	}
+	keySplt := strings.Split(fieldValKey, ":")
+	if _, hasIt := indexes[keySplt[0]]; hasIt {
+		itemIDs = indexes[keySplt[0]][keySplt[1]]
+	}
+	cache.Set(dbKey+fieldValKey, itemIDs, true, utils.NonTransactional)
 	return
 }
diff --git a/engine/storage_redis.go b/engine/storage_redis.go
index 4687f8692..852eac14d 100644
--- a/engine/storage_redis.go
+++ b/engine/storage_redis.go
@@ -1183,8 +1183,8 @@ func (rs *RedisStorage) GetResourceLimit(id string, skipCache bool, transactionI
 				return nil, err
 			}
 		}
-		cache.Set(key, rl, cacheCommit(transactionID), transactionID)
 	}
+	cache.Set(key, rl, cacheCommit(transactionID), transactionID)
 	return
 }
 func (rs *RedisStorage) SetResourceLimit(rl *ResourceLimit, transactionID string) error {
@@ -1192,10 +1192,7 @@ func (rs *RedisStorage) SetResourceLimit(rl *ResourceLimit, transactionID string
 	result, err := rs.ms.Marshal(rl)
 	if err != nil {
 		return err
 	}
-	key := utils.ResourceLimitsPrefix + rl.ID
-	err = rs.Cmd("SET", key, result).Err
-	cache.Set(key, rl, cacheCommit(transactionID), transactionID)
-	return err
+	return rs.Cmd("SET", utils.ResourceLimitsPrefix+rl.ID, result).Err
 }
 func (rs *RedisStorage) RemoveResourceLimit(id string, transactionID string) error {
 	key := utils.ResourceLimitsPrefix + id
@@ -1252,14 +1249,14 @@
 }
 func (rs *RedisStorage) MatchReqFilterIndex(dbKey, fieldValKey string) (itemIDs utils.StringMap, err error) {
-	if x, ok := cache.Get(fieldValKey); ok { // Attempt to find in cache first
+	if x, ok := cache.Get(dbKey + fieldValKey); ok { // Attempt to find in cache first
 		if x != nil {
 			return x.(utils.StringMap), nil
 		}
 		return nil, utils.ErrNotFound
 	}
 	// Not found in cache, check in DB
-	str, err := rs.Cmd("HGET", dbKey, fieldValKey).Str()
+	fldValBytes, err := rs.Cmd("HGET", dbKey, fieldValKey).Bytes()
 	if err != nil {
 		if err.Error() != "wrong type" {
 			return nil, err
@@ -1267,11 +1264,9 @@ func (rs *RedisStorage) MatchReqFilterIndex(dbKey, fieldValKey string) (itemIDs
 		// Case when str is not found
 		err = utils.ErrNotFound
 	}
-	if str != "" {
-		if err = rs.ms.Unmarshal([]byte(str), &itemIDs); err != nil {
-			return
-		}
+	if err = rs.ms.Unmarshal(fldValBytes, &itemIDs); err != nil {
+		return
 	}
-	cache.Set(dbKey, itemIDs, true, utils.NonTransactional)
+	cache.Set(dbKey+fieldValKey, itemIDs, true, utils.NonTransactional)
 	return
 }
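
Reviewer note, not part of the patch: a minimal sketch of how the pieces changed above fit together, assuming the engine package and any AccountingStorage implementation (for example the MapStorage methods added in this diff). The helper name loadAndMatchSketch, the dataDB parameter, and the sample "Account"/"1001" key are illustrative only; the calls themselves (NewReqFilterIndexer, IndexFilters, StoreIndexes, SetResourceLimit, MatchReqFilterIndex, GetResourceLimit) are the ones exercised in the diff and its test.

package engine

import "github.com/cgrates/cgrates/utils"

// loadAndMatchSketch is a hypothetical helper showing the new flow: persist the
// ResourceLimits, build and store the string-filter indexes via ReqFilterIndexer,
// then resolve candidate limits for one event field by querying the stored index
// instead of the removed in-memory stringIndexes map.
func loadAndMatchSketch(dataDB AccountingStorage, rls []*ResourceLimit) (map[string]*ResourceLimit, error) {
	rlIdxr, err := NewReqFilterIndexer(dataDB, utils.ResourceLimitsIndex)
	if err != nil {
		return nil, err
	}
	for _, rl := range rls {
		if err := dataDB.SetResourceLimit(rl, utils.NonTransactional); err != nil {
			return nil, err
		}
		rlIdxr.IndexFilters(rl.ID, rl.Filters) // records fieldName/fieldValue -> rl.ID entries
	}
	if err := rlIdxr.StoreIndexes(); err != nil { // persists the indexes under utils.ResourceLimitsIndex
		return nil, err
	}
	// Per event field, ask the DB-backed index for candidate IDs, then load each limit.
	matched := make(map[string]*ResourceLimit)
	rlIDs, err := dataDB.MatchReqFilterIndex(utils.ResourceLimitsIndex,
		utils.ConcatenatedKey("Account", "1001")) // sample event field and value
	if err != nil {
		if err == utils.ErrNotFound {
			return matched, nil // nothing indexed for this field/value
		}
		return nil, err
	}
	for id := range rlIDs {
		rl, err := dataDB.GetResourceLimit(id, false, utils.NonTransactional)
		if err != nil {
			if err == utils.ErrNotFound {
				continue
			}
			return nil, err
		}
		matched[id] = rl
	}
	return matched, nil
}

The activation/expiry checks and the NOT_AVAILABLE pass for un-indexed resources are omitted here; see matchingResourceLimitsForEvent in the diff for the full logic.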