mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Add action type *dynamicTrend
This commit is contained in:
committed by
Dan Christian Bogos
parent
f85ceebf85
commit
6302bb0fa1
@@ -764,3 +764,158 @@ func (aL *actDynamicResource) execute(ctx *context.Context, data utils.MapStorag
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// actDynamicTrend processes the `ActionDiktatsOpts` field from the action to construct a TrendProfile
|
||||
//
|
||||
// The ActionDiktatsOpts field format is expected as follows:
|
||||
//
|
||||
// 0 Tenant: string
|
||||
// 1 ID: string
|
||||
// 2 Schedule: string
|
||||
// 3 StatID: string
|
||||
// 4 Metrics: strings separated by "&".
|
||||
// 5 TTL: duration
|
||||
// 6 QueueLength: integer
|
||||
// 7 MinItems: integer
|
||||
// 8 CorrelationType: string
|
||||
// 9 Tolerance: float
|
||||
// 10 Stored: bool
|
||||
// 11 ThresholdIDs: strings separated by "&".
|
||||
// 12 APIOpts: set of key-value pairs (separated by "&").
|
||||
//
|
||||
// Parameters are separated by ";" and must be provided in the specified order.
|
||||
type actDynamicTrend struct {
|
||||
config *config.CGRConfig
|
||||
connMgr *engine.ConnManager
|
||||
fltrS *engine.FilterS
|
||||
aCfg *utils.APAction
|
||||
tnt string
|
||||
cgrEv *utils.CGREvent
|
||||
}
|
||||
|
||||
func (aL *actDynamicTrend) id() string {
|
||||
return aL.aCfg.ID
|
||||
}
|
||||
|
||||
func (aL *actDynamicTrend) cfg() *utils.APAction {
|
||||
return aL.aCfg
|
||||
}
|
||||
|
||||
// execute implements actioner interface
|
||||
func (aL *actDynamicTrend) execute(ctx *context.Context, data utils.MapStorage, trgID string) (err error) {
|
||||
if len(aL.config.ActionSCfg().AdminSConns) == 0 {
|
||||
return fmt.Errorf("no connection with AdminS")
|
||||
}
|
||||
data[utils.MetaNow] = time.Now()
|
||||
data[utils.MetaTenant] = utils.FirstNonEmpty(aL.cgrEv.Tenant, aL.tnt,
|
||||
config.CgrConfig().GeneralCfg().DefaultTenant)
|
||||
// Parse action parameters based on the predefined format.
|
||||
if len(aL.aCfg.Diktats) == 0 {
|
||||
return fmt.Errorf("No diktats were speified for action <%v>", aL.aCfg.ID)
|
||||
}
|
||||
weights := make(map[string]float64) // stores sorting weights by Diktat ID
|
||||
diktats := make([]*utils.APDiktat, 0) // list of diktats which have *template in opts, will be weight sorted later
|
||||
for _, diktat := range aL.aCfg.Diktats {
|
||||
if pass, err := aL.fltrS.Pass(ctx, aL.tnt, diktat.FilterIDs, data); err != nil {
|
||||
return err
|
||||
} else if !pass {
|
||||
continue
|
||||
}
|
||||
weight, err := engine.WeightFromDynamics(ctx, diktat.Weights, aL.fltrS, aL.tnt, data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
weights[diktat.ID] = weight
|
||||
diktats = append(diktats, diktat)
|
||||
}
|
||||
// Sort by weight (higher values first).
|
||||
slices.SortFunc(diktats, func(a, b *utils.APDiktat) int {
|
||||
return cmp.Compare(weights[b.ID], weights[a.ID])
|
||||
})
|
||||
for _, diktat := range diktats {
|
||||
params := strings.Split(utils.IfaceAsString(diktat.Opts[utils.MetaTemplate]),
|
||||
utils.InfieldSep)
|
||||
if len(params) != 13 {
|
||||
return fmt.Errorf("invalid number of parameters <%d> expected 13", len(params))
|
||||
}
|
||||
// parse dynamic parameters
|
||||
for i := range params {
|
||||
if params[i], err = utils.ParseParamForDataProvider(params[i], data, false); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// Prepare request arguments based on provided parameters.
|
||||
args := &utils.TrendProfileWithAPIOpts{
|
||||
TrendProfile: &utils.TrendProfile{
|
||||
Tenant: params[0],
|
||||
ID: params[1],
|
||||
Schedule: params[2],
|
||||
StatID: params[3],
|
||||
CorrelationType: params[8],
|
||||
},
|
||||
APIOpts: make(map[string]any),
|
||||
}
|
||||
// populate Trend's Metrics
|
||||
if params[4] != utils.EmptyString {
|
||||
args.Metrics = strings.Split(params[4], utils.ANDSep)
|
||||
}
|
||||
// populate Trend's TTL
|
||||
if params[5] != utils.EmptyString {
|
||||
args.TTL, err = utils.ParseDurationWithNanosecs(params[5])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// populate Trend's QueueLengh
|
||||
if params[6] != utils.EmptyString {
|
||||
args.QueueLength, err = strconv.Atoi(params[6])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// populate Trend's MinItems
|
||||
if params[7] != utils.EmptyString {
|
||||
args.MinItems, err = strconv.Atoi(params[7])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// populate Trend's Tolerance
|
||||
if params[9] != utils.EmptyString {
|
||||
args.Tolerance, err = strconv.ParseFloat(params[9], 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// populate Trend's Stored
|
||||
if params[10] != utils.EmptyString {
|
||||
args.Stored, err = strconv.ParseBool(params[10])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// populate Trend's ThresholdIDs
|
||||
if params[11] != utils.EmptyString {
|
||||
args.ThresholdIDs = strings.Split(params[11], utils.ANDSep)
|
||||
}
|
||||
// populate Trend's APIOpts
|
||||
if params[12] != utils.EmptyString {
|
||||
if err := parseParamStringToMap(params[12], args.APIOpts); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// create the TrendProfile based on the populated parameters
|
||||
var rply string
|
||||
if err = aL.connMgr.Call(ctx, aL.config.ActionSCfg().AdminSConns,
|
||||
utils.AdminSv1SetTrendProfile, args, &rply); err != nil {
|
||||
return err
|
||||
}
|
||||
if blocker, err := engine.BlockerFromDynamics(ctx, diktat.Blockers, aL.fltrS, aL.tnt, data); err != nil {
|
||||
return err
|
||||
} else if blocker {
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -41,6 +41,8 @@ func actionTarget(act string) string {
|
||||
return utils.MetaAttributes
|
||||
case utils.MetaDynamicResource:
|
||||
return utils.MetaResources
|
||||
case utils.MetaDynamicTrend:
|
||||
return utils.MetaTrends
|
||||
default:
|
||||
return utils.MetaNone
|
||||
}
|
||||
@@ -145,6 +147,8 @@ func newActioner(ctx *context.Context, cgrEv *utils.CGREvent, cfg *config.CGRCon
|
||||
return &actDynamicAttribute{cfg, connMgr, fltrS, aCfg, tnt, cgrEv}, nil
|
||||
case utils.MetaDynamicResource:
|
||||
return &actDynamicResource{cfg, connMgr, fltrS, aCfg, tnt, cgrEv}, nil
|
||||
case utils.MetaDynamicTrend:
|
||||
return &actDynamicTrend{cfg, connMgr, fltrS, aCfg, tnt, cgrEv}, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported action type: <%s>", aCfg.Type)
|
||||
|
||||
|
||||
@@ -43,7 +43,7 @@ func TestACExecuteCDRLog(t *testing.T) {
|
||||
|
||||
expectedErr := "unsupported action type: <not_a_type>"
|
||||
if _, err := newActionersFromActions(context.Background(), new(utils.CGREvent), cfg, fltr, dm, nil,
|
||||
actCfg, "cgrates.org"); err == nil || err.Error() != expectedErr {
|
||||
actCfg, utils.CGRateSorg); err == nil || err.Error() != expectedErr {
|
||||
t.Errorf("Expected %+v, received %+v", expectedErr, err)
|
||||
}
|
||||
|
||||
@@ -56,6 +56,10 @@ func TestACExecuteCDRLog(t *testing.T) {
|
||||
{Type: utils.MetaAddBalance},
|
||||
{Type: utils.MetaSetBalance},
|
||||
{Type: utils.MetaRemBalance},
|
||||
{Type: utils.MetaDynamicThreshold},
|
||||
{Type: utils.MetaDynamicStats},
|
||||
{Type: utils.MetaDynamicAttribute},
|
||||
{Type: utils.MetaDynamicResource},
|
||||
}
|
||||
|
||||
actHttp, err := newActHTTPPost(context.Background(), cfg.GeneralCfg().DefaultTenant, new(utils.CGREvent), new(engine.FilterS),
|
||||
@@ -67,15 +71,19 @@ func TestACExecuteCDRLog(t *testing.T) {
|
||||
expectedActs := []actioner{
|
||||
&actCDRLog{cfg, fltr, nil, &utils.APAction{Type: utils.CDRLog}},
|
||||
actHttp,
|
||||
&actExport{"cgrates.org", cfg, nil, &utils.APAction{Type: utils.MetaExport}},
|
||||
&actResetStat{"cgrates.org", cfg, nil, &utils.APAction{Type: utils.MetaResetStatQueue}},
|
||||
&actResetThreshold{"cgrates.org", cfg, nil, &utils.APAction{Type: utils.MetaResetThreshold}},
|
||||
&actSetBalance{cfg, nil, fltr, &utils.APAction{Type: utils.MetaAddBalance}, "cgrates.org", false},
|
||||
&actSetBalance{cfg, nil, fltr, &utils.APAction{Type: utils.MetaSetBalance}, "cgrates.org", true},
|
||||
&actRemBalance{cfg, nil, fltr, &utils.APAction{Type: utils.MetaRemBalance}, "cgrates.org"},
|
||||
&actExport{utils.CGRateSorg, cfg, nil, &utils.APAction{Type: utils.MetaExport}},
|
||||
&actResetStat{utils.CGRateSorg, cfg, nil, &utils.APAction{Type: utils.MetaResetStatQueue}},
|
||||
&actResetThreshold{utils.CGRateSorg, cfg, nil, &utils.APAction{Type: utils.MetaResetThreshold}},
|
||||
&actSetBalance{cfg, nil, fltr, &utils.APAction{Type: utils.MetaAddBalance}, utils.CGRateSorg, false},
|
||||
&actSetBalance{cfg, nil, fltr, &utils.APAction{Type: utils.MetaSetBalance}, utils.CGRateSorg, true},
|
||||
&actRemBalance{cfg, nil, fltr, &utils.APAction{Type: utils.MetaRemBalance}, utils.CGRateSorg},
|
||||
&actDynamicThreshold{cfg, nil, fltr, &utils.APAction{Type: utils.MetaDynamicThreshold}, utils.CGRateSorg, new(utils.CGREvent)},
|
||||
&actDynamicStats{cfg, nil, fltr, &utils.APAction{Type: utils.MetaDynamicStats}, utils.CGRateSorg, new(utils.CGREvent)},
|
||||
&actDynamicAttribute{cfg, nil, fltr, &utils.APAction{Type: utils.MetaDynamicAttribute}, utils.CGRateSorg, new(utils.CGREvent)},
|
||||
&actDynamicResource{cfg, nil, fltr, &utils.APAction{Type: utils.MetaDynamicResource}, utils.CGRateSorg, new(utils.CGREvent)},
|
||||
}
|
||||
|
||||
acts, err := newActionersFromActions(context.Background(), new(utils.CGREvent), cfg, fltr, dm, nil, actCfg, "cgrates.org")
|
||||
acts, err := newActionersFromActions(context.Background(), new(utils.CGREvent), cfg, fltr, dm, nil, actCfg, utils.CGRateSorg)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(acts, expectedActs) {
|
||||
@@ -98,7 +106,7 @@ func TestACExecuteScheduledAction(t *testing.T) {
|
||||
utils.LogLevelCfg: 7,
|
||||
}
|
||||
|
||||
schedActs := newScheduledActs(nil, "cgrates.org", "FirstAction",
|
||||
schedActs := newScheduledActs(nil, utils.CGRateSorg, "FirstAction",
|
||||
utils.EmptyString, utils.MetaTopUp, utils.MetaNow,
|
||||
dataStorage, acts)
|
||||
|
||||
|
||||
@@ -258,6 +258,7 @@ func TestDynThdIT(t *testing.T) {
|
||||
utils.MetaStats: {"someID": {}},
|
||||
utils.MetaAttributes: {"someID": {}},
|
||||
utils.MetaResources: {"someID": {}},
|
||||
utils.MetaTrends: {"someID": {}},
|
||||
},
|
||||
Actions: []*utils.APAction{
|
||||
{
|
||||
@@ -488,6 +489,63 @@ func TestDynThdIT(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "Dynamic_Trend_ID",
|
||||
Type: utils.MetaDynamicTrend,
|
||||
Diktats: []*utils.APDiktat{
|
||||
{
|
||||
ID: "CreateDynamicTrend1002",
|
||||
FilterIDs: []string{"*string:~*req.Account:1002"},
|
||||
Opts: map[string]any{
|
||||
"*template": "*tenant;DYNAMICLY_TRND_<~*req.Account>;@every 1s;Stats1_1;*acc&*tcc;-1;-1;1;*last;1;true;THID1&THID2;~*opts",
|
||||
},
|
||||
Weights: utils.DynamicWeights{
|
||||
{
|
||||
Weight: 50,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "CreateDynamicTrend1002NotFoundFilter",
|
||||
FilterIDs: []string{"*string:~*req.Account:1003"},
|
||||
Opts: map[string]any{
|
||||
"*template": "*tenant;DYNAMICLY_TRND_2_<~*req.Account>;@every 1s;Stats1_1;*acc&*tcc;-1;-1;1;*last;1;true;THID1&THID2;~*opts",
|
||||
},
|
||||
Weights: utils.DynamicWeights{
|
||||
{
|
||||
Weight: 90,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "CreateDynamicTrend1002Blocker",
|
||||
Opts: map[string]any{
|
||||
"*template": "*tenant;DYNAMICLY_TRND_3_<~*req.Account>;@every 1s;Stats1_1;*acc&*tcc;-1;-1;1;*last;1;true;THID1&THID2;~*opts",
|
||||
},
|
||||
Weights: utils.DynamicWeights{
|
||||
{
|
||||
Weight: 20,
|
||||
},
|
||||
},
|
||||
Blockers: utils.DynamicBlockers{
|
||||
{
|
||||
Blocker: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "CreateDynamicTrend1002Blocked",
|
||||
Opts: map[string]any{
|
||||
"*template": "*tenant;DYNAMICLY_TRND_4_<~*req.Account>;@every 1s;Stats1_1;*acc&*tcc;-1;-1;1;*last;1;true;THID1&THID2;~*opts",
|
||||
},
|
||||
Weights: utils.DynamicWeights{
|
||||
{
|
||||
Weight: 10,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
@@ -844,7 +902,7 @@ func TestDynThdIT(t *testing.T) {
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(exp, attrs) {
|
||||
t.Errorf("Expected <%v>\nReceived <%v>", utils.ToJSON(attrs), utils.ToJSON(exp))
|
||||
t.Errorf("Expected <%v>\nReceived <%v>", utils.ToJSON(exp), utils.ToJSON(attrs))
|
||||
}
|
||||
})
|
||||
|
||||
@@ -900,7 +958,58 @@ func TestDynThdIT(t *testing.T) {
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(exp, rsc) {
|
||||
t.Errorf("Expected <%v>\nReceived <%v>", utils.ToJSON(rsc), utils.ToJSON(exp))
|
||||
t.Errorf("Expected <%v>\nReceived <%v>", utils.ToJSON(exp), utils.ToJSON(rsc))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("GetDynamicTrendProfile", func(t *testing.T) {
|
||||
var rcv []*utils.TrendProfile
|
||||
if err := client.Call(context.Background(), utils.AdminSv1GetTrendProfiles,
|
||||
&utils.ArgsItemIDs{
|
||||
Tenant: utils.CGRateSorg,
|
||||
}, &rcv); err != nil {
|
||||
t.Errorf("AdminSv1GetTrendProfiles failed unexpectedly: %v", err)
|
||||
}
|
||||
if len(rcv) != 2 {
|
||||
t.Fatalf("AdminSv1GetTrendProfiles len(rcv)=%v, want 2", len(rcv))
|
||||
}
|
||||
sort.Slice(rcv, func(i, j int) bool {
|
||||
return rcv[i].ID > rcv[j].ID
|
||||
})
|
||||
exp := []*utils.TrendProfile{
|
||||
{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "DYNAMICLY_TRND_3_1002",
|
||||
Schedule: "@every 1s",
|
||||
StatID: "Stats1_1",
|
||||
Metrics: []string{"*acc", "*tcc"},
|
||||
TTL: -1,
|
||||
QueueLength: -1,
|
||||
MinItems: 1,
|
||||
CorrelationType: "*last",
|
||||
Tolerance: 1,
|
||||
Stored: true,
|
||||
ThresholdIDs: []string{"THID1", "THID2"},
|
||||
},
|
||||
{
|
||||
|
||||
Tenant: "cgrates.org",
|
||||
ID: "DYNAMICLY_TRND_1002",
|
||||
Schedule: "@every 1s",
|
||||
StatID: "Stats1_1",
|
||||
Metrics: []string{"*acc", "*tcc"},
|
||||
TTL: -1,
|
||||
QueueLength: -1,
|
||||
MinItems: 1,
|
||||
CorrelationType: "*last",
|
||||
Tolerance: 1,
|
||||
Stored: true,
|
||||
ThresholdIDs: []string{"THID1", "THID2"},
|
||||
},
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(exp, rcv) {
|
||||
t.Errorf("Expected <%v>\nReceived <%v>", utils.ToJSON(exp), utils.ToJSON(rcv))
|
||||
}
|
||||
})
|
||||
|
||||
|
||||
@@ -1150,6 +1150,7 @@ const (
|
||||
MetaDynamicStats = "*dynamic_stats"
|
||||
MetaDynamicAttribute = "*dynamic_attribute"
|
||||
MetaDynamicResource = "*dynamic_resource"
|
||||
MetaDynamicTrend = "*dynamicTrend"
|
||||
|
||||
// Diktats Opts Fields
|
||||
MetaBalancePath = "*balancePath"
|
||||
|
||||
Reference in New Issue
Block a user