From b1d68b6cfeafe7dd02db8612cdcd5e4ab802a6cf Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Sun, 24 Sep 2023 17:16:18 -0400 Subject: [PATCH] Upgrade NATS driver version and implementation Upgraded NATS server version in ansible role. upgraded go.mod nats version due to an issue caused by version mismatch between driver and server (uncertain). Increased amount of sleep in tests after starting a NATS server with jetstream enabled from 50ms to 100ms. ## *NatsER.Serve - Replaced ChanQueueSubscribe with QueueSubscribe for Core NATS consumer to handle the message processing directly. - Since QueueSubscribe is now used regardless of jetstream status, the message handler has been assigned to a separate variable that can be reused. - The message handler is now dealing with the message processing directly, therefore the select case listening for the channel which is feeding NATS messages can be removed together with the channel itself and the select. Currently, the goroutine within Serve only has to block until the rdrExit chan is closed. - Moved the resource check inside the handler right before starting the message processing goroutine. ## ers.getProcessedOptions - Renamed function from getProcessOptions to getProcessedOptions. - Initially, the EventExporterOpts struct was always initialized to be non-nil which meant exporting processed messages by the reader was always enabled by force. Function has been updated to initialize it only once when the first Processed option that was set is found. - Created init function for all types of opts structs to avoid repetition. ## *NatsEE.parseOpts - Renamed function from parseOpt to parseOpts. - Updated function to return early in case of nil opts struct to reduce nesting. - The nested jetstream status and maxwait conditions nil verification have been merged into one condition. - Handled the error coming from GetNatsOpts function. - NATS Subject assignment has been removed. It was redundant, since it had already been set from before this function was called. ## *NatsEE.Connect - Updated function to return early in case of non-nil nats.Conn value to reduce nesting. ## *NatsEE.ExportEvent - Use defer to release resources and RUnlock. ## *NatsEE.Close - Use defer to Unlock. - Update function to return early in case of nil nats.Conn value to reduce nesting. ## ees.GetNatsOpts - Chose switch over if else when parsing client certificate and keys opts. - Updated function to return the errors directly instead of assigning them to a separate variable right before returning. ## ers.GetNatsOpts - Passed the NATSROpts struct directly to the function. - Chose switch over if else when parsing client certificate and keys opts. - Updated function to return the errors directly instead of assigning them to a separate variable right before returning. Removed tab from commented natsJetStreamMaxWaitProcessed option value in config_defaults.go under ers section. Removed redundant NATS prefix from config.NATSROpts type field names. Added function in ers/lib_test.go to create a config file in /tmp based on a config string. Added integration test for ERs NATS. --- config/config_defaults.go | 2 +- config/config_it_test.go | 4 +- config/config_test.go | 6 +- config/erscfg.go | 252 +++++++++--------- config/erscfg_test.go | 58 ++--- data/ansible/roles/nats/defaults/main.yaml | 2 +- ees/nats.go | 122 +++++---- ees/nats_test.go | 8 +- ers/amqp.go | 2 +- ers/amqpv1.go | 2 +- ers/kafka.go | 2 +- ers/lib_test.go | 33 +++ ers/libers.go | 281 ++++++++++----------- ers/libers_test.go | 2 +- ers/nats.go | 188 +++++++------- ers/nats_it_test.go | 177 ++++++++++++- ers/s3.go | 2 +- ers/sql.go | 2 +- ers/sqs.go | 2 +- go.mod | 8 +- go.sum | 12 +- 21 files changed, 698 insertions(+), 469 deletions(-) diff --git a/config/config_defaults.go b/config/config_defaults.go index a12c9cc3c..c79f183e7 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -463,7 +463,7 @@ const CGRATES_CFG_JSON = ` // "natsCertificateAuthorityProcessed": "", // the path to a custom certificate authority file( used by tls) // "natsClientCertificateProcessed": "", // the path to a client certificate( used by tls) // "natsClientKeyProcessed": "", // the path to a client key( used by tls) - // "natsJetStreamMaxWaitProcessed": "5s ", // the maximum amount of time to wait for a response + // "natsJetStreamMaxWaitProcessed": "5s", // the maximum amount of time to wait for a response }, "tenant": "", // tenant used by import "timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> diff --git a/config/config_it_test.go b/config/config_it_test.go index 4bf9af8b0..33ef64111 100644 --- a/config/config_it_test.go +++ b/config/config_it_test.go @@ -621,7 +621,7 @@ func testCGRConfigReloadERs(t *testing.T) { PartialCacheAction: utils.StringPointer(utils.MetaNone), XMLRootPath: utils.StringPointer(utils.EmptyString), NATSOpts: &NATSROpts{ - NATSSubject: utils.StringPointer("cgrates_cdrs"), + Subject: utils.StringPointer("cgrates_cdrs"), }, }, }, @@ -651,7 +651,7 @@ func testCGRConfigReloadERs(t *testing.T) { PartialCacheAction: utils.StringPointer(utils.MetaNone), XMLRootPath: utils.StringPointer(utils.EmptyString), NATSOpts: &NATSROpts{ - NATSSubject: utils.StringPointer("cgrates_cdrs"), + Subject: utils.StringPointer("cgrates_cdrs"), }, }, }, diff --git a/config/config_test.go b/config/config_test.go index 94c6e3a6a..3a2689736 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -2368,7 +2368,7 @@ func TestERSConfig(t *testing.T) { PartialCacheAction: utils.StringPointer(utils.MetaNone), XMLRootPath: utils.StringPointer(utils.EmptyString), NATSOpts: &NATSROpts{ - NATSSubject: utils.StringPointer("cgrates_cdrs"), + Subject: utils.StringPointer("cgrates_cdrs"), }, }, }, @@ -5411,7 +5411,7 @@ func TestCgrCdfEventReader(t *testing.T) { KafkaOpts: &KafkaROpts{}, PartialOrderField: utils.StringPointer("~*req.AnswerTime"), NATSOpts: &NATSROpts{ - NATSSubject: utils.StringPointer("cgrates_cdrs"), + Subject: utils.StringPointer("cgrates_cdrs"), }, PartialCacheAction: utils.StringPointer(utils.MetaNone), XMLRootPath: utils.StringPointer(utils.EmptyString), @@ -5524,7 +5524,7 @@ func TestCgrCfgEventReaderDefault(t *testing.T) { SQLOpts: &SQLROpts{}, KafkaOpts: &KafkaROpts{}, NATSOpts: &NATSROpts{ - NATSSubject: utils.StringPointer("cgrates_cdrs"), + Subject: utils.StringPointer("cgrates_cdrs"), }, }, } diff --git a/config/erscfg.go b/config/erscfg.go index 6aa7ac2fa..381797cf4 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -313,88 +313,88 @@ func (awsROpts *AWSROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err erro } type NATSROpts struct { - NATSJetStream *bool - NATSConsumerName *string - NATSSubject *string - NATSQueueID *string - NATSJWTFile *string - NATSSeedFile *string - NATSCertificateAuthority *string - NATSClientCertificate *string - NATSClientKey *string - NATSJetStreamMaxWait *time.Duration - NATSJetStreamProcessed *bool - NATSSubjectProcessed *string - NATSJWTFileProcessed *string - NATSSeedFileProcessed *string - NATSCertificateAuthorityProcessed *string - NATSClientCertificateProcessed *string - NATSClientKeyProcessed *string - NATSJetStreamMaxWaitProcessed *time.Duration + JetStream *bool + ConsumerName *string + Subject *string + QueueID *string + JWTFile *string + SeedFile *string + CertificateAuthority *string + ClientCertificate *string + ClientKey *string + JetStreamMaxWait *time.Duration + JetStreamProcessed *bool + SubjectProcessed *string + JWTFileProcessed *string + SeedFileProcessed *string + CertificateAuthorityProcessed *string + ClientCertificateProcessed *string + ClientKeyProcessed *string + JetStreamMaxWaitProcessed *time.Duration } func (natsOpts *NATSROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error) { if jsnCfg.NATSJetStream != nil { - natsOpts.NATSJetStream = jsnCfg.NATSJetStream + natsOpts.JetStream = jsnCfg.NATSJetStream } if jsnCfg.NATSConsumerName != nil { - natsOpts.NATSConsumerName = jsnCfg.NATSConsumerName + natsOpts.ConsumerName = jsnCfg.NATSConsumerName } if jsnCfg.NATSSubject != nil { - natsOpts.NATSSubject = jsnCfg.NATSSubject + natsOpts.Subject = jsnCfg.NATSSubject } if jsnCfg.NATSQueueID != nil { - natsOpts.NATSQueueID = jsnCfg.NATSQueueID + natsOpts.QueueID = jsnCfg.NATSQueueID } if jsnCfg.NATSJWTFile != nil { - natsOpts.NATSJWTFile = jsnCfg.NATSJWTFile + natsOpts.JWTFile = jsnCfg.NATSJWTFile } if jsnCfg.NATSSeedFile != nil { - natsOpts.NATSSeedFile = jsnCfg.NATSSeedFile + natsOpts.SeedFile = jsnCfg.NATSSeedFile } if jsnCfg.NATSCertificateAuthority != nil { - natsOpts.NATSCertificateAuthority = jsnCfg.NATSCertificateAuthority + natsOpts.CertificateAuthority = jsnCfg.NATSCertificateAuthority } if jsnCfg.NATSClientCertificate != nil { - natsOpts.NATSClientCertificate = jsnCfg.NATSClientCertificate + natsOpts.ClientCertificate = jsnCfg.NATSClientCertificate } if jsnCfg.NATSClientKey != nil { - natsOpts.NATSClientKey = jsnCfg.NATSClientKey + natsOpts.ClientKey = jsnCfg.NATSClientKey } if jsnCfg.NATSJetStreamMaxWait != nil { var jetStreamMaxWait time.Duration if jetStreamMaxWait, err = utils.ParseDurationWithNanosecs(*jsnCfg.NATSJetStreamMaxWait); err != nil { return } - natsOpts.NATSJetStreamMaxWait = utils.DurationPointer(jetStreamMaxWait) + natsOpts.JetStreamMaxWait = utils.DurationPointer(jetStreamMaxWait) } if jsnCfg.NATSJetStreamProcessed != nil { - natsOpts.NATSJetStreamProcessed = jsnCfg.NATSJetStreamProcessed + natsOpts.JetStreamProcessed = jsnCfg.NATSJetStreamProcessed } if jsnCfg.NATSSubjectProcessed != nil { - natsOpts.NATSSubjectProcessed = jsnCfg.NATSSubjectProcessed + natsOpts.SubjectProcessed = jsnCfg.NATSSubjectProcessed } if jsnCfg.NATSJWTFileProcessed != nil { - natsOpts.NATSJWTFileProcessed = jsnCfg.NATSJWTFileProcessed + natsOpts.JWTFileProcessed = jsnCfg.NATSJWTFileProcessed } if jsnCfg.NATSSeedFileProcessed != nil { - natsOpts.NATSSeedFileProcessed = jsnCfg.NATSSeedFileProcessed + natsOpts.SeedFileProcessed = jsnCfg.NATSSeedFileProcessed } if jsnCfg.NATSCertificateAuthorityProcessed != nil { - natsOpts.NATSCertificateAuthorityProcessed = jsnCfg.NATSCertificateAuthorityProcessed + natsOpts.CertificateAuthorityProcessed = jsnCfg.NATSCertificateAuthorityProcessed } if jsnCfg.NATSClientCertificateProcessed != nil { - natsOpts.NATSClientCertificateProcessed = jsnCfg.NATSClientCertificateProcessed + natsOpts.ClientCertificateProcessed = jsnCfg.NATSClientCertificateProcessed } if jsnCfg.NATSClientKeyProcessed != nil { - natsOpts.NATSClientKeyProcessed = jsnCfg.NATSClientKeyProcessed + natsOpts.ClientKeyProcessed = jsnCfg.NATSClientKeyProcessed } if jsnCfg.NATSJetStreamMaxWaitProcessed != nil { var jetStreamMaxWait time.Duration if jetStreamMaxWait, err = utils.ParseDurationWithNanosecs(*jsnCfg.NATSJetStreamMaxWaitProcessed); err != nil { return } - natsOpts.NATSJetStreamMaxWaitProcessed = utils.DurationPointer(jetStreamMaxWait) + natsOpts.JetStreamMaxWaitProcessed = utils.DurationPointer(jetStreamMaxWait) } return } @@ -761,77 +761,77 @@ func (awsOpt *AWSROpts) Clone() *AWSROpts { } func (natOpts *NATSROpts) Clone() *NATSROpts { cln := &NATSROpts{} - if natOpts.NATSJetStream != nil { - cln.NATSJetStream = new(bool) - *cln.NATSJetStream = *natOpts.NATSJetStream + if natOpts.JetStream != nil { + cln.JetStream = new(bool) + *cln.JetStream = *natOpts.JetStream } - if natOpts.NATSConsumerName != nil { - cln.NATSConsumerName = new(string) - *cln.NATSConsumerName = *natOpts.NATSConsumerName + if natOpts.ConsumerName != nil { + cln.ConsumerName = new(string) + *cln.ConsumerName = *natOpts.ConsumerName } - if natOpts.NATSSubject != nil { - cln.NATSSubject = new(string) - *cln.NATSSubject = *natOpts.NATSSubject + if natOpts.Subject != nil { + cln.Subject = new(string) + *cln.Subject = *natOpts.Subject } - if natOpts.NATSQueueID != nil { - cln.NATSQueueID = new(string) - *cln.NATSQueueID = *natOpts.NATSQueueID + if natOpts.QueueID != nil { + cln.QueueID = new(string) + *cln.QueueID = *natOpts.QueueID } - if natOpts.NATSJWTFile != nil { - cln.NATSJWTFile = new(string) - *cln.NATSJWTFile = *natOpts.NATSJWTFile + if natOpts.JWTFile != nil { + cln.JWTFile = new(string) + *cln.JWTFile = *natOpts.JWTFile } - if natOpts.NATSSeedFile != nil { - cln.NATSSeedFile = new(string) - *cln.NATSSeedFile = *natOpts.NATSSeedFile + if natOpts.SeedFile != nil { + cln.SeedFile = new(string) + *cln.SeedFile = *natOpts.SeedFile } - if natOpts.NATSCertificateAuthority != nil { - cln.NATSCertificateAuthority = new(string) - *cln.NATSCertificateAuthority = *natOpts.NATSCertificateAuthority + if natOpts.CertificateAuthority != nil { + cln.CertificateAuthority = new(string) + *cln.CertificateAuthority = *natOpts.CertificateAuthority } - if natOpts.NATSClientCertificate != nil { - cln.NATSClientCertificate = new(string) - *cln.NATSClientCertificate = *natOpts.NATSClientCertificate + if natOpts.ClientCertificate != nil { + cln.ClientCertificate = new(string) + *cln.ClientCertificate = *natOpts.ClientCertificate } - if natOpts.NATSClientKey != nil { - cln.NATSClientKey = new(string) - *cln.NATSClientKey = *natOpts.NATSClientKey + if natOpts.ClientKey != nil { + cln.ClientKey = new(string) + *cln.ClientKey = *natOpts.ClientKey } - if natOpts.NATSJetStreamMaxWait != nil { - cln.NATSJetStreamMaxWait = new(time.Duration) - *cln.NATSJetStreamMaxWait = *natOpts.NATSJetStreamMaxWait + if natOpts.JetStreamMaxWait != nil { + cln.JetStreamMaxWait = new(time.Duration) + *cln.JetStreamMaxWait = *natOpts.JetStreamMaxWait } - if natOpts.NATSJetStreamProcessed != nil { - cln.NATSJetStreamProcessed = new(bool) - *cln.NATSJetStreamProcessed = *natOpts.NATSJetStreamProcessed + if natOpts.JetStreamProcessed != nil { + cln.JetStreamProcessed = new(bool) + *cln.JetStreamProcessed = *natOpts.JetStreamProcessed } - if natOpts.NATSSubjectProcessed != nil { - cln.NATSSubjectProcessed = new(string) - *cln.NATSSubjectProcessed = *natOpts.NATSSubjectProcessed + if natOpts.SubjectProcessed != nil { + cln.SubjectProcessed = new(string) + *cln.SubjectProcessed = *natOpts.SubjectProcessed } - if natOpts.NATSJWTFileProcessed != nil { - cln.NATSJWTFileProcessed = new(string) - *cln.NATSJWTFileProcessed = *natOpts.NATSJWTFileProcessed + if natOpts.JWTFileProcessed != nil { + cln.JWTFileProcessed = new(string) + *cln.JWTFileProcessed = *natOpts.JWTFileProcessed } - if natOpts.NATSSeedFileProcessed != nil { - cln.NATSSeedFileProcessed = new(string) - *cln.NATSSeedFileProcessed = *natOpts.NATSSeedFileProcessed + if natOpts.SeedFileProcessed != nil { + cln.SeedFileProcessed = new(string) + *cln.SeedFileProcessed = *natOpts.SeedFileProcessed } - if natOpts.NATSCertificateAuthorityProcessed != nil { - cln.NATSCertificateAuthorityProcessed = new(string) - *cln.NATSCertificateAuthorityProcessed = *natOpts.NATSCertificateAuthorityProcessed + if natOpts.CertificateAuthorityProcessed != nil { + cln.CertificateAuthorityProcessed = new(string) + *cln.CertificateAuthorityProcessed = *natOpts.CertificateAuthorityProcessed } - if natOpts.NATSClientCertificateProcessed != nil { - cln.NATSClientCertificateProcessed = new(string) - *cln.NATSClientCertificateProcessed = *natOpts.NATSClientCertificateProcessed + if natOpts.ClientCertificateProcessed != nil { + cln.ClientCertificateProcessed = new(string) + *cln.ClientCertificateProcessed = *natOpts.ClientCertificateProcessed } - if natOpts.NATSClientKeyProcessed != nil { - cln.NATSClientKeyProcessed = new(string) - *cln.NATSClientKeyProcessed = *natOpts.NATSClientKeyProcessed + if natOpts.ClientKeyProcessed != nil { + cln.ClientKeyProcessed = new(string) + *cln.ClientKeyProcessed = *natOpts.ClientKeyProcessed } - if natOpts.NATSJetStreamMaxWaitProcessed != nil { - cln.NATSJetStreamMaxWaitProcessed = new(time.Duration) - *cln.NATSJetStreamMaxWaitProcessed = *natOpts.NATSJetStreamMaxWaitProcessed + if natOpts.JetStreamMaxWaitProcessed != nil { + cln.JetStreamMaxWaitProcessed = new(time.Duration) + *cln.JetStreamMaxWaitProcessed = *natOpts.JetStreamMaxWaitProcessed } return cln } @@ -1072,59 +1072,59 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string } if natsOpts := er.Opts.NATSOpts; natsOpts != nil { - if natsOpts.NATSJetStream != nil { - opts[utils.NatsJetStream] = *natsOpts.NATSJetStream + if natsOpts.JetStream != nil { + opts[utils.NatsJetStream] = *natsOpts.JetStream } - if natsOpts.NATSConsumerName != nil { - opts[utils.NatsConsumerName] = *natsOpts.NATSConsumerName + if natsOpts.ConsumerName != nil { + opts[utils.NatsConsumerName] = *natsOpts.ConsumerName } - if natsOpts.NATSSubject != nil { - opts[utils.NatsSubject] = *natsOpts.NATSSubject + if natsOpts.Subject != nil { + opts[utils.NatsSubject] = *natsOpts.Subject } - if natsOpts.NATSQueueID != nil { - opts[utils.NatsQueueID] = *natsOpts.NATSQueueID + if natsOpts.QueueID != nil { + opts[utils.NatsQueueID] = *natsOpts.QueueID } - if natsOpts.NATSJWTFile != nil { - opts[utils.NatsJWTFile] = *natsOpts.NATSJWTFile + if natsOpts.JWTFile != nil { + opts[utils.NatsJWTFile] = *natsOpts.JWTFile } - if natsOpts.NATSSeedFile != nil { - opts[utils.NatsSeedFile] = *natsOpts.NATSSeedFile + if natsOpts.SeedFile != nil { + opts[utils.NatsSeedFile] = *natsOpts.SeedFile } - if natsOpts.NATSCertificateAuthority != nil { - opts[utils.NatsCertificateAuthority] = *natsOpts.NATSCertificateAuthority + if natsOpts.CertificateAuthority != nil { + opts[utils.NatsCertificateAuthority] = *natsOpts.CertificateAuthority } - if natsOpts.NATSClientCertificate != nil { - opts[utils.NatsClientCertificate] = *natsOpts.NATSClientCertificate + if natsOpts.ClientCertificate != nil { + opts[utils.NatsClientCertificate] = *natsOpts.ClientCertificate } - if natsOpts.NATSClientKey != nil { - opts[utils.NatsClientKey] = *natsOpts.NATSClientKey + if natsOpts.ClientKey != nil { + opts[utils.NatsClientKey] = *natsOpts.ClientKey } - if natsOpts.NATSJetStreamMaxWait != nil { - opts[utils.NatsJetStreamMaxWait] = natsOpts.NATSJetStreamMaxWait.String() + if natsOpts.JetStreamMaxWait != nil { + opts[utils.NatsJetStreamMaxWait] = natsOpts.JetStreamMaxWait.String() } - if natsOpts.NATSJetStreamProcessed != nil { - opts[utils.NATSJetStreamProcessedCfg] = *natsOpts.NATSJetStreamProcessed + if natsOpts.JetStreamProcessed != nil { + opts[utils.NATSJetStreamProcessedCfg] = *natsOpts.JetStreamProcessed } - if natsOpts.NATSSubjectProcessed != nil { - opts[utils.NATSSubjectProcessedCfg] = *natsOpts.NATSSubjectProcessed + if natsOpts.SubjectProcessed != nil { + opts[utils.NATSSubjectProcessedCfg] = *natsOpts.SubjectProcessed } - if natsOpts.NATSJWTFileProcessed != nil { - opts[utils.NATSJWTFileProcessedCfg] = *natsOpts.NATSJWTFileProcessed + if natsOpts.JWTFileProcessed != nil { + opts[utils.NATSJWTFileProcessedCfg] = *natsOpts.JWTFileProcessed } - if natsOpts.NATSSeedFileProcessed != nil { - opts[utils.NATSSeedFileProcessedCfg] = *natsOpts.NATSSeedFileProcessed + if natsOpts.SeedFileProcessed != nil { + opts[utils.NATSSeedFileProcessedCfg] = *natsOpts.SeedFileProcessed } - if natsOpts.NATSCertificateAuthorityProcessed != nil { - opts[utils.NATSCertificateAuthorityProcessedCfg] = *natsOpts.NATSCertificateAuthorityProcessed + if natsOpts.CertificateAuthorityProcessed != nil { + opts[utils.NATSCertificateAuthorityProcessedCfg] = *natsOpts.CertificateAuthorityProcessed } - if natsOpts.NATSClientCertificateProcessed != nil { - opts[utils.NATSClientCertificateProcessed] = *natsOpts.NATSClientCertificateProcessed + if natsOpts.ClientCertificateProcessed != nil { + opts[utils.NATSClientCertificateProcessed] = *natsOpts.ClientCertificateProcessed } - if natsOpts.NATSClientKeyProcessed != nil { - opts[utils.NATSClientKeyProcessedCfg] = *natsOpts.NATSClientKeyProcessed + if natsOpts.ClientKeyProcessed != nil { + opts[utils.NATSClientKeyProcessedCfg] = *natsOpts.ClientKeyProcessed } - if natsOpts.NATSJetStreamMaxWaitProcessed != nil { - opts[utils.NATSJetStreamMaxWaitProcessedCfg] = natsOpts.NATSJetStreamMaxWaitProcessed.String() + if natsOpts.JetStreamMaxWaitProcessed != nil { + opts[utils.NATSJetStreamMaxWaitProcessedCfg] = natsOpts.JetStreamMaxWaitProcessed.String() } } initialMP = map[string]any{ diff --git a/config/erscfg_test.go b/config/erscfg_test.go index 6a12589b6..b19a606c7 100644 --- a/config/erscfg_test.go +++ b/config/erscfg_test.go @@ -113,7 +113,7 @@ func TestERSClone(t *testing.T) { PartialOrderField: utils.StringPointer("~*req.AnswerTime"), PartialCacheAction: utils.StringPointer(utils.MetaNone), NATSOpts: &NATSROpts{ - NATSSubject: utils.StringPointer("cgrates_cdrs"), + Subject: utils.StringPointer("cgrates_cdrs"), }, }, }, @@ -153,7 +153,7 @@ func TestERSClone(t *testing.T) { PartialOrderField: utils.StringPointer("~*req.AnswerTime"), PartialCacheAction: utils.StringPointer(utils.MetaNone), NATSOpts: &NATSROpts{ - NATSSubject: utils.StringPointer("cgrates_cdrs"), + Subject: utils.StringPointer("cgrates_cdrs"), }, }, }, @@ -281,7 +281,7 @@ func TestERSLoadFromjsonCfg(t *testing.T) { PartialOrderField: utils.StringPointer("~*req.AnswerTime"), PartialCacheAction: utils.StringPointer(utils.MetaNone), NATSOpts: &NATSROpts{ - NATSSubject: utils.StringPointer("cgrates_cdrs"), + Subject: utils.StringPointer("cgrates_cdrs"), }, }, }, @@ -336,7 +336,7 @@ func TestERSLoadFromjsonCfg(t *testing.T) { PartialOrderField: utils.StringPointer("~*req.AnswerTime"), PartialCacheAction: utils.StringPointer(utils.MetaNone), NATSOpts: &NATSROpts{ - NATSSubject: utils.StringPointer("cgrates_cdrs"), + Subject: utils.StringPointer("cgrates_cdrs"), }, }, }, @@ -524,7 +524,7 @@ func TestERSloadFromJsonCase3(t *testing.T) { PartialOrderField: utils.StringPointer("~*req.AnswerTime"), PartialCacheAction: utils.StringPointer(utils.MetaNone), NATSOpts: &NATSROpts{ - NATSSubject: utils.StringPointer("cgrates_cdrs"), + Subject: utils.StringPointer("cgrates_cdrs"), }, }, }, @@ -563,7 +563,7 @@ func TestERSloadFromJsonCase3(t *testing.T) { PartialOrderField: utils.StringPointer("~*req.AnswerTime"), PartialCacheAction: utils.StringPointer(utils.MetaNone), NATSOpts: &NATSROpts{ - NATSSubject: utils.StringPointer("cgrates_cdrs"), + Subject: utils.StringPointer("cgrates_cdrs"), }, }, }, @@ -675,7 +675,7 @@ func TestERSloadFromJsonCase4(t *testing.T) { PartialOrderField: utils.StringPointer("~*req.AnswerTime"), PartialCacheAction: utils.StringPointer(utils.MetaNone), NATSOpts: &NATSROpts{ - NATSSubject: utils.StringPointer("cgrates_cdrs"), + Subject: utils.StringPointer("cgrates_cdrs"), }, }, }, @@ -714,7 +714,7 @@ func TestERSloadFromJsonCase4(t *testing.T) { PartialOrderField: utils.StringPointer("~*req.AnswerTime"), PartialCacheAction: utils.StringPointer(utils.MetaNone), NATSOpts: &NATSROpts{ - NATSSubject: utils.StringPointer("cgrates_cdrs"), + Subject: utils.StringPointer("cgrates_cdrs"), }, }, }, @@ -821,7 +821,7 @@ func TestEventReaderSameID(t *testing.T) { PartialOrderField: utils.StringPointer("~*req.AnswerTime"), PartialCacheAction: utils.StringPointer(utils.MetaNone), NATSOpts: &NATSROpts{ - NATSSubject: utils.StringPointer("cgrates_cdrs"), + Subject: utils.StringPointer("cgrates_cdrs"), }, }, }, @@ -856,7 +856,7 @@ func TestEventReaderSameID(t *testing.T) { PartialOrderField: utils.StringPointer("~*req.AnswerTime"), PartialCacheAction: utils.StringPointer(utils.MetaNone), NATSOpts: &NATSROpts{ - NATSSubject: utils.StringPointer("cgrates_cdrs"), + Subject: utils.StringPointer("cgrates_cdrs"), }, }, }, @@ -1315,7 +1315,7 @@ func TestERsloadFromJsonCfg(t *testing.T) { PartialOrderField: utils.StringPointer("~*req.AnswerTime"), PartialCacheAction: utils.StringPointer(utils.MetaNone), NATSOpts: &NATSROpts{ - NATSSubject: utils.StringPointer("cgrates_cdrs"), + Subject: utils.StringPointer("cgrates_cdrs"), }, }, }, @@ -1356,7 +1356,7 @@ func TestERsloadFromJsonCfg(t *testing.T) { PartialOrderField: utils.StringPointer("~*req.AnswerTime"), PartialCacheAction: utils.StringPointer(utils.MetaNone), NATSOpts: &NATSROpts{ - NATSSubject: utils.StringPointer("cgrates_cdrs"), + Subject: utils.StringPointer("cgrates_cdrs"), }, }, }, @@ -1539,23 +1539,23 @@ func TestEventReaderCfgClone(t *testing.T) { S3BucketIDProcessed: utils.StringPointer("S3BucketProc"), }, NATSOpts: &NATSROpts{ - NATSJetStream: utils.BoolPointer(false), - NATSConsumerName: utils.StringPointer("user"), - NATSQueueID: utils.StringPointer("id"), - NATSJWTFile: utils.StringPointer("jwt"), - NATSSeedFile: utils.StringPointer("seed"), - NATSCertificateAuthority: utils.StringPointer("authority"), - NATSClientCertificate: utils.StringPointer("certificate"), - NATSClientKey: utils.StringPointer("key5"), - NATSJetStreamMaxWait: utils.DurationPointer(1 * time.Minute), - NATSJetStreamProcessed: utils.BoolPointer(true), - NATSJWTFileProcessed: utils.StringPointer("file"), - NATSSeedFileProcessed: utils.StringPointer("natseed"), - NATSCertificateAuthorityProcessed: utils.StringPointer("natsauth"), - NATSClientCertificateProcessed: utils.StringPointer("natcertificate"), - NATSClientKeyProcessed: utils.StringPointer("natsprocess"), - NATSJetStreamMaxWaitProcessed: utils.DurationPointer(1 * time.Minute), - NATSSubjectProcessed: utils.StringPointer("process"), + JetStream: utils.BoolPointer(false), + ConsumerName: utils.StringPointer("user"), + QueueID: utils.StringPointer("id"), + JWTFile: utils.StringPointer("jwt"), + SeedFile: utils.StringPointer("seed"), + CertificateAuthority: utils.StringPointer("authority"), + ClientCertificate: utils.StringPointer("certificate"), + ClientKey: utils.StringPointer("key5"), + JetStreamMaxWait: utils.DurationPointer(1 * time.Minute), + JetStreamProcessed: utils.BoolPointer(true), + JWTFileProcessed: utils.StringPointer("file"), + SeedFileProcessed: utils.StringPointer("natseed"), + CertificateAuthorityProcessed: utils.StringPointer("natsauth"), + ClientCertificateProcessed: utils.StringPointer("natcertificate"), + ClientKeyProcessed: utils.StringPointer("natsprocess"), + JetStreamMaxWaitProcessed: utils.DurationPointer(1 * time.Minute), + SubjectProcessed: utils.StringPointer("process"), }, KafkaOpts: &KafkaROpts{ KafkaTopic: utils.StringPointer("kafka"), diff --git a/data/ansible/roles/nats/defaults/main.yaml b/data/ansible/roles/nats/defaults/main.yaml index 65d37525f..96b146f2a 100644 --- a/data/ansible/roles/nats/defaults/main.yaml +++ b/data/ansible/roles/nats/defaults/main.yaml @@ -1,5 +1,5 @@ --- -nats_version: 2.9.17 +nats_version: 2.10.1 nats_install_dir: /opt/nats nats_user: nats nats_group: nats diff --git a/ees/nats.go b/ees/nats.go index 783908aea..185997dae 100644 --- a/ees/nats.go +++ b/ees/nats.go @@ -39,7 +39,7 @@ func NewNatsEE(cfg *config.EventExporterCfg, nodeID string, connTimeout time.Dur subject: utils.DefaultQueueID, reqs: newConcReq(cfg.ConcurrentRequests), } - err = natsPstr.parseOpt(cfg.Opts, nodeID, connTimeout) + err = natsPstr.parseOpts(cfg.Opts, nodeID, connTimeout) return } @@ -60,76 +60,87 @@ type NatsEE struct { bytePreparing } -func (pstr *NatsEE) parseOpt(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) (err error) { - - if natsOpts := opts.NATS; natsOpts != nil { - if natsOpts.JetStream != nil { - pstr.jetStream = *natsOpts.JetStream - } - pstr.subject = utils.DefaultQueueID - if natsOpts.Subject != nil { - pstr.subject = *natsOpts.Subject - } - - pstr.opts, err = GetNatsOpts(natsOpts, nodeID, connTimeout) - if pstr.jetStream { - if natsOpts.JetStreamMaxWait != nil { - pstr.jsOpts = []nats.JSOpt{nats.MaxWait(*natsOpts.JetStreamMaxWait)} - } - } +func (pstr *NatsEE) parseOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) error { + if opts.NATS == nil { + return nil } - return + + if opts.NATS.JetStream != nil { + pstr.jetStream = *opts.NATS.JetStream + } + if opts.NATS.Subject != nil { + pstr.subject = *opts.NATS.Subject + } + + var err error + pstr.opts, err = GetNatsOpts(opts.NATS, nodeID, connTimeout) + if err != nil { + return err + } + + if pstr.jetStream && opts.NATS.JetStreamMaxWait != nil { + pstr.jsOpts = []nats.JSOpt{nats.MaxWait(*opts.NATS.JetStreamMaxWait)} + } + return nil } func (pstr *NatsEE) Cfg() *config.EventExporterCfg { return pstr.cfg } -func (pstr *NatsEE) Connect() (err error) { +func (pstr *NatsEE) Connect() error { pstr.Lock() defer pstr.Unlock() - if pstr.poster == nil { - if pstr.poster, err = nats.Connect(pstr.Cfg().ExportPath, pstr.opts...); err != nil { - return - } - if pstr.jetStream { - pstr.posterJS, err = pstr.poster.JetStream(pstr.jsOpts...) - } + if pstr.poster != nil { + return nil } - return + + var err error + pstr.poster, err = nats.Connect(pstr.Cfg().ExportPath, pstr.opts...) + if err != nil { + return err + } + if pstr.jetStream { + pstr.posterJS, err = pstr.poster.JetStream(pstr.jsOpts...) + } + return err } -func (pstr *NatsEE) ExportEvent(content any, _ string) (err error) { +func (pstr *NatsEE) ExportEvent(content any, _ string) error { pstr.reqs.get() + defer pstr.reqs.done() pstr.RLock() + defer pstr.RUnlock() + if pstr.poster == nil { - pstr.RUnlock() - pstr.reqs.done() return utils.ErrDisconnected } + + var err error if pstr.jetStream { _, err = pstr.posterJS.Publish(pstr.subject, content.([]byte)) } else { err = pstr.poster.Publish(pstr.subject, content.([]byte)) } - pstr.RUnlock() - pstr.reqs.done() - return + return err } -func (pstr *NatsEE) Close() (err error) { +func (pstr *NatsEE) Close() error { pstr.Lock() - if pstr.poster != nil { - err = pstr.poster.Drain() - pstr.poster = nil + defer pstr.Unlock() + + if pstr.poster == nil { + return nil } - pstr.Unlock() - return + + err := pstr.poster.Drain() + pstr.poster = nil + return err } func (pstr *NatsEE) GetMetrics() *utils.SafeMapStorage { return pstr.dc } -func GetNatsOpts(opts *config.NATSOpts, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) { - nop = make([]nats.Option, 0, 7) - nop = append(nop, nats.Name(utils.CGRateSLwr+nodeID), +func GetNatsOpts(opts *config.NATSOpts, nodeID string, connTimeout time.Duration) ([]nats.Option, error) { + natsOpts := make([]nats.Option, 0, 7) + natsOpts = append(natsOpts, nats.Name(utils.CGRateSLwr+nodeID), nats.Timeout(connTimeout), nats.DrainTimeout(time.Second)) if opts.JWTFile != nil { @@ -137,28 +148,27 @@ func GetNatsOpts(opts *config.NATSOpts, nodeID string, connTimeout time.Duration if opts.SeedFile != nil { keys = append(keys, *opts.SeedFile) } - nop = append(nop, nats.UserCredentials(*opts.JWTFile, keys...)) + natsOpts = append(natsOpts, nats.UserCredentials(*opts.JWTFile, keys...)) } if opts.SeedFile != nil { opt, err := nats.NkeyOptionFromSeed(*opts.SeedFile) if err != nil { return nil, err } - nop = append(nop, opt) + natsOpts = append(natsOpts, opt) } - if opts.ClientCertificate != nil { - if opts.ClientKey == nil { - err = fmt.Errorf("has certificate but no key") - return - } - nop = append(nop, nats.ClientCert(*opts.ClientCertificate, *opts.ClientKey)) - } else if opts.ClientKey != nil { - err = fmt.Errorf("has key but no certificate") - return + + switch { + case opts.ClientCertificate != nil && opts.ClientKey != nil: + natsOpts = append(natsOpts, nats.ClientCert(*opts.ClientCertificate, *opts.ClientKey)) + case opts.ClientCertificate != nil: + return nil, fmt.Errorf("has certificate but no key") + case opts.ClientKey != nil: + return nil, fmt.Errorf("has key but no certificate") } if opts.CertificateAuthority != nil { - nop = append(nop, + natsOpts = append(natsOpts, func(o *nats.Options) error { pool, err := x509.SystemCertPool() if err != nil { @@ -181,5 +191,5 @@ func GetNatsOpts(opts *config.NATSOpts, nodeID string, connTimeout time.Duration return nil }) } - return + return natsOpts, nil } diff --git a/ees/nats_test.go b/ees/nats_test.go index b2fd7daec..8102bdeae 100644 --- a/ees/nats_test.go +++ b/ees/nats_test.go @@ -101,7 +101,7 @@ func TestParseOpt(t *testing.T) { t.Error(err) } - err = pstr.parseOpt(opts, nodeID, connTimeout) + err = pstr.parseOpts(opts, nodeID, connTimeout) if err != nil { t.Error(err) } @@ -138,7 +138,7 @@ func TestParseOptJetStream(t *testing.T) { t.Error(err) } - err = pstr.parseOpt(opts, nodeID, connTimeout) + err = pstr.parseOpts(opts, nodeID, connTimeout) if err != nil { t.Error(err) } @@ -179,7 +179,7 @@ func TestParseOptJetStreamMaxWait(t *testing.T) { t.Error(err) } - err = pstr.parseOpt(opts, nodeID, connTimeout) + err = pstr.parseOpts(opts, nodeID, connTimeout) if err != nil { t.Error(err) } @@ -219,7 +219,7 @@ func TestParseOptSubject(t *testing.T) { t.Error(err) } - err = pstr.parseOpt(opts, nodeID, connTimeout) + err = pstr.parseOpts(opts, nodeID, connTimeout) if err != nil { t.Error(err) } diff --git a/ers/amqp.go b/ers/amqp.go index 7ad7f41ec..d235bada8 100644 --- a/ers/amqp.go +++ b/ers/amqp.go @@ -252,7 +252,7 @@ func (rdr *AMQPER) close() (err error) { } func (rdr *AMQPER) createPoster() { - processedOpt := getProcessOptions(rdr.Config().Opts) + processedOpt := getProcessedOptions(rdr.Config().Opts) if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { return } diff --git a/ers/amqpv1.go b/ers/amqpv1.go index 5db2530fe..72be3b4cd 100644 --- a/ers/amqpv1.go +++ b/ers/amqpv1.go @@ -209,7 +209,7 @@ func (rdr *AMQPv1ER) close() (err error) { } func (rdr *AMQPv1ER) createPoster() { - processedOpt := getProcessOptions(rdr.Config().Opts) + processedOpt := getProcessedOptions(rdr.Config().Opts) if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { return } diff --git a/ers/kafka.go b/ers/kafka.go index ea118d333..badb255bc 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -203,7 +203,7 @@ func (rdr *KafkaER) setOpts(opts *config.EventReaderOpts) (err error) { } func (rdr *KafkaER) createPoster() { - processedOpt := getProcessOptions(rdr.Config().Opts) + processedOpt := getProcessedOptions(rdr.Config().Opts) if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { return } diff --git a/ers/lib_test.go b/ers/lib_test.go index b01089744..28bc59ad2 100644 --- a/ers/lib_test.go +++ b/ers/lib_test.go @@ -19,9 +19,13 @@ along with this program. If not, see package ers import ( + "crypto/rand" "errors" "flag" + "fmt" + "math/big" "os" + "path/filepath" "testing" "github.com/cgrates/birpc" @@ -80,3 +84,32 @@ func testCleanupFiles(t *testing.T) { } } } + +func initTestCfg(cfgContent string) (*config.CGRConfig, string, func(), error) { + folderNameSuffix, err := rand.Int(rand.Reader, big.NewInt(10000)) + if err != nil { + return nil, "", nil, fmt.Errorf("could not generate random number for folder name suffix, err: %s", err.Error()) + } + cfgPath := fmt.Sprintf("/tmp/config%d", folderNameSuffix) + err = os.MkdirAll(cfgPath, 0755) + if err != nil { + return nil, "", nil, err + } + filePath := filepath.Join(cfgPath, "cgrates.json") + err = os.WriteFile(filePath, []byte(cfgContent), 0644) + if err != nil { + os.RemoveAll(cfgPath) + return nil, "", nil, err + } + var cfg *config.CGRConfig + cfg, err = config.NewCGRConfigFromPath(cfgPath) + if err != nil { + os.RemoveAll(cfgPath) + return nil, "", nil, err + } + removeFunc := func() { + os.RemoveAll(cfgPath) + } + + return cfg, cfgPath, removeFunc, nil +} diff --git a/ers/libers.go b/ers/libers.go index 07cd8803d..b2510e688 100644 --- a/ers/libers.go +++ b/ers/libers.go @@ -29,174 +29,167 @@ import ( "github.com/cgrates/cgrates/utils" ) -// getProcessOptions assigns all non-nil fields ending in "Processed" from EventReaderOpts to their counterparts in EventExporterOpts -func getProcessOptions(erOpts *config.EventReaderOpts) (eeOpts *config.EventExporterOpts) { - eeOpts = new(config.EventExporterOpts) - if amqOpts := erOpts.AMQPOpts; amqOpts != nil { - if amqOpts.AMQPExchangeProcessed != nil { +// getProcessedOptions assigns all non-nil fields ending in "Processed" from EventReaderOpts to their counterparts in EventExporterOpts +func getProcessedOptions(erOpts *config.EventReaderOpts) *config.EventExporterOpts { + var eeOpts *config.EventExporterOpts + + if erOpts.AMQPOpts != nil { + initAMQPExporterOpts := func() { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } if eeOpts.AMQP == nil { eeOpts.AMQP = new(config.AMQPOpts) } - eeOpts.AMQP.Exchange = amqOpts.AMQPExchangeProcessed } - if amqOpts.AMQPExchangeTypeProcessed != nil { - if eeOpts.AMQP == nil { - eeOpts.AMQP = new(config.AMQPOpts) - } - eeOpts.AMQP.ExchangeType = amqOpts.AMQPExchangeTypeProcessed + + if erOpts.AMQPOpts.AMQPExchangeProcessed != nil { + initAMQPExporterOpts() + eeOpts.AMQP.Exchange = erOpts.AMQPOpts.AMQPExchangeProcessed } - if amqOpts.AMQPQueueIDProcessed != nil { - if eeOpts.AMQP == nil { - eeOpts.AMQP = new(config.AMQPOpts) - } - eeOpts.AMQP.QueueID = amqOpts.AMQPQueueIDProcessed + if erOpts.AMQPOpts.AMQPExchangeTypeProcessed != nil { + initAMQPExporterOpts() + eeOpts.AMQP.ExchangeType = erOpts.AMQPOpts.AMQPExchangeTypeProcessed } - if amqOpts.AMQPRoutingKeyProcessed != nil { - if eeOpts.AMQP == nil { - eeOpts.AMQP = new(config.AMQPOpts) - } - eeOpts.AMQP.RoutingKey = amqOpts.AMQPRoutingKeyProcessed + if erOpts.AMQPOpts.AMQPQueueIDProcessed != nil { + initAMQPExporterOpts() + eeOpts.AMQP.QueueID = erOpts.AMQPOpts.AMQPQueueIDProcessed } - if amqOpts.AMQPUsernameProcessed != nil { - if eeOpts.AMQP == nil { - eeOpts.AMQP = new(config.AMQPOpts) - } - eeOpts.AMQP.Username = amqOpts.AMQPUsernameProcessed + if erOpts.AMQPOpts.AMQPRoutingKeyProcessed != nil { + initAMQPExporterOpts() + eeOpts.AMQP.RoutingKey = erOpts.AMQPOpts.AMQPRoutingKeyProcessed } - if amqOpts.AMQPPasswordProcessed != nil { - if eeOpts.AMQP == nil { - eeOpts.AMQP = new(config.AMQPOpts) - } - eeOpts.AMQP.Password = amqOpts.AMQPPasswordProcessed + if erOpts.AMQPOpts.AMQPUsernameProcessed != nil { + initAMQPExporterOpts() + eeOpts.AMQP.Username = erOpts.AMQPOpts.AMQPUsernameProcessed } - } - if awsOpts := erOpts.AWSOpts; awsOpts != nil { - if awsOpts.AWSKeyProcessed != nil { - if eeOpts.AWS == nil { - eeOpts.AWS = new(config.AWSOpts) - } - eeOpts.AWS.Key = awsOpts.AWSKeyProcessed - } - if awsOpts.AWSRegionProcessed != nil { - if eeOpts.AWS == nil { - eeOpts.AWS = new(config.AWSOpts) - } - eeOpts.AWS.Region = awsOpts.AWSRegionProcessed - } - if awsOpts.AWSSecretProcessed != nil { - if eeOpts.AWS == nil { - eeOpts.AWS = new(config.AWSOpts) - } - eeOpts.AWS.Secret = awsOpts.AWSSecretProcessed - } - if awsOpts.AWSTokenProcessed != nil { - if eeOpts.AWS == nil { - eeOpts.AWS = new(config.AWSOpts) - } - eeOpts.AWS.Token = awsOpts.AWSTokenProcessed - } - if awsOpts.S3BucketIDProcessed != nil { - if eeOpts.AWS == nil { - eeOpts.AWS = new(config.AWSOpts) - } - eeOpts.AWS.S3BucketID = awsOpts.S3BucketIDProcessed - } - if awsOpts.S3FolderPathProcessed != nil { - if eeOpts.AWS == nil { - eeOpts.AWS = new(config.AWSOpts) - } - eeOpts.AWS.S3FolderPath = awsOpts.S3FolderPathProcessed - } - if awsOpts.SQSQueueIDProcessed != nil { - if eeOpts.AWS == nil { - eeOpts.AWS = new(config.AWSOpts) - } - eeOpts.AWS.SQSQueueID = awsOpts.SQSQueueIDProcessed + if erOpts.AMQPOpts.AMQPPasswordProcessed != nil { + initAMQPExporterOpts() + eeOpts.AMQP.Password = erOpts.AMQPOpts.AMQPPasswordProcessed } } - if kfkOpts := erOpts.KafkaOpts; kfkOpts != nil { - if kfkOpts.KafkaTopicProcessed != nil { + if erOpts.AWSOpts != nil { + initAWSExporterOpts := func() { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + if eeOpts.AWS == nil { + eeOpts.AWS = new(config.AWSOpts) + } + } + + if erOpts.AWSOpts.AWSKeyProcessed != nil { + initAWSExporterOpts() + eeOpts.AWS.Key = erOpts.AWSOpts.AWSKeyProcessed + } + if erOpts.AWSOpts.AWSRegionProcessed != nil { + initAWSExporterOpts() + eeOpts.AWS.Region = erOpts.AWSOpts.AWSRegionProcessed + } + if erOpts.AWSOpts.AWSSecretProcessed != nil { + initAWSExporterOpts() + eeOpts.AWS.Secret = erOpts.AWSOpts.AWSSecretProcessed + } + if erOpts.AWSOpts.AWSTokenProcessed != nil { + initAWSExporterOpts() + eeOpts.AWS.Token = erOpts.AWSOpts.AWSTokenProcessed + } + if erOpts.AWSOpts.S3BucketIDProcessed != nil { + initAWSExporterOpts() + eeOpts.AWS.S3BucketID = erOpts.AWSOpts.S3BucketIDProcessed + } + if erOpts.AWSOpts.S3FolderPathProcessed != nil { + initAWSExporterOpts() + eeOpts.AWS.S3FolderPath = erOpts.AWSOpts.S3FolderPathProcessed + } + if erOpts.AWSOpts.SQSQueueIDProcessed != nil { + initAWSExporterOpts() + eeOpts.AWS.SQSQueueID = erOpts.AWSOpts.SQSQueueIDProcessed + } + } + + if erOpts.KafkaOpts != nil { + if erOpts.KafkaOpts.KafkaTopicProcessed != nil { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } if eeOpts.Kafka == nil { eeOpts.Kafka = new(config.KafkaOpts) } - eeOpts.Kafka.KafkaTopic = kfkOpts.KafkaTopicProcessed - } - } - if natsOpts := erOpts.NATSOpts; natsOpts != nil { - if natsOpts.NATSCertificateAuthorityProcessed != nil { - if eeOpts.NATS == nil { - eeOpts.NATS = new(config.NATSOpts) - } - eeOpts.NATS.CertificateAuthority = natsOpts.NATSCertificateAuthorityProcessed - } - if natsOpts.NATSClientCertificateProcessed != nil { - if eeOpts.NATS == nil { - eeOpts.NATS = new(config.NATSOpts) - } - eeOpts.NATS.ClientCertificate = natsOpts.NATSClientCertificateProcessed - } - if natsOpts.NATSClientKeyProcessed != nil { - if eeOpts.NATS == nil { - eeOpts.NATS = new(config.NATSOpts) - } - eeOpts.NATS.ClientKey = natsOpts.NATSClientKeyProcessed - } - if natsOpts.NATSJWTFileProcessed != nil { - if eeOpts.NATS == nil { - eeOpts.NATS = new(config.NATSOpts) - } - eeOpts.NATS.JWTFile = natsOpts.NATSJWTFileProcessed - } - if natsOpts.NATSJetStreamMaxWaitProcessed != nil { - if eeOpts.NATS == nil { - eeOpts.NATS = new(config.NATSOpts) - } - eeOpts.NATS.JetStreamMaxWait = natsOpts.NATSJetStreamMaxWaitProcessed - } - if natsOpts.NATSJetStreamProcessed != nil { - if eeOpts.NATS == nil { - eeOpts.NATS = new(config.NATSOpts) - } - eeOpts.NATS.JetStream = natsOpts.NATSJetStreamProcessed - } - if natsOpts.NATSSeedFileProcessed != nil { - if eeOpts.NATS == nil { - eeOpts.NATS = new(config.NATSOpts) - } - eeOpts.NATS.SeedFile = natsOpts.NATSSeedFileProcessed - } - if natsOpts.NATSSubjectProcessed != nil { - if eeOpts.NATS == nil { - eeOpts.NATS = new(config.NATSOpts) - } - eeOpts.NATS.Subject = natsOpts.NATSSubjectProcessed + eeOpts.Kafka.KafkaTopic = erOpts.KafkaOpts.KafkaTopicProcessed } } - if sqlOpts := erOpts.SQLOpts; sqlOpts != nil { - if sqlOpts.SQLDBNameProcessed != nil { - if eeOpts.SQL == nil { - eeOpts.SQL = new(config.SQLOpts) + if erOpts.NATSOpts != nil { + initNATSExporterOpts := func() { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) } - eeOpts.SQL.DBName = sqlOpts.SQLDBNameProcessed - } - if sqlOpts.SQLTableNameProcessed != nil { - if eeOpts.SQL == nil { - eeOpts.SQL = new(config.SQLOpts) + if eeOpts.NATS == nil { + eeOpts.NATS = new(config.NATSOpts) } - eeOpts.SQL.TableName = sqlOpts.SQLTableNameProcessed - } - if sqlOpts.PgSSLModeProcessed != nil { - if eeOpts.SQL == nil { - eeOpts.SQL = new(config.SQLOpts) - } - eeOpts.SQL.PgSSLMode = sqlOpts.PgSSLModeProcessed } + if erOpts.NATSOpts.CertificateAuthorityProcessed != nil { + initNATSExporterOpts() + eeOpts.NATS.CertificateAuthority = erOpts.NATSOpts.CertificateAuthorityProcessed + } + if erOpts.NATSOpts.ClientCertificateProcessed != nil { + initNATSExporterOpts() + eeOpts.NATS.ClientCertificate = erOpts.NATSOpts.ClientCertificateProcessed + } + if erOpts.NATSOpts.ClientKeyProcessed != nil { + initNATSExporterOpts() + eeOpts.NATS.ClientKey = erOpts.NATSOpts.ClientKeyProcessed + } + if erOpts.NATSOpts.JWTFileProcessed != nil { + initNATSExporterOpts() + eeOpts.NATS.JWTFile = erOpts.NATSOpts.JWTFileProcessed + } + if erOpts.NATSOpts.JetStreamMaxWaitProcessed != nil { + initNATSExporterOpts() + eeOpts.NATS.JetStreamMaxWait = erOpts.NATSOpts.JetStreamMaxWaitProcessed + } + if erOpts.NATSOpts.JetStreamProcessed != nil { + initNATSExporterOpts() + eeOpts.NATS.JetStream = erOpts.NATSOpts.JetStreamProcessed + } + if erOpts.NATSOpts.SeedFileProcessed != nil { + initNATSExporterOpts() + eeOpts.NATS.SeedFile = erOpts.NATSOpts.SeedFileProcessed + } + if erOpts.NATSOpts.SubjectProcessed != nil { + initNATSExporterOpts() + eeOpts.NATS.Subject = erOpts.NATSOpts.SubjectProcessed + } } - return + if erOpts.SQLOpts != nil { + initSQLExporterOpts := func() { + if eeOpts == nil { + eeOpts = new(config.EventExporterOpts) + } + if eeOpts.SQL == nil { + eeOpts.SQL = new(config.SQLOpts) + } + } + + if erOpts.SQLOpts.SQLDBNameProcessed != nil { + initSQLExporterOpts() + eeOpts.SQL.DBName = erOpts.SQLOpts.SQLDBNameProcessed + } + if erOpts.SQLOpts.SQLTableNameProcessed != nil { + initSQLExporterOpts() + eeOpts.SQL.TableName = erOpts.SQLOpts.SQLTableNameProcessed + } + if erOpts.SQLOpts.PgSSLModeProcessed != nil { + initSQLExporterOpts() + eeOpts.SQL.PgSSLMode = erOpts.SQLOpts.PgSSLModeProcessed + } + } + + return eeOpts } // mergePartialEvents will unite the events using the reader configuration diff --git a/ers/libers_test.go b/ers/libers_test.go index ad6d2e69c..f57c981dd 100644 --- a/ers/libers_test.go +++ b/ers/libers_test.go @@ -32,7 +32,7 @@ func TestGetProcessOptions(t *testing.T) { AMQPQueueIDProcessed: utils.StringPointer("processed"), }, } - result := getProcessOptions(opts) + result := getProcessedOptions(opts) expected := &config.EventExporterOpts{ AMQP: &config.AMQPOpts{ QueueID: utils.StringPointer("processed"), diff --git a/ers/nats.go b/ers/nats.go index 0022e17f7..475bd7245 100644 --- a/ers/nats.go +++ b/ers/nats.go @@ -90,68 +90,85 @@ func (rdr *NatsER) Config() *config.EventReaderCfg { return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx] } -// Serve will start the gorutines needed to watch the nats subject -func (rdr *NatsER) Serve() (err error) { - // Connect to a server - var nc *nats.Conn - var js nats.JetStreamContext +// Serve will subscribe to a NATS subject and process incoming messages until the rdrExit channel +// will be closed. +func (rdr *NatsER) Serve() error { - if nc, err = nats.Connect(rdr.Config().SourcePath, rdr.opts...); err != nil { - return + // Establish a connection to the nats server. + nc, err := nats.Connect(rdr.Config().SourcePath, rdr.opts...) + if err != nil { + return err } - ch := make(chan *nats.Msg) + + // Define the message handler. Its content will get executed for every received message. + msgHandler := func(msg *nats.Msg) { + + // If the rdr.cap channel buffer is empty, block until a resource is available. Otherwise + // allocate one resource and start processing the message. + if rdr.Config().ConcurrentReqs != -1 { + <-rdr.cap + } + go func(msg *nats.Msg) { + handlerErr := rdr.processMessage(msg.Data) + if handlerErr != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> processing message %s error: %s", + utils.ERs, string(msg.Data), handlerErr.Error())) + } + + // Export the received message if a poster has been defined. + if rdr.poster != nil { + handlerErr = ees.ExportWithAttempts(rdr.poster, msg.Data, utils.EmptyString) + if handlerErr != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> writing message %s error: %s", + utils.ERs, string(msg.Data), handlerErr.Error())) + } + } + + // Release the resource back to rdr.cap channel. + if rdr.Config().ConcurrentReqs != -1 { + rdr.cap <- struct{}{} + } + }(msg) + } + + // Subscribe to the appropriate NATS subject. if !rdr.jetStream { - if _, err = nc.ChanQueueSubscribe(rdr.subject, rdr.queueID, ch); err != nil { - return + _, err = nc.QueueSubscribe(rdr.subject, rdr.queueID, msgHandler) + if err != nil { + nc.Drain() + return err } } else { + var js nats.JetStreamContext js, err = nc.JetStream(rdr.jsOpts...) if err != nil { - return + nc.Drain() + return err } - if _, err = js.QueueSubscribe(rdr.subject, rdr.queueID, func(msg *nats.Msg) { - ch <- msg - }, nats.Durable(rdr.consumerName)); err != nil { - return + _, err = js.QueueSubscribe(rdr.subject, rdr.queueID, msgHandler, + nats.Durable(rdr.consumerName)) + if err != nil { + nc.Drain() + return err } } + go func() { - for { - if rdr.Config().ConcurrentReqs != -1 { - <-rdr.cap // do not try to read if the limit is reached - } - select { - case <-rdr.rdrExit: - utils.Logger.Info( - fmt.Sprintf("<%s> stop monitoring nats path <%s>", - utils.ERs, rdr.Config().SourcePath)) - nc.Drain() - if rdr.poster != nil { - rdr.poster.Close() - } - return - case msg := <-ch: - go func(msg *nats.Msg) { - if err := rdr.processMessage(msg.Data); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> processing message %s error: %s", - utils.ERs, string(msg.Data), err.Error())) - } - if rdr.poster != nil { // post it - if err := ees.ExportWithAttempts(rdr.poster, msg.Data, utils.EmptyString); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> writing message %s error: %s", - utils.ERs, string(msg.Data), err.Error())) - } - } - if rdr.Config().ConcurrentReqs != -1 { - rdr.cap <- struct{}{} - } - }(msg) - } + + // Wait for exit signal. + <-rdr.rdrExit + utils.Logger.Info( + fmt.Sprintf("<%s> stop monitoring nats path <%s>", + utils.ERs, rdr.Config().SourcePath)) + nc.Drain() + if rdr.poster != nil { + rdr.poster.Close() } }() - return + + return nil } func (rdr *NatsER) processMessage(msg []byte) (err error) { @@ -187,7 +204,7 @@ func (rdr *NatsER) processMessage(msg []byte) (err error) { } func (rdr *NatsER) createPoster() (err error) { - processedOpt := getProcessOptions(rdr.Config().Opts) + processedOpt := getProcessedOptions(rdr.Config().Opts) if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { return } @@ -199,78 +216,77 @@ func (rdr *NatsER) createPoster() (err error) { } func (rdr *NatsER) processOpts() (err error) { - if rdr.Config().Opts.NATSOpts.NATSSubject != nil { - rdr.subject = *rdr.Config().Opts.NATSOpts.NATSSubject + if rdr.Config().Opts.NATSOpts.Subject != nil { + rdr.subject = *rdr.Config().Opts.NATSOpts.Subject } var queueID string - if rdr.Config().Opts.NATSOpts.NATSQueueID != nil { - queueID = *rdr.Config().Opts.NATSOpts.NATSQueueID + if rdr.Config().Opts.NATSOpts.QueueID != nil { + queueID = *rdr.Config().Opts.NATSOpts.QueueID } rdr.queueID = utils.FirstNonEmpty(queueID, rdr.cgrCfg.GeneralCfg().NodeID) var consumerName string - if rdr.Config().Opts.NATSOpts.NATSConsumerName != nil { - consumerName = *rdr.Config().Opts.NATSOpts.NATSConsumerName + if rdr.Config().Opts.NATSOpts.ConsumerName != nil { + consumerName = *rdr.Config().Opts.NATSOpts.ConsumerName } rdr.consumerName = utils.FirstNonEmpty(consumerName, utils.CGRateSLwr) - if rdr.Config().Opts.NATSOpts.NATSJetStream != nil { - rdr.jetStream = *rdr.Config().Opts.NATSOpts.NATSJetStream + if rdr.Config().Opts.NATSOpts.JetStream != nil { + rdr.jetStream = *rdr.Config().Opts.NATSOpts.JetStream } if rdr.jetStream { - if rdr.Config().Opts.NATSOpts.NATSJetStreamMaxWait != nil { - rdr.jsOpts = []nats.JSOpt{nats.MaxWait(*rdr.Config().Opts.NATSOpts.NATSJetStreamMaxWait)} + if rdr.Config().Opts.NATSOpts.JetStreamMaxWait != nil { + rdr.jsOpts = []nats.JSOpt{nats.MaxWait(*rdr.Config().Opts.NATSOpts.JetStreamMaxWait)} } } - rdr.opts, err = GetNatsOpts(rdr.Config().Opts, + rdr.opts, err = GetNatsOpts(rdr.Config().Opts.NATSOpts, rdr.cgrCfg.GeneralCfg().NodeID, rdr.cgrCfg.GeneralCfg().ConnectTimeout) return } -func GetNatsOpts(opts *config.EventReaderOpts, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) { - nop = make([]nats.Option, 0, 7) - nop = append(nop, nats.Name(utils.CGRateSLwr+nodeID), +func GetNatsOpts(opts *config.NATSROpts, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) { + natsOpts := make([]nats.Option, 0, 7) + natsOpts = append(natsOpts, nats.Name(utils.CGRateSLwr+nodeID), nats.Timeout(connTimeout), nats.DrainTimeout(time.Second)) - if opts.NATSOpts.NATSJWTFile != nil { + if opts.JWTFile != nil { keys := make([]string, 0, 1) - if opts.NATSOpts.NATSSeedFile != nil { - keys = append(keys, *opts.NATSOpts.NATSSeedFile) + if opts.SeedFile != nil { + keys = append(keys, *opts.SeedFile) } - nop = append(nop, nats.UserCredentials(*opts.NATSOpts.NATSJWTFile, keys...)) + natsOpts = append(natsOpts, nats.UserCredentials(*opts.JWTFile, keys...)) } - if opts.NATSOpts.NATSSeedFile != nil { - opt, err := nats.NkeyOptionFromSeed(*opts.NATSOpts.NATSSeedFile) + if opts.SeedFile != nil { + opt, err := nats.NkeyOptionFromSeed(*opts.SeedFile) if err != nil { return nil, err } - nop = append(nop, opt) - } - if opts.NATSOpts.NATSClientCertificate != nil { - if opts.NATSOpts.NATSClientKey == nil { - err = fmt.Errorf("has certificate but no key") - return - } - nop = append(nop, nats.ClientCert(*opts.NATSOpts.NATSClientCertificate, *opts.NATSOpts.NATSClientKey)) - } else if opts.NATSOpts.NATSClientKey != nil { - err = fmt.Errorf("has key but no certificate") - return + natsOpts = append(natsOpts, opt) } - if opts.NATSOpts.NATSCertificateAuthority != nil { - nop = append(nop, + switch { + case opts.ClientCertificate != nil && opts.ClientKey != nil: + natsOpts = append(natsOpts, nats.ClientCert(*opts.ClientCertificate, *opts.ClientKey)) + case opts.ClientCertificate != nil: + return nil, fmt.Errorf("has certificate but no key") + case opts.ClientKey != nil: + return nil, fmt.Errorf("has key but no certificate") + } + + if opts.CertificateAuthority != nil { + natsOpts = append(natsOpts, func(o *nats.Options) error { pool, err := x509.SystemCertPool() if err != nil { return err } - rootPEM, err := os.ReadFile(*opts.NATSOpts.NATSCertificateAuthority) + rootPEM, err := os.ReadFile(*opts.CertificateAuthority) if err != nil || rootPEM == nil { return fmt.Errorf("nats: error loading or parsing rootCA file: %v", err) } ok := pool.AppendCertsFromPEM(rootPEM) if !ok { return fmt.Errorf("nats: failed to parse root certificate from %q", - *opts.NATSOpts.NATSCertificateAuthority) + *opts.CertificateAuthority) } if o.TLSConfig == nil { o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12} @@ -280,5 +296,5 @@ func GetNatsOpts(opts *config.EventReaderOpts, nodeID string, connTimeout time.D return nil }) } - return + return natsOpts, nil } diff --git a/ers/nats_it_test.go b/ers/nats_it_test.go index 629a4a76e..9bb6fc124 100644 --- a/ers/nats_it_test.go +++ b/ers/nats_it_test.go @@ -22,6 +22,7 @@ along with this program. If not, see package ers import ( + "encoding/json" "fmt" "os" "os/exec" @@ -37,6 +38,178 @@ import ( "github.com/nats-io/nats.go" ) +func TestERsNATSIT(t *testing.T) { + t.Skip() + cfgContent := `{ + +"general": { + "log_level": 7 +}, + +"data_db": { + "db_type": "*internal" +}, + +"stor_db": { + "db_type": "*internal" +}, + +"ers": { + "enabled": true, + "sessions_conns":[], + "readers": [ + { + "id": "nats_consumer1", + "type": "*nats_json_map", + "source_path": "nats://127.0.0.1:4222", + "processed_path": "nats://127.0.0.1:4222", + "opts": { + "natsJetStream": true, + "natsConsumerName": "cgrates_consumer", + "natsSubject": "cgrates_cdrs", + "natsQueueID": "queue", + "natsJetStreamMaxWait": "5s", + + "natsJetStreamProcessed": true, + "natsSubjectProcessed": "cgrates_cdrs_processed", + "natsJetStreamMaxWaitProcessed": "5s" + }, + "flags": ["*dryrun"], + "fields":[ + {"tag": "cdr_template", "type": "*template", "value": "cdr_template"} + ] + }, + { + "id": "nats_consumer2", + "type": "*nats_json_map", + "source_path": "nats://127.0.0.1:4222", + "processed_path": "nats://127.0.0.1:4222", + "opts": { + "natsJetStream": true, + "natsConsumerName": "cgrates_consumer", + "natsSubject": "cgrates_cdrs", + "natsQueueID": "queue", + "natsJetStreamMaxWait": "5s", + + "natsJetStreamProcessed": true, + "natsSubjectProcessed": "cgrates_cdrs_processed", + "natsJetStreamMaxWaitProcessed": "5s" + }, + "flags": ["*dryrun"], + "fields":[ + {"tag": "cdr_template", "type": "*template", "value": "cdr_template"} + ] + }, + { + "id": "nats_consumer3", + "type": "*nats_json_map", + "source_path": "nats://127.0.0.1:4222", + "processed_path": "nats://127.0.0.1:4222", + "opts": { + "natsJetStream": true, + "natsConsumerName": "cgrates_consumer", + "natsSubject": "cgrates_cdrs", + "natsQueueID": "queue", + "natsJetStreamMaxWait": "5s", + + "natsJetStreamProcessed": true, + "natsSubjectProcessed": "cgrates_cdrs_processed", + "natsJetStreamMaxWaitProcessed": "5s" + }, + "flags": ["*dryrun"], + "fields":[ + {"tag": "cdr_template", "type": "*template", "value": "cdr_template"} + ] + }, + { + "id": "nats_consumer4", + "type": "*nats_json_map", + "source_path": "nats://127.0.0.1:4222", + "processed_path": "", + "opts": { + "natsJetStream": true, + "natsConsumerName": "cgrates_consumer4", + "natsSubject": "cgrates_cdrs_processed", + "natsQueueID": "", + "natsJetStreamMaxWait": "5s", + }, + "flags": ["*dryrun"], + "fields":[ + {"tag": "cdr_template", "type": "*template", "value": "cdr_template"} + ] + } + ] +}, + + +"templates": { + "cdr_template": [ + // {"tag": "Source", "path": "*cgreq.Source", "type": "*constant", "value": "ers_template_combined", "mandatory": true}, + // {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.2", "mandatory": true}, + // {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.3", "mandatory": true}, + // {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.4", "mandatory": true}, + // {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.6", "mandatory": true}, + // {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.7", "mandatory": true}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.Account", "mandatory": true}, + // {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.9", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.Destination", "mandatory": true}, + // {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.11", "mandatory": true}, + // {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.12", "mandatory": true}, + // {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.13", "mandatory": true} + ] +} + +}` + cfg, cfgPath, clean, err := initTestCfg(cfgContent) + if err != nil { + t.Fatal(err) + } + defer clean() + + nc, err := nats.Connect(cfg.ERsCfg().Readers[1].SourcePath) + if err != nil { + t.Fatal(err) + } + defer nc.Close() + + js, err := nc.JetStream(nats.PublishAsyncMaxPending(256)) + if err != nil { + t.Fatal(err) + } + + js.AddStream(&nats.StreamConfig{ + Name: "CDRs", + Subjects: []string{"cgrates_cdrs", "cgrates_cdrs_processed"}, + }) + + time.Sleep(2 * time.Second) + + if _, err := engine.StopStartEngine(cfgPath, 100); err != nil { + t.Fatal(err) + } + + for i := 0; i < 10; i++ { + cdr := map[string]any{ + "Account": 1000 + i, + "Destination": 2000 + i, + } + b, _ := json.Marshal(cdr) + js.PublishAsync("cgrates_cdrs", b) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatal("Did not resolve in time") + } + + // Add verification + + err = engine.KillEngine(100) + if err != nil { + t.Error(err) + } +} + func testCheckNatsData(t *testing.T, randomCGRID, expData string, ch chan *nats.Msg) { select { case err := <-rdrErr: @@ -80,7 +253,7 @@ func testCheckNatsJetStream(t *testing.T, cfg *config.CGRConfig) { t.Fatal(err) } - nop, err := GetNatsOpts(rdr.Config().Opts, "testExp", time.Second) + nop, err := GetNatsOpts(rdr.Config().Opts.NATSOpts, "testExp", time.Second) if err != nil { t.Fatal(err) } @@ -167,7 +340,7 @@ func testCheckNatsNormal(t *testing.T, cfg *config.CGRConfig) { t.Fatal(err) } - nop, err := GetNatsOpts(rdr.Config().Opts, "testExp", time.Second) + nop, err := GetNatsOpts(rdr.Config().Opts.NATSOpts, "testExp", time.Second) if err != nil { t.Fatal(err) } diff --git a/ers/s3.go b/ers/s3.go index e43a83975..e611a9b58 100644 --- a/ers/s3.go +++ b/ers/s3.go @@ -195,7 +195,7 @@ func (rdr *S3ER) readLoop(scv s3Client) (err error) { } func (rdr *S3ER) createPoster() { - processedOpt := getProcessOptions(rdr.Config().Opts) + processedOpt := getProcessedOptions(rdr.Config().Opts) if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { return } diff --git a/ers/sql.go b/ers/sql.go index 97c112cb7..79ed8667d 100644 --- a/ers/sql.go +++ b/ers/sql.go @@ -301,7 +301,7 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string, opts *config.EventReader } // outURL - processedOpt := getProcessOptions(opts) + processedOpt := getProcessedOptions(opts) if processedOpt == nil { if len(outURL) == 0 { return diff --git a/ers/sqs.go b/ers/sqs.go index c1c84ce1a..2e778e5e1 100644 --- a/ers/sqs.go +++ b/ers/sqs.go @@ -219,7 +219,7 @@ func (rdr *SQSER) readLoop(scv sqsClient) (err error) { } func (rdr *SQSER) createPoster() { - processedOpt := getProcessOptions(rdr.Config().Opts) + processedOpt := getProcessedOptions(rdr.Config().Opts) if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { return } diff --git a/go.mod b/go.mod index 82cce8d94..81d7a2643 100644 --- a/go.mod +++ b/go.mod @@ -39,13 +39,13 @@ require ( github.com/mediocregopher/radix/v3 v3.8.1 github.com/miekg/dns v1.1.54 github.com/mitchellh/mapstructure v1.4.0 - github.com/nats-io/nats.go v1.11.0 + github.com/nats-io/nats.go v1.30.0 github.com/nyaruka/phonenumbers v1.0.75 github.com/peterh/liner v1.2.1 github.com/rabbitmq/amqp091-go v1.5.0 github.com/segmentio/kafka-go v0.4.8 go.mongodb.org/mongo-driver v1.11.0 - golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d + golang.org/x/crypto v0.6.0 golang.org/x/net v0.10.0 golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5 google.golang.org/api v0.36.0 @@ -89,14 +89,14 @@ require ( github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.1 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect - github.com/klauspost/compress v1.13.6 // indirect + github.com/klauspost/compress v1.17.0 // indirect github.com/kr/pretty v0.2.1 // indirect github.com/lib/pq v1.8.0 // indirect github.com/mattn/go-runewidth v0.0.10 // indirect github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect github.com/mschoch/smat v0.2.0 // indirect github.com/nats-io/nats-server/v2 v2.2.6 // indirect - github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nkeys v0.4.5 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/philhofer/fwd v1.1.1 // indirect github.com/pierrec/lz4 v2.6.0+incompatible // indirect diff --git a/go.sum b/go.sum index bb9515ebd..c9e5ffa54 100644 --- a/go.sum +++ b/go.sum @@ -318,8 +318,9 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= +github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/kljensen/snowball v0.6.0/go.mod h1:27N7E8fVU5H68RlUmnWwZCfxgt4POBJfENGMvNRhldw= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -371,11 +372,13 @@ github.com/nats-io/jwt/v2 v2.0.2 h1:ejVCLO8gu6/4bOKIHQpmB5UhhUJfAQw55yvLWpfmKjI= github.com/nats-io/jwt/v2 v2.0.2/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= github.com/nats-io/nats-server/v2 v2.2.6 h1:FPK9wWx9pagxcw14s8W9rlfzfyHm61uNLnJyybZbn48= github.com/nats-io/nats-server/v2 v2.2.6/go.mod h1:sEnFaxqe09cDmfMgACxZbziXnhQFhwk+aKkZjBBRYrI= -github.com/nats-io/nats.go v1.11.0 h1:L263PZkrmkRJRJT2YHU8GwWWvEvmr9/LUKuJTXsF32k= github.com/nats-io/nats.go v1.11.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.30.0 h1:bj/rVsRCrFXxmm9mJiDhb74UKl2HhKpDwKRBtvCjZjc= +github.com/nats-io/nats.go v1.30.0/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= -github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk= +github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nyaruka/phonenumbers v1.0.75 h1:OCwKXSjTi6IzuI4gVi8zfY+0s60DQUC6ks8Ll4j0eyU= @@ -513,8 +516,9 @@ golang.org/x/crypto v0.0.0-20200320181102-891825fb96df/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=