mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-24 16:48:45 +05:00
Updated precache with filter_indexes
This commit is contained in:
committed by
Dan Christian Bogos
parent
781b24a482
commit
98df30cd36
@@ -33,30 +33,37 @@ func init() {
|
||||
InitCache(nil)
|
||||
}
|
||||
|
||||
var precachedPartitions = []string{
|
||||
utils.CacheDestinations,
|
||||
utils.CacheReverseDestinations,
|
||||
utils.CacheRatingPlans,
|
||||
utils.CacheRatingProfiles,
|
||||
utils.CacheActions,
|
||||
utils.CacheActionPlans,
|
||||
utils.CacheAccountActionPlans,
|
||||
utils.CacheActionTriggers,
|
||||
utils.CacheSharedGroups,
|
||||
utils.CacheResourceProfiles,
|
||||
utils.CacheResources,
|
||||
utils.CacheEventResources,
|
||||
utils.CacheTimings,
|
||||
utils.CacheStatQueueProfiles,
|
||||
utils.CacheStatQueues,
|
||||
utils.CacheThresholdProfiles,
|
||||
utils.CacheThresholds,
|
||||
utils.CacheFilters,
|
||||
utils.CacheSupplierProfiles,
|
||||
utils.CacheAttributeProfiles,
|
||||
utils.CacheChargerProfiles,
|
||||
utils.CacheDispatcherProfiles,
|
||||
utils.CacheDiameterMessages,
|
||||
var precachedPartitions = utils.StringMap{
|
||||
utils.CacheDestinations: true,
|
||||
utils.CacheReverseDestinations: true,
|
||||
utils.CacheRatingPlans: true,
|
||||
utils.CacheRatingProfiles: true,
|
||||
utils.CacheActions: true,
|
||||
utils.CacheActionPlans: true,
|
||||
utils.CacheAccountActionPlans: true,
|
||||
utils.CacheActionTriggers: true,
|
||||
utils.CacheSharedGroups: true,
|
||||
utils.CacheResourceProfiles: true,
|
||||
utils.CacheResources: true,
|
||||
utils.CacheEventResources: true,
|
||||
utils.CacheTimings: true,
|
||||
utils.CacheStatQueueProfiles: true,
|
||||
utils.CacheStatQueues: true,
|
||||
utils.CacheThresholdProfiles: true,
|
||||
utils.CacheThresholds: true,
|
||||
utils.CacheFilters: true,
|
||||
utils.CacheSupplierProfiles: true,
|
||||
utils.CacheAttributeProfiles: true,
|
||||
utils.CacheChargerProfiles: true,
|
||||
utils.CacheDispatcherProfiles: true,
|
||||
utils.CacheDiameterMessages: true,
|
||||
utils.CacheAttributeFilterIndexes: true,
|
||||
utils.CacheResourceFilterIndexes: true,
|
||||
utils.CacheStatFilterIndexes: true,
|
||||
utils.CacheThresholdFilterIndexes: true,
|
||||
utils.CacheSupplierFilterIndexes: true,
|
||||
utils.CacheChargerFilterIndexes: true,
|
||||
utils.CacheDispatcherFilterIndexes: true,
|
||||
}
|
||||
|
||||
// InitCache will instantiate the cache with specific or default configuraiton
|
||||
@@ -73,7 +80,7 @@ func NewCacheS(cfg *config.CGRConfig, dm *DataManager) (c *CacheS) {
|
||||
c = &CacheS{cfg: cfg, dm: dm,
|
||||
pcItems: make(map[string]chan struct{})}
|
||||
for cacheID := range cfg.CacheCfg() {
|
||||
if !utils.IsSliceMember(precachedPartitions, cacheID) {
|
||||
if !precachedPartitions.HasKey(cacheID) {
|
||||
continue
|
||||
}
|
||||
c.pcItems[cacheID] = make(chan struct{})
|
||||
@@ -96,7 +103,7 @@ func (chS *CacheS) GetPrecacheChannel(chID string) chan struct{} {
|
||||
// Precache loads data from DataDB into cache at engine start
|
||||
func (chS *CacheS) Precache() (err error) {
|
||||
for cacheID, cacheCfg := range chS.cfg.CacheCfg() {
|
||||
if !utils.IsSliceMember(precachedPartitions, cacheID) {
|
||||
if !precachedPartitions.HasKey(cacheID) {
|
||||
continue
|
||||
}
|
||||
if cacheCfg.Precache {
|
||||
@@ -170,7 +177,7 @@ func (chS *CacheS) V1GetCacheStats(cacheIDs []string,
|
||||
|
||||
func (chS *CacheS) V1PrecacheStatus(cacheIDs []string, rply *map[string]string) (err error) {
|
||||
if len(cacheIDs) == 0 {
|
||||
for _, cacheID := range precachedPartitions {
|
||||
for cacheID := range precachedPartitions {
|
||||
cacheIDs = append(cacheIDs, cacheID)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,9 +24,72 @@ import (
|
||||
"github.com/cgrates/ltcache"
|
||||
)
|
||||
|
||||
var (
|
||||
filterIndexesPrefixMap = utils.StringMap{
|
||||
utils.AttributeFilterIndexes: true,
|
||||
utils.ResourceFilterIndexes: true,
|
||||
utils.StatFilterIndexes: true,
|
||||
utils.ThresholdFilterIndexes: true,
|
||||
utils.SupplierFilterIndexes: true,
|
||||
utils.ChargerFilterIndexes: true,
|
||||
utils.DispatcherFilterIndexes: true,
|
||||
}
|
||||
loadCachePrefixMap = utils.StringMap{
|
||||
utils.DESTINATION_PREFIX: true,
|
||||
utils.REVERSE_DESTINATION_PREFIX: true,
|
||||
utils.RATING_PLAN_PREFIX: true,
|
||||
utils.RATING_PROFILE_PREFIX: true,
|
||||
utils.ACTION_PREFIX: true,
|
||||
utils.ACTION_PLAN_PREFIX: true,
|
||||
utils.ACTION_TRIGGER_PREFIX: true,
|
||||
utils.SHARED_GROUP_PREFIX: true,
|
||||
utils.StatQueuePrefix: true,
|
||||
utils.StatQueueProfilePrefix: true,
|
||||
utils.ThresholdPrefix: true,
|
||||
utils.ThresholdProfilePrefix: true,
|
||||
utils.FilterPrefix: true,
|
||||
utils.SupplierProfilePrefix: true,
|
||||
utils.AttributeProfilePrefix: true,
|
||||
utils.ChargerProfilePrefix: true,
|
||||
utils.DispatcherProfilePrefix: true,
|
||||
}
|
||||
cachePrefixMap = utils.StringMap{
|
||||
utils.DESTINATION_PREFIX: true,
|
||||
utils.REVERSE_DESTINATION_PREFIX: true,
|
||||
utils.RATING_PLAN_PREFIX: true,
|
||||
utils.RATING_PROFILE_PREFIX: true,
|
||||
utils.ACTION_PREFIX: true,
|
||||
utils.ACTION_PLAN_PREFIX: true,
|
||||
utils.AccountActionPlansPrefix: true,
|
||||
utils.ACTION_TRIGGER_PREFIX: true,
|
||||
utils.SHARED_GROUP_PREFIX: true,
|
||||
utils.ResourceProfilesPrefix: true,
|
||||
utils.TimingsPrefix: true,
|
||||
utils.ResourcesPrefix: true,
|
||||
utils.StatQueuePrefix: true,
|
||||
utils.StatQueueProfilePrefix: true,
|
||||
utils.ThresholdPrefix: true,
|
||||
utils.ThresholdProfilePrefix: true,
|
||||
utils.FilterPrefix: true,
|
||||
utils.SupplierProfilePrefix: true,
|
||||
utils.AttributeProfilePrefix: true,
|
||||
utils.ChargerProfilePrefix: true,
|
||||
utils.DispatcherProfilePrefix: true,
|
||||
utils.AttributeFilterIndexes: true,
|
||||
utils.ResourceFilterIndexes: true,
|
||||
utils.StatFilterIndexes: true,
|
||||
utils.ThresholdFilterIndexes: true,
|
||||
utils.SupplierFilterIndexes: true,
|
||||
utils.ChargerFilterIndexes: true,
|
||||
utils.DispatcherFilterIndexes: true,
|
||||
}
|
||||
)
|
||||
|
||||
func NewDataManager(dataDB DataDB) *DataManager {
|
||||
return &DataManager{dataDB: dataDB,
|
||||
cacheCfg: config.CgrConfig().CacheCfg()}
|
||||
return &DataManager{
|
||||
dataDB: dataDB,
|
||||
cacheCfg: config.CgrConfig().CacheCfg(),
|
||||
}
|
||||
}
|
||||
|
||||
// DataManager is the data storage manager for CGRateS
|
||||
@@ -53,13 +116,7 @@ func (dm *DataManager) LoadDataDBCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs,
|
||||
}
|
||||
for k, cacheCfg := range dm.cacheCfg {
|
||||
k = utils.CacheInstanceToPrefix[k] // alias into prefixes understood by storage
|
||||
if utils.IsSliceMember([]string{utils.DESTINATION_PREFIX, utils.REVERSE_DESTINATION_PREFIX,
|
||||
utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX,
|
||||
utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACTION_TRIGGER_PREFIX,
|
||||
utils.SHARED_GROUP_PREFIX, utils.StatQueuePrefix,
|
||||
utils.StatQueueProfilePrefix, utils.ThresholdPrefix, utils.ThresholdProfilePrefix,
|
||||
utils.FilterPrefix, utils.SupplierProfilePrefix,
|
||||
utils.AttributeProfilePrefix, utils.ChargerProfilePrefix, utils.DispatcherProfilePrefix}, k) && cacheCfg.Precache {
|
||||
if loadCachePrefixMap.HasKey(k) && cacheCfg.Precache {
|
||||
if err := dm.PreloadCacheForPrefix(k); err != nil && err != utils.ErrInvalidKey {
|
||||
return err
|
||||
}
|
||||
@@ -124,28 +181,7 @@ func (dm *DataManager) PreloadCacheForPrefix(prefix string) error {
|
||||
}
|
||||
|
||||
func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached bool) (err error) {
|
||||
if !utils.IsSliceMember([]string{
|
||||
utils.DESTINATION_PREFIX,
|
||||
utils.REVERSE_DESTINATION_PREFIX,
|
||||
utils.RATING_PLAN_PREFIX,
|
||||
utils.RATING_PROFILE_PREFIX,
|
||||
utils.ACTION_PREFIX,
|
||||
utils.ACTION_PLAN_PREFIX,
|
||||
utils.AccountActionPlansPrefix,
|
||||
utils.ACTION_TRIGGER_PREFIX,
|
||||
utils.SHARED_GROUP_PREFIX,
|
||||
utils.ResourceProfilesPrefix,
|
||||
utils.TimingsPrefix,
|
||||
utils.ResourcesPrefix,
|
||||
utils.StatQueuePrefix,
|
||||
utils.StatQueueProfilePrefix,
|
||||
utils.ThresholdPrefix,
|
||||
utils.ThresholdProfilePrefix,
|
||||
utils.FilterPrefix,
|
||||
utils.SupplierProfilePrefix,
|
||||
utils.AttributeProfilePrefix,
|
||||
utils.ChargerProfilePrefix,
|
||||
utils.DispatcherProfilePrefix}, prfx) {
|
||||
if !cachePrefixMap.HasKey(prfx) {
|
||||
return utils.NewCGRError(utils.DataManager,
|
||||
utils.MandatoryIEMissingCaps,
|
||||
utils.UnsupportedCachePrefix,
|
||||
@@ -235,6 +271,20 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b
|
||||
case utils.DispatcherProfilePrefix:
|
||||
tntID := utils.NewTenantID(dataID)
|
||||
_, err = dm.GetDispatcherProfile(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
|
||||
case utils.AttributeFilterIndexes:
|
||||
err = dm.MatchFilterIndexFromKey(utils.CacheAttributeFilterIndexes, dataID)
|
||||
case utils.ResourceFilterIndexes:
|
||||
err = dm.MatchFilterIndexFromKey(utils.CacheResourceFilterIndexes, dataID)
|
||||
case utils.StatFilterIndexes:
|
||||
err = dm.MatchFilterIndexFromKey(utils.CacheStatFilterIndexes, dataID)
|
||||
case utils.ThresholdFilterIndexes:
|
||||
err = dm.MatchFilterIndexFromKey(utils.CacheThresholdFilterIndexes, dataID)
|
||||
case utils.SupplierFilterIndexes:
|
||||
err = dm.MatchFilterIndexFromKey(utils.CacheSupplierFilterIndexes, dataID)
|
||||
case utils.ChargerFilterIndexes:
|
||||
err = dm.MatchFilterIndexFromKey(utils.CacheChargerFilterIndexes, dataID)
|
||||
case utils.DispatcherFilterIndexes:
|
||||
err = dm.MatchFilterIndexFromKey(utils.CacheDispatcherFilterIndexes, dataID)
|
||||
}
|
||||
if err != nil {
|
||||
return utils.NewCGRError(utils.DataManager,
|
||||
@@ -937,6 +987,19 @@ func (dm *DataManager) RemoveFilterIndexes(cacheID, itemIDPrefix string) (err er
|
||||
return dm.DataDB().RemoveFilterIndexesDrv(cacheID, itemIDPrefix)
|
||||
}
|
||||
|
||||
func (dm *DataManager) MatchFilterIndexFromKey(cacheID, key string) (err error) {
|
||||
splt := utils.SplitConcatenatedKey(key) // prefix:filterType:fieldName:fieldVal
|
||||
lsplt := len(splt)
|
||||
if lsplt < 4 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
fieldVal := splt[lsplt-1]
|
||||
fieldName := splt[lsplt-2]
|
||||
filterType := splt[lsplt-3]
|
||||
itemIDPrefix := utils.ConcatenatedKey(splt[:lsplt-3]...) // prefix may contain context/subsystems
|
||||
_, err = dm.MatchFilterIndex(cacheID, itemIDPrefix, filterType, fieldName, fieldVal)
|
||||
return
|
||||
}
|
||||
func (dm *DataManager) MatchFilterIndex(cacheID, itemIDPrefix,
|
||||
filterType, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) {
|
||||
fieldValKey := utils.ConcatenatedKey(itemIDPrefix, filterType, fieldName, fieldVal)
|
||||
|
||||
@@ -511,6 +511,7 @@ func (ms *MongoStorage) getField(sctx mongo.SessionContext, col, prefix, subject
|
||||
}
|
||||
return result, iter.Close(sctx)
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) getField2(sctx mongo.SessionContext, col, prefix, subject string, tntID *utils.TenantID) (result []string, err error) {
|
||||
idResult := struct{ Tenant, Id string }{}
|
||||
elem := bson.M{}
|
||||
@@ -536,6 +537,27 @@ func (ms *MongoStorage) getField2(sctx mongo.SessionContext, col, prefix, subjec
|
||||
return result, iter.Close(sctx)
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) getField3(sctx mongo.SessionContext, col, prefix, field string) (result []string, err error) {
|
||||
fieldResult := bson.D{}
|
||||
iter, err := ms.getCol(col).Find(sctx,
|
||||
bson.M{field: bsonx.Regex(fmt.Sprintf("^%s", prefix), "")},
|
||||
options.Find().SetProjection(
|
||||
bson.M{field: 1},
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
for iter.Next(sctx) {
|
||||
err = iter.Decode(&fieldResult)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
result = append(result, fieldResult.Map()[field].(string))
|
||||
}
|
||||
return result, iter.Close(sctx)
|
||||
}
|
||||
|
||||
// GetKeysForPrefix implementation
|
||||
func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err error) {
|
||||
var category, subject string
|
||||
@@ -592,6 +614,20 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err er
|
||||
result, err = ms.getField2(sctx, colCpp, utils.ChargerProfilePrefix, subject, tntID)
|
||||
case utils.DispatcherProfilePrefix:
|
||||
result, err = ms.getField2(sctx, colDpp, utils.DispatcherProfilePrefix, subject, tntID)
|
||||
case utils.AttributeFilterIndexes:
|
||||
result, err = ms.getField3(sctx, colRFI, utils.AttributeFilterIndexes, "key")
|
||||
case utils.ResourceFilterIndexes:
|
||||
result, err = ms.getField3(sctx, colRFI, utils.ResourceFilterIndexes, "key")
|
||||
case utils.StatFilterIndexes:
|
||||
result, err = ms.getField3(sctx, colRFI, utils.StatFilterIndexes, "key")
|
||||
case utils.ThresholdFilterIndexes:
|
||||
result, err = ms.getField3(sctx, colRFI, utils.ThresholdFilterIndexes, "key")
|
||||
case utils.SupplierFilterIndexes:
|
||||
result, err = ms.getField3(sctx, colRFI, utils.SupplierFilterIndexes, "key")
|
||||
case utils.ChargerFilterIndexes:
|
||||
result, err = ms.getField3(sctx, colRFI, utils.ChargerFilterIndexes, "key")
|
||||
case utils.DispatcherFilterIndexes:
|
||||
result, err = ms.getField3(sctx, colRFI, utils.DispatcherFilterIndexes, "key")
|
||||
default:
|
||||
err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %s", prefix)
|
||||
}
|
||||
@@ -1662,9 +1698,6 @@ func (ms *MongoStorage) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType st
|
||||
if len(strings.Split(itemIDPrefix, ":")) == 2 {
|
||||
indexKey = utils.ConcatenatedKey(keys[2], keys[3], keys[4])
|
||||
}
|
||||
if _, hasIt := indexes[indexKey]; !hasIt {
|
||||
indexes[indexKey] = make(utils.StringMap)
|
||||
}
|
||||
indexes[indexKey] = utils.StringMapFromSlice(res.Value)
|
||||
}
|
||||
if len(indexes) == 0 {
|
||||
|
||||
@@ -326,12 +326,31 @@ func (rs *RedisStorage) RemoveReverseForPrefix(prefix string) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) getKeysForFilterIndexesKeys(fkeys []string) (keys []string, err error) {
|
||||
for _, itemIDPrefix := range fkeys {
|
||||
mp := make(map[string]string)
|
||||
mp, err = rs.Cmd("HGETALL", itemIDPrefix).Map()
|
||||
if err != nil {
|
||||
return
|
||||
} else if len(mp) == 0 {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
for k := range mp {
|
||||
keys = append(keys, utils.ConcatenatedKey(itemIDPrefix, k))
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetKeysForPrefix(prefix string) ([]string, error) {
|
||||
r := rs.Cmd("KEYS", prefix+"*")
|
||||
if r.Err != nil {
|
||||
return nil, r.Err
|
||||
}
|
||||
if keys, _ := r.List(); len(keys) != 0 {
|
||||
if filterIndexesPrefixMap.HasKey(prefix) {
|
||||
return rs.getKeysForFilterIndexesKeys(keys)
|
||||
}
|
||||
return keys, nil
|
||||
}
|
||||
return nil, nil
|
||||
@@ -1137,9 +1156,6 @@ func (rs *RedisStorage) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType st
|
||||
if err = rs.ms.Unmarshal([]byte(v), &sm); err != nil {
|
||||
return
|
||||
}
|
||||
if _, hasKey := indexes[k]; !hasKey {
|
||||
indexes[k] = make(utils.StringMap)
|
||||
}
|
||||
indexes[k] = sm
|
||||
}
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user