diff --git a/cache/cache.go b/cache/cache.go index d51cc4c64..3acc89203 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -35,6 +35,7 @@ var ( cache cacheStore cacheMux sync.RWMutex cfg *config.CacheConfig + // transaction stuff transactionBuffer map[string][]*transactionItem // Queue tasks based on transactionID transBufMux sync.Mutex // Protects the transactionBuffer @@ -156,7 +157,7 @@ func CountEntries(prefix string) (result int) { return cache.CountEntriesForPrefix(prefix) } -func GetEntriesKeys(prefix string) (keys []string) { +func GetEntryKeys(prefix string) (keys []string) { cacheMux.RLock() defer cacheMux.RUnlock() return cache.GetKeysForPrefix(prefix) diff --git a/cache/ficache.go b/cache/ficache.go deleted file mode 100644 index b599d59cd..000000000 --- a/cache/ficache.go +++ /dev/null @@ -1,95 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ -package cache - -import ( - "sync" - - "github.com/cgrates/cgrates/utils" -) - -type filtersIndexer interface { - Index(itemIDs []string) (map[string]map[string]utils.StringMap, error) // Index items based on ID, nil for all -} - -func newFiCache() *fiCache { - return &fiCache{cache: make(map[string]map[string]map[string]utils.StringMap)} -} - -// FiCache is a cache handling filter indexing for various services -type fiCache struct { - cache map[string]map[string]map[string]utils.StringMap // map[serviceID]map[fieldName]map[fieldValue]utils.StringMap[itemID] - sync.RWMutex // protects cache - indexers map[string]filtersIndexer - indxrsMux sync.RWMutex // protects indexers -} - -func (fiCh *fiCache) registerIndexer(id string, idxr filtersIndexer) { // Not protected but also should - fiCh.indxrsMux.Lock() - fiCh.indexers[id] = idxr - fiCh.indxrsMux.Unlock() -} - -func (fiCh *fiCache) indexForIndexer(indxrID string, itemIDs []string) error { - fiCh.indxrsMux.RLock() - idxr, hasIt := fiCh.indexers[indxrID] - fiCh.indxrsMux.RUnlock() - if !hasIt { - return utils.ErrNotFound - } - newIndxMp, err := idxr.Index(itemIDs) - if err != nil { - return err - } - fiCh.Lock() - if _, hasIt := fiCh.indexers[indxrID]; !hasIt { - fiCh.cache[indxrID] = newIndxMp - return nil - } - // Merge old index cache with new one - for fldNameKey, mpFldName := range newIndxMp { - if _, hasIt := fiCh.cache[indxrID][fldNameKey]; !hasIt { - fiCh.cache[indxrID][fldNameKey] = mpFldName - } else { - for fldValKey, strMap := range mpFldName { - if _, hasIt := fiCh.cache[indxrID][fldNameKey][fldValKey]; !hasIt { - fiCh.cache[indxrID][fldNameKey][fldValKey] = strMap - } else { - for resIDKey := range strMap { - fiCh.cache[indxrID][fldNameKey][fldValKey][resIDKey] = true - } - } - } - } - } - fiCh.Unlock() - return nil -} - -// Empty the cache for specific indexers -func (fiCh *fiCache) flushForIndexers(indxrIDs []string) { - fiCh.Lock() - defer fiCh.Unlock() - if indxrIDs == nil { - fiCh.cache = make(map[string]map[string]map[string]utils.StringMap) - return - } - for _, indxID := range indxrIDs { - delete(fiCh.cache, indxID) - } -} diff --git a/engine/calldesc.go b/engine/calldesc.go index 1677e8213..df6a700a3 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -1008,7 +1008,7 @@ func (cd *CallDescriptor) GetLCR(stats rpcclient.RpcClientConnection, lcrFltr *L } ratingProfileSearchKey := utils.ConcatenatedKey(lcr.Direction, lcr.Tenant, lcrCost.Entry.RPCategory) //log.Print("KEY: ", ratingProfileSearchKey) - suppliers := cache.GetEntriesKeys(utils.RATING_PROFILE_PREFIX + ratingProfileSearchKey) + suppliers := cache.GetEntryKeys(utils.RATING_PROFILE_PREFIX + ratingProfileSearchKey) for _, supplier := range suppliers { //log.Print("Supplier: ", supplier) split := strings.Split(supplier, ":") diff --git a/engine/reqfilterindexer.go b/engine/reqfilterindexer.go new file mode 100644 index 000000000..36bcdb8fe --- /dev/null +++ b/engine/reqfilterindexer.go @@ -0,0 +1,84 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package engine + +import ( + "github.com/cgrates/cgrates/utils" +) + +func NewReqFilterIndexer(dataDB AccountingStorage, dbKey string, overwriteDB bool) (*ReqFilterIndexer, error) { + indexes, err := dataDB.GetReqFilterIndexes(dbKey) + if err != nil && err != utils.ErrNotFound { + return nil, err + } + if indexes == nil { + indexes = make(map[string]map[string]utils.StringMap) + } + return &ReqFilterIndexer{dataDB: dataDB, indexes: indexes, chngdIndxKeys: make(utils.StringMap)}, nil +} + +// ReqFilterIndexer is a centralized indexer for all data sources using RequestFilter +// retrieves and stores it's data from/to dataDB +// not thread safe, meant to be used as logic within other code blocks +type ReqFilterIndexer struct { + indexes map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue]utils.StringMap[resourceID] + dataDB AccountingStorage + dbKey string // get/store the result from/into this key + chngdIndxKeys utils.StringMap // keep record of the changed fieldName:fieldValue pair so we can re-cache wisely +} + +// ChangedKeys returns the changed keys from original indexes so we can reload wisely +func (rfi *ReqFilterIndexer) ChangedKeys() utils.StringMap { + return rfi.chngdIndxKeys +} + +// IndexFilters parses reqFltrs, adding itemID in the indexes and marks the changed keys in chngdIndxKeys +func (rfi *ReqFilterIndexer) IndexFilters(itemID string, reqFltrs []*RequestFilter) { + var hasMetaString bool + for _, fltr := range reqFltrs { + if fltr.Type != MetaString { + continue + } + hasMetaString = true // Mark that we found at least one metatring so we don't index globally + if _, hastIt := rfi.indexes[fltr.FieldName]; !hastIt { + rfi.indexes[fltr.FieldName] = make(map[string]utils.StringMap) + } + for _, fldVal := range fltr.Values { + if _, hasIt := rfi.indexes[fltr.FieldName][fldVal]; !hasIt { + rfi.indexes[fltr.FieldName][fldVal] = make(utils.StringMap) + } + rfi.indexes[fltr.FieldName][fldVal][itemID] = true + rfi.chngdIndxKeys[utils.ConcatenatedKey(fltr.FieldName, fldVal)] = true + } + } + if !hasMetaString { + if _, hasIt := rfi.indexes[utils.NOT_AVAILABLE]; !hasIt { + rfi.indexes[utils.NOT_AVAILABLE] = make(map[string]utils.StringMap) + } + if _, hasIt := rfi.indexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE]; !hasIt { + rfi.indexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE] = make(utils.StringMap) + } + rfi.indexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE][itemID] = true // Fields without real field index will be located in map[NOT_AVAILABLE][NOT_AVAILABLE][rl.ID] + } + return +} + +// StoreIndexes handles storing the indexes to dataDB +func (rfi *ReqFilterIndexer) StoreIndexes() error { + return rfi.dataDB.SetReqFilterIndexes(rfi.dbKey, rfi.indexes) +} diff --git a/engine/reslimiter.go b/engine/reslimiter.go index 30534995a..afdf9dbd9 100644 --- a/engine/reslimiter.go +++ b/engine/reslimiter.go @@ -109,7 +109,7 @@ func (rls *ResourceLimiterService) indexStringFilters(rlIDs []string) error { newStringIndexes := make(map[string]map[string]utils.StringMap) // Index it transactional var cacheIDsToIndex []string // Cache keys of RLs to be indexed if rlIDs == nil { - cacheIDsToIndex = cache.GetEntriesKeys(utils.ResourceLimitsPrefix) + cacheIDsToIndex = cache.GetEntryKeys(utils.ResourceLimitsPrefix) } else { for _, rlID := range rlIDs { cacheIDsToIndex = append(cacheIDsToIndex, utils.ResourceLimitsPrefix+rlID) diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 39467ef5b..6dc563ee8 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -102,6 +102,9 @@ type AccountingStorage interface { AddLoadHistory(*utils.LoadInstance, int, string) error GetStructVersion() (*StructVersion, error) SetStructVersion(*StructVersion) error + GetReqFilterIndexes(dbKey string) (indexes map[string]map[string]utils.StringMap, err error) + SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) + GetFieldIndex(dbKey, fieldValKey string) (itemIDs utils.StringMap, err error) } type CdrStorage interface { diff --git a/engine/storage_map.go b/engine/storage_map.go index 5a0213042..78dea3414 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -1163,3 +1163,13 @@ func (ms *MapStorage) SetResourceLimit(rl *ResourceLimit, transactionID string) func (ms *MapStorage) RemoveResourceLimit(id string, transactionID string) error { return nil } + +func (ms *MapStorage) GetReqFilterIndexes(dbKey string) (indexes map[string]map[string]utils.StringMap, err error) { + return +} +func (ms *MapStorage) SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) { + return +} +func (ms *MapStorage) GetFieldIndex(dbKey, fieldValKey string) (itemIDs utils.StringMap, err error) { + return +} diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 0ea49e8c0..1a3c6a3bc 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -1620,3 +1620,12 @@ func (ms *MongoStorage) SetResourceLimit(rl *ResourceLimit, transactionID string func (ms *MongoStorage) RemoveResourceLimit(id string, transactionID string) error { return nil } +func (ms *MongoStorage) GetReqFilterIndexes(dbKey string) (indexes map[string]map[string]utils.StringMap, err error) { + return +} +func (ms *MongoStorage) SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) { + return +} +func (ms *MongoStorage) GetFieldIndex(dbKey, fieldValKey string) (itemIDs utils.StringMap, err error) { + return +} diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 6341bf4aa..f06f51b84 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1205,3 +1205,13 @@ func (rs *RedisStorage) RemoveResourceLimit(id string, transactionID string) err cache.RemKey(key, cacheCommit(transactionID), transactionID) return nil } + +func (rs *RedisStorage) GetReqFilterIndexes(dbKey string) (indexes map[string]map[string]utils.StringMap, err error) { + return +} +func (rs *RedisStorage) SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) { + return +} +func (rs *RedisStorage) GetFieldIndex(dbKey, fieldValKey string) (itemIDs utils.StringMap, err error) { + return +}