diff --git a/config/config_defaults.go b/config/config_defaults.go
index a12c9cc3c..c79f183e7 100644
--- a/config/config_defaults.go
+++ b/config/config_defaults.go
@@ -463,7 +463,7 @@ const CGRATES_CFG_JSON = `
// "natsCertificateAuthorityProcessed": "", // the path to a custom certificate authority file( used by tls)
// "natsClientCertificateProcessed": "", // the path to a client certificate( used by tls)
// "natsClientKeyProcessed": "", // the path to a client key( used by tls)
- // "natsJetStreamMaxWaitProcessed": "5s ", // the maximum amount of time to wait for a response
+ // "natsJetStreamMaxWaitProcessed": "5s", // the maximum amount of time to wait for a response
},
"tenant": "", // tenant used by import
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
diff --git a/config/config_it_test.go b/config/config_it_test.go
index 4bf9af8b0..33ef64111 100644
--- a/config/config_it_test.go
+++ b/config/config_it_test.go
@@ -621,7 +621,7 @@ func testCGRConfigReloadERs(t *testing.T) {
PartialCacheAction: utils.StringPointer(utils.MetaNone),
XMLRootPath: utils.StringPointer(utils.EmptyString),
NATSOpts: &NATSROpts{
- NATSSubject: utils.StringPointer("cgrates_cdrs"),
+ Subject: utils.StringPointer("cgrates_cdrs"),
},
},
},
@@ -651,7 +651,7 @@ func testCGRConfigReloadERs(t *testing.T) {
PartialCacheAction: utils.StringPointer(utils.MetaNone),
XMLRootPath: utils.StringPointer(utils.EmptyString),
NATSOpts: &NATSROpts{
- NATSSubject: utils.StringPointer("cgrates_cdrs"),
+ Subject: utils.StringPointer("cgrates_cdrs"),
},
},
},
diff --git a/config/config_test.go b/config/config_test.go
index 94c6e3a6a..3a2689736 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -2368,7 +2368,7 @@ func TestERSConfig(t *testing.T) {
PartialCacheAction: utils.StringPointer(utils.MetaNone),
XMLRootPath: utils.StringPointer(utils.EmptyString),
NATSOpts: &NATSROpts{
- NATSSubject: utils.StringPointer("cgrates_cdrs"),
+ Subject: utils.StringPointer("cgrates_cdrs"),
},
},
},
@@ -5411,7 +5411,7 @@ func TestCgrCdfEventReader(t *testing.T) {
KafkaOpts: &KafkaROpts{},
PartialOrderField: utils.StringPointer("~*req.AnswerTime"),
NATSOpts: &NATSROpts{
- NATSSubject: utils.StringPointer("cgrates_cdrs"),
+ Subject: utils.StringPointer("cgrates_cdrs"),
},
PartialCacheAction: utils.StringPointer(utils.MetaNone),
XMLRootPath: utils.StringPointer(utils.EmptyString),
@@ -5524,7 +5524,7 @@ func TestCgrCfgEventReaderDefault(t *testing.T) {
SQLOpts: &SQLROpts{},
KafkaOpts: &KafkaROpts{},
NATSOpts: &NATSROpts{
- NATSSubject: utils.StringPointer("cgrates_cdrs"),
+ Subject: utils.StringPointer("cgrates_cdrs"),
},
},
}
diff --git a/config/erscfg.go b/config/erscfg.go
index 6aa7ac2fa..381797cf4 100644
--- a/config/erscfg.go
+++ b/config/erscfg.go
@@ -313,88 +313,88 @@ func (awsROpts *AWSROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err erro
}
type NATSROpts struct {
- NATSJetStream *bool
- NATSConsumerName *string
- NATSSubject *string
- NATSQueueID *string
- NATSJWTFile *string
- NATSSeedFile *string
- NATSCertificateAuthority *string
- NATSClientCertificate *string
- NATSClientKey *string
- NATSJetStreamMaxWait *time.Duration
- NATSJetStreamProcessed *bool
- NATSSubjectProcessed *string
- NATSJWTFileProcessed *string
- NATSSeedFileProcessed *string
- NATSCertificateAuthorityProcessed *string
- NATSClientCertificateProcessed *string
- NATSClientKeyProcessed *string
- NATSJetStreamMaxWaitProcessed *time.Duration
+ JetStream *bool
+ ConsumerName *string
+ Subject *string
+ QueueID *string
+ JWTFile *string
+ SeedFile *string
+ CertificateAuthority *string
+ ClientCertificate *string
+ ClientKey *string
+ JetStreamMaxWait *time.Duration
+ JetStreamProcessed *bool
+ SubjectProcessed *string
+ JWTFileProcessed *string
+ SeedFileProcessed *string
+ CertificateAuthorityProcessed *string
+ ClientCertificateProcessed *string
+ ClientKeyProcessed *string
+ JetStreamMaxWaitProcessed *time.Duration
}
func (natsOpts *NATSROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error) {
if jsnCfg.NATSJetStream != nil {
- natsOpts.NATSJetStream = jsnCfg.NATSJetStream
+ natsOpts.JetStream = jsnCfg.NATSJetStream
}
if jsnCfg.NATSConsumerName != nil {
- natsOpts.NATSConsumerName = jsnCfg.NATSConsumerName
+ natsOpts.ConsumerName = jsnCfg.NATSConsumerName
}
if jsnCfg.NATSSubject != nil {
- natsOpts.NATSSubject = jsnCfg.NATSSubject
+ natsOpts.Subject = jsnCfg.NATSSubject
}
if jsnCfg.NATSQueueID != nil {
- natsOpts.NATSQueueID = jsnCfg.NATSQueueID
+ natsOpts.QueueID = jsnCfg.NATSQueueID
}
if jsnCfg.NATSJWTFile != nil {
- natsOpts.NATSJWTFile = jsnCfg.NATSJWTFile
+ natsOpts.JWTFile = jsnCfg.NATSJWTFile
}
if jsnCfg.NATSSeedFile != nil {
- natsOpts.NATSSeedFile = jsnCfg.NATSSeedFile
+ natsOpts.SeedFile = jsnCfg.NATSSeedFile
}
if jsnCfg.NATSCertificateAuthority != nil {
- natsOpts.NATSCertificateAuthority = jsnCfg.NATSCertificateAuthority
+ natsOpts.CertificateAuthority = jsnCfg.NATSCertificateAuthority
}
if jsnCfg.NATSClientCertificate != nil {
- natsOpts.NATSClientCertificate = jsnCfg.NATSClientCertificate
+ natsOpts.ClientCertificate = jsnCfg.NATSClientCertificate
}
if jsnCfg.NATSClientKey != nil {
- natsOpts.NATSClientKey = jsnCfg.NATSClientKey
+ natsOpts.ClientKey = jsnCfg.NATSClientKey
}
if jsnCfg.NATSJetStreamMaxWait != nil {
var jetStreamMaxWait time.Duration
if jetStreamMaxWait, err = utils.ParseDurationWithNanosecs(*jsnCfg.NATSJetStreamMaxWait); err != nil {
return
}
- natsOpts.NATSJetStreamMaxWait = utils.DurationPointer(jetStreamMaxWait)
+ natsOpts.JetStreamMaxWait = utils.DurationPointer(jetStreamMaxWait)
}
if jsnCfg.NATSJetStreamProcessed != nil {
- natsOpts.NATSJetStreamProcessed = jsnCfg.NATSJetStreamProcessed
+ natsOpts.JetStreamProcessed = jsnCfg.NATSJetStreamProcessed
}
if jsnCfg.NATSSubjectProcessed != nil {
- natsOpts.NATSSubjectProcessed = jsnCfg.NATSSubjectProcessed
+ natsOpts.SubjectProcessed = jsnCfg.NATSSubjectProcessed
}
if jsnCfg.NATSJWTFileProcessed != nil {
- natsOpts.NATSJWTFileProcessed = jsnCfg.NATSJWTFileProcessed
+ natsOpts.JWTFileProcessed = jsnCfg.NATSJWTFileProcessed
}
if jsnCfg.NATSSeedFileProcessed != nil {
- natsOpts.NATSSeedFileProcessed = jsnCfg.NATSSeedFileProcessed
+ natsOpts.SeedFileProcessed = jsnCfg.NATSSeedFileProcessed
}
if jsnCfg.NATSCertificateAuthorityProcessed != nil {
- natsOpts.NATSCertificateAuthorityProcessed = jsnCfg.NATSCertificateAuthorityProcessed
+ natsOpts.CertificateAuthorityProcessed = jsnCfg.NATSCertificateAuthorityProcessed
}
if jsnCfg.NATSClientCertificateProcessed != nil {
- natsOpts.NATSClientCertificateProcessed = jsnCfg.NATSClientCertificateProcessed
+ natsOpts.ClientCertificateProcessed = jsnCfg.NATSClientCertificateProcessed
}
if jsnCfg.NATSClientKeyProcessed != nil {
- natsOpts.NATSClientKeyProcessed = jsnCfg.NATSClientKeyProcessed
+ natsOpts.ClientKeyProcessed = jsnCfg.NATSClientKeyProcessed
}
if jsnCfg.NATSJetStreamMaxWaitProcessed != nil {
var jetStreamMaxWait time.Duration
if jetStreamMaxWait, err = utils.ParseDurationWithNanosecs(*jsnCfg.NATSJetStreamMaxWaitProcessed); err != nil {
return
}
- natsOpts.NATSJetStreamMaxWaitProcessed = utils.DurationPointer(jetStreamMaxWait)
+ natsOpts.JetStreamMaxWaitProcessed = utils.DurationPointer(jetStreamMaxWait)
}
return
}
@@ -761,77 +761,77 @@ func (awsOpt *AWSROpts) Clone() *AWSROpts {
}
func (natOpts *NATSROpts) Clone() *NATSROpts {
cln := &NATSROpts{}
- if natOpts.NATSJetStream != nil {
- cln.NATSJetStream = new(bool)
- *cln.NATSJetStream = *natOpts.NATSJetStream
+ if natOpts.JetStream != nil {
+ cln.JetStream = new(bool)
+ *cln.JetStream = *natOpts.JetStream
}
- if natOpts.NATSConsumerName != nil {
- cln.NATSConsumerName = new(string)
- *cln.NATSConsumerName = *natOpts.NATSConsumerName
+ if natOpts.ConsumerName != nil {
+ cln.ConsumerName = new(string)
+ *cln.ConsumerName = *natOpts.ConsumerName
}
- if natOpts.NATSSubject != nil {
- cln.NATSSubject = new(string)
- *cln.NATSSubject = *natOpts.NATSSubject
+ if natOpts.Subject != nil {
+ cln.Subject = new(string)
+ *cln.Subject = *natOpts.Subject
}
- if natOpts.NATSQueueID != nil {
- cln.NATSQueueID = new(string)
- *cln.NATSQueueID = *natOpts.NATSQueueID
+ if natOpts.QueueID != nil {
+ cln.QueueID = new(string)
+ *cln.QueueID = *natOpts.QueueID
}
- if natOpts.NATSJWTFile != nil {
- cln.NATSJWTFile = new(string)
- *cln.NATSJWTFile = *natOpts.NATSJWTFile
+ if natOpts.JWTFile != nil {
+ cln.JWTFile = new(string)
+ *cln.JWTFile = *natOpts.JWTFile
}
- if natOpts.NATSSeedFile != nil {
- cln.NATSSeedFile = new(string)
- *cln.NATSSeedFile = *natOpts.NATSSeedFile
+ if natOpts.SeedFile != nil {
+ cln.SeedFile = new(string)
+ *cln.SeedFile = *natOpts.SeedFile
}
- if natOpts.NATSCertificateAuthority != nil {
- cln.NATSCertificateAuthority = new(string)
- *cln.NATSCertificateAuthority = *natOpts.NATSCertificateAuthority
+ if natOpts.CertificateAuthority != nil {
+ cln.CertificateAuthority = new(string)
+ *cln.CertificateAuthority = *natOpts.CertificateAuthority
}
- if natOpts.NATSClientCertificate != nil {
- cln.NATSClientCertificate = new(string)
- *cln.NATSClientCertificate = *natOpts.NATSClientCertificate
+ if natOpts.ClientCertificate != nil {
+ cln.ClientCertificate = new(string)
+ *cln.ClientCertificate = *natOpts.ClientCertificate
}
- if natOpts.NATSClientKey != nil {
- cln.NATSClientKey = new(string)
- *cln.NATSClientKey = *natOpts.NATSClientKey
+ if natOpts.ClientKey != nil {
+ cln.ClientKey = new(string)
+ *cln.ClientKey = *natOpts.ClientKey
}
- if natOpts.NATSJetStreamMaxWait != nil {
- cln.NATSJetStreamMaxWait = new(time.Duration)
- *cln.NATSJetStreamMaxWait = *natOpts.NATSJetStreamMaxWait
+ if natOpts.JetStreamMaxWait != nil {
+ cln.JetStreamMaxWait = new(time.Duration)
+ *cln.JetStreamMaxWait = *natOpts.JetStreamMaxWait
}
- if natOpts.NATSJetStreamProcessed != nil {
- cln.NATSJetStreamProcessed = new(bool)
- *cln.NATSJetStreamProcessed = *natOpts.NATSJetStreamProcessed
+ if natOpts.JetStreamProcessed != nil {
+ cln.JetStreamProcessed = new(bool)
+ *cln.JetStreamProcessed = *natOpts.JetStreamProcessed
}
- if natOpts.NATSSubjectProcessed != nil {
- cln.NATSSubjectProcessed = new(string)
- *cln.NATSSubjectProcessed = *natOpts.NATSSubjectProcessed
+ if natOpts.SubjectProcessed != nil {
+ cln.SubjectProcessed = new(string)
+ *cln.SubjectProcessed = *natOpts.SubjectProcessed
}
- if natOpts.NATSJWTFileProcessed != nil {
- cln.NATSJWTFileProcessed = new(string)
- *cln.NATSJWTFileProcessed = *natOpts.NATSJWTFileProcessed
+ if natOpts.JWTFileProcessed != nil {
+ cln.JWTFileProcessed = new(string)
+ *cln.JWTFileProcessed = *natOpts.JWTFileProcessed
}
- if natOpts.NATSSeedFileProcessed != nil {
- cln.NATSSeedFileProcessed = new(string)
- *cln.NATSSeedFileProcessed = *natOpts.NATSSeedFileProcessed
+ if natOpts.SeedFileProcessed != nil {
+ cln.SeedFileProcessed = new(string)
+ *cln.SeedFileProcessed = *natOpts.SeedFileProcessed
}
- if natOpts.NATSCertificateAuthorityProcessed != nil {
- cln.NATSCertificateAuthorityProcessed = new(string)
- *cln.NATSCertificateAuthorityProcessed = *natOpts.NATSCertificateAuthorityProcessed
+ if natOpts.CertificateAuthorityProcessed != nil {
+ cln.CertificateAuthorityProcessed = new(string)
+ *cln.CertificateAuthorityProcessed = *natOpts.CertificateAuthorityProcessed
}
- if natOpts.NATSClientCertificateProcessed != nil {
- cln.NATSClientCertificateProcessed = new(string)
- *cln.NATSClientCertificateProcessed = *natOpts.NATSClientCertificateProcessed
+ if natOpts.ClientCertificateProcessed != nil {
+ cln.ClientCertificateProcessed = new(string)
+ *cln.ClientCertificateProcessed = *natOpts.ClientCertificateProcessed
}
- if natOpts.NATSClientKeyProcessed != nil {
- cln.NATSClientKeyProcessed = new(string)
- *cln.NATSClientKeyProcessed = *natOpts.NATSClientKeyProcessed
+ if natOpts.ClientKeyProcessed != nil {
+ cln.ClientKeyProcessed = new(string)
+ *cln.ClientKeyProcessed = *natOpts.ClientKeyProcessed
}
- if natOpts.NATSJetStreamMaxWaitProcessed != nil {
- cln.NATSJetStreamMaxWaitProcessed = new(time.Duration)
- *cln.NATSJetStreamMaxWaitProcessed = *natOpts.NATSJetStreamMaxWaitProcessed
+ if natOpts.JetStreamMaxWaitProcessed != nil {
+ cln.JetStreamMaxWaitProcessed = new(time.Duration)
+ *cln.JetStreamMaxWaitProcessed = *natOpts.JetStreamMaxWaitProcessed
}
return cln
}
@@ -1072,59 +1072,59 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string
}
if natsOpts := er.Opts.NATSOpts; natsOpts != nil {
- if natsOpts.NATSJetStream != nil {
- opts[utils.NatsJetStream] = *natsOpts.NATSJetStream
+ if natsOpts.JetStream != nil {
+ opts[utils.NatsJetStream] = *natsOpts.JetStream
}
- if natsOpts.NATSConsumerName != nil {
- opts[utils.NatsConsumerName] = *natsOpts.NATSConsumerName
+ if natsOpts.ConsumerName != nil {
+ opts[utils.NatsConsumerName] = *natsOpts.ConsumerName
}
- if natsOpts.NATSSubject != nil {
- opts[utils.NatsSubject] = *natsOpts.NATSSubject
+ if natsOpts.Subject != nil {
+ opts[utils.NatsSubject] = *natsOpts.Subject
}
- if natsOpts.NATSQueueID != nil {
- opts[utils.NatsQueueID] = *natsOpts.NATSQueueID
+ if natsOpts.QueueID != nil {
+ opts[utils.NatsQueueID] = *natsOpts.QueueID
}
- if natsOpts.NATSJWTFile != nil {
- opts[utils.NatsJWTFile] = *natsOpts.NATSJWTFile
+ if natsOpts.JWTFile != nil {
+ opts[utils.NatsJWTFile] = *natsOpts.JWTFile
}
- if natsOpts.NATSSeedFile != nil {
- opts[utils.NatsSeedFile] = *natsOpts.NATSSeedFile
+ if natsOpts.SeedFile != nil {
+ opts[utils.NatsSeedFile] = *natsOpts.SeedFile
}
- if natsOpts.NATSCertificateAuthority != nil {
- opts[utils.NatsCertificateAuthority] = *natsOpts.NATSCertificateAuthority
+ if natsOpts.CertificateAuthority != nil {
+ opts[utils.NatsCertificateAuthority] = *natsOpts.CertificateAuthority
}
- if natsOpts.NATSClientCertificate != nil {
- opts[utils.NatsClientCertificate] = *natsOpts.NATSClientCertificate
+ if natsOpts.ClientCertificate != nil {
+ opts[utils.NatsClientCertificate] = *natsOpts.ClientCertificate
}
- if natsOpts.NATSClientKey != nil {
- opts[utils.NatsClientKey] = *natsOpts.NATSClientKey
+ if natsOpts.ClientKey != nil {
+ opts[utils.NatsClientKey] = *natsOpts.ClientKey
}
- if natsOpts.NATSJetStreamMaxWait != nil {
- opts[utils.NatsJetStreamMaxWait] = natsOpts.NATSJetStreamMaxWait.String()
+ if natsOpts.JetStreamMaxWait != nil {
+ opts[utils.NatsJetStreamMaxWait] = natsOpts.JetStreamMaxWait.String()
}
- if natsOpts.NATSJetStreamProcessed != nil {
- opts[utils.NATSJetStreamProcessedCfg] = *natsOpts.NATSJetStreamProcessed
+ if natsOpts.JetStreamProcessed != nil {
+ opts[utils.NATSJetStreamProcessedCfg] = *natsOpts.JetStreamProcessed
}
- if natsOpts.NATSSubjectProcessed != nil {
- opts[utils.NATSSubjectProcessedCfg] = *natsOpts.NATSSubjectProcessed
+ if natsOpts.SubjectProcessed != nil {
+ opts[utils.NATSSubjectProcessedCfg] = *natsOpts.SubjectProcessed
}
- if natsOpts.NATSJWTFileProcessed != nil {
- opts[utils.NATSJWTFileProcessedCfg] = *natsOpts.NATSJWTFileProcessed
+ if natsOpts.JWTFileProcessed != nil {
+ opts[utils.NATSJWTFileProcessedCfg] = *natsOpts.JWTFileProcessed
}
- if natsOpts.NATSSeedFileProcessed != nil {
- opts[utils.NATSSeedFileProcessedCfg] = *natsOpts.NATSSeedFileProcessed
+ if natsOpts.SeedFileProcessed != nil {
+ opts[utils.NATSSeedFileProcessedCfg] = *natsOpts.SeedFileProcessed
}
- if natsOpts.NATSCertificateAuthorityProcessed != nil {
- opts[utils.NATSCertificateAuthorityProcessedCfg] = *natsOpts.NATSCertificateAuthorityProcessed
+ if natsOpts.CertificateAuthorityProcessed != nil {
+ opts[utils.NATSCertificateAuthorityProcessedCfg] = *natsOpts.CertificateAuthorityProcessed
}
- if natsOpts.NATSClientCertificateProcessed != nil {
- opts[utils.NATSClientCertificateProcessed] = *natsOpts.NATSClientCertificateProcessed
+ if natsOpts.ClientCertificateProcessed != nil {
+ opts[utils.NATSClientCertificateProcessed] = *natsOpts.ClientCertificateProcessed
}
- if natsOpts.NATSClientKeyProcessed != nil {
- opts[utils.NATSClientKeyProcessedCfg] = *natsOpts.NATSClientKeyProcessed
+ if natsOpts.ClientKeyProcessed != nil {
+ opts[utils.NATSClientKeyProcessedCfg] = *natsOpts.ClientKeyProcessed
}
- if natsOpts.NATSJetStreamMaxWaitProcessed != nil {
- opts[utils.NATSJetStreamMaxWaitProcessedCfg] = natsOpts.NATSJetStreamMaxWaitProcessed.String()
+ if natsOpts.JetStreamMaxWaitProcessed != nil {
+ opts[utils.NATSJetStreamMaxWaitProcessedCfg] = natsOpts.JetStreamMaxWaitProcessed.String()
}
}
initialMP = map[string]any{
diff --git a/config/erscfg_test.go b/config/erscfg_test.go
index 6a12589b6..b19a606c7 100644
--- a/config/erscfg_test.go
+++ b/config/erscfg_test.go
@@ -113,7 +113,7 @@ func TestERSClone(t *testing.T) {
PartialOrderField: utils.StringPointer("~*req.AnswerTime"),
PartialCacheAction: utils.StringPointer(utils.MetaNone),
NATSOpts: &NATSROpts{
- NATSSubject: utils.StringPointer("cgrates_cdrs"),
+ Subject: utils.StringPointer("cgrates_cdrs"),
},
},
},
@@ -153,7 +153,7 @@ func TestERSClone(t *testing.T) {
PartialOrderField: utils.StringPointer("~*req.AnswerTime"),
PartialCacheAction: utils.StringPointer(utils.MetaNone),
NATSOpts: &NATSROpts{
- NATSSubject: utils.StringPointer("cgrates_cdrs"),
+ Subject: utils.StringPointer("cgrates_cdrs"),
},
},
},
@@ -281,7 +281,7 @@ func TestERSLoadFromjsonCfg(t *testing.T) {
PartialOrderField: utils.StringPointer("~*req.AnswerTime"),
PartialCacheAction: utils.StringPointer(utils.MetaNone),
NATSOpts: &NATSROpts{
- NATSSubject: utils.StringPointer("cgrates_cdrs"),
+ Subject: utils.StringPointer("cgrates_cdrs"),
},
},
},
@@ -336,7 +336,7 @@ func TestERSLoadFromjsonCfg(t *testing.T) {
PartialOrderField: utils.StringPointer("~*req.AnswerTime"),
PartialCacheAction: utils.StringPointer(utils.MetaNone),
NATSOpts: &NATSROpts{
- NATSSubject: utils.StringPointer("cgrates_cdrs"),
+ Subject: utils.StringPointer("cgrates_cdrs"),
},
},
},
@@ -524,7 +524,7 @@ func TestERSloadFromJsonCase3(t *testing.T) {
PartialOrderField: utils.StringPointer("~*req.AnswerTime"),
PartialCacheAction: utils.StringPointer(utils.MetaNone),
NATSOpts: &NATSROpts{
- NATSSubject: utils.StringPointer("cgrates_cdrs"),
+ Subject: utils.StringPointer("cgrates_cdrs"),
},
},
},
@@ -563,7 +563,7 @@ func TestERSloadFromJsonCase3(t *testing.T) {
PartialOrderField: utils.StringPointer("~*req.AnswerTime"),
PartialCacheAction: utils.StringPointer(utils.MetaNone),
NATSOpts: &NATSROpts{
- NATSSubject: utils.StringPointer("cgrates_cdrs"),
+ Subject: utils.StringPointer("cgrates_cdrs"),
},
},
},
@@ -675,7 +675,7 @@ func TestERSloadFromJsonCase4(t *testing.T) {
PartialOrderField: utils.StringPointer("~*req.AnswerTime"),
PartialCacheAction: utils.StringPointer(utils.MetaNone),
NATSOpts: &NATSROpts{
- NATSSubject: utils.StringPointer("cgrates_cdrs"),
+ Subject: utils.StringPointer("cgrates_cdrs"),
},
},
},
@@ -714,7 +714,7 @@ func TestERSloadFromJsonCase4(t *testing.T) {
PartialOrderField: utils.StringPointer("~*req.AnswerTime"),
PartialCacheAction: utils.StringPointer(utils.MetaNone),
NATSOpts: &NATSROpts{
- NATSSubject: utils.StringPointer("cgrates_cdrs"),
+ Subject: utils.StringPointer("cgrates_cdrs"),
},
},
},
@@ -821,7 +821,7 @@ func TestEventReaderSameID(t *testing.T) {
PartialOrderField: utils.StringPointer("~*req.AnswerTime"),
PartialCacheAction: utils.StringPointer(utils.MetaNone),
NATSOpts: &NATSROpts{
- NATSSubject: utils.StringPointer("cgrates_cdrs"),
+ Subject: utils.StringPointer("cgrates_cdrs"),
},
},
},
@@ -856,7 +856,7 @@ func TestEventReaderSameID(t *testing.T) {
PartialOrderField: utils.StringPointer("~*req.AnswerTime"),
PartialCacheAction: utils.StringPointer(utils.MetaNone),
NATSOpts: &NATSROpts{
- NATSSubject: utils.StringPointer("cgrates_cdrs"),
+ Subject: utils.StringPointer("cgrates_cdrs"),
},
},
},
@@ -1315,7 +1315,7 @@ func TestERsloadFromJsonCfg(t *testing.T) {
PartialOrderField: utils.StringPointer("~*req.AnswerTime"),
PartialCacheAction: utils.StringPointer(utils.MetaNone),
NATSOpts: &NATSROpts{
- NATSSubject: utils.StringPointer("cgrates_cdrs"),
+ Subject: utils.StringPointer("cgrates_cdrs"),
},
},
},
@@ -1356,7 +1356,7 @@ func TestERsloadFromJsonCfg(t *testing.T) {
PartialOrderField: utils.StringPointer("~*req.AnswerTime"),
PartialCacheAction: utils.StringPointer(utils.MetaNone),
NATSOpts: &NATSROpts{
- NATSSubject: utils.StringPointer("cgrates_cdrs"),
+ Subject: utils.StringPointer("cgrates_cdrs"),
},
},
},
@@ -1539,23 +1539,23 @@ func TestEventReaderCfgClone(t *testing.T) {
S3BucketIDProcessed: utils.StringPointer("S3BucketProc"),
},
NATSOpts: &NATSROpts{
- NATSJetStream: utils.BoolPointer(false),
- NATSConsumerName: utils.StringPointer("user"),
- NATSQueueID: utils.StringPointer("id"),
- NATSJWTFile: utils.StringPointer("jwt"),
- NATSSeedFile: utils.StringPointer("seed"),
- NATSCertificateAuthority: utils.StringPointer("authority"),
- NATSClientCertificate: utils.StringPointer("certificate"),
- NATSClientKey: utils.StringPointer("key5"),
- NATSJetStreamMaxWait: utils.DurationPointer(1 * time.Minute),
- NATSJetStreamProcessed: utils.BoolPointer(true),
- NATSJWTFileProcessed: utils.StringPointer("file"),
- NATSSeedFileProcessed: utils.StringPointer("natseed"),
- NATSCertificateAuthorityProcessed: utils.StringPointer("natsauth"),
- NATSClientCertificateProcessed: utils.StringPointer("natcertificate"),
- NATSClientKeyProcessed: utils.StringPointer("natsprocess"),
- NATSJetStreamMaxWaitProcessed: utils.DurationPointer(1 * time.Minute),
- NATSSubjectProcessed: utils.StringPointer("process"),
+ JetStream: utils.BoolPointer(false),
+ ConsumerName: utils.StringPointer("user"),
+ QueueID: utils.StringPointer("id"),
+ JWTFile: utils.StringPointer("jwt"),
+ SeedFile: utils.StringPointer("seed"),
+ CertificateAuthority: utils.StringPointer("authority"),
+ ClientCertificate: utils.StringPointer("certificate"),
+ ClientKey: utils.StringPointer("key5"),
+ JetStreamMaxWait: utils.DurationPointer(1 * time.Minute),
+ JetStreamProcessed: utils.BoolPointer(true),
+ JWTFileProcessed: utils.StringPointer("file"),
+ SeedFileProcessed: utils.StringPointer("natseed"),
+ CertificateAuthorityProcessed: utils.StringPointer("natsauth"),
+ ClientCertificateProcessed: utils.StringPointer("natcertificate"),
+ ClientKeyProcessed: utils.StringPointer("natsprocess"),
+ JetStreamMaxWaitProcessed: utils.DurationPointer(1 * time.Minute),
+ SubjectProcessed: utils.StringPointer("process"),
},
KafkaOpts: &KafkaROpts{
KafkaTopic: utils.StringPointer("kafka"),
diff --git a/data/ansible/roles/nats/defaults/main.yaml b/data/ansible/roles/nats/defaults/main.yaml
index 65d37525f..96b146f2a 100644
--- a/data/ansible/roles/nats/defaults/main.yaml
+++ b/data/ansible/roles/nats/defaults/main.yaml
@@ -1,5 +1,5 @@
---
-nats_version: 2.9.17
+nats_version: 2.10.1
nats_install_dir: /opt/nats
nats_user: nats
nats_group: nats
diff --git a/ees/nats.go b/ees/nats.go
index 783908aea..185997dae 100644
--- a/ees/nats.go
+++ b/ees/nats.go
@@ -39,7 +39,7 @@ func NewNatsEE(cfg *config.EventExporterCfg, nodeID string, connTimeout time.Dur
subject: utils.DefaultQueueID,
reqs: newConcReq(cfg.ConcurrentRequests),
}
- err = natsPstr.parseOpt(cfg.Opts, nodeID, connTimeout)
+ err = natsPstr.parseOpts(cfg.Opts, nodeID, connTimeout)
return
}
@@ -60,76 +60,87 @@ type NatsEE struct {
bytePreparing
}
-func (pstr *NatsEE) parseOpt(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) (err error) {
-
- if natsOpts := opts.NATS; natsOpts != nil {
- if natsOpts.JetStream != nil {
- pstr.jetStream = *natsOpts.JetStream
- }
- pstr.subject = utils.DefaultQueueID
- if natsOpts.Subject != nil {
- pstr.subject = *natsOpts.Subject
- }
-
- pstr.opts, err = GetNatsOpts(natsOpts, nodeID, connTimeout)
- if pstr.jetStream {
- if natsOpts.JetStreamMaxWait != nil {
- pstr.jsOpts = []nats.JSOpt{nats.MaxWait(*natsOpts.JetStreamMaxWait)}
- }
- }
+func (pstr *NatsEE) parseOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) error {
+ if opts.NATS == nil {
+ return nil
}
- return
+
+ if opts.NATS.JetStream != nil {
+ pstr.jetStream = *opts.NATS.JetStream
+ }
+ if opts.NATS.Subject != nil {
+ pstr.subject = *opts.NATS.Subject
+ }
+
+ var err error
+ pstr.opts, err = GetNatsOpts(opts.NATS, nodeID, connTimeout)
+ if err != nil {
+ return err
+ }
+
+ if pstr.jetStream && opts.NATS.JetStreamMaxWait != nil {
+ pstr.jsOpts = []nats.JSOpt{nats.MaxWait(*opts.NATS.JetStreamMaxWait)}
+ }
+ return nil
}
func (pstr *NatsEE) Cfg() *config.EventExporterCfg { return pstr.cfg }
-func (pstr *NatsEE) Connect() (err error) {
+func (pstr *NatsEE) Connect() error {
pstr.Lock()
defer pstr.Unlock()
- if pstr.poster == nil {
- if pstr.poster, err = nats.Connect(pstr.Cfg().ExportPath, pstr.opts...); err != nil {
- return
- }
- if pstr.jetStream {
- pstr.posterJS, err = pstr.poster.JetStream(pstr.jsOpts...)
- }
+ if pstr.poster != nil {
+ return nil
}
- return
+
+ var err error
+ pstr.poster, err = nats.Connect(pstr.Cfg().ExportPath, pstr.opts...)
+ if err != nil {
+ return err
+ }
+ if pstr.jetStream {
+ pstr.posterJS, err = pstr.poster.JetStream(pstr.jsOpts...)
+ }
+ return err
}
-func (pstr *NatsEE) ExportEvent(content any, _ string) (err error) {
+func (pstr *NatsEE) ExportEvent(content any, _ string) error {
pstr.reqs.get()
+ defer pstr.reqs.done()
pstr.RLock()
+ defer pstr.RUnlock()
+
if pstr.poster == nil {
- pstr.RUnlock()
- pstr.reqs.done()
return utils.ErrDisconnected
}
+
+ var err error
if pstr.jetStream {
_, err = pstr.posterJS.Publish(pstr.subject, content.([]byte))
} else {
err = pstr.poster.Publish(pstr.subject, content.([]byte))
}
- pstr.RUnlock()
- pstr.reqs.done()
- return
+ return err
}
-func (pstr *NatsEE) Close() (err error) {
+func (pstr *NatsEE) Close() error {
pstr.Lock()
- if pstr.poster != nil {
- err = pstr.poster.Drain()
- pstr.poster = nil
+ defer pstr.Unlock()
+
+ if pstr.poster == nil {
+ return nil
}
- pstr.Unlock()
- return
+
+ err := pstr.poster.Drain()
+ pstr.poster = nil
+ return err
}
func (pstr *NatsEE) GetMetrics() *utils.SafeMapStorage { return pstr.dc }
-func GetNatsOpts(opts *config.NATSOpts, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) {
- nop = make([]nats.Option, 0, 7)
- nop = append(nop, nats.Name(utils.CGRateSLwr+nodeID),
+func GetNatsOpts(opts *config.NATSOpts, nodeID string, connTimeout time.Duration) ([]nats.Option, error) {
+ natsOpts := make([]nats.Option, 0, 7)
+ natsOpts = append(natsOpts, nats.Name(utils.CGRateSLwr+nodeID),
nats.Timeout(connTimeout),
nats.DrainTimeout(time.Second))
if opts.JWTFile != nil {
@@ -137,28 +148,27 @@ func GetNatsOpts(opts *config.NATSOpts, nodeID string, connTimeout time.Duration
if opts.SeedFile != nil {
keys = append(keys, *opts.SeedFile)
}
- nop = append(nop, nats.UserCredentials(*opts.JWTFile, keys...))
+ natsOpts = append(natsOpts, nats.UserCredentials(*opts.JWTFile, keys...))
}
if opts.SeedFile != nil {
opt, err := nats.NkeyOptionFromSeed(*opts.SeedFile)
if err != nil {
return nil, err
}
- nop = append(nop, opt)
+ natsOpts = append(natsOpts, opt)
}
- if opts.ClientCertificate != nil {
- if opts.ClientKey == nil {
- err = fmt.Errorf("has certificate but no key")
- return
- }
- nop = append(nop, nats.ClientCert(*opts.ClientCertificate, *opts.ClientKey))
- } else if opts.ClientKey != nil {
- err = fmt.Errorf("has key but no certificate")
- return
+
+ switch {
+ case opts.ClientCertificate != nil && opts.ClientKey != nil:
+ natsOpts = append(natsOpts, nats.ClientCert(*opts.ClientCertificate, *opts.ClientKey))
+ case opts.ClientCertificate != nil:
+ return nil, fmt.Errorf("has certificate but no key")
+ case opts.ClientKey != nil:
+ return nil, fmt.Errorf("has key but no certificate")
}
if opts.CertificateAuthority != nil {
- nop = append(nop,
+ natsOpts = append(natsOpts,
func(o *nats.Options) error {
pool, err := x509.SystemCertPool()
if err != nil {
@@ -181,5 +191,5 @@ func GetNatsOpts(opts *config.NATSOpts, nodeID string, connTimeout time.Duration
return nil
})
}
- return
+ return natsOpts, nil
}
diff --git a/ees/nats_test.go b/ees/nats_test.go
index b2fd7daec..8102bdeae 100644
--- a/ees/nats_test.go
+++ b/ees/nats_test.go
@@ -101,7 +101,7 @@ func TestParseOpt(t *testing.T) {
t.Error(err)
}
- err = pstr.parseOpt(opts, nodeID, connTimeout)
+ err = pstr.parseOpts(opts, nodeID, connTimeout)
if err != nil {
t.Error(err)
}
@@ -138,7 +138,7 @@ func TestParseOptJetStream(t *testing.T) {
t.Error(err)
}
- err = pstr.parseOpt(opts, nodeID, connTimeout)
+ err = pstr.parseOpts(opts, nodeID, connTimeout)
if err != nil {
t.Error(err)
}
@@ -179,7 +179,7 @@ func TestParseOptJetStreamMaxWait(t *testing.T) {
t.Error(err)
}
- err = pstr.parseOpt(opts, nodeID, connTimeout)
+ err = pstr.parseOpts(opts, nodeID, connTimeout)
if err != nil {
t.Error(err)
}
@@ -219,7 +219,7 @@ func TestParseOptSubject(t *testing.T) {
t.Error(err)
}
- err = pstr.parseOpt(opts, nodeID, connTimeout)
+ err = pstr.parseOpts(opts, nodeID, connTimeout)
if err != nil {
t.Error(err)
}
diff --git a/ers/amqp.go b/ers/amqp.go
index 7ad7f41ec..d235bada8 100644
--- a/ers/amqp.go
+++ b/ers/amqp.go
@@ -252,7 +252,7 @@ func (rdr *AMQPER) close() (err error) {
}
func (rdr *AMQPER) createPoster() {
- processedOpt := getProcessOptions(rdr.Config().Opts)
+ processedOpt := getProcessedOptions(rdr.Config().Opts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
diff --git a/ers/amqpv1.go b/ers/amqpv1.go
index 5db2530fe..72be3b4cd 100644
--- a/ers/amqpv1.go
+++ b/ers/amqpv1.go
@@ -209,7 +209,7 @@ func (rdr *AMQPv1ER) close() (err error) {
}
func (rdr *AMQPv1ER) createPoster() {
- processedOpt := getProcessOptions(rdr.Config().Opts)
+ processedOpt := getProcessedOptions(rdr.Config().Opts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
diff --git a/ers/kafka.go b/ers/kafka.go
index ea118d333..badb255bc 100644
--- a/ers/kafka.go
+++ b/ers/kafka.go
@@ -203,7 +203,7 @@ func (rdr *KafkaER) setOpts(opts *config.EventReaderOpts) (err error) {
}
func (rdr *KafkaER) createPoster() {
- processedOpt := getProcessOptions(rdr.Config().Opts)
+ processedOpt := getProcessedOptions(rdr.Config().Opts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
diff --git a/ers/lib_test.go b/ers/lib_test.go
index b01089744..28bc59ad2 100644
--- a/ers/lib_test.go
+++ b/ers/lib_test.go
@@ -19,9 +19,13 @@ along with this program. If not, see
package ers
import (
+ "crypto/rand"
"errors"
"flag"
+ "fmt"
+ "math/big"
"os"
+ "path/filepath"
"testing"
"github.com/cgrates/birpc"
@@ -80,3 +84,32 @@ func testCleanupFiles(t *testing.T) {
}
}
}
+
+func initTestCfg(cfgContent string) (*config.CGRConfig, string, func(), error) {
+ folderNameSuffix, err := rand.Int(rand.Reader, big.NewInt(10000))
+ if err != nil {
+ return nil, "", nil, fmt.Errorf("could not generate random number for folder name suffix, err: %s", err.Error())
+ }
+ cfgPath := fmt.Sprintf("/tmp/config%d", folderNameSuffix)
+ err = os.MkdirAll(cfgPath, 0755)
+ if err != nil {
+ return nil, "", nil, err
+ }
+ filePath := filepath.Join(cfgPath, "cgrates.json")
+ err = os.WriteFile(filePath, []byte(cfgContent), 0644)
+ if err != nil {
+ os.RemoveAll(cfgPath)
+ return nil, "", nil, err
+ }
+ var cfg *config.CGRConfig
+ cfg, err = config.NewCGRConfigFromPath(cfgPath)
+ if err != nil {
+ os.RemoveAll(cfgPath)
+ return nil, "", nil, err
+ }
+ removeFunc := func() {
+ os.RemoveAll(cfgPath)
+ }
+
+ return cfg, cfgPath, removeFunc, nil
+}
diff --git a/ers/libers.go b/ers/libers.go
index 07cd8803d..b2510e688 100644
--- a/ers/libers.go
+++ b/ers/libers.go
@@ -29,174 +29,167 @@ import (
"github.com/cgrates/cgrates/utils"
)
-// getProcessOptions assigns all non-nil fields ending in "Processed" from EventReaderOpts to their counterparts in EventExporterOpts
-func getProcessOptions(erOpts *config.EventReaderOpts) (eeOpts *config.EventExporterOpts) {
- eeOpts = new(config.EventExporterOpts)
- if amqOpts := erOpts.AMQPOpts; amqOpts != nil {
- if amqOpts.AMQPExchangeProcessed != nil {
+// getProcessedOptions assigns all non-nil fields ending in "Processed" from EventReaderOpts to their counterparts in EventExporterOpts
+func getProcessedOptions(erOpts *config.EventReaderOpts) *config.EventExporterOpts {
+ var eeOpts *config.EventExporterOpts
+
+ if erOpts.AMQPOpts != nil {
+ initAMQPExporterOpts := func() {
+ if eeOpts == nil {
+ eeOpts = new(config.EventExporterOpts)
+ }
if eeOpts.AMQP == nil {
eeOpts.AMQP = new(config.AMQPOpts)
}
- eeOpts.AMQP.Exchange = amqOpts.AMQPExchangeProcessed
}
- if amqOpts.AMQPExchangeTypeProcessed != nil {
- if eeOpts.AMQP == nil {
- eeOpts.AMQP = new(config.AMQPOpts)
- }
- eeOpts.AMQP.ExchangeType = amqOpts.AMQPExchangeTypeProcessed
+
+ if erOpts.AMQPOpts.AMQPExchangeProcessed != nil {
+ initAMQPExporterOpts()
+ eeOpts.AMQP.Exchange = erOpts.AMQPOpts.AMQPExchangeProcessed
}
- if amqOpts.AMQPQueueIDProcessed != nil {
- if eeOpts.AMQP == nil {
- eeOpts.AMQP = new(config.AMQPOpts)
- }
- eeOpts.AMQP.QueueID = amqOpts.AMQPQueueIDProcessed
+ if erOpts.AMQPOpts.AMQPExchangeTypeProcessed != nil {
+ initAMQPExporterOpts()
+ eeOpts.AMQP.ExchangeType = erOpts.AMQPOpts.AMQPExchangeTypeProcessed
}
- if amqOpts.AMQPRoutingKeyProcessed != nil {
- if eeOpts.AMQP == nil {
- eeOpts.AMQP = new(config.AMQPOpts)
- }
- eeOpts.AMQP.RoutingKey = amqOpts.AMQPRoutingKeyProcessed
+ if erOpts.AMQPOpts.AMQPQueueIDProcessed != nil {
+ initAMQPExporterOpts()
+ eeOpts.AMQP.QueueID = erOpts.AMQPOpts.AMQPQueueIDProcessed
}
- if amqOpts.AMQPUsernameProcessed != nil {
- if eeOpts.AMQP == nil {
- eeOpts.AMQP = new(config.AMQPOpts)
- }
- eeOpts.AMQP.Username = amqOpts.AMQPUsernameProcessed
+ if erOpts.AMQPOpts.AMQPRoutingKeyProcessed != nil {
+ initAMQPExporterOpts()
+ eeOpts.AMQP.RoutingKey = erOpts.AMQPOpts.AMQPRoutingKeyProcessed
}
- if amqOpts.AMQPPasswordProcessed != nil {
- if eeOpts.AMQP == nil {
- eeOpts.AMQP = new(config.AMQPOpts)
- }
- eeOpts.AMQP.Password = amqOpts.AMQPPasswordProcessed
+ if erOpts.AMQPOpts.AMQPUsernameProcessed != nil {
+ initAMQPExporterOpts()
+ eeOpts.AMQP.Username = erOpts.AMQPOpts.AMQPUsernameProcessed
}
- }
- if awsOpts := erOpts.AWSOpts; awsOpts != nil {
- if awsOpts.AWSKeyProcessed != nil {
- if eeOpts.AWS == nil {
- eeOpts.AWS = new(config.AWSOpts)
- }
- eeOpts.AWS.Key = awsOpts.AWSKeyProcessed
- }
- if awsOpts.AWSRegionProcessed != nil {
- if eeOpts.AWS == nil {
- eeOpts.AWS = new(config.AWSOpts)
- }
- eeOpts.AWS.Region = awsOpts.AWSRegionProcessed
- }
- if awsOpts.AWSSecretProcessed != nil {
- if eeOpts.AWS == nil {
- eeOpts.AWS = new(config.AWSOpts)
- }
- eeOpts.AWS.Secret = awsOpts.AWSSecretProcessed
- }
- if awsOpts.AWSTokenProcessed != nil {
- if eeOpts.AWS == nil {
- eeOpts.AWS = new(config.AWSOpts)
- }
- eeOpts.AWS.Token = awsOpts.AWSTokenProcessed
- }
- if awsOpts.S3BucketIDProcessed != nil {
- if eeOpts.AWS == nil {
- eeOpts.AWS = new(config.AWSOpts)
- }
- eeOpts.AWS.S3BucketID = awsOpts.S3BucketIDProcessed
- }
- if awsOpts.S3FolderPathProcessed != nil {
- if eeOpts.AWS == nil {
- eeOpts.AWS = new(config.AWSOpts)
- }
- eeOpts.AWS.S3FolderPath = awsOpts.S3FolderPathProcessed
- }
- if awsOpts.SQSQueueIDProcessed != nil {
- if eeOpts.AWS == nil {
- eeOpts.AWS = new(config.AWSOpts)
- }
- eeOpts.AWS.SQSQueueID = awsOpts.SQSQueueIDProcessed
+ if erOpts.AMQPOpts.AMQPPasswordProcessed != nil {
+ initAMQPExporterOpts()
+ eeOpts.AMQP.Password = erOpts.AMQPOpts.AMQPPasswordProcessed
}
}
- if kfkOpts := erOpts.KafkaOpts; kfkOpts != nil {
- if kfkOpts.KafkaTopicProcessed != nil {
+ if erOpts.AWSOpts != nil {
+ initAWSExporterOpts := func() {
+ if eeOpts == nil {
+ eeOpts = new(config.EventExporterOpts)
+ }
+ if eeOpts.AWS == nil {
+ eeOpts.AWS = new(config.AWSOpts)
+ }
+ }
+
+ if erOpts.AWSOpts.AWSKeyProcessed != nil {
+ initAWSExporterOpts()
+ eeOpts.AWS.Key = erOpts.AWSOpts.AWSKeyProcessed
+ }
+ if erOpts.AWSOpts.AWSRegionProcessed != nil {
+ initAWSExporterOpts()
+ eeOpts.AWS.Region = erOpts.AWSOpts.AWSRegionProcessed
+ }
+ if erOpts.AWSOpts.AWSSecretProcessed != nil {
+ initAWSExporterOpts()
+ eeOpts.AWS.Secret = erOpts.AWSOpts.AWSSecretProcessed
+ }
+ if erOpts.AWSOpts.AWSTokenProcessed != nil {
+ initAWSExporterOpts()
+ eeOpts.AWS.Token = erOpts.AWSOpts.AWSTokenProcessed
+ }
+ if erOpts.AWSOpts.S3BucketIDProcessed != nil {
+ initAWSExporterOpts()
+ eeOpts.AWS.S3BucketID = erOpts.AWSOpts.S3BucketIDProcessed
+ }
+ if erOpts.AWSOpts.S3FolderPathProcessed != nil {
+ initAWSExporterOpts()
+ eeOpts.AWS.S3FolderPath = erOpts.AWSOpts.S3FolderPathProcessed
+ }
+ if erOpts.AWSOpts.SQSQueueIDProcessed != nil {
+ initAWSExporterOpts()
+ eeOpts.AWS.SQSQueueID = erOpts.AWSOpts.SQSQueueIDProcessed
+ }
+ }
+
+ if erOpts.KafkaOpts != nil {
+ if erOpts.KafkaOpts.KafkaTopicProcessed != nil {
+ if eeOpts == nil {
+ eeOpts = new(config.EventExporterOpts)
+ }
if eeOpts.Kafka == nil {
eeOpts.Kafka = new(config.KafkaOpts)
}
- eeOpts.Kafka.KafkaTopic = kfkOpts.KafkaTopicProcessed
- }
- }
- if natsOpts := erOpts.NATSOpts; natsOpts != nil {
- if natsOpts.NATSCertificateAuthorityProcessed != nil {
- if eeOpts.NATS == nil {
- eeOpts.NATS = new(config.NATSOpts)
- }
- eeOpts.NATS.CertificateAuthority = natsOpts.NATSCertificateAuthorityProcessed
- }
- if natsOpts.NATSClientCertificateProcessed != nil {
- if eeOpts.NATS == nil {
- eeOpts.NATS = new(config.NATSOpts)
- }
- eeOpts.NATS.ClientCertificate = natsOpts.NATSClientCertificateProcessed
- }
- if natsOpts.NATSClientKeyProcessed != nil {
- if eeOpts.NATS == nil {
- eeOpts.NATS = new(config.NATSOpts)
- }
- eeOpts.NATS.ClientKey = natsOpts.NATSClientKeyProcessed
- }
- if natsOpts.NATSJWTFileProcessed != nil {
- if eeOpts.NATS == nil {
- eeOpts.NATS = new(config.NATSOpts)
- }
- eeOpts.NATS.JWTFile = natsOpts.NATSJWTFileProcessed
- }
- if natsOpts.NATSJetStreamMaxWaitProcessed != nil {
- if eeOpts.NATS == nil {
- eeOpts.NATS = new(config.NATSOpts)
- }
- eeOpts.NATS.JetStreamMaxWait = natsOpts.NATSJetStreamMaxWaitProcessed
- }
- if natsOpts.NATSJetStreamProcessed != nil {
- if eeOpts.NATS == nil {
- eeOpts.NATS = new(config.NATSOpts)
- }
- eeOpts.NATS.JetStream = natsOpts.NATSJetStreamProcessed
- }
- if natsOpts.NATSSeedFileProcessed != nil {
- if eeOpts.NATS == nil {
- eeOpts.NATS = new(config.NATSOpts)
- }
- eeOpts.NATS.SeedFile = natsOpts.NATSSeedFileProcessed
- }
- if natsOpts.NATSSubjectProcessed != nil {
- if eeOpts.NATS == nil {
- eeOpts.NATS = new(config.NATSOpts)
- }
- eeOpts.NATS.Subject = natsOpts.NATSSubjectProcessed
+ eeOpts.Kafka.KafkaTopic = erOpts.KafkaOpts.KafkaTopicProcessed
}
}
- if sqlOpts := erOpts.SQLOpts; sqlOpts != nil {
- if sqlOpts.SQLDBNameProcessed != nil {
- if eeOpts.SQL == nil {
- eeOpts.SQL = new(config.SQLOpts)
+ if erOpts.NATSOpts != nil {
+ initNATSExporterOpts := func() {
+ if eeOpts == nil {
+ eeOpts = new(config.EventExporterOpts)
}
- eeOpts.SQL.DBName = sqlOpts.SQLDBNameProcessed
- }
- if sqlOpts.SQLTableNameProcessed != nil {
- if eeOpts.SQL == nil {
- eeOpts.SQL = new(config.SQLOpts)
+ if eeOpts.NATS == nil {
+ eeOpts.NATS = new(config.NATSOpts)
}
- eeOpts.SQL.TableName = sqlOpts.SQLTableNameProcessed
- }
- if sqlOpts.PgSSLModeProcessed != nil {
- if eeOpts.SQL == nil {
- eeOpts.SQL = new(config.SQLOpts)
- }
- eeOpts.SQL.PgSSLMode = sqlOpts.PgSSLModeProcessed
}
+ if erOpts.NATSOpts.CertificateAuthorityProcessed != nil {
+ initNATSExporterOpts()
+ eeOpts.NATS.CertificateAuthority = erOpts.NATSOpts.CertificateAuthorityProcessed
+ }
+ if erOpts.NATSOpts.ClientCertificateProcessed != nil {
+ initNATSExporterOpts()
+ eeOpts.NATS.ClientCertificate = erOpts.NATSOpts.ClientCertificateProcessed
+ }
+ if erOpts.NATSOpts.ClientKeyProcessed != nil {
+ initNATSExporterOpts()
+ eeOpts.NATS.ClientKey = erOpts.NATSOpts.ClientKeyProcessed
+ }
+ if erOpts.NATSOpts.JWTFileProcessed != nil {
+ initNATSExporterOpts()
+ eeOpts.NATS.JWTFile = erOpts.NATSOpts.JWTFileProcessed
+ }
+ if erOpts.NATSOpts.JetStreamMaxWaitProcessed != nil {
+ initNATSExporterOpts()
+ eeOpts.NATS.JetStreamMaxWait = erOpts.NATSOpts.JetStreamMaxWaitProcessed
+ }
+ if erOpts.NATSOpts.JetStreamProcessed != nil {
+ initNATSExporterOpts()
+ eeOpts.NATS.JetStream = erOpts.NATSOpts.JetStreamProcessed
+ }
+ if erOpts.NATSOpts.SeedFileProcessed != nil {
+ initNATSExporterOpts()
+ eeOpts.NATS.SeedFile = erOpts.NATSOpts.SeedFileProcessed
+ }
+ if erOpts.NATSOpts.SubjectProcessed != nil {
+ initNATSExporterOpts()
+ eeOpts.NATS.Subject = erOpts.NATSOpts.SubjectProcessed
+ }
}
- return
+ if erOpts.SQLOpts != nil {
+ initSQLExporterOpts := func() {
+ if eeOpts == nil {
+ eeOpts = new(config.EventExporterOpts)
+ }
+ if eeOpts.SQL == nil {
+ eeOpts.SQL = new(config.SQLOpts)
+ }
+ }
+
+ if erOpts.SQLOpts.SQLDBNameProcessed != nil {
+ initSQLExporterOpts()
+ eeOpts.SQL.DBName = erOpts.SQLOpts.SQLDBNameProcessed
+ }
+ if erOpts.SQLOpts.SQLTableNameProcessed != nil {
+ initSQLExporterOpts()
+ eeOpts.SQL.TableName = erOpts.SQLOpts.SQLTableNameProcessed
+ }
+ if erOpts.SQLOpts.PgSSLModeProcessed != nil {
+ initSQLExporterOpts()
+ eeOpts.SQL.PgSSLMode = erOpts.SQLOpts.PgSSLModeProcessed
+ }
+ }
+
+ return eeOpts
}
// mergePartialEvents will unite the events using the reader configuration
diff --git a/ers/libers_test.go b/ers/libers_test.go
index ad6d2e69c..f57c981dd 100644
--- a/ers/libers_test.go
+++ b/ers/libers_test.go
@@ -32,7 +32,7 @@ func TestGetProcessOptions(t *testing.T) {
AMQPQueueIDProcessed: utils.StringPointer("processed"),
},
}
- result := getProcessOptions(opts)
+ result := getProcessedOptions(opts)
expected := &config.EventExporterOpts{
AMQP: &config.AMQPOpts{
QueueID: utils.StringPointer("processed"),
diff --git a/ers/nats.go b/ers/nats.go
index 0022e17f7..475bd7245 100644
--- a/ers/nats.go
+++ b/ers/nats.go
@@ -90,68 +90,85 @@ func (rdr *NatsER) Config() *config.EventReaderCfg {
return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx]
}
-// Serve will start the gorutines needed to watch the nats subject
-func (rdr *NatsER) Serve() (err error) {
- // Connect to a server
- var nc *nats.Conn
- var js nats.JetStreamContext
+// Serve will subscribe to a NATS subject and process incoming messages until the rdrExit channel
+// will be closed.
+func (rdr *NatsER) Serve() error {
- if nc, err = nats.Connect(rdr.Config().SourcePath, rdr.opts...); err != nil {
- return
+ // Establish a connection to the nats server.
+ nc, err := nats.Connect(rdr.Config().SourcePath, rdr.opts...)
+ if err != nil {
+ return err
}
- ch := make(chan *nats.Msg)
+
+ // Define the message handler. Its content will get executed for every received message.
+ msgHandler := func(msg *nats.Msg) {
+
+ // If the rdr.cap channel buffer is empty, block until a resource is available. Otherwise
+ // allocate one resource and start processing the message.
+ if rdr.Config().ConcurrentReqs != -1 {
+ <-rdr.cap
+ }
+ go func(msg *nats.Msg) {
+ handlerErr := rdr.processMessage(msg.Data)
+ if handlerErr != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> processing message %s error: %s",
+ utils.ERs, string(msg.Data), handlerErr.Error()))
+ }
+
+ // Export the received message if a poster has been defined.
+ if rdr.poster != nil {
+ handlerErr = ees.ExportWithAttempts(rdr.poster, msg.Data, utils.EmptyString)
+ if handlerErr != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> writing message %s error: %s",
+ utils.ERs, string(msg.Data), handlerErr.Error()))
+ }
+ }
+
+ // Release the resource back to rdr.cap channel.
+ if rdr.Config().ConcurrentReqs != -1 {
+ rdr.cap <- struct{}{}
+ }
+ }(msg)
+ }
+
+ // Subscribe to the appropriate NATS subject.
if !rdr.jetStream {
- if _, err = nc.ChanQueueSubscribe(rdr.subject, rdr.queueID, ch); err != nil {
- return
+ _, err = nc.QueueSubscribe(rdr.subject, rdr.queueID, msgHandler)
+ if err != nil {
+ nc.Drain()
+ return err
}
} else {
+ var js nats.JetStreamContext
js, err = nc.JetStream(rdr.jsOpts...)
if err != nil {
- return
+ nc.Drain()
+ return err
}
- if _, err = js.QueueSubscribe(rdr.subject, rdr.queueID, func(msg *nats.Msg) {
- ch <- msg
- }, nats.Durable(rdr.consumerName)); err != nil {
- return
+ _, err = js.QueueSubscribe(rdr.subject, rdr.queueID, msgHandler,
+ nats.Durable(rdr.consumerName))
+ if err != nil {
+ nc.Drain()
+ return err
}
}
+
go func() {
- for {
- if rdr.Config().ConcurrentReqs != -1 {
- <-rdr.cap // do not try to read if the limit is reached
- }
- select {
- case <-rdr.rdrExit:
- utils.Logger.Info(
- fmt.Sprintf("<%s> stop monitoring nats path <%s>",
- utils.ERs, rdr.Config().SourcePath))
- nc.Drain()
- if rdr.poster != nil {
- rdr.poster.Close()
- }
- return
- case msg := <-ch:
- go func(msg *nats.Msg) {
- if err := rdr.processMessage(msg.Data); err != nil {
- utils.Logger.Warning(
- fmt.Sprintf("<%s> processing message %s error: %s",
- utils.ERs, string(msg.Data), err.Error()))
- }
- if rdr.poster != nil { // post it
- if err := ees.ExportWithAttempts(rdr.poster, msg.Data, utils.EmptyString); err != nil {
- utils.Logger.Warning(
- fmt.Sprintf("<%s> writing message %s error: %s",
- utils.ERs, string(msg.Data), err.Error()))
- }
- }
- if rdr.Config().ConcurrentReqs != -1 {
- rdr.cap <- struct{}{}
- }
- }(msg)
- }
+
+ // Wait for exit signal.
+ <-rdr.rdrExit
+ utils.Logger.Info(
+ fmt.Sprintf("<%s> stop monitoring nats path <%s>",
+ utils.ERs, rdr.Config().SourcePath))
+ nc.Drain()
+ if rdr.poster != nil {
+ rdr.poster.Close()
}
}()
- return
+
+ return nil
}
func (rdr *NatsER) processMessage(msg []byte) (err error) {
@@ -187,7 +204,7 @@ func (rdr *NatsER) processMessage(msg []byte) (err error) {
}
func (rdr *NatsER) createPoster() (err error) {
- processedOpt := getProcessOptions(rdr.Config().Opts)
+ processedOpt := getProcessedOptions(rdr.Config().Opts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
@@ -199,78 +216,77 @@ func (rdr *NatsER) createPoster() (err error) {
}
func (rdr *NatsER) processOpts() (err error) {
- if rdr.Config().Opts.NATSOpts.NATSSubject != nil {
- rdr.subject = *rdr.Config().Opts.NATSOpts.NATSSubject
+ if rdr.Config().Opts.NATSOpts.Subject != nil {
+ rdr.subject = *rdr.Config().Opts.NATSOpts.Subject
}
var queueID string
- if rdr.Config().Opts.NATSOpts.NATSQueueID != nil {
- queueID = *rdr.Config().Opts.NATSOpts.NATSQueueID
+ if rdr.Config().Opts.NATSOpts.QueueID != nil {
+ queueID = *rdr.Config().Opts.NATSOpts.QueueID
}
rdr.queueID = utils.FirstNonEmpty(queueID, rdr.cgrCfg.GeneralCfg().NodeID)
var consumerName string
- if rdr.Config().Opts.NATSOpts.NATSConsumerName != nil {
- consumerName = *rdr.Config().Opts.NATSOpts.NATSConsumerName
+ if rdr.Config().Opts.NATSOpts.ConsumerName != nil {
+ consumerName = *rdr.Config().Opts.NATSOpts.ConsumerName
}
rdr.consumerName = utils.FirstNonEmpty(consumerName, utils.CGRateSLwr)
- if rdr.Config().Opts.NATSOpts.NATSJetStream != nil {
- rdr.jetStream = *rdr.Config().Opts.NATSOpts.NATSJetStream
+ if rdr.Config().Opts.NATSOpts.JetStream != nil {
+ rdr.jetStream = *rdr.Config().Opts.NATSOpts.JetStream
}
if rdr.jetStream {
- if rdr.Config().Opts.NATSOpts.NATSJetStreamMaxWait != nil {
- rdr.jsOpts = []nats.JSOpt{nats.MaxWait(*rdr.Config().Opts.NATSOpts.NATSJetStreamMaxWait)}
+ if rdr.Config().Opts.NATSOpts.JetStreamMaxWait != nil {
+ rdr.jsOpts = []nats.JSOpt{nats.MaxWait(*rdr.Config().Opts.NATSOpts.JetStreamMaxWait)}
}
}
- rdr.opts, err = GetNatsOpts(rdr.Config().Opts,
+ rdr.opts, err = GetNatsOpts(rdr.Config().Opts.NATSOpts,
rdr.cgrCfg.GeneralCfg().NodeID,
rdr.cgrCfg.GeneralCfg().ConnectTimeout)
return
}
-func GetNatsOpts(opts *config.EventReaderOpts, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) {
- nop = make([]nats.Option, 0, 7)
- nop = append(nop, nats.Name(utils.CGRateSLwr+nodeID),
+func GetNatsOpts(opts *config.NATSROpts, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) {
+ natsOpts := make([]nats.Option, 0, 7)
+ natsOpts = append(natsOpts, nats.Name(utils.CGRateSLwr+nodeID),
nats.Timeout(connTimeout),
nats.DrainTimeout(time.Second))
- if opts.NATSOpts.NATSJWTFile != nil {
+ if opts.JWTFile != nil {
keys := make([]string, 0, 1)
- if opts.NATSOpts.NATSSeedFile != nil {
- keys = append(keys, *opts.NATSOpts.NATSSeedFile)
+ if opts.SeedFile != nil {
+ keys = append(keys, *opts.SeedFile)
}
- nop = append(nop, nats.UserCredentials(*opts.NATSOpts.NATSJWTFile, keys...))
+ natsOpts = append(natsOpts, nats.UserCredentials(*opts.JWTFile, keys...))
}
- if opts.NATSOpts.NATSSeedFile != nil {
- opt, err := nats.NkeyOptionFromSeed(*opts.NATSOpts.NATSSeedFile)
+ if opts.SeedFile != nil {
+ opt, err := nats.NkeyOptionFromSeed(*opts.SeedFile)
if err != nil {
return nil, err
}
- nop = append(nop, opt)
- }
- if opts.NATSOpts.NATSClientCertificate != nil {
- if opts.NATSOpts.NATSClientKey == nil {
- err = fmt.Errorf("has certificate but no key")
- return
- }
- nop = append(nop, nats.ClientCert(*opts.NATSOpts.NATSClientCertificate, *opts.NATSOpts.NATSClientKey))
- } else if opts.NATSOpts.NATSClientKey != nil {
- err = fmt.Errorf("has key but no certificate")
- return
+ natsOpts = append(natsOpts, opt)
}
- if opts.NATSOpts.NATSCertificateAuthority != nil {
- nop = append(nop,
+ switch {
+ case opts.ClientCertificate != nil && opts.ClientKey != nil:
+ natsOpts = append(natsOpts, nats.ClientCert(*opts.ClientCertificate, *opts.ClientKey))
+ case opts.ClientCertificate != nil:
+ return nil, fmt.Errorf("has certificate but no key")
+ case opts.ClientKey != nil:
+ return nil, fmt.Errorf("has key but no certificate")
+ }
+
+ if opts.CertificateAuthority != nil {
+ natsOpts = append(natsOpts,
func(o *nats.Options) error {
pool, err := x509.SystemCertPool()
if err != nil {
return err
}
- rootPEM, err := os.ReadFile(*opts.NATSOpts.NATSCertificateAuthority)
+ rootPEM, err := os.ReadFile(*opts.CertificateAuthority)
if err != nil || rootPEM == nil {
return fmt.Errorf("nats: error loading or parsing rootCA file: %v", err)
}
ok := pool.AppendCertsFromPEM(rootPEM)
if !ok {
return fmt.Errorf("nats: failed to parse root certificate from %q",
- *opts.NATSOpts.NATSCertificateAuthority)
+ *opts.CertificateAuthority)
}
if o.TLSConfig == nil {
o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
@@ -280,5 +296,5 @@ func GetNatsOpts(opts *config.EventReaderOpts, nodeID string, connTimeout time.D
return nil
})
}
- return
+ return natsOpts, nil
}
diff --git a/ers/nats_it_test.go b/ers/nats_it_test.go
index 629a4a76e..9bb6fc124 100644
--- a/ers/nats_it_test.go
+++ b/ers/nats_it_test.go
@@ -22,6 +22,7 @@ along with this program. If not, see
package ers
import (
+ "encoding/json"
"fmt"
"os"
"os/exec"
@@ -37,6 +38,178 @@ import (
"github.com/nats-io/nats.go"
)
+func TestERsNATSIT(t *testing.T) {
+ t.Skip()
+ cfgContent := `{
+
+"general": {
+ "log_level": 7
+},
+
+"data_db": {
+ "db_type": "*internal"
+},
+
+"stor_db": {
+ "db_type": "*internal"
+},
+
+"ers": {
+ "enabled": true,
+ "sessions_conns":[],
+ "readers": [
+ {
+ "id": "nats_consumer1",
+ "type": "*nats_json_map",
+ "source_path": "nats://127.0.0.1:4222",
+ "processed_path": "nats://127.0.0.1:4222",
+ "opts": {
+ "natsJetStream": true,
+ "natsConsumerName": "cgrates_consumer",
+ "natsSubject": "cgrates_cdrs",
+ "natsQueueID": "queue",
+ "natsJetStreamMaxWait": "5s",
+
+ "natsJetStreamProcessed": true,
+ "natsSubjectProcessed": "cgrates_cdrs_processed",
+ "natsJetStreamMaxWaitProcessed": "5s"
+ },
+ "flags": ["*dryrun"],
+ "fields":[
+ {"tag": "cdr_template", "type": "*template", "value": "cdr_template"}
+ ]
+ },
+ {
+ "id": "nats_consumer2",
+ "type": "*nats_json_map",
+ "source_path": "nats://127.0.0.1:4222",
+ "processed_path": "nats://127.0.0.1:4222",
+ "opts": {
+ "natsJetStream": true,
+ "natsConsumerName": "cgrates_consumer",
+ "natsSubject": "cgrates_cdrs",
+ "natsQueueID": "queue",
+ "natsJetStreamMaxWait": "5s",
+
+ "natsJetStreamProcessed": true,
+ "natsSubjectProcessed": "cgrates_cdrs_processed",
+ "natsJetStreamMaxWaitProcessed": "5s"
+ },
+ "flags": ["*dryrun"],
+ "fields":[
+ {"tag": "cdr_template", "type": "*template", "value": "cdr_template"}
+ ]
+ },
+ {
+ "id": "nats_consumer3",
+ "type": "*nats_json_map",
+ "source_path": "nats://127.0.0.1:4222",
+ "processed_path": "nats://127.0.0.1:4222",
+ "opts": {
+ "natsJetStream": true,
+ "natsConsumerName": "cgrates_consumer",
+ "natsSubject": "cgrates_cdrs",
+ "natsQueueID": "queue",
+ "natsJetStreamMaxWait": "5s",
+
+ "natsJetStreamProcessed": true,
+ "natsSubjectProcessed": "cgrates_cdrs_processed",
+ "natsJetStreamMaxWaitProcessed": "5s"
+ },
+ "flags": ["*dryrun"],
+ "fields":[
+ {"tag": "cdr_template", "type": "*template", "value": "cdr_template"}
+ ]
+ },
+ {
+ "id": "nats_consumer4",
+ "type": "*nats_json_map",
+ "source_path": "nats://127.0.0.1:4222",
+ "processed_path": "",
+ "opts": {
+ "natsJetStream": true,
+ "natsConsumerName": "cgrates_consumer4",
+ "natsSubject": "cgrates_cdrs_processed",
+ "natsQueueID": "",
+ "natsJetStreamMaxWait": "5s",
+ },
+ "flags": ["*dryrun"],
+ "fields":[
+ {"tag": "cdr_template", "type": "*template", "value": "cdr_template"}
+ ]
+ }
+ ]
+},
+
+
+"templates": {
+ "cdr_template": [
+ // {"tag": "Source", "path": "*cgreq.Source", "type": "*constant", "value": "ers_template_combined", "mandatory": true},
+ // {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.2", "mandatory": true},
+ // {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.3", "mandatory": true},
+ // {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.4", "mandatory": true},
+ // {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.6", "mandatory": true},
+ // {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.7", "mandatory": true},
+ {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.Account", "mandatory": true},
+ // {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.9", "mandatory": true},
+ {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.Destination", "mandatory": true},
+ // {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.11", "mandatory": true},
+ // {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.12", "mandatory": true},
+ // {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.13", "mandatory": true}
+ ]
+}
+
+}`
+ cfg, cfgPath, clean, err := initTestCfg(cfgContent)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer clean()
+
+ nc, err := nats.Connect(cfg.ERsCfg().Readers[1].SourcePath)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer nc.Close()
+
+ js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ js.AddStream(&nats.StreamConfig{
+ Name: "CDRs",
+ Subjects: []string{"cgrates_cdrs", "cgrates_cdrs_processed"},
+ })
+
+ time.Sleep(2 * time.Second)
+
+ if _, err := engine.StopStartEngine(cfgPath, 100); err != nil {
+ t.Fatal(err)
+ }
+
+ for i := 0; i < 10; i++ {
+ cdr := map[string]any{
+ "Account": 1000 + i,
+ "Destination": 2000 + i,
+ }
+ b, _ := json.Marshal(cdr)
+ js.PublishAsync("cgrates_cdrs", b)
+ }
+ select {
+ case <-js.PublishAsyncComplete():
+ case <-time.After(5 * time.Second):
+ t.Fatal("Did not resolve in time")
+ }
+
+ // Add verification
+
+ err = engine.KillEngine(100)
+ if err != nil {
+ t.Error(err)
+ }
+}
+
func testCheckNatsData(t *testing.T, randomCGRID, expData string, ch chan *nats.Msg) {
select {
case err := <-rdrErr:
@@ -80,7 +253,7 @@ func testCheckNatsJetStream(t *testing.T, cfg *config.CGRConfig) {
t.Fatal(err)
}
- nop, err := GetNatsOpts(rdr.Config().Opts, "testExp", time.Second)
+ nop, err := GetNatsOpts(rdr.Config().Opts.NATSOpts, "testExp", time.Second)
if err != nil {
t.Fatal(err)
}
@@ -167,7 +340,7 @@ func testCheckNatsNormal(t *testing.T, cfg *config.CGRConfig) {
t.Fatal(err)
}
- nop, err := GetNatsOpts(rdr.Config().Opts, "testExp", time.Second)
+ nop, err := GetNatsOpts(rdr.Config().Opts.NATSOpts, "testExp", time.Second)
if err != nil {
t.Fatal(err)
}
diff --git a/ers/s3.go b/ers/s3.go
index e43a83975..e611a9b58 100644
--- a/ers/s3.go
+++ b/ers/s3.go
@@ -195,7 +195,7 @@ func (rdr *S3ER) readLoop(scv s3Client) (err error) {
}
func (rdr *S3ER) createPoster() {
- processedOpt := getProcessOptions(rdr.Config().Opts)
+ processedOpt := getProcessedOptions(rdr.Config().Opts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
diff --git a/ers/sql.go b/ers/sql.go
index 97c112cb7..79ed8667d 100644
--- a/ers/sql.go
+++ b/ers/sql.go
@@ -301,7 +301,7 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string, opts *config.EventReader
}
// outURL
- processedOpt := getProcessOptions(opts)
+ processedOpt := getProcessedOptions(opts)
if processedOpt == nil {
if len(outURL) == 0 {
return
diff --git a/ers/sqs.go b/ers/sqs.go
index c1c84ce1a..2e778e5e1 100644
--- a/ers/sqs.go
+++ b/ers/sqs.go
@@ -219,7 +219,7 @@ func (rdr *SQSER) readLoop(scv sqsClient) (err error) {
}
func (rdr *SQSER) createPoster() {
- processedOpt := getProcessOptions(rdr.Config().Opts)
+ processedOpt := getProcessedOptions(rdr.Config().Opts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
diff --git a/go.mod b/go.mod
index 82cce8d94..81d7a2643 100644
--- a/go.mod
+++ b/go.mod
@@ -39,13 +39,13 @@ require (
github.com/mediocregopher/radix/v3 v3.8.1
github.com/miekg/dns v1.1.54
github.com/mitchellh/mapstructure v1.4.0
- github.com/nats-io/nats.go v1.11.0
+ github.com/nats-io/nats.go v1.30.0
github.com/nyaruka/phonenumbers v1.0.75
github.com/peterh/liner v1.2.1
github.com/rabbitmq/amqp091-go v1.5.0
github.com/segmentio/kafka-go v0.4.8
go.mongodb.org/mongo-driver v1.11.0
- golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d
+ golang.org/x/crypto v0.6.0
golang.org/x/net v0.10.0
golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5
google.golang.org/api v0.36.0
@@ -89,14 +89,14 @@ require (
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
- github.com/klauspost/compress v1.13.6 // indirect
+ github.com/klauspost/compress v1.17.0 // indirect
github.com/kr/pretty v0.2.1 // indirect
github.com/lib/pq v1.8.0 // indirect
github.com/mattn/go-runewidth v0.0.10 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/nats-io/nats-server/v2 v2.2.6 // indirect
- github.com/nats-io/nkeys v0.3.0 // indirect
+ github.com/nats-io/nkeys v0.4.5 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/philhofer/fwd v1.1.1 // indirect
github.com/pierrec/lz4 v2.6.0+incompatible // indirect
diff --git a/go.sum b/go.sum
index bb9515ebd..c9e5ffa54 100644
--- a/go.sum
+++ b/go.sum
@@ -318,8 +318,9 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
-github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
+github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
+github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/kljensen/snowball v0.6.0/go.mod h1:27N7E8fVU5H68RlUmnWwZCfxgt4POBJfENGMvNRhldw=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@@ -371,11 +372,13 @@ github.com/nats-io/jwt/v2 v2.0.2 h1:ejVCLO8gu6/4bOKIHQpmB5UhhUJfAQw55yvLWpfmKjI=
github.com/nats-io/jwt/v2 v2.0.2/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
github.com/nats-io/nats-server/v2 v2.2.6 h1:FPK9wWx9pagxcw14s8W9rlfzfyHm61uNLnJyybZbn48=
github.com/nats-io/nats-server/v2 v2.2.6/go.mod h1:sEnFaxqe09cDmfMgACxZbziXnhQFhwk+aKkZjBBRYrI=
-github.com/nats-io/nats.go v1.11.0 h1:L263PZkrmkRJRJT2YHU8GwWWvEvmr9/LUKuJTXsF32k=
github.com/nats-io/nats.go v1.11.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
+github.com/nats-io/nats.go v1.30.0 h1:bj/rVsRCrFXxmm9mJiDhb74UKl2HhKpDwKRBtvCjZjc=
+github.com/nats-io/nats.go v1.30.0/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
-github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
+github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
+github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nyaruka/phonenumbers v1.0.75 h1:OCwKXSjTi6IzuI4gVi8zfY+0s60DQUC6ks8Ll4j0eyU=
@@ -513,8 +516,9 @@ golang.org/x/crypto v0.0.0-20200320181102-891825fb96df/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
-golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
+golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc=
+golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=