ResourceLimiter with limits matched via db driver, MapStorage implementing RL methods

This commit is contained in:
DanB
2016-10-09 13:41:56 +02:00
parent f6ac392439
commit 70a7d2ae75
5 changed files with 205 additions and 141 deletions

View File

@@ -21,7 +21,7 @@ import (
"github.com/cgrates/cgrates/utils"
)
func NewReqFilterIndexer(dataDB AccountingStorage, dbKey string, overwriteDB bool) (*ReqFilterIndexer, error) {
func NewReqFilterIndexer(dataDB AccountingStorage, dbKey string) (*ReqFilterIndexer, error) {
indexes, err := dataDB.GetReqFilterIndexes(dbKey)
if err != nil && err != utils.ErrNotFound {
return nil, err
@@ -29,7 +29,7 @@ func NewReqFilterIndexer(dataDB AccountingStorage, dbKey string, overwriteDB boo
if indexes == nil {
indexes = make(map[string]map[string]utils.StringMap)
}
return &ReqFilterIndexer{dataDB: dataDB, indexes: indexes, chngdIndxKeys: make(utils.StringMap)}, nil
return &ReqFilterIndexer{dataDB: dataDB, dbKey: dbKey, indexes: indexes, chngdIndxKeys: make(utils.StringMap)}, nil
}
// ReqFilterIndexer is a centralized indexer for all data sources using RequestFilter

View File

@@ -91,126 +91,42 @@ func NewResourceLimiterService(cfg *config.CGRConfig, dataDB AccountingStorage,
if cdrStatS != nil && reflect.ValueOf(cdrStatS).IsNil() {
cdrStatS = nil
}
rls := &ResourceLimiterService{stringIndexes: make(map[string]map[string]utils.StringMap), dataDB: dataDB, cdrStatS: cdrStatS}
rls := &ResourceLimiterService{dataDB: dataDB, cdrStatS: cdrStatS}
return rls, nil
}
// ResourcesLimiter is the service handling channel limits
type ResourceLimiterService struct {
sync.RWMutex
stringIndexes map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue]utils.StringMap[resourceID]
dataDB AccountingStorage // So we can load the data in cache and index it
cdrStatS rpcclient.RpcClientConnection
}
// Index cached ResourceLimits with MetaString filter types
func (rls *ResourceLimiterService) indexStringFilters(rlIDs []string) error {
utils.Logger.Info("<RLs> Start indexing string filters")
newStringIndexes := make(map[string]map[string]utils.StringMap) // Index it transactional
var cacheIDsToIndex []string // Cache keys of RLs to be indexed
if rlIDs == nil {
cacheIDsToIndex = cache.GetEntryKeys(utils.ResourceLimitsPrefix)
} else {
for _, rlID := range rlIDs {
cacheIDsToIndex = append(cacheIDsToIndex, utils.ResourceLimitsPrefix+rlID)
}
}
for _, cacheKey := range cacheIDsToIndex {
x, ok := cache.Get(cacheKey)
if !ok {
return utils.ErrNotFound
}
rl := x.(*ResourceLimit)
var hasMetaString bool
for _, fltr := range rl.Filters {
if fltr.Type != MetaString {
continue
}
hasMetaString = true // Mark that we found at least one metatring so we don't need to index globally
if _, hastIt := newStringIndexes[fltr.FieldName]; !hastIt {
newStringIndexes[fltr.FieldName] = make(map[string]utils.StringMap)
}
for _, fldVal := range fltr.Values {
if _, hasIt := newStringIndexes[fltr.FieldName][fldVal]; !hasIt {
newStringIndexes[fltr.FieldName][fldVal] = make(utils.StringMap)
}
newStringIndexes[fltr.FieldName][fldVal][rl.ID] = true
}
}
if !hasMetaString {
if _, hasIt := newStringIndexes[utils.NOT_AVAILABLE]; !hasIt {
newStringIndexes[utils.NOT_AVAILABLE] = make(map[string]utils.StringMap)
}
if _, hasIt := newStringIndexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE]; !hasIt {
newStringIndexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE] = make(utils.StringMap)
}
newStringIndexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE][rl.ID] = true // Fields without real field index will be located in map[NOT_AVAILABLE][NOT_AVAILABLE][rl.ID]
}
}
rls.Lock()
defer rls.Unlock()
if rlIDs == nil { // We have rebuilt complete index
rls.stringIndexes = newStringIndexes
return nil
}
// Merge the indexes since we have only performed limited indexing
for fldNameKey, mpFldName := range newStringIndexes {
if _, hasIt := rls.stringIndexes[fldNameKey]; !hasIt {
rls.stringIndexes[fldNameKey] = mpFldName
} else {
for fldValKey, strMap := range newStringIndexes[fldNameKey] {
if _, hasIt := rls.stringIndexes[fldNameKey][fldValKey]; !hasIt {
rls.stringIndexes[fldNameKey][fldValKey] = strMap
} else {
for resIDKey := range newStringIndexes[fldNameKey][fldValKey] {
rls.stringIndexes[fldNameKey][fldValKey][resIDKey] = true
}
}
}
}
}
utils.Logger.Info("<RLs> Done indexing string filters")
return nil
}
// Called when cache/re-caching is necessary
func (rls *ResourceLimiterService) cacheResourceLimits(loadID string, rlIDs []string) error {
if rlIDs == nil {
utils.Logger.Info("<RLs> Start caching all resource limits")
} else if len(rlIDs) == 0 {
return nil
} else {
utils.Logger.Info(fmt.Sprintf("<RLs> Start caching resource limits with ids: %+v", rlIDs))
}
if err := rls.dataDB.PreloadCacheForPrefix(utils.ResourceLimitsPrefix); err != nil {
return err
}
utils.Logger.Info("<RLs> Done caching resource limits")
return rls.indexStringFilters(rlIDs)
dataDB AccountingStorage // So we can load the data in cache and index it
cdrStatS rpcclient.RpcClientConnection
}
func (rls *ResourceLimiterService) matchingResourceLimitsForEvent(ev map[string]interface{}) (map[string]*ResourceLimit, error) {
matchingResources := make(map[string]*ResourceLimit)
for fldName, fieldValIf := range ev {
if _, hasIt := rls.stringIndexes[fldName]; !hasIt {
continue
}
strVal, canCast := utils.CastFieldIfToString(fieldValIf)
fldVal, canCast := utils.CastFieldIfToString(fieldValIf)
if !canCast {
return nil, fmt.Errorf("Cannot cast field: %s into string", fldName)
}
if _, hasIt := rls.stringIndexes[fldName][strVal]; !hasIt {
continue
rlIDs, err := rls.dataDB.MatchReqFilterIndex(utils.ResourceLimitsIndex, utils.ConcatenatedKey(fldName, fldVal))
if err != nil {
if err == utils.ErrNotFound {
continue
}
return nil, err
}
for resName := range rls.stringIndexes[fldName][strVal] {
for resName := range rlIDs {
if _, hasIt := matchingResources[resName]; hasIt { // Already checked this RL
continue
}
x, ok := cache.Get(utils.ResourceLimitsPrefix + resName)
if !ok {
return nil, utils.ErrNotFound
rl, err := rls.dataDB.GetResourceLimit(resName, false, utils.NonTransactional)
if err != nil {
if err == utils.ErrNotFound {
continue
}
return nil, err
}
rl := x.(*ResourceLimit)
now := time.Now()
if rl.ActivationTime.After(now) || (!rl.ExpiryTime.IsZero() && rl.ExpiryTime.Before(now)) { // not active
continue
@@ -230,15 +146,21 @@ func (rls *ResourceLimiterService) matchingResourceLimitsForEvent(ev map[string]
}
}
// Check un-indexed resources
for resName := range rls.stringIndexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE] {
uIdxRLIDs, err := rls.dataDB.MatchReqFilterIndex(utils.ResourceLimitsIndex, utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE))
if err != nil && err != utils.ErrNotFound {
return nil, err
}
for resName := range uIdxRLIDs {
if _, hasIt := matchingResources[resName]; hasIt { // Already checked this RL
continue
}
x, ok := cache.Get(utils.ResourceLimitsPrefix + resName)
if !ok {
return nil, utils.ErrNotFound
rl, err := rls.dataDB.GetResourceLimit(resName, false, utils.NonTransactional)
if err != nil {
if err == utils.ErrNotFound {
continue
}
return nil, err
}
rl := x.(*ResourceLimit)
now := time.Now()
if rl.ActivationTime.After(now) || (!rl.ExpiryTime.IsZero() && rl.ExpiryTime.Before(now)) { // not active
continue
@@ -257,9 +179,6 @@ func (rls *ResourceLimiterService) matchingResourceLimitsForEvent(ev map[string]
// Called to start the service
func (rls *ResourceLimiterService) ListenAndServe() error {
if err := rls.cacheResourceLimits("ResourceLimiterServiceStart", nil); err != nil {
return err
}
return nil
}
@@ -270,20 +189,6 @@ func (rls *ResourceLimiterService) ServiceShutdown() error {
// RPC Methods
// Cache/Re-cache
func (rls *ResourceLimiterService) V1CacheResourceLimits(attrs *utils.AttrRLsCache, reply *string) error {
if err := rls.cacheResourceLimits(attrs.LoadID, attrs.ResourceLimitIDs); err != nil {
return err
}
*reply = utils.OK
return nil
}
// Alias API for external usage
func (rls *ResourceLimiterService) CacheResourceLimits(attrs *utils.AttrRLsCache, reply *string) error {
return rls.V1CacheResourceLimits(attrs, reply)
}
func (rls *ResourceLimiterService) V1ResourceLimitsForEvent(ev map[string]interface{}, reply *[]*ResourceLimit) error {
rls.Lock() // Unknown number of RLs updated
defer rls.Unlock()

View File

@@ -18,16 +18,15 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/cache"
"github.com/cgrates/cgrates/utils"
)
var rLS *ResourceLimiterService
/*
func TestRLsIndexStringFilters(t *testing.T) {
rls := []*ResourceLimit{
&ResourceLimit{
@@ -171,8 +170,95 @@ func TestRLsIndexStringFilters(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", eIndexes, rLS.stringIndexes)
}
}
*/
func TestRLsLoadRLs(t *testing.T) {
rls := []*ResourceLimit{
&ResourceLimit{
ID: "RL1",
Weight: 20,
Filters: []*RequestFilter{
&RequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"1001", "1002"}},
&RequestFilter{Type: MetaRSRFields, Values: []string{"Subject(~^1.*1$)", "Destination(1002)"},
rsrFields: utils.ParseRSRFieldsMustCompile("Subject(~^1.*1$);Destination(1002)", utils.INFIELD_SEP),
}},
ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
Limit: 2,
Usage: make(map[string]*ResourceUsage),
},
&ResourceLimit{
ID: "RL2",
Weight: 10,
Filters: []*RequestFilter{
&RequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"dan", "1002"}},
&RequestFilter{Type: MetaString, FieldName: "Subject", Values: []string{"dan"}},
},
ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
Limit: 1,
UsageTTL: time.Duration(1 * time.Millisecond),
Usage: make(map[string]*ResourceUsage),
},
&ResourceLimit{
ID: "RL3",
Weight: 10,
Filters: []*RequestFilter{
&RequestFilter{Type: MetaString, FieldName: "Subject", Values: []string{"dan"}},
&RequestFilter{Type: MetaString, FieldName: "Subject", Values: []string{"1003"}},
},
ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
Limit: 1,
Usage: make(map[string]*ResourceUsage),
},
&ResourceLimit{
ID: "RL4",
Weight: 10,
Filters: []*RequestFilter{
&RequestFilter{Type: MetaStringPrefix, FieldName: "Destination", Values: []string{"+49"}},
},
ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
Limit: 1,
Usage: make(map[string]*ResourceUsage),
},
&ResourceLimit{
ID: "RL5",
Weight: 10,
Filters: []*RequestFilter{
&RequestFilter{Type: MetaStringPrefix, FieldName: "Destination", Values: []string{"+40"}},
},
ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
Limit: 1,
UsageTTL: time.Duration(10 * time.Millisecond),
Usage: make(map[string]*ResourceUsage),
},
&ResourceLimit{ // Add it so we can test expiryTime
ID: "RL6",
Weight: 10,
Filters: []*RequestFilter{
&RequestFilter{Type: MetaString, FieldName: "Subject", Values: []string{"dan"}},
},
ActivationTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
ExpiryTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
Limit: 1,
Usage: make(map[string]*ResourceUsage),
},
}
rlIdxr, err := NewReqFilterIndexer(accountingStorage, utils.ResourceLimitsIndex)
if err != nil {
t.Error(err)
}
for _, rl := range rls {
if err := accountingStorage.SetResourceLimit(rl, utils.NonTransactional); err != nil {
t.Error(err)
}
rlIdxr.IndexFilters(rl.ID, rl.Filters)
}
if err := rlIdxr.StoreIndexes(); err != nil {
t.Error(err)
}
}
func TestRLsMatchingResourceLimitsForEvent(t *testing.T) {
rLS = &ResourceLimiterService{dataDB: accountingStorage, cdrStatS: nil}
eResLimits := map[string]*ResourceLimit{
"RL1": &ResourceLimit{
ID: "RL1",

View File

@@ -1153,23 +1153,101 @@ func (ms *MapStorage) GetStructVersion() (rsv *StructVersion, err error) {
return
}
func (ms *MapStorage) GetResourceLimit(id string, skipCache bool, transactionID string) (*ResourceLimit, error) {
return nil, nil
func (ms *MapStorage) GetResourceLimit(id string, skipCache bool, transactionID string) (rl *ResourceLimit, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
key := utils.ResourceLimitsPrefix + id
if !skipCache {
if x, ok := cache.Get(key); ok {
if x != nil {
return x.(*ResourceLimit), nil
}
return nil, utils.ErrNotFound
}
}
values, ok := ms.dict[key]
if !ok {
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
return nil, utils.ErrNotFound
}
err = ms.ms.Unmarshal(values, &rl)
if err != nil {
return nil, err
}
for _, fltr := range rl.Filters {
if err := fltr.CompileValues(); err != nil {
return nil, err
}
}
cache.Set(key, rl, cacheCommit(transactionID), transactionID)
return
}
func (ms *MapStorage) SetResourceLimit(rl *ResourceLimit, transactionID string) error {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(rl)
if err != nil {
return err
}
ms.dict[utils.ResourceLimitsPrefix+rl.ID] = result
return nil
}
func (ms *MapStorage) RemoveResourceLimit(id string, transactionID string) error {
ms.mu.Lock()
defer ms.mu.Unlock()
key := utils.ResourceLimitsPrefix + id
delete(ms.dict, key)
cache.RemKey(key, cacheCommit(transactionID), transactionID)
return nil
}
func (ms *MapStorage) GetReqFilterIndexes(dbKey string) (indexes map[string]map[string]utils.StringMap, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
values, ok := ms.dict[dbKey]
if !ok {
return nil, utils.ErrNotFound
}
err = ms.ms.Unmarshal(values, &indexes)
if err != nil {
return nil, err
}
return
}
func (ms *MapStorage) SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(indexes)
if err != nil {
return err
}
ms.dict[dbKey] = result
return
}
func (ms *MapStorage) MatchReqFilterIndex(dbKey, fieldValKey string) (itemIDs utils.StringMap, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
if x, ok := cache.Get(dbKey + fieldValKey); ok { // Attempt to find in cache first
if x != nil {
return x.(utils.StringMap), nil
}
return nil, utils.ErrNotFound
}
// Not found in cache, check in DB
values, ok := ms.dict[dbKey]
if !ok {
cache.Set(dbKey+fieldValKey, nil, true, utils.NonTransactional)
return nil, utils.ErrNotFound
}
var indexes map[string]map[string]utils.StringMap
if err = ms.ms.Unmarshal(values, &indexes); err != nil {
return nil, err
}
keySplt := strings.Split(fieldValKey, ":")
if _, hasIt := indexes[keySplt[0]]; hasIt {
itemIDs = indexes[keySplt[0]][keySplt[1]]
}
cache.Set(dbKey+fieldValKey, itemIDs, true, utils.NonTransactional)
return
}

View File

@@ -1183,8 +1183,8 @@ func (rs *RedisStorage) GetResourceLimit(id string, skipCache bool, transactionI
return nil, err
}
}
cache.Set(key, rl, cacheCommit(transactionID), transactionID)
}
cache.Set(key, rl, cacheCommit(transactionID), transactionID)
return
}
func (rs *RedisStorage) SetResourceLimit(rl *ResourceLimit, transactionID string) error {
@@ -1192,10 +1192,7 @@ func (rs *RedisStorage) SetResourceLimit(rl *ResourceLimit, transactionID string
if err != nil {
return err
}
key := utils.ResourceLimitsPrefix + rl.ID
err = rs.Cmd("SET", key, result).Err
cache.Set(key, rl, cacheCommit(transactionID), transactionID)
return err
return rs.Cmd("SET", utils.ResourceLimitsPrefix+rl.ID, result).Err
}
func (rs *RedisStorage) RemoveResourceLimit(id string, transactionID string) error {
key := utils.ResourceLimitsPrefix + id
@@ -1252,14 +1249,14 @@ func (rs *RedisStorage) SetReqFilterIndexes(dbKey string, indexes map[string]map
}
func (rs *RedisStorage) MatchReqFilterIndex(dbKey, fieldValKey string) (itemIDs utils.StringMap, err error) {
if x, ok := cache.Get(fieldValKey); ok { // Attempt to find in cache first
if x, ok := cache.Get(dbKey + fieldValKey); ok { // Attempt to find in cache first
if x != nil {
return x.(utils.StringMap), nil
}
return nil, utils.ErrNotFound
}
// Not found in cache, check in DB
str, err := rs.Cmd("HGET", dbKey, fieldValKey).Str()
fldValBytes, err := rs.Cmd("HGET", dbKey, fieldValKey).Bytes()
if err != nil {
if err.Error() != "wrong type" {
return nil, err
@@ -1267,11 +1264,9 @@ func (rs *RedisStorage) MatchReqFilterIndex(dbKey, fieldValKey string) (itemIDs
// Case when str is not found
err = utils.ErrNotFound
}
if str != "" {
if err = rs.ms.Unmarshal([]byte(str), &itemIDs); err != nil {
return
}
if err = rs.ms.Unmarshal(fldValBytes, &itemIDs); err != nil {
return
}
cache.Set(dbKey, itemIDs, true, utils.NonTransactional)
cache.Set(dbKey+fieldValKey, itemIDs, true, utils.NonTransactional)
return
}