Metrics blockers added in code

This commit is contained in:
porosnicuadrian
2022-04-28 14:48:51 +03:00
parent 9ef70c7ab9
commit 575237ebcf
13 changed files with 91 additions and 41 deletions

View File

@@ -319,13 +319,13 @@ cgrates.org,RoutePrf2,*string:~*req.Account:1002,;20,,*lc,,route1,fltr3,Account3
// Create and populate Stats.csv
if err := writeFile(utils.StatsCsv, `
#Tenant[0],ID[1],FilterIDs[2],Weights[3],QueueLength[4],TTL[5],MinItems[6],MetricIDs[7],MetricFilterIDs[8],Stored[9],Blockers[10],ThresholdIDs[11]
cgrates.org,TestStats,*string:~*req.Account:1001,;20,100,1s,2,*sum#~*req.Value;*average#~*req.Value,fltr1;fltr2,true,;true,Th1;Th2
cgrates.org,TestStats,,,,,2,*sum#~*req.Usage,,,,
cgrates.org,TestStats2,*string:~*req.Account:1002,,100,1s,2,*sum#~*req.Value;*sum#~*req.Usage;*average#~*req.Value;*average#~*req.Usage,,true,;true,Th
cgrates.org,TestStats2,,;20,,,2,*sum#~*req.Cost;*average#~*req.Cost,,true,,
cgrates.org,TestStats3,,,,,,,,,,
cgrates.org,TestStats3,*string:~*req.Account:1003,;20,100,1s,2,*sum#~*req.Value;*average#~*req.Value,,true,;true,Th1;Th2
#Tenant[0],ID[1],FilterIDs[2],Weights[3],QueueLength[4],TTL[5],MinItems[6],MetricIDs[7],MetricFilterIDs[8],MetricBlockers[9],Stored[10],Blockers[11],ThresholdIDs[12]
cgrates.org,TestStats,*string:~*req.Account:1001,;20,100,1s,2,*sum#~*req.Value;*average#~*req.Value,fltr1;fltr2,,true,;true,Th1;Th2
cgrates.org,TestStats,,,,,2,*sum#~*req.Usage,,*string:~*req.Account:1003&fltr2;true;;false,,,
cgrates.org,TestStats2,*string:~*req.Account:1002,,100,1s,2,*sum#~*req.Value;*sum#~*req.Usage;*average#~*req.Value;*average#~*req.Usage,,,true,;true,Th
cgrates.org,TestStats2,,;20,,,2,*sum#~*req.Cost;*average#~*req.Cost,,;false,true,,
cgrates.org,TestStats3,,,,,,,,,,,
cgrates.org,TestStats3,*string:~*req.Account:1003,;20,100,1s,2,*sum#~*req.Value;*average#~*req.Value,,fltr_for_stats;false,true,;true,Th1;Th2
`); err != nil {
t.Fatal(err)
}
@@ -1153,6 +1153,15 @@ func testLoadersGetStatQueueProfiles(t *testing.T) {
},
{
MetricID: "*sum#~*req.Usage",
Blockers: utils.Blockers{
{
FilterIDs: []string{"*string:~*req.Account:1003", "fltr2"},
Blocker: true,
},
{
Blocker: false,
},
},
},
},
ThresholdIDs: []string{"Th1", "Th2"},
@@ -1189,6 +1198,7 @@ func testLoadersGetStatQueueProfiles(t *testing.T) {
},
{
MetricID: "*average#~*req.Cost",
Blockers: utils.Blockers{{Blocker: false}},
},
},
ThresholdIDs: []string{"Th"},
@@ -1213,18 +1223,22 @@ func testLoadersGetStatQueueProfiles(t *testing.T) {
},
{
MetricID: "*average#~*req.Value",
Blockers: utils.Blockers{
{
FilterIDs: []string{"fltr_for_stats"},
Blocker: false,
},
},
},
},
ThresholdIDs: []string{"Th1", "Th2"},
Blockers: utils.Blockers{{Blocker: true}},
Stored: true,
Weights: utils.DynamicWeights{
{
Weight: 20,
},
},
MinItems: 2,
},
}

View File

@@ -1237,14 +1237,15 @@ const CGRATES_CFG_JSON = `
{"tag": "ID", "path": "ID", "type": "*variable", "value": "~*req.1", "mandatory": true},
{"tag": "FilterIDs", "path": "FilterIDs", "type": "*variable", "value": "~*req.2"},
{"tag": "Weights", "path": "Weights", "type": "*variable", "value": "~*req.3"},
{"tag": "QueueLength", "path": "QueueLength", "type": "*variable", "value": "~*req.4"},
{"tag": "TTL", "path": "TTL", "type": "*variable", "value": "~*req.5"},
{"tag": "MinItems", "path": "MinItems", "type": "*variable", "value": "~*req.6"},
{"tag": "MetricIDs", "path": "Metrics.MetricID", "type": "*variable", "value": "~*req.7","new_branch":true},
{"tag": "MetricFilterIDs", "path": "Metrics.FilterIDs", "type": "*variable", "value": "~*req.8"},
{"tag": "Stored", "path": "Stored", "type": "*variable", "value": "~*req.9"},
{"tag": "Blockers", "path": "Blockers", "type": "*variable", "value": "~*req.10"},
{"tag": "ThresholdIDs", "path": "ThresholdIDs", "type": "*variable", "value": "~*req.11"},
{"tag": "Blockers", "path": "Blockers", "type": "*variable", "value": "~*req.4"},
{"tag": "QueueLength", "path": "QueueLength", "type": "*variable", "value": "~*req.5"},
{"tag": "TTL", "path": "TTL", "type": "*variable", "value": "~*req.6"},
{"tag": "MinItems", "path": "MinItems", "type": "*variable", "value": "~*req.7"},
{"tag": "Stored", "path": "Stored", "type": "*variable", "value": "~*req.8"},
{"tag": "ThresholdIDs", "path": "ThresholdIDs", "type": "*variable", "value": "~*req.9"},
{"tag": "MetricIDs", "path": "Metrics.MetricID", "type": "*variable", "value": "~*req.10","new_branch":true},
{"tag": "MetricFilterIDs", "path": "Metrics.FilterIDs", "type": "*variable", "value": "~*req.11"},
{"tag": "MetricBlockers", "path": "Metrics.Blockers", "type": "*variable", "value": "~*req.12"},
],
},
{

View File

@@ -35,15 +35,15 @@ CREATE TABLE tp_stats (
`tenant` varchar(64) NOT NULL,
`id` varchar(64) NOT NULL,
`filter_ids` varchar(64) NOT NULL,
`activation_interval` varchar(64) NOT NULL,
`weights` varchar(64) NOT NULL,
`queue_length` int(11) NOT NULL,
`ttl` varchar(32) NOT NULL,
`min_items` int(11) NOT NULL,
`metric_ids` varchar(128) NOT NULL,
`metric_filter_ids` varchar(64) NOT NULL,
`metric_blockers` varchar(128) NOT NULL,
`stored` BOOLEAN NOT NULL,
`weights` varchar(64) NOT NULL,
`blockers` VARCHAR(128) NOT NULL,
`blockers` varchar(128) NOT NULL,
`threshold_ids` varchar(64) NOT NULL,
`created_at` TIMESTAMP,
PRIMARY KEY (`pk`),

View File

@@ -40,9 +40,10 @@ CREATE TABLE tp_stats (
`min_items` int(11) NOT NULL,
`metric_ids` varchar(128) NOT NULL,
`metric_filter_ids` varchar(64) NOT NULL,
`metric_blockers` varchar(128) NOT NULL,
`stored` BOOLEAN NOT NULL,
`weights` varchar(128) NOT NULL,
`blockers` VARCHAR(128) NOT NULL,
`blockers` varchar(128) NOT NULL,
`threshold_ids` varchar(64) NOT NULL,
`created_at` TIMESTAMP,
PRIMARY KEY (`pk`),

View File

@@ -35,12 +35,12 @@ CREATE TABLE tp_stats (
"tenant"varchar(64) NOT NULL,
"id" varchar(64) NOT NULL,
"filter_ids" varchar(64) NOT NULL,
"activation_interval" varchar(64) NOT NULL,
"queue_length" INTEGER NOT NULL,
"ttl" varchar(32) NOT NULL,
"min_items" INTEGER NOT NULL,
"metric_ids" VARCHAR(128) NOT NULL,
"metric_filter_ids" VARCHAR(128) NOT NULL,
"metric_blockers" VARCHAR(128) NOT NULL,
"stored" BOOLEAN NOT NULL,
"weights" VARCHAR(128) NOT NULL,
"blockers" VARCHAR(128) NOT NULL,

View File

@@ -6,3 +6,5 @@ cgrates.org,Stat_3,FLTR_STAT_3,;30,100,1s,0,*acd;*tcd;*asr,,false,;true,*none
cgrates.org,Stat_Supplier1,*string:~*req.StatID:Stat_Supplier1,;30,100,1s,0,*sum#~*req.LoadReq,,true,;true,*none
cgrates.org,Stat_Supplier2,*string:~*req.StatID:Stat_Supplier2,;30,100,1s,0,*sum#~*req.LoadReq,,true,;true,*none
cgrates.org,Stat_Supplier3,*string:~*req.StatID:Stat_Supplier3,;30,100,1s,0,*sum#~*req.LoadReq,,true,;true,*none
1 #Tenant[0] Id[1] FilterIDs[2] Weights[3] QueueLength[4] TTL[5] MinItems[6] Metrics[7] MetricFilterIDs[8] Stored[9] Blockers[10] ThresholdIDs[11]
6 cgrates.org Stat_Supplier1 *string:~*req.StatID:Stat_Supplier1 ;30 100 1s 0 *sum#~*req.LoadReq true ;true *none
7 cgrates.org Stat_Supplier2 *string:~*req.StatID:Stat_Supplier2 ;30 100 1s 0 *sum#~*req.LoadReq true ;true *none
8 cgrates.org Stat_Supplier3 *string:~*req.StatID:Stat_Supplier3 ;30 100 1s 0 *sum#~*req.LoadReq true ;true *none
9
10

View File

@@ -38,12 +38,12 @@ type StatQueueProfile struct {
Tenant string
ID string // QueueID
FilterIDs []string
Weights utils.DynamicWeights
Blockers utils.Blockers // blocker flag to stop processing on filters matched
QueueLength int
TTL time.Duration
MinItems int
Stored bool
Weights utils.DynamicWeights
Blockers utils.Blockers // blocker flag to stop processing on filters matched
ThresholdIDs []string // list of thresholds to be checked after changes
Metrics []*MetricWithFilters // list of metrics to build
@@ -91,8 +91,9 @@ func (sqp *StatQueueProfile) isLocked() bool {
}
type MetricWithFilters struct {
FilterIDs []string
MetricID string
FilterIDs []string
Blockers utils.Blockers // blocker flag to stop processing for next metric on filters matched
}
// NewStoredStatQueue initiates a StoredStatQueue out of StatQueue
@@ -586,9 +587,11 @@ func (sqp *StatQueueProfile) Set(path []string, val interface{}, newBranch bool,
sqp.ThresholdIDs = append(sqp.ThresholdIDs, valA...)
}
case 2:
// path =[]string{Metrics, MetricID}
if path[0] != utils.Metrics {
return utils.ErrWrongPath
}
// val := *acd;*tcd;*asr
if val != utils.EmptyString {
if len(sqp.Metrics) == 0 || newBranch {
sqp.Metrics = append(sqp.Metrics, new(MetricWithFilters))
@@ -604,7 +607,12 @@ func (sqp *StatQueueProfile) Set(path []string, val interface{}, newBranch bool,
for _, mID := range valA[1:] { // add the rest of the metrics
sqp.Metrics = append(sqp.Metrics, &MetricWithFilters{MetricID: mID})
}
case utils.BlockersField:
var blkrs utils.Blockers
if blkrs, err = utils.NewBlockersFromString(utils.IfaceAsString(val), utils.InfieldSep, utils.ANDSep); err != nil {
return
}
sqp.Metrics[len(sqp.Metrics)-1].Blockers = append(sqp.Metrics[len(sqp.Metrics)-1].Blockers, blkrs...)
default:
return utils.ErrWrongPath
}
@@ -734,5 +742,7 @@ func (mf *MetricWithFilters) FieldAsInterface(fldPath []string) (_ interface{},
return mf.MetricID, nil
case utils.FilterIDs:
return mf.FilterIDs, nil
case utils.BlockersField:
return mf.Blockers, nil
}
}

View File

@@ -324,8 +324,7 @@ type StatMdls []*StatMdl
// CSVHeader return the header for csv fields as a slice of string
func (tps StatMdls) CSVHeader() (result []string) {
return []string{"#" + utils.Tenant, utils.ID, utils.FilterIDs, utils.Weight,
utils.QueueLength, utils.TTL, utils.MinItems, utils.MetricIDs, utils.MetricFilterIDs,
utils.Stored, utils.BlockersField, utils.ThresholdIDs}
utils.QueueLength, utils.TTL, utils.MinItems, utils.MetricIDs, utils.MetricFilterIDs, utils.MetricBlockers, utils.Stored, utils.BlockersField, utils.ThresholdIDs}
}
func (tps StatMdls) AsTPStats() (result []*utils.TPStatProfile) {
@@ -394,6 +393,9 @@ func (tps StatMdls) AsTPStats() (result []*utils.TPStatProfile) {
filterIDs := strings.Split(model.MetricFilterIDs, utils.InfieldSep)
stsMetric.FilterIDs = append(stsMetric.FilterIDs, filterIDs...)
}
if model.MetricBlockers != utils.EmptyString {
stsMetric.Blockers = model.MetricBlockers
}
statMetricsMap[key.TenantID()][metricID] = stsMetric
}
}
@@ -447,6 +449,7 @@ func APItoModelStats(st *utils.TPStatProfile) (mdls StatMdls) {
}
mdl.MetricFilterIDs += val
}
mdl.MetricBlockers = metric.Blockers
mdl.MetricIDs = metric.MetricID
mdls = append(mdls, mdl)
}
@@ -456,14 +459,13 @@ func APItoModelStats(st *utils.TPStatProfile) (mdls StatMdls) {
func APItoStats(tpST *utils.TPStatProfile, timezone string) (st *StatQueueProfile, err error) {
st = &StatQueueProfile{
Tenant: tpST.Tenant,
ID: tpST.ID,
FilterIDs: make([]string, len(tpST.FilterIDs)),
QueueLength: tpST.QueueLength,
MinItems: tpST.MinItems,
Metrics: make([]*MetricWithFilters, len(tpST.Metrics)),
Stored: tpST.Stored,
Tenant: tpST.Tenant,
ID: tpST.ID,
FilterIDs: make([]string, len(tpST.FilterIDs)),
QueueLength: tpST.QueueLength,
MinItems: tpST.MinItems,
Metrics: make([]*MetricWithFilters, len(tpST.Metrics)),
Stored: tpST.Stored,
ThresholdIDs: make([]string, len(tpST.ThresholdIDs)),
}
if tpST.Weights != utils.EmptyString {
@@ -486,6 +488,11 @@ func APItoStats(tpST *utils.TPStatProfile, timezone string) (st *StatQueueProfil
MetricID: metric.MetricID,
FilterIDs: metric.FilterIDs,
}
if metric.Blockers != utils.EmptyString {
if st.Metrics[i].Blockers, err = utils.NewBlockersFromString(metric.Blockers, utils.InfieldSep, utils.ANDSep); err != nil {
return
}
}
}
for i, trh := range tpST.ThresholdIDs {
st.ThresholdIDs[i] = trh
@@ -512,6 +519,7 @@ func StatQueueProfileToAPI(st *StatQueueProfile) (tpST *utils.TPStatProfile) {
for i, metric := range st.Metrics {
tpST.Metrics[i] = &utils.MetricWithFilters{
MetricID: metric.MetricID,
Blockers: metric.Blockers.String(utils.InfieldSep, utils.ANDSep),
}
if len(metric.FilterIDs) != 0 {
tpST.Metrics[i].FilterIDs = make([]string, len(metric.FilterIDs))
@@ -519,7 +527,6 @@ func StatQueueProfileToAPI(st *StatQueueProfile) (tpST *utils.TPStatProfile) {
tpST.Metrics[i].FilterIDs[j] = fltr
}
}
}
if st.TTL != time.Duration(0) {
tpST.TTL = st.TTL.String()

View File

@@ -59,9 +59,10 @@ type StatMdl struct {
MinItems int `index:"6" re:""`
MetricIDs string `index:"7" re:""`
MetricFilterIDs string `index:"8" re:""`
Stored bool `index:"9" re:""`
Blockers string `index:"10" re:""`
ThresholdIDs string `index:"11" re:""`
MetricBlockers string `index:"9" re:""`
Stored bool `index:"10" re:""`
Blockers string `index:"11" re:""`
ThresholdIDs string `index:"12" re:""`
CreatedAt time.Time
}

View File

@@ -220,6 +220,18 @@ func (sS *StatS) matchingStatQueuesForEvent(ctx *context.Context, tnt string, st
if sqPrfl.TTL > 0 {
sq.ttl = utils.DurationPointer(sqPrfl.TTL)
}
// every metrics has a blocker, verify them
for idx, metric := range sqPrfl.Metrics {
var blocker bool
if blocker, err = BlockerFromDynamics(ctx, metric.Blockers, sS.fltrS, tnt, evNm); err != nil {
return
}
// if we have blocker, ignore the rest of the metrics
if blocker {
sqPrfl.Metrics = sqPrfl.Metrics[:idx+1]
break
}
}
sq.sqPrfl = sqPrfl
if sq.weight, err = WeightFromDynamics(ctx, sqPrfl.Weights,
sS.fltrS, tnt, evNm); err != nil {

View File

@@ -45,7 +45,7 @@ func (tpSts TPStats) exportItems(ctx *context.Context, wrtr io.Writer, tnt strin
csvWriter := csv.NewWriter(wrtr)
csvWriter.Comma = utils.CSVSep
// before writing the profiles, we must write the headers
if err = csvWriter.Write([]string{"#Tenant", "ID", "FilterIDs", "Weights", "QueueLength", "TTL", "MinItems", "Metrics", "MetricFilterIDs", "Stored", "Blockers", "ThresholdIDs"}); err != nil {
if err = csvWriter.Write([]string{"#Tenant", "ID", "FilterIDs", "Weights", "QueueLength", "TTL", "MinItems", "Metrics", "MetricFilterIDs", "MetricBlockers", "Stored", "Blockers", "ThresholdIDs"}); err != nil {
return
}
for _, statsID := range itmIDs {

View File

@@ -231,6 +231,7 @@ type AttrDisconnectSession struct {
type MetricWithFilters struct {
FilterIDs []string
MetricID string
Blockers string
}
// TPStatProfile is used in APIs to manage remotely offline StatProfile

View File

@@ -555,6 +555,7 @@ const (
MinItems = "MinItems"
MetricIDs = "MetricIDs"
MetricFilterIDs = "MetricFilterIDs"
MetricBlockers = "MetricBlockers"
FieldName = "FieldName"
Path = "Path"
Hosts = "Hosts"