Added maxWait for nats

This commit is contained in:
Trial97
2021-07-12 15:30:42 +03:00
committed by Dan Christian Bogos
parent b18467fdc3
commit eddabfb985
4 changed files with 27 additions and 2 deletions

View File

@@ -445,6 +445,7 @@ const CGRATES_CFG_JSON = `
// "natsCertificateAuthority": "", // the path to a custom certificate authority file( used by tls)
// "natsClientCertificate": "", // the path to a client certificate( used by tls)
// "natsClientKey": "", // the path to a client key( used by tls)
// "natsJetStreamMaxWait": "5s", // the maximum amount of time to wait for a response
// "natsJetStreamProcessed": false, // controls if the nats poster uses the JetStream
// "natsSubjectProcessed": "cgrates_cdrs", // the subject were the events are posted
@@ -453,6 +454,7 @@ const CGRATES_CFG_JSON = `
// "natsCertificateAuthorityProcessed": "", // the path to a custom certificate authority file( used by tls)
// "natsClientCertificateProcessed": "", // the path to a client certificate( used by tls)
// "natsClientKeyProcessed": "", // the path to a client key( used by tls)
// "natsJetStreamMaxWaitProcessed": "5s ", // the maximum amount of time to wait for a response
},
"tenant": "", // tenant used by import
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
@@ -551,6 +553,7 @@ const CGRATES_CFG_JSON = `
// "natsCertificateAuthority": "", // the path to a custom certificate authority file( used by tls)
// "natsClientCertificate": "", // the path to a client certificate( used by tls)
// "natsClientKey": "", // the path to a client key( used by tls)
// "natsJetStreamMaxWait": "5s", // the maximum amount of time to wait for a response
}, // extra options for exporter
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
"filters": [], // limit parsing based on the filters

View File

@@ -47,6 +47,7 @@ type NatsPoster struct {
attempts int
jetStream bool
opts []nats.Option
jsOpts []nats.JSOpt
sync.Mutex // protect writer
poster *nats.Conn
@@ -110,6 +111,15 @@ func (pstr *NatsPoster) parseOpt(opts map[string]interface{}, nodeID string, con
pstr.subject = utils.IfaceAsString(vals)
}
pstr.opts, err = GetNatsOpts(opts, nodeID, connTimeout)
if pstr.jetStream {
if maxWaitVal, has := opts[utils.NatsJetStreamMaxWait]; has {
var maxWait time.Duration
if maxWait, err = utils.IfaceAsDuration(maxWaitVal); err != nil {
return
}
pstr.jsOpts = []nats.JSOpt{nats.MaxWait(maxWait)}
}
}
return
}
@@ -121,7 +131,7 @@ func (pstr *NatsPoster) newPostWriter() (err error) {
return
}
if pstr.jetStream {
pstr.posterJS, err = pstr.poster.JetStream()
pstr.posterJS, err = pstr.poster.JetStream(pstr.jsOpts...)
}
}
return

View File

@@ -21,6 +21,7 @@ package ers
import (
"encoding/json"
"fmt"
"time"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
@@ -75,6 +76,7 @@ type NatsER struct {
jetStream bool
consumerName string
opts []nats.Option
jsOpts []nats.JSOpt
poster *engine.NatsPoster
}
@@ -99,7 +101,7 @@ func (rdr *NatsER) Serve() (err error) {
return
}
} else {
js, err = nc.JetStream()
js, err = nc.JetStream(rdr.jsOpts...)
if err != nil {
return
}
@@ -202,6 +204,15 @@ func (rdr *NatsER) processOpts() (err error) {
return
}
}
if rdr.jetStream {
if maxWaitVal, has := rdr.Config().Opts[utils.NatsJetStreamMaxWait]; has {
var maxWait time.Duration
if maxWait, err = utils.IfaceAsDuration(maxWaitVal); err != nil {
return
}
rdr.jsOpts = []nats.JSOpt{nats.MaxWait(maxWait)}
}
}
rdr.opts, err = engine.GetNatsOpts(rdr.Config().Opts,
rdr.cgrCfg.GeneralCfg().NodeID,
rdr.cgrCfg.GeneralCfg().ConnectTimeout)

View File

@@ -2618,6 +2618,7 @@ const (
NatsClientKey = "natsClientKey"
NatsCertificateAuthority = "natsCertificateAuthority"
NatsJetStream = "natsJetStream"
NatsJetStreamMaxWait = "natsJetStreamMaxWait"
)
// Analyzers constants