New ReqFilterIndexer, indexes moved to standard cache

This commit is contained in:
DanB
2016-10-08 19:06:35 +02:00
parent 118d47953a
commit d9c62783f8
9 changed files with 120 additions and 98 deletions

3
cache/cache.go vendored
View File

@@ -35,6 +35,7 @@ var (
cache cacheStore
cacheMux sync.RWMutex
cfg *config.CacheConfig
// transaction stuff
transactionBuffer map[string][]*transactionItem // Queue tasks based on transactionID
transBufMux sync.Mutex // Protects the transactionBuffer
@@ -156,7 +157,7 @@ func CountEntries(prefix string) (result int) {
return cache.CountEntriesForPrefix(prefix)
}
func GetEntriesKeys(prefix string) (keys []string) {
func GetEntryKeys(prefix string) (keys []string) {
cacheMux.RLock()
defer cacheMux.RUnlock()
return cache.GetKeysForPrefix(prefix)

95
cache/ficache.go vendored
View File

@@ -1,95 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package cache
import (
"sync"
"github.com/cgrates/cgrates/utils"
)
type filtersIndexer interface {
Index(itemIDs []string) (map[string]map[string]utils.StringMap, error) // Index items based on ID, nil for all
}
func newFiCache() *fiCache {
return &fiCache{cache: make(map[string]map[string]map[string]utils.StringMap)}
}
// FiCache is a cache handling filter indexing for various services
type fiCache struct {
cache map[string]map[string]map[string]utils.StringMap // map[serviceID]map[fieldName]map[fieldValue]utils.StringMap[itemID]
sync.RWMutex // protects cache
indexers map[string]filtersIndexer
indxrsMux sync.RWMutex // protects indexers
}
func (fiCh *fiCache) registerIndexer(id string, idxr filtersIndexer) { // Not protected but also should
fiCh.indxrsMux.Lock()
fiCh.indexers[id] = idxr
fiCh.indxrsMux.Unlock()
}
func (fiCh *fiCache) indexForIndexer(indxrID string, itemIDs []string) error {
fiCh.indxrsMux.RLock()
idxr, hasIt := fiCh.indexers[indxrID]
fiCh.indxrsMux.RUnlock()
if !hasIt {
return utils.ErrNotFound
}
newIndxMp, err := idxr.Index(itemIDs)
if err != nil {
return err
}
fiCh.Lock()
if _, hasIt := fiCh.indexers[indxrID]; !hasIt {
fiCh.cache[indxrID] = newIndxMp
return nil
}
// Merge old index cache with new one
for fldNameKey, mpFldName := range newIndxMp {
if _, hasIt := fiCh.cache[indxrID][fldNameKey]; !hasIt {
fiCh.cache[indxrID][fldNameKey] = mpFldName
} else {
for fldValKey, strMap := range mpFldName {
if _, hasIt := fiCh.cache[indxrID][fldNameKey][fldValKey]; !hasIt {
fiCh.cache[indxrID][fldNameKey][fldValKey] = strMap
} else {
for resIDKey := range strMap {
fiCh.cache[indxrID][fldNameKey][fldValKey][resIDKey] = true
}
}
}
}
}
fiCh.Unlock()
return nil
}
// Empty the cache for specific indexers
func (fiCh *fiCache) flushForIndexers(indxrIDs []string) {
fiCh.Lock()
defer fiCh.Unlock()
if indxrIDs == nil {
fiCh.cache = make(map[string]map[string]map[string]utils.StringMap)
return
}
for _, indxID := range indxrIDs {
delete(fiCh.cache, indxID)
}
}

View File

@@ -1008,7 +1008,7 @@ func (cd *CallDescriptor) GetLCR(stats rpcclient.RpcClientConnection, lcrFltr *L
}
ratingProfileSearchKey := utils.ConcatenatedKey(lcr.Direction, lcr.Tenant, lcrCost.Entry.RPCategory)
//log.Print("KEY: ", ratingProfileSearchKey)
suppliers := cache.GetEntriesKeys(utils.RATING_PROFILE_PREFIX + ratingProfileSearchKey)
suppliers := cache.GetEntryKeys(utils.RATING_PROFILE_PREFIX + ratingProfileSearchKey)
for _, supplier := range suppliers {
//log.Print("Supplier: ", supplier)
split := strings.Split(supplier, ":")

View File

@@ -0,0 +1,84 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"github.com/cgrates/cgrates/utils"
)
func NewReqFilterIndexer(dataDB AccountingStorage, dbKey string, overwriteDB bool) (*ReqFilterIndexer, error) {
indexes, err := dataDB.GetReqFilterIndexes(dbKey)
if err != nil && err != utils.ErrNotFound {
return nil, err
}
if indexes == nil {
indexes = make(map[string]map[string]utils.StringMap)
}
return &ReqFilterIndexer{dataDB: dataDB, indexes: indexes, chngdIndxKeys: make(utils.StringMap)}, nil
}
// ReqFilterIndexer is a centralized indexer for all data sources using RequestFilter
// retrieves and stores it's data from/to dataDB
// not thread safe, meant to be used as logic within other code blocks
type ReqFilterIndexer struct {
indexes map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue]utils.StringMap[resourceID]
dataDB AccountingStorage
dbKey string // get/store the result from/into this key
chngdIndxKeys utils.StringMap // keep record of the changed fieldName:fieldValue pair so we can re-cache wisely
}
// ChangedKeys returns the changed keys from original indexes so we can reload wisely
func (rfi *ReqFilterIndexer) ChangedKeys() utils.StringMap {
return rfi.chngdIndxKeys
}
// IndexFilters parses reqFltrs, adding itemID in the indexes and marks the changed keys in chngdIndxKeys
func (rfi *ReqFilterIndexer) IndexFilters(itemID string, reqFltrs []*RequestFilter) {
var hasMetaString bool
for _, fltr := range reqFltrs {
if fltr.Type != MetaString {
continue
}
hasMetaString = true // Mark that we found at least one metatring so we don't index globally
if _, hastIt := rfi.indexes[fltr.FieldName]; !hastIt {
rfi.indexes[fltr.FieldName] = make(map[string]utils.StringMap)
}
for _, fldVal := range fltr.Values {
if _, hasIt := rfi.indexes[fltr.FieldName][fldVal]; !hasIt {
rfi.indexes[fltr.FieldName][fldVal] = make(utils.StringMap)
}
rfi.indexes[fltr.FieldName][fldVal][itemID] = true
rfi.chngdIndxKeys[utils.ConcatenatedKey(fltr.FieldName, fldVal)] = true
}
}
if !hasMetaString {
if _, hasIt := rfi.indexes[utils.NOT_AVAILABLE]; !hasIt {
rfi.indexes[utils.NOT_AVAILABLE] = make(map[string]utils.StringMap)
}
if _, hasIt := rfi.indexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE]; !hasIt {
rfi.indexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE] = make(utils.StringMap)
}
rfi.indexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE][itemID] = true // Fields without real field index will be located in map[NOT_AVAILABLE][NOT_AVAILABLE][rl.ID]
}
return
}
// StoreIndexes handles storing the indexes to dataDB
func (rfi *ReqFilterIndexer) StoreIndexes() error {
return rfi.dataDB.SetReqFilterIndexes(rfi.dbKey, rfi.indexes)
}

View File

@@ -109,7 +109,7 @@ func (rls *ResourceLimiterService) indexStringFilters(rlIDs []string) error {
newStringIndexes := make(map[string]map[string]utils.StringMap) // Index it transactional
var cacheIDsToIndex []string // Cache keys of RLs to be indexed
if rlIDs == nil {
cacheIDsToIndex = cache.GetEntriesKeys(utils.ResourceLimitsPrefix)
cacheIDsToIndex = cache.GetEntryKeys(utils.ResourceLimitsPrefix)
} else {
for _, rlID := range rlIDs {
cacheIDsToIndex = append(cacheIDsToIndex, utils.ResourceLimitsPrefix+rlID)

View File

@@ -102,6 +102,9 @@ type AccountingStorage interface {
AddLoadHistory(*utils.LoadInstance, int, string) error
GetStructVersion() (*StructVersion, error)
SetStructVersion(*StructVersion) error
GetReqFilterIndexes(dbKey string) (indexes map[string]map[string]utils.StringMap, err error)
SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error)
GetFieldIndex(dbKey, fieldValKey string) (itemIDs utils.StringMap, err error)
}
type CdrStorage interface {

View File

@@ -1163,3 +1163,13 @@ func (ms *MapStorage) SetResourceLimit(rl *ResourceLimit, transactionID string)
func (ms *MapStorage) RemoveResourceLimit(id string, transactionID string) error {
return nil
}
func (ms *MapStorage) GetReqFilterIndexes(dbKey string) (indexes map[string]map[string]utils.StringMap, err error) {
return
}
func (ms *MapStorage) SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) {
return
}
func (ms *MapStorage) GetFieldIndex(dbKey, fieldValKey string) (itemIDs utils.StringMap, err error) {
return
}

View File

@@ -1620,3 +1620,12 @@ func (ms *MongoStorage) SetResourceLimit(rl *ResourceLimit, transactionID string
func (ms *MongoStorage) RemoveResourceLimit(id string, transactionID string) error {
return nil
}
func (ms *MongoStorage) GetReqFilterIndexes(dbKey string) (indexes map[string]map[string]utils.StringMap, err error) {
return
}
func (ms *MongoStorage) SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) {
return
}
func (ms *MongoStorage) GetFieldIndex(dbKey, fieldValKey string) (itemIDs utils.StringMap, err error) {
return
}

View File

@@ -1205,3 +1205,13 @@ func (rs *RedisStorage) RemoveResourceLimit(id string, transactionID string) err
cache.RemKey(key, cacheCommit(transactionID), transactionID)
return nil
}
func (rs *RedisStorage) GetReqFilterIndexes(dbKey string) (indexes map[string]map[string]utils.StringMap, err error) {
return
}
func (rs *RedisStorage) SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) {
return
}
func (rs *RedisStorage) GetFieldIndex(dbKey, fieldValKey string) (itemIDs utils.StringMap, err error) {
return
}