From 5ad624550129225a49e471c9a76bd1df1952c8f8 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 21 Oct 2020 12:52:04 +0300 Subject: [PATCH] Updated Stats ProcessEvent --- engine/attributes.go | 18 +++++++-------- engine/libstats.go | 16 ++++++------- engine/libstats_test.go | 14 ++++++------ engine/stats.go | 14 ++++++------ engine/stats_test.go | 12 +++++----- engine/z_attributes_test.go | 45 ++++++++++++++++++++----------------- 6 files changed, 61 insertions(+), 58 deletions(-) diff --git a/engine/attributes.go b/engine/attributes.go index c55e2479c..0f01de126 100644 --- a/engine/attributes.go +++ b/engine/attributes.go @@ -62,15 +62,15 @@ func (alS *AttributeService) Shutdown() (err error) { } // attributeProfileForEvent returns the matching attribute -func (alS *AttributeService) attributeProfileForEvent(tnt string, args *AttrArgsProcessEvent, evNm utils.MapStorage, lastID string) (matchAttrPrfl *AttributeProfile, err error) { +func (alS *AttributeService) attributeProfileForEvent(tnt string, ctx *string, attrsIDs []string, actTime *time.Time, evNm utils.MapStorage, lastID string) (matchAttrPrfl *AttributeProfile, err error) { var attrIDs []string contextVal := utils.MetaDefault - if args.Context != nil && *args.Context != "" { - contextVal = *args.Context + if ctx != nil && *ctx != "" { + contextVal = *ctx } attrIdxKey := utils.ConcatenatedKey(tnt, contextVal) - if len(args.AttributeIDs) != 0 { - attrIDs = args.AttributeIDs + if len(attrsIDs) != 0 { + attrIDs = attrsIDs } else { aPrflIDs, err := MatchingItemIDsForEvent(evNm, alS.cgrcfg.AttributeSCfg().StringIndexedFields, @@ -109,8 +109,8 @@ func (alS *AttributeService) attributeProfileForEvent(tnt string, args *AttrArgs !utils.IsSliceMember(aPrfl.Contexts, contextVal) { continue } - if aPrfl.ActivationInterval != nil && args.Time != nil && - !aPrfl.ActivationInterval.IsActiveAtTime(*args.Time) { // not active + if aPrfl.ActivationInterval != nil && actTime != nil && + !aPrfl.ActivationInterval.IsActiveAtTime(*actTime) { // not active continue } if pass, err := alS.filterS.Pass(tnt, aPrfl.FilterIDs, @@ -208,7 +208,7 @@ func (attr *AttrArgsProcessEvent) Clone() *AttrArgsProcessEvent { func (alS *AttributeService) processEvent(tnt string, args *AttrArgsProcessEvent, evNm utils.MapStorage, dynDP utils.DataProvider, lastID string) ( rply *AttrSProcessEventReply, err error) { var attrPrf *AttributeProfile - if attrPrf, err = alS.attributeProfileForEvent(tnt, args, evNm, lastID); err != nil { + if attrPrf, err = alS.attributeProfileForEvent(tnt, args.Context, args.AttributeIDs, args.Time, evNm, lastID); err != nil { return } rply = &AttrSProcessEventReply{ @@ -435,7 +435,7 @@ func (alS *AttributeService) V1GetAttributeForEvent(args *AttrArgsProcessEvent, if tnt == utils.EmptyString { tnt = alS.cgrcfg.GeneralCfg().DefaultTenant } - attrPrf, err := alS.attributeProfileForEvent(tnt, args, utils.MapStorage{ + attrPrf, err := alS.attributeProfileForEvent(tnt, args.Context, args.AttributeIDs, args.Time, utils.MapStorage{ utils.MetaReq: args.CGREvent.Event, utils.MetaOpts: args.Opts, utils.MetaVars: utils.MapStorage{ diff --git a/engine/libstats.go b/engine/libstats.go index 90bab7754..7baa1145e 100644 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -174,14 +174,14 @@ func (sq *StatQueue) TenantID() string { } // ProcessEvent processes a utils.CGREvent, returns true if processed -func (sq *StatQueue) ProcessEvent(ev *utils.CGREvent, filterS *FilterS, evNm utils.MapStorage) (err error) { +func (sq *StatQueue) ProcessEvent(tnt, evID string, filterS *FilterS, evNm utils.MapStorage) (err error) { if err = sq.remExpired(); err != nil { return } if err = sq.remOnQueueLength(); err != nil { return } - return sq.addStatEvent(ev, filterS, evNm) + return sq.addStatEvent(tnt, evID, filterS, evNm) } // remStatEvent removes an event from metrics @@ -237,7 +237,7 @@ func (sq *StatQueue) remOnQueueLength() (err error) { } // addStatEvent computes metrics for an event -func (sq *StatQueue) addStatEvent(ev *utils.CGREvent, filterS *FilterS, evNm utils.MapStorage) (err error) { +func (sq *StatQueue) addStatEvent(tnt, evID string, filterS *FilterS, evNm utils.MapStorage) (err error) { var expTime *time.Time if sq.ttl != nil { expTime = utils.TimePointer(time.Now().Add(*sq.ttl)) @@ -246,20 +246,20 @@ func (sq *StatQueue) addStatEvent(ev *utils.CGREvent, filterS *FilterS, evNm uti struct { EventID string ExpiryTime *time.Time - }{ev.ID, expTime}) + }{evID, expTime}) var pass bool // recreate the request without *opts - req := utils.MapStorage{utils.MetaReq: ev.Event} + req := utils.MapStorage{utils.MetaReq: evNm[utils.MetaReq]} for metricID, metric := range sq.SQMetrics { - if pass, err = filterS.Pass(ev.Tenant, metric.GetFilterIDs(), + if pass, err = filterS.Pass(tnt, metric.GetFilterIDs(), evNm); err != nil { return } else if !pass { continue } - if err = metric.AddEvent(ev.ID, req); err != nil { + if err = metric.AddEvent(evID, req); err != nil { utils.Logger.Warning(fmt.Sprintf(" metricID: %s, add eventID: %s, error: %s", - metricID, ev.ID, err.Error())) + metricID, evID, err.Error())) return } } diff --git a/engine/libstats_test.go b/engine/libstats_test.go index d730dcf37..ac46c2d74 100644 --- a/engine/libstats_test.go +++ b/engine/libstats_test.go @@ -220,7 +220,7 @@ func TestStatAddStatEvent(t *testing.T) { t.Errorf("received ASR: %v", asr) } ev1 := &utils.CGREvent{Tenant: "cgrates.org", ID: "TestStatAddStatEvent_1"} - sq.addStatEvent(ev1, nil, nil) + sq.addStatEvent(ev1.Tenant, ev1.ID, nil, utils.MapStorage{utils.MetaReq: ev1.Event}) if asr := asrMetric.GetFloat64Value(); asr != 50 { t.Errorf("received ASR: %v", asr) } else if asrMetric.Answered != 1 || asrMetric.Count != 2 { @@ -228,7 +228,7 @@ func TestStatAddStatEvent(t *testing.T) { } ev1.Event = map[string]interface{}{ utils.AnswerTime: time.Now()} - sq.addStatEvent(ev1, nil, nil) + sq.addStatEvent(ev1.Tenant, ev1.ID, nil, utils.MapStorage{utils.MetaReq: ev1.Event}) if asr := asrMetric.GetFloat64Value(); asr != 66.66667 { t.Errorf("received ASR: %v", asr) } else if asrMetric.Answered != 2 || asrMetric.Count != 3 { @@ -634,7 +634,7 @@ func TestStatRemoveExpiredTTL(t *testing.T) { //add ev1 with ttl 100ms (after 100ms the event should be removed) ev1 := &utils.CGREvent{Tenant: "cgrates.org", ID: "TestStatAddStatEvent_1"} - sq.ProcessEvent(ev1, nil, utils.MapStorage{utils.MetaReq: ev1.Event}) + sq.ProcessEvent(ev1.Tenant, ev1.ID, nil, utils.MapStorage{utils.MetaReq: ev1.Event}) if len(sq.SQItems) != 1 && sq.SQItems[0].EventID != "TestStatAddStatEvent_1" { t.Errorf("Expecting: 1, received: %+v", len(sq.SQItems)) @@ -644,7 +644,7 @@ func TestStatRemoveExpiredTTL(t *testing.T) { //processing a new event should clean the expired events and add the new one ev2 := &utils.CGREvent{Tenant: "cgrates.org", ID: "TestStatAddStatEvent_2"} - sq.ProcessEvent(ev2, nil, utils.MapStorage{utils.MetaReq: ev2.Event}) + sq.ProcessEvent(ev2.Tenant, ev2.ID, nil, utils.MapStorage{utils.MetaReq: ev2.Event}) if len(sq.SQItems) != 1 && sq.SQItems[0].EventID != "TestStatAddStatEvent_2" { t.Errorf("Expecting: 1, received: %+v", len(sq.SQItems)) } @@ -668,7 +668,7 @@ func TestStatRemoveExpiredQueue(t *testing.T) { //add ev1 with ttl 100ms (after 100ms the event should be removed) ev1 := &utils.CGREvent{Tenant: "cgrates.org", ID: "TestStatAddStatEvent_1"} - sq.ProcessEvent(ev1, nil, utils.MapStorage{utils.MetaReq: ev1.Event}) + sq.ProcessEvent(ev1.Tenant, ev1.ID, nil, utils.MapStorage{utils.MetaReq: ev1.Event}) if len(sq.SQItems) != 1 && sq.SQItems[0].EventID != "TestStatAddStatEvent_1" { t.Errorf("Expecting: 1, received: %+v", len(sq.SQItems)) @@ -678,7 +678,7 @@ func TestStatRemoveExpiredQueue(t *testing.T) { //processing a new event should clean the expired events and add the new one ev2 := &utils.CGREvent{Tenant: "cgrates.org", ID: "TestStatAddStatEvent_2"} - sq.ProcessEvent(ev2, nil, utils.MapStorage{utils.MetaReq: ev2.Event}) + sq.ProcessEvent(ev2.Tenant, ev2.ID, nil, utils.MapStorage{utils.MetaReq: ev2.Event}) if len(sq.SQItems) != 2 && sq.SQItems[0].EventID != "TestStatAddStatEvent_1" && sq.SQItems[1].EventID != "TestStatAddStatEvent_2" { t.Errorf("Expecting: 2, received: %+v", len(sq.SQItems)) @@ -686,7 +686,7 @@ func TestStatRemoveExpiredQueue(t *testing.T) { //processing a new event should clean the expired events and add the new one ev3 := &utils.CGREvent{Tenant: "cgrates.org", ID: "TestStatAddStatEvent_3"} - sq.ProcessEvent(ev3, nil, utils.MapStorage{utils.MetaReq: ev3.Event}) + sq.ProcessEvent(ev3.Tenant, ev3.ID, nil, utils.MapStorage{utils.MetaReq: ev3.Event}) if len(sq.SQItems) != 2 && sq.SQItems[0].EventID != "TestStatAddStatEvent_2" && sq.SQItems[1].EventID != "TestStatAddStatEvent_3" { t.Errorf("Expecting: 2, received: %+v", len(sq.SQItems)) diff --git a/engine/stats.go b/engine/stats.go index b703c2325..ff3875106 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -150,8 +150,8 @@ func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) { } // matchingStatQueuesForEvent returns ordered list of matching resources which are active by the time of the call -func (sS *StatService) matchingStatQueuesForEvent(tnt string, args *StatsArgsProcessEvent, evNm utils.MapStorage) (sqs StatQueues, err error) { - sqIDs := utils.NewStringSet(args.StatIDs) +func (sS *StatService) matchingStatQueuesForEvent(tnt string, statsIDs []string, actTime *time.Time, evNm utils.MapStorage) (sqs StatQueues, err error) { + sqIDs := utils.NewStringSet(statsIDs) if len(sqIDs) == 0 { sqIDs, err = MatchingItemIDsForEvent(evNm, sS.cgrcfg.StatSCfg().StringIndexedFields, @@ -174,8 +174,8 @@ func (sS *StatService) matchingStatQueuesForEvent(tnt string, args *StatsArgsPro } return nil, err } - if sqPrfl.ActivationInterval != nil && args.Time != nil && - !sqPrfl.ActivationInterval.IsActiveAtTime(*args.Time) { // not active + if sqPrfl.ActivationInterval != nil && actTime != nil && + !sqPrfl.ActivationInterval.IsActiveAtTime(*actTime) { // not active continue } if pass, err := sS.filterS.Pass(tnt, sqPrfl.FilterIDs, @@ -264,7 +264,7 @@ func (sS *StatService) processEvent(tnt string, args *StatsArgsProcessEvent) (st utils.MetaReq: args.Event, utils.MetaOpts: args.Opts, } - matchSQs, err := sS.matchingStatQueuesForEvent(tnt, args, evNm) + matchSQs, err := sS.matchingStatQueuesForEvent(tnt, args.StatIDs, args.Time, evNm) if err != nil { return nil, err } @@ -281,7 +281,7 @@ func (sS *StatService) processEvent(tnt string, args *StatsArgsProcessEvent) (st stsIDs = append(stsIDs, sq.ID) lkID := utils.StatQueuePrefix + sq.TenantID() guardian.Guardian.Guard(func() (gRes interface{}, gErr error) { - err = sq.ProcessEvent(args.CGREvent, sS.filterS, evNm) + err = sq.ProcessEvent(tnt, args.ID, sS.filterS, evNm) return }, config.CgrConfig().GeneralCfg().LockingTimeout, lkID) if err != nil { @@ -384,7 +384,7 @@ func (sS *StatService) V1GetStatQueuesForEvent(args *StatsArgsProcessEvent, repl tnt = sS.cgrcfg.GeneralCfg().DefaultTenant } var sQs StatQueues - if sQs, err = sS.matchingStatQueuesForEvent(tnt, args, utils.MapStorage{ + if sQs, err = sS.matchingStatQueuesForEvent(tnt, args.StatIDs, args.Time, utils.MapStorage{ utils.MetaReq: args.Event, utils.MetaOpts: args.Opts, }); err != nil { diff --git a/engine/stats_test.go b/engine/stats_test.go index 5dffc7a40..f47409535 100644 --- a/engine/stats_test.go +++ b/engine/stats_test.go @@ -250,7 +250,7 @@ func TestStatQueuesPopulateStatsService(t *testing.T) { } func TestStatQueuesMatchingStatQueuesForEvent(t *testing.T) { - msq, err := statService.matchingStatQueuesForEvent(statsEvs[0].Tenant, statsEvs[0], + msq, err := statService.matchingStatQueuesForEvent(statsEvs[0].Tenant, statsEvs[0].StatIDs, statsEvs[0].Time, utils.MapStorage{utils.MetaReq: statsEvs[0].Event, utils.MetaOpts: statsEvs[0].Opts}) if err != nil { t.Errorf("Error: %+v", err) @@ -262,7 +262,7 @@ func TestStatQueuesMatchingStatQueuesForEvent(t *testing.T) { } else if !reflect.DeepEqual(stqs[0].sqPrfl, msq[0].sqPrfl) { t.Errorf("Expecting: %+v, received: %+v", stqs[0].sqPrfl, msq[0].sqPrfl) } - msq, err = statService.matchingStatQueuesForEvent(statsEvs[1].Tenant, statsEvs[1], + msq, err = statService.matchingStatQueuesForEvent(statsEvs[1].Tenant, statsEvs[1].StatIDs, statsEvs[1].Time, utils.MapStorage{utils.MetaReq: statsEvs[1].Event, utils.MetaOpts: statsEvs[1].Opts}) if err != nil { t.Errorf("Error: %+v", err) @@ -274,7 +274,7 @@ func TestStatQueuesMatchingStatQueuesForEvent(t *testing.T) { } else if !reflect.DeepEqual(stqs[1].sqPrfl, msq[0].sqPrfl) { t.Errorf("Expecting: %+v, received: %+v", stqs[1].sqPrfl, msq[0].sqPrfl) } - msq, err = statService.matchingStatQueuesForEvent(statsEvs[2].Tenant, statsEvs[2], + msq, err = statService.matchingStatQueuesForEvent(statsEvs[2].Tenant, statsEvs[2].StatIDs, statsEvs[2].Time, utils.MapStorage{utils.MetaReq: statsEvs[2].Event, utils.MetaOpts: statsEvs[2].Opts}) if err != nil { t.Errorf("Error: %+v", err) @@ -330,7 +330,7 @@ func TestStatQueuesProcessEvent(t *testing.T) { func TestStatQueuesMatchWithIndexFalse(t *testing.T) { statService.cgrcfg.StatSCfg().IndexedSelects = false - msq, err := statService.matchingStatQueuesForEvent(statsEvs[0].Tenant, statsEvs[0], + msq, err := statService.matchingStatQueuesForEvent(statsEvs[0].Tenant, statsEvs[0].StatIDs, statsEvs[0].Time, utils.MapStorage{utils.MetaReq: statsEvs[0].Event, utils.MetaOpts: statsEvs[0].Opts}) if err != nil { t.Errorf("Error: %+v", err) @@ -342,7 +342,7 @@ func TestStatQueuesMatchWithIndexFalse(t *testing.T) { } else if !reflect.DeepEqual(stqs[0].sqPrfl, msq[0].sqPrfl) { t.Errorf("Expecting: %+v, received: %+v", stqs[0].sqPrfl, msq[0].sqPrfl) } - msq, err = statService.matchingStatQueuesForEvent(statsEvs[1].Tenant, statsEvs[1], + msq, err = statService.matchingStatQueuesForEvent(statsEvs[1].Tenant, statsEvs[1].StatIDs, statsEvs[1].Time, utils.MapStorage{utils.MetaReq: statsEvs[1].Event, utils.MetaOpts: statsEvs[1].Opts}) if err != nil { t.Errorf("Error: %+v", err) @@ -354,7 +354,7 @@ func TestStatQueuesMatchWithIndexFalse(t *testing.T) { } else if !reflect.DeepEqual(stqs[1].sqPrfl, msq[0].sqPrfl) { t.Errorf("Expecting: %+v, received: %+v", stqs[1].sqPrfl, msq[0].sqPrfl) } - msq, err = statService.matchingStatQueuesForEvent(statsEvs[2].Tenant, statsEvs[2], + msq, err = statService.matchingStatQueuesForEvent(statsEvs[2].Tenant, statsEvs[2].StatIDs, statsEvs[2].Time, utils.MapStorage{utils.MetaReq: statsEvs[2].Event, utils.MetaOpts: statsEvs[2].Opts}) if err != nil { t.Errorf("Error: %+v", err) diff --git a/engine/z_attributes_test.go b/engine/z_attributes_test.go index 4f50895dd..c2a8d228e 100644 --- a/engine/z_attributes_test.go +++ b/engine/z_attributes_test.go @@ -248,26 +248,28 @@ func TestAttributeCache(t *testing.T) { } func TestAttributeProfileForEvent(t *testing.T) { - atrp, err := attrService.attributeProfileForEvent(attrEvs[0].Tenant, attrEvs[0], utils.MapStorage{ - utils.MetaReq: attrEvs[0].CGREvent.Event, - utils.MetaOpts: attrEvs[0].Opts, - utils.MetaVars: utils.MapStorage{ - utils.ProcessRuns: utils.NewNMData(0), - }, - }, utils.EmptyString) + atrp, err := attrService.attributeProfileForEvent(attrEvs[0].Tenant, attrEvs[0].Context, + attrEvs[0].AttributeIDs, attrEvs[0].Time, utils.MapStorage{ + utils.MetaReq: attrEvs[0].CGREvent.Event, + utils.MetaOpts: attrEvs[0].Opts, + utils.MetaVars: utils.MapStorage{ + utils.ProcessRuns: utils.NewNMData(0), + }, + }, utils.EmptyString) if err != nil { t.Errorf("Error: %+v", err) } if !reflect.DeepEqual(atrPs[0], atrp) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(atrPs[0]), utils.ToJSON(atrp)) } - atrp, err = attrService.attributeProfileForEvent(attrEvs[1].Tenant, attrEvs[1], utils.MapStorage{ - utils.MetaReq: attrEvs[1].CGREvent.Event, - utils.MetaOpts: attrEvs[1].Opts, - utils.MetaVars: utils.MapStorage{ - utils.ProcessRuns: utils.NewNMData(0), - }, - }, utils.EmptyString) + atrp, err = attrService.attributeProfileForEvent(attrEvs[1].Tenant, attrEvs[1].Context, + attrEvs[1].AttributeIDs, attrEvs[1].Time, utils.MapStorage{ + utils.MetaReq: attrEvs[1].CGREvent.Event, + utils.MetaOpts: attrEvs[1].Opts, + utils.MetaVars: utils.MapStorage{ + utils.ProcessRuns: utils.NewNMData(0), + }, + }, utils.EmptyString) if err != nil { t.Errorf("Error: %+v", err) } @@ -275,13 +277,14 @@ func TestAttributeProfileForEvent(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(atrPs[1]), utils.ToJSON(atrp)) } - atrp, err = attrService.attributeProfileForEvent(attrEvs[2].Tenant, attrEvs[2], utils.MapStorage{ - utils.MetaReq: attrEvs[2].CGREvent.Event, - utils.MetaOpts: attrEvs[2].Opts, - utils.MetaVars: utils.MapStorage{ - utils.ProcessRuns: utils.NewNMData(0), - }, - }, utils.EmptyString) + atrp, err = attrService.attributeProfileForEvent(attrEvs[2].Tenant, attrEvs[2].Context, + attrEvs[2].AttributeIDs, attrEvs[2].Time, utils.MapStorage{ + utils.MetaReq: attrEvs[2].CGREvent.Event, + utils.MetaOpts: attrEvs[2].Opts, + utils.MetaVars: utils.MapStorage{ + utils.ProcessRuns: utils.NewNMData(0), + }, + }, utils.EmptyString) if err != nil { t.Errorf("Error: %+v", err) }