diff --git a/actions/dynamic.go b/actions/dynamic.go index 21c17a0df..3e45de6f0 100644 --- a/actions/dynamic.go +++ b/actions/dynamic.go @@ -764,3 +764,158 @@ func (aL *actDynamicResource) execute(ctx *context.Context, data utils.MapStorag } return } + +// actDynamicTrend processes the `ActionDiktatsOpts` field from the action to construct a TrendProfile +// +// The ActionDiktatsOpts field format is expected as follows: +// +// 0 Tenant: string +// 1 ID: string +// 2 Schedule: string +// 3 StatID: string +// 4 Metrics: strings separated by "&". +// 5 TTL: duration +// 6 QueueLength: integer +// 7 MinItems: integer +// 8 CorrelationType: string +// 9 Tolerance: float +// 10 Stored: bool +// 11 ThresholdIDs: strings separated by "&". +// 12 APIOpts: set of key-value pairs (separated by "&"). +// +// Parameters are separated by ";" and must be provided in the specified order. +type actDynamicTrend struct { + config *config.CGRConfig + connMgr *engine.ConnManager + fltrS *engine.FilterS + aCfg *utils.APAction + tnt string + cgrEv *utils.CGREvent +} + +func (aL *actDynamicTrend) id() string { + return aL.aCfg.ID +} + +func (aL *actDynamicTrend) cfg() *utils.APAction { + return aL.aCfg +} + +// execute implements actioner interface +func (aL *actDynamicTrend) execute(ctx *context.Context, data utils.MapStorage, trgID string) (err error) { + if len(aL.config.ActionSCfg().AdminSConns) == 0 { + return fmt.Errorf("no connection with AdminS") + } + data[utils.MetaNow] = time.Now() + data[utils.MetaTenant] = utils.FirstNonEmpty(aL.cgrEv.Tenant, aL.tnt, + config.CgrConfig().GeneralCfg().DefaultTenant) + // Parse action parameters based on the predefined format. + if len(aL.aCfg.Diktats) == 0 { + return fmt.Errorf("No diktats were speified for action <%v>", aL.aCfg.ID) + } + weights := make(map[string]float64) // stores sorting weights by Diktat ID + diktats := make([]*utils.APDiktat, 0) // list of diktats which have *template in opts, will be weight sorted later + for _, diktat := range aL.aCfg.Diktats { + if pass, err := aL.fltrS.Pass(ctx, aL.tnt, diktat.FilterIDs, data); err != nil { + return err + } else if !pass { + continue + } + weight, err := engine.WeightFromDynamics(ctx, diktat.Weights, aL.fltrS, aL.tnt, data) + if err != nil { + return err + } + weights[diktat.ID] = weight + diktats = append(diktats, diktat) + } + // Sort by weight (higher values first). + slices.SortFunc(diktats, func(a, b *utils.APDiktat) int { + return cmp.Compare(weights[b.ID], weights[a.ID]) + }) + for _, diktat := range diktats { + params := strings.Split(utils.IfaceAsString(diktat.Opts[utils.MetaTemplate]), + utils.InfieldSep) + if len(params) != 13 { + return fmt.Errorf("invalid number of parameters <%d> expected 13", len(params)) + } + // parse dynamic parameters + for i := range params { + if params[i], err = utils.ParseParamForDataProvider(params[i], data, false); err != nil { + return err + } + } + // Prepare request arguments based on provided parameters. + args := &utils.TrendProfileWithAPIOpts{ + TrendProfile: &utils.TrendProfile{ + Tenant: params[0], + ID: params[1], + Schedule: params[2], + StatID: params[3], + CorrelationType: params[8], + }, + APIOpts: make(map[string]any), + } + // populate Trend's Metrics + if params[4] != utils.EmptyString { + args.Metrics = strings.Split(params[4], utils.ANDSep) + } + // populate Trend's TTL + if params[5] != utils.EmptyString { + args.TTL, err = utils.ParseDurationWithNanosecs(params[5]) + if err != nil { + return err + } + } + // populate Trend's QueueLengh + if params[6] != utils.EmptyString { + args.QueueLength, err = strconv.Atoi(params[6]) + if err != nil { + return err + } + } + // populate Trend's MinItems + if params[7] != utils.EmptyString { + args.MinItems, err = strconv.Atoi(params[7]) + if err != nil { + return err + } + } + // populate Trend's Tolerance + if params[9] != utils.EmptyString { + args.Tolerance, err = strconv.ParseFloat(params[9], 64) + if err != nil { + return err + } + } + // populate Trend's Stored + if params[10] != utils.EmptyString { + args.Stored, err = strconv.ParseBool(params[10]) + if err != nil { + return err + } + } + // populate Trend's ThresholdIDs + if params[11] != utils.EmptyString { + args.ThresholdIDs = strings.Split(params[11], utils.ANDSep) + } + // populate Trend's APIOpts + if params[12] != utils.EmptyString { + if err := parseParamStringToMap(params[12], args.APIOpts); err != nil { + return err + } + } + + // create the TrendProfile based on the populated parameters + var rply string + if err = aL.connMgr.Call(ctx, aL.config.ActionSCfg().AdminSConns, + utils.AdminSv1SetTrendProfile, args, &rply); err != nil { + return err + } + if blocker, err := engine.BlockerFromDynamics(ctx, diktat.Blockers, aL.fltrS, aL.tnt, data); err != nil { + return err + } else if blocker { + break + } + } + return +} diff --git a/actions/libactions.go b/actions/libactions.go index b5e73effb..598b6193b 100644 --- a/actions/libactions.go +++ b/actions/libactions.go @@ -41,6 +41,8 @@ func actionTarget(act string) string { return utils.MetaAttributes case utils.MetaDynamicResource: return utils.MetaResources + case utils.MetaDynamicTrend: + return utils.MetaTrends default: return utils.MetaNone } @@ -145,6 +147,8 @@ func newActioner(ctx *context.Context, cgrEv *utils.CGREvent, cfg *config.CGRCon return &actDynamicAttribute{cfg, connMgr, fltrS, aCfg, tnt, cgrEv}, nil case utils.MetaDynamicResource: return &actDynamicResource{cfg, connMgr, fltrS, aCfg, tnt, cgrEv}, nil + case utils.MetaDynamicTrend: + return &actDynamicTrend{cfg, connMgr, fltrS, aCfg, tnt, cgrEv}, nil default: return nil, fmt.Errorf("unsupported action type: <%s>", aCfg.Type) diff --git a/actions/libactions_test.go b/actions/libactions_test.go index 3c2a0fa83..74fd8e1a4 100644 --- a/actions/libactions_test.go +++ b/actions/libactions_test.go @@ -43,7 +43,7 @@ func TestACExecuteCDRLog(t *testing.T) { expectedErr := "unsupported action type: " if _, err := newActionersFromActions(context.Background(), new(utils.CGREvent), cfg, fltr, dm, nil, - actCfg, "cgrates.org"); err == nil || err.Error() != expectedErr { + actCfg, utils.CGRateSorg); err == nil || err.Error() != expectedErr { t.Errorf("Expected %+v, received %+v", expectedErr, err) } @@ -56,6 +56,10 @@ func TestACExecuteCDRLog(t *testing.T) { {Type: utils.MetaAddBalance}, {Type: utils.MetaSetBalance}, {Type: utils.MetaRemBalance}, + {Type: utils.MetaDynamicThreshold}, + {Type: utils.MetaDynamicStats}, + {Type: utils.MetaDynamicAttribute}, + {Type: utils.MetaDynamicResource}, } actHttp, err := newActHTTPPost(context.Background(), cfg.GeneralCfg().DefaultTenant, new(utils.CGREvent), new(engine.FilterS), @@ -67,15 +71,19 @@ func TestACExecuteCDRLog(t *testing.T) { expectedActs := []actioner{ &actCDRLog{cfg, fltr, nil, &utils.APAction{Type: utils.CDRLog}}, actHttp, - &actExport{"cgrates.org", cfg, nil, &utils.APAction{Type: utils.MetaExport}}, - &actResetStat{"cgrates.org", cfg, nil, &utils.APAction{Type: utils.MetaResetStatQueue}}, - &actResetThreshold{"cgrates.org", cfg, nil, &utils.APAction{Type: utils.MetaResetThreshold}}, - &actSetBalance{cfg, nil, fltr, &utils.APAction{Type: utils.MetaAddBalance}, "cgrates.org", false}, - &actSetBalance{cfg, nil, fltr, &utils.APAction{Type: utils.MetaSetBalance}, "cgrates.org", true}, - &actRemBalance{cfg, nil, fltr, &utils.APAction{Type: utils.MetaRemBalance}, "cgrates.org"}, + &actExport{utils.CGRateSorg, cfg, nil, &utils.APAction{Type: utils.MetaExport}}, + &actResetStat{utils.CGRateSorg, cfg, nil, &utils.APAction{Type: utils.MetaResetStatQueue}}, + &actResetThreshold{utils.CGRateSorg, cfg, nil, &utils.APAction{Type: utils.MetaResetThreshold}}, + &actSetBalance{cfg, nil, fltr, &utils.APAction{Type: utils.MetaAddBalance}, utils.CGRateSorg, false}, + &actSetBalance{cfg, nil, fltr, &utils.APAction{Type: utils.MetaSetBalance}, utils.CGRateSorg, true}, + &actRemBalance{cfg, nil, fltr, &utils.APAction{Type: utils.MetaRemBalance}, utils.CGRateSorg}, + &actDynamicThreshold{cfg, nil, fltr, &utils.APAction{Type: utils.MetaDynamicThreshold}, utils.CGRateSorg, new(utils.CGREvent)}, + &actDynamicStats{cfg, nil, fltr, &utils.APAction{Type: utils.MetaDynamicStats}, utils.CGRateSorg, new(utils.CGREvent)}, + &actDynamicAttribute{cfg, nil, fltr, &utils.APAction{Type: utils.MetaDynamicAttribute}, utils.CGRateSorg, new(utils.CGREvent)}, + &actDynamicResource{cfg, nil, fltr, &utils.APAction{Type: utils.MetaDynamicResource}, utils.CGRateSorg, new(utils.CGREvent)}, } - acts, err := newActionersFromActions(context.Background(), new(utils.CGREvent), cfg, fltr, dm, nil, actCfg, "cgrates.org") + acts, err := newActionersFromActions(context.Background(), new(utils.CGREvent), cfg, fltr, dm, nil, actCfg, utils.CGRateSorg) if err != nil { t.Error(err) } else if !reflect.DeepEqual(acts, expectedActs) { @@ -98,7 +106,7 @@ func TestACExecuteScheduledAction(t *testing.T) { utils.LogLevelCfg: 7, } - schedActs := newScheduledActs(nil, "cgrates.org", "FirstAction", + schedActs := newScheduledActs(nil, utils.CGRateSorg, "FirstAction", utils.EmptyString, utils.MetaTopUp, utils.MetaNow, dataStorage, acts) diff --git a/general_tests/dynamic_thresholds_it_test.go b/general_tests/dynamic_thresholds_it_test.go index 627293340..3808c5cec 100644 --- a/general_tests/dynamic_thresholds_it_test.go +++ b/general_tests/dynamic_thresholds_it_test.go @@ -258,6 +258,7 @@ func TestDynThdIT(t *testing.T) { utils.MetaStats: {"someID": {}}, utils.MetaAttributes: {"someID": {}}, utils.MetaResources: {"someID": {}}, + utils.MetaTrends: {"someID": {}}, }, Actions: []*utils.APAction{ { @@ -488,6 +489,63 @@ func TestDynThdIT(t *testing.T) { }, }, }, + { + ID: "Dynamic_Trend_ID", + Type: utils.MetaDynamicTrend, + Diktats: []*utils.APDiktat{ + { + ID: "CreateDynamicTrend1002", + FilterIDs: []string{"*string:~*req.Account:1002"}, + Opts: map[string]any{ + "*template": "*tenant;DYNAMICLY_TRND_<~*req.Account>;@every 1s;Stats1_1;*acc&*tcc;-1;-1;1;*last;1;true;THID1&THID2;~*opts", + }, + Weights: utils.DynamicWeights{ + { + Weight: 50, + }, + }, + }, + { + ID: "CreateDynamicTrend1002NotFoundFilter", + FilterIDs: []string{"*string:~*req.Account:1003"}, + Opts: map[string]any{ + "*template": "*tenant;DYNAMICLY_TRND_2_<~*req.Account>;@every 1s;Stats1_1;*acc&*tcc;-1;-1;1;*last;1;true;THID1&THID2;~*opts", + }, + Weights: utils.DynamicWeights{ + { + Weight: 90, + }, + }, + }, + { + ID: "CreateDynamicTrend1002Blocker", + Opts: map[string]any{ + "*template": "*tenant;DYNAMICLY_TRND_3_<~*req.Account>;@every 1s;Stats1_1;*acc&*tcc;-1;-1;1;*last;1;true;THID1&THID2;~*opts", + }, + Weights: utils.DynamicWeights{ + { + Weight: 20, + }, + }, + Blockers: utils.DynamicBlockers{ + { + Blocker: true, + }, + }, + }, + { + ID: "CreateDynamicTrend1002Blocked", + Opts: map[string]any{ + "*template": "*tenant;DYNAMICLY_TRND_4_<~*req.Account>;@every 1s;Stats1_1;*acc&*tcc;-1;-1;1;*last;1;true;THID1&THID2;~*opts", + }, + Weights: utils.DynamicWeights{ + { + Weight: 10, + }, + }, + }, + }, + }, }, }, } @@ -844,7 +902,7 @@ func TestDynThdIT(t *testing.T) { } if !reflect.DeepEqual(exp, attrs) { - t.Errorf("Expected <%v>\nReceived <%v>", utils.ToJSON(attrs), utils.ToJSON(exp)) + t.Errorf("Expected <%v>\nReceived <%v>", utils.ToJSON(exp), utils.ToJSON(attrs)) } }) @@ -900,7 +958,58 @@ func TestDynThdIT(t *testing.T) { } if !reflect.DeepEqual(exp, rsc) { - t.Errorf("Expected <%v>\nReceived <%v>", utils.ToJSON(rsc), utils.ToJSON(exp)) + t.Errorf("Expected <%v>\nReceived <%v>", utils.ToJSON(exp), utils.ToJSON(rsc)) + } + }) + + t.Run("GetDynamicTrendProfile", func(t *testing.T) { + var rcv []*utils.TrendProfile + if err := client.Call(context.Background(), utils.AdminSv1GetTrendProfiles, + &utils.ArgsItemIDs{ + Tenant: utils.CGRateSorg, + }, &rcv); err != nil { + t.Errorf("AdminSv1GetTrendProfiles failed unexpectedly: %v", err) + } + if len(rcv) != 2 { + t.Fatalf("AdminSv1GetTrendProfiles len(rcv)=%v, want 2", len(rcv)) + } + sort.Slice(rcv, func(i, j int) bool { + return rcv[i].ID > rcv[j].ID + }) + exp := []*utils.TrendProfile{ + { + Tenant: "cgrates.org", + ID: "DYNAMICLY_TRND_3_1002", + Schedule: "@every 1s", + StatID: "Stats1_1", + Metrics: []string{"*acc", "*tcc"}, + TTL: -1, + QueueLength: -1, + MinItems: 1, + CorrelationType: "*last", + Tolerance: 1, + Stored: true, + ThresholdIDs: []string{"THID1", "THID2"}, + }, + { + + Tenant: "cgrates.org", + ID: "DYNAMICLY_TRND_1002", + Schedule: "@every 1s", + StatID: "Stats1_1", + Metrics: []string{"*acc", "*tcc"}, + TTL: -1, + QueueLength: -1, + MinItems: 1, + CorrelationType: "*last", + Tolerance: 1, + Stored: true, + ThresholdIDs: []string{"THID1", "THID2"}, + }, + } + + if !reflect.DeepEqual(exp, rcv) { + t.Errorf("Expected <%v>\nReceived <%v>", utils.ToJSON(exp), utils.ToJSON(rcv)) } }) diff --git a/utils/consts.go b/utils/consts.go index 0caa74f79..029c83d59 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1150,6 +1150,7 @@ const ( MetaDynamicStats = "*dynamic_stats" MetaDynamicAttribute = "*dynamic_attribute" MetaDynamicResource = "*dynamic_resource" + MetaDynamicTrend = "*dynamicTrend" // Diktats Opts Fields MetaBalancePath = "*balancePath"