Updated Stats Metrics ProccessEvent

This commit is contained in:
Trial97
2020-10-20 17:48:50 +03:00
committed by Dan Christian Bogos
parent 66021cbe21
commit dbf7fb15be
7 changed files with 408 additions and 387 deletions

View File

@@ -174,14 +174,14 @@ func (sq *StatQueue) TenantID() string {
}
// ProcessEvent processes a utils.CGREvent, returns true if processed
func (sq *StatQueue) ProcessEvent(ev *utils.CGREvent, filterS *FilterS) (err error) {
func (sq *StatQueue) ProcessEvent(ev *utils.CGREvent, filterS *FilterS, evNm utils.MapStorage) (err error) {
if err = sq.remExpired(); err != nil {
return
}
if err = sq.remOnQueueLength(); err != nil {
return
}
return sq.addStatEvent(ev, filterS)
return sq.addStatEvent(ev, filterS, evNm)
}
// remStatEvent removes an event from metrics
@@ -237,7 +237,7 @@ func (sq *StatQueue) remOnQueueLength() (err error) {
}
// addStatEvent computes metrics for an event
func (sq *StatQueue) addStatEvent(ev *utils.CGREvent, filterS *FilterS) (err error) {
func (sq *StatQueue) addStatEvent(ev *utils.CGREvent, filterS *FilterS, evNm utils.MapStorage) (err error) {
var expTime *time.Time
if sq.ttl != nil {
expTime = utils.TimePointer(time.Now().Add(*sq.ttl))
@@ -248,7 +248,8 @@ func (sq *StatQueue) addStatEvent(ev *utils.CGREvent, filterS *FilterS) (err err
ExpiryTime *time.Time
}{ev.ID, expTime})
var pass bool
evNm := utils.MapStorage{utils.MetaReq: ev.Event}
// recreate the request without *opts
req := utils.MapStorage{utils.MetaReq: ev.Event}
for metricID, metric := range sq.SQMetrics {
if pass, err = filterS.Pass(ev.Tenant, metric.GetFilterIDs(),
evNm); err != nil {
@@ -256,7 +257,7 @@ func (sq *StatQueue) addStatEvent(ev *utils.CGREvent, filterS *FilterS) (err err
} else if !pass {
continue
}
if err = metric.AddEvent(ev); err != nil {
if err = metric.AddEvent(ev.ID, req); err != nil {
utils.Logger.Warning(fmt.Sprintf("<StatQueue> metricID: %s, add eventID: %s, error: %s",
metricID, ev.ID, err.Error()))
return

View File

@@ -220,7 +220,7 @@ func TestStatAddStatEvent(t *testing.T) {
t.Errorf("received ASR: %v", asr)
}
ev1 := &utils.CGREvent{Tenant: "cgrates.org", ID: "TestStatAddStatEvent_1"}
sq.addStatEvent(ev1, nil)
sq.addStatEvent(ev1, nil, nil)
if asr := asrMetric.GetFloat64Value(); asr != 50 {
t.Errorf("received ASR: %v", asr)
} else if asrMetric.Answered != 1 || asrMetric.Count != 2 {
@@ -228,7 +228,7 @@ func TestStatAddStatEvent(t *testing.T) {
}
ev1.Event = map[string]interface{}{
utils.AnswerTime: time.Now()}
sq.addStatEvent(ev1, nil)
sq.addStatEvent(ev1, nil, nil)
if asr := asrMetric.GetFloat64Value(); asr != 66.66667 {
t.Errorf("received ASR: %v", asr)
} else if asrMetric.Answered != 2 || asrMetric.Count != 3 {
@@ -634,7 +634,7 @@ func TestStatRemoveExpiredTTL(t *testing.T) {
//add ev1 with ttl 100ms (after 100ms the event should be removed)
ev1 := &utils.CGREvent{Tenant: "cgrates.org", ID: "TestStatAddStatEvent_1"}
sq.ProcessEvent(ev1, nil)
sq.ProcessEvent(ev1, nil, utils.MapStorage{utils.MetaReq: ev1.Event})
if len(sq.SQItems) != 1 && sq.SQItems[0].EventID != "TestStatAddStatEvent_1" {
t.Errorf("Expecting: 1, received: %+v", len(sq.SQItems))
@@ -644,7 +644,7 @@ func TestStatRemoveExpiredTTL(t *testing.T) {
//processing a new event should clean the expired events and add the new one
ev2 := &utils.CGREvent{Tenant: "cgrates.org", ID: "TestStatAddStatEvent_2"}
sq.ProcessEvent(ev2, nil)
sq.ProcessEvent(ev2, nil, utils.MapStorage{utils.MetaReq: ev2.Event})
if len(sq.SQItems) != 1 && sq.SQItems[0].EventID != "TestStatAddStatEvent_2" {
t.Errorf("Expecting: 1, received: %+v", len(sq.SQItems))
}
@@ -668,7 +668,7 @@ func TestStatRemoveExpiredQueue(t *testing.T) {
//add ev1 with ttl 100ms (after 100ms the event should be removed)
ev1 := &utils.CGREvent{Tenant: "cgrates.org", ID: "TestStatAddStatEvent_1"}
sq.ProcessEvent(ev1, nil)
sq.ProcessEvent(ev1, nil, utils.MapStorage{utils.MetaReq: ev1.Event})
if len(sq.SQItems) != 1 && sq.SQItems[0].EventID != "TestStatAddStatEvent_1" {
t.Errorf("Expecting: 1, received: %+v", len(sq.SQItems))
@@ -678,7 +678,7 @@ func TestStatRemoveExpiredQueue(t *testing.T) {
//processing a new event should clean the expired events and add the new one
ev2 := &utils.CGREvent{Tenant: "cgrates.org", ID: "TestStatAddStatEvent_2"}
sq.ProcessEvent(ev2, nil)
sq.ProcessEvent(ev2, nil, utils.MapStorage{utils.MetaReq: ev2.Event})
if len(sq.SQItems) != 2 && sq.SQItems[0].EventID != "TestStatAddStatEvent_1" &&
sq.SQItems[1].EventID != "TestStatAddStatEvent_2" {
t.Errorf("Expecting: 2, received: %+v", len(sq.SQItems))
@@ -686,7 +686,7 @@ func TestStatRemoveExpiredQueue(t *testing.T) {
//processing a new event should clean the expired events and add the new one
ev3 := &utils.CGREvent{Tenant: "cgrates.org", ID: "TestStatAddStatEvent_3"}
sq.ProcessEvent(ev3, nil)
sq.ProcessEvent(ev3, nil, utils.MapStorage{utils.MetaReq: ev3.Event})
if len(sq.SQItems) != 2 && sq.SQItems[0].EventID != "TestStatAddStatEvent_2" &&
sq.SQItems[1].EventID != "TestStatAddStatEvent_3" {
t.Errorf("Expecting: 2, received: %+v", len(sq.SQItems))

View File

@@ -76,7 +76,7 @@ type StatMetric interface {
GetValue() interface{}
GetStringValue(fmtOpts string) (val string)
GetFloat64Value() (val float64)
AddEvent(ev *utils.CGREvent) error
AddEvent(evID string, ev utils.DataProvider) error
RemEvent(evTenantID string) error
Marshal(ms Marshaler) (marshaled []byte, err error)
LoadMarshaled(ms Marshaler, marshaled []byte) (err error)
@@ -133,25 +133,28 @@ func (asr *StatASR) GetFloat64Value() (val float64) {
}
// AddEvent is part of StatMetric interface
func (asr *StatASR) AddEvent(ev *utils.CGREvent) (err error) {
func (asr *StatASR) AddEvent(evID string, ev utils.DataProvider) (err error) {
var answered int
if at, err := ev.FieldAsTime(utils.AnswerTime,
config.CgrConfig().GeneralCfg().DefaultTimezone); err != nil &&
err != utils.ErrNotFound {
if val, err := ev.FieldAsInterface([]string{utils.MetaReq, utils.AnswerTime}); err != nil {
if err != utils.ErrNotFound {
return err
}
} else if at, err := utils.IfaceAsTime(val,
config.CgrConfig().GeneralCfg().DefaultTimezone); err != nil {
return err
} else if !at.IsZero() {
answered = 1
}
if val, has := asr.Events[ev.ID]; !has {
asr.Events[ev.ID] = &StatWithCompress{Stat: float64(answered), CompressFactor: 1}
if val, has := asr.Events[evID]; !has {
asr.Events[evID] = &StatWithCompress{Stat: float64(answered), CompressFactor: 1}
} else {
val.Stat = (val.Stat*float64(val.CompressFactor) + float64(answered)) / float64(val.CompressFactor+1)
val.CompressFactor = val.CompressFactor + 1
}
asr.Count += 1
asr.Count++
if answered == 1 {
asr.Answered += 1
asr.Answered++
}
asr.val = nil
return
@@ -165,9 +168,9 @@ func (asr *StatASR) RemEvent(evID string) (err error) {
ans := 0
if val.Stat > 0.5 {
ans = 1
asr.Answered -= 1
asr.Answered--
}
asr.Count -= 1
asr.Count--
if val.CompressFactor <= 1 {
delete(asr.Events, evID)
} else {
@@ -196,7 +199,7 @@ func (asr *StatASR) GetFilterIDs() []string {
// Compress is part of StatMetric interface
func (asr *StatASR) Compress(queueLen int64, defaultID string) (eventIDs []string) {
if asr.Count < queueLen {
for id, _ := range asr.Events {
for id := range asr.Events {
eventIDs = append(eventIDs, id)
}
return
@@ -273,22 +276,25 @@ func (acd *StatACD) GetFloat64Value() (v float64) {
return
}
func (acd *StatACD) AddEvent(ev *utils.CGREvent) (err error) {
func (acd *StatACD) AddEvent(evID string, ev utils.DataProvider) (err error) {
var dur time.Duration
if dur, err = ev.FieldAsDuration(utils.Usage); err != nil {
var val interface{}
if val, err = ev.FieldAsInterface([]string{utils.MetaReq, utils.Usage}); err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, utils.Usage)
}
return
} else if dur, err = utils.IfaceAsDuration(val); err != nil {
return
}
acd.Sum += dur
if val, has := acd.Events[ev.ID]; !has {
acd.Events[ev.ID] = &DurationWithCompress{Duration: dur, CompressFactor: 1}
if val, has := acd.Events[evID]; !has {
acd.Events[evID] = &DurationWithCompress{Duration: dur, CompressFactor: 1}
} else {
val.Duration = time.Duration((float64(val.Duration.Nanoseconds())*float64(val.CompressFactor) + float64(dur.Nanoseconds())) / float64(val.CompressFactor+1))
val.CompressFactor = val.CompressFactor + 1
}
acd.Count += 1
acd.Count++
acd.val = nil
return
}
@@ -301,7 +307,7 @@ func (acd *StatACD) RemEvent(evID string) (err error) {
if val.Duration != 0 {
acd.Sum -= val.Duration
}
acd.Count -= 1
acd.Count--
if val.CompressFactor <= 1 {
delete(acd.Events, evID)
} else {
@@ -326,7 +332,7 @@ func (acd *StatACD) GetFilterIDs() []string {
// Compress is part of StatMetric interface
func (acd *StatACD) Compress(queueLen int64, defaultID string) (eventIDs []string) {
if acd.Count < queueLen {
for id, _ := range acd.Events {
for id := range acd.Events {
eventIDs = append(eventIDs, id)
}
return
@@ -404,22 +410,25 @@ func (tcd *StatTCD) GetFloat64Value() (v float64) {
return
}
func (tcd *StatTCD) AddEvent(ev *utils.CGREvent) (err error) {
func (tcd *StatTCD) AddEvent(evID string, ev utils.DataProvider) (err error) {
var dur time.Duration
if dur, err = ev.FieldAsDuration(utils.Usage); err != nil {
var val interface{}
if val, err = ev.FieldAsInterface([]string{utils.MetaReq, utils.Usage}); err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, utils.Usage)
}
return
} else if dur, err = utils.IfaceAsDuration(val); err != nil {
return
}
tcd.Sum += dur
if val, has := tcd.Events[ev.ID]; !has {
tcd.Events[ev.ID] = &DurationWithCompress{Duration: dur, CompressFactor: 1}
if val, has := tcd.Events[evID]; !has {
tcd.Events[evID] = &DurationWithCompress{Duration: dur, CompressFactor: 1}
} else {
val.Duration = time.Duration((float64(val.Duration.Nanoseconds())*float64(val.CompressFactor) + float64(dur.Nanoseconds())) / float64(val.CompressFactor+1))
val.CompressFactor = val.CompressFactor + 1
}
tcd.Count += 1
tcd.Count++
tcd.val = nil
return
}
@@ -432,7 +441,7 @@ func (tcd *StatTCD) RemEvent(evID string) (err error) {
if val.Duration != 0 {
tcd.Sum -= val.Duration
}
tcd.Count -= 1
tcd.Count--
if val.CompressFactor <= 1 {
delete(tcd.Events, evID)
} else {
@@ -458,7 +467,7 @@ func (tcd *StatTCD) GetFilterIDs() []string {
// Compress is part of StatMetric interface
func (tcd *StatTCD) Compress(queueLen int64, defaultID string) (eventIDs []string) {
if tcd.Count < queueLen {
for id, _ := range tcd.Events {
for id := range tcd.Events {
eventIDs = append(eventIDs, id)
}
return
@@ -530,22 +539,25 @@ func (acc *StatACC) GetFloat64Value() (v float64) {
return acc.getValue()
}
func (acc *StatACC) AddEvent(ev *utils.CGREvent) (err error) {
func (acc *StatACC) AddEvent(evID string, ev utils.DataProvider) (err error) {
var cost float64
if cost, err = ev.FieldAsFloat64(utils.COST); err != nil {
var val interface{}
if val, err = ev.FieldAsInterface([]string{utils.MetaReq, utils.COST}); err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, utils.COST)
}
return
} else if cost, err = utils.IfaceAsFloat64(val); err != nil {
return
}
acc.Sum += cost
if val, has := acc.Events[ev.ID]; !has {
acc.Events[ev.ID] = &StatWithCompress{Stat: cost, CompressFactor: 1}
if val, has := acc.Events[evID]; !has {
acc.Events[evID] = &StatWithCompress{Stat: cost, CompressFactor: 1}
} else {
val.Stat = (val.Stat*float64(val.CompressFactor) + cost) / float64(val.CompressFactor+1)
val.CompressFactor = val.CompressFactor + 1
}
acc.Count += 1
acc.Count++
acc.val = nil
return
}
@@ -556,7 +568,7 @@ func (acc *StatACC) RemEvent(evID string) (err error) {
return utils.ErrNotFound
}
acc.Sum -= cost.Stat
acc.Count -= 1
acc.Count--
if cost.CompressFactor <= 1 {
delete(acc.Events, evID)
} else {
@@ -582,7 +594,7 @@ func (acc *StatACC) GetFilterIDs() []string {
// Compress is part of StatMetric interface
func (acc *StatACC) Compress(queueLen int64, defaultID string) (eventIDs []string) {
if acc.Count < queueLen {
for id, _ := range acc.Events {
for id := range acc.Events {
eventIDs = append(eventIDs, id)
}
return
@@ -654,22 +666,25 @@ func (tcc *StatTCC) GetFloat64Value() (v float64) {
return tcc.getValue()
}
func (tcc *StatTCC) AddEvent(ev *utils.CGREvent) (err error) {
func (tcc *StatTCC) AddEvent(evID string, ev utils.DataProvider) (err error) {
var cost float64
if cost, err = ev.FieldAsFloat64(utils.COST); err != nil {
var val interface{}
if val, err = ev.FieldAsInterface([]string{utils.MetaReq, utils.COST}); err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, utils.COST)
}
return
} else if cost, err = utils.IfaceAsFloat64(val); err != nil {
return
}
tcc.Sum += cost
if val, has := tcc.Events[ev.ID]; !has {
tcc.Events[ev.ID] = &StatWithCompress{Stat: cost, CompressFactor: 1}
if val, has := tcc.Events[evID]; !has {
tcc.Events[evID] = &StatWithCompress{Stat: cost, CompressFactor: 1}
} else {
val.Stat = (val.Stat*float64(val.CompressFactor) + cost) / float64(val.CompressFactor+1)
val.CompressFactor = val.CompressFactor + 1
}
tcc.Count += 1
tcc.Count++
tcc.val = nil
return
}
@@ -682,7 +697,7 @@ func (tcc *StatTCC) RemEvent(evID string) (err error) {
if cost.Stat != 0 {
tcc.Sum -= cost.Stat
}
tcc.Count -= 1
tcc.Count--
if cost.CompressFactor <= 1 {
delete(tcc.Events, evID)
} else {
@@ -708,7 +723,7 @@ func (tcc *StatTCC) GetFilterIDs() []string {
// Compress is part of StatMetric interface
func (tcc *StatTCC) Compress(queueLen int64, defaultID string) (eventIDs []string) {
if tcc.Count < queueLen {
for id, _ := range tcc.Events {
for id := range tcc.Events {
eventIDs = append(eventIDs, id)
}
return
@@ -785,22 +800,25 @@ func (pdd *StatPDD) GetFloat64Value() (v float64) {
return
}
func (pdd *StatPDD) AddEvent(ev *utils.CGREvent) (err error) {
func (pdd *StatPDD) AddEvent(evID string, ev utils.DataProvider) (err error) {
var dur time.Duration
if dur, err = ev.FieldAsDuration(utils.PDD); err != nil {
var val interface{}
if val, err = ev.FieldAsInterface([]string{utils.MetaReq, utils.PDD}); err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, utils.PDD)
}
return
} else if dur, err = utils.IfaceAsDuration(val); err != nil {
return
}
pdd.Sum += dur
if val, has := pdd.Events[ev.ID]; !has {
pdd.Events[ev.ID] = &DurationWithCompress{Duration: dur, CompressFactor: 1}
if val, has := pdd.Events[evID]; !has {
pdd.Events[evID] = &DurationWithCompress{Duration: dur, CompressFactor: 1}
} else {
val.Duration = time.Duration((float64(val.Duration.Nanoseconds())*float64(val.CompressFactor) + float64(dur.Nanoseconds())) / float64(val.CompressFactor+1))
val.CompressFactor = val.CompressFactor + 1
}
pdd.Count += 1
pdd.Count++
pdd.val = nil
return
}
@@ -813,7 +831,7 @@ func (pdd *StatPDD) RemEvent(evID string) (err error) {
if val.Duration != 0 {
pdd.Sum -= val.Duration
}
pdd.Count -= 1
pdd.Count--
if val.CompressFactor <= 1 {
delete(pdd.Events, evID)
} else {
@@ -838,7 +856,7 @@ func (pdd *StatPDD) GetFilterIDs() []string {
// Compress is part of StatMetric interface
func (pdd *StatPDD) Compress(queueLen int64, defaultID string) (eventIDs []string) {
if pdd.Count < queueLen {
for id, _ := range pdd.Events {
for id := range pdd.Events {
eventIDs = append(eventIDs, id)
}
return
@@ -903,28 +921,31 @@ func (ddc *StatDDC) GetFloat64Value() (v float64) {
return ddc.getValue()
}
func (ddc *StatDDC) AddEvent(ev *utils.CGREvent) (err error) {
func (ddc *StatDDC) AddEvent(evID string, ev utils.DataProvider) (err error) {
var fieldValue string
if fieldValue, err = ev.FieldAsString(utils.Destination); err != nil {
return err
if fieldValue, err = ev.FieldAsString([]string{utils.MetaReq, utils.Destination}); err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, utils.Destination)
}
return
}
// add to fieldValues
if _, has := ddc.FieldValues[fieldValue]; !has {
ddc.FieldValues[fieldValue] = make(map[string]struct{})
}
ddc.FieldValues[fieldValue][ev.ID] = struct{}{}
ddc.FieldValues[fieldValue][evID] = struct{}{}
// add to events
if _, has := ddc.Events[ev.ID]; !has {
ddc.Events[ev.ID] = make(map[string]int64)
if _, has := ddc.Events[evID]; !has {
ddc.Events[evID] = make(map[string]int64)
}
ddc.Count += 1
if _, has := ddc.Events[ev.ID][fieldValue]; !has {
ddc.Events[ev.ID][fieldValue] = 1
ddc.Count++
if _, has := ddc.Events[evID][fieldValue]; !has {
ddc.Events[evID][fieldValue] = 1
return
}
ddc.Events[ev.ID][fieldValue] = ddc.Events[ev.ID][fieldValue] + 1
ddc.Events[evID][fieldValue] = ddc.Events[evID][fieldValue] + 1
return
}
@@ -940,11 +961,11 @@ func (ddc *StatDDC) RemEvent(evID string) (err error) {
// decrement events
var fieldValue string
for k, _ := range fieldValues {
for k := range fieldValues {
fieldValue = k
break
}
ddc.Count -= 1
ddc.Count--
if fieldValues[fieldValue] > 1 {
ddc.Events[evID][fieldValue] = ddc.Events[evID][fieldValue] - 1
return // do not delete the reference until it reaches 0
@@ -976,7 +997,7 @@ func (ddc *StatDDC) GetFilterIDs() []string {
}
func (ddc *StatDDC) Compress(queueLen int64, defaultID string) (eventIDs []string) {
for id, _ := range ddc.Events {
for id := range ddc.Events {
eventIDs = append(eventIDs, id)
}
return
@@ -1045,32 +1066,25 @@ func (sum *StatSum) GetFloat64Value() (v float64) {
return sum.getValue()
}
func (sum *StatSum) AddEvent(ev *utils.CGREvent) (err error) {
func (sum *StatSum) AddEvent(evID string, ev utils.DataProvider) (err error) {
var val float64
switch {
case strings.HasPrefix(sum.FieldName, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep): // ~*req.
//Remove the dynamic prefix and check in event for field
field := sum.FieldName[6:]
if val, err = ev.FieldAsFloat64(field); err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, field)
}
return
}
default:
val, err = utils.IfaceAsFloat64(sum.FieldName)
if err != nil {
return
var ival interface{}
if ival, err = utils.DPDynamicInterface(sum.FieldName, ev); err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, sum.FieldName)
}
return
} else if val, err = utils.IfaceAsFloat64(ival); err != nil {
return
}
sum.Sum += val
if v, has := sum.Events[ev.ID]; !has {
sum.Events[ev.ID] = &StatWithCompress{Stat: val, CompressFactor: 1}
if v, has := sum.Events[evID]; !has {
sum.Events[evID] = &StatWithCompress{Stat: val, CompressFactor: 1}
} else {
v.Stat = (v.Stat*float64(v.CompressFactor) + val) / float64(v.CompressFactor+1)
v.CompressFactor = v.CompressFactor + 1
}
sum.Count += 1
sum.Count++
sum.val = nil
return
}
@@ -1083,7 +1097,7 @@ func (sum *StatSum) RemEvent(evID string) (err error) {
if val.Stat != 0 {
sum.Sum -= val.Stat
}
sum.Count -= 1
sum.Count--
if val.CompressFactor <= 1 {
delete(sum.Events, evID)
} else {
@@ -1109,7 +1123,7 @@ func (sum *StatSum) GetFilterIDs() []string {
// Compress is part of StatMetric interface
func (sum *StatSum) Compress(queueLen int64, defaultID string) (eventIDs []string) {
if sum.Count < queueLen {
for id, _ := range sum.Events {
for id := range sum.Events {
eventIDs = append(eventIDs, id)
}
return
@@ -1183,32 +1197,25 @@ func (avg *StatAverage) GetFloat64Value() (v float64) {
return avg.getValue()
}
func (avg *StatAverage) AddEvent(ev *utils.CGREvent) (err error) {
func (avg *StatAverage) AddEvent(evID string, ev utils.DataProvider) (err error) {
var val float64
switch {
case strings.HasPrefix(avg.FieldName, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep): // ~*req.
//Remove the dynamic prefix and check in event for field
field := avg.FieldName[6:]
if val, err = ev.FieldAsFloat64(field); err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, field)
}
return
}
default:
val, err = utils.IfaceAsFloat64(avg.FieldName)
if err != nil {
return
var ival interface{}
if ival, err = utils.DPDynamicInterface(avg.FieldName, ev); err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, avg.FieldName)
}
return
} else if val, err = utils.IfaceAsFloat64(ival); err != nil {
return
}
avg.Sum += val
if v, has := avg.Events[ev.ID]; !has {
avg.Events[ev.ID] = &StatWithCompress{Stat: val, CompressFactor: 1}
if v, has := avg.Events[evID]; !has {
avg.Events[evID] = &StatWithCompress{Stat: val, CompressFactor: 1}
} else {
v.Stat = (v.Stat*float64(v.CompressFactor) + val) / float64(v.CompressFactor+1)
v.CompressFactor = v.CompressFactor + 1
}
avg.Count += 1
avg.Count++
avg.val = nil
return
}
@@ -1221,7 +1228,7 @@ func (avg *StatAverage) RemEvent(evID string) (err error) {
if val.Stat >= 0 {
avg.Sum -= val.Stat
}
avg.Count -= 1
avg.Count--
if val.CompressFactor <= 1 {
delete(avg.Events, evID)
} else {
@@ -1247,7 +1254,7 @@ func (avg *StatAverage) GetFilterIDs() []string {
// Compress is part of StatMetric interface
func (avg *StatAverage) Compress(queueLen int64, defaultID string) (eventIDs []string) {
if avg.Count < queueLen {
for id, _ := range avg.Events {
for id := range avg.Events {
eventIDs = append(eventIDs, id)
}
return
@@ -1313,33 +1320,36 @@ func (dst *StatDistinct) GetFloat64Value() (v float64) {
return dst.getValue()
}
func (dst *StatDistinct) AddEvent(ev *utils.CGREvent) (err error) {
func (dst *StatDistinct) AddEvent(evID string, ev utils.DataProvider) (err error) {
var fieldValue string
// simply remove the ~*req. prefix and do normal process
if !strings.HasPrefix(dst.FieldName, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep) {
return fmt.Errorf("Invalid format for field <%s>", dst.FieldName)
}
field := dst.FieldName[6:]
if fieldValue, err = ev.FieldAsString(field); err != nil {
return err
if fieldValue, err = utils.DPDynamicString(dst.FieldName, ev); err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, dst.FieldName)
}
return
}
// add to fieldValues
if _, has := dst.FieldValues[fieldValue]; !has {
dst.FieldValues[fieldValue] = make(map[string]struct{})
}
dst.FieldValues[fieldValue][ev.ID] = struct{}{}
dst.FieldValues[fieldValue][evID] = struct{}{}
// add to events
if _, has := dst.Events[ev.ID]; !has {
dst.Events[ev.ID] = make(map[string]int64)
if _, has := dst.Events[evID]; !has {
dst.Events[evID] = make(map[string]int64)
}
dst.Count += 1
if _, has := dst.Events[ev.ID][fieldValue]; !has {
dst.Events[ev.ID][fieldValue] = 1
dst.Count++
if _, has := dst.Events[evID][fieldValue]; !has {
dst.Events[evID][fieldValue] = 1
return
}
dst.Events[ev.ID][fieldValue] = dst.Events[ev.ID][fieldValue] + 1
dst.Events[evID][fieldValue] = dst.Events[evID][fieldValue] + 1
return
}
@@ -1355,11 +1365,11 @@ func (dst *StatDistinct) RemEvent(evID string) (err error) {
// decrement events
var fieldValue string
for k, _ := range fieldValues {
for k := range fieldValues {
fieldValue = k
break
}
dst.Count -= 1
dst.Count--
if fieldValues[fieldValue] > 1 {
dst.Events[evID][fieldValue] = dst.Events[evID][fieldValue] - 1
return // do not delete the reference until it reaches 0
@@ -1391,7 +1401,7 @@ func (dst *StatDistinct) GetFilterIDs() []string {
}
func (dst *StatDistinct) Compress(queueLen int64, defaultID string) (eventIDs []string) {
for id, _ := range dst.Events {
for id := range dst.Events {
eventIDs = append(eventIDs, id)
}
return

File diff suppressed because it is too large Load Diff

View File

@@ -150,11 +150,7 @@ func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) {
}
// matchingStatQueuesForEvent returns ordered list of matching resources which are active by the time of the call
func (sS *StatService) matchingStatQueuesForEvent(tnt string, args *StatsArgsProcessEvent) (sqs StatQueues, err error) {
evNm := utils.MapStorage{
utils.MetaReq: args.Event,
utils.MetaOpts: args.Opts,
}
func (sS *StatService) matchingStatQueuesForEvent(tnt string, args *StatsArgsProcessEvent, evNm utils.MapStorage) (sqs StatQueues, err error) {
sqIDs := utils.NewStringSet(args.StatIDs)
if len(sqIDs) == 0 {
sqIDs, err = MatchingItemIDsForEvent(evNm,
@@ -264,7 +260,11 @@ func (attr *StatsArgsProcessEvent) Clone() *StatsArgsProcessEvent {
// processEvent processes a new event, dispatching to matching queues
// queues matching are also cached to speed up
func (sS *StatService) processEvent(tnt string, args *StatsArgsProcessEvent) (statQueueIDs []string, err error) {
matchSQs, err := sS.matchingStatQueuesForEvent(tnt, args)
evNm := utils.MapStorage{
utils.MetaReq: args.Event,
utils.MetaOpts: args.Opts,
}
matchSQs, err := sS.matchingStatQueuesForEvent(tnt, args, evNm)
if err != nil {
return nil, err
}
@@ -281,7 +281,7 @@ func (sS *StatService) processEvent(tnt string, args *StatsArgsProcessEvent) (st
stsIDs = append(stsIDs, sq.ID)
lkID := utils.StatQueuePrefix + sq.TenantID()
guardian.Guardian.Guard(func() (gRes interface{}, gErr error) {
err = sq.ProcessEvent(args.CGREvent, sS.filterS)
err = sq.ProcessEvent(args.CGREvent, sS.filterS, evNm)
return
}, config.CgrConfig().GeneralCfg().LockingTimeout, lkID)
if err != nil {
@@ -384,7 +384,10 @@ func (sS *StatService) V1GetStatQueuesForEvent(args *StatsArgsProcessEvent, repl
tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
}
var sQs StatQueues
if sQs, err = sS.matchingStatQueuesForEvent(tnt, args); err != nil {
if sQs, err = sS.matchingStatQueuesForEvent(tnt, args, utils.MapStorage{
utils.MetaReq: args.Event,
utils.MetaOpts: args.Opts,
}); err != nil {
return
}
ids := make([]string, len(sQs))

View File

@@ -250,7 +250,8 @@ func TestStatQueuesPopulateStatsService(t *testing.T) {
}
func TestStatQueuesMatchingStatQueuesForEvent(t *testing.T) {
msq, err := statService.matchingStatQueuesForEvent(statsEvs[0].Tenant, statsEvs[0])
msq, err := statService.matchingStatQueuesForEvent(statsEvs[0].Tenant, statsEvs[0],
utils.MapStorage{utils.MetaReq: statsEvs[0].Event, utils.MetaOpts: statsEvs[0].Opts})
if err != nil {
t.Errorf("Error: %+v", err)
}
@@ -261,7 +262,8 @@ func TestStatQueuesMatchingStatQueuesForEvent(t *testing.T) {
} else if !reflect.DeepEqual(stqs[0].sqPrfl, msq[0].sqPrfl) {
t.Errorf("Expecting: %+v, received: %+v", stqs[0].sqPrfl, msq[0].sqPrfl)
}
msq, err = statService.matchingStatQueuesForEvent(statsEvs[1].Tenant, statsEvs[1])
msq, err = statService.matchingStatQueuesForEvent(statsEvs[1].Tenant, statsEvs[1],
utils.MapStorage{utils.MetaReq: statsEvs[1].Event, utils.MetaOpts: statsEvs[1].Opts})
if err != nil {
t.Errorf("Error: %+v", err)
}
@@ -272,7 +274,8 @@ func TestStatQueuesMatchingStatQueuesForEvent(t *testing.T) {
} else if !reflect.DeepEqual(stqs[1].sqPrfl, msq[0].sqPrfl) {
t.Errorf("Expecting: %+v, received: %+v", stqs[1].sqPrfl, msq[0].sqPrfl)
}
msq, err = statService.matchingStatQueuesForEvent(statsEvs[2].Tenant, statsEvs[2])
msq, err = statService.matchingStatQueuesForEvent(statsEvs[2].Tenant, statsEvs[2],
utils.MapStorage{utils.MetaReq: statsEvs[2].Event, utils.MetaOpts: statsEvs[2].Opts})
if err != nil {
t.Errorf("Error: %+v", err)
}
@@ -327,7 +330,8 @@ func TestStatQueuesProcessEvent(t *testing.T) {
func TestStatQueuesMatchWithIndexFalse(t *testing.T) {
statService.cgrcfg.StatSCfg().IndexedSelects = false
msq, err := statService.matchingStatQueuesForEvent(statsEvs[0].Tenant, statsEvs[0])
msq, err := statService.matchingStatQueuesForEvent(statsEvs[0].Tenant, statsEvs[0],
utils.MapStorage{utils.MetaReq: statsEvs[0].Event, utils.MetaOpts: statsEvs[0].Opts})
if err != nil {
t.Errorf("Error: %+v", err)
}
@@ -338,7 +342,8 @@ func TestStatQueuesMatchWithIndexFalse(t *testing.T) {
} else if !reflect.DeepEqual(stqs[0].sqPrfl, msq[0].sqPrfl) {
t.Errorf("Expecting: %+v, received: %+v", stqs[0].sqPrfl, msq[0].sqPrfl)
}
msq, err = statService.matchingStatQueuesForEvent(statsEvs[1].Tenant, statsEvs[1])
msq, err = statService.matchingStatQueuesForEvent(statsEvs[1].Tenant, statsEvs[1],
utils.MapStorage{utils.MetaReq: statsEvs[1].Event, utils.MetaOpts: statsEvs[1].Opts})
if err != nil {
t.Errorf("Error: %+v", err)
}
@@ -349,7 +354,8 @@ func TestStatQueuesMatchWithIndexFalse(t *testing.T) {
} else if !reflect.DeepEqual(stqs[1].sqPrfl, msq[0].sqPrfl) {
t.Errorf("Expecting: %+v, received: %+v", stqs[1].sqPrfl, msq[0].sqPrfl)
}
msq, err = statService.matchingStatQueuesForEvent(statsEvs[2].Tenant, statsEvs[2])
msq, err = statService.matchingStatQueuesForEvent(statsEvs[2].Tenant, statsEvs[2],
utils.MapStorage{utils.MetaReq: statsEvs[2].Event, utils.MetaOpts: statsEvs[2].Opts})
if err != nil {
t.Errorf("Error: %+v", err)
}

View File

@@ -119,6 +119,7 @@ cgrates (0.11.0~dev) UNRELEASED; urgency=medium
* [SessionS] Use rals_conns when sending refund rounding
* [General] Made tenant optional for all API calls
* [ConfigS] Moved MinCallDuration,MaxCallDuration from sessions config to general config
* [StatS] Added support for nested fields in custom metrics
-- DanB <danb@cgrates.org> Wed, 19 Feb 2020 13:25:52 +0200