Attemtps config change

This commit is contained in:
adi
2022-07-22 10:27:18 +03:00
committed by Dan Christian Bogos
parent 7882f1530b
commit 86d2f1476a
21 changed files with 101 additions and 57 deletions

View File

@@ -219,7 +219,7 @@ func (aS *ActionS) scheduledActions(ctx *context.Context, tnt string, cgrEv *uti
trgActs := map[string][]actioner{} // build here the list of actioners based on the trgKey
var partExec bool
for _, aCfg := range aPf.Actions { // create actioners and attach them to the right target
if act, errAct := newActioner(aS.cfg, aS.fltrS, aS.dm, aS.connMgr, aCfg, tnt); errAct != nil {
if act, errAct := newActioner(ctx, cgrEv, aS.cfg, aS.fltrS, aS.dm, aS.connMgr, aCfg, tnt); errAct != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> ignoring ActionProfile with id: <%s:%s> creating action: <%s>, error: <%s>",

View File

@@ -30,17 +30,23 @@ import (
"github.com/cgrates/cgrates/utils"
)
func newActHTTPPost(cfg *config.CGRConfig, aCfg *engine.APAction) (aL *actHTTPPost) {
func newActHTTPPost(ctx *context.Context, tnt string, cgrEv *utils.CGREvent,
fltrS *engine.FilterS, cfg *config.CGRConfig, aCfg *engine.APAction) (aL *actHTTPPost, err error) {
aL = &actHTTPPost{
config: cfg,
aCfg: aCfg,
pstrs: make([]*ees.HTTPjsonMapEE, len(aCfg.Diktats)),
}
for i, actD := range aL.cfg().Diktats {
attempts, err := engine.GetIntOpts(ctx, tnt, cgrEv, fltrS, cfg.ActionSCfg().Opts.PosterAttempts,
1, utils.MetaPosterAttempts)
if err != nil {
return nil, err
}
aL.pstrs[i], _ = ees.NewHTTPjsonMapEE(&config.EventExporterCfg{
ID: aL.id(),
ExportPath: actD.Path,
Attempts: cfg.EEsCfg().GetDefaultExporter().Attempts,
Attempts: attempts,
FailedPostsDir: cfg.EEsCfg().GetDefaultExporter().FailedPostsDir,
Opts: &config.EventExporterOpts{},
}, cfg, nil, nil)

View File

@@ -43,7 +43,11 @@ func TestACHTTPPostExecute(t *testing.T) {
},
},
}
http := newActHTTPPost(cfg, apAction)
http, err := newActHTTPPost(context.Background(), "cgrates.org", new(utils.CGREvent),
new(engine.FilterS), cfg, apAction)
if err != nil {
t.Error(err)
}
dataStorage := utils.MapStorage{
utils.MetaReq: map[string]interface{}{
@@ -102,7 +106,11 @@ func TestACHTTPPostValues(t *testing.T) {
},
},
}
http := newActHTTPPost(cfg, apAction)
http, err := newActHTTPPost(context.Background(), "cgrates.org", new(utils.CGREvent),
new(engine.FilterS), cfg, apAction)
if err != nil {
t.Error(err)
}
dataStorage := utils.MapStorage{
utils.MetaReq: map[string]interface{}{
utils.AccountField: 1003,

View File

@@ -99,11 +99,11 @@ func (s *scheduledActs) postExec() (err error) {
}
// newActionersFromActions constructs multiple actioners out of APAction configurations
func newActionersFromActions(cfg *config.CGRConfig, fltrS *engine.FilterS, dm *engine.DataManager,
func newActionersFromActions(ctx *context.Context, cgrEv *utils.CGREvent, cfg *config.CGRConfig, fltrS *engine.FilterS, dm *engine.DataManager,
connMgr *engine.ConnManager, aCfgs []*engine.APAction, tnt string) (acts []actioner, err error) {
acts = make([]actioner, len(aCfgs))
for i, aCfg := range aCfgs {
if acts[i], err = newActioner(cfg, fltrS, dm, connMgr, aCfg, tnt); err != nil {
if acts[i], err = newActioner(ctx, cgrEv, cfg, fltrS, dm, connMgr, aCfg, tnt); err != nil {
return nil, err
}
}
@@ -111,7 +111,7 @@ func newActionersFromActions(cfg *config.CGRConfig, fltrS *engine.FilterS, dm *e
}
// newAction is the constructor to create actioner
func newActioner(cfg *config.CGRConfig, fltrS *engine.FilterS, dm *engine.DataManager,
func newActioner(ctx *context.Context, cgrEv *utils.CGREvent, cfg *config.CGRConfig, fltrS *engine.FilterS, dm *engine.DataManager,
connMgr *engine.ConnManager, aCfg *engine.APAction, tnt string) (act actioner, err error) {
switch aCfg.Type {
case utils.MetaLog:
@@ -119,7 +119,7 @@ func newActioner(cfg *config.CGRConfig, fltrS *engine.FilterS, dm *engine.DataMa
case utils.CDRLog:
return &actCDRLog{cfg, fltrS, connMgr, aCfg}, nil
case utils.MetaHTTPPost:
return newActHTTPPost(cfg, aCfg), nil
return newActHTTPPost(ctx, tnt, cgrEv, fltrS, cfg, aCfg)
case utils.MetaExport:
return &actExport{tnt, cfg, connMgr, aCfg}, nil
case utils.MetaResetStatQueue:

View File

@@ -24,6 +24,7 @@ import (
"strings"
"testing"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/cgrates/config"
@@ -41,7 +42,7 @@ func TestACExecuteCDRLog(t *testing.T) {
}
expectedErr := "unsupported action type: <not_a_type>"
if _, err := newActionersFromActions(cfg, fltr, dm, nil,
if _, err := newActionersFromActions(context.Background(), new(utils.CGREvent), cfg, fltr, dm, nil,
actCfg, "cgrates.org"); err == nil || err.Error() != expectedErr {
t.Errorf("Expected %+v, received %+v", expectedErr, err)
}
@@ -57,9 +58,15 @@ func TestACExecuteCDRLog(t *testing.T) {
{Type: utils.MetaRemBalance},
}
actHttp, err := newActHTTPPost(context.Background(), cfg.GeneralCfg().DefaultTenant, new(utils.CGREvent), new(engine.FilterS),
cfg, &engine.APAction{Type: utils.MetaHTTPPost})
if err != nil {
t.Error(err)
}
expectedActs := []actioner{
&actCDRLog{cfg, fltr, nil, &engine.APAction{Type: utils.CDRLog}},
newActHTTPPost(cfg, &engine.APAction{Type: utils.MetaHTTPPost}),
actHttp,
&actExport{"cgrates.org", cfg, nil, &engine.APAction{Type: utils.MetaExport}},
&actResetStat{"cgrates.org", cfg, nil, &engine.APAction{Type: utils.MetaResetStatQueue}},
&actResetThreshold{"cgrates.org", cfg, nil, &engine.APAction{Type: utils.MetaResetThreshold}},
@@ -68,7 +75,7 @@ func TestACExecuteCDRLog(t *testing.T) {
&actRemBalance{cfg, nil, &engine.APAction{Type: utils.MetaRemBalance}, "cgrates.org"},
}
acts, err := newActionersFromActions(cfg, fltr, dm, nil, actCfg, "cgrates.org")
acts, err := newActionersFromActions(context.Background(), new(utils.CGREvent), cfg, fltr, dm, nil, actCfg, "cgrates.org")
if err != nil {
t.Error(err)
} else if !reflect.DeepEqual(acts, expectedActs) {

View File

@@ -30,6 +30,7 @@ const ActionsProfileIgnoreFiltersDftOpt = false
type ActionsOpts struct {
ProfileIDs []*utils.DynamicStringSliceOpt
ProfileIgnoreFilters []*utils.DynamicBoolOpt
PosterAttempts []*utils.DynamicIntOpt
}
// ActionSCfg is the configuration of ActionS
@@ -71,6 +72,9 @@ func (actOpts *ActionsOpts) loadFromJSONCfg(jsnCfg *ActionsOptsJson) {
if jsnCfg.ProfileIgnoreFilters != nil {
actOpts.ProfileIgnoreFilters = append(actOpts.ProfileIgnoreFilters, jsnCfg.ProfileIgnoreFilters...)
}
if jsnCfg.PosterAttempts != nil {
actOpts.PosterAttempts = append(actOpts.PosterAttempts, jsnCfg.PosterAttempts...)
}
}
func (acS *ActionSCfg) loadFromJSONCfg(jsnCfg *ActionSJsonCfg) (err error) {
@@ -134,6 +138,7 @@ func (acS ActionSCfg) AsMapInterface(string) interface{} {
opts := map[string]interface{}{
utils.MetaProfileIDs: acS.Opts.ProfileIDs,
utils.MetaProfileIgnoreFilters: acS.Opts.ProfileIgnoreFilters,
utils.MetaPosterAttempts: acS.Opts.PosterAttempts,
}
mp := map[string]interface{}{
utils.EnabledCfg: acS.Enabled,
@@ -190,9 +195,14 @@ func (actOpts *ActionsOpts) Clone() *ActionsOpts {
if actOpts.ProfileIgnoreFilters != nil {
profileIgnoreFilters = utils.CloneDynamicBoolOpt(actOpts.ProfileIgnoreFilters)
}
var posterAttempts []*utils.DynamicIntOpt
if actOpts.PosterAttempts != nil {
posterAttempts = utils.CloneDynamicIntOpt(actOpts.PosterAttempts)
}
return &ActionsOpts{
ProfileIDs: actPrfIDs,
ProfileIgnoreFilters: profileIgnoreFilters,
PosterAttempts: posterAttempts,
}
}
@@ -247,6 +257,7 @@ func (acS ActionSCfg) Clone() (cln *ActionSCfg) {
type ActionsOptsJson struct {
ProfileIDs []*utils.DynamicStringSliceOpt `json:"*profileIDs"`
ProfileIgnoreFilters []*utils.DynamicBoolOpt `json:"*profileIgnoreFilters"`
PosterAttempts []*utils.DynamicIntOpt `json:"*posterAttempts"`
}
// Action service config section
@@ -279,6 +290,9 @@ func diffActionsOptsJsonCfg(d *ActionsOptsJson, v1, v2 *ActionsOpts) *ActionsOpt
if !utils.DynamicBoolOptEqual(v1.ProfileIgnoreFilters, v2.ProfileIgnoreFilters) {
d.ProfileIgnoreFilters = v2.ProfileIgnoreFilters
}
if !utils.DynamicIntOptEqual(v1.PosterAttempts, v2.PosterAttempts) {
d.PosterAttempts = v2.PosterAttempts
}
return d
}

View File

@@ -62,6 +62,7 @@ func TestActionSCfgLoadFromJSONCfg(t *testing.T) {
Opts: &ActionsOpts{
ProfileIDs: []*utils.DynamicStringSliceOpt{},
ProfileIgnoreFilters: []*utils.DynamicBoolOpt{},
PosterAttempts: []*utils.DynamicIntOpt{},
},
}
jsnCfg := NewDefaultCGRConfig()
@@ -125,7 +126,7 @@ func TestActionSCfgAsMapInterface(t *testing.T) {
"exists_indexed_fields": ["*req.index1","*req.index2"],
"notexists_indexed_fields": ["*req.index1"],
"nested_fields": true,
"DynaprepaidActionProfile": [],
"dynaprepaid_actionprofile": [],
},
}`
@@ -148,6 +149,7 @@ func TestActionSCfgAsMapInterface(t *testing.T) {
utils.OptsCfg: map[string]interface{}{
utils.MetaProfileIDs: []*utils.DynamicStringSliceOpt{},
utils.MetaProfileIgnoreFilters: []*utils.DynamicBoolOpt{},
utils.MetaPosterAttempts: []*utils.DynamicIntOpt{},
},
}
if cgrCfg, err := NewCGRConfigFromJSONStringWithDefaults(cfgJSONStr); err != nil {

View File

@@ -238,6 +238,7 @@ func newCGRConfig(config []byte) (cfg *CGRConfig, err error) {
actionSCfg: &ActionSCfg{Opts: &ActionsOpts{
ProfileIDs: []*utils.DynamicStringSliceOpt{},
ProfileIgnoreFilters: []*utils.DynamicBoolOpt{},
PosterAttempts: []*utils.DynamicIntOpt{},
}},
sipAgentCfg: new(SIPAgentCfg),
configSCfg: new(ConfigSCfg),

View File

@@ -72,7 +72,7 @@ const CGRATES_CFG_JSON = `
"opts": {
"kafka_conn": "", // the connection trough kafka
"kafka_topic": "", // the topic from where the events are exported
"attempts": 1, // number of attempts of connecting
"kafka_attempts": 1, // number of attempts of connecting
"failed_posts_dir": "/var/spool/cgrates/failed_posts" // path where fail logs are exported
},
},
@@ -1752,7 +1752,7 @@ const CGRATES_CFG_JSON = `
"notexists_indexed_fields": [], // query indexes based on these fields for faster processing
"nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level)
"dynaprepaid_actionprofile": [], //
"opts":{ //
"opts":{
"*profileIDs": [
// {
// "Tenant": "*any",
@@ -1766,6 +1766,13 @@ const CGRATES_CFG_JSON = `
// "FilterIDs": [],
// "Value": false,
// },
],
"*posterAttempts": [ // poster attempts for HTTPPost action type
// {
// "Tenant": "*any",
// "FilterIDs": [],
// "Value": 1,
// },
],
},
},

View File

@@ -2418,6 +2418,7 @@ func TestDfActionSJsonCfg(t *testing.T) {
Opts: &ActionsOptsJson{
ProfileIDs: []*utils.DynamicStringSliceOpt{},
ProfileIgnoreFilters: []*utils.DynamicBoolOpt{},
PosterAttempts: []*utils.DynamicIntOpt{},
},
}
dfCgrJSONCfg, err := NewCgrJsonCfgFromBytes([]byte(CGRATES_CFG_JSON))

File diff suppressed because one or more lines are too long

View File

@@ -75,7 +75,7 @@ func (loggCfg *LoggerCfg) AsMapInterface(string) interface{} {
type LoggerOptsCfg struct {
KafkaConn string
KafkaTopic string
Attempts int
KafkaAttempts int
FailedPostsDir string
}
@@ -106,8 +106,8 @@ func (loggOpts *LoggerOptsCfg) loadFromJSONCfg(jsnCfg *LoggerOptsJson) {
if jsnCfg.Kafka_topic != nil {
loggOpts.KafkaTopic = *jsnCfg.Kafka_topic
}
if jsnCfg.Attempts != nil {
loggOpts.Attempts = *jsnCfg.Attempts
if jsnCfg.Kafka_attempts != nil {
loggOpts.KafkaAttempts = *jsnCfg.Kafka_attempts
}
if jsnCfg.Failed_posts_dir != nil {
loggOpts.FailedPostsDir = *jsnCfg.Failed_posts_dir
@@ -119,7 +119,7 @@ func (loggOpts *LoggerOptsCfg) AsMapInterface() interface{} {
return map[string]interface{}{
utils.KafkaConnCfg: loggOpts.KafkaConn,
utils.KafkaTopicCfg: loggOpts.KafkaTopic,
utils.AttemptsCfg: loggOpts.Attempts,
utils.KafkaAttemptsCfg: loggOpts.KafkaAttempts,
utils.FailedPostsDirCfg: loggOpts.FailedPostsDir,
}
}
@@ -132,7 +132,7 @@ func (loggerOpts *LoggerOptsCfg) Clone() *LoggerOptsCfg {
return &LoggerOptsCfg{
KafkaConn: loggerOpts.KafkaConn,
KafkaTopic: loggerOpts.KafkaTopic,
Attempts: loggerOpts.Attempts,
KafkaAttempts: loggerOpts.KafkaAttempts,
FailedPostsDir: loggerOpts.FailedPostsDir,
}
}
@@ -147,7 +147,7 @@ type LoggerJsonCfg struct {
type LoggerOptsJson struct {
Kafka_conn *string `json:"kafka_conn"`
Kafka_topic *string `json:"kafka_topic"`
Attempts *int `json:"attempts"`
Kafka_attempts *int `json:"kafka_attempts"`
Failed_posts_dir *string `json:"failed_posts_dir"`
}
@@ -178,8 +178,8 @@ func diffLoggerOptsJsonCfg(d *LoggerOptsJson, v1, v2 *LoggerOptsCfg) *LoggerOpts
if v1.KafkaTopic != v2.KafkaTopic {
d.Kafka_topic = utils.StringPointer(v2.KafkaTopic)
}
if v1.Attempts != v2.Attempts {
d.Attempts = utils.IntPointer(v2.Attempts)
if v1.KafkaAttempts != v2.KafkaAttempts {
d.Kafka_attempts = utils.IntPointer(v2.KafkaAttempts)
}
if v1.FailedPostsDir != v2.FailedPostsDir {
d.Failed_posts_dir = utils.StringPointer(v2.FailedPostsDir)

View File

@@ -11,7 +11,7 @@
"opts": {
"kafka_conn": "ldasdas:9092", // the connection trough kafka
"kafka_topic": "TutorialTopic", // the topic from where the events are exported
"attempts": 1, // number of attempts of connecting
"kafka_attempts": 1, // number of attempts of connecting
},
},
*/

View File

@@ -259,10 +259,9 @@ func (rdr *AMQPER) createPoster() {
processedOpt = new(config.EventExporterOpts)
}
rdr.poster = ees.NewAMQPee(&config.EventExporterCfg{
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts,
Opts: processedOpt,
FailedPostsDir: rdr.cgrCfg.EEsCfg().GetDefaultExporter().FailedPostsDir,
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts,
Opts: processedOpt,
}, nil)
}

View File

@@ -211,10 +211,9 @@ func (rdr *AMQPv1ER) createPoster() {
processedOpt = new(config.EventExporterOpts)
}
rdr.poster = ees.NewAMQPv1EE(&config.EventExporterCfg{
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts,
Opts: processedOpt,
FailedPostsDir: rdr.cgrCfg.EEsCfg().GetDefaultExporter().FailedPostsDir,
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts,
Opts: processedOpt,
}, nil)
}

View File

@@ -212,10 +212,9 @@ func (rdr *KafkaER) createPoster() {
processedOpt = new(config.EventExporterOpts)
}
rdr.poster = ees.NewKafkaEE(&config.EventExporterCfg{
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts,
Opts: processedOpt,
FailedPostsDir: rdr.cgrCfg.EEsCfg().GetDefaultExporter().FailedPostsDir,
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts,
Opts: processedOpt,
}, nil)
}

View File

@@ -199,9 +199,8 @@ func (rdr *NatsER) createPoster() (err error) {
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(
rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Opts: processedOpt,
Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts,
FailedPostsDir: rdr.cgrCfg.EEsCfg().GetDefaultExporter().FailedPostsDir,
Opts: processedOpt,
Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts,
}, rdr.cgrCfg.GeneralCfg().NodeID,
rdr.cgrCfg.GeneralCfg().ConnectTimeout, nil)
return

View File

@@ -197,11 +197,10 @@ func (rdr *S3ER) createPoster() {
processedOpt = new(config.EventExporterOpts)
}
rdr.poster = ees.NewS3EE(&config.EventExporterCfg{
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts,
Opts: processedOpt,
FailedPostsDir: rdr.cgrCfg.EEsCfg().GetDefaultExporter().FailedPostsDir,
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts,
Opts: processedOpt,
}, nil)
}

View File

@@ -226,11 +226,10 @@ func (rdr *SQSER) createPoster() {
processedOpt = new(config.EventExporterOpts)
}
rdr.poster = ees.NewSQSee(&config.EventExporterCfg{
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts,
Opts: processedOpt,
FailedPostsDir: rdr.cgrCfg.EEsCfg().GetDefaultExporter().FailedPostsDir,
ID: rdr.Config().ID,
ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Attempts: rdr.cgrCfg.EEsCfg().GetDefaultExporter().Attempts,
Opts: processedOpt,
}, nil)
}

View File

@@ -358,7 +358,7 @@ func (cgr *CGREngine) Init(ctx *context.Context, shtDw context.CancelFunc, flags
cgr.cfg.GeneralCfg().DefaultTenant,
cgr.cfg.GeneralCfg().NodeID,
cgr.cfg.LoggerCfg().Level,
cgr.cfg.LoggerCfg().Opts.Attempts,
cgr.cfg.LoggerCfg().Opts.KafkaAttempts,
cgr.cfg.LoggerCfg().Opts.KafkaConn,
cgr.cfg.LoggerCfg().Opts.KafkaTopic,
cgr.cfg.LoggerCfg().Opts.FailedPostsDir); err != nil {

View File

@@ -1743,9 +1743,10 @@ const (
)
const (
LevelCfg = "level"
KafkaConnCfg = "kafka_conn"
KafkaTopicCfg = "kafka_topic"
LevelCfg = "level"
KafkaConnCfg = "kafka_conn"
KafkaTopicCfg = "kafka_topic"
KafkaAttemptsCfg = "kafka_attempts"
)
const (
@@ -2357,6 +2358,7 @@ const (
EventType = "EventType"
SchedulerInit = "SchedulerInit"
MetaProfileIgnoreFilters = "*profileIgnoreFilters"
MetaPosterAttempts = "*posterAttempts"
RemoteHostOpt = "*rmtHost"
MetaCache = "*cache"