diff --git a/engine/caches.go b/engine/caches.go index eb0b420ba..640378ddf 100644 --- a/engine/caches.go +++ b/engine/caches.go @@ -33,30 +33,37 @@ func init() { InitCache(nil) } -var precachedPartitions = []string{ - utils.CacheDestinations, - utils.CacheReverseDestinations, - utils.CacheRatingPlans, - utils.CacheRatingProfiles, - utils.CacheActions, - utils.CacheActionPlans, - utils.CacheAccountActionPlans, - utils.CacheActionTriggers, - utils.CacheSharedGroups, - utils.CacheResourceProfiles, - utils.CacheResources, - utils.CacheEventResources, - utils.CacheTimings, - utils.CacheStatQueueProfiles, - utils.CacheStatQueues, - utils.CacheThresholdProfiles, - utils.CacheThresholds, - utils.CacheFilters, - utils.CacheSupplierProfiles, - utils.CacheAttributeProfiles, - utils.CacheChargerProfiles, - utils.CacheDispatcherProfiles, - utils.CacheDiameterMessages, +var precachedPartitions = utils.StringMap{ + utils.CacheDestinations: true, + utils.CacheReverseDestinations: true, + utils.CacheRatingPlans: true, + utils.CacheRatingProfiles: true, + utils.CacheActions: true, + utils.CacheActionPlans: true, + utils.CacheAccountActionPlans: true, + utils.CacheActionTriggers: true, + utils.CacheSharedGroups: true, + utils.CacheResourceProfiles: true, + utils.CacheResources: true, + utils.CacheEventResources: true, + utils.CacheTimings: true, + utils.CacheStatQueueProfiles: true, + utils.CacheStatQueues: true, + utils.CacheThresholdProfiles: true, + utils.CacheThresholds: true, + utils.CacheFilters: true, + utils.CacheSupplierProfiles: true, + utils.CacheAttributeProfiles: true, + utils.CacheChargerProfiles: true, + utils.CacheDispatcherProfiles: true, + utils.CacheDiameterMessages: true, + utils.CacheAttributeFilterIndexes: true, + utils.CacheResourceFilterIndexes: true, + utils.CacheStatFilterIndexes: true, + utils.CacheThresholdFilterIndexes: true, + utils.CacheSupplierFilterIndexes: true, + utils.CacheChargerFilterIndexes: true, + utils.CacheDispatcherFilterIndexes: true, } // InitCache will instantiate the cache with specific or default configuraiton @@ -73,7 +80,7 @@ func NewCacheS(cfg *config.CGRConfig, dm *DataManager) (c *CacheS) { c = &CacheS{cfg: cfg, dm: dm, pcItems: make(map[string]chan struct{})} for cacheID := range cfg.CacheCfg() { - if !utils.IsSliceMember(precachedPartitions, cacheID) { + if !precachedPartitions.HasKey(cacheID) { continue } c.pcItems[cacheID] = make(chan struct{}) @@ -96,7 +103,7 @@ func (chS *CacheS) GetPrecacheChannel(chID string) chan struct{} { // Precache loads data from DataDB into cache at engine start func (chS *CacheS) Precache() (err error) { for cacheID, cacheCfg := range chS.cfg.CacheCfg() { - if !utils.IsSliceMember(precachedPartitions, cacheID) { + if !precachedPartitions.HasKey(cacheID) { continue } if cacheCfg.Precache { @@ -170,7 +177,7 @@ func (chS *CacheS) V1GetCacheStats(cacheIDs []string, func (chS *CacheS) V1PrecacheStatus(cacheIDs []string, rply *map[string]string) (err error) { if len(cacheIDs) == 0 { - for _, cacheID := range precachedPartitions { + for cacheID := range precachedPartitions { cacheIDs = append(cacheIDs, cacheID) } } diff --git a/engine/datamanager.go b/engine/datamanager.go index 254631f5e..633c1b0c7 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -24,9 +24,72 @@ import ( "github.com/cgrates/ltcache" ) +var ( + filterIndexesPrefixMap = utils.StringMap{ + utils.AttributeFilterIndexes: true, + utils.ResourceFilterIndexes: true, + utils.StatFilterIndexes: true, + utils.ThresholdFilterIndexes: true, + utils.SupplierFilterIndexes: true, + utils.ChargerFilterIndexes: true, + utils.DispatcherFilterIndexes: true, + } + loadCachePrefixMap = utils.StringMap{ + utils.DESTINATION_PREFIX: true, + utils.REVERSE_DESTINATION_PREFIX: true, + utils.RATING_PLAN_PREFIX: true, + utils.RATING_PROFILE_PREFIX: true, + utils.ACTION_PREFIX: true, + utils.ACTION_PLAN_PREFIX: true, + utils.ACTION_TRIGGER_PREFIX: true, + utils.SHARED_GROUP_PREFIX: true, + utils.StatQueuePrefix: true, + utils.StatQueueProfilePrefix: true, + utils.ThresholdPrefix: true, + utils.ThresholdProfilePrefix: true, + utils.FilterPrefix: true, + utils.SupplierProfilePrefix: true, + utils.AttributeProfilePrefix: true, + utils.ChargerProfilePrefix: true, + utils.DispatcherProfilePrefix: true, + } + cachePrefixMap = utils.StringMap{ + utils.DESTINATION_PREFIX: true, + utils.REVERSE_DESTINATION_PREFIX: true, + utils.RATING_PLAN_PREFIX: true, + utils.RATING_PROFILE_PREFIX: true, + utils.ACTION_PREFIX: true, + utils.ACTION_PLAN_PREFIX: true, + utils.AccountActionPlansPrefix: true, + utils.ACTION_TRIGGER_PREFIX: true, + utils.SHARED_GROUP_PREFIX: true, + utils.ResourceProfilesPrefix: true, + utils.TimingsPrefix: true, + utils.ResourcesPrefix: true, + utils.StatQueuePrefix: true, + utils.StatQueueProfilePrefix: true, + utils.ThresholdPrefix: true, + utils.ThresholdProfilePrefix: true, + utils.FilterPrefix: true, + utils.SupplierProfilePrefix: true, + utils.AttributeProfilePrefix: true, + utils.ChargerProfilePrefix: true, + utils.DispatcherProfilePrefix: true, + utils.AttributeFilterIndexes: true, + utils.ResourceFilterIndexes: true, + utils.StatFilterIndexes: true, + utils.ThresholdFilterIndexes: true, + utils.SupplierFilterIndexes: true, + utils.ChargerFilterIndexes: true, + utils.DispatcherFilterIndexes: true, + } +) + func NewDataManager(dataDB DataDB) *DataManager { - return &DataManager{dataDB: dataDB, - cacheCfg: config.CgrConfig().CacheCfg()} + return &DataManager{ + dataDB: dataDB, + cacheCfg: config.CgrConfig().CacheCfg(), + } } // DataManager is the data storage manager for CGRateS @@ -53,13 +116,7 @@ func (dm *DataManager) LoadDataDBCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, } for k, cacheCfg := range dm.cacheCfg { k = utils.CacheInstanceToPrefix[k] // alias into prefixes understood by storage - if utils.IsSliceMember([]string{utils.DESTINATION_PREFIX, utils.REVERSE_DESTINATION_PREFIX, - utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, - utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACTION_TRIGGER_PREFIX, - utils.SHARED_GROUP_PREFIX, utils.StatQueuePrefix, - utils.StatQueueProfilePrefix, utils.ThresholdPrefix, utils.ThresholdProfilePrefix, - utils.FilterPrefix, utils.SupplierProfilePrefix, - utils.AttributeProfilePrefix, utils.ChargerProfilePrefix, utils.DispatcherProfilePrefix}, k) && cacheCfg.Precache { + if loadCachePrefixMap.HasKey(k) && cacheCfg.Precache { if err := dm.PreloadCacheForPrefix(k); err != nil && err != utils.ErrInvalidKey { return err } @@ -124,28 +181,7 @@ func (dm *DataManager) PreloadCacheForPrefix(prefix string) error { } func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached bool) (err error) { - if !utils.IsSliceMember([]string{ - utils.DESTINATION_PREFIX, - utils.REVERSE_DESTINATION_PREFIX, - utils.RATING_PLAN_PREFIX, - utils.RATING_PROFILE_PREFIX, - utils.ACTION_PREFIX, - utils.ACTION_PLAN_PREFIX, - utils.AccountActionPlansPrefix, - utils.ACTION_TRIGGER_PREFIX, - utils.SHARED_GROUP_PREFIX, - utils.ResourceProfilesPrefix, - utils.TimingsPrefix, - utils.ResourcesPrefix, - utils.StatQueuePrefix, - utils.StatQueueProfilePrefix, - utils.ThresholdPrefix, - utils.ThresholdProfilePrefix, - utils.FilterPrefix, - utils.SupplierProfilePrefix, - utils.AttributeProfilePrefix, - utils.ChargerProfilePrefix, - utils.DispatcherProfilePrefix}, prfx) { + if !cachePrefixMap.HasKey(prfx) { return utils.NewCGRError(utils.DataManager, utils.MandatoryIEMissingCaps, utils.UnsupportedCachePrefix, @@ -235,6 +271,20 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b case utils.DispatcherProfilePrefix: tntID := utils.NewTenantID(dataID) _, err = dm.GetDispatcherProfile(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) + case utils.AttributeFilterIndexes: + err = dm.MatchFilterIndexFromKey(utils.CacheAttributeFilterIndexes, dataID) + case utils.ResourceFilterIndexes: + err = dm.MatchFilterIndexFromKey(utils.CacheResourceFilterIndexes, dataID) + case utils.StatFilterIndexes: + err = dm.MatchFilterIndexFromKey(utils.CacheStatFilterIndexes, dataID) + case utils.ThresholdFilterIndexes: + err = dm.MatchFilterIndexFromKey(utils.CacheThresholdFilterIndexes, dataID) + case utils.SupplierFilterIndexes: + err = dm.MatchFilterIndexFromKey(utils.CacheSupplierFilterIndexes, dataID) + case utils.ChargerFilterIndexes: + err = dm.MatchFilterIndexFromKey(utils.CacheChargerFilterIndexes, dataID) + case utils.DispatcherFilterIndexes: + err = dm.MatchFilterIndexFromKey(utils.CacheDispatcherFilterIndexes, dataID) } if err != nil { return utils.NewCGRError(utils.DataManager, @@ -937,6 +987,19 @@ func (dm *DataManager) RemoveFilterIndexes(cacheID, itemIDPrefix string) (err er return dm.DataDB().RemoveFilterIndexesDrv(cacheID, itemIDPrefix) } +func (dm *DataManager) MatchFilterIndexFromKey(cacheID, key string) (err error) { + splt := utils.SplitConcatenatedKey(key) // prefix:filterType:fieldName:fieldVal + lsplt := len(splt) + if lsplt < 4 { + return utils.ErrNotFound + } + fieldVal := splt[lsplt-1] + fieldName := splt[lsplt-2] + filterType := splt[lsplt-3] + itemIDPrefix := utils.ConcatenatedKey(splt[:lsplt-3]...) // prefix may contain context/subsystems + _, err = dm.MatchFilterIndex(cacheID, itemIDPrefix, filterType, fieldName, fieldVal) + return +} func (dm *DataManager) MatchFilterIndex(cacheID, itemIDPrefix, filterType, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) { fieldValKey := utils.ConcatenatedKey(itemIDPrefix, filterType, fieldName, fieldVal) diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 08e873088..72e9a805c 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -511,6 +511,7 @@ func (ms *MongoStorage) getField(sctx mongo.SessionContext, col, prefix, subject } return result, iter.Close(sctx) } + func (ms *MongoStorage) getField2(sctx mongo.SessionContext, col, prefix, subject string, tntID *utils.TenantID) (result []string, err error) { idResult := struct{ Tenant, Id string }{} elem := bson.M{} @@ -536,6 +537,27 @@ func (ms *MongoStorage) getField2(sctx mongo.SessionContext, col, prefix, subjec return result, iter.Close(sctx) } +func (ms *MongoStorage) getField3(sctx mongo.SessionContext, col, prefix, field string) (result []string, err error) { + fieldResult := bson.D{} + iter, err := ms.getCol(col).Find(sctx, + bson.M{field: bsonx.Regex(fmt.Sprintf("^%s", prefix), "")}, + options.Find().SetProjection( + bson.M{field: 1}, + ), + ) + if err != nil { + return + } + for iter.Next(sctx) { + err = iter.Decode(&fieldResult) + if err != nil { + return + } + result = append(result, fieldResult.Map()[field].(string)) + } + return result, iter.Close(sctx) +} + // GetKeysForPrefix implementation func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err error) { var category, subject string @@ -592,6 +614,20 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err er result, err = ms.getField2(sctx, colCpp, utils.ChargerProfilePrefix, subject, tntID) case utils.DispatcherProfilePrefix: result, err = ms.getField2(sctx, colDpp, utils.DispatcherProfilePrefix, subject, tntID) + case utils.AttributeFilterIndexes: + result, err = ms.getField3(sctx, colRFI, utils.AttributeFilterIndexes, "key") + case utils.ResourceFilterIndexes: + result, err = ms.getField3(sctx, colRFI, utils.ResourceFilterIndexes, "key") + case utils.StatFilterIndexes: + result, err = ms.getField3(sctx, colRFI, utils.StatFilterIndexes, "key") + case utils.ThresholdFilterIndexes: + result, err = ms.getField3(sctx, colRFI, utils.ThresholdFilterIndexes, "key") + case utils.SupplierFilterIndexes: + result, err = ms.getField3(sctx, colRFI, utils.SupplierFilterIndexes, "key") + case utils.ChargerFilterIndexes: + result, err = ms.getField3(sctx, colRFI, utils.ChargerFilterIndexes, "key") + case utils.DispatcherFilterIndexes: + result, err = ms.getField3(sctx, colRFI, utils.DispatcherFilterIndexes, "key") default: err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %s", prefix) } @@ -1662,9 +1698,6 @@ func (ms *MongoStorage) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType st if len(strings.Split(itemIDPrefix, ":")) == 2 { indexKey = utils.ConcatenatedKey(keys[2], keys[3], keys[4]) } - if _, hasIt := indexes[indexKey]; !hasIt { - indexes[indexKey] = make(utils.StringMap) - } indexes[indexKey] = utils.StringMapFromSlice(res.Value) } if len(indexes) == 0 { diff --git a/engine/storage_redis.go b/engine/storage_redis.go index b43f2322c..fefc14ab5 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -326,12 +326,31 @@ func (rs *RedisStorage) RemoveReverseForPrefix(prefix string) (err error) { return nil } +func (rs *RedisStorage) getKeysForFilterIndexesKeys(fkeys []string) (keys []string, err error) { + for _, itemIDPrefix := range fkeys { + mp := make(map[string]string) + mp, err = rs.Cmd("HGETALL", itemIDPrefix).Map() + if err != nil { + return + } else if len(mp) == 0 { + return nil, utils.ErrNotFound + } + for k := range mp { + keys = append(keys, utils.ConcatenatedKey(itemIDPrefix, k)) + } + } + return +} + func (rs *RedisStorage) GetKeysForPrefix(prefix string) ([]string, error) { r := rs.Cmd("KEYS", prefix+"*") if r.Err != nil { return nil, r.Err } if keys, _ := r.List(); len(keys) != 0 { + if filterIndexesPrefixMap.HasKey(prefix) { + return rs.getKeysForFilterIndexesKeys(keys) + } return keys, nil } return nil, nil @@ -1137,9 +1156,6 @@ func (rs *RedisStorage) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType st if err = rs.ms.Unmarshal([]byte(v), &sm); err != nil { return } - if _, hasKey := indexes[k]; !hasKey { - indexes[k] = make(utils.StringMap) - } indexes[k] = sm } return