StatsInstance -> StatQueue, config cache stats_queues and stats_event_queues in defaults

This commit is contained in:
DanB
2017-09-06 19:07:32 +02:00
parent dd8afa2486
commit e1e7b5e13c
7 changed files with 75 additions and 59 deletions

View File

@@ -50,22 +50,24 @@ const CGRATES_CFG_JSON = `
"cache":{
"destinations": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control destination caching
"reverse_destinations": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control reverse destinations index caching
"rating_plans": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control rating plans caching
"rating_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control rating profiles caching
"lcr_rules": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control lcr rules caching
"cdr_stats": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control cdr stats queues caching
"actions": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control actions caching
"action_plans": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control action plans caching
"account_action_plans": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control account action plans index caching
"action_triggers": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control action triggers caching
"shared_groups": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control shared groups caching
"aliases": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control aliases caching
"reverse_aliases": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control reverse aliases index caching
"derived_chargers": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control derived charging rule caching
"resource_configs": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control resource configs caching
"timings": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control timings caching
"destinations": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // destination caching
"reverse_destinations": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // reverse destinations index caching
"rating_plans": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // rating plans caching
"rating_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // rating profiles caching
"lcr_rules": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // lcr rules caching
"cdr_stats": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // cdr stats queues caching
"actions": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // actions caching
"action_plans": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // action plans caching
"account_action_plans": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // account action plans index caching
"action_triggers": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // action triggers caching
"shared_groups": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // shared groups caching
"aliases": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // aliases caching
"reverse_aliases": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // reverse aliases index caching
"derived_chargers": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // derived charging rule caching
"resource_configs": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // resource configs caching
"timings": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // timings caching
"stats_queues": {"limit": -1, "ttl": "5m", "static_ttl": false, "precache": false}, // queues with metrics
"stats_event_queues": {"limit": -1, "ttl": "5m", "static_ttl": false, "precache": false}, // matching queues to events
},

View File

@@ -115,6 +115,12 @@ func TestCacheJsonCfg(t *testing.T) {
utils.CacheTimings: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
utils.CacheStatSQueues: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer("5m"), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
utils.CacheStatSEventQueues: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer("5m"), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
}
if gCfg, err := dfCgrJsonCfg.CacheJsonCfg(); err != nil {

View File

@@ -439,9 +439,13 @@ func TestCgrCfgJSONDefaultsCacheCFG(t *testing.T) {
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheTimings: &CacheParamConfig{Limit: -1,
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheStatSQueues: &CacheParamConfig{Limit: -1,
TTL: time.Duration(5 * time.Minute), StaticTTL: false, Precache: false},
utils.CacheStatSEventQueues: &CacheParamConfig{Limit: -1,
TTL: time.Duration(5 * time.Minute), StaticTTL: false, Precache: false},
}
if !reflect.DeepEqual(eCacheCfg, cgrCfg.CacheConfig) {
t.Errorf("received: %s, \nexpecting: %s", utils.ToIJSON(eCacheCfg), utils.ToIJSON(cgrCfg.CacheConfig))
t.Errorf("received: %s, \nexpecting: %s", utils.ToJSON(eCacheCfg), utils.ToJSON(cgrCfg.CacheConfig))
}
}

View File

@@ -27,16 +27,16 @@ import (
"github.com/cgrates/cgrates/utils"
)
// StatsInstances is a sortable list of StatsInstance
type StatsInstances []*StatsInstance
// StatQueues is a sortable list of StatQueue
type StatQueues []*StatQueue
// Sort is part of sort interface, sort based on Weight
func (sis StatsInstances) Sort() {
func (sis StatQueues) Sort() {
sort.Slice(sis, func(i, j int) bool { return sis[i].cfg.Weight > sis[j].cfg.Weight })
}
// remWithID removes the queue with ID from slice
func (sis StatsInstances) remWithID(qID string) {
func (sis StatQueues) remWithID(qID string) {
for i, q := range sis {
if q.cfg.ID == qID {
copy(sis[i:], sis[i+1:])
@@ -47,10 +47,10 @@ func (sis StatsInstances) remWithID(qID string) {
}
}
// NewStatsInstance instantiates a StatsInstance
func NewStatsInstance(sec *StatsEventCache, ms engine.Marshaler,
sqCfg *engine.StatsConfig, sqSM *engine.SQStoredMetrics) (si *StatsInstance, err error) {
si = &StatsInstance{sec: sec, ms: ms, cfg: sqCfg, sqMetrics: make(map[string]StatsMetric)}
// NewStatQueue instantiates a StatQueue
func NewStatQueue(sec *StatsEventCache, ms engine.Marshaler,
sqCfg *engine.StatsConfig, sqSM *engine.SQStoredMetrics) (si *StatQueue, err error) {
si = &StatQueue{sec: sec, ms: ms, cfg: sqCfg, sqMetrics: make(map[string]StatsMetric)}
for _, metricID := range sqCfg.Metrics {
if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil {
return
@@ -77,8 +77,8 @@ func NewStatsInstance(sec *StatsEventCache, ms engine.Marshaler,
return
}
// StatsInstance represents an individual stats instance
type StatsInstance struct {
// StatQueue represents an individual stats queue
type StatQueue struct {
sync.RWMutex
dirty bool // needs save
sec *StatsEventCache
@@ -89,7 +89,7 @@ type StatsInstance struct {
}
// GetStoredMetrics retrieves the data used for store to DB
func (sq *StatsInstance) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) {
func (sq *StatQueue) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) {
sq.RLock()
defer sq.RUnlock()
sEvents := make(map[string]engine.StatsEvent)
@@ -97,7 +97,7 @@ func (sq *StatsInstance) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) {
for _, sqItem := range sq.sqItems { // make sure event is properly retrieved from cache
ev := sq.sec.GetEvent(sqItem.EventID)
if ev == nil {
utils.Logger.Warning(fmt.Sprintf("<StatsInstance> querying for storage eventID: %s, error: event not cached",
utils.Logger.Warning(fmt.Sprintf("<StatQueue> querying for storage eventID: %s, error: event not cached",
sqItem.EventID))
continue
}
@@ -111,7 +111,7 @@ func (sq *StatsInstance) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) {
for metricID, metric := range sq.sqMetrics {
var err error
if sqSM.SQMetrics[metricID], err = metric.GetMarshaled(sq.ms); err != nil {
utils.Logger.Warning(fmt.Sprintf("<StatsInstance> querying for storage metricID: %s, error: %s",
utils.Logger.Warning(fmt.Sprintf("<StatQueue> querying for storage metricID: %s, error: %s",
metricID, err.Error()))
continue
}
@@ -120,7 +120,7 @@ func (sq *StatsInstance) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) {
}
// ProcessEvent processes a StatsEvent, returns true if processed
func (sq *StatsInstance) ProcessEvent(ev engine.StatsEvent) (err error) {
func (sq *StatQueue) ProcessEvent(ev engine.StatsEvent) (err error) {
sq.Lock()
sq.remExpired()
sq.remOnQueueLength()
@@ -130,7 +130,7 @@ func (sq *StatsInstance) ProcessEvent(ev engine.StatsEvent) (err error) {
}
// remExpired expires items in queue
func (sq *StatsInstance) remExpired() {
func (sq *StatQueue) remExpired() {
var expIdx *int // index of last item to be expired
for i, item := range sq.sqItems {
if item.ExpiryTime == nil {
@@ -151,7 +151,7 @@ func (sq *StatsInstance) remExpired() {
}
// remOnQueueLength rems elements based on QueueLength setting
func (sq *StatsInstance) remOnQueueLength() {
func (sq *StatQueue) remOnQueueLength() {
if sq.cfg.QueueLength == 0 {
return
}
@@ -164,26 +164,26 @@ func (sq *StatsInstance) remOnQueueLength() {
}
// addStatsEvent computes metrics for an event
func (sq *StatsInstance) addStatsEvent(ev engine.StatsEvent) {
func (sq *StatQueue) addStatsEvent(ev engine.StatsEvent) {
evID := ev.ID()
for metricID, metric := range sq.sqMetrics {
if err := metric.AddEvent(ev); err != nil {
utils.Logger.Warning(fmt.Sprintf("<StatsInstance> metricID: %s, add eventID: %s, error: %s",
utils.Logger.Warning(fmt.Sprintf("<StatQueue> metricID: %s, add eventID: %s, error: %s",
metricID, evID, err.Error()))
}
}
}
// remEventWithID removes an event from metrics
func (sq *StatsInstance) remEventWithID(evID string) {
func (sq *StatQueue) remEventWithID(evID string) {
ev := sq.sec.GetEvent(evID)
if ev == nil {
utils.Logger.Warning(fmt.Sprintf("<StatsInstance> removing eventID: %s, error: event not cached", evID))
utils.Logger.Warning(fmt.Sprintf("<StatQueue> removing eventID: %s, error: event not cached", evID))
return
}
for metricID, metric := range sq.sqMetrics {
if err := metric.RemEvent(ev); err != nil {
utils.Logger.Warning(fmt.Sprintf("<StatsInstance> metricID: %s, remove eventID: %s, error: %s", metricID, evID, err.Error()))
utils.Logger.Warning(fmt.Sprintf("<StatQueue> metricID: %s, remove eventID: %s, error: %s", metricID, evID, err.Error()))
}
}
}

View File

@@ -24,19 +24,19 @@ import (
"github.com/cgrates/cgrates/engine"
)
func TestStatsInstancesSort(t *testing.T) {
sInsts := StatsInstances{
&StatsInstance{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}},
&StatsInstance{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}},
&StatsInstance{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}},
&StatsInstance{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}},
func TestStatQueuesSort(t *testing.T) {
sInsts := StatQueues{
&StatQueue{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}},
&StatQueue{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}},
&StatQueue{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}},
&StatQueue{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}},
}
sInsts.Sort()
eSInst := StatsInstances{
&StatsInstance{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}},
&StatsInstance{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}},
&StatsInstance{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}},
&StatsInstance{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}},
eSInst := StatQueues{
&StatQueue{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}},
&StatQueue{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}},
&StatQueue{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}},
&StatQueue{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}},
}
if !reflect.DeepEqual(eSInst, sInsts) {
t.Errorf("expecting: %+v, received: %+v", eSInst, sInsts)

View File

@@ -43,8 +43,8 @@ func NewStatService(dataDB engine.DataDB, ms engine.Marshaler, storeInterval tim
if err != nil {
return nil, err
}
ss.queuesCache = make(map[string]*StatsInstance)
ss.queues = make(StatsInstances, 0)
ss.queuesCache = make(map[string]*StatQueue)
ss.queues = make(StatQueues, 0)
for _, prfx := range sqPrfxs {
if q, err := ss.loadQueue(prfx[len(utils.StatsConfigPrefix):]); err != nil {
utils.Logger.Err(fmt.Sprintf("<StatS> failed loading queue with id: <%s>, err: <%s>",
@@ -66,9 +66,9 @@ type StatService struct {
ms engine.Marshaler
storeInterval time.Duration
stopStoring chan struct{}
evCache *StatsEventCache // so we can pass it to queues
queuesCache map[string]*StatsInstance // unordered db of StatsQueues, used for fast queries
queues StatsInstances // ordered list of StatsQueues
evCache *StatsEventCache // so we can pass it to queues
queuesCache map[string]*StatQueue // unordered db of StatQueues, used for fast queries
queues StatQueues // ordered list of StatQueues
}
@@ -91,7 +91,7 @@ func (ss *StatService) Shutdown() error {
// setQueue adds or modifies a queue into cache
// sort will reorder the ss.queues
func (ss *StatService) loadQueue(qID string) (q *StatsInstance, err error) {
func (ss *StatService) loadQueue(qID string) (q *StatQueue, err error) {
sq, err := ss.dataDB.GetStatsConfig(qID)
if err != nil {
return nil, err
@@ -102,16 +102,16 @@ func (ss *StatService) loadQueue(qID string) (q *StatsInstance, err error) {
return nil, err
}
}
return NewStatsInstance(ss.evCache, ss.ms, sq, sqSM)
return NewStatQueue(ss.evCache, ss.ms, sq, sqSM)
}
func (ss *StatService) setQueue(q *StatsInstance) {
func (ss *StatService) setQueue(q *StatQueue) {
ss.queuesCache[q.cfg.ID] = q
ss.queues = append(ss.queues, q)
}
// remQueue will remove a queue based on it's ID
func (ss *StatService) remQueue(qID string) (si *StatsInstance) {
func (ss *StatService) remQueue(qID string) (si *StatQueue) {
si = ss.queuesCache[qID]
ss.queues.remWithID(qID)
delete(ss.queuesCache, qID)
@@ -240,7 +240,7 @@ func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err err
if qIDs == nil || len(*qIDs) == 0 {
return utils.ErrNotFound
}
var sQs []*StatsInstance // cache here so we lock only later when data available
var sQs []*StatQueue // cache here so we lock only later when data available
for _, qID := range *qIDs {
if _, hasPrev := ss.queuesCache[qID]; hasPrev {
continue // don't overwrite previous, could be extended in the future by carefully checking cached events

View File

@@ -57,6 +57,8 @@ var (
CacheResourceConfigs: ResourceConfigsPrefix,
CacheResources: ResourcesPrefix,
CacheTimings: TimingsPrefix,
CacheStatSQueues: META_NONE,
CacheStatSEventQueues: META_NONE,
}
CachePrefixToInstance map[string]string // will be built on init
)
@@ -433,6 +435,8 @@ const (
CostSource = "CostSource"
ExtraInfo = "ExtraInfo"
MetaPrefix = "*"
CacheStatSQueues = "stats_queues"
CacheStatSEventQueues = "stats_event_queues"
)
func buildCacheInstRevPrefixes() {