This commit is contained in:
TeoV
2017-09-07 10:05:43 -04:00
8 changed files with 29 additions and 42 deletions

View File

@@ -539,7 +539,7 @@ func startResourceService(internalRsChan, internalStatSConn chan rpcclient.RpcCl
return
}
}
rS, err := engine.NewResourceService(dataDB, cfg.ResourceSCfg().ShortCache, cfg.ResourceSCfg().StoreInterval, statsConn)
rS, err := engine.NewResourceService(dataDB, cfg.ResourceSCfg().StoreInterval, statsConn)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<ResourceS> Could not init, error: %s", err.Error()))
exitChan <- true

View File

@@ -65,10 +65,11 @@ const CGRATES_CFG_JSON = `
"reverse_aliases": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // reverse aliases index caching
"derived_chargers": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // derived charging rule caching
"resource_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control resource profiles caching
"resources": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control resources caching
"resources": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control resources caching
"event_resources": {"limit": -1, "ttl": "1m", "static_ttl": false}, // matching resources to events
"timings": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // timings caching
"stats_queues": {"limit": -1, "ttl": "5m", "static_ttl": false, "precache": false}, // queues with metrics
"stats_event_queues": {"limit": -1, "ttl": "5m", "static_ttl": false, "precache": false}, // matching queues to events
"stats_queues": {"limit": -1, "ttl": "1m", "static_ttl": false, "precache": false}, // queues with metrics
"event_queues": {"limit": -1, "ttl": "1m", "static_ttl": false}, // matching queues to events
},
@@ -411,7 +412,6 @@ const CGRATES_CFG_JSON = `
"enabled": false, // starts ResourceLimiter service: <true|false>.
"stats_conns": [], // address where to reach the stats service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234>
"store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|$dur>
"short_cache": {"limit": -1, "ttl": "1m", "static_ttl": false}, // short cache for data like resources for events in case of allow queries
},

View File

@@ -115,15 +115,16 @@ func TestCacheJsonCfg(t *testing.T) {
utils.CacheResources: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
utils.CacheEventResources: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer("1m"), Static_ttl: utils.BoolPointer(false)},
utils.CacheTimings: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
utils.CacheStatSQueues: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer("5m"), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
utils.CacheStatSEventQueues: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer("5m"), Static_ttl: utils.BoolPointer(false),
Ttl: utils.StringPointer("1m"), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
utils.CacheEventQueues: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer("1m"), Static_ttl: utils.BoolPointer(false)},
}
if gCfg, err := dfCgrJsonCfg.CacheJsonCfg(); err != nil {
@@ -679,15 +680,11 @@ func TestDfResourceLimiterSJsonCfg(t *testing.T) {
Enabled: utils.BoolPointer(false),
Stats_conns: &[]*HaPoolJsonCfg{},
Store_interval: utils.StringPointer(""),
Short_cache: &CacheParamJsonCfg{
Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer("1m"),
Static_ttl: utils.BoolPointer(false)},
}
if cfg, err := dfCgrJsonCfg.ResourceSJsonCfg(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCfg, cfg) {
t.Errorf("Received: %s", utils.ToIJSON(cfg))
t.Errorf("Expected: %s, received: %s", utils.ToJSON(eCfg), utils.ToJSON(cfg))
}
}

View File

@@ -439,12 +439,14 @@ func TestCgrCfgJSONDefaultsCacheCFG(t *testing.T) {
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheResources: &CacheParamConfig{Limit: -1,
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheEventResources: &CacheParamConfig{Limit: -1,
TTL: time.Duration(1 * time.Minute), StaticTTL: false},
utils.CacheTimings: &CacheParamConfig{Limit: -1,
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheStatSQueues: &CacheParamConfig{Limit: -1,
TTL: time.Duration(5 * time.Minute), StaticTTL: false, Precache: false},
utils.CacheStatSEventQueues: &CacheParamConfig{Limit: -1,
TTL: time.Duration(5 * time.Minute), StaticTTL: false, Precache: false},
TTL: time.Duration(1 * time.Minute), StaticTTL: false, Precache: false},
utils.CacheEventQueues: &CacheParamConfig{Limit: -1,
TTL: time.Duration(1 * time.Minute), StaticTTL: false},
}
if !reflect.DeepEqual(eCacheCfg, cgrCfg.CacheConfig) {
t.Errorf("received: %s, \nexpecting: %s", utils.ToJSON(eCacheCfg), utils.ToJSON(cgrCfg.CacheConfig))
@@ -565,12 +567,9 @@ func TestCgrCfgJSONDefaultsResLimCfg(t *testing.T) {
Enabled: false,
StatSConns: []*HaPoolConfig{},
StoreInterval: 0,
ShortCache: &CacheParamConfig{Limit: -1,
TTL: time.Duration(1 * time.Minute), StaticTTL: false},
}
if !reflect.DeepEqual(cgrCfg.resourceSCfg, eResLiCfg) {
t.Errorf("expecting: %s, received: %s", utils.ToIJSON(eResLiCfg), utils.ToIJSON(cgrCfg.resourceSCfg))
t.Errorf("expecting: %s, received: %s", utils.ToJSON(eResLiCfg), utils.ToJSON(cgrCfg.resourceSCfg))
}
}

View File

@@ -383,7 +383,6 @@ type ResourceSJsonCfg struct {
Enabled *bool
Stats_conns *[]*HaPoolJsonCfg
Store_interval *string
Short_cache *CacheParamJsonCfg
}
// Stat service config section

View File

@@ -27,7 +27,6 @@ type ResourceSConfig struct {
Enabled bool
StatSConns []*HaPoolConfig // Connections towards StatS
StoreInterval time.Duration // Dump regularly from cache into dataDB
ShortCache *CacheParamConfig
}
func (rlcfg *ResourceSConfig) loadFromJsonCfg(jsnCfg *ResourceSJsonCfg) (err error) {
@@ -49,11 +48,5 @@ func (rlcfg *ResourceSConfig) loadFromJsonCfg(jsnCfg *ResourceSJsonCfg) (err err
return
}
}
if jsnCfg.Short_cache != nil {
rlcfg.ShortCache = new(CacheParamConfig)
if err = rlcfg.ShortCache.loadFromJsonCfg(jsnCfg.Short_cache); err != nil {
return
}
}
return nil
}

View File

@@ -29,7 +29,6 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/ltcache"
"github.com/cgrates/rpcclient"
)
@@ -220,13 +219,12 @@ func (rs Resources) AllocateResource(ru *ResourceUsage, dryRun bool) (alcMessage
}
// Pas the config as a whole so we can ask access concurrently
func NewResourceService(dataDB DataDB, shortCache *config.CacheParamConfig, storeInterval time.Duration,
func NewResourceService(dataDB DataDB, storeInterval time.Duration,
statS rpcclient.RpcClientConnection) (*ResourceService, error) {
if statS != nil && reflect.ValueOf(statS).IsNil() {
statS = nil
}
return &ResourceService{dataDB: dataDB, statS: statS,
scEventResources: ltcache.New(shortCache.Limit, shortCache.TTL, shortCache.StaticTTL, nil),
lcEventResources: make(map[string][]string),
storedResources: make(utils.StringMap),
storeInterval: storeInterval, stopBackup: make(chan struct{})}, nil
@@ -236,7 +234,6 @@ func NewResourceService(dataDB DataDB, shortCache *config.CacheParamConfig, stor
type ResourceService struct {
dataDB DataDB // So we can load the data in cache and index it
statS rpcclient.RpcClientConnection // allows applying filters based on stats
scEventResources *ltcache.Cache // short cache map[ruID], used to keep references to matched resources for events in allow queries
lcEventResources map[string][]string // cache recording resources for events in alocation phase
lcERMux sync.RWMutex // protects the lcEventResources
storedResources utils.StringMap // keep a record of resources which need saving, map[resID]bool
@@ -331,7 +328,7 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources)
rIDs, has := rS.lcEventResources[evUUID]
rS.lcERMux.RUnlock()
if !has {
if rIDsIf, has := rS.scEventResources.Get(evUUID); !has {
if rIDsIf, has := cache.Get(utils.EventResourcesPrefix + evUUID); !has {
return nil
} else if rIDsIf != nil {
rIDs = rIDsIf.([]string)
@@ -342,7 +339,6 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources)
if len(rIDs) == 0 {
return
}
lockIDs := utils.PrefixSliceItems(rIDs, utils.ResourcesPrefix)
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...)
defer guardian.Guardian.UnguardIDs(lockIDs...)
@@ -353,7 +349,7 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources)
evUUID, err.Error()))
// on errors, cleanup cache so we recache
if shortCached {
rS.scEventResources.Remove(evUUID)
cache.RemKey(utils.EventResourcesPrefix+evUUID, true, "")
} else {
rS.lcERMux.Lock()
delete(rS.lcEventResources, evUUID)
@@ -450,13 +446,13 @@ func (rS *ResourceService) V1AllowUsage(args utils.AttrRLsResourceUsage, allow *
if mtcRLs, err = rS.matchingResourcesForEvent(args.Event); err != nil {
return err
}
rS.scEventResources.Set(args.UsageID, mtcRLs.ids())
cache.Set(utils.EventResourcesPrefix+args.UsageID, mtcRLs.ids(), true, "")
}
if _, err = mtcRLs.AllocateResource(
&ResourceUsage{ID: args.UsageID,
Units: args.Units}, true); err != nil {
if err == utils.ErrResourceUnavailable {
rS.scEventResources.Set(args.UsageID, nil)
cache.Set(utils.EventResourcesPrefix+args.UsageID, nil, true, "")
err = nil
return // not error but still not allowed
}
@@ -485,10 +481,10 @@ func (rS *ResourceService) V1AllocateResource(args utils.AttrRLsResourceUsage, r
// index it for matching out of cache
var wasShortCached bool
if wasCached {
if _, has := rS.scEventResources.Get(args.UsageID); has {
if _, has := cache.Get(utils.EventResourcesPrefix + args.UsageID); has {
// remove from short cache to populate event cache
wasShortCached = true
rS.scEventResources.Remove(args.UsageID)
cache.RemKey(utils.EventResourcesPrefix+args.UsageID, true, "")
}
}
if wasShortCached || !wasCached {

View File

@@ -56,9 +56,10 @@ var (
CacheDerivedChargers: DERIVEDCHARGERS_PREFIX,
CacheResourceProfiles: ResourceProfilesPrefix,
CacheResources: ResourcesPrefix,
CacheEventResources: EventResourcesPrefix,
CacheTimings: TimingsPrefix,
CacheStatSQueues: META_NONE,
CacheStatSEventQueues: META_NONE,
CacheEventQueues: META_NONE,
}
CachePrefixToInstance map[string]string // will be built on init
)
@@ -436,7 +437,9 @@ const (
ExtraInfo = "ExtraInfo"
MetaPrefix = "*"
CacheStatSQueues = "stats_queues"
CacheStatSEventQueues = "stats_event_queues"
CacheEventQueues = "event_queues"
CacheEventResources = "event_resources"
EventResourcesPrefix = "ers_"
)
func buildCacheInstRevPrefixes() {