diff --git a/apis/loaders_it_test.go b/apis/loaders_it_test.go index 589a6e5d7..524f02c66 100644 --- a/apis/loaders_it_test.go +++ b/apis/loaders_it_test.go @@ -319,13 +319,13 @@ cgrates.org,RoutePrf2,*string:~*req.Account:1002,;20,,*lc,,route1,fltr3,Account3 // Create and populate Stats.csv if err := writeFile(utils.StatsCsv, ` -#Tenant[0],ID[1],FilterIDs[2],Weights[3],QueueLength[4],TTL[5],MinItems[6],MetricIDs[7],MetricFilterIDs[8],Stored[9],Blockers[10],ThresholdIDs[11] -cgrates.org,TestStats,*string:~*req.Account:1001,;20,100,1s,2,*sum#~*req.Value;*average#~*req.Value,fltr1;fltr2,true,;true,Th1;Th2 -cgrates.org,TestStats,,,,,2,*sum#~*req.Usage,,,, -cgrates.org,TestStats2,*string:~*req.Account:1002,,100,1s,2,*sum#~*req.Value;*sum#~*req.Usage;*average#~*req.Value;*average#~*req.Usage,,true,;true,Th -cgrates.org,TestStats2,,;20,,,2,*sum#~*req.Cost;*average#~*req.Cost,,true,, -cgrates.org,TestStats3,,,,,,,,,, -cgrates.org,TestStats3,*string:~*req.Account:1003,;20,100,1s,2,*sum#~*req.Value;*average#~*req.Value,,true,;true,Th1;Th2 +#Tenant[0],ID[1],FilterIDs[2],Weights[3],QueueLength[4],TTL[5],MinItems[6],MetricIDs[7],MetricFilterIDs[8],MetricBlockers[9],Stored[10],Blockers[11],ThresholdIDs[12] +cgrates.org,TestStats,*string:~*req.Account:1001,;20,100,1s,2,*sum#~*req.Value;*average#~*req.Value,fltr1;fltr2,,true,;true,Th1;Th2 +cgrates.org,TestStats,,,,,2,*sum#~*req.Usage,,*string:~*req.Account:1003&fltr2;true;;false,,, +cgrates.org,TestStats2,*string:~*req.Account:1002,,100,1s,2,*sum#~*req.Value;*sum#~*req.Usage;*average#~*req.Value;*average#~*req.Usage,,,true,;true,Th +cgrates.org,TestStats2,,;20,,,2,*sum#~*req.Cost;*average#~*req.Cost,,;false,true,, +cgrates.org,TestStats3,,,,,,,,,,, +cgrates.org,TestStats3,*string:~*req.Account:1003,;20,100,1s,2,*sum#~*req.Value;*average#~*req.Value,,fltr_for_stats;false,true,;true,Th1;Th2 `); err != nil { t.Fatal(err) } @@ -1153,6 +1153,15 @@ func testLoadersGetStatQueueProfiles(t *testing.T) { }, { MetricID: "*sum#~*req.Usage", + Blockers: utils.Blockers{ + { + FilterIDs: []string{"*string:~*req.Account:1003", "fltr2"}, + Blocker: true, + }, + { + Blocker: false, + }, + }, }, }, ThresholdIDs: []string{"Th1", "Th2"}, @@ -1189,6 +1198,7 @@ func testLoadersGetStatQueueProfiles(t *testing.T) { }, { MetricID: "*average#~*req.Cost", + Blockers: utils.Blockers{{Blocker: false}}, }, }, ThresholdIDs: []string{"Th"}, @@ -1213,18 +1223,22 @@ func testLoadersGetStatQueueProfiles(t *testing.T) { }, { MetricID: "*average#~*req.Value", + Blockers: utils.Blockers{ + { + FilterIDs: []string{"fltr_for_stats"}, + Blocker: false, + }, + }, }, }, ThresholdIDs: []string{"Th1", "Th2"}, Blockers: utils.Blockers{{Blocker: true}}, Stored: true, - Weights: utils.DynamicWeights{ { Weight: 20, }, }, - MinItems: 2, }, } diff --git a/config/config_defaults.go b/config/config_defaults.go index 419c15193..974126669 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -1237,14 +1237,15 @@ const CGRATES_CFG_JSON = ` {"tag": "ID", "path": "ID", "type": "*variable", "value": "~*req.1", "mandatory": true}, {"tag": "FilterIDs", "path": "FilterIDs", "type": "*variable", "value": "~*req.2"}, {"tag": "Weights", "path": "Weights", "type": "*variable", "value": "~*req.3"}, - {"tag": "QueueLength", "path": "QueueLength", "type": "*variable", "value": "~*req.4"}, - {"tag": "TTL", "path": "TTL", "type": "*variable", "value": "~*req.5"}, - {"tag": "MinItems", "path": "MinItems", "type": "*variable", "value": "~*req.6"}, - {"tag": "MetricIDs", "path": "Metrics.MetricID", "type": "*variable", "value": "~*req.7","new_branch":true}, - {"tag": "MetricFilterIDs", "path": "Metrics.FilterIDs", "type": "*variable", "value": "~*req.8"}, - {"tag": "Stored", "path": "Stored", "type": "*variable", "value": "~*req.9"}, - {"tag": "Blockers", "path": "Blockers", "type": "*variable", "value": "~*req.10"}, - {"tag": "ThresholdIDs", "path": "ThresholdIDs", "type": "*variable", "value": "~*req.11"}, + {"tag": "Blockers", "path": "Blockers", "type": "*variable", "value": "~*req.4"}, + {"tag": "QueueLength", "path": "QueueLength", "type": "*variable", "value": "~*req.5"}, + {"tag": "TTL", "path": "TTL", "type": "*variable", "value": "~*req.6"}, + {"tag": "MinItems", "path": "MinItems", "type": "*variable", "value": "~*req.7"}, + {"tag": "Stored", "path": "Stored", "type": "*variable", "value": "~*req.8"}, + {"tag": "ThresholdIDs", "path": "ThresholdIDs", "type": "*variable", "value": "~*req.9"}, + {"tag": "MetricIDs", "path": "Metrics.MetricID", "type": "*variable", "value": "~*req.10","new_branch":true}, + {"tag": "MetricFilterIDs", "path": "Metrics.FilterIDs", "type": "*variable", "value": "~*req.11"}, + {"tag": "MetricBlockers", "path": "Metrics.Blockers", "type": "*variable", "value": "~*req.12"}, ], }, { diff --git a/data/storage/docker_mysql/scripts/create_tariffplan_tables.sql b/data/storage/docker_mysql/scripts/create_tariffplan_tables.sql index 5cf18155d..91d106301 100644 --- a/data/storage/docker_mysql/scripts/create_tariffplan_tables.sql +++ b/data/storage/docker_mysql/scripts/create_tariffplan_tables.sql @@ -35,15 +35,15 @@ CREATE TABLE tp_stats ( `tenant` varchar(64) NOT NULL, `id` varchar(64) NOT NULL, `filter_ids` varchar(64) NOT NULL, - `activation_interval` varchar(64) NOT NULL, + `weights` varchar(64) NOT NULL, `queue_length` int(11) NOT NULL, `ttl` varchar(32) NOT NULL, `min_items` int(11) NOT NULL, `metric_ids` varchar(128) NOT NULL, `metric_filter_ids` varchar(64) NOT NULL, + `metric_blockers` varchar(128) NOT NULL, `stored` BOOLEAN NOT NULL, - `weights` varchar(64) NOT NULL, - `blockers` VARCHAR(128) NOT NULL, + `blockers` varchar(128) NOT NULL, `threshold_ids` varchar(64) NOT NULL, `created_at` TIMESTAMP, PRIMARY KEY (`pk`), diff --git a/data/storage/mysql/create_tariffplan_tables.sql b/data/storage/mysql/create_tariffplan_tables.sql index 1618443fb..784f4d086 100644 --- a/data/storage/mysql/create_tariffplan_tables.sql +++ b/data/storage/mysql/create_tariffplan_tables.sql @@ -40,9 +40,10 @@ CREATE TABLE tp_stats ( `min_items` int(11) NOT NULL, `metric_ids` varchar(128) NOT NULL, `metric_filter_ids` varchar(64) NOT NULL, + `metric_blockers` varchar(128) NOT NULL, `stored` BOOLEAN NOT NULL, `weights` varchar(128) NOT NULL, - `blockers` VARCHAR(128) NOT NULL, + `blockers` varchar(128) NOT NULL, `threshold_ids` varchar(64) NOT NULL, `created_at` TIMESTAMP, PRIMARY KEY (`pk`), diff --git a/data/storage/postgres/create_tariffplan_tables.sql b/data/storage/postgres/create_tariffplan_tables.sql index 14fcd240a..12106e7f0 100644 --- a/data/storage/postgres/create_tariffplan_tables.sql +++ b/data/storage/postgres/create_tariffplan_tables.sql @@ -35,12 +35,12 @@ CREATE TABLE tp_stats ( "tenant"varchar(64) NOT NULL, "id" varchar(64) NOT NULL, "filter_ids" varchar(64) NOT NULL, - "activation_interval" varchar(64) NOT NULL, "queue_length" INTEGER NOT NULL, "ttl" varchar(32) NOT NULL, "min_items" INTEGER NOT NULL, "metric_ids" VARCHAR(128) NOT NULL, "metric_filter_ids" VARCHAR(128) NOT NULL, + "metric_blockers" VARCHAR(128) NOT NULL, "stored" BOOLEAN NOT NULL, "weights" VARCHAR(128) NOT NULL, "blockers" VARCHAR(128) NOT NULL, diff --git a/data/tariffplans/testit/Stats.csv b/data/tariffplans/testit/Stats.csv index b8a9f6e94..7f225531d 100644 --- a/data/tariffplans/testit/Stats.csv +++ b/data/tariffplans/testit/Stats.csv @@ -6,3 +6,5 @@ cgrates.org,Stat_3,FLTR_STAT_3,;30,100,1s,0,*acd;*tcd;*asr,,false,;true,*none cgrates.org,Stat_Supplier1,*string:~*req.StatID:Stat_Supplier1,;30,100,1s,0,*sum#~*req.LoadReq,,true,;true,*none cgrates.org,Stat_Supplier2,*string:~*req.StatID:Stat_Supplier2,;30,100,1s,0,*sum#~*req.LoadReq,,true,;true,*none cgrates.org,Stat_Supplier3,*string:~*req.StatID:Stat_Supplier3,;30,100,1s,0,*sum#~*req.LoadReq,,true,;true,*none + + diff --git a/engine/libstats.go b/engine/libstats.go index 7cba76cf0..01d2e3744 100644 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -38,12 +38,12 @@ type StatQueueProfile struct { Tenant string ID string // QueueID FilterIDs []string + Weights utils.DynamicWeights + Blockers utils.Blockers // blocker flag to stop processing on filters matched QueueLength int TTL time.Duration MinItems int Stored bool - Weights utils.DynamicWeights - Blockers utils.Blockers // blocker flag to stop processing on filters matched ThresholdIDs []string // list of thresholds to be checked after changes Metrics []*MetricWithFilters // list of metrics to build @@ -91,8 +91,9 @@ func (sqp *StatQueueProfile) isLocked() bool { } type MetricWithFilters struct { - FilterIDs []string MetricID string + FilterIDs []string + Blockers utils.Blockers // blocker flag to stop processing for next metric on filters matched } // NewStoredStatQueue initiates a StoredStatQueue out of StatQueue @@ -586,9 +587,11 @@ func (sqp *StatQueueProfile) Set(path []string, val interface{}, newBranch bool, sqp.ThresholdIDs = append(sqp.ThresholdIDs, valA...) } case 2: + // path =[]string{Metrics, MetricID} if path[0] != utils.Metrics { return utils.ErrWrongPath } + // val := *acd;*tcd;*asr if val != utils.EmptyString { if len(sqp.Metrics) == 0 || newBranch { sqp.Metrics = append(sqp.Metrics, new(MetricWithFilters)) @@ -604,7 +607,12 @@ func (sqp *StatQueueProfile) Set(path []string, val interface{}, newBranch bool, for _, mID := range valA[1:] { // add the rest of the metrics sqp.Metrics = append(sqp.Metrics, &MetricWithFilters{MetricID: mID}) } - + case utils.BlockersField: + var blkrs utils.Blockers + if blkrs, err = utils.NewBlockersFromString(utils.IfaceAsString(val), utils.InfieldSep, utils.ANDSep); err != nil { + return + } + sqp.Metrics[len(sqp.Metrics)-1].Blockers = append(sqp.Metrics[len(sqp.Metrics)-1].Blockers, blkrs...) default: return utils.ErrWrongPath } @@ -734,5 +742,7 @@ func (mf *MetricWithFilters) FieldAsInterface(fldPath []string) (_ interface{}, return mf.MetricID, nil case utils.FilterIDs: return mf.FilterIDs, nil + case utils.BlockersField: + return mf.Blockers, nil } } diff --git a/engine/model_helpers.go b/engine/model_helpers.go index 7122da567..d46569f71 100644 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -324,8 +324,7 @@ type StatMdls []*StatMdl // CSVHeader return the header for csv fields as a slice of string func (tps StatMdls) CSVHeader() (result []string) { return []string{"#" + utils.Tenant, utils.ID, utils.FilterIDs, utils.Weight, - utils.QueueLength, utils.TTL, utils.MinItems, utils.MetricIDs, utils.MetricFilterIDs, - utils.Stored, utils.BlockersField, utils.ThresholdIDs} + utils.QueueLength, utils.TTL, utils.MinItems, utils.MetricIDs, utils.MetricFilterIDs, utils.MetricBlockers, utils.Stored, utils.BlockersField, utils.ThresholdIDs} } func (tps StatMdls) AsTPStats() (result []*utils.TPStatProfile) { @@ -394,6 +393,9 @@ func (tps StatMdls) AsTPStats() (result []*utils.TPStatProfile) { filterIDs := strings.Split(model.MetricFilterIDs, utils.InfieldSep) stsMetric.FilterIDs = append(stsMetric.FilterIDs, filterIDs...) } + if model.MetricBlockers != utils.EmptyString { + stsMetric.Blockers = model.MetricBlockers + } statMetricsMap[key.TenantID()][metricID] = stsMetric } } @@ -447,6 +449,7 @@ func APItoModelStats(st *utils.TPStatProfile) (mdls StatMdls) { } mdl.MetricFilterIDs += val } + mdl.MetricBlockers = metric.Blockers mdl.MetricIDs = metric.MetricID mdls = append(mdls, mdl) } @@ -456,14 +459,13 @@ func APItoModelStats(st *utils.TPStatProfile) (mdls StatMdls) { func APItoStats(tpST *utils.TPStatProfile, timezone string) (st *StatQueueProfile, err error) { st = &StatQueueProfile{ - Tenant: tpST.Tenant, - ID: tpST.ID, - FilterIDs: make([]string, len(tpST.FilterIDs)), - QueueLength: tpST.QueueLength, - MinItems: tpST.MinItems, - Metrics: make([]*MetricWithFilters, len(tpST.Metrics)), - Stored: tpST.Stored, - + Tenant: tpST.Tenant, + ID: tpST.ID, + FilterIDs: make([]string, len(tpST.FilterIDs)), + QueueLength: tpST.QueueLength, + MinItems: tpST.MinItems, + Metrics: make([]*MetricWithFilters, len(tpST.Metrics)), + Stored: tpST.Stored, ThresholdIDs: make([]string, len(tpST.ThresholdIDs)), } if tpST.Weights != utils.EmptyString { @@ -486,6 +488,11 @@ func APItoStats(tpST *utils.TPStatProfile, timezone string) (st *StatQueueProfil MetricID: metric.MetricID, FilterIDs: metric.FilterIDs, } + if metric.Blockers != utils.EmptyString { + if st.Metrics[i].Blockers, err = utils.NewBlockersFromString(metric.Blockers, utils.InfieldSep, utils.ANDSep); err != nil { + return + } + } } for i, trh := range tpST.ThresholdIDs { st.ThresholdIDs[i] = trh @@ -512,6 +519,7 @@ func StatQueueProfileToAPI(st *StatQueueProfile) (tpST *utils.TPStatProfile) { for i, metric := range st.Metrics { tpST.Metrics[i] = &utils.MetricWithFilters{ MetricID: metric.MetricID, + Blockers: metric.Blockers.String(utils.InfieldSep, utils.ANDSep), } if len(metric.FilterIDs) != 0 { tpST.Metrics[i].FilterIDs = make([]string, len(metric.FilterIDs)) @@ -519,7 +527,6 @@ func StatQueueProfileToAPI(st *StatQueueProfile) (tpST *utils.TPStatProfile) { tpST.Metrics[i].FilterIDs[j] = fltr } } - } if st.TTL != time.Duration(0) { tpST.TTL = st.TTL.String() diff --git a/engine/models.go b/engine/models.go index 5bbc59188..a287f67f1 100644 --- a/engine/models.go +++ b/engine/models.go @@ -59,9 +59,10 @@ type StatMdl struct { MinItems int `index:"6" re:""` MetricIDs string `index:"7" re:""` MetricFilterIDs string `index:"8" re:""` - Stored bool `index:"9" re:""` - Blockers string `index:"10" re:""` - ThresholdIDs string `index:"11" re:""` + MetricBlockers string `index:"9" re:""` + Stored bool `index:"10" re:""` + Blockers string `index:"11" re:""` + ThresholdIDs string `index:"12" re:""` CreatedAt time.Time } diff --git a/engine/stats.go b/engine/stats.go index 8b75663ca..d005385c1 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -220,6 +220,18 @@ func (sS *StatS) matchingStatQueuesForEvent(ctx *context.Context, tnt string, st if sqPrfl.TTL > 0 { sq.ttl = utils.DurationPointer(sqPrfl.TTL) } + // every metrics has a blocker, verify them + for idx, metric := range sqPrfl.Metrics { + var blocker bool + if blocker, err = BlockerFromDynamics(ctx, metric.Blockers, sS.fltrS, tnt, evNm); err != nil { + return + } + // if we have blocker, ignore the rest of the metrics + if blocker { + sqPrfl.Metrics = sqPrfl.Metrics[:idx+1] + break + } + } sq.sqPrfl = sqPrfl if sq.weight, err = WeightFromDynamics(ctx, sqPrfl.Weights, sS.fltrS, tnt, evNm); err != nil { diff --git a/tpes/tpe_stats.go b/tpes/tpe_stats.go index 0f48082a5..3368089ac 100644 --- a/tpes/tpe_stats.go +++ b/tpes/tpe_stats.go @@ -45,7 +45,7 @@ func (tpSts TPStats) exportItems(ctx *context.Context, wrtr io.Writer, tnt strin csvWriter := csv.NewWriter(wrtr) csvWriter.Comma = utils.CSVSep // before writing the profiles, we must write the headers - if err = csvWriter.Write([]string{"#Tenant", "ID", "FilterIDs", "Weights", "QueueLength", "TTL", "MinItems", "Metrics", "MetricFilterIDs", "Stored", "Blockers", "ThresholdIDs"}); err != nil { + if err = csvWriter.Write([]string{"#Tenant", "ID", "FilterIDs", "Weights", "QueueLength", "TTL", "MinItems", "Metrics", "MetricFilterIDs", "MetricBlockers", "Stored", "Blockers", "ThresholdIDs"}); err != nil { return } for _, statsID := range itmIDs { diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 24d5edeec..861a8ab9d 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -231,6 +231,7 @@ type AttrDisconnectSession struct { type MetricWithFilters struct { FilterIDs []string MetricID string + Blockers string } // TPStatProfile is used in APIs to manage remotely offline StatProfile diff --git a/utils/consts.go b/utils/consts.go index b99a3b4a8..993c1f623 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -555,6 +555,7 @@ const ( MinItems = "MinItems" MetricIDs = "MetricIDs" MetricFilterIDs = "MetricFilterIDs" + MetricBlockers = "MetricBlockers" FieldName = "FieldName" Path = "Path" Hosts = "Hosts"