diff --git a/config/config_defaults.go b/config/config_defaults.go
index cb586bbf7..6c611b344 100644
--- a/config/config_defaults.go
+++ b/config/config_defaults.go
@@ -469,6 +469,7 @@ const CGRATES_CFG_JSON = `
// nats
// "natsJetStream": false, // controls if the nats reader uses the JetStream
// "natsConsumerName": "cgrates", // in case of JetStream the name of the consumer
+ // "natsStreamName": "cdrs", // the name of the NATS JetStream stream from which the consumer will read messages
"natsSubject": "cgrates_cdrs", // the subject from were the events are read
// "natsQueueID": "", // the queue id the consumer listen to
// "natsJWTFile": "", // the path to the JWT file( can be the chained file or the user file)
@@ -485,7 +486,7 @@ const CGRATES_CFG_JSON = `
// "natsCertificateAuthorityProcessed": "", // the path to a custom certificate authority file( used by tls)
// "natsClientCertificateProcessed": "", // the path to a client certificate( used by tls)
// "natsClientKeyProcessed": "", // the path to a client key( used by tls)
- // "natsJetStreamMaxWaitProcessed": "5s ", // the maximum amount of time to wait for a response
+ // "natsJetStreamMaxWaitProcessed": "5s", // the maximum amount of time to wait for a response
},
"tenant": "", // tenant used by import
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
diff --git a/config/erscfg.go b/config/erscfg.go
index eee0e0eea..327ef9c0d 100644
--- a/config/erscfg.go
+++ b/config/erscfg.go
@@ -181,6 +181,7 @@ type EventReaderOpts struct {
S3BucketIDProcessed *string
NATSJetStream *bool
NATSConsumerName *string
+ NATSStreamName *string
NATSSubject *string
NATSQueueID *string
NATSJWTFile *string
@@ -384,6 +385,9 @@ func (erOpts *EventReaderOpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err
if jsnCfg.NATSConsumerName != nil {
erOpts.NATSConsumerName = jsnCfg.NATSConsumerName
}
+ if jsnCfg.NATSStreamName != nil {
+ erOpts.NATSStreamName = jsnCfg.NATSStreamName
+ }
if jsnCfg.NATSSubject != nil {
erOpts.NATSSubject = jsnCfg.NATSSubject
}
@@ -731,6 +735,10 @@ func (erOpts *EventReaderOpts) Clone() *EventReaderOpts {
cln.NATSConsumerName = new(string)
*cln.NATSConsumerName = *erOpts.NATSConsumerName
}
+ if erOpts.NATSStreamName != nil {
+ cln.NATSStreamName = new(string)
+ *cln.NATSStreamName = *erOpts.NATSStreamName
+ }
if erOpts.NATSSubject != nil {
cln.NATSSubject = new(string)
*cln.NATSSubject = *erOpts.NATSSubject
@@ -999,6 +1007,9 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string
if er.Opts.NATSConsumerName != nil {
opts[utils.NatsConsumerName] = *er.Opts.NATSConsumerName
}
+ if er.Opts.NATSStreamName != nil {
+ opts[utils.NatsStreamName] = *er.Opts.NATSStreamName
+ }
if er.Opts.NATSSubject != nil {
opts[utils.NatsSubject] = *er.Opts.NATSSubject
}
@@ -1150,6 +1161,7 @@ type EventReaderOptsJson struct {
S3BucketIDProcessed *string `json:"s3BucketIDProcessed"`
NATSJetStream *bool `json:"natsJetStream"`
NATSConsumerName *string `json:"natsConsumerName"`
+ NATSStreamName *string `json:"natsStreamName"`
NATSSubject *string `json:"natsSubject"`
NATSQueueID *string `json:"natsQueueID"`
NATSJWTFile *string `json:"natsJWTFile"`
@@ -1614,6 +1626,14 @@ func diffEventReaderOptsJsonCfg(d *EventReaderOptsJson, v1, v2 *EventReaderOpts)
} else {
d.NATSConsumerName = nil
}
+ if v2.NATSStreamName != nil {
+ if v1.NATSStreamName == nil ||
+ *v1.NATSStreamName != *v2.NATSStreamName {
+ d.NATSStreamName = v2.NATSStreamName
+ }
+ } else {
+ d.NATSStreamName = nil
+ }
if v2.NATSSubject != nil {
if v1.NATSSubject == nil ||
*v1.NATSSubject != *v2.NATSSubject {
diff --git a/config/erscfg_test.go b/config/erscfg_test.go
index eb3b378c9..514b5c2bb 100644
--- a/config/erscfg_test.go
+++ b/config/erscfg_test.go
@@ -1781,6 +1781,7 @@ func TestERsLoadFromJSONCfg(t *testing.T) {
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"),
NATSJetStream: utils.BoolPointer(false),
NATSConsumerName: utils.StringPointer("consumer_name"),
+ NATSStreamName: utils.StringPointer("stream_name"),
NATSSubject: utils.StringPointer("subject"),
NATSQueueID: utils.StringPointer("queue_id"),
NATSJWTFile: utils.StringPointer("jsw_file"),
@@ -1849,6 +1850,7 @@ func TestERsLoadFromJSONCfg(t *testing.T) {
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"),
NATSJetStream: utils.BoolPointer(false),
NATSConsumerName: utils.StringPointer("consumer_name"),
+ NATSStreamName: utils.StringPointer("stream_name"),
NATSSubject: utils.StringPointer("subject"),
NATSQueueID: utils.StringPointer("queue_id"),
NATSJWTFile: utils.StringPointer("jsw_file"),
@@ -1934,6 +1936,7 @@ func TestERsLoadFromJsonCfgParseError(t *testing.T) {
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"),
NATSJetStream: utils.BoolPointer(false),
NATSConsumerName: utils.StringPointer("consumer_name"),
+ NATSStreamName: utils.StringPointer("stream_name"),
NATSSubject: utils.StringPointer("subject"),
NATSQueueID: utils.StringPointer("queue_id"),
NATSJWTFile: utils.StringPointer("jsw_file"),
@@ -2022,6 +2025,7 @@ func TestERsClone(t *testing.T) {
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"),
NATSJetStream: utils.BoolPointer(false),
NATSConsumerName: utils.StringPointer("consumer_name"),
+ NATSStreamName: utils.StringPointer("stream_name"),
NATSSubject: utils.StringPointer("subject"),
NATSQueueID: utils.StringPointer("queue_id"),
NATSJWTFile: utils.StringPointer("jsw_file"),
@@ -2090,6 +2094,7 @@ func TestERsClone(t *testing.T) {
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"),
NATSJetStream: utils.BoolPointer(false),
NATSConsumerName: utils.StringPointer("consumer_name"),
+ NATSStreamName: utils.StringPointer("stream_name"),
NATSSubject: utils.StringPointer("subject"),
NATSQueueID: utils.StringPointer("queue_id"),
NATSJWTFile: utils.StringPointer("jsw_file"),
@@ -2166,6 +2171,7 @@ func TestERsAsMapInterface(t *testing.T) {
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"),
NATSJetStream: utils.BoolPointer(false),
NATSConsumerName: utils.StringPointer("consumer_name"),
+ NATSStreamName: utils.StringPointer("stream_name"),
NATSSubject: utils.StringPointer("subject"),
NATSQueueID: utils.StringPointer("queue_id"),
NATSJWTFile: utils.StringPointer("jsw_file"),
@@ -2314,6 +2320,7 @@ func TestDiffEventReaderOptsJsonCfg(t *testing.T) {
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed_diff"),
NATSJetStream: utils.BoolPointer(true),
NATSConsumerName: utils.StringPointer("consumer_name_diff"),
+ NATSStreamName: utils.StringPointer("stream_name_diff"),
NATSSubject: utils.StringPointer("subject_diff"),
NATSQueueID: utils.StringPointer("queue_id_diff"),
NATSJWTFile: utils.StringPointer("jsw_file_diff"),
@@ -2382,6 +2389,7 @@ func TestDiffEventReaderOptsJsonCfg(t *testing.T) {
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"),
NATSJetStream: utils.BoolPointer(false),
NATSConsumerName: utils.StringPointer("consumer_name"),
+ NATSStreamName: utils.StringPointer("stream_name"),
NATSSubject: utils.StringPointer("subject"),
NATSQueueID: utils.StringPointer("queue_id"),
NATSJWTFile: utils.StringPointer("jsw_file"),
@@ -2450,6 +2458,7 @@ func TestDiffEventReaderOptsJsonCfg(t *testing.T) {
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"),
NATSJetStream: utils.BoolPointer(false),
NATSConsumerName: utils.StringPointer("consumer_name"),
+ NATSStreamName: utils.StringPointer("stream_name"),
NATSSubject: utils.StringPointer("subject"),
NATSQueueID: utils.StringPointer("queue_id"),
NATSJWTFile: utils.StringPointer("jsw_file"),
diff --git a/data/conf/samples/ers_nats/cgrates.json b/data/conf/samples/ers_nats/cgrates.json
new file mode 100644
index 000000000..4adffed95
--- /dev/null
+++ b/data/conf/samples/ers_nats/cgrates.json
@@ -0,0 +1,75 @@
+{
+
+"general": {
+ "log_level": 7
+},
+
+"data_db": {
+ "db_type": "*internal"
+},
+
+"stor_db": {
+ "db_type": "*internal"
+},
+
+"ers": {
+ "enabled": true,
+ "sessions_conns":[],
+ "readers": [
+ {
+ "id": "nats_reader1",
+ "type": "*natsJSONMap",
+ "source_path": "nats://127.0.0.1:4222",
+ "processed_path": "nats://127.0.0.1:4222",
+ "opts": {
+ "natsJetStream": true,
+ "natsConsumerName": "cgrates",
+ "natsStreamName": "stream",
+ "natsSubject": "cgrates_cdrs",
+ "natsQueueID": "queue",
+ "natsJetStreamMaxWait": "5s",
+
+ "natsJetStreamProcessed": true,
+ "natsSubjectProcessed": "cgrates_cdrs_processed",
+ "natsJetStreamMaxWaitProcessed": "5s"
+ },
+ "flags": ["*dryrun"],
+ "fields":[
+ {"tag": "cdr_template", "type": "*template", "value": "cdr_template"}
+ ]
+ },
+ {
+ "id": "nats_reader2",
+ "type": "*natsJSONMap",
+ "source_path": "nats://127.0.0.1:4222",
+ "processed_path": "nats://127.0.0.1:4222",
+ "opts": {
+ "natsJetStream": true,
+ "natsConsumerName": "cgrates",
+ "natsStreamName": "stream",
+ "natsSubject": "cgrates_cdrs",
+ "natsQueueID": "queue",
+ "natsJetStreamMaxWait": "5s",
+
+ "natsJetStreamProcessed": true,
+ "natsSubjectProcessed": "cgrates_cdrs_processed",
+ "natsJetStreamMaxWaitProcessed": "5s"
+ },
+ "flags": ["*dryrun"],
+ "fields":[
+ {"tag": "cdr_template", "type": "*template", "value": "cdr_template"}
+ ]
+ }
+ ]
+},
+
+
+"templates": {
+ "cdr_template": [
+ {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.Account", "mandatory": true},
+ {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.Subject", "mandatory": true},
+ {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.Destination", "mandatory": true}
+ ]
+}
+
+}
\ No newline at end of file
diff --git a/ees/nats.go b/ees/nats.go
index 5c849703d..f38cc69ea 100644
--- a/ees/nats.go
+++ b/ees/nats.go
@@ -22,7 +22,7 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
- "io/ioutil"
+ "os"
"sync"
"time"
@@ -30,6 +30,7 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/nats-io/nats.go"
+ "github.com/nats-io/nats.go/jetstream"
)
// NewNatsEE creates a kafka poster
@@ -40,7 +41,7 @@ func NewNatsEE(cfg *config.EventExporterCfg, nodeID string, connTimeout time.Dur
subject: utils.DefaultQueueID,
reqs: newConcReq(cfg.ConcurrentRequests),
}
- err = natsPstr.parseOpt(cfg.Opts, nodeID, connTimeout)
+ err = natsPstr.parseOpts(cfg.Opts, nodeID, connTimeout)
return
}
@@ -49,10 +50,9 @@ type NatsEE struct {
subject string // identifier of the CDR queue where we publish
jetStream bool
opts []nats.Option
- jsOpts []nats.JSOpt
poster *nats.Conn
- posterJS nats.JetStreamContext
+ posterJS jetstream.JetStream
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
@@ -61,74 +61,79 @@ type NatsEE struct {
bytePreparing
}
-func (pstr *NatsEE) parseOpt(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) (err error) {
+func (pstr *NatsEE) parseOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) error {
if opts.NATSJetStream != nil {
pstr.jetStream = *opts.NATSJetStream
}
- pstr.subject = utils.DefaultQueueID
if opts.NATSSubject != nil {
pstr.subject = *opts.NATSSubject
}
+ var err error
pstr.opts, err = GetNatsOpts(opts, nodeID, connTimeout)
- if pstr.jetStream {
- if opts.NATSJetStreamMaxWait != nil {
- pstr.jsOpts = []nats.JSOpt{nats.MaxWait(*opts.NATSJetStreamMaxWait)}
- }
- }
- return
+ return err
}
func (pstr *NatsEE) Cfg() *config.EventExporterCfg { return pstr.cfg }
-func (pstr *NatsEE) Connect() (err error) {
+func (pstr *NatsEE) Connect() error {
pstr.Lock()
defer pstr.Unlock()
- if pstr.poster == nil {
- if pstr.poster, err = nats.Connect(pstr.Cfg().ExportPath, pstr.opts...); err != nil {
- return
- }
- if pstr.jetStream {
- pstr.posterJS, err = pstr.poster.JetStream(pstr.jsOpts...)
- }
+ if pstr.poster != nil {
+ return nil
}
- return
-}
-func (pstr *NatsEE) ExportEvent(ctx *context.Context, content, _ any) (err error) {
- pstr.reqs.get()
- pstr.RLock()
- if pstr.poster == nil {
- pstr.RUnlock()
- pstr.reqs.done()
- return utils.ErrDisconnected
+ var err error
+ pstr.poster, err = nats.Connect(pstr.Cfg().ExportPath, pstr.opts...)
+ if err != nil {
+ return err
}
if pstr.jetStream {
- _, err = pstr.posterJS.Publish(pstr.subject, content.([]byte), nats.Context(ctx))
+ pstr.posterJS, err = jetstream.New(pstr.poster)
+ }
+ return err
+}
+
+func (pstr *NatsEE) ExportEvent(ctx *context.Context, content, _ any) error {
+ pstr.reqs.get()
+ defer pstr.reqs.done()
+ pstr.RLock()
+ defer pstr.RUnlock()
+ if pstr.poster == nil {
+ return utils.ErrDisconnected
+ }
+ var err error
+ if pstr.jetStream {
+ ctx := context.TODO()
+ if pstr.cfg.Opts.NATSJetStreamMaxWait != nil {
+ ctx, _ = context.WithTimeout(ctx, *pstr.cfg.Opts.NATSJetStreamMaxWait)
+ }
+ _, err = pstr.posterJS.Publish(ctx, pstr.subject, content.([]byte))
} else {
err = pstr.poster.Publish(pstr.subject, content.([]byte))
}
- pstr.RUnlock()
- pstr.reqs.done()
- return
+ return err
}
-func (pstr *NatsEE) Close() (err error) {
+func (pstr *NatsEE) Close() error {
pstr.Lock()
- if pstr.poster != nil {
- err = pstr.poster.Drain()
- pstr.poster = nil
+ defer pstr.Unlock()
+
+ if pstr.poster == nil {
+ return nil
}
- pstr.Unlock()
- return
+
+ err := pstr.poster.Drain()
+ pstr.poster = nil
+ return err
}
func (pstr *NatsEE) GetMetrics() *utils.SafeMapStorage { return pstr.dc }
func (pstr *NatsEE) ExtraData(ev *utils.CGREvent) any { return nil }
-func GetNatsOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) {
- nop = make([]nats.Option, 0, 7)
- nop = append(nop, nats.Name(utils.CGRateSLwr+nodeID),
+func GetNatsOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) ([]nats.Option, error) {
+ natsOpts := make([]nats.Option, 0, 7)
+ natsOpts = append(natsOpts, nats.Name(utils.CGRateSLwr+nodeID),
nats.Timeout(connTimeout),
nats.DrainTimeout(time.Second))
if opts.NATSJWTFile != nil {
@@ -136,33 +141,33 @@ func GetNatsOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time
if opts.NATSSeedFile != nil {
keys = append(keys, *opts.NATSSeedFile)
}
- nop = append(nop, nats.UserCredentials(*opts.NATSJWTFile, keys...))
+ natsOpts = append(natsOpts, nats.UserCredentials(*opts.NATSJWTFile, keys...))
}
if opts.NATSSeedFile != nil {
opt, err := nats.NkeyOptionFromSeed(*opts.NATSSeedFile)
if err != nil {
return nil, err
}
- nop = append(nop, opt)
+ natsOpts = append(natsOpts, opt)
}
- if opts.NATSClientCertificate != nil {
- if opts.NATSClientKey == nil {
- err = fmt.Errorf("has certificate but no key")
- return
- }
- nop = append(nop, nats.ClientCert(*opts.NATSClientCertificate, *opts.NATSClientKey))
- } else if opts.NATSClientKey != nil {
- err = fmt.Errorf("has key but no certificate")
- return
+
+ switch {
+ case opts.NATSClientCertificate != nil && opts.NATSClientKey != nil:
+ natsOpts = append(natsOpts, nats.ClientCert(*opts.NATSClientCertificate, *opts.NATSClientKey))
+ case opts.NATSClientCertificate != nil:
+ return nil, fmt.Errorf("has certificate but no key")
+ case opts.NATSClientKey != nil:
+ return nil, fmt.Errorf("has key but no certificate")
}
+
if opts.NATSCertificateAuthority != nil {
- nop = append(nop,
+ natsOpts = append(natsOpts,
func(o *nats.Options) error {
pool, err := x509.SystemCertPool()
if err != nil {
return err
}
- rootPEM, err := ioutil.ReadFile(*opts.NATSCertificateAuthority)
+ rootPEM, err := os.ReadFile(*opts.NATSCertificateAuthority)
if err != nil || rootPEM == nil {
return fmt.Errorf("nats: error loading or parsing rootCA file: %v", err)
}
@@ -179,5 +184,5 @@ func GetNatsOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time
return nil
})
}
- return
+ return natsOpts, nil
}
diff --git a/ees/nats_it_test.go b/ees/nats_it_test.go
index b168e0be9..8b685c87c 100644
--- a/ees/nats_it_test.go
+++ b/ees/nats_it_test.go
@@ -22,7 +22,7 @@ along with this program. If not, see
package ees
import (
- "os/exec"
+ "os"
"path"
"testing"
"time"
@@ -31,18 +31,25 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
+ "github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
+ "github.com/nats-io/nats.go/jetstream"
)
-func TestNatsEE(t *testing.T) {
- testCreateDirectory(t)
- var err error
- cmd := exec.Command("nats-server", "-js") // Start the nats-server.
- if err := cmd.Start(); err != nil {
- t.Fatal(err) // Only if nats-server is not installed.
+func TestNatsEEJetStream(t *testing.T) {
+
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ JetStream: true,
+ })
+ if err != nil {
+ t.Fatal(err)
}
- time.Sleep(50 * time.Millisecond)
- defer cmd.Process.Kill()
+ natsServer.Start()
+ defer natsServer.Shutdown()
+
+ testCreateDirectory(t)
cgrCfg, err := config.NewCGRConfigFromPath(context.Background(), path.Join(*dataDir, "conf", "samples", "ees"))
if err != nil {
t.Fatal(err)
@@ -63,38 +70,40 @@ func TestNatsEE(t *testing.T) {
t.Fatal(err)
}
- nc, err := nats.Connect("nats://localhost:4222", nop...)
+ nc, err := nats.Connect(nats.DefaultURL, nop...)
if err != nil {
t.Fatal(err)
}
- js, err := nc.JetStream()
+ defer nc.Drain()
+
+ js, err := jetstream.New(nc)
if err != nil {
t.Fatal(err)
}
- for name := range js.StreamNames() {
- if name == "test2" {
- if err = js.DeleteStream("test2"); err != nil {
- t.Fatal(err)
- }
- break
- }
- }
- if _, err = js.AddStream(&nats.StreamConfig{
+ _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{
Name: "test2",
Subjects: []string{"processed_cdrs"},
- }); err != nil {
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer js.DeleteStream(context.Background(), "test2")
+
+ ch := make(chan string, 3)
+ var cons jetstream.Consumer
+ cons, err = js.CreateOrUpdateConsumer(context.Background(), "test2", jetstream.ConsumerConfig{
+ Durable: "test4",
+ FilterSubject: "processed_cdrs",
+ AckPolicy: jetstream.AckAllPolicy,
+ })
+ if err != nil {
t.Fatal(err)
}
- if err = js.PurgeStream("test2"); err != nil {
- t.Fatal(err)
- }
-
- ch := make(chan *nats.Msg, 3)
- _, err = js.QueueSubscribe("processed_cdrs", "test3", func(msg *nats.Msg) {
- ch <- msg
- }, nats.Durable("test4"))
+ _, err = cons.Consume(func(msg jetstream.Msg) {
+ ch <- string(msg.Data())
+ })
if err != nil {
t.Fatal(err)
}
@@ -114,24 +123,26 @@ func TestNatsEE(t *testing.T) {
// fmt.Println((<-ch).Data)
select {
case data := <-ch:
- if expected != string(data.Data) {
- t.Fatalf("Expected %v \n but received \n %v", expected, string(data.Data))
+ if expected != data {
+ t.Fatalf("Expected %v \n but received \n %v", expected, data)
}
case <-time.After(50 * time.Millisecond):
t.Fatal("Time limit exceeded")
}
}
-func TestNatsEE2(t *testing.T) {
+func TestNatsEE(t *testing.T) {
testCreateDirectory(t)
- exec.Command("pkill", "nats-server")
- cmd := exec.Command("nats-server")
- if err := cmd.Start(); err != nil {
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ })
+ if err != nil {
t.Fatal(err)
}
- time.Sleep(50 * time.Millisecond)
- defer cmd.Process.Kill()
+ natsServer.Start()
+ defer natsServer.Shutdown()
cgrCfg, err := config.NewCGRConfigFromPath(context.Background(), path.Join(*dataDir, "conf", "samples", "ees"))
if err != nil {
@@ -187,3 +198,31 @@ func TestNatsEE2(t *testing.T) {
t.Fatal("Time limit exceeded")
}
}
+
+func TestGetNatsOptsSeedFile(t *testing.T) {
+ if _, err := os.Create("/tmp/nkey.txt"); err != nil {
+ t.Error(err)
+ }
+ defer os.Remove("/tmp/nkey.txt")
+ nkey := "SUACSSL3UAHUDXKFSNVUZRF5UHPMWZ6BFDTJ7M6USDXIEDNPPQYYYCU3VY"
+ os.WriteFile("/tmp/nkey.txt", []byte(nkey), 0777)
+
+ opts := &config.EventExporterOpts{
+ NATSSeedFile: utils.StringPointer("/tmp/nkey.txt"),
+ }
+
+ nodeID := "node_id1"
+ connTimeout := 2 * time.Second
+
+ _, err := GetNatsOpts(opts, nodeID, connTimeout)
+ if err != nil {
+ t.Error(err)
+ }
+
+ //test error
+ os.WriteFile("/tmp/nkey.txt", []byte(""), 0777)
+ _, err = GetNatsOpts(opts, nodeID, connTimeout)
+ if err == nil || err.Error() != "nkeys: no nkey seed found" {
+ t.Errorf("expected \"%s\" but received \"%s\"", "nkeys: no nkey seed found", err.Error())
+ }
+}
diff --git a/ers/amqp.go b/ers/amqp.go
index fcfa13a8f..c14c47014 100644
--- a/ers/amqp.go
+++ b/ers/amqp.go
@@ -254,7 +254,7 @@ func (rdr *AMQPER) close() (err error) {
}
func (rdr *AMQPER) createPoster() {
- processedOpt := getProcessOptions(rdr.Config().Opts)
+ processedOpt := getProcessedOptions(rdr.Config().Opts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
diff --git a/ers/amqpv1.go b/ers/amqpv1.go
index 7ef3f0d12..abd2e131f 100644
--- a/ers/amqpv1.go
+++ b/ers/amqpv1.go
@@ -211,7 +211,7 @@ func (rdr *AMQPv1ER) close() (err error) {
}
func (rdr *AMQPv1ER) createPoster() {
- processedOpt := getProcessOptions(rdr.Config().Opts)
+ processedOpt := getProcessedOptions(rdr.Config().Opts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
diff --git a/ers/kafka.go b/ers/kafka.go
index e516cedde..e91698214 100644
--- a/ers/kafka.go
+++ b/ers/kafka.go
@@ -250,7 +250,7 @@ func (rdr *KafkaER) setOpts(opts *config.EventReaderOpts) (err error) {
}
func (rdr *KafkaER) createPoster() {
- processedOpt := getProcessOptions(rdr.Config().Opts)
+ processedOpt := getProcessedOptions(rdr.Config().Opts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
diff --git a/ers/libers.go b/ers/libers.go
index 54dc61164..7dff1177f 100644
--- a/ers/libers.go
+++ b/ers/libers.go
@@ -28,8 +28,9 @@ import (
"github.com/cgrates/cgrates/utils"
)
-// getProcessOptions assigns all non-nil fields ending in "Processed" from EventReaderOpts to their counterparts in EventExporterOpts
-func getProcessOptions(erOpts *config.EventReaderOpts) (eeOpts *config.EventExporterOpts) {
+// getProcessedOptions assigns all non-nil fields ending in "Processed" from EventReaderOpts to their counterparts in EventExporterOpts
+func getProcessedOptions(erOpts *config.EventReaderOpts) *config.EventExporterOpts {
+ var eeOpts *config.EventExporterOpts
if erOpts.AMQPExchangeProcessed != nil {
if eeOpts == nil {
eeOpts = new(config.EventExporterOpts)
@@ -198,7 +199,7 @@ func getProcessOptions(erOpts *config.EventReaderOpts) (eeOpts *config.EventExpo
}
eeOpts.PgSSLMode = erOpts.PgSSLModeProcessed
}
- return
+ return eeOpts
}
// mergePartialEvents will unite the events using the reader configuration
diff --git a/ers/libers_test.go b/ers/libers_test.go
index 3af897db0..0f10ba12a 100644
--- a/ers/libers_test.go
+++ b/ers/libers_test.go
@@ -31,7 +31,7 @@ func TestGetProcessOptions(t *testing.T) {
opts := &config.EventReaderOpts{
AWSKeyProcessed: utils.StringPointer("testKey"),
}
- result := getProcessOptions(opts)
+ result := getProcessedOptions(opts)
expected := &config.EventExporterOpts{
AWSKey: utils.StringPointer("testKey"),
}
diff --git a/ers/nats.go b/ers/nats.go
index e14f9c8d5..b130af932 100644
--- a/ers/nats.go
+++ b/ers/nats.go
@@ -23,7 +23,7 @@ import (
"crypto/x509"
"encoding/json"
"fmt"
- "io/ioutil"
+ "os"
"time"
"github.com/cgrates/birpc/context"
@@ -33,12 +33,13 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/nats-io/nats.go"
+ "github.com/nats-io/nats.go/jetstream"
)
// NewNatsER return a new amqp event reader
func NewNatsER(cfg *config.CGRConfig, cfgIdx int,
rdrEvents, partialEvents chan *erEvent, rdrErr chan error,
- fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (_ EventReader, err error) {
+ fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (EventReader, error) {
rdr := &NatsER{
connMgr: connMgr,
cgrCfg: cfg,
@@ -55,11 +56,14 @@ func NewNatsER(cfg *config.CGRConfig, cfgIdx int,
rdr.cap <- struct{}{}
}
}
- if err = rdr.processOpts(); err != nil {
- return
+ var err error
+ err = rdr.processOpts()
+ if err != nil {
+ return nil, err
}
- if err = rdr.createPoster(); err != nil {
- return
+ err = rdr.createPoster()
+ if err != nil {
+ return nil, err
}
return rdr, nil
}
@@ -82,8 +86,8 @@ type NatsER struct {
queueID string
jetStream bool
consumerName string
+ streamName string
opts []nats.Option
- jsOpts []nats.JSOpt
poster *ees.NatsEE
}
@@ -93,69 +97,106 @@ func (rdr *NatsER) Config() *config.EventReaderCfg {
return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx]
}
-// Serve will start the gorutines needed to watch the nats subject
-func (rdr *NatsER) Serve() (err error) {
- // Connect to a server
- var nc *nats.Conn
- var js nats.JetStreamContext
+// Serve will subscribe to a NATS subject and process incoming messages until the rdrExit channel
+// will be closed.
+func (rdr *NatsER) Serve() error {
- if nc, err = nats.Connect(rdr.Config().SourcePath, rdr.opts...); err != nil {
- return
+ // Establish a connection to the nats server.
+ nc, err := nats.Connect(rdr.Config().SourcePath, rdr.opts...)
+ if err != nil {
+ return err
}
- ch := make(chan *nats.Msg)
+
+ // Define the message handler. Its content will get executed for every received message.
+ handleMessage := func(msgData []byte) {
+
+ // If the rdr.cap channel buffer is empty, block until a resource is available. Otherwise
+ // allocate one resource and start processing the message.
+ if rdr.Config().ConcurrentReqs != -1 {
+ <-rdr.cap
+ }
+ go func() {
+ handlerErr := rdr.processMessage(msgData)
+ if handlerErr != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> processing message %s error: %s",
+ utils.ERs, string(msgData), handlerErr.Error()))
+ }
+
+ // Export the received message if a poster has been defined.
+ if rdr.poster != nil {
+ handlerErr = ees.ExportWithAttempts(context.TODO(), rdr.poster, msgData,
+ utils.EmptyString, rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant)
+ if handlerErr != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> writing message %s error: %s",
+ utils.ERs, string(msgData), handlerErr.Error()))
+ }
+ }
+
+ // Release the resource back to rdr.cap channel.
+ if rdr.Config().ConcurrentReqs != -1 {
+ rdr.cap <- struct{}{}
+ }
+
+ }()
+ }
+
+ // Subscribe to the appropriate NATS subject.
if !rdr.jetStream {
- if _, err = nc.ChanQueueSubscribe(rdr.subject, rdr.queueID, ch); err != nil {
- return
+ _, err = nc.QueueSubscribe(rdr.subject, rdr.queueID, func(msg *nats.Msg) {
+ handleMessage(msg.Data)
+ })
+ if err != nil {
+ nc.Drain()
+ return err
}
} else {
- js, err = nc.JetStream(rdr.jsOpts...)
+ var js jetstream.JetStream
+ js, err = jetstream.New(nc)
if err != nil {
- return
+ nc.Drain()
+ return err
}
- if _, err = js.QueueSubscribe(rdr.subject, rdr.queueID, func(msg *nats.Msg) {
- ch <- msg
- }, nats.Durable(rdr.consumerName)); err != nil {
- return
+ ctx := context.TODO()
+ if jsMaxWait := rdr.Config().Opts.NATSJetStreamMaxWait; jsMaxWait != nil {
+ ctx, _ = context.WithTimeout(ctx, *jsMaxWait)
+ }
+
+ var cons jetstream.Consumer
+ cons, err = js.CreateOrUpdateConsumer(ctx, rdr.streamName, jetstream.ConsumerConfig{
+ FilterSubject: rdr.subject,
+ Durable: rdr.consumerName,
+ AckPolicy: jetstream.AckAllPolicy,
+ })
+ if err != nil {
+ nc.Drain()
+ return err
+ }
+
+ _, err = cons.Consume(func(msg jetstream.Msg) {
+ handleMessage(msg.Data())
+ })
+ if err != nil {
+ nc.Drain()
+ return err
}
}
+
go func() {
- for {
- if rdr.Config().ConcurrentReqs != -1 {
- <-rdr.cap // do not try to read if the limit is reached
- }
- select {
- case <-rdr.rdrExit:
- utils.Logger.Info(
- fmt.Sprintf("<%s> stop monitoring nats path <%s>",
- utils.ERs, rdr.Config().SourcePath))
- nc.Drain()
- if rdr.poster != nil {
- rdr.poster.Close()
- }
- return
- case msg := <-ch:
- go func(msg *nats.Msg) {
- if err := rdr.processMessage(msg.Data); err != nil {
- utils.Logger.Warning(
- fmt.Sprintf("<%s> processing message %s error: %s",
- utils.ERs, string(msg.Data), err.Error()))
- }
- if rdr.poster != nil { // post it
- if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Data, utils.EmptyString,
- rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil {
- utils.Logger.Warning(
- fmt.Sprintf("<%s> writing message %s error: %s",
- utils.ERs, string(msg.Data), err.Error()))
- }
- }
- if rdr.Config().ConcurrentReqs != -1 {
- rdr.cap <- struct{}{}
- }
- }(msg)
- }
+
+ // Wait for exit signal.
+ <-rdr.rdrExit
+ utils.Logger.Info(
+ fmt.Sprintf("<%s> stop monitoring nats path <%s>",
+ utils.ERs, rdr.Config().SourcePath))
+ nc.Drain()
+ if rdr.poster != nil {
+ rdr.poster.Close()
}
}()
- return
+
+ return nil
}
func (rdr *NatsER) processMessage(msg []byte) (err error) {
@@ -191,7 +232,7 @@ func (rdr *NatsER) processMessage(msg []byte) (err error) {
}
func (rdr *NatsER) createPoster() (err error) {
- processedOpt := getProcessOptions(rdr.Config().Opts)
+ processedOpt := getProcessedOptions(rdr.Config().Opts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
@@ -202,37 +243,34 @@ func (rdr *NatsER) createPoster() (err error) {
return
}
-func (rdr *NatsER) processOpts() (err error) {
+func (rdr *NatsER) processOpts() error {
if rdr.Config().Opts.NATSSubject != nil {
rdr.subject = *rdr.Config().Opts.NATSSubject
}
- var queueID string
+ rdr.queueID = rdr.cgrCfg.GeneralCfg().NodeID
if rdr.Config().Opts.NATSQueueID != nil {
- queueID = *rdr.Config().Opts.NATSQueueID
+ rdr.queueID = *rdr.Config().Opts.NATSQueueID
}
- rdr.queueID = utils.FirstNonEmpty(queueID, rdr.cgrCfg.GeneralCfg().NodeID)
- var consumerName string
+ rdr.consumerName = utils.CGRateSLwr
if rdr.Config().Opts.NATSConsumerName != nil {
- consumerName = *rdr.Config().Opts.NATSConsumerName
+ rdr.consumerName = *rdr.Config().Opts.NATSConsumerName
+ }
+ if rdr.Config().Opts.NATSStreamName != nil {
+ rdr.streamName = *rdr.Config().Opts.NATSStreamName
}
- rdr.consumerName = utils.FirstNonEmpty(consumerName, utils.CGRateSLwr)
if rdr.Config().Opts.NATSJetStream != nil {
rdr.jetStream = *rdr.Config().Opts.NATSJetStream
}
- if rdr.jetStream {
- if rdr.Config().Opts.NATSJetStreamMaxWait != nil {
- rdr.jsOpts = []nats.JSOpt{nats.MaxWait(*rdr.Config().Opts.NATSJetStreamMaxWait)}
- }
- }
+ var err error
rdr.opts, err = GetNatsOpts(rdr.Config().Opts,
rdr.cgrCfg.GeneralCfg().NodeID,
rdr.cgrCfg.GeneralCfg().ConnectTimeout)
- return
+ return err
}
func GetNatsOpts(opts *config.EventReaderOpts, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) {
- nop = make([]nats.Option, 0, 7)
- nop = append(nop, nats.Name(utils.CGRateSLwr+nodeID),
+ natsOpts := make([]nats.Option, 0, 7)
+ natsOpts = append(natsOpts, nats.Name(utils.CGRateSLwr+nodeID),
nats.Timeout(connTimeout),
nats.DrainTimeout(time.Second))
if opts.NATSJWTFile != nil {
@@ -240,33 +278,33 @@ func GetNatsOpts(opts *config.EventReaderOpts, nodeID string, connTimeout time.D
if opts.NATSSeedFile != nil {
keys = append(keys, *opts.NATSSeedFile)
}
- nop = append(nop, nats.UserCredentials(*opts.NATSJWTFile, keys...))
+ natsOpts = append(natsOpts, nats.UserCredentials(*opts.NATSJWTFile, keys...))
}
if opts.NATSSeedFile != nil {
opt, err := nats.NkeyOptionFromSeed(*opts.NATSSeedFile)
if err != nil {
return nil, err
}
- nop = append(nop, opt)
+ natsOpts = append(natsOpts, opt)
}
- if opts.NATSClientCertificate != nil {
- if opts.NATSClientKey == nil {
- err = fmt.Errorf("has certificate but no key")
- return
- }
- nop = append(nop, nats.ClientCert(*opts.NATSClientCertificate, *opts.NATSClientKey))
- } else if opts.NATSClientKey != nil {
- err = fmt.Errorf("has key but no certificate")
- return
+
+ switch {
+ case opts.NATSClientCertificate != nil && opts.NATSClientKey != nil:
+ natsOpts = append(natsOpts, nats.ClientCert(*opts.NATSClientCertificate, *opts.NATSClientKey))
+ case opts.NATSClientCertificate != nil:
+ return nil, fmt.Errorf("has certificate but no key")
+ case opts.NATSClientKey != nil:
+ return nil, fmt.Errorf("has key but no certificate")
}
+
if opts.NATSCertificateAuthority != nil {
- nop = append(nop,
+ natsOpts = append(natsOpts,
func(o *nats.Options) error {
pool, err := x509.SystemCertPool()
if err != nil {
return err
}
- rootPEM, err := ioutil.ReadFile(*opts.NATSCertificateAuthority)
+ rootPEM, err := os.ReadFile(*opts.NATSCertificateAuthority)
if err != nil || rootPEM == nil {
return fmt.Errorf("nats: error loading or parsing rootCA file: %v", err)
}
@@ -283,5 +321,5 @@ func GetNatsOpts(opts *config.EventReaderOpts, nodeID string, connTimeout time.D
return nil
})
}
- return
+ return natsOpts, nil
}
diff --git a/ers/nats_it_test.go b/ers/nats_it_test.go
index 5a37fa05d..d0333e0f6 100644
--- a/ers/nats_it_test.go
+++ b/ers/nats_it_test.go
@@ -22,22 +22,109 @@ along with this program. If not, see
package ers
import (
+ "encoding/json"
"fmt"
"os"
- "os/exec"
"path"
"reflect"
"runtime"
"testing"
"time"
+ "github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
+ "github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
+ "github.com/nats-io/nats.go/jetstream"
)
-func testCheckNatsData(t *testing.T, randomOriginID, expData string, ch chan *nats.Msg) {
+func TestNatsERIT(t *testing.T) {
+ cfgPath := path.Join(*dataDir, "conf", "samples", "ers_nats")
+ cfg, err := config.NewCGRConfigFromPath(context.Background(), cfgPath)
+ if err != nil {
+ t.Fatal("could not init cfg", err.Error())
+ }
+
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ JetStream: true,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ natsServer.Start()
+ defer natsServer.Shutdown()
+
+ // Establish a connection to nats.
+ nc, err := nats.Connect(cfg.ERsCfg().Readers[1].SourcePath)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer nc.Close()
+
+ // Initialize a stream manager and create a stream.
+ js, err := jetstream.New(nc)
+ if err != nil {
+ t.Fatal(err)
+ }
+ js.CreateStream(context.Background(), jetstream.StreamConfig{
+ Name: "stream",
+ Subjects: []string{"cgrates_cdrs", "cgrates_cdrs_processed"},
+ })
+
+ // Start the engine.
+ if _, err := engine.StopStartEngine(cfgPath, 100); err != nil {
+ t.Fatal(err)
+ }
+ defer engine.KillEngine(100)
+
+ // Publish CDRs asynchronously to the nats subject.
+ cdr := make(map[string]any)
+ for i := 0; i < 10; i++ {
+ cdr[utils.AccountField] = 1001 + i
+ cdr[utils.Subject] = 1001 + i
+ cdr[utils.Destination] = 2001 + i
+ b, _ := json.Marshal(cdr)
+ js.PublishAsync("cgrates_cdrs", b)
+ }
+ select {
+ case <-js.PublishAsyncComplete():
+ case <-time.After(5 * time.Second):
+ t.Fatal("Did not resolve in time")
+ }
+
+ // Define a consumer for the subject where all the processed cdrs were published.
+ var cons jetstream.Consumer
+ cons, err = js.CreateOrUpdateConsumer(context.Background(), "stream", jetstream.ConsumerConfig{
+ FilterSubject: "cgrates_cdrs_processed",
+ Durable: "cgrates_processed",
+ AckPolicy: jetstream.AckAllPolicy,
+ })
+ if err != nil {
+ t.Error(err)
+ }
+
+ // Wait for the messages to be consumed and processed.
+ time.Sleep(100 * time.Millisecond)
+
+ // Retrieve info about the consumer.
+ info, err := cons.Info(context.Background())
+ if err != nil {
+ t.Error(err)
+ }
+
+ if info.NumPending != 10 {
+ t.Errorf("expected %d pending messages, received %d", 10, info.NumPending)
+ }
+
+ js.DeleteStream(context.Background(), "stream")
+
+}
+
+func testCheckNatsData(t *testing.T, randomOriginID, expData string, ch chan string) {
select {
case err := <-rdrErr:
t.Fatal(err)
@@ -58,8 +145,8 @@ func testCheckNatsData(t *testing.T, randomOriginID, expData string, ch chan *na
}
select {
case msg := <-ch:
- if expData != string(msg.Data) {
- t.Errorf("Expected %q ,received %q", expData, string(msg.Data))
+ if expData != msg {
+ t.Errorf("Expected %q ,received %q", expData, msg)
}
case <-time.After(10 * time.Second):
t.Fatal("Timeout2")
@@ -89,49 +176,44 @@ func testCheckNatsJetStream(t *testing.T, cfg *config.CGRConfig) {
}
defer nc.Drain()
- js, err := nc.JetStream()
+ js, err := jetstream.New(nc)
if err != nil {
t.Fatal(err)
}
- for name := range js.StreamNames() {
- if name == "test" {
- if err = js.DeleteStream("test"); err != nil {
- t.Fatal(err)
- }
- break
- }
- if name == "test2" {
- if err = js.DeleteStream("test2"); err != nil {
- t.Fatal(err)
- }
- break
- }
- }
- if _, err = js.AddStream(&nats.StreamConfig{
+
+ _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{
Name: "test",
Subjects: []string{utils.DefaultQueueID},
- }); err != nil {
+ })
+ if err != nil {
t.Fatal(err)
}
+ defer js.DeleteStream(context.Background(), "test")
- if err = js.PurgeStream("test"); err != nil {
- t.Fatal(err)
- }
-
- if _, err = js.AddStream(&nats.StreamConfig{
+ _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{
Name: "test2",
Subjects: []string{"processed_cdrs"},
- }); err != nil {
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer js.DeleteStream(context.Background(), "test2")
+
+ ch := make(chan string, 3)
+ var cons jetstream.Consumer
+ cons, err = js.CreateOrUpdateConsumer(context.Background(), "test2", jetstream.ConsumerConfig{
+ FilterSubject: "processed_cdrs",
+ Durable: "test4",
+ AckPolicy: jetstream.AckAllPolicy,
+ })
+ if err != nil {
+ nc.Drain()
t.Fatal(err)
}
- if err = js.PurgeStream("test2"); err != nil {
- t.Fatal(err)
- }
- ch := make(chan *nats.Msg, 3)
- _, err = js.QueueSubscribe("processed_cdrs", "test3", func(msg *nats.Msg) {
- ch <- msg
- }, nats.Durable("test4"))
+ _, err = cons.Consume(func(msg jetstream.Msg) {
+ ch <- string(msg.Data())
+ })
if err != nil {
t.Fatal(err)
}
@@ -143,7 +225,7 @@ func testCheckNatsJetStream(t *testing.T, cfg *config.CGRConfig) {
for i := 0; i < 3; i++ {
randomOriginID := utils.UUIDSha1Prefix()
expData := fmt.Sprintf(`{"OriginID": "%s"}`, randomOriginID)
- if _, err = js.Publish(utils.DefaultQueueID, []byte(expData)); err != nil {
+ if _, err = js.Publish(context.Background(), utils.DefaultQueueID, []byte(expData)); err != nil {
t.Fatal(err)
}
@@ -174,8 +256,10 @@ func testCheckNatsNormal(t *testing.T, cfg *config.CGRConfig) {
if err != nil {
t.Fatal(err)
}
- ch := make(chan *nats.Msg, 3)
- _, err = nc.ChanQueueSubscribe("processed_cdrs", "test3", ch)
+ ch := make(chan string, 3)
+ _, err = nc.QueueSubscribe("processed_cdrs", "test3", func(msg *nats.Msg) {
+ ch <- string(msg.Data)
+ })
if err != nil {
t.Fatal(err)
}
@@ -200,16 +284,16 @@ func testCheckNatsNormal(t *testing.T, cfg *config.CGRConfig) {
}
func TestNatsERJetStream(t *testing.T) {
- // start the nats-server
- exec.Command("pkill", "nats-server")
-
- cmd := exec.Command("nats-server", "-js")
- if err := cmd.Start(); err != nil {
- t.Fatal(err) // most probably not installed
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ JetStream: true,
+ })
+ if err != nil {
+ t.Fatal(err)
}
- time.Sleep(50 * time.Millisecond)
- defer cmd.Process.Kill()
- //
+ natsServer.Start()
+ defer natsServer.Shutdown()
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
"ers": { // EventReaderService
@@ -231,6 +315,7 @@ func TestNatsERJetStream(t *testing.T) {
],
"opts": {
"natsJetStream": true,
+ "natsStreamName": "test",
"natsJetStreamProcessed": true,
"natsSubjectProcessed": "processed_cdrs",
}
@@ -248,16 +333,15 @@ func TestNatsERJetStream(t *testing.T) {
}
func TestNatsER(t *testing.T) {
- // start the nats-server
- exec.Command("pkill", "nats-server")
-
- cmd := exec.Command("nats-server")
- if err := cmd.Start(); err != nil {
- t.Fatal(err) // most probably not installed
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ })
+ if err != nil {
+ t.Fatal(err)
}
- time.Sleep(50 * time.Millisecond)
- defer cmd.Process.Kill()
- //
+ natsServer.Start()
+ defer natsServer.Shutdown()
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
"ers": { // EventReaderService
@@ -294,16 +378,18 @@ func TestNatsER(t *testing.T) {
}
func TestNatsERJetStreamUser(t *testing.T) {
- // start the nats-server
- exec.Command("pkill", "nats-server")
-
- cmd := exec.Command("nats-server", "-js", "--user", "user", "--pass", "password")
- if err := cmd.Start(); err != nil {
- t.Fatal(err) // most probably not installed
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ JetStream: true,
+ Username: "user",
+ Password: "password",
+ })
+ if err != nil {
+ t.Fatal(err)
}
- time.Sleep(50 * time.Millisecond)
- defer cmd.Process.Kill()
- //
+ natsServer.Start()
+ defer natsServer.Shutdown()
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
"ers": { // EventReaderService
@@ -325,6 +411,7 @@ func TestNatsERJetStreamUser(t *testing.T) {
],
"opts": {
"natsJetStream": true,
+ "natsStreamName": "test",
"natsJetStreamProcessed": true,
"natsSubjectProcessed": "processed_cdrs",
}
@@ -342,16 +429,17 @@ func TestNatsERJetStreamUser(t *testing.T) {
}
func TestNatsERUser(t *testing.T) {
- // start the nats-server
- exec.Command("pkill", "nats-server")
-
- cmd := exec.Command("nats-server", "--user", "user", "--pass", "password")
- if err := cmd.Start(); err != nil {
- t.Fatal(err) // most probably not installed
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ Username: "user",
+ Password: "password",
+ })
+ if err != nil {
+ t.Fatal(err)
}
- time.Sleep(50 * time.Millisecond)
- defer cmd.Process.Kill()
- //
+ natsServer.Start()
+ defer natsServer.Shutdown()
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
"ers": { // EventReaderService
@@ -388,16 +476,17 @@ func TestNatsERUser(t *testing.T) {
}
func TestNatsERJetStreamToken(t *testing.T) {
- // start the nats-server
- exec.Command("pkill", "nats-server")
-
- cmd := exec.Command("nats-server", "-js", "--auth", "token")
- if err := cmd.Start(); err != nil {
- t.Fatal(err) // most probably not installed
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ JetStream: true,
+ Authorization: "token",
+ })
+ if err != nil {
+ t.Fatal(err)
}
- time.Sleep(50 * time.Millisecond)
- defer cmd.Process.Kill()
- //
+ natsServer.Start()
+ defer natsServer.Shutdown()
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
"ers": { // EventReaderService
@@ -419,6 +508,7 @@ func TestNatsERJetStreamToken(t *testing.T) {
],
"opts": {
"natsJetStream": true,
+ "natsStreamName": "test",
"natsJetStreamProcessed": true,
"natsSubjectProcessed": "processed_cdrs",
}
@@ -436,16 +526,17 @@ func TestNatsERJetStreamToken(t *testing.T) {
}
func TestNatsERToken(t *testing.T) {
- // start the nats-server
- exec.Command("pkill", "nats-server")
-
- cmd := exec.Command("nats-server", "--auth", "token")
- if err := cmd.Start(); err != nil {
- t.Fatal(err) // most probably not installed
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ JetStream: true,
+ Authorization: "token",
+ })
+ if err != nil {
+ t.Fatal(err)
}
- time.Sleep(50 * time.Millisecond)
- defer cmd.Process.Kill()
- //
+ natsServer.Start()
+ defer natsServer.Shutdown()
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
"ers": { // EventReaderService
@@ -493,25 +584,21 @@ func TestNatsERNkey(t *testing.T) {
if err := os.WriteFile(seedFilePath, []byte("SUAOUIE5CU47NCO22GHFEZXGCRCJDVTHDLMIP4L7UQNCR5SW4FZICI7O3Q"), 0664); err != nil {
t.Fatal(err)
}
- natsCfgPath := path.Join(basePath, "nats.cfg")
- if err := os.WriteFile(natsCfgPath, []byte(`authorization: {
- users: [
- { nkey: UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY }
- ]
- }
-`), 0664); err != nil {
+
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ Nkeys: []*server.NkeyUser{
+ {
+ Nkey: "UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY",
+ },
+ },
+ })
+ if err != nil {
t.Fatal(err)
}
- // start the nats-server
- exec.Command("pkill", "nats-server")
-
- cmd := exec.Command("nats-server", "-c", natsCfgPath)
- if err := cmd.Start(); err != nil {
- t.Fatal(err) // most probably not installed
- }
- time.Sleep(50 * time.Millisecond)
- defer cmd.Process.Kill()
- //
+ natsServer.Start()
+ defer natsServer.Shutdown()
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{
"ers": { // EventReaderService
@@ -561,25 +648,22 @@ func TestNatsERJetStreamNKey(t *testing.T) {
if err := os.WriteFile(seedFilePath, []byte("SUAOUIE5CU47NCO22GHFEZXGCRCJDVTHDLMIP4L7UQNCR5SW4FZICI7O3Q"), 0664); err != nil {
t.Fatal(err)
}
- natsCfgPath := path.Join(basePath, "nats.cfg")
- if err := os.WriteFile(natsCfgPath, []byte(`authorization: {
-users: [
- { nkey: UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY }
-]
-}
-`), 0664); err != nil {
+
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ JetStream: true,
+ Nkeys: []*server.NkeyUser{
+ {
+ Nkey: "UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY",
+ },
+ },
+ })
+ if err != nil {
t.Fatal(err)
}
- // start the nats-server
- exec.Command("pkill", "nats-server")
-
- cmd := exec.Command("nats-server", "-c", natsCfgPath, "-js")
- if err := cmd.Start(); err != nil {
- t.Fatal(err) // most probably not installed
- }
- time.Sleep(50 * time.Millisecond)
- defer cmd.Process.Kill()
- //
+ natsServer.Start()
+ defer natsServer.Shutdown()
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{
"ers": { // EventReaderService
@@ -601,6 +685,7 @@ users: [
],
"opts": {
"natsJetStream": true,
+ "natsStreamName": "test",
"natsSeedFile": %q,
"natsJetStreamProcessed": true,
"natsSubjectProcessed": "processed_cdrs",
@@ -657,16 +742,16 @@ resolver_preload: {
`), 0664); err != nil {
t.Fatal(err)
}
- // start the nats-server
- exec.Command("pkill", "nats-server")
-
- cmd := exec.Command("nats-server", "-c", natsCfgPath)
- if err := cmd.Start(); err != nil {
- t.Fatal(err) // most probably not installed
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ ConfigFile: natsCfgPath,
+ })
+ if err != nil {
+ t.Fatal(err)
}
- time.Sleep(50 * time.Millisecond)
- defer cmd.Process.Kill()
- //
+ natsServer.Start()
+ defer natsServer.Shutdown()
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{
"ers": { // EventReaderService
@@ -746,16 +831,18 @@ system_account:AAFIBB6C56ROU5XRVJLJYR3BTGGYK3HJGHEHQV7L7QZMTT3ZRBLHBS7F
`), 0664); err != nil {
t.Fatal(err)
}
- // start the nats-server
- exec.Command("pkill", "nats-server")
-
- cmd := exec.Command("nats-server", "-c", natsCfgPath, "-js")
- if err := cmd.Start(); err != nil {
- t.Fatal(err) // most probably not installed
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ ConfigFile: natsCfgPath,
+ JetStream: true,
+ })
+ if err != nil {
+ t.Fatal(err)
}
- time.Sleep(100 * time.Millisecond)
- defer cmd.Process.Kill()
- //
+ natsServer.Start()
+ defer natsServer.Shutdown()
+
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{
"ers": { // EventReaderService
"enabled": true, // starts the EventReader service:
@@ -776,6 +863,7 @@ system_account:AAFIBB6C56ROU5XRVJLJYR3BTGGYK3HJGHEHQV7L7QZMTT3ZRBLHBS7F
],
"opts": {
"natsJetStream": true,
+ "natsStreamName": "test",
"natsJWTFile": %q,
"natsJetStreamProcessed": true,
"natsSubjectProcessed": "processed_cdrs",
diff --git a/ers/s3.go b/ers/s3.go
index eee3e71f5..1387513e2 100644
--- a/ers/s3.go
+++ b/ers/s3.go
@@ -191,7 +191,7 @@ func (rdr *S3ER) readLoop() (err error) {
}
func (rdr *S3ER) createPoster() {
- processedOpt := getProcessOptions(rdr.Config().Opts)
+ processedOpt := getProcessedOptions(rdr.Config().Opts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
diff --git a/ers/sql.go b/ers/sql.go
index de535054c..5d8813474 100644
--- a/ers/sql.go
+++ b/ers/sql.go
@@ -297,7 +297,7 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string, opts *config.EventReader
}
// outURL
- processedOpt := getProcessOptions(opts)
+ processedOpt := getProcessedOptions(opts)
if processedOpt == nil {
if len(outURL) == 0 {
return
diff --git a/ers/sqs.go b/ers/sqs.go
index 42e37771c..d8ae6e736 100644
--- a/ers/sqs.go
+++ b/ers/sqs.go
@@ -220,7 +220,7 @@ func (rdr *SQSER) readLoop(scv sqsClient) (err error) {
}
func (rdr *SQSER) createPoster() {
- processedOpt := getProcessOptions(rdr.Config().Opts)
+ processedOpt := getProcessedOptions(rdr.Config().Opts)
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
return
}
diff --git a/go.mod b/go.mod
index cc5f97efa..9cc8803d6 100644
--- a/go.mod
+++ b/go.mod
@@ -25,6 +25,7 @@ require (
github.com/cgrates/ugocodec v0.0.0-20201023092048-df93d0123f60
github.com/creack/pty v1.1.18
github.com/dgrijalva/jwt-go v3.2.0+incompatible
+ github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6
github.com/elastic/go-elasticsearch/v8 v8.8.0
github.com/ericlagergren/decimal v0.0.0-20211103172832-aca2edc11f73
github.com/fiorix/go-diameter/v4 v4.0.4
@@ -32,15 +33,15 @@ require (
github.com/go-sql-driver/mysql v1.6.0
github.com/mediocregopher/radix/v3 v3.8.0
github.com/miekg/dns v1.1.50
- github.com/nats-io/nats.go v1.16.0
+ github.com/nats-io/nats.go v1.30.2
github.com/nyaruka/phonenumbers v1.1.0
github.com/peterh/liner v1.2.2
github.com/rabbitmq/amqp091-go v1.5.0
github.com/segmentio/kafka-go v0.4.32
go.mongodb.org/mongo-driver v1.11.0
- golang.org/x/crypto v0.7.0
- golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb
- golang.org/x/net v0.8.0
+ golang.org/x/crypto v0.13.0
+ golang.org/x/exp v0.0.0-20230905200255-921286631fa9
+ golang.org/x/net v0.15.0
golang.org/x/oauth2 v0.0.0-20220622183110-fd043fe589d2
google.golang.org/api v0.85.0
gorm.io/driver/mysql v1.3.4
@@ -80,11 +81,11 @@ require (
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
- github.com/klauspost/compress v1.15.6 // indirect
+ github.com/klauspost/compress v1.17.0 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/mschoch/smat v0.2.0 // indirect
- github.com/nats-io/nats-server/v2 v2.2.6 // indirect
- github.com/nats-io/nkeys v0.3.0 // indirect
+ github.com/nats-io/nats-server/v2 v2.10.1
+ github.com/nats-io/nkeys v0.4.5 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.12.2
@@ -97,11 +98,11 @@ require (
github.com/xdg/stringprep v1.0.3 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
go.opencensus.io v0.23.0 // indirect
- golang.org/x/mod v0.8.0 // indirect
- golang.org/x/sync v0.1.0 // indirect
- golang.org/x/sys v0.6.0 // indirect
- golang.org/x/text v0.8.0 // indirect
- golang.org/x/tools v0.6.0 // indirect
+ golang.org/x/mod v0.12.0 // indirect
+ golang.org/x/sync v0.3.0 // indirect
+ golang.org/x/sys v0.12.0 // indirect
+ golang.org/x/text v0.13.0 // indirect
+ golang.org/x/tools v0.13.0 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220627200112-0a929928cb33 // indirect
@@ -114,11 +115,12 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.2.2 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
- github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
+ github.com/minio/highwayhash v1.0.2 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
+ github.com/nats-io/jwt/v2 v2.5.2 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.35.0 // indirect
@@ -127,4 +129,5 @@ require (
github.com/xdg-go/scram v1.1.1 // indirect
github.com/xdg-go/stringprep v1.0.3 // indirect
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
+ golang.org/x/time v0.3.0 // indirect
)
diff --git a/go.sum b/go.sum
index 0b8129476..e604dd536 100644
--- a/go.sum
+++ b/go.sum
@@ -185,14 +185,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6 h1:1+44gxLdKRnR/Bx/iAtr+XqNcE4e0oODa63+FABNANI=
github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
-github.com/elastic/elastic-transport-go/v8 v8.3.0 h1:DJGxovyQLXGr62e9nDMPSxRyWION0Bh6d9eCFBriiHo=
-github.com/elastic/elastic-transport-go/v8 v8.3.0/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
-github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWnBO0y+TZaA=
-github.com/elastic/go-elasticsearch v0.0.0/go.mod h1:TkBSJBuTyFdBnrNqoPc54FN0vKf5c04IdM4zuStJ7xg=
github.com/elastic/go-elasticsearch/v8 v8.8.0 h1:yNBPlXNo6wstMG7I3KiZPbLFgA82RMryYqkh1xBMV3A=
github.com/elastic/go-elasticsearch/v8 v8.8.0/go.mod h1:NGmpvohKiRHXI0Sw4fuUGn6hYOmAXlyCphKpzVBiqDE=
-github.com/elastic/go-elasticsearch/v8 v8.8.1 h1:/OiP5Yex40q5eWpzFVQIS8jRE7SaEZrFkG9JbE6TXtY=
-github.com/elastic/go-elasticsearch/v8 v8.8.1/go.mod h1:GU1BJHO7WeamP7UhuElYwzzHtvf9SDmeVpSSy9+o6Qg=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@@ -409,11 +403,10 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
-github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
-github.com/klauspost/compress v1.15.6 h1:6D9PcO8QWu0JyaQ2zUMmu16T1T+zjjEpP91guRsvDfY=
-github.com/klauspost/compress v1.15.6/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
+github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
+github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/kljensen/snowball v0.6.0/go.mod h1:27N7E8fVU5H68RlUmnWwZCfxgt4POBJfENGMvNRhldw=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@@ -447,8 +440,8 @@ github.com/mediocregopher/radix/v3 v3.8.0 h1:HI8EgkaM7WzsrFpYAkOXIgUKbjNonb2Ne7K
github.com/mediocregopher/radix/v3 v3.8.0/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/miekg/dns v1.1.50 h1:DQUfb9uc6smULcREF09Uc+/Gd46YWqJd5DbpPE9xkcA=
github.com/miekg/dns v1.1.50/go.mod h1:e3IlAVfNqAllflbibAZEWOXOQ+Ynzk/dDozDxY7XnME=
-github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
-github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
+github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
+github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@@ -463,18 +456,14 @@ github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
-github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
-github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
-github.com/nats-io/jwt/v2 v2.0.2 h1:ejVCLO8gu6/4bOKIHQpmB5UhhUJfAQw55yvLWpfmKjI=
-github.com/nats-io/jwt/v2 v2.0.2/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
-github.com/nats-io/nats-server/v2 v2.2.6 h1:FPK9wWx9pagxcw14s8W9rlfzfyHm61uNLnJyybZbn48=
-github.com/nats-io/nats-server/v2 v2.2.6/go.mod h1:sEnFaxqe09cDmfMgACxZbziXnhQFhwk+aKkZjBBRYrI=
-github.com/nats-io/nats.go v1.11.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
-github.com/nats-io/nats.go v1.16.0 h1:zvLE7fGBQYW6MWaFaRdsgm9qT39PJDQoju+DS8KsO1g=
-github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
-github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
-github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
-github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
+github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
+github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
+github.com/nats-io/nats-server/v2 v2.10.1 h1:MIJ614dhOIdo71iSzY8ln78miXwrYvlvXHUyS+XdKZQ=
+github.com/nats-io/nats-server/v2 v2.10.1/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto=
+github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY=
+github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM=
+github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
+github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nyaruka/phonenumbers v1.1.0 h1:OvNAOAl4A9a2kNpzziITbUVH4bBBeKHkHl0llPmkxaA=
@@ -643,16 +632,14 @@ golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200320181102-891825fb96df/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
-golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
-golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
-golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A=
-golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
+golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
+golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -663,8 +650,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
-golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb h1:PaBZQdo+iSDyHT053FjUCgZQ/9uqVwPOcl7KSWhKn6w=
-golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
+golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
+golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@@ -690,8 +677,8 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
-golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8=
-golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
+golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
+golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -743,8 +730,8 @@ golang.org/x/net v0.0.0-20220412020605-290c469a71a5/go.mod h1:CfG3xpIq0wQ8r1q4Su
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220607020251-c690dde0001d/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.0.0-20220617184016-355a448f1bc9/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
-golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
-golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
+golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8=
+golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -779,8 +766,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
-golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
+golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -860,8 +847,8 @@ golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
-golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
+golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
@@ -874,13 +861,13 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
-golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
-golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
+golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
+golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
-golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
-golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
+golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
@@ -939,8 +926,8 @@ golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
-golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM=
-golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
+golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ=
+golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
diff --git a/utils/consts.go b/utils/consts.go
index e9a4acdd7..2f4306eb0 100644
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -2524,6 +2524,7 @@ const (
NatsSubject = "natsSubject"
NatsQueueID = "natsQueueID"
NatsConsumerName = "natsConsumerName"
+ NatsStreamName = "natsStreamName"
NatsJWTFile = "natsJWTFile"
NatsSeedFile = "natsSeedFile"
NatsClientCertificate = "natsClientCertificate"