diff --git a/config/config_defaults.go b/config/config_defaults.go index 3d6498d94..bb21aeeec 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -50,22 +50,24 @@ const CGRATES_CFG_JSON = ` "cache":{ - "destinations": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control destination caching - "reverse_destinations": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control reverse destinations index caching - "rating_plans": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control rating plans caching - "rating_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control rating profiles caching - "lcr_rules": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control lcr rules caching - "cdr_stats": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control cdr stats queues caching - "actions": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control actions caching - "action_plans": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control action plans caching - "account_action_plans": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control account action plans index caching - "action_triggers": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control action triggers caching - "shared_groups": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control shared groups caching - "aliases": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control aliases caching - "reverse_aliases": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control reverse aliases index caching - "derived_chargers": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control derived charging rule caching - "resource_configs": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control resource configs caching - "timings": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control timings caching + "destinations": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // destination caching + "reverse_destinations": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // reverse destinations index caching + "rating_plans": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // rating plans caching + "rating_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // rating profiles caching + "lcr_rules": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // lcr rules caching + "cdr_stats": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // cdr stats queues caching + "actions": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // actions caching + "action_plans": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // action plans caching + "account_action_plans": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // account action plans index caching + "action_triggers": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // action triggers caching + "shared_groups": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // shared groups caching + "aliases": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // aliases caching + "reverse_aliases": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // reverse aliases index caching + "derived_chargers": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // derived charging rule caching + "resource_configs": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // resource configs caching + "timings": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // timings caching + "stats_queues": {"limit": -1, "ttl": "5m", "static_ttl": false, "precache": false}, // queues with metrics + "stats_event_queues": {"limit": -1, "ttl": "5m", "static_ttl": false, "precache": false}, // matching queues to events }, diff --git a/config/config_json_test.go b/config/config_json_test.go index d66132ec1..d1bcfa781 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -115,6 +115,12 @@ func TestCacheJsonCfg(t *testing.T) { utils.CacheTimings: &CacheParamJsonCfg{Limit: utils.IntPointer(-1), Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false), Precache: utils.BoolPointer(false)}, + utils.CacheStatSQueues: &CacheParamJsonCfg{Limit: utils.IntPointer(-1), + Ttl: utils.StringPointer("5m"), Static_ttl: utils.BoolPointer(false), + Precache: utils.BoolPointer(false)}, + utils.CacheStatSEventQueues: &CacheParamJsonCfg{Limit: utils.IntPointer(-1), + Ttl: utils.StringPointer("5m"), Static_ttl: utils.BoolPointer(false), + Precache: utils.BoolPointer(false)}, } if gCfg, err := dfCgrJsonCfg.CacheJsonCfg(); err != nil { diff --git a/config/config_test.go b/config/config_test.go index 6c8169e11..015af581c 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -439,9 +439,13 @@ func TestCgrCfgJSONDefaultsCacheCFG(t *testing.T) { TTL: time.Duration(0), StaticTTL: false, Precache: false}, utils.CacheTimings: &CacheParamConfig{Limit: -1, TTL: time.Duration(0), StaticTTL: false, Precache: false}, + utils.CacheStatSQueues: &CacheParamConfig{Limit: -1, + TTL: time.Duration(5 * time.Minute), StaticTTL: false, Precache: false}, + utils.CacheStatSEventQueues: &CacheParamConfig{Limit: -1, + TTL: time.Duration(5 * time.Minute), StaticTTL: false, Precache: false}, } if !reflect.DeepEqual(eCacheCfg, cgrCfg.CacheConfig) { - t.Errorf("received: %s, \nexpecting: %s", utils.ToIJSON(eCacheCfg), utils.ToIJSON(cgrCfg.CacheConfig)) + t.Errorf("received: %s, \nexpecting: %s", utils.ToJSON(eCacheCfg), utils.ToJSON(cgrCfg.CacheConfig)) } } diff --git a/stats/queue.go b/stats/queue.go index 7fd0c1fcb..18f9f3064 100644 --- a/stats/queue.go +++ b/stats/queue.go @@ -27,16 +27,16 @@ import ( "github.com/cgrates/cgrates/utils" ) -// StatsInstances is a sortable list of StatsInstance -type StatsInstances []*StatsInstance +// StatQueues is a sortable list of StatQueue +type StatQueues []*StatQueue // Sort is part of sort interface, sort based on Weight -func (sis StatsInstances) Sort() { +func (sis StatQueues) Sort() { sort.Slice(sis, func(i, j int) bool { return sis[i].cfg.Weight > sis[j].cfg.Weight }) } // remWithID removes the queue with ID from slice -func (sis StatsInstances) remWithID(qID string) { +func (sis StatQueues) remWithID(qID string) { for i, q := range sis { if q.cfg.ID == qID { copy(sis[i:], sis[i+1:]) @@ -47,10 +47,10 @@ func (sis StatsInstances) remWithID(qID string) { } } -// NewStatsInstance instantiates a StatsInstance -func NewStatsInstance(sec *StatsEventCache, ms engine.Marshaler, - sqCfg *engine.StatsConfig, sqSM *engine.SQStoredMetrics) (si *StatsInstance, err error) { - si = &StatsInstance{sec: sec, ms: ms, cfg: sqCfg, sqMetrics: make(map[string]StatsMetric)} +// NewStatQueue instantiates a StatQueue +func NewStatQueue(sec *StatsEventCache, ms engine.Marshaler, + sqCfg *engine.StatsConfig, sqSM *engine.SQStoredMetrics) (si *StatQueue, err error) { + si = &StatQueue{sec: sec, ms: ms, cfg: sqCfg, sqMetrics: make(map[string]StatsMetric)} for _, metricID := range sqCfg.Metrics { if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil { return @@ -77,8 +77,8 @@ func NewStatsInstance(sec *StatsEventCache, ms engine.Marshaler, return } -// StatsInstance represents an individual stats instance -type StatsInstance struct { +// StatQueue represents an individual stats instance +type StatQueue struct { sync.RWMutex dirty bool // needs save sec *StatsEventCache @@ -89,7 +89,7 @@ type StatsInstance struct { } // GetSQStoredMetrics retrieves the data used for store to DB -func (sq *StatsInstance) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) { +func (sq *StatQueue) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) { sq.RLock() defer sq.RUnlock() sEvents := make(map[string]engine.StatsEvent) @@ -97,7 +97,7 @@ func (sq *StatsInstance) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) { for _, sqItem := range sq.sqItems { // make sure event is properly retrieved from cache ev := sq.sec.GetEvent(sqItem.EventID) if ev == nil { - utils.Logger.Warning(fmt.Sprintf(" querying for storage eventID: %s, error: event not cached", + utils.Logger.Warning(fmt.Sprintf(" querying for storage eventID: %s, error: event not cached", sqItem.EventID)) continue } @@ -111,7 +111,7 @@ func (sq *StatsInstance) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) { for metricID, metric := range sq.sqMetrics { var err error if sqSM.SQMetrics[metricID], err = metric.GetMarshaled(sq.ms); err != nil { - utils.Logger.Warning(fmt.Sprintf(" querying for storage metricID: %s, error: %s", + utils.Logger.Warning(fmt.Sprintf(" querying for storage metricID: %s, error: %s", metricID, err.Error())) continue } @@ -120,7 +120,7 @@ func (sq *StatsInstance) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) { } // ProcessEvent processes a StatsEvent, returns true if processed -func (sq *StatsInstance) ProcessEvent(ev engine.StatsEvent) (err error) { +func (sq *StatQueue) ProcessEvent(ev engine.StatsEvent) (err error) { sq.Lock() sq.remExpired() sq.remOnQueueLength() @@ -130,7 +130,7 @@ func (sq *StatsInstance) ProcessEvent(ev engine.StatsEvent) (err error) { } // remExpired expires items in queue -func (sq *StatsInstance) remExpired() { +func (sq *StatQueue) remExpired() { var expIdx *int // index of last item to be expired for i, item := range sq.sqItems { if item.ExpiryTime == nil { @@ -151,7 +151,7 @@ func (sq *StatsInstance) remExpired() { } // remOnQueueLength rems elements based on QueueLength setting -func (sq *StatsInstance) remOnQueueLength() { +func (sq *StatQueue) remOnQueueLength() { if sq.cfg.QueueLength == 0 { return } @@ -164,26 +164,26 @@ func (sq *StatsInstance) remOnQueueLength() { } // addStatsEvent computes metrics for an event -func (sq *StatsInstance) addStatsEvent(ev engine.StatsEvent) { +func (sq *StatQueue) addStatsEvent(ev engine.StatsEvent) { evID := ev.ID() for metricID, metric := range sq.sqMetrics { if err := metric.AddEvent(ev); err != nil { - utils.Logger.Warning(fmt.Sprintf(" metricID: %s, add eventID: %s, error: %s", + utils.Logger.Warning(fmt.Sprintf(" metricID: %s, add eventID: %s, error: %s", metricID, evID, err.Error())) } } } // remStatsEvent removes an event from metrics -func (sq *StatsInstance) remEventWithID(evID string) { +func (sq *StatQueue) remEventWithID(evID string) { ev := sq.sec.GetEvent(evID) if ev == nil { - utils.Logger.Warning(fmt.Sprintf(" removing eventID: %s, error: event not cached", evID)) + utils.Logger.Warning(fmt.Sprintf(" removing eventID: %s, error: event not cached", evID)) return } for metricID, metric := range sq.sqMetrics { if err := metric.RemEvent(ev); err != nil { - utils.Logger.Warning(fmt.Sprintf(" metricID: %s, remove eventID: %s, error: %s", metricID, evID, err.Error())) + utils.Logger.Warning(fmt.Sprintf(" metricID: %s, remove eventID: %s, error: %s", metricID, evID, err.Error())) } } } diff --git a/stats/queue_test.go b/stats/queue_test.go index 83505a914..560a1be2e 100644 --- a/stats/queue_test.go +++ b/stats/queue_test.go @@ -24,19 +24,19 @@ import ( "github.com/cgrates/cgrates/engine" ) -func TestStatsInstancesSort(t *testing.T) { - sInsts := StatsInstances{ - &StatsInstance{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}}, - &StatsInstance{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}}, - &StatsInstance{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}}, - &StatsInstance{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}}, +func TestStatQueuesSort(t *testing.T) { + sInsts := StatQueues{ + &StatQueue{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}}, + &StatQueue{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}}, + &StatQueue{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}}, + &StatQueue{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}}, } sInsts.Sort() - eSInst := StatsInstances{ - &StatsInstance{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}}, - &StatsInstance{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}}, - &StatsInstance{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}}, - &StatsInstance{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}}, + eSInst := StatQueues{ + &StatQueue{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}}, + &StatQueue{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}}, + &StatQueue{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}}, + &StatQueue{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}}, } if !reflect.DeepEqual(eSInst, sInsts) { t.Errorf("expecting: %+v, received: %+v", eSInst, sInsts) diff --git a/stats/service.go b/stats/service.go index e119891e2..5e4417a0a 100755 --- a/stats/service.go +++ b/stats/service.go @@ -43,8 +43,8 @@ func NewStatService(dataDB engine.DataDB, ms engine.Marshaler, storeInterval tim if err != nil { return nil, err } - ss.queuesCache = make(map[string]*StatsInstance) - ss.queues = make(StatsInstances, 0) + ss.queuesCache = make(map[string]*StatQueue) + ss.queues = make(StatQueues, 0) for _, prfx := range sqPrfxs { if q, err := ss.loadQueue(prfx[len(utils.StatsConfigPrefix):]); err != nil { utils.Logger.Err(fmt.Sprintf(" failed loading quueue with id: <%s>, err: <%s>", @@ -66,9 +66,9 @@ type StatService struct { ms engine.Marshaler storeInterval time.Duration stopStoring chan struct{} - evCache *StatsEventCache // so we can pass it to queues - queuesCache map[string]*StatsInstance // unordered db of StatsQueues, used for fast queries - queues StatsInstances // ordered list of StatsQueues + evCache *StatsEventCache // so we can pass it to queues + queuesCache map[string]*StatQueue // unordered db of StatQueues, used for fast queries + queues StatQueues // ordered list of StatQueues } @@ -91,7 +91,7 @@ func (ss *StatService) Shutdown() error { // setQueue adds or modifies a queue into cache // sort will reorder the ss.queues -func (ss *StatService) loadQueue(qID string) (q *StatsInstance, err error) { +func (ss *StatService) loadQueue(qID string) (q *StatQueue, err error) { sq, err := ss.dataDB.GetStatsConfig(qID) if err != nil { return nil, err @@ -102,16 +102,16 @@ func (ss *StatService) loadQueue(qID string) (q *StatsInstance, err error) { return nil, err } } - return NewStatsInstance(ss.evCache, ss.ms, sq, sqSM) + return NewStatQueue(ss.evCache, ss.ms, sq, sqSM) } -func (ss *StatService) setQueue(q *StatsInstance) { +func (ss *StatService) setQueue(q *StatQueue) { ss.queuesCache[q.cfg.ID] = q ss.queues = append(ss.queues, q) } // remQueue will remove a queue based on it's ID -func (ss *StatService) remQueue(qID string) (si *StatsInstance) { +func (ss *StatService) remQueue(qID string) (si *StatQueue) { si = ss.queuesCache[qID] ss.queues.remWithID(qID) delete(ss.queuesCache, qID) @@ -240,7 +240,7 @@ func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err err if qIDs == nil || len(*qIDs) == 0 { return utils.ErrNotFound } - var sQs []*StatsInstance // cache here so we lock only later when data available + var sQs []*StatQueue // cache here so we lock only later when data available for _, qID := range *qIDs { if _, hasPrev := ss.queuesCache[qID]; hasPrev { continue // don't overwrite previous, could be extended in the future by carefully checking cached events diff --git a/utils/consts.go b/utils/consts.go index 3c3d63be2..2d6dddfde 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -57,6 +57,8 @@ var ( CacheResourceConfigs: ResourceConfigsPrefix, CacheResources: ResourcesPrefix, CacheTimings: TimingsPrefix, + CacheStatSQueues: META_NONE, + CacheStatSEventQueues: META_NONE, } CachePrefixToInstance map[string]string // will be built on init ) @@ -433,6 +435,8 @@ const ( CostSource = "CostSource" ExtraInfo = "ExtraInfo" MetaPrefix = "*" + CacheStatSQueues = "stats_queues" + CacheStatSEventQueues = "stats_event_queues" ) func buildCacheInstRevPrefixes() {