Added OneEvent functionality to StatS

This commit is contained in:
gezimbll
2023-11-24 11:27:59 -05:00
committed by Dan Christian Bogos
parent 5f801c8b67
commit b2aad38e68
4 changed files with 292 additions and 31 deletions

View File

@@ -234,15 +234,48 @@ func (sq *StatQueue) TenantID() string {
// ProcessEvent processes a utils.CGREvent, returns true if processed
func (sq *StatQueue) ProcessEvent(tnt, evID string, filterS *FilterS, evNm utils.MapStorage) (err error) {
var oneEv bool
if oneEv, err = sq.isOneEvent(tnt, filterS, evNm); oneEv {
return err
}
if _, err = sq.remExpired(); err != nil {
return
}
if err = sq.remOnQueueLength(); err != nil {
return
}
return sq.addStatEvent(tnt, evID, filterS, evNm)
}
func (sq *StatQueue) isOneEvent(tnt string, filterS *FilterS, evNm utils.MapStorage) (bool, error) {
if sq.ttl != nil && *sq.ttl == -1 {
sq.SQItems = make([]SQItem, 0)
return true, sq.addOneEvent(tnt, filterS, evNm)
}
return false, nil
}
func (sq *StatQueue) addOneEvent(tnt string, filterS *FilterS, evNm utils.MapStorage) (err error) {
var pass bool
dDP := newDynamicDP(config.CgrConfig().FilterSCfg().ResourceSConns, config.CgrConfig().FilterSCfg().StatSConns,
config.CgrConfig().FilterSCfg().ApierSConns, tnt, utils.MapStorage{utils.MetaReq: evNm[utils.MetaReq]})
for metricID, metric := range sq.SQMetrics {
if pass, err = filterS.Pass(tnt, metric.GetFilterIDs(),
evNm); err != nil {
return
} else if !pass {
continue
}
if err = metric.OneEvent(dDP); err != nil {
utils.Logger.Warning(fmt.Sprintf("<StatQueue> metricID: %s, OneEvent, error: %s",
metricID, err.Error()))
return
}
}
return
}
// remStatEvent removes an event from metrics
func (sq *StatQueue) remEventWithID(evID string) (err error) {
for metricID, metric := range sq.SQMetrics {

View File

@@ -730,6 +730,10 @@ func (sMM *statMetricMock) AddEvent(evID string, ev utils.DataProvider) error {
return nil
}
func (sMM *statMetricMock) OneEvent(ev utils.DataProvider) error {
return nil
}
func (sMM *statMetricMock) RemEvent(evTenantID string) error {
switch sMM.testcase {
case "remExpired error":

View File

@@ -74,6 +74,7 @@ type StatMetric interface {
GetStringValue(roundingDecimal int) (val string)
GetFloat64Value(roundingDecimal int) (val float64)
AddEvent(evID string, ev utils.DataProvider) error
OneEvent(ev utils.DataProvider) error
RemEvent(evTenantID string) error
Marshal(ms Marshaler) (marshaled []byte, err error)
LoadMarshaled(ms Marshaler, marshaled []byte) (err error)
@@ -130,20 +131,27 @@ func (asr *StatASR) GetFloat64Value(roundingDecimal int) (val float64) {
return asr.getValue(roundingDecimal)
}
// AddEvent is part of StatMetric interface
func (asr *StatASR) AddEvent(evID string, ev utils.DataProvider) (err error) {
var answered int
if val, err := ev.FieldAsInterface([]string{utils.MetaReq, utils.AnswerTime}); err != nil {
func (asr *StatASR) getFieldVal(ev utils.DataProvider) (answered int, err error) {
var val any
if val, err = ev.FieldAsInterface([]string{utils.MetaReq, utils.AnswerTime}); err != nil {
if err != utils.ErrNotFound {
return err
return answered, err
}
} else if at, err := utils.IfaceAsTime(val,
config.CgrConfig().GeneralCfg().DefaultTimezone); err != nil {
return err
return answered, err
} else if !at.IsZero() {
answered = 1
}
return answered, nil
}
// AddEvent is part of StatMetric interface
func (asr *StatASR) AddEvent(evID string, ev utils.DataProvider) (err error) {
var answered int
if answered, err = asr.getFieldVal(ev); err != nil {
return
}
if val, has := asr.Events[evID]; !has {
asr.Events[evID] = &StatWithCompress{Stat: float64(answered), CompressFactor: 1}
} else {
@@ -158,6 +166,22 @@ func (asr *StatASR) AddEvent(evID string, ev utils.DataProvider) (err error) {
return
}
func (asr *StatASR) OneEvent(ev utils.DataProvider) (err error) {
var answered int
if len(asr.Events) != 0 {
asr.Events = make(map[string]*StatWithCompress)
}
if answered, err = asr.getFieldVal(ev); err != nil {
return
}
asr.Count++
if answered == 1 {
asr.Answered++
}
asr.val = nil
return
}
func (asr *StatASR) RemEvent(evID string) (err error) {
val, has := asr.Events[evID]
if !has {
@@ -277,8 +301,7 @@ func (acd *StatACD) GetFloat64Value(roundingDecimal int) (v float64) {
return
}
func (acd *StatACD) AddEvent(evID string, ev utils.DataProvider) (err error) {
var dur time.Duration
func (acd *StatACD) getFieldVal(ev utils.DataProvider) (dur time.Duration, err error) {
var val any
if val, err = ev.FieldAsInterface([]string{utils.MetaReq, utils.Usage}); err != nil {
if err == utils.ErrNotFound {
@@ -288,6 +311,14 @@ func (acd *StatACD) AddEvent(evID string, ev utils.DataProvider) (err error) {
} else if dur, err = utils.IfaceAsDuration(val); err != nil {
return
}
return
}
func (acd *StatACD) AddEvent(evID string, ev utils.DataProvider) (err error) {
var dur time.Duration
if dur, err = acd.getFieldVal(ev); err != nil {
return
}
acd.Sum += dur
if val, has := acd.Events[evID]; !has {
acd.Events[evID] = &DurationWithCompress{Duration: dur, CompressFactor: 1}
@@ -300,6 +331,20 @@ func (acd *StatACD) AddEvent(evID string, ev utils.DataProvider) (err error) {
return
}
func (acd *StatACD) OneEvent(ev utils.DataProvider) (err error) {
if len(acd.Events) != 0 {
acd.Events = make(map[string]*DurationWithCompress)
}
var dur time.Duration
if dur, err = acd.getFieldVal(ev); err != nil {
return
}
acd.Sum += dur
acd.Count++
acd.val = nil
return
}
func (acd *StatACD) RemEvent(evID string) (err error) {
val, has := acd.Events[evID]
if !has {
@@ -414,8 +459,7 @@ func (tcd *StatTCD) GetFloat64Value(roundingDecimal int) (v float64) {
return
}
func (tcd *StatTCD) AddEvent(evID string, ev utils.DataProvider) (err error) {
var dur time.Duration
func (tcd *StatTCD) getFieldVal(ev utils.DataProvider) (dur time.Duration, err error) {
var val any
if val, err = ev.FieldAsInterface([]string{utils.MetaReq, utils.Usage}); err != nil {
if err == utils.ErrNotFound {
@@ -425,6 +469,14 @@ func (tcd *StatTCD) AddEvent(evID string, ev utils.DataProvider) (err error) {
} else if dur, err = utils.IfaceAsDuration(val); err != nil {
return
}
return
}
func (tcd *StatTCD) AddEvent(evID string, ev utils.DataProvider) (err error) {
var dur time.Duration
if dur, err = tcd.getFieldVal(ev); err != nil {
return
}
tcd.Sum += dur
if val, has := tcd.Events[evID]; !has {
tcd.Events[evID] = &DurationWithCompress{Duration: dur, CompressFactor: 1}
@@ -437,6 +489,20 @@ func (tcd *StatTCD) AddEvent(evID string, ev utils.DataProvider) (err error) {
return
}
func (tcd *StatTCD) OneEvent(ev utils.DataProvider) (err error) {
if len(tcd.Events) != 0 {
tcd.Events = make(map[string]*DurationWithCompress)
}
var dur time.Duration
if dur, err = tcd.getFieldVal(ev); err != nil {
return
}
tcd.Sum += dur
tcd.Count++
tcd.val = nil
return
}
func (tcd *StatTCD) RemEvent(evID string) (err error) {
val, has := tcd.Events[evID]
if !has {
@@ -546,8 +612,7 @@ func (acc *StatACC) GetFloat64Value(roundingDecimal int) (v float64) {
return acc.getValue(roundingDecimal)
}
func (acc *StatACC) AddEvent(evID string, ev utils.DataProvider) (err error) {
var cost float64
func (acc *StatACC) getFieldVal(ev utils.DataProvider) (cost float64, err error) {
var val any
if val, err = ev.FieldAsInterface([]string{utils.MetaReq, utils.Cost}); err != nil {
if err == utils.ErrNotFound {
@@ -557,7 +622,15 @@ func (acc *StatACC) AddEvent(evID string, ev utils.DataProvider) (err error) {
} else if cost, err = utils.IfaceAsFloat64(val); err != nil {
return
} else if cost < 0 {
return utils.ErrPrefix(utils.ErrNegative, utils.Cost)
return cost, utils.ErrPrefix(utils.ErrNegative, utils.Cost)
}
return
}
func (acc *StatACC) AddEvent(evID string, ev utils.DataProvider) (err error) {
var cost float64
if cost, err = acc.getFieldVal(ev); err != nil {
return
}
acc.Sum += cost
if val, has := acc.Events[evID]; !has {
@@ -571,6 +644,22 @@ func (acc *StatACC) AddEvent(evID string, ev utils.DataProvider) (err error) {
return
}
func (acc *StatACC) OneEvent(ev utils.DataProvider) (err error) {
if len(acc.Events) != 0 {
acc.Events = make(map[string]*StatWithCompress)
}
var cost float64
if cost, err = acc.getFieldVal(ev); err != nil {
return
}
acc.Sum += cost
acc.Count++
acc.val = nil
return
}
func (acc *StatACC) RemEvent(evID string) (err error) {
cost, has := acc.Events[evID]
if !has {
@@ -678,8 +767,7 @@ func (tcc *StatTCC) GetFloat64Value(roundingDecimal int) (v float64) {
return tcc.getValue(roundingDecimal)
}
func (tcc *StatTCC) AddEvent(evID string, ev utils.DataProvider) (err error) {
var cost float64
func (tcc *StatTCC) getFieldVal(ev utils.DataProvider) (cost float64, err error) {
var val any
if val, err = ev.FieldAsInterface([]string{utils.MetaReq, utils.Cost}); err != nil {
if err == utils.ErrNotFound {
@@ -689,7 +777,15 @@ func (tcc *StatTCC) AddEvent(evID string, ev utils.DataProvider) (err error) {
} else if cost, err = utils.IfaceAsFloat64(val); err != nil {
return
} else if cost < 0 {
return utils.ErrPrefix(utils.ErrNegative, utils.Cost)
return cost, utils.ErrPrefix(utils.ErrNegative, utils.Cost)
}
return
}
func (tcc *StatTCC) AddEvent(evID string, ev utils.DataProvider) (err error) {
var cost float64
if cost, err = tcc.getFieldVal(ev); err != nil {
return
}
tcc.Sum += cost
if val, has := tcc.Events[evID]; !has {
@@ -703,6 +799,22 @@ func (tcc *StatTCC) AddEvent(evID string, ev utils.DataProvider) (err error) {
return
}
func (tcc *StatTCC) OneEvent(ev utils.DataProvider) (err error) {
if len(tcc.Events) != 0 {
tcc.Events = make(map[string]*StatWithCompress)
}
var cost float64
if cost, err = tcc.getFieldVal(ev); err != nil {
return
}
tcc.Sum += cost
tcc.Count++
tcc.val = nil
return
}
func (tcc *StatTCC) RemEvent(evID string) (err error) {
cost, has := tcc.Events[evID]
if !has {
@@ -817,8 +929,7 @@ func (pdd *StatPDD) GetFloat64Value(roundingDecimal int) (v float64) {
return
}
func (pdd *StatPDD) AddEvent(evID string, ev utils.DataProvider) (err error) {
var dur time.Duration
func (pdd *StatPDD) getFieldVal(ev utils.DataProvider) (dur time.Duration, err error) {
var val any
if val, err = ev.FieldAsInterface([]string{utils.MetaReq, utils.PDD}); err != nil {
if err == utils.ErrNotFound {
@@ -828,6 +939,14 @@ func (pdd *StatPDD) AddEvent(evID string, ev utils.DataProvider) (err error) {
} else if dur, err = utils.IfaceAsDuration(val); err != nil {
return
}
return
}
func (pdd *StatPDD) AddEvent(evID string, ev utils.DataProvider) (err error) {
var dur time.Duration
if dur, err = pdd.getFieldVal(ev); err != nil {
return
}
pdd.Sum += dur
if val, has := pdd.Events[evID]; !has {
pdd.Events[evID] = &DurationWithCompress{Duration: dur, CompressFactor: 1}
@@ -839,6 +958,19 @@ func (pdd *StatPDD) AddEvent(evID string, ev utils.DataProvider) (err error) {
pdd.val = nil
return
}
func (pdd *StatPDD) OneEvent(ev utils.DataProvider) (err error) {
if len(pdd.Events) != 0 {
pdd.Events = make(map[string]*DurationWithCompress)
}
var dur time.Duration
if dur, err = pdd.getFieldVal(ev); err != nil {
return
}
pdd.Sum += dur
pdd.Count++
pdd.val = nil
return
}
func (pdd *StatPDD) RemEvent(evID string) (err error) {
val, has := pdd.Events[evID]
@@ -941,15 +1073,21 @@ func (ddc *StatDDC) GetFloat64Value(roundingDecimal int) (v float64) {
return ddc.getValue(roundingDecimal)
}
func (ddc *StatDDC) AddEvent(evID string, ev utils.DataProvider) (err error) {
var fieldValue string
if fieldValue, err = ev.FieldAsString([]string{utils.MetaReq, utils.Destination}); err != nil {
func (ddc *StatDDC) getFieldVal(ev utils.DataProvider) (fieldVal string, err error) {
if fieldVal, err = ev.FieldAsString([]string{utils.MetaReq, utils.Destination}); err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, utils.Destination)
}
return
}
return
}
func (ddc *StatDDC) AddEvent(evID string, ev utils.DataProvider) (err error) {
var fieldValue string
if fieldValue, err = ddc.getFieldVal(ev); err != nil {
return
}
// add to fieldValues
if _, has := ddc.FieldValues[fieldValue]; !has {
ddc.FieldValues[fieldValue] = make(utils.StringSet)
@@ -969,6 +1107,23 @@ func (ddc *StatDDC) AddEvent(evID string, ev utils.DataProvider) (err error) {
return
}
func (ddc *StatDDC) OneEvent(ev utils.DataProvider) (err error) {
if len(ddc.Events) != 0 {
ddc.Events = make(map[string]map[string]int64)
}
var fieldValue string
if fieldValue, err = ddc.getFieldVal(ev); err != nil {
return
}
if _, has := ddc.FieldValues[fieldValue]; !has {
ddc.FieldValues[fieldValue] = make(utils.StringSet)
}
ddc.Count++
return
}
func (ddc *StatDDC) RemEvent(evID string) (err error) {
fieldValues, has := ddc.Events[evID]
if !has {
@@ -1091,8 +1246,7 @@ func (sum *StatSum) GetFloat64Value(roundingDecimal int) (v float64) {
return sum.getValue(roundingDecimal)
}
func (sum *StatSum) AddEvent(evID string, ev utils.DataProvider) (err error) {
var val float64
func (sum *StatSum) getFieldVal(ev utils.DataProvider) (val float64, err error) {
var ival any
if ival, err = utils.DPDynamicInterface(sum.FieldName, ev); err != nil {
if err == utils.ErrNotFound {
@@ -1102,6 +1256,15 @@ func (sum *StatSum) AddEvent(evID string, ev utils.DataProvider) (err error) {
} else if val, err = utils.IfaceAsFloat64(ival); err != nil {
return
}
return
}
func (sum *StatSum) AddEvent(evID string, ev utils.DataProvider) (err error) {
var val float64
if val, err = sum.getFieldVal(ev); err != nil {
return
}
sum.Sum += val
if v, has := sum.Events[evID]; !has {
sum.Events[evID] = &StatWithCompress{Stat: val, CompressFactor: 1}
@@ -1114,6 +1277,22 @@ func (sum *StatSum) AddEvent(evID string, ev utils.DataProvider) (err error) {
return
}
func (sum *StatSum) OneEvent(ev utils.DataProvider) (err error) {
if len(sum.Events) != 0 {
sum.Events = make(map[string]*StatWithCompress)
}
var val float64
if val, err = sum.getFieldVal(ev); err != nil {
return
}
sum.Sum += val
sum.Count++
sum.val = nil
return
}
func (sum *StatSum) RemEvent(evID string) (err error) {
val, has := sum.Events[evID]
if !has {
@@ -1225,8 +1404,7 @@ func (avg *StatAverage) GetFloat64Value(roundingDecimal int) (v float64) {
return avg.getValue(roundingDecimal)
}
func (avg *StatAverage) AddEvent(evID string, ev utils.DataProvider) (err error) {
var val float64
func (avg *StatAverage) getFieldVal(ev utils.DataProvider) (val float64, err error) {
var ival any
if ival, err = utils.DPDynamicInterface(avg.FieldName, ev); err != nil {
if err == utils.ErrNotFound {
@@ -1236,6 +1414,15 @@ func (avg *StatAverage) AddEvent(evID string, ev utils.DataProvider) (err error)
} else if val, err = utils.IfaceAsFloat64(ival); err != nil {
return
}
return
}
func (avg *StatAverage) AddEvent(evID string, ev utils.DataProvider) (err error) {
var val float64
if val, err = avg.getFieldVal(ev); err != nil {
return
}
avg.Sum += val
if v, has := avg.Events[evID]; !has {
avg.Events[evID] = &StatWithCompress{Stat: val, CompressFactor: 1}
@@ -1248,6 +1435,21 @@ func (avg *StatAverage) AddEvent(evID string, ev utils.DataProvider) (err error)
return
}
// simply remove the ~*req. prefix and do normal process
func (avg *StatAverage) OneEvent(ev utils.DataProvider) (err error) {
if len(avg.Events) != 0 {
avg.Events = make(map[string]*StatWithCompress)
}
var val float64
if val, err = avg.getFieldVal(ev); err != nil {
return
}
avg.Sum += val
avg.Count++
avg.val = nil
return
}
func (avg *StatAverage) RemEvent(evID string) (err error) {
val, has := avg.Events[evID]
if !has {
@@ -1351,20 +1553,24 @@ func (dst *StatDistinct) GetFloat64Value(roundingDecimal int) (v float64) {
return dst.getValue(roundingDecimal)
}
func (dst *StatDistinct) AddEvent(evID string, ev utils.DataProvider) (err error) {
var fieldValue string
// simply remove the ~*req. prefix and do normal process
func (dst *StatDistinct) getFieldVal(ev utils.DataProvider) (fieldVal string, err error) {
if !strings.HasPrefix(dst.FieldName, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep) {
return fmt.Errorf("Invalid format for field <%s>", dst.FieldName)
return fieldVal, fmt.Errorf("invalid format for field <%s>", dst.FieldName)
}
if fieldValue, err = utils.DPDynamicString(dst.FieldName, ev); err != nil {
if fieldVal, err = utils.DPDynamicString(dst.FieldName, ev); err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, dst.FieldName)
}
return
}
return
}
func (dst *StatDistinct) AddEvent(evID string, ev utils.DataProvider) (err error) {
var fieldValue string
if fieldValue, err = dst.getFieldVal(ev); err != nil {
return
}
// add to fieldValues
if _, has := dst.FieldValues[fieldValue]; !has {
dst.FieldValues[fieldValue] = make(utils.StringSet)
@@ -1384,6 +1590,21 @@ func (dst *StatDistinct) AddEvent(evID string, ev utils.DataProvider) (err error
return
}
func (dst *StatDistinct) OneEvent(ev utils.DataProvider) (err error) {
if len(dst.Events) != 0 {
dst.Events = make(map[string]map[string]int64)
}
var fieldValue string
if fieldValue, err = dst.getFieldVal(ev); err != nil {
return
}
if _, has := dst.FieldValues[fieldValue]; !has {
dst.FieldValues[fieldValue] = make(utils.StringSet)
}
dst.Count++
return
}
func (dst *StatDistinct) RemEvent(evID string) (err error) {
fieldValues, has := dst.Events[evID]
if !has {

View File

@@ -223,6 +223,9 @@ func (sS *StatService) matchingStatQueuesForEvent(tnt string, statsIDs []string,
if sqPrfl.TTL > 0 {
sq.ttl = utils.DurationPointer(sqPrfl.TTL)
}
if sqPrfl.TTL == -1 || sqPrfl.QueueLength == -1 {
sq.ttl = utils.DurationPointer(sqPrfl.TTL)
}
sq.sqPrfl = sqPrfl
sqs = append(sqs, sq)
}