mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Add reconnects and max_reconnect_interval config options for ers
They are separate for each configured reader. Additional changes: - rearrange config_defaults fields for ers/ees; - add comment for RunDelay config option inside struct definition; - improve comments for amqp opts in config_defaults.
This commit is contained in:
committed by
Dan Christian Bogos
parent
a30e261260
commit
f696164177
@@ -370,6 +370,12 @@ const CGRATES_CFG_JSON = `
|
||||
"concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited
|
||||
"source_path": "/var/spool/cgrates/ers/in", // read data from this path
|
||||
"processed_path": "/var/spool/cgrates/ers/out", // move processed data here
|
||||
"tenant": "", // tenant used by import
|
||||
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
|
||||
"filters": [], // limit parsing based on the filters
|
||||
"flags": [], // flags to influence the event processing
|
||||
"reconnects": -1, // number of retries in case of connection lost
|
||||
"max_reconnect_interval": "5m", // time to wait in between reconnect attempts
|
||||
"opts": {
|
||||
// Partial
|
||||
// "partialPath": "/", // the path were the partial events will be sent
|
||||
@@ -385,27 +391,26 @@ const CGRATES_CFG_JSON = `
|
||||
|
||||
// FileXML
|
||||
// "xmlRootPath": "", // path towards one event in case of XML CDRs
|
||||
|
||||
|
||||
// AMQP and AMQPv1
|
||||
// "amqpQueueID": "cgrates_cdrs", // the queue id for AMQP and AMQPv1 readers from were the events are read
|
||||
// "amqpQueueIDProcessed": "", // the queue id for AMQP and AMQPv1 readers were the events are sent after they are processed
|
||||
// "amqpQueueID": "cgrates_cdrs", // identifier for the primary queue where messages are consumed (0.9.1/1.0)
|
||||
// "amqpQueueIDProcessed": "", // identifier for the queue where processed events are sent (0.9.1/1.0)
|
||||
|
||||
// "amqpUsername": "", // username for SASL PLAIN auth, exclusive to AMQP 1.0, often representing the policy name
|
||||
// "amqpPassword": "", // password for authentication, exclusive to AMQP 1.0
|
||||
|
||||
// "amqpUsername": "", // amqp 1.0 exclusive, used for SASL PLAIN auth, usually represents the policy name
|
||||
// "amqpPassword": "", // amqp 1.0 exclusive, used for SASL PLAIN auth, populated with one of its policy's keys
|
||||
|
||||
// "amqpUsernameProcessed": "",
|
||||
// "amqpPasswordProcessed": "",
|
||||
|
||||
// "amqpConsumerTag": "cgrates", // the ID of the consumer, amqp 0.9.1 exclusive
|
||||
// "amqpExchange": "", // amqp 0.9.1 exclusive
|
||||
// "amqpExchangeType": "", // amqp 0.9.1 exclusive
|
||||
// "amqpRoutingKey": "", // amqp 0.9.1 exclusive
|
||||
|
||||
// "amqpExchangeProcessed": "",
|
||||
// "amqpExchangeTypeProcessed": "",
|
||||
// "amqpRoutingKeyProcessed": "",
|
||||
// "amqpUsernameProcessed": "", // username for authentication related to processed messages queue
|
||||
// "amqpPasswordProcessed": "", // password for authentication related to processed messages queue
|
||||
|
||||
// "amqpConsumerTag": "cgrates", // unique tag for the consumer, useful for message tracking and consumer management (0.9.1)
|
||||
// "amqpExchange": "", // name of the primary exchange where messages will be published (0.9.1)
|
||||
// "amqpExchangeType": "", // type of the primary exchange (direct, topic, fanout, headers) (0.9.1)
|
||||
// "amqpRoutingKey": "", // key used for routing messages to the primary queue (0.9.1)
|
||||
|
||||
// "amqpExchangeProcessed": "", // name of the exchange where processed messages will be published
|
||||
// "amqpExchangeTypeProcessed": "", // type of the exchange for processed messages
|
||||
// "amqpRoutingKeyProcessed": "", // key used for routing processed messages
|
||||
|
||||
// Kafka
|
||||
// "kafkaTopic": "cgrates", // the topic from were the events are read
|
||||
// "kafkaGroupID": "cgrates", // the group that reads the events
|
||||
@@ -421,13 +426,13 @@ const CGRATES_CFG_JSON = `
|
||||
// "sqlDBNameProcessed": "", // the name of the database were the events are sent after they are processed
|
||||
// "sqlTableNameProcessed": "", // the name of the table were the events are sent after they are processed
|
||||
// "pgSSLModeProcessed": "", // the ssl mode for postgres db
|
||||
|
||||
|
||||
// SQS and S3
|
||||
// "awsRegion": "",
|
||||
// "awsKey": "",
|
||||
// "awsSecret": "",
|
||||
// "awsToken": "",
|
||||
|
||||
|
||||
// "awsRegionProcessed": "",
|
||||
// "awsKeyProcessed": "",
|
||||
// "awsSecretProcessed": "",
|
||||
@@ -435,9 +440,9 @@ const CGRATES_CFG_JSON = `
|
||||
|
||||
// SQS
|
||||
// "sqsQueueID": "cgrates_cdrs", // the queue id for SQS readers from were the events are read
|
||||
|
||||
|
||||
// "sqsQueueIDProcessed": "", // the queue id for SQS readers were the events are sent after they are processed
|
||||
|
||||
|
||||
// S3
|
||||
// "s3BucketID": "cgrates_cdrs", // the bucket id for S3 readers from were the events are read
|
||||
// "s3FolderPathProcessed": "", // only for S3 event posting
|
||||
@@ -466,10 +471,6 @@ const CGRATES_CFG_JSON = `
|
||||
// "natsClientKeyProcessed": "", // the path to a client key( used by tls)
|
||||
// "natsJetStreamMaxWaitProcessed": "5s", // the maximum amount of time to wait for a response
|
||||
},
|
||||
"tenant": "", // tenant used by import
|
||||
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
|
||||
"filters": [], // limit parsing based on the filters
|
||||
"flags": [], // flags to influence the event processing
|
||||
"fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.2", "mandatory": true},
|
||||
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.3", "mandatory": true},
|
||||
@@ -488,8 +489,7 @@ const CGRATES_CFG_JSON = `
|
||||
},
|
||||
],
|
||||
},
|
||||
|
||||
|
||||
|
||||
"ees": { // EventExporterService
|
||||
"enabled": false, // starts the EventReader service: <true|false>
|
||||
"attributes_conns":[], // RPC Connections IDs
|
||||
@@ -498,10 +498,18 @@ const CGRATES_CFG_JSON = `
|
||||
},
|
||||
"exporters": [
|
||||
{
|
||||
"id": "*default", // identifier of the EventReader profile
|
||||
"type": "*none", // exporter type
|
||||
"export_path": "/var/spool/cgrates/ees", // path where the exported events will be placed
|
||||
"concurrent_requests": 0, // maximum simultaneous requests to process, 0 for unlimited
|
||||
"id": "*default", // identifier of the EventReader profile
|
||||
"type": "*none", // exporter type
|
||||
"export_path": "/var/spool/cgrates/ees", // path where the exported events will be placed
|
||||
"failed_posts_dir": "/var/spool/cgrates/failed_posts", // directory path where we store failed requests
|
||||
"concurrent_requests": 0, // maximum simultaneous requests to process, 0 for unlimited
|
||||
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
|
||||
"filters": [], // limit parsing based on the filters
|
||||
"flags": [], // flags to influence the event processing
|
||||
"attribute_ids": [], // select Attribute profiles instead of discovering them
|
||||
"attribute_context": "", // context used to discover matching Attribute profiles
|
||||
"synchronous": false, // block processing until export has a result
|
||||
"attempts": 1, // export attempts
|
||||
"opts": {
|
||||
|
||||
// CSV
|
||||
@@ -509,7 +517,7 @@ const CGRATES_CFG_JSON = `
|
||||
|
||||
|
||||
// Elasticsearch options
|
||||
// "elsCloud":true, //ExportPath will be an CLoud ID deployment
|
||||
// "elsCloud":true, // ExportPath will be an CLoud ID deployment
|
||||
// "elsApiKey": "", // Base64-encoded token for authorization; if set, overrides username/password and service token.
|
||||
// "elsUsername":"", // Username for HTTP Basic Authentication.
|
||||
// "elsPassword":"", // Password for HTTP Basic Authentication.
|
||||
@@ -598,17 +606,9 @@ const CGRATES_CFG_JSON = `
|
||||
// "rpcReplyTimeout":"2s", // connection down at replies if taking longer that this value
|
||||
// "rpcAPIOpts": {},
|
||||
}, // extra options for exporter
|
||||
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
|
||||
"filters": [], // limit parsing based on the filters
|
||||
"flags": [], // flags to influence the event processing <*attributes|*log>
|
||||
"attribute_ids": [], // select Attribute profiles instead of discovering them
|
||||
"attribute_context": "", // context used to discover matching Attribute profiles
|
||||
"synchronous": false, // block processing until export has a result
|
||||
"attempts": 1, // export attempts
|
||||
"fields":[], // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"failed_posts_dir": "/var/spool/cgrates/failed_posts", // directory path where we store failed requests
|
||||
},
|
||||
],
|
||||
"fields":[] // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
}
|
||||
]
|
||||
},
|
||||
|
||||
|
||||
|
||||
@@ -445,20 +445,28 @@ type EventReaderOpts struct {
|
||||
|
||||
// EventReaderCfg the event for the Event Reader
|
||||
type EventReaderCfg struct {
|
||||
ID string
|
||||
Type string
|
||||
RunDelay time.Duration
|
||||
ConcurrentReqs int
|
||||
SourcePath string
|
||||
ProcessedPath string
|
||||
Opts *EventReaderOpts
|
||||
Tenant RSRParsers
|
||||
Timezone string
|
||||
Filters []string
|
||||
Flags utils.FlagsWithParams
|
||||
Fields []*FCTemplate
|
||||
PartialCommitFields []*FCTemplate
|
||||
CacheDumpFields []*FCTemplate
|
||||
ID string
|
||||
Type string
|
||||
|
||||
// RunDelay determines how the Serve method initiates the reading process.
|
||||
// - A value of 0 disables automatic reading, allowing manual control, possibly through an API.
|
||||
// - A value of -1 enables watching directory changes indefinitely, applicable for file-based readers.
|
||||
// - Any positive duration sets a fixed time interval for automatic reading cycles.
|
||||
RunDelay time.Duration
|
||||
|
||||
ConcurrentReqs int
|
||||
SourcePath string
|
||||
ProcessedPath string
|
||||
Tenant RSRParsers
|
||||
Timezone string
|
||||
Filters []string
|
||||
Flags utils.FlagsWithParams
|
||||
Reconnects int
|
||||
MaxReconnectInterval time.Duration
|
||||
Opts *EventReaderOpts
|
||||
Fields []*FCTemplate
|
||||
PartialCommitFields []*FCTemplate
|
||||
CacheDumpFields []*FCTemplate
|
||||
}
|
||||
|
||||
func (erOpts *EventReaderOpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error) {
|
||||
@@ -492,7 +500,6 @@ func (erOpts *EventReaderOpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err
|
||||
if jsnCfg.PartialOrderField != nil {
|
||||
erOpts.PartialOrderField = jsnCfg.PartialOrderField
|
||||
}
|
||||
|
||||
if jsnCfg.XMLRootPath != nil {
|
||||
erOpts.XMLRootPath = jsnCfg.XMLRootPath
|
||||
}
|
||||
@@ -539,6 +546,14 @@ func (er *EventReaderCfg) loadFromJSONCfg(jsnCfg *EventReaderJsonCfg, msgTemplat
|
||||
if jsnCfg.Flags != nil {
|
||||
er.Flags = utils.FlagsWithParamsFromSlice(*jsnCfg.Flags)
|
||||
}
|
||||
if jsnCfg.Reconnects != nil {
|
||||
er.Reconnects = *jsnCfg.Reconnects
|
||||
}
|
||||
if jsnCfg.Max_reconnect_interval != nil {
|
||||
if er.MaxReconnectInterval, err = utils.ParseDurationWithNanosecs(*jsnCfg.Max_reconnect_interval); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if jsnCfg.Fields != nil {
|
||||
if er.Fields, err = FCTemplatesFromFCTemplatesJSONCfg(*jsnCfg.Fields, sep); err != nil {
|
||||
return err
|
||||
@@ -885,16 +900,18 @@ func (erOpts *EventReaderOpts) Clone() *EventReaderOpts {
|
||||
// Clone returns a deep copy of EventReaderCfg
|
||||
func (er EventReaderCfg) Clone() (cln *EventReaderCfg) {
|
||||
cln = &EventReaderCfg{
|
||||
ID: er.ID,
|
||||
Type: er.Type,
|
||||
RunDelay: er.RunDelay,
|
||||
ConcurrentReqs: er.ConcurrentReqs,
|
||||
SourcePath: er.SourcePath,
|
||||
ProcessedPath: er.ProcessedPath,
|
||||
Tenant: er.Tenant.Clone(),
|
||||
Timezone: er.Timezone,
|
||||
Flags: er.Flags.Clone(),
|
||||
Opts: er.Opts.Clone(),
|
||||
ID: er.ID,
|
||||
Type: er.Type,
|
||||
RunDelay: er.RunDelay,
|
||||
ConcurrentReqs: er.ConcurrentReqs,
|
||||
SourcePath: er.SourcePath,
|
||||
ProcessedPath: er.ProcessedPath,
|
||||
Tenant: er.Tenant.Clone(),
|
||||
Timezone: er.Timezone,
|
||||
Flags: er.Flags.Clone(),
|
||||
Reconnects: er.Reconnects,
|
||||
MaxReconnectInterval: er.MaxReconnectInterval,
|
||||
Opts: er.Opts.Clone(),
|
||||
}
|
||||
if er.Filters != nil {
|
||||
cln.Filters = make([]string, len(er.Filters))
|
||||
@@ -1135,19 +1152,24 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string
|
||||
}
|
||||
}
|
||||
initialMP = map[string]any{
|
||||
utils.IDCfg: er.ID,
|
||||
utils.TypeCfg: er.Type,
|
||||
utils.ConcurrentRequestsCfg: er.ConcurrentReqs,
|
||||
utils.SourcePathCfg: er.SourcePath,
|
||||
utils.ProcessedPathCfg: er.ProcessedPath,
|
||||
utils.TenantCfg: er.Tenant.GetRule(separator),
|
||||
utils.TimezoneCfg: er.Timezone,
|
||||
utils.FiltersCfg: er.Filters,
|
||||
utils.FlagsCfg: []string{},
|
||||
utils.RunDelayCfg: "0",
|
||||
utils.OptsCfg: opts,
|
||||
utils.IDCfg: er.ID,
|
||||
utils.TypeCfg: er.Type,
|
||||
utils.ConcurrentRequestsCfg: er.ConcurrentReqs,
|
||||
utils.SourcePathCfg: er.SourcePath,
|
||||
utils.ProcessedPathCfg: er.ProcessedPath,
|
||||
utils.TenantCfg: er.Tenant.GetRule(separator),
|
||||
utils.TimezoneCfg: er.Timezone,
|
||||
utils.FiltersCfg: er.Filters,
|
||||
utils.FlagsCfg: []string{},
|
||||
utils.RunDelayCfg: "0",
|
||||
utils.ReconnectsCfg: er.Reconnects,
|
||||
utils.MaxReconnectIntervalCfg: "0",
|
||||
utils.OptsCfg: opts,
|
||||
}
|
||||
|
||||
if er.MaxReconnectInterval != 0 {
|
||||
initialMP[utils.MaxReconnectIntervalCfg] = er.MaxReconnectInterval.String()
|
||||
}
|
||||
initialMP[utils.OptsCfg] = opts
|
||||
|
||||
if flags := er.Flags.SliceFlags(); flags != nil {
|
||||
|
||||
@@ -274,20 +274,22 @@ type EventReaderOptsJson struct {
|
||||
|
||||
// EventReaderSJsonCfg is the configuration of a single EventReader
|
||||
type EventReaderJsonCfg struct {
|
||||
Id *string
|
||||
Type *string
|
||||
Run_delay *string
|
||||
Concurrent_requests *int
|
||||
Source_path *string
|
||||
Processed_path *string
|
||||
Opts *EventReaderOptsJson
|
||||
Tenant *string
|
||||
Timezone *string
|
||||
Filters *[]string
|
||||
Flags *[]string
|
||||
Fields *[]*FcTemplateJsonCfg
|
||||
Partial_commit_fields *[]*FcTemplateJsonCfg
|
||||
Cache_dump_fields *[]*FcTemplateJsonCfg
|
||||
Id *string
|
||||
Type *string
|
||||
Run_delay *string
|
||||
Concurrent_requests *int
|
||||
Source_path *string
|
||||
Processed_path *string
|
||||
Tenant *string
|
||||
Timezone *string
|
||||
Filters *[]string
|
||||
Flags *[]string
|
||||
Reconnects *int
|
||||
Max_reconnect_interval *string
|
||||
Opts *EventReaderOptsJson
|
||||
Fields *[]*FcTemplateJsonCfg
|
||||
Partial_commit_fields *[]*FcTemplateJsonCfg
|
||||
Cache_dump_fields *[]*FcTemplateJsonCfg
|
||||
}
|
||||
|
||||
// EEsJsonCfg contains the configuration of EventExporterService
|
||||
|
||||
Reference in New Issue
Block a user