diff --git a/config/config_defaults.go b/config/config_defaults.go index 633b163b2..77e596ac2 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -445,6 +445,7 @@ const CGRATES_CFG_JSON = ` // "natsCertificateAuthority": "", // the path to a custom certificate authority file( used by tls) // "natsClientCertificate": "", // the path to a client certificate( used by tls) // "natsClientKey": "", // the path to a client key( used by tls) + // "natsJetStreamMaxWait": "5s", // the maximum amount of time to wait for a response // "natsJetStreamProcessed": false, // controls if the nats poster uses the JetStream // "natsSubjectProcessed": "cgrates_cdrs", // the subject were the events are posted @@ -453,6 +454,7 @@ const CGRATES_CFG_JSON = ` // "natsCertificateAuthorityProcessed": "", // the path to a custom certificate authority file( used by tls) // "natsClientCertificateProcessed": "", // the path to a client certificate( used by tls) // "natsClientKeyProcessed": "", // the path to a client key( used by tls) + // "natsJetStreamMaxWaitProcessed": "5s ", // the maximum amount of time to wait for a response }, "tenant": "", // tenant used by import "timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> @@ -551,6 +553,7 @@ const CGRATES_CFG_JSON = ` // "natsCertificateAuthority": "", // the path to a custom certificate authority file( used by tls) // "natsClientCertificate": "", // the path to a client certificate( used by tls) // "natsClientKey": "", // the path to a client key( used by tls) + // "natsJetStreamMaxWait": "5s", // the maximum amount of time to wait for a response }, // extra options for exporter "timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> "filters": [], // limit parsing based on the filters diff --git a/engine/pstr_nats.go b/engine/pstr_nats.go index 05e8db4d8..15f08c544 100644 --- a/engine/pstr_nats.go +++ b/engine/pstr_nats.go @@ -47,6 +47,7 @@ type NatsPoster struct { attempts int jetStream bool opts []nats.Option + jsOpts []nats.JSOpt sync.Mutex // protect writer poster *nats.Conn @@ -110,6 +111,15 @@ func (pstr *NatsPoster) parseOpt(opts map[string]interface{}, nodeID string, con pstr.subject = utils.IfaceAsString(vals) } pstr.opts, err = GetNatsOpts(opts, nodeID, connTimeout) + if pstr.jetStream { + if maxWaitVal, has := opts[utils.NatsJetStreamMaxWait]; has { + var maxWait time.Duration + if maxWait, err = utils.IfaceAsDuration(maxWaitVal); err != nil { + return + } + pstr.jsOpts = []nats.JSOpt{nats.MaxWait(maxWait)} + } + } return } @@ -121,7 +131,7 @@ func (pstr *NatsPoster) newPostWriter() (err error) { return } if pstr.jetStream { - pstr.posterJS, err = pstr.poster.JetStream() + pstr.posterJS, err = pstr.poster.JetStream(pstr.jsOpts...) } } return diff --git a/ers/nats.go b/ers/nats.go index 90c06e774..e710d00c8 100644 --- a/ers/nats.go +++ b/ers/nats.go @@ -21,6 +21,7 @@ package ers import ( "encoding/json" "fmt" + "time" "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" @@ -75,6 +76,7 @@ type NatsER struct { jetStream bool consumerName string opts []nats.Option + jsOpts []nats.JSOpt poster *engine.NatsPoster } @@ -99,7 +101,7 @@ func (rdr *NatsER) Serve() (err error) { return } } else { - js, err = nc.JetStream() + js, err = nc.JetStream(rdr.jsOpts...) if err != nil { return } @@ -202,6 +204,15 @@ func (rdr *NatsER) processOpts() (err error) { return } } + if rdr.jetStream { + if maxWaitVal, has := rdr.Config().Opts[utils.NatsJetStreamMaxWait]; has { + var maxWait time.Duration + if maxWait, err = utils.IfaceAsDuration(maxWaitVal); err != nil { + return + } + rdr.jsOpts = []nats.JSOpt{nats.MaxWait(maxWait)} + } + } rdr.opts, err = engine.GetNatsOpts(rdr.Config().Opts, rdr.cgrCfg.GeneralCfg().NodeID, rdr.cgrCfg.GeneralCfg().ConnectTimeout) diff --git a/utils/consts.go b/utils/consts.go index 02d106470..50bab1c59 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2618,6 +2618,7 @@ const ( NatsClientKey = "natsClientKey" NatsCertificateAuthority = "natsCertificateAuthority" NatsJetStream = "natsJetStream" + NatsJetStreamMaxWait = "natsJetStreamMaxWait" ) // Analyzers constants