From b2aad38e6881d5ed90a1b83cc381a912344a7584 Mon Sep 17 00:00:00 2001 From: gezimbll Date: Fri, 24 Nov 2023 11:27:59 -0500 Subject: [PATCH] Added OneEvent functionality to StatS --- engine/libstats.go | 33 +++++ engine/libstats_test.go | 4 + engine/statmetrics.go | 283 +++++++++++++++++++++++++++++++++++----- engine/stats.go | 3 + 4 files changed, 292 insertions(+), 31 deletions(-) diff --git a/engine/libstats.go b/engine/libstats.go index fdd79b9af..6720f89c8 100644 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -234,15 +234,48 @@ func (sq *StatQueue) TenantID() string { // ProcessEvent processes a utils.CGREvent, returns true if processed func (sq *StatQueue) ProcessEvent(tnt, evID string, filterS *FilterS, evNm utils.MapStorage) (err error) { + var oneEv bool + if oneEv, err = sq.isOneEvent(tnt, filterS, evNm); oneEv { + return err + } if _, err = sq.remExpired(); err != nil { return } + if err = sq.remOnQueueLength(); err != nil { return } return sq.addStatEvent(tnt, evID, filterS, evNm) } +func (sq *StatQueue) isOneEvent(tnt string, filterS *FilterS, evNm utils.MapStorage) (bool, error) { + if sq.ttl != nil && *sq.ttl == -1 { + sq.SQItems = make([]SQItem, 0) + return true, sq.addOneEvent(tnt, filterS, evNm) + } + return false, nil +} + +func (sq *StatQueue) addOneEvent(tnt string, filterS *FilterS, evNm utils.MapStorage) (err error) { + var pass bool + dDP := newDynamicDP(config.CgrConfig().FilterSCfg().ResourceSConns, config.CgrConfig().FilterSCfg().StatSConns, + config.CgrConfig().FilterSCfg().ApierSConns, tnt, utils.MapStorage{utils.MetaReq: evNm[utils.MetaReq]}) + for metricID, metric := range sq.SQMetrics { + if pass, err = filterS.Pass(tnt, metric.GetFilterIDs(), + evNm); err != nil { + return + } else if !pass { + continue + } + if err = metric.OneEvent(dDP); err != nil { + utils.Logger.Warning(fmt.Sprintf(" metricID: %s, OneEvent, error: %s", + metricID, err.Error())) + return + } + } + return +} + // remStatEvent removes an event from metrics func (sq *StatQueue) remEventWithID(evID string) (err error) { for metricID, metric := range sq.SQMetrics { diff --git a/engine/libstats_test.go b/engine/libstats_test.go index c326d9c1e..6b9845fcc 100644 --- a/engine/libstats_test.go +++ b/engine/libstats_test.go @@ -730,6 +730,10 @@ func (sMM *statMetricMock) AddEvent(evID string, ev utils.DataProvider) error { return nil } +func (sMM *statMetricMock) OneEvent(ev utils.DataProvider) error { + return nil +} + func (sMM *statMetricMock) RemEvent(evTenantID string) error { switch sMM.testcase { case "remExpired error": diff --git a/engine/statmetrics.go b/engine/statmetrics.go index a0f1e0996..fb7940b66 100644 --- a/engine/statmetrics.go +++ b/engine/statmetrics.go @@ -74,6 +74,7 @@ type StatMetric interface { GetStringValue(roundingDecimal int) (val string) GetFloat64Value(roundingDecimal int) (val float64) AddEvent(evID string, ev utils.DataProvider) error + OneEvent(ev utils.DataProvider) error RemEvent(evTenantID string) error Marshal(ms Marshaler) (marshaled []byte, err error) LoadMarshaled(ms Marshaler, marshaled []byte) (err error) @@ -130,20 +131,27 @@ func (asr *StatASR) GetFloat64Value(roundingDecimal int) (val float64) { return asr.getValue(roundingDecimal) } -// AddEvent is part of StatMetric interface -func (asr *StatASR) AddEvent(evID string, ev utils.DataProvider) (err error) { - var answered int - if val, err := ev.FieldAsInterface([]string{utils.MetaReq, utils.AnswerTime}); err != nil { +func (asr *StatASR) getFieldVal(ev utils.DataProvider) (answered int, err error) { + var val any + if val, err = ev.FieldAsInterface([]string{utils.MetaReq, utils.AnswerTime}); err != nil { if err != utils.ErrNotFound { - return err + return answered, err } } else if at, err := utils.IfaceAsTime(val, config.CgrConfig().GeneralCfg().DefaultTimezone); err != nil { - return err + return answered, err } else if !at.IsZero() { answered = 1 } + return answered, nil +} +// AddEvent is part of StatMetric interface +func (asr *StatASR) AddEvent(evID string, ev utils.DataProvider) (err error) { + var answered int + if answered, err = asr.getFieldVal(ev); err != nil { + return + } if val, has := asr.Events[evID]; !has { asr.Events[evID] = &StatWithCompress{Stat: float64(answered), CompressFactor: 1} } else { @@ -158,6 +166,22 @@ func (asr *StatASR) AddEvent(evID string, ev utils.DataProvider) (err error) { return } +func (asr *StatASR) OneEvent(ev utils.DataProvider) (err error) { + var answered int + if len(asr.Events) != 0 { + asr.Events = make(map[string]*StatWithCompress) + } + if answered, err = asr.getFieldVal(ev); err != nil { + return + } + asr.Count++ + if answered == 1 { + asr.Answered++ + } + asr.val = nil + return +} + func (asr *StatASR) RemEvent(evID string) (err error) { val, has := asr.Events[evID] if !has { @@ -277,8 +301,7 @@ func (acd *StatACD) GetFloat64Value(roundingDecimal int) (v float64) { return } -func (acd *StatACD) AddEvent(evID string, ev utils.DataProvider) (err error) { - var dur time.Duration +func (acd *StatACD) getFieldVal(ev utils.DataProvider) (dur time.Duration, err error) { var val any if val, err = ev.FieldAsInterface([]string{utils.MetaReq, utils.Usage}); err != nil { if err == utils.ErrNotFound { @@ -288,6 +311,14 @@ func (acd *StatACD) AddEvent(evID string, ev utils.DataProvider) (err error) { } else if dur, err = utils.IfaceAsDuration(val); err != nil { return } + return +} + +func (acd *StatACD) AddEvent(evID string, ev utils.DataProvider) (err error) { + var dur time.Duration + if dur, err = acd.getFieldVal(ev); err != nil { + return + } acd.Sum += dur if val, has := acd.Events[evID]; !has { acd.Events[evID] = &DurationWithCompress{Duration: dur, CompressFactor: 1} @@ -300,6 +331,20 @@ func (acd *StatACD) AddEvent(evID string, ev utils.DataProvider) (err error) { return } +func (acd *StatACD) OneEvent(ev utils.DataProvider) (err error) { + if len(acd.Events) != 0 { + acd.Events = make(map[string]*DurationWithCompress) + } + var dur time.Duration + if dur, err = acd.getFieldVal(ev); err != nil { + return + } + acd.Sum += dur + acd.Count++ + acd.val = nil + return +} + func (acd *StatACD) RemEvent(evID string) (err error) { val, has := acd.Events[evID] if !has { @@ -414,8 +459,7 @@ func (tcd *StatTCD) GetFloat64Value(roundingDecimal int) (v float64) { return } -func (tcd *StatTCD) AddEvent(evID string, ev utils.DataProvider) (err error) { - var dur time.Duration +func (tcd *StatTCD) getFieldVal(ev utils.DataProvider) (dur time.Duration, err error) { var val any if val, err = ev.FieldAsInterface([]string{utils.MetaReq, utils.Usage}); err != nil { if err == utils.ErrNotFound { @@ -425,6 +469,14 @@ func (tcd *StatTCD) AddEvent(evID string, ev utils.DataProvider) (err error) { } else if dur, err = utils.IfaceAsDuration(val); err != nil { return } + return +} + +func (tcd *StatTCD) AddEvent(evID string, ev utils.DataProvider) (err error) { + var dur time.Duration + if dur, err = tcd.getFieldVal(ev); err != nil { + return + } tcd.Sum += dur if val, has := tcd.Events[evID]; !has { tcd.Events[evID] = &DurationWithCompress{Duration: dur, CompressFactor: 1} @@ -437,6 +489,20 @@ func (tcd *StatTCD) AddEvent(evID string, ev utils.DataProvider) (err error) { return } +func (tcd *StatTCD) OneEvent(ev utils.DataProvider) (err error) { + if len(tcd.Events) != 0 { + tcd.Events = make(map[string]*DurationWithCompress) + } + var dur time.Duration + if dur, err = tcd.getFieldVal(ev); err != nil { + return + } + tcd.Sum += dur + tcd.Count++ + tcd.val = nil + return +} + func (tcd *StatTCD) RemEvent(evID string) (err error) { val, has := tcd.Events[evID] if !has { @@ -546,8 +612,7 @@ func (acc *StatACC) GetFloat64Value(roundingDecimal int) (v float64) { return acc.getValue(roundingDecimal) } -func (acc *StatACC) AddEvent(evID string, ev utils.DataProvider) (err error) { - var cost float64 +func (acc *StatACC) getFieldVal(ev utils.DataProvider) (cost float64, err error) { var val any if val, err = ev.FieldAsInterface([]string{utils.MetaReq, utils.Cost}); err != nil { if err == utils.ErrNotFound { @@ -557,7 +622,15 @@ func (acc *StatACC) AddEvent(evID string, ev utils.DataProvider) (err error) { } else if cost, err = utils.IfaceAsFloat64(val); err != nil { return } else if cost < 0 { - return utils.ErrPrefix(utils.ErrNegative, utils.Cost) + return cost, utils.ErrPrefix(utils.ErrNegative, utils.Cost) + } + return +} + +func (acc *StatACC) AddEvent(evID string, ev utils.DataProvider) (err error) { + var cost float64 + if cost, err = acc.getFieldVal(ev); err != nil { + return } acc.Sum += cost if val, has := acc.Events[evID]; !has { @@ -571,6 +644,22 @@ func (acc *StatACC) AddEvent(evID string, ev utils.DataProvider) (err error) { return } +func (acc *StatACC) OneEvent(ev utils.DataProvider) (err error) { + if len(acc.Events) != 0 { + acc.Events = make(map[string]*StatWithCompress) + } + + var cost float64 + if cost, err = acc.getFieldVal(ev); err != nil { + return + } + + acc.Sum += cost + acc.Count++ + acc.val = nil + return +} + func (acc *StatACC) RemEvent(evID string) (err error) { cost, has := acc.Events[evID] if !has { @@ -678,8 +767,7 @@ func (tcc *StatTCC) GetFloat64Value(roundingDecimal int) (v float64) { return tcc.getValue(roundingDecimal) } -func (tcc *StatTCC) AddEvent(evID string, ev utils.DataProvider) (err error) { - var cost float64 +func (tcc *StatTCC) getFieldVal(ev utils.DataProvider) (cost float64, err error) { var val any if val, err = ev.FieldAsInterface([]string{utils.MetaReq, utils.Cost}); err != nil { if err == utils.ErrNotFound { @@ -689,7 +777,15 @@ func (tcc *StatTCC) AddEvent(evID string, ev utils.DataProvider) (err error) { } else if cost, err = utils.IfaceAsFloat64(val); err != nil { return } else if cost < 0 { - return utils.ErrPrefix(utils.ErrNegative, utils.Cost) + return cost, utils.ErrPrefix(utils.ErrNegative, utils.Cost) + } + return +} + +func (tcc *StatTCC) AddEvent(evID string, ev utils.DataProvider) (err error) { + var cost float64 + if cost, err = tcc.getFieldVal(ev); err != nil { + return } tcc.Sum += cost if val, has := tcc.Events[evID]; !has { @@ -703,6 +799,22 @@ func (tcc *StatTCC) AddEvent(evID string, ev utils.DataProvider) (err error) { return } +func (tcc *StatTCC) OneEvent(ev utils.DataProvider) (err error) { + if len(tcc.Events) != 0 { + tcc.Events = make(map[string]*StatWithCompress) + } + + var cost float64 + if cost, err = tcc.getFieldVal(ev); err != nil { + return + } + + tcc.Sum += cost + tcc.Count++ + tcc.val = nil + return +} + func (tcc *StatTCC) RemEvent(evID string) (err error) { cost, has := tcc.Events[evID] if !has { @@ -817,8 +929,7 @@ func (pdd *StatPDD) GetFloat64Value(roundingDecimal int) (v float64) { return } -func (pdd *StatPDD) AddEvent(evID string, ev utils.DataProvider) (err error) { - var dur time.Duration +func (pdd *StatPDD) getFieldVal(ev utils.DataProvider) (dur time.Duration, err error) { var val any if val, err = ev.FieldAsInterface([]string{utils.MetaReq, utils.PDD}); err != nil { if err == utils.ErrNotFound { @@ -828,6 +939,14 @@ func (pdd *StatPDD) AddEvent(evID string, ev utils.DataProvider) (err error) { } else if dur, err = utils.IfaceAsDuration(val); err != nil { return } + return +} + +func (pdd *StatPDD) AddEvent(evID string, ev utils.DataProvider) (err error) { + var dur time.Duration + if dur, err = pdd.getFieldVal(ev); err != nil { + return + } pdd.Sum += dur if val, has := pdd.Events[evID]; !has { pdd.Events[evID] = &DurationWithCompress{Duration: dur, CompressFactor: 1} @@ -839,6 +958,19 @@ func (pdd *StatPDD) AddEvent(evID string, ev utils.DataProvider) (err error) { pdd.val = nil return } +func (pdd *StatPDD) OneEvent(ev utils.DataProvider) (err error) { + if len(pdd.Events) != 0 { + pdd.Events = make(map[string]*DurationWithCompress) + } + var dur time.Duration + if dur, err = pdd.getFieldVal(ev); err != nil { + return + } + pdd.Sum += dur + pdd.Count++ + pdd.val = nil + return +} func (pdd *StatPDD) RemEvent(evID string) (err error) { val, has := pdd.Events[evID] @@ -941,15 +1073,21 @@ func (ddc *StatDDC) GetFloat64Value(roundingDecimal int) (v float64) { return ddc.getValue(roundingDecimal) } -func (ddc *StatDDC) AddEvent(evID string, ev utils.DataProvider) (err error) { - var fieldValue string - if fieldValue, err = ev.FieldAsString([]string{utils.MetaReq, utils.Destination}); err != nil { +func (ddc *StatDDC) getFieldVal(ev utils.DataProvider) (fieldVal string, err error) { + if fieldVal, err = ev.FieldAsString([]string{utils.MetaReq, utils.Destination}); err != nil { if err == utils.ErrNotFound { err = utils.ErrPrefix(err, utils.Destination) } return } + return +} +func (ddc *StatDDC) AddEvent(evID string, ev utils.DataProvider) (err error) { + var fieldValue string + if fieldValue, err = ddc.getFieldVal(ev); err != nil { + return + } // add to fieldValues if _, has := ddc.FieldValues[fieldValue]; !has { ddc.FieldValues[fieldValue] = make(utils.StringSet) @@ -969,6 +1107,23 @@ func (ddc *StatDDC) AddEvent(evID string, ev utils.DataProvider) (err error) { return } +func (ddc *StatDDC) OneEvent(ev utils.DataProvider) (err error) { + if len(ddc.Events) != 0 { + ddc.Events = make(map[string]map[string]int64) + } + + var fieldValue string + if fieldValue, err = ddc.getFieldVal(ev); err != nil { + return + } + + if _, has := ddc.FieldValues[fieldValue]; !has { + ddc.FieldValues[fieldValue] = make(utils.StringSet) + } + ddc.Count++ + return +} + func (ddc *StatDDC) RemEvent(evID string) (err error) { fieldValues, has := ddc.Events[evID] if !has { @@ -1091,8 +1246,7 @@ func (sum *StatSum) GetFloat64Value(roundingDecimal int) (v float64) { return sum.getValue(roundingDecimal) } -func (sum *StatSum) AddEvent(evID string, ev utils.DataProvider) (err error) { - var val float64 +func (sum *StatSum) getFieldVal(ev utils.DataProvider) (val float64, err error) { var ival any if ival, err = utils.DPDynamicInterface(sum.FieldName, ev); err != nil { if err == utils.ErrNotFound { @@ -1102,6 +1256,15 @@ func (sum *StatSum) AddEvent(evID string, ev utils.DataProvider) (err error) { } else if val, err = utils.IfaceAsFloat64(ival); err != nil { return } + return +} + +func (sum *StatSum) AddEvent(evID string, ev utils.DataProvider) (err error) { + var val float64 + if val, err = sum.getFieldVal(ev); err != nil { + return + } + sum.Sum += val if v, has := sum.Events[evID]; !has { sum.Events[evID] = &StatWithCompress{Stat: val, CompressFactor: 1} @@ -1114,6 +1277,22 @@ func (sum *StatSum) AddEvent(evID string, ev utils.DataProvider) (err error) { return } +func (sum *StatSum) OneEvent(ev utils.DataProvider) (err error) { + if len(sum.Events) != 0 { + sum.Events = make(map[string]*StatWithCompress) + } + + var val float64 + if val, err = sum.getFieldVal(ev); err != nil { + return + } + + sum.Sum += val + sum.Count++ + sum.val = nil + return +} + func (sum *StatSum) RemEvent(evID string) (err error) { val, has := sum.Events[evID] if !has { @@ -1225,8 +1404,7 @@ func (avg *StatAverage) GetFloat64Value(roundingDecimal int) (v float64) { return avg.getValue(roundingDecimal) } -func (avg *StatAverage) AddEvent(evID string, ev utils.DataProvider) (err error) { - var val float64 +func (avg *StatAverage) getFieldVal(ev utils.DataProvider) (val float64, err error) { var ival any if ival, err = utils.DPDynamicInterface(avg.FieldName, ev); err != nil { if err == utils.ErrNotFound { @@ -1236,6 +1414,15 @@ func (avg *StatAverage) AddEvent(evID string, ev utils.DataProvider) (err error) } else if val, err = utils.IfaceAsFloat64(ival); err != nil { return } + return +} + +func (avg *StatAverage) AddEvent(evID string, ev utils.DataProvider) (err error) { + var val float64 + if val, err = avg.getFieldVal(ev); err != nil { + return + } + avg.Sum += val if v, has := avg.Events[evID]; !has { avg.Events[evID] = &StatWithCompress{Stat: val, CompressFactor: 1} @@ -1248,6 +1435,21 @@ func (avg *StatAverage) AddEvent(evID string, ev utils.DataProvider) (err error) return } +// simply remove the ~*req. prefix and do normal process +func (avg *StatAverage) OneEvent(ev utils.DataProvider) (err error) { + if len(avg.Events) != 0 { + avg.Events = make(map[string]*StatWithCompress) + } + var val float64 + if val, err = avg.getFieldVal(ev); err != nil { + return + } + avg.Sum += val + avg.Count++ + avg.val = nil + return +} + func (avg *StatAverage) RemEvent(evID string) (err error) { val, has := avg.Events[evID] if !has { @@ -1351,20 +1553,24 @@ func (dst *StatDistinct) GetFloat64Value(roundingDecimal int) (v float64) { return dst.getValue(roundingDecimal) } -func (dst *StatDistinct) AddEvent(evID string, ev utils.DataProvider) (err error) { - var fieldValue string - // simply remove the ~*req. prefix and do normal process +func (dst *StatDistinct) getFieldVal(ev utils.DataProvider) (fieldVal string, err error) { if !strings.HasPrefix(dst.FieldName, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep) { - return fmt.Errorf("Invalid format for field <%s>", dst.FieldName) + return fieldVal, fmt.Errorf("invalid format for field <%s>", dst.FieldName) } - - if fieldValue, err = utils.DPDynamicString(dst.FieldName, ev); err != nil { + if fieldVal, err = utils.DPDynamicString(dst.FieldName, ev); err != nil { if err == utils.ErrNotFound { err = utils.ErrPrefix(err, dst.FieldName) } return } + return +} +func (dst *StatDistinct) AddEvent(evID string, ev utils.DataProvider) (err error) { + var fieldValue string + if fieldValue, err = dst.getFieldVal(ev); err != nil { + return + } // add to fieldValues if _, has := dst.FieldValues[fieldValue]; !has { dst.FieldValues[fieldValue] = make(utils.StringSet) @@ -1384,6 +1590,21 @@ func (dst *StatDistinct) AddEvent(evID string, ev utils.DataProvider) (err error return } +func (dst *StatDistinct) OneEvent(ev utils.DataProvider) (err error) { + if len(dst.Events) != 0 { + dst.Events = make(map[string]map[string]int64) + } + var fieldValue string + if fieldValue, err = dst.getFieldVal(ev); err != nil { + return + } + if _, has := dst.FieldValues[fieldValue]; !has { + dst.FieldValues[fieldValue] = make(utils.StringSet) + } + dst.Count++ + return +} + func (dst *StatDistinct) RemEvent(evID string) (err error) { fieldValues, has := dst.Events[evID] if !has { diff --git a/engine/stats.go b/engine/stats.go index 7784feccf..ee6977101 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -223,6 +223,9 @@ func (sS *StatService) matchingStatQueuesForEvent(tnt string, statsIDs []string, if sqPrfl.TTL > 0 { sq.ttl = utils.DurationPointer(sqPrfl.TTL) } + if sqPrfl.TTL == -1 || sqPrfl.QueueLength == -1 { + sq.ttl = utils.DurationPointer(sqPrfl.TTL) + } sq.sqPrfl = sqPrfl sqs = append(sqs, sq) }