This commit is contained in:
TeoV
2017-09-12 09:39:36 +03:00
2 changed files with 28 additions and 63 deletions

View File

@@ -20,6 +20,7 @@ package engine
import (
"errors"
"fmt"
"sort"
"time"
@@ -49,6 +50,11 @@ type StatEvent struct {
Fields map[string]interface{}
}
// TenantID returns the unique identifier based on Tenant and ID
func (se StatEvent) TenantID() string {
return utils.ConcatenatedKey(se.Tenant, se.ID)
}
// AnswerTime returns the AnswerTime of StatEvent
func (se StatEvent) AnswerTime(timezone string) (at time.Time, err error) {
atIf, has := se.Fields[utils.ANSWER_TIME]
@@ -149,52 +155,18 @@ func (sq *StatQueue) SqID() string {
return utils.ConcatenatedKey(sq.Tenant, sq.ID)
}
/*
// GetSQStoredMetrics retrieves the data used for store to DB
func (sq *StatQueue) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) {
sq.RLock()
defer sq.RUnlock()
sEvents := make(map[string]engine.StatEvent)
var sItems []*engine.SQItem
for _, sqItem := range sq.sqItems { // make sure event is properly retrieved from cache
ev := sq.sec.GetEvent(sqItem.EventID)
if ev == nil {
utils.Logger.Warning(fmt.Sprintf("<StatQueue> querying for storage eventID: %s, error: event not cached",
sqItem.EventID))
continue
}
sEvents[sqItem.EventID] = ev
sItems = append(sItems, sqItem)
}
sqSM = &engine.SQStoredMetrics{
SEvents: sEvents,
SQItems: sItems,
SQMetrics: make(map[string][]byte, len(sq.sqMetrics))}
for metricID, metric := range sq.sqMetrics {
var err error
if sqSM.SQMetrics[metricID], err = metric.GetMarshaled(sq.ms); err != nil {
utils.Logger.Warning(fmt.Sprintf("<StatQueue> querying for storage metricID: %s, error: %s",
metricID, err.Error()))
continue
}
}
return
}
// ProcessEvent processes a StatEvent, returns true if processed
func (sq *StatQueue) ProcessEvent(ev engine.StatEvent) (err error) {
sq.Lock()
func (sq *StatQueue) ProcessEvent(ev StatEvent) (err error) {
sq.remExpired()
sq.remOnQueueLength()
sq.addStatEvent(ev)
sq.Unlock()
return
}
// remExpired expires items in queue
func (sq *StatQueue) remExpired() {
var expIdx *int // index of last item to be expired
for i, item := range sq.sqItems {
for i, item := range sq.SQItems {
if item.ExpiryTime == nil {
break
}
@@ -202,54 +174,45 @@ func (sq *StatQueue) remExpired() {
break
}
sq.remEventWithID(item.EventID)
item = nil // garbage collected asap
expIdx = &i
}
if expIdx == nil {
return
}
nextValidIdx := *expIdx + 1
sq.sqItems = sq.sqItems[nextValidIdx:]
sq.SQItems = sq.SQItems[nextValidIdx:]
}
// remOnQueueLength rems elements based on QueueLength setting
// remOnQueueLength removes elements based on QueueLength setting
func (sq *StatQueue) remOnQueueLength() {
if sq.cfg.QueueLength == 0 {
if sq.sqPrfl.QueueLength <= 0 { // infinite length
return
}
if len(sq.sqItems) == sq.cfg.QueueLength { // reached limit, rem first element
itm := sq.sqItems[0]
if len(sq.SQItems) == sq.sqPrfl.QueueLength { // reached limit, rem first element
itm := sq.SQItems[0]
sq.remEventWithID(itm.EventID)
itm = nil
sq.sqItems = sq.sqItems[1:]
sq.SQItems = sq.SQItems[1:]
}
}
// addStatEvent computes metrics for an event
func (sq *StatQueue) addStatEvent(ev engine.StatEvent) {
evID := ev.ID()
for metricID, metric := range sq.sqMetrics {
func (sq *StatQueue) addStatEvent(ev StatEvent) {
for metricID, metric := range sq.SQMetrics {
if err := metric.AddEvent(ev); err != nil {
utils.Logger.Warning(fmt.Sprintf("<StatQueue> metricID: %s, add eventID: %s, error: %s",
metricID, evID, err.Error()))
metricID, ev.TenantID(), err.Error()))
}
}
}
// remStatEvent removes an event from metrics
func (sq *StatQueue) remEventWithID(evID string) {
ev := sq.sec.GetEvent(evID)
if ev == nil {
utils.Logger.Warning(fmt.Sprintf("<StatQueue> removing eventID: %s, error: event not cached", evID))
return
}
for metricID, metric := range sq.sqMetrics {
if err := metric.RemEvent(ev); err != nil {
utils.Logger.Warning(fmt.Sprintf("<StatQueue> metricID: %s, remove eventID: %s, error: %s", metricID, evID, err.Error()))
func (sq *StatQueue) remEventWithID(evTenantID string) {
for metricID, metric := range sq.SQMetrics {
if err := metric.RemEvent(evTenantID); err != nil {
utils.Logger.Warning(fmt.Sprintf("<StatQueue> metricID: %s, remove eventID: %s, error: %s", metricID, evTenantID, err.Error()))
}
}
}
*/
// StatQueues is a sortable list of StatQueue
type StatQueues []*StatQueue

View File

@@ -45,7 +45,7 @@ type StatMetric interface {
GetStringValue(fmtOpts string) (val string)
GetFloat64Value() (val float64)
AddEvent(ev StatEvent) error
RemEvent(evID string) error
RemEvent(evTenantID string) error
Marshal(ms Marshaler) (marshaled []byte, err error)
LoadMarshaled(ms Marshaler, marshaled []byte) (err error)
}
@@ -58,7 +58,7 @@ func NewASR() (StatMetric, error) {
type StatASR struct {
Answered float64
Count float64
Events map[string]bool // map[EventID]Answered
Events map[string]bool // map[EventTenantID]Answered
val *float64 // cached ASR value
}
@@ -101,6 +101,7 @@ func (asr *StatASR) AddEvent(ev StatEvent) (err error) {
} else if !at.IsZero() {
answered = true
}
asr.Events[ev.TenantID()] = answered
asr.Count += 1
if answered {
asr.Answered += 1
@@ -109,8 +110,8 @@ func (asr *StatASR) AddEvent(ev StatEvent) (err error) {
return
}
func (asr *StatASR) RemEvent(evID string) (err error) {
answered, has := asr.Events[evID]
func (asr *StatASR) RemEvent(evTenantID string) (err error) {
answered, has := asr.Events[evTenantID]
if !has {
return utils.ErrNotFound
}
@@ -118,6 +119,7 @@ func (asr *StatASR) RemEvent(evID string) (err error) {
asr.Answered -= 1
}
asr.Count -= 1
delete(asr.Events, evTenantID)
asr.val = nil
return
}
@@ -158,7 +160,7 @@ func (acd *StatACD) AddEvent(ev StatEvent) (err error) {
return
}
func (acd *StatACD) RemEvent(evID string) (err error) {
func (acd *StatACD) RemEvent(evTenantID string) (err error) {
return
}