Updated Stats ProcessEvent

This commit is contained in:
Trial97
2020-10-21 12:52:04 +03:00
committed by Dan Christian Bogos
parent edcc2d959c
commit 5ad6245501
6 changed files with 61 additions and 58 deletions

View File

@@ -62,15 +62,15 @@ func (alS *AttributeService) Shutdown() (err error) {
}
// attributeProfileForEvent returns the matching attribute
func (alS *AttributeService) attributeProfileForEvent(tnt string, args *AttrArgsProcessEvent, evNm utils.MapStorage, lastID string) (matchAttrPrfl *AttributeProfile, err error) {
func (alS *AttributeService) attributeProfileForEvent(tnt string, ctx *string, attrsIDs []string, actTime *time.Time, evNm utils.MapStorage, lastID string) (matchAttrPrfl *AttributeProfile, err error) {
var attrIDs []string
contextVal := utils.MetaDefault
if args.Context != nil && *args.Context != "" {
contextVal = *args.Context
if ctx != nil && *ctx != "" {
contextVal = *ctx
}
attrIdxKey := utils.ConcatenatedKey(tnt, contextVal)
if len(args.AttributeIDs) != 0 {
attrIDs = args.AttributeIDs
if len(attrsIDs) != 0 {
attrIDs = attrsIDs
} else {
aPrflIDs, err := MatchingItemIDsForEvent(evNm,
alS.cgrcfg.AttributeSCfg().StringIndexedFields,
@@ -109,8 +109,8 @@ func (alS *AttributeService) attributeProfileForEvent(tnt string, args *AttrArgs
!utils.IsSliceMember(aPrfl.Contexts, contextVal) {
continue
}
if aPrfl.ActivationInterval != nil && args.Time != nil &&
!aPrfl.ActivationInterval.IsActiveAtTime(*args.Time) { // not active
if aPrfl.ActivationInterval != nil && actTime != nil &&
!aPrfl.ActivationInterval.IsActiveAtTime(*actTime) { // not active
continue
}
if pass, err := alS.filterS.Pass(tnt, aPrfl.FilterIDs,
@@ -208,7 +208,7 @@ func (attr *AttrArgsProcessEvent) Clone() *AttrArgsProcessEvent {
func (alS *AttributeService) processEvent(tnt string, args *AttrArgsProcessEvent, evNm utils.MapStorage, dynDP utils.DataProvider, lastID string) (
rply *AttrSProcessEventReply, err error) {
var attrPrf *AttributeProfile
if attrPrf, err = alS.attributeProfileForEvent(tnt, args, evNm, lastID); err != nil {
if attrPrf, err = alS.attributeProfileForEvent(tnt, args.Context, args.AttributeIDs, args.Time, evNm, lastID); err != nil {
return
}
rply = &AttrSProcessEventReply{
@@ -435,7 +435,7 @@ func (alS *AttributeService) V1GetAttributeForEvent(args *AttrArgsProcessEvent,
if tnt == utils.EmptyString {
tnt = alS.cgrcfg.GeneralCfg().DefaultTenant
}
attrPrf, err := alS.attributeProfileForEvent(tnt, args, utils.MapStorage{
attrPrf, err := alS.attributeProfileForEvent(tnt, args.Context, args.AttributeIDs, args.Time, utils.MapStorage{
utils.MetaReq: args.CGREvent.Event,
utils.MetaOpts: args.Opts,
utils.MetaVars: utils.MapStorage{

View File

@@ -174,14 +174,14 @@ func (sq *StatQueue) TenantID() string {
}
// ProcessEvent processes a utils.CGREvent, returns true if processed
func (sq *StatQueue) ProcessEvent(ev *utils.CGREvent, filterS *FilterS, evNm utils.MapStorage) (err error) {
func (sq *StatQueue) ProcessEvent(tnt, evID string, filterS *FilterS, evNm utils.MapStorage) (err error) {
if err = sq.remExpired(); err != nil {
return
}
if err = sq.remOnQueueLength(); err != nil {
return
}
return sq.addStatEvent(ev, filterS, evNm)
return sq.addStatEvent(tnt, evID, filterS, evNm)
}
// remStatEvent removes an event from metrics
@@ -237,7 +237,7 @@ func (sq *StatQueue) remOnQueueLength() (err error) {
}
// addStatEvent computes metrics for an event
func (sq *StatQueue) addStatEvent(ev *utils.CGREvent, filterS *FilterS, evNm utils.MapStorage) (err error) {
func (sq *StatQueue) addStatEvent(tnt, evID string, filterS *FilterS, evNm utils.MapStorage) (err error) {
var expTime *time.Time
if sq.ttl != nil {
expTime = utils.TimePointer(time.Now().Add(*sq.ttl))
@@ -246,20 +246,20 @@ func (sq *StatQueue) addStatEvent(ev *utils.CGREvent, filterS *FilterS, evNm uti
struct {
EventID string
ExpiryTime *time.Time
}{ev.ID, expTime})
}{evID, expTime})
var pass bool
// recreate the request without *opts
req := utils.MapStorage{utils.MetaReq: ev.Event}
req := utils.MapStorage{utils.MetaReq: evNm[utils.MetaReq]}
for metricID, metric := range sq.SQMetrics {
if pass, err = filterS.Pass(ev.Tenant, metric.GetFilterIDs(),
if pass, err = filterS.Pass(tnt, metric.GetFilterIDs(),
evNm); err != nil {
return
} else if !pass {
continue
}
if err = metric.AddEvent(ev.ID, req); err != nil {
if err = metric.AddEvent(evID, req); err != nil {
utils.Logger.Warning(fmt.Sprintf("<StatQueue> metricID: %s, add eventID: %s, error: %s",
metricID, ev.ID, err.Error()))
metricID, evID, err.Error()))
return
}
}

View File

@@ -220,7 +220,7 @@ func TestStatAddStatEvent(t *testing.T) {
t.Errorf("received ASR: %v", asr)
}
ev1 := &utils.CGREvent{Tenant: "cgrates.org", ID: "TestStatAddStatEvent_1"}
sq.addStatEvent(ev1, nil, nil)
sq.addStatEvent(ev1.Tenant, ev1.ID, nil, utils.MapStorage{utils.MetaReq: ev1.Event})
if asr := asrMetric.GetFloat64Value(); asr != 50 {
t.Errorf("received ASR: %v", asr)
} else if asrMetric.Answered != 1 || asrMetric.Count != 2 {
@@ -228,7 +228,7 @@ func TestStatAddStatEvent(t *testing.T) {
}
ev1.Event = map[string]interface{}{
utils.AnswerTime: time.Now()}
sq.addStatEvent(ev1, nil, nil)
sq.addStatEvent(ev1.Tenant, ev1.ID, nil, utils.MapStorage{utils.MetaReq: ev1.Event})
if asr := asrMetric.GetFloat64Value(); asr != 66.66667 {
t.Errorf("received ASR: %v", asr)
} else if asrMetric.Answered != 2 || asrMetric.Count != 3 {
@@ -634,7 +634,7 @@ func TestStatRemoveExpiredTTL(t *testing.T) {
//add ev1 with ttl 100ms (after 100ms the event should be removed)
ev1 := &utils.CGREvent{Tenant: "cgrates.org", ID: "TestStatAddStatEvent_1"}
sq.ProcessEvent(ev1, nil, utils.MapStorage{utils.MetaReq: ev1.Event})
sq.ProcessEvent(ev1.Tenant, ev1.ID, nil, utils.MapStorage{utils.MetaReq: ev1.Event})
if len(sq.SQItems) != 1 && sq.SQItems[0].EventID != "TestStatAddStatEvent_1" {
t.Errorf("Expecting: 1, received: %+v", len(sq.SQItems))
@@ -644,7 +644,7 @@ func TestStatRemoveExpiredTTL(t *testing.T) {
//processing a new event should clean the expired events and add the new one
ev2 := &utils.CGREvent{Tenant: "cgrates.org", ID: "TestStatAddStatEvent_2"}
sq.ProcessEvent(ev2, nil, utils.MapStorage{utils.MetaReq: ev2.Event})
sq.ProcessEvent(ev2.Tenant, ev2.ID, nil, utils.MapStorage{utils.MetaReq: ev2.Event})
if len(sq.SQItems) != 1 && sq.SQItems[0].EventID != "TestStatAddStatEvent_2" {
t.Errorf("Expecting: 1, received: %+v", len(sq.SQItems))
}
@@ -668,7 +668,7 @@ func TestStatRemoveExpiredQueue(t *testing.T) {
//add ev1 with ttl 100ms (after 100ms the event should be removed)
ev1 := &utils.CGREvent{Tenant: "cgrates.org", ID: "TestStatAddStatEvent_1"}
sq.ProcessEvent(ev1, nil, utils.MapStorage{utils.MetaReq: ev1.Event})
sq.ProcessEvent(ev1.Tenant, ev1.ID, nil, utils.MapStorage{utils.MetaReq: ev1.Event})
if len(sq.SQItems) != 1 && sq.SQItems[0].EventID != "TestStatAddStatEvent_1" {
t.Errorf("Expecting: 1, received: %+v", len(sq.SQItems))
@@ -678,7 +678,7 @@ func TestStatRemoveExpiredQueue(t *testing.T) {
//processing a new event should clean the expired events and add the new one
ev2 := &utils.CGREvent{Tenant: "cgrates.org", ID: "TestStatAddStatEvent_2"}
sq.ProcessEvent(ev2, nil, utils.MapStorage{utils.MetaReq: ev2.Event})
sq.ProcessEvent(ev2.Tenant, ev2.ID, nil, utils.MapStorage{utils.MetaReq: ev2.Event})
if len(sq.SQItems) != 2 && sq.SQItems[0].EventID != "TestStatAddStatEvent_1" &&
sq.SQItems[1].EventID != "TestStatAddStatEvent_2" {
t.Errorf("Expecting: 2, received: %+v", len(sq.SQItems))
@@ -686,7 +686,7 @@ func TestStatRemoveExpiredQueue(t *testing.T) {
//processing a new event should clean the expired events and add the new one
ev3 := &utils.CGREvent{Tenant: "cgrates.org", ID: "TestStatAddStatEvent_3"}
sq.ProcessEvent(ev3, nil, utils.MapStorage{utils.MetaReq: ev3.Event})
sq.ProcessEvent(ev3.Tenant, ev3.ID, nil, utils.MapStorage{utils.MetaReq: ev3.Event})
if len(sq.SQItems) != 2 && sq.SQItems[0].EventID != "TestStatAddStatEvent_2" &&
sq.SQItems[1].EventID != "TestStatAddStatEvent_3" {
t.Errorf("Expecting: 2, received: %+v", len(sq.SQItems))

View File

@@ -150,8 +150,8 @@ func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) {
}
// matchingStatQueuesForEvent returns ordered list of matching resources which are active by the time of the call
func (sS *StatService) matchingStatQueuesForEvent(tnt string, args *StatsArgsProcessEvent, evNm utils.MapStorage) (sqs StatQueues, err error) {
sqIDs := utils.NewStringSet(args.StatIDs)
func (sS *StatService) matchingStatQueuesForEvent(tnt string, statsIDs []string, actTime *time.Time, evNm utils.MapStorage) (sqs StatQueues, err error) {
sqIDs := utils.NewStringSet(statsIDs)
if len(sqIDs) == 0 {
sqIDs, err = MatchingItemIDsForEvent(evNm,
sS.cgrcfg.StatSCfg().StringIndexedFields,
@@ -174,8 +174,8 @@ func (sS *StatService) matchingStatQueuesForEvent(tnt string, args *StatsArgsPro
}
return nil, err
}
if sqPrfl.ActivationInterval != nil && args.Time != nil &&
!sqPrfl.ActivationInterval.IsActiveAtTime(*args.Time) { // not active
if sqPrfl.ActivationInterval != nil && actTime != nil &&
!sqPrfl.ActivationInterval.IsActiveAtTime(*actTime) { // not active
continue
}
if pass, err := sS.filterS.Pass(tnt, sqPrfl.FilterIDs,
@@ -264,7 +264,7 @@ func (sS *StatService) processEvent(tnt string, args *StatsArgsProcessEvent) (st
utils.MetaReq: args.Event,
utils.MetaOpts: args.Opts,
}
matchSQs, err := sS.matchingStatQueuesForEvent(tnt, args, evNm)
matchSQs, err := sS.matchingStatQueuesForEvent(tnt, args.StatIDs, args.Time, evNm)
if err != nil {
return nil, err
}
@@ -281,7 +281,7 @@ func (sS *StatService) processEvent(tnt string, args *StatsArgsProcessEvent) (st
stsIDs = append(stsIDs, sq.ID)
lkID := utils.StatQueuePrefix + sq.TenantID()
guardian.Guardian.Guard(func() (gRes interface{}, gErr error) {
err = sq.ProcessEvent(args.CGREvent, sS.filterS, evNm)
err = sq.ProcessEvent(tnt, args.ID, sS.filterS, evNm)
return
}, config.CgrConfig().GeneralCfg().LockingTimeout, lkID)
if err != nil {
@@ -384,7 +384,7 @@ func (sS *StatService) V1GetStatQueuesForEvent(args *StatsArgsProcessEvent, repl
tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
}
var sQs StatQueues
if sQs, err = sS.matchingStatQueuesForEvent(tnt, args, utils.MapStorage{
if sQs, err = sS.matchingStatQueuesForEvent(tnt, args.StatIDs, args.Time, utils.MapStorage{
utils.MetaReq: args.Event,
utils.MetaOpts: args.Opts,
}); err != nil {

View File

@@ -250,7 +250,7 @@ func TestStatQueuesPopulateStatsService(t *testing.T) {
}
func TestStatQueuesMatchingStatQueuesForEvent(t *testing.T) {
msq, err := statService.matchingStatQueuesForEvent(statsEvs[0].Tenant, statsEvs[0],
msq, err := statService.matchingStatQueuesForEvent(statsEvs[0].Tenant, statsEvs[0].StatIDs, statsEvs[0].Time,
utils.MapStorage{utils.MetaReq: statsEvs[0].Event, utils.MetaOpts: statsEvs[0].Opts})
if err != nil {
t.Errorf("Error: %+v", err)
@@ -262,7 +262,7 @@ func TestStatQueuesMatchingStatQueuesForEvent(t *testing.T) {
} else if !reflect.DeepEqual(stqs[0].sqPrfl, msq[0].sqPrfl) {
t.Errorf("Expecting: %+v, received: %+v", stqs[0].sqPrfl, msq[0].sqPrfl)
}
msq, err = statService.matchingStatQueuesForEvent(statsEvs[1].Tenant, statsEvs[1],
msq, err = statService.matchingStatQueuesForEvent(statsEvs[1].Tenant, statsEvs[1].StatIDs, statsEvs[1].Time,
utils.MapStorage{utils.MetaReq: statsEvs[1].Event, utils.MetaOpts: statsEvs[1].Opts})
if err != nil {
t.Errorf("Error: %+v", err)
@@ -274,7 +274,7 @@ func TestStatQueuesMatchingStatQueuesForEvent(t *testing.T) {
} else if !reflect.DeepEqual(stqs[1].sqPrfl, msq[0].sqPrfl) {
t.Errorf("Expecting: %+v, received: %+v", stqs[1].sqPrfl, msq[0].sqPrfl)
}
msq, err = statService.matchingStatQueuesForEvent(statsEvs[2].Tenant, statsEvs[2],
msq, err = statService.matchingStatQueuesForEvent(statsEvs[2].Tenant, statsEvs[2].StatIDs, statsEvs[2].Time,
utils.MapStorage{utils.MetaReq: statsEvs[2].Event, utils.MetaOpts: statsEvs[2].Opts})
if err != nil {
t.Errorf("Error: %+v", err)
@@ -330,7 +330,7 @@ func TestStatQueuesProcessEvent(t *testing.T) {
func TestStatQueuesMatchWithIndexFalse(t *testing.T) {
statService.cgrcfg.StatSCfg().IndexedSelects = false
msq, err := statService.matchingStatQueuesForEvent(statsEvs[0].Tenant, statsEvs[0],
msq, err := statService.matchingStatQueuesForEvent(statsEvs[0].Tenant, statsEvs[0].StatIDs, statsEvs[0].Time,
utils.MapStorage{utils.MetaReq: statsEvs[0].Event, utils.MetaOpts: statsEvs[0].Opts})
if err != nil {
t.Errorf("Error: %+v", err)
@@ -342,7 +342,7 @@ func TestStatQueuesMatchWithIndexFalse(t *testing.T) {
} else if !reflect.DeepEqual(stqs[0].sqPrfl, msq[0].sqPrfl) {
t.Errorf("Expecting: %+v, received: %+v", stqs[0].sqPrfl, msq[0].sqPrfl)
}
msq, err = statService.matchingStatQueuesForEvent(statsEvs[1].Tenant, statsEvs[1],
msq, err = statService.matchingStatQueuesForEvent(statsEvs[1].Tenant, statsEvs[1].StatIDs, statsEvs[1].Time,
utils.MapStorage{utils.MetaReq: statsEvs[1].Event, utils.MetaOpts: statsEvs[1].Opts})
if err != nil {
t.Errorf("Error: %+v", err)
@@ -354,7 +354,7 @@ func TestStatQueuesMatchWithIndexFalse(t *testing.T) {
} else if !reflect.DeepEqual(stqs[1].sqPrfl, msq[0].sqPrfl) {
t.Errorf("Expecting: %+v, received: %+v", stqs[1].sqPrfl, msq[0].sqPrfl)
}
msq, err = statService.matchingStatQueuesForEvent(statsEvs[2].Tenant, statsEvs[2],
msq, err = statService.matchingStatQueuesForEvent(statsEvs[2].Tenant, statsEvs[2].StatIDs, statsEvs[2].Time,
utils.MapStorage{utils.MetaReq: statsEvs[2].Event, utils.MetaOpts: statsEvs[2].Opts})
if err != nil {
t.Errorf("Error: %+v", err)

View File

@@ -248,26 +248,28 @@ func TestAttributeCache(t *testing.T) {
}
func TestAttributeProfileForEvent(t *testing.T) {
atrp, err := attrService.attributeProfileForEvent(attrEvs[0].Tenant, attrEvs[0], utils.MapStorage{
utils.MetaReq: attrEvs[0].CGREvent.Event,
utils.MetaOpts: attrEvs[0].Opts,
utils.MetaVars: utils.MapStorage{
utils.ProcessRuns: utils.NewNMData(0),
},
}, utils.EmptyString)
atrp, err := attrService.attributeProfileForEvent(attrEvs[0].Tenant, attrEvs[0].Context,
attrEvs[0].AttributeIDs, attrEvs[0].Time, utils.MapStorage{
utils.MetaReq: attrEvs[0].CGREvent.Event,
utils.MetaOpts: attrEvs[0].Opts,
utils.MetaVars: utils.MapStorage{
utils.ProcessRuns: utils.NewNMData(0),
},
}, utils.EmptyString)
if err != nil {
t.Errorf("Error: %+v", err)
}
if !reflect.DeepEqual(atrPs[0], atrp) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(atrPs[0]), utils.ToJSON(atrp))
}
atrp, err = attrService.attributeProfileForEvent(attrEvs[1].Tenant, attrEvs[1], utils.MapStorage{
utils.MetaReq: attrEvs[1].CGREvent.Event,
utils.MetaOpts: attrEvs[1].Opts,
utils.MetaVars: utils.MapStorage{
utils.ProcessRuns: utils.NewNMData(0),
},
}, utils.EmptyString)
atrp, err = attrService.attributeProfileForEvent(attrEvs[1].Tenant, attrEvs[1].Context,
attrEvs[1].AttributeIDs, attrEvs[1].Time, utils.MapStorage{
utils.MetaReq: attrEvs[1].CGREvent.Event,
utils.MetaOpts: attrEvs[1].Opts,
utils.MetaVars: utils.MapStorage{
utils.ProcessRuns: utils.NewNMData(0),
},
}, utils.EmptyString)
if err != nil {
t.Errorf("Error: %+v", err)
}
@@ -275,13 +277,14 @@ func TestAttributeProfileForEvent(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(atrPs[1]), utils.ToJSON(atrp))
}
atrp, err = attrService.attributeProfileForEvent(attrEvs[2].Tenant, attrEvs[2], utils.MapStorage{
utils.MetaReq: attrEvs[2].CGREvent.Event,
utils.MetaOpts: attrEvs[2].Opts,
utils.MetaVars: utils.MapStorage{
utils.ProcessRuns: utils.NewNMData(0),
},
}, utils.EmptyString)
atrp, err = attrService.attributeProfileForEvent(attrEvs[2].Tenant, attrEvs[2].Context,
attrEvs[2].AttributeIDs, attrEvs[2].Time, utils.MapStorage{
utils.MetaReq: attrEvs[2].CGREvent.Event,
utils.MetaOpts: attrEvs[2].Opts,
utils.MetaVars: utils.MapStorage{
utils.ProcessRuns: utils.NewNMData(0),
},
}, utils.EmptyString)
if err != nil {
t.Errorf("Error: %+v", err)
}