From d0a435aa6d372dbd5a23f0c2a87bf13d80455252 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 27 Sep 2023 13:03:23 -0400 Subject: [PATCH] Migrate to new jetstream API Upgraded go.mod nats version due to an issue caused by version mismatch between driver and server (uncertain). Renamed function from getProcessOptions to getProcessedOptions. ## *NatsER.Serve - Replaced ChanQueueSubscribe with QueueSubscribe for Core NATS consumer to handle the message processing directly. - Since QueueSubscribe is now used regardless of jetstream status, the message handler has been assigned to a separate variable that can be reused. - The message handler is now dealing with the message processing directly, therefore the select case listening for the channel which is feeding NATS messages can be removed together with the channel itself and the select. Currently, the goroutine within Serve only has to block until the rdrExit chan is closed. - Moved the resource check inside the handler right before starting the message processing goroutine. ## *NatsEE.parseOpts - Renamed function from parseOpt to parseOpts. - Handled the error coming from GetNatsOpts function. ## *NatsEE.Connect - Updated function to return early in case of non-nil nats.Conn value to reduce nesting. ## *NatsEE.ExportEvent - Use defer to release resources and RUnlock. ## *NatsEE.Close - Use defer to Unlock. - Update function to return early in case of nil nats.Conn value to reduce nesting. ## ees.GetNatsOpts - Chose switch over if else when parsing client certificate and keys opts. - Updated function to return the errors directly instead of assigning them to a separate variable right before returning. ## ers.GetNatsOpts - Chose switch over if else when parsing client certificate and keys opts. - Updated function to return the errors directly instead of assigning them to a separate variable right before returning. Removed tab from commented natsJetStreamMaxWaitProcessed option value in config_defaults.go under ers section. Added integration test for ERs NATS. Updated ees/ers implementation to use the jetstream package which separates the jetstream context from Core NATS. Removed the jsOpts fields from the NatsEE struct. We are now using the jetStreamMaxWait option directly through a timeout context. Added streamName option for NATS reader since it is now required to be specified when creating a consumer (it is not inferred based on subject anymore). Updated nats ers integration tests. Updated tests to also use the new jetstream package. Updated tests to start the nats-server using their official driver instead of using the std go exec package. time.Sleeps are now not required anymore to wait for the server. In test configurations for nats readers, made sure that natsStreamName option is populated. It is now required for consumers to know where to subscribe. --- config/config_defaults.go | 3 +- config/erscfg.go | 20 ++ config/erscfg_test.go | 9 + data/conf/samples/ers_nats/cgrates.json | 75 +++++ ees/nats.go | 117 ++++---- ees/nats_it_test.go | 113 ++++--- ers/amqp.go | 2 +- ers/amqpv1.go | 2 +- ers/kafka.go | 2 +- ers/libers.go | 7 +- ers/libers_test.go | 2 +- ers/nats.go | 214 ++++++++------ ers/nats_it_test.go | 374 +++++++++++++++--------- ers/s3.go | 2 +- ers/sql.go | 2 +- ers/sqs.go | 2 +- go.mod | 29 +- go.sum | 73 ++--- utils/consts.go | 1 + 19 files changed, 658 insertions(+), 391 deletions(-) create mode 100644 data/conf/samples/ers_nats/cgrates.json diff --git a/config/config_defaults.go b/config/config_defaults.go index cb586bbf7..6c611b344 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -469,6 +469,7 @@ const CGRATES_CFG_JSON = ` // nats // "natsJetStream": false, // controls if the nats reader uses the JetStream // "natsConsumerName": "cgrates", // in case of JetStream the name of the consumer + // "natsStreamName": "cdrs", // the name of the NATS JetStream stream from which the consumer will read messages "natsSubject": "cgrates_cdrs", // the subject from were the events are read // "natsQueueID": "", // the queue id the consumer listen to // "natsJWTFile": "", // the path to the JWT file( can be the chained file or the user file) @@ -485,7 +486,7 @@ const CGRATES_CFG_JSON = ` // "natsCertificateAuthorityProcessed": "", // the path to a custom certificate authority file( used by tls) // "natsClientCertificateProcessed": "", // the path to a client certificate( used by tls) // "natsClientKeyProcessed": "", // the path to a client key( used by tls) - // "natsJetStreamMaxWaitProcessed": "5s ", // the maximum amount of time to wait for a response + // "natsJetStreamMaxWaitProcessed": "5s", // the maximum amount of time to wait for a response }, "tenant": "", // tenant used by import "timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> diff --git a/config/erscfg.go b/config/erscfg.go index eee0e0eea..327ef9c0d 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -181,6 +181,7 @@ type EventReaderOpts struct { S3BucketIDProcessed *string NATSJetStream *bool NATSConsumerName *string + NATSStreamName *string NATSSubject *string NATSQueueID *string NATSJWTFile *string @@ -384,6 +385,9 @@ func (erOpts *EventReaderOpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err if jsnCfg.NATSConsumerName != nil { erOpts.NATSConsumerName = jsnCfg.NATSConsumerName } + if jsnCfg.NATSStreamName != nil { + erOpts.NATSStreamName = jsnCfg.NATSStreamName + } if jsnCfg.NATSSubject != nil { erOpts.NATSSubject = jsnCfg.NATSSubject } @@ -731,6 +735,10 @@ func (erOpts *EventReaderOpts) Clone() *EventReaderOpts { cln.NATSConsumerName = new(string) *cln.NATSConsumerName = *erOpts.NATSConsumerName } + if erOpts.NATSStreamName != nil { + cln.NATSStreamName = new(string) + *cln.NATSStreamName = *erOpts.NATSStreamName + } if erOpts.NATSSubject != nil { cln.NATSSubject = new(string) *cln.NATSSubject = *erOpts.NATSSubject @@ -999,6 +1007,9 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string if er.Opts.NATSConsumerName != nil { opts[utils.NatsConsumerName] = *er.Opts.NATSConsumerName } + if er.Opts.NATSStreamName != nil { + opts[utils.NatsStreamName] = *er.Opts.NATSStreamName + } if er.Opts.NATSSubject != nil { opts[utils.NatsSubject] = *er.Opts.NATSSubject } @@ -1150,6 +1161,7 @@ type EventReaderOptsJson struct { S3BucketIDProcessed *string `json:"s3BucketIDProcessed"` NATSJetStream *bool `json:"natsJetStream"` NATSConsumerName *string `json:"natsConsumerName"` + NATSStreamName *string `json:"natsStreamName"` NATSSubject *string `json:"natsSubject"` NATSQueueID *string `json:"natsQueueID"` NATSJWTFile *string `json:"natsJWTFile"` @@ -1614,6 +1626,14 @@ func diffEventReaderOptsJsonCfg(d *EventReaderOptsJson, v1, v2 *EventReaderOpts) } else { d.NATSConsumerName = nil } + if v2.NATSStreamName != nil { + if v1.NATSStreamName == nil || + *v1.NATSStreamName != *v2.NATSStreamName { + d.NATSStreamName = v2.NATSStreamName + } + } else { + d.NATSStreamName = nil + } if v2.NATSSubject != nil { if v1.NATSSubject == nil || *v1.NATSSubject != *v2.NATSSubject { diff --git a/config/erscfg_test.go b/config/erscfg_test.go index eb3b378c9..514b5c2bb 100644 --- a/config/erscfg_test.go +++ b/config/erscfg_test.go @@ -1781,6 +1781,7 @@ func TestERsLoadFromJSONCfg(t *testing.T) { S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"), NATSJetStream: utils.BoolPointer(false), NATSConsumerName: utils.StringPointer("consumer_name"), + NATSStreamName: utils.StringPointer("stream_name"), NATSSubject: utils.StringPointer("subject"), NATSQueueID: utils.StringPointer("queue_id"), NATSJWTFile: utils.StringPointer("jsw_file"), @@ -1849,6 +1850,7 @@ func TestERsLoadFromJSONCfg(t *testing.T) { S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"), NATSJetStream: utils.BoolPointer(false), NATSConsumerName: utils.StringPointer("consumer_name"), + NATSStreamName: utils.StringPointer("stream_name"), NATSSubject: utils.StringPointer("subject"), NATSQueueID: utils.StringPointer("queue_id"), NATSJWTFile: utils.StringPointer("jsw_file"), @@ -1934,6 +1936,7 @@ func TestERsLoadFromJsonCfgParseError(t *testing.T) { S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"), NATSJetStream: utils.BoolPointer(false), NATSConsumerName: utils.StringPointer("consumer_name"), + NATSStreamName: utils.StringPointer("stream_name"), NATSSubject: utils.StringPointer("subject"), NATSQueueID: utils.StringPointer("queue_id"), NATSJWTFile: utils.StringPointer("jsw_file"), @@ -2022,6 +2025,7 @@ func TestERsClone(t *testing.T) { S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"), NATSJetStream: utils.BoolPointer(false), NATSConsumerName: utils.StringPointer("consumer_name"), + NATSStreamName: utils.StringPointer("stream_name"), NATSSubject: utils.StringPointer("subject"), NATSQueueID: utils.StringPointer("queue_id"), NATSJWTFile: utils.StringPointer("jsw_file"), @@ -2090,6 +2094,7 @@ func TestERsClone(t *testing.T) { S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"), NATSJetStream: utils.BoolPointer(false), NATSConsumerName: utils.StringPointer("consumer_name"), + NATSStreamName: utils.StringPointer("stream_name"), NATSSubject: utils.StringPointer("subject"), NATSQueueID: utils.StringPointer("queue_id"), NATSJWTFile: utils.StringPointer("jsw_file"), @@ -2166,6 +2171,7 @@ func TestERsAsMapInterface(t *testing.T) { S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"), NATSJetStream: utils.BoolPointer(false), NATSConsumerName: utils.StringPointer("consumer_name"), + NATSStreamName: utils.StringPointer("stream_name"), NATSSubject: utils.StringPointer("subject"), NATSQueueID: utils.StringPointer("queue_id"), NATSJWTFile: utils.StringPointer("jsw_file"), @@ -2314,6 +2320,7 @@ func TestDiffEventReaderOptsJsonCfg(t *testing.T) { S3BucketIDProcessed: utils.StringPointer("bucket_id_processed_diff"), NATSJetStream: utils.BoolPointer(true), NATSConsumerName: utils.StringPointer("consumer_name_diff"), + NATSStreamName: utils.StringPointer("stream_name_diff"), NATSSubject: utils.StringPointer("subject_diff"), NATSQueueID: utils.StringPointer("queue_id_diff"), NATSJWTFile: utils.StringPointer("jsw_file_diff"), @@ -2382,6 +2389,7 @@ func TestDiffEventReaderOptsJsonCfg(t *testing.T) { S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"), NATSJetStream: utils.BoolPointer(false), NATSConsumerName: utils.StringPointer("consumer_name"), + NATSStreamName: utils.StringPointer("stream_name"), NATSSubject: utils.StringPointer("subject"), NATSQueueID: utils.StringPointer("queue_id"), NATSJWTFile: utils.StringPointer("jsw_file"), @@ -2450,6 +2458,7 @@ func TestDiffEventReaderOptsJsonCfg(t *testing.T) { S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"), NATSJetStream: utils.BoolPointer(false), NATSConsumerName: utils.StringPointer("consumer_name"), + NATSStreamName: utils.StringPointer("stream_name"), NATSSubject: utils.StringPointer("subject"), NATSQueueID: utils.StringPointer("queue_id"), NATSJWTFile: utils.StringPointer("jsw_file"), diff --git a/data/conf/samples/ers_nats/cgrates.json b/data/conf/samples/ers_nats/cgrates.json new file mode 100644 index 000000000..4adffed95 --- /dev/null +++ b/data/conf/samples/ers_nats/cgrates.json @@ -0,0 +1,75 @@ +{ + +"general": { + "log_level": 7 +}, + +"data_db": { + "db_type": "*internal" +}, + +"stor_db": { + "db_type": "*internal" +}, + +"ers": { + "enabled": true, + "sessions_conns":[], + "readers": [ + { + "id": "nats_reader1", + "type": "*natsJSONMap", + "source_path": "nats://127.0.0.1:4222", + "processed_path": "nats://127.0.0.1:4222", + "opts": { + "natsJetStream": true, + "natsConsumerName": "cgrates", + "natsStreamName": "stream", + "natsSubject": "cgrates_cdrs", + "natsQueueID": "queue", + "natsJetStreamMaxWait": "5s", + + "natsJetStreamProcessed": true, + "natsSubjectProcessed": "cgrates_cdrs_processed", + "natsJetStreamMaxWaitProcessed": "5s" + }, + "flags": ["*dryrun"], + "fields":[ + {"tag": "cdr_template", "type": "*template", "value": "cdr_template"} + ] + }, + { + "id": "nats_reader2", + "type": "*natsJSONMap", + "source_path": "nats://127.0.0.1:4222", + "processed_path": "nats://127.0.0.1:4222", + "opts": { + "natsJetStream": true, + "natsConsumerName": "cgrates", + "natsStreamName": "stream", + "natsSubject": "cgrates_cdrs", + "natsQueueID": "queue", + "natsJetStreamMaxWait": "5s", + + "natsJetStreamProcessed": true, + "natsSubjectProcessed": "cgrates_cdrs_processed", + "natsJetStreamMaxWaitProcessed": "5s" + }, + "flags": ["*dryrun"], + "fields":[ + {"tag": "cdr_template", "type": "*template", "value": "cdr_template"} + ] + } + ] +}, + + +"templates": { + "cdr_template": [ + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.Account", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.Subject", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.Destination", "mandatory": true} + ] +} + +} \ No newline at end of file diff --git a/ees/nats.go b/ees/nats.go index 5c849703d..f38cc69ea 100644 --- a/ees/nats.go +++ b/ees/nats.go @@ -22,7 +22,7 @@ import ( "crypto/tls" "crypto/x509" "fmt" - "io/ioutil" + "os" "sync" "time" @@ -30,6 +30,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" ) // NewNatsEE creates a kafka poster @@ -40,7 +41,7 @@ func NewNatsEE(cfg *config.EventExporterCfg, nodeID string, connTimeout time.Dur subject: utils.DefaultQueueID, reqs: newConcReq(cfg.ConcurrentRequests), } - err = natsPstr.parseOpt(cfg.Opts, nodeID, connTimeout) + err = natsPstr.parseOpts(cfg.Opts, nodeID, connTimeout) return } @@ -49,10 +50,9 @@ type NatsEE struct { subject string // identifier of the CDR queue where we publish jetStream bool opts []nats.Option - jsOpts []nats.JSOpt poster *nats.Conn - posterJS nats.JetStreamContext + posterJS jetstream.JetStream cfg *config.EventExporterCfg dc *utils.SafeMapStorage @@ -61,74 +61,79 @@ type NatsEE struct { bytePreparing } -func (pstr *NatsEE) parseOpt(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) (err error) { +func (pstr *NatsEE) parseOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) error { if opts.NATSJetStream != nil { pstr.jetStream = *opts.NATSJetStream } - pstr.subject = utils.DefaultQueueID if opts.NATSSubject != nil { pstr.subject = *opts.NATSSubject } + var err error pstr.opts, err = GetNatsOpts(opts, nodeID, connTimeout) - if pstr.jetStream { - if opts.NATSJetStreamMaxWait != nil { - pstr.jsOpts = []nats.JSOpt{nats.MaxWait(*opts.NATSJetStreamMaxWait)} - } - } - return + return err } func (pstr *NatsEE) Cfg() *config.EventExporterCfg { return pstr.cfg } -func (pstr *NatsEE) Connect() (err error) { +func (pstr *NatsEE) Connect() error { pstr.Lock() defer pstr.Unlock() - if pstr.poster == nil { - if pstr.poster, err = nats.Connect(pstr.Cfg().ExportPath, pstr.opts...); err != nil { - return - } - if pstr.jetStream { - pstr.posterJS, err = pstr.poster.JetStream(pstr.jsOpts...) - } + if pstr.poster != nil { + return nil } - return -} -func (pstr *NatsEE) ExportEvent(ctx *context.Context, content, _ any) (err error) { - pstr.reqs.get() - pstr.RLock() - if pstr.poster == nil { - pstr.RUnlock() - pstr.reqs.done() - return utils.ErrDisconnected + var err error + pstr.poster, err = nats.Connect(pstr.Cfg().ExportPath, pstr.opts...) + if err != nil { + return err } if pstr.jetStream { - _, err = pstr.posterJS.Publish(pstr.subject, content.([]byte), nats.Context(ctx)) + pstr.posterJS, err = jetstream.New(pstr.poster) + } + return err +} + +func (pstr *NatsEE) ExportEvent(ctx *context.Context, content, _ any) error { + pstr.reqs.get() + defer pstr.reqs.done() + pstr.RLock() + defer pstr.RUnlock() + if pstr.poster == nil { + return utils.ErrDisconnected + } + var err error + if pstr.jetStream { + ctx := context.TODO() + if pstr.cfg.Opts.NATSJetStreamMaxWait != nil { + ctx, _ = context.WithTimeout(ctx, *pstr.cfg.Opts.NATSJetStreamMaxWait) + } + _, err = pstr.posterJS.Publish(ctx, pstr.subject, content.([]byte)) } else { err = pstr.poster.Publish(pstr.subject, content.([]byte)) } - pstr.RUnlock() - pstr.reqs.done() - return + return err } -func (pstr *NatsEE) Close() (err error) { +func (pstr *NatsEE) Close() error { pstr.Lock() - if pstr.poster != nil { - err = pstr.poster.Drain() - pstr.poster = nil + defer pstr.Unlock() + + if pstr.poster == nil { + return nil } - pstr.Unlock() - return + + err := pstr.poster.Drain() + pstr.poster = nil + return err } func (pstr *NatsEE) GetMetrics() *utils.SafeMapStorage { return pstr.dc } func (pstr *NatsEE) ExtraData(ev *utils.CGREvent) any { return nil } -func GetNatsOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) { - nop = make([]nats.Option, 0, 7) - nop = append(nop, nats.Name(utils.CGRateSLwr+nodeID), +func GetNatsOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) ([]nats.Option, error) { + natsOpts := make([]nats.Option, 0, 7) + natsOpts = append(natsOpts, nats.Name(utils.CGRateSLwr+nodeID), nats.Timeout(connTimeout), nats.DrainTimeout(time.Second)) if opts.NATSJWTFile != nil { @@ -136,33 +141,33 @@ func GetNatsOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time if opts.NATSSeedFile != nil { keys = append(keys, *opts.NATSSeedFile) } - nop = append(nop, nats.UserCredentials(*opts.NATSJWTFile, keys...)) + natsOpts = append(natsOpts, nats.UserCredentials(*opts.NATSJWTFile, keys...)) } if opts.NATSSeedFile != nil { opt, err := nats.NkeyOptionFromSeed(*opts.NATSSeedFile) if err != nil { return nil, err } - nop = append(nop, opt) + natsOpts = append(natsOpts, opt) } - if opts.NATSClientCertificate != nil { - if opts.NATSClientKey == nil { - err = fmt.Errorf("has certificate but no key") - return - } - nop = append(nop, nats.ClientCert(*opts.NATSClientCertificate, *opts.NATSClientKey)) - } else if opts.NATSClientKey != nil { - err = fmt.Errorf("has key but no certificate") - return + + switch { + case opts.NATSClientCertificate != nil && opts.NATSClientKey != nil: + natsOpts = append(natsOpts, nats.ClientCert(*opts.NATSClientCertificate, *opts.NATSClientKey)) + case opts.NATSClientCertificate != nil: + return nil, fmt.Errorf("has certificate but no key") + case opts.NATSClientKey != nil: + return nil, fmt.Errorf("has key but no certificate") } + if opts.NATSCertificateAuthority != nil { - nop = append(nop, + natsOpts = append(natsOpts, func(o *nats.Options) error { pool, err := x509.SystemCertPool() if err != nil { return err } - rootPEM, err := ioutil.ReadFile(*opts.NATSCertificateAuthority) + rootPEM, err := os.ReadFile(*opts.NATSCertificateAuthority) if err != nil || rootPEM == nil { return fmt.Errorf("nats: error loading or parsing rootCA file: %v", err) } @@ -179,5 +184,5 @@ func GetNatsOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time return nil }) } - return + return natsOpts, nil } diff --git a/ees/nats_it_test.go b/ees/nats_it_test.go index b168e0be9..8b685c87c 100644 --- a/ees/nats_it_test.go +++ b/ees/nats_it_test.go @@ -22,7 +22,7 @@ along with this program. If not, see package ees import ( - "os/exec" + "os" "path" "testing" "time" @@ -31,18 +31,25 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" ) -func TestNatsEE(t *testing.T) { - testCreateDirectory(t) - var err error - cmd := exec.Command("nats-server", "-js") // Start the nats-server. - if err := cmd.Start(); err != nil { - t.Fatal(err) // Only if nats-server is not installed. +func TestNatsEEJetStream(t *testing.T) { + + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + JetStream: true, + }) + if err != nil { + t.Fatal(err) } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() + natsServer.Start() + defer natsServer.Shutdown() + + testCreateDirectory(t) cgrCfg, err := config.NewCGRConfigFromPath(context.Background(), path.Join(*dataDir, "conf", "samples", "ees")) if err != nil { t.Fatal(err) @@ -63,38 +70,40 @@ func TestNatsEE(t *testing.T) { t.Fatal(err) } - nc, err := nats.Connect("nats://localhost:4222", nop...) + nc, err := nats.Connect(nats.DefaultURL, nop...) if err != nil { t.Fatal(err) } - js, err := nc.JetStream() + defer nc.Drain() + + js, err := jetstream.New(nc) if err != nil { t.Fatal(err) } - for name := range js.StreamNames() { - if name == "test2" { - if err = js.DeleteStream("test2"); err != nil { - t.Fatal(err) - } - break - } - } - if _, err = js.AddStream(&nats.StreamConfig{ + _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{ Name: "test2", Subjects: []string{"processed_cdrs"}, - }); err != nil { + }) + if err != nil { + t.Fatal(err) + } + defer js.DeleteStream(context.Background(), "test2") + + ch := make(chan string, 3) + var cons jetstream.Consumer + cons, err = js.CreateOrUpdateConsumer(context.Background(), "test2", jetstream.ConsumerConfig{ + Durable: "test4", + FilterSubject: "processed_cdrs", + AckPolicy: jetstream.AckAllPolicy, + }) + if err != nil { t.Fatal(err) } - if err = js.PurgeStream("test2"); err != nil { - t.Fatal(err) - } - - ch := make(chan *nats.Msg, 3) - _, err = js.QueueSubscribe("processed_cdrs", "test3", func(msg *nats.Msg) { - ch <- msg - }, nats.Durable("test4")) + _, err = cons.Consume(func(msg jetstream.Msg) { + ch <- string(msg.Data()) + }) if err != nil { t.Fatal(err) } @@ -114,24 +123,26 @@ func TestNatsEE(t *testing.T) { // fmt.Println((<-ch).Data) select { case data := <-ch: - if expected != string(data.Data) { - t.Fatalf("Expected %v \n but received \n %v", expected, string(data.Data)) + if expected != data { + t.Fatalf("Expected %v \n but received \n %v", expected, data) } case <-time.After(50 * time.Millisecond): t.Fatal("Time limit exceeded") } } -func TestNatsEE2(t *testing.T) { +func TestNatsEE(t *testing.T) { testCreateDirectory(t) - exec.Command("pkill", "nats-server") - cmd := exec.Command("nats-server") - if err := cmd.Start(); err != nil { + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + }) + if err != nil { t.Fatal(err) } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() + natsServer.Start() + defer natsServer.Shutdown() cgrCfg, err := config.NewCGRConfigFromPath(context.Background(), path.Join(*dataDir, "conf", "samples", "ees")) if err != nil { @@ -187,3 +198,31 @@ func TestNatsEE2(t *testing.T) { t.Fatal("Time limit exceeded") } } + +func TestGetNatsOptsSeedFile(t *testing.T) { + if _, err := os.Create("/tmp/nkey.txt"); err != nil { + t.Error(err) + } + defer os.Remove("/tmp/nkey.txt") + nkey := "SUACSSL3UAHUDXKFSNVUZRF5UHPMWZ6BFDTJ7M6USDXIEDNPPQYYYCU3VY" + os.WriteFile("/tmp/nkey.txt", []byte(nkey), 0777) + + opts := &config.EventExporterOpts{ + NATSSeedFile: utils.StringPointer("/tmp/nkey.txt"), + } + + nodeID := "node_id1" + connTimeout := 2 * time.Second + + _, err := GetNatsOpts(opts, nodeID, connTimeout) + if err != nil { + t.Error(err) + } + + //test error + os.WriteFile("/tmp/nkey.txt", []byte(""), 0777) + _, err = GetNatsOpts(opts, nodeID, connTimeout) + if err == nil || err.Error() != "nkeys: no nkey seed found" { + t.Errorf("expected \"%s\" but received \"%s\"", "nkeys: no nkey seed found", err.Error()) + } +} diff --git a/ers/amqp.go b/ers/amqp.go index fcfa13a8f..c14c47014 100644 --- a/ers/amqp.go +++ b/ers/amqp.go @@ -254,7 +254,7 @@ func (rdr *AMQPER) close() (err error) { } func (rdr *AMQPER) createPoster() { - processedOpt := getProcessOptions(rdr.Config().Opts) + processedOpt := getProcessedOptions(rdr.Config().Opts) if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { return } diff --git a/ers/amqpv1.go b/ers/amqpv1.go index 7ef3f0d12..abd2e131f 100644 --- a/ers/amqpv1.go +++ b/ers/amqpv1.go @@ -211,7 +211,7 @@ func (rdr *AMQPv1ER) close() (err error) { } func (rdr *AMQPv1ER) createPoster() { - processedOpt := getProcessOptions(rdr.Config().Opts) + processedOpt := getProcessedOptions(rdr.Config().Opts) if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { return } diff --git a/ers/kafka.go b/ers/kafka.go index e516cedde..e91698214 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -250,7 +250,7 @@ func (rdr *KafkaER) setOpts(opts *config.EventReaderOpts) (err error) { } func (rdr *KafkaER) createPoster() { - processedOpt := getProcessOptions(rdr.Config().Opts) + processedOpt := getProcessedOptions(rdr.Config().Opts) if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { return } diff --git a/ers/libers.go b/ers/libers.go index 54dc61164..7dff1177f 100644 --- a/ers/libers.go +++ b/ers/libers.go @@ -28,8 +28,9 @@ import ( "github.com/cgrates/cgrates/utils" ) -// getProcessOptions assigns all non-nil fields ending in "Processed" from EventReaderOpts to their counterparts in EventExporterOpts -func getProcessOptions(erOpts *config.EventReaderOpts) (eeOpts *config.EventExporterOpts) { +// getProcessedOptions assigns all non-nil fields ending in "Processed" from EventReaderOpts to their counterparts in EventExporterOpts +func getProcessedOptions(erOpts *config.EventReaderOpts) *config.EventExporterOpts { + var eeOpts *config.EventExporterOpts if erOpts.AMQPExchangeProcessed != nil { if eeOpts == nil { eeOpts = new(config.EventExporterOpts) @@ -198,7 +199,7 @@ func getProcessOptions(erOpts *config.EventReaderOpts) (eeOpts *config.EventExpo } eeOpts.PgSSLMode = erOpts.PgSSLModeProcessed } - return + return eeOpts } // mergePartialEvents will unite the events using the reader configuration diff --git a/ers/libers_test.go b/ers/libers_test.go index 3af897db0..0f10ba12a 100644 --- a/ers/libers_test.go +++ b/ers/libers_test.go @@ -31,7 +31,7 @@ func TestGetProcessOptions(t *testing.T) { opts := &config.EventReaderOpts{ AWSKeyProcessed: utils.StringPointer("testKey"), } - result := getProcessOptions(opts) + result := getProcessedOptions(opts) expected := &config.EventExporterOpts{ AWSKey: utils.StringPointer("testKey"), } diff --git a/ers/nats.go b/ers/nats.go index e14f9c8d5..b130af932 100644 --- a/ers/nats.go +++ b/ers/nats.go @@ -23,7 +23,7 @@ import ( "crypto/x509" "encoding/json" "fmt" - "io/ioutil" + "os" "time" "github.com/cgrates/birpc/context" @@ -33,12 +33,13 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" ) // NewNatsER return a new amqp event reader func NewNatsER(cfg *config.CGRConfig, cfgIdx int, rdrEvents, partialEvents chan *erEvent, rdrErr chan error, - fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (_ EventReader, err error) { + fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (EventReader, error) { rdr := &NatsER{ connMgr: connMgr, cgrCfg: cfg, @@ -55,11 +56,14 @@ func NewNatsER(cfg *config.CGRConfig, cfgIdx int, rdr.cap <- struct{}{} } } - if err = rdr.processOpts(); err != nil { - return + var err error + err = rdr.processOpts() + if err != nil { + return nil, err } - if err = rdr.createPoster(); err != nil { - return + err = rdr.createPoster() + if err != nil { + return nil, err } return rdr, nil } @@ -82,8 +86,8 @@ type NatsER struct { queueID string jetStream bool consumerName string + streamName string opts []nats.Option - jsOpts []nats.JSOpt poster *ees.NatsEE } @@ -93,69 +97,106 @@ func (rdr *NatsER) Config() *config.EventReaderCfg { return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx] } -// Serve will start the gorutines needed to watch the nats subject -func (rdr *NatsER) Serve() (err error) { - // Connect to a server - var nc *nats.Conn - var js nats.JetStreamContext +// Serve will subscribe to a NATS subject and process incoming messages until the rdrExit channel +// will be closed. +func (rdr *NatsER) Serve() error { - if nc, err = nats.Connect(rdr.Config().SourcePath, rdr.opts...); err != nil { - return + // Establish a connection to the nats server. + nc, err := nats.Connect(rdr.Config().SourcePath, rdr.opts...) + if err != nil { + return err } - ch := make(chan *nats.Msg) + + // Define the message handler. Its content will get executed for every received message. + handleMessage := func(msgData []byte) { + + // If the rdr.cap channel buffer is empty, block until a resource is available. Otherwise + // allocate one resource and start processing the message. + if rdr.Config().ConcurrentReqs != -1 { + <-rdr.cap + } + go func() { + handlerErr := rdr.processMessage(msgData) + if handlerErr != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> processing message %s error: %s", + utils.ERs, string(msgData), handlerErr.Error())) + } + + // Export the received message if a poster has been defined. + if rdr.poster != nil { + handlerErr = ees.ExportWithAttempts(context.TODO(), rdr.poster, msgData, + utils.EmptyString, rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant) + if handlerErr != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> writing message %s error: %s", + utils.ERs, string(msgData), handlerErr.Error())) + } + } + + // Release the resource back to rdr.cap channel. + if rdr.Config().ConcurrentReqs != -1 { + rdr.cap <- struct{}{} + } + + }() + } + + // Subscribe to the appropriate NATS subject. if !rdr.jetStream { - if _, err = nc.ChanQueueSubscribe(rdr.subject, rdr.queueID, ch); err != nil { - return + _, err = nc.QueueSubscribe(rdr.subject, rdr.queueID, func(msg *nats.Msg) { + handleMessage(msg.Data) + }) + if err != nil { + nc.Drain() + return err } } else { - js, err = nc.JetStream(rdr.jsOpts...) + var js jetstream.JetStream + js, err = jetstream.New(nc) if err != nil { - return + nc.Drain() + return err } - if _, err = js.QueueSubscribe(rdr.subject, rdr.queueID, func(msg *nats.Msg) { - ch <- msg - }, nats.Durable(rdr.consumerName)); err != nil { - return + ctx := context.TODO() + if jsMaxWait := rdr.Config().Opts.NATSJetStreamMaxWait; jsMaxWait != nil { + ctx, _ = context.WithTimeout(ctx, *jsMaxWait) + } + + var cons jetstream.Consumer + cons, err = js.CreateOrUpdateConsumer(ctx, rdr.streamName, jetstream.ConsumerConfig{ + FilterSubject: rdr.subject, + Durable: rdr.consumerName, + AckPolicy: jetstream.AckAllPolicy, + }) + if err != nil { + nc.Drain() + return err + } + + _, err = cons.Consume(func(msg jetstream.Msg) { + handleMessage(msg.Data()) + }) + if err != nil { + nc.Drain() + return err } } + go func() { - for { - if rdr.Config().ConcurrentReqs != -1 { - <-rdr.cap // do not try to read if the limit is reached - } - select { - case <-rdr.rdrExit: - utils.Logger.Info( - fmt.Sprintf("<%s> stop monitoring nats path <%s>", - utils.ERs, rdr.Config().SourcePath)) - nc.Drain() - if rdr.poster != nil { - rdr.poster.Close() - } - return - case msg := <-ch: - go func(msg *nats.Msg) { - if err := rdr.processMessage(msg.Data); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> processing message %s error: %s", - utils.ERs, string(msg.Data), err.Error())) - } - if rdr.poster != nil { // post it - if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Data, utils.EmptyString, - rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> writing message %s error: %s", - utils.ERs, string(msg.Data), err.Error())) - } - } - if rdr.Config().ConcurrentReqs != -1 { - rdr.cap <- struct{}{} - } - }(msg) - } + + // Wait for exit signal. + <-rdr.rdrExit + utils.Logger.Info( + fmt.Sprintf("<%s> stop monitoring nats path <%s>", + utils.ERs, rdr.Config().SourcePath)) + nc.Drain() + if rdr.poster != nil { + rdr.poster.Close() } }() - return + + return nil } func (rdr *NatsER) processMessage(msg []byte) (err error) { @@ -191,7 +232,7 @@ func (rdr *NatsER) processMessage(msg []byte) (err error) { } func (rdr *NatsER) createPoster() (err error) { - processedOpt := getProcessOptions(rdr.Config().Opts) + processedOpt := getProcessedOptions(rdr.Config().Opts) if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { return } @@ -202,37 +243,34 @@ func (rdr *NatsER) createPoster() (err error) { return } -func (rdr *NatsER) processOpts() (err error) { +func (rdr *NatsER) processOpts() error { if rdr.Config().Opts.NATSSubject != nil { rdr.subject = *rdr.Config().Opts.NATSSubject } - var queueID string + rdr.queueID = rdr.cgrCfg.GeneralCfg().NodeID if rdr.Config().Opts.NATSQueueID != nil { - queueID = *rdr.Config().Opts.NATSQueueID + rdr.queueID = *rdr.Config().Opts.NATSQueueID } - rdr.queueID = utils.FirstNonEmpty(queueID, rdr.cgrCfg.GeneralCfg().NodeID) - var consumerName string + rdr.consumerName = utils.CGRateSLwr if rdr.Config().Opts.NATSConsumerName != nil { - consumerName = *rdr.Config().Opts.NATSConsumerName + rdr.consumerName = *rdr.Config().Opts.NATSConsumerName + } + if rdr.Config().Opts.NATSStreamName != nil { + rdr.streamName = *rdr.Config().Opts.NATSStreamName } - rdr.consumerName = utils.FirstNonEmpty(consumerName, utils.CGRateSLwr) if rdr.Config().Opts.NATSJetStream != nil { rdr.jetStream = *rdr.Config().Opts.NATSJetStream } - if rdr.jetStream { - if rdr.Config().Opts.NATSJetStreamMaxWait != nil { - rdr.jsOpts = []nats.JSOpt{nats.MaxWait(*rdr.Config().Opts.NATSJetStreamMaxWait)} - } - } + var err error rdr.opts, err = GetNatsOpts(rdr.Config().Opts, rdr.cgrCfg.GeneralCfg().NodeID, rdr.cgrCfg.GeneralCfg().ConnectTimeout) - return + return err } func GetNatsOpts(opts *config.EventReaderOpts, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) { - nop = make([]nats.Option, 0, 7) - nop = append(nop, nats.Name(utils.CGRateSLwr+nodeID), + natsOpts := make([]nats.Option, 0, 7) + natsOpts = append(natsOpts, nats.Name(utils.CGRateSLwr+nodeID), nats.Timeout(connTimeout), nats.DrainTimeout(time.Second)) if opts.NATSJWTFile != nil { @@ -240,33 +278,33 @@ func GetNatsOpts(opts *config.EventReaderOpts, nodeID string, connTimeout time.D if opts.NATSSeedFile != nil { keys = append(keys, *opts.NATSSeedFile) } - nop = append(nop, nats.UserCredentials(*opts.NATSJWTFile, keys...)) + natsOpts = append(natsOpts, nats.UserCredentials(*opts.NATSJWTFile, keys...)) } if opts.NATSSeedFile != nil { opt, err := nats.NkeyOptionFromSeed(*opts.NATSSeedFile) if err != nil { return nil, err } - nop = append(nop, opt) + natsOpts = append(natsOpts, opt) } - if opts.NATSClientCertificate != nil { - if opts.NATSClientKey == nil { - err = fmt.Errorf("has certificate but no key") - return - } - nop = append(nop, nats.ClientCert(*opts.NATSClientCertificate, *opts.NATSClientKey)) - } else if opts.NATSClientKey != nil { - err = fmt.Errorf("has key but no certificate") - return + + switch { + case opts.NATSClientCertificate != nil && opts.NATSClientKey != nil: + natsOpts = append(natsOpts, nats.ClientCert(*opts.NATSClientCertificate, *opts.NATSClientKey)) + case opts.NATSClientCertificate != nil: + return nil, fmt.Errorf("has certificate but no key") + case opts.NATSClientKey != nil: + return nil, fmt.Errorf("has key but no certificate") } + if opts.NATSCertificateAuthority != nil { - nop = append(nop, + natsOpts = append(natsOpts, func(o *nats.Options) error { pool, err := x509.SystemCertPool() if err != nil { return err } - rootPEM, err := ioutil.ReadFile(*opts.NATSCertificateAuthority) + rootPEM, err := os.ReadFile(*opts.NATSCertificateAuthority) if err != nil || rootPEM == nil { return fmt.Errorf("nats: error loading or parsing rootCA file: %v", err) } @@ -283,5 +321,5 @@ func GetNatsOpts(opts *config.EventReaderOpts, nodeID string, connTimeout time.D return nil }) } - return + return natsOpts, nil } diff --git a/ers/nats_it_test.go b/ers/nats_it_test.go index 5a37fa05d..d0333e0f6 100644 --- a/ers/nats_it_test.go +++ b/ers/nats_it_test.go @@ -22,22 +22,109 @@ along with this program. If not, see package ers import ( + "encoding/json" "fmt" "os" - "os/exec" "path" "reflect" "runtime" "testing" "time" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" ) -func testCheckNatsData(t *testing.T, randomOriginID, expData string, ch chan *nats.Msg) { +func TestNatsERIT(t *testing.T) { + cfgPath := path.Join(*dataDir, "conf", "samples", "ers_nats") + cfg, err := config.NewCGRConfigFromPath(context.Background(), cfgPath) + if err != nil { + t.Fatal("could not init cfg", err.Error()) + } + + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + JetStream: true, + }) + if err != nil { + t.Fatal(err) + } + natsServer.Start() + defer natsServer.Shutdown() + + // Establish a connection to nats. + nc, err := nats.Connect(cfg.ERsCfg().Readers[1].SourcePath) + if err != nil { + t.Fatal(err) + } + defer nc.Close() + + // Initialize a stream manager and create a stream. + js, err := jetstream.New(nc) + if err != nil { + t.Fatal(err) + } + js.CreateStream(context.Background(), jetstream.StreamConfig{ + Name: "stream", + Subjects: []string{"cgrates_cdrs", "cgrates_cdrs_processed"}, + }) + + // Start the engine. + if _, err := engine.StopStartEngine(cfgPath, 100); err != nil { + t.Fatal(err) + } + defer engine.KillEngine(100) + + // Publish CDRs asynchronously to the nats subject. + cdr := make(map[string]any) + for i := 0; i < 10; i++ { + cdr[utils.AccountField] = 1001 + i + cdr[utils.Subject] = 1001 + i + cdr[utils.Destination] = 2001 + i + b, _ := json.Marshal(cdr) + js.PublishAsync("cgrates_cdrs", b) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatal("Did not resolve in time") + } + + // Define a consumer for the subject where all the processed cdrs were published. + var cons jetstream.Consumer + cons, err = js.CreateOrUpdateConsumer(context.Background(), "stream", jetstream.ConsumerConfig{ + FilterSubject: "cgrates_cdrs_processed", + Durable: "cgrates_processed", + AckPolicy: jetstream.AckAllPolicy, + }) + if err != nil { + t.Error(err) + } + + // Wait for the messages to be consumed and processed. + time.Sleep(100 * time.Millisecond) + + // Retrieve info about the consumer. + info, err := cons.Info(context.Background()) + if err != nil { + t.Error(err) + } + + if info.NumPending != 10 { + t.Errorf("expected %d pending messages, received %d", 10, info.NumPending) + } + + js.DeleteStream(context.Background(), "stream") + +} + +func testCheckNatsData(t *testing.T, randomOriginID, expData string, ch chan string) { select { case err := <-rdrErr: t.Fatal(err) @@ -58,8 +145,8 @@ func testCheckNatsData(t *testing.T, randomOriginID, expData string, ch chan *na } select { case msg := <-ch: - if expData != string(msg.Data) { - t.Errorf("Expected %q ,received %q", expData, string(msg.Data)) + if expData != msg { + t.Errorf("Expected %q ,received %q", expData, msg) } case <-time.After(10 * time.Second): t.Fatal("Timeout2") @@ -89,49 +176,44 @@ func testCheckNatsJetStream(t *testing.T, cfg *config.CGRConfig) { } defer nc.Drain() - js, err := nc.JetStream() + js, err := jetstream.New(nc) if err != nil { t.Fatal(err) } - for name := range js.StreamNames() { - if name == "test" { - if err = js.DeleteStream("test"); err != nil { - t.Fatal(err) - } - break - } - if name == "test2" { - if err = js.DeleteStream("test2"); err != nil { - t.Fatal(err) - } - break - } - } - if _, err = js.AddStream(&nats.StreamConfig{ + + _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{ Name: "test", Subjects: []string{utils.DefaultQueueID}, - }); err != nil { + }) + if err != nil { t.Fatal(err) } + defer js.DeleteStream(context.Background(), "test") - if err = js.PurgeStream("test"); err != nil { - t.Fatal(err) - } - - if _, err = js.AddStream(&nats.StreamConfig{ + _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{ Name: "test2", Subjects: []string{"processed_cdrs"}, - }); err != nil { + }) + if err != nil { + t.Fatal(err) + } + defer js.DeleteStream(context.Background(), "test2") + + ch := make(chan string, 3) + var cons jetstream.Consumer + cons, err = js.CreateOrUpdateConsumer(context.Background(), "test2", jetstream.ConsumerConfig{ + FilterSubject: "processed_cdrs", + Durable: "test4", + AckPolicy: jetstream.AckAllPolicy, + }) + if err != nil { + nc.Drain() t.Fatal(err) } - if err = js.PurgeStream("test2"); err != nil { - t.Fatal(err) - } - ch := make(chan *nats.Msg, 3) - _, err = js.QueueSubscribe("processed_cdrs", "test3", func(msg *nats.Msg) { - ch <- msg - }, nats.Durable("test4")) + _, err = cons.Consume(func(msg jetstream.Msg) { + ch <- string(msg.Data()) + }) if err != nil { t.Fatal(err) } @@ -143,7 +225,7 @@ func testCheckNatsJetStream(t *testing.T, cfg *config.CGRConfig) { for i := 0; i < 3; i++ { randomOriginID := utils.UUIDSha1Prefix() expData := fmt.Sprintf(`{"OriginID": "%s"}`, randomOriginID) - if _, err = js.Publish(utils.DefaultQueueID, []byte(expData)); err != nil { + if _, err = js.Publish(context.Background(), utils.DefaultQueueID, []byte(expData)); err != nil { t.Fatal(err) } @@ -174,8 +256,10 @@ func testCheckNatsNormal(t *testing.T, cfg *config.CGRConfig) { if err != nil { t.Fatal(err) } - ch := make(chan *nats.Msg, 3) - _, err = nc.ChanQueueSubscribe("processed_cdrs", "test3", ch) + ch := make(chan string, 3) + _, err = nc.QueueSubscribe("processed_cdrs", "test3", func(msg *nats.Msg) { + ch <- string(msg.Data) + }) if err != nil { t.Fatal(err) } @@ -200,16 +284,16 @@ func testCheckNatsNormal(t *testing.T, cfg *config.CGRConfig) { } func TestNatsERJetStream(t *testing.T) { - // start the nats-server - exec.Command("pkill", "nats-server") - - cmd := exec.Command("nats-server", "-js") - if err := cmd.Start(); err != nil { - t.Fatal(err) // most probably not installed + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + JetStream: true, + }) + if err != nil { + t.Fatal(err) } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() - // + natsServer.Start() + defer natsServer.Shutdown() cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ "ers": { // EventReaderService @@ -231,6 +315,7 @@ func TestNatsERJetStream(t *testing.T) { ], "opts": { "natsJetStream": true, + "natsStreamName": "test", "natsJetStreamProcessed": true, "natsSubjectProcessed": "processed_cdrs", } @@ -248,16 +333,15 @@ func TestNatsERJetStream(t *testing.T) { } func TestNatsER(t *testing.T) { - // start the nats-server - exec.Command("pkill", "nats-server") - - cmd := exec.Command("nats-server") - if err := cmd.Start(); err != nil { - t.Fatal(err) // most probably not installed + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + }) + if err != nil { + t.Fatal(err) } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() - // + natsServer.Start() + defer natsServer.Shutdown() cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ "ers": { // EventReaderService @@ -294,16 +378,18 @@ func TestNatsER(t *testing.T) { } func TestNatsERJetStreamUser(t *testing.T) { - // start the nats-server - exec.Command("pkill", "nats-server") - - cmd := exec.Command("nats-server", "-js", "--user", "user", "--pass", "password") - if err := cmd.Start(); err != nil { - t.Fatal(err) // most probably not installed + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + JetStream: true, + Username: "user", + Password: "password", + }) + if err != nil { + t.Fatal(err) } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() - // + natsServer.Start() + defer natsServer.Shutdown() cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ "ers": { // EventReaderService @@ -325,6 +411,7 @@ func TestNatsERJetStreamUser(t *testing.T) { ], "opts": { "natsJetStream": true, + "natsStreamName": "test", "natsJetStreamProcessed": true, "natsSubjectProcessed": "processed_cdrs", } @@ -342,16 +429,17 @@ func TestNatsERJetStreamUser(t *testing.T) { } func TestNatsERUser(t *testing.T) { - // start the nats-server - exec.Command("pkill", "nats-server") - - cmd := exec.Command("nats-server", "--user", "user", "--pass", "password") - if err := cmd.Start(); err != nil { - t.Fatal(err) // most probably not installed + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + Username: "user", + Password: "password", + }) + if err != nil { + t.Fatal(err) } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() - // + natsServer.Start() + defer natsServer.Shutdown() cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ "ers": { // EventReaderService @@ -388,16 +476,17 @@ func TestNatsERUser(t *testing.T) { } func TestNatsERJetStreamToken(t *testing.T) { - // start the nats-server - exec.Command("pkill", "nats-server") - - cmd := exec.Command("nats-server", "-js", "--auth", "token") - if err := cmd.Start(); err != nil { - t.Fatal(err) // most probably not installed + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + JetStream: true, + Authorization: "token", + }) + if err != nil { + t.Fatal(err) } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() - // + natsServer.Start() + defer natsServer.Shutdown() cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ "ers": { // EventReaderService @@ -419,6 +508,7 @@ func TestNatsERJetStreamToken(t *testing.T) { ], "opts": { "natsJetStream": true, + "natsStreamName": "test", "natsJetStreamProcessed": true, "natsSubjectProcessed": "processed_cdrs", } @@ -436,16 +526,17 @@ func TestNatsERJetStreamToken(t *testing.T) { } func TestNatsERToken(t *testing.T) { - // start the nats-server - exec.Command("pkill", "nats-server") - - cmd := exec.Command("nats-server", "--auth", "token") - if err := cmd.Start(); err != nil { - t.Fatal(err) // most probably not installed + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + JetStream: true, + Authorization: "token", + }) + if err != nil { + t.Fatal(err) } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() - // + natsServer.Start() + defer natsServer.Shutdown() cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ "ers": { // EventReaderService @@ -493,25 +584,21 @@ func TestNatsERNkey(t *testing.T) { if err := os.WriteFile(seedFilePath, []byte("SUAOUIE5CU47NCO22GHFEZXGCRCJDVTHDLMIP4L7UQNCR5SW4FZICI7O3Q"), 0664); err != nil { t.Fatal(err) } - natsCfgPath := path.Join(basePath, "nats.cfg") - if err := os.WriteFile(natsCfgPath, []byte(`authorization: { - users: [ - { nkey: UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY } - ] - } -`), 0664); err != nil { + + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + Nkeys: []*server.NkeyUser{ + { + Nkey: "UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY", + }, + }, + }) + if err != nil { t.Fatal(err) } - // start the nats-server - exec.Command("pkill", "nats-server") - - cmd := exec.Command("nats-server", "-c", natsCfgPath) - if err := cmd.Start(); err != nil { - t.Fatal(err) // most probably not installed - } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() - // + natsServer.Start() + defer natsServer.Shutdown() cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{ "ers": { // EventReaderService @@ -561,25 +648,22 @@ func TestNatsERJetStreamNKey(t *testing.T) { if err := os.WriteFile(seedFilePath, []byte("SUAOUIE5CU47NCO22GHFEZXGCRCJDVTHDLMIP4L7UQNCR5SW4FZICI7O3Q"), 0664); err != nil { t.Fatal(err) } - natsCfgPath := path.Join(basePath, "nats.cfg") - if err := os.WriteFile(natsCfgPath, []byte(`authorization: { -users: [ - { nkey: UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY } -] -} -`), 0664); err != nil { + + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + JetStream: true, + Nkeys: []*server.NkeyUser{ + { + Nkey: "UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY", + }, + }, + }) + if err != nil { t.Fatal(err) } - // start the nats-server - exec.Command("pkill", "nats-server") - - cmd := exec.Command("nats-server", "-c", natsCfgPath, "-js") - if err := cmd.Start(); err != nil { - t.Fatal(err) // most probably not installed - } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() - // + natsServer.Start() + defer natsServer.Shutdown() cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{ "ers": { // EventReaderService @@ -601,6 +685,7 @@ users: [ ], "opts": { "natsJetStream": true, + "natsStreamName": "test", "natsSeedFile": %q, "natsJetStreamProcessed": true, "natsSubjectProcessed": "processed_cdrs", @@ -657,16 +742,16 @@ resolver_preload: { `), 0664); err != nil { t.Fatal(err) } - // start the nats-server - exec.Command("pkill", "nats-server") - - cmd := exec.Command("nats-server", "-c", natsCfgPath) - if err := cmd.Start(); err != nil { - t.Fatal(err) // most probably not installed + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + ConfigFile: natsCfgPath, + }) + if err != nil { + t.Fatal(err) } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() - // + natsServer.Start() + defer natsServer.Shutdown() cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{ "ers": { // EventReaderService @@ -746,16 +831,18 @@ system_account:AAFIBB6C56ROU5XRVJLJYR3BTGGYK3HJGHEHQV7L7QZMTT3ZRBLHBS7F `), 0664); err != nil { t.Fatal(err) } - // start the nats-server - exec.Command("pkill", "nats-server") - - cmd := exec.Command("nats-server", "-c", natsCfgPath, "-js") - if err := cmd.Start(); err != nil { - t.Fatal(err) // most probably not installed + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + ConfigFile: natsCfgPath, + JetStream: true, + }) + if err != nil { + t.Fatal(err) } - time.Sleep(100 * time.Millisecond) - defer cmd.Process.Kill() - // + natsServer.Start() + defer natsServer.Shutdown() + cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{ "ers": { // EventReaderService "enabled": true, // starts the EventReader service: @@ -776,6 +863,7 @@ system_account:AAFIBB6C56ROU5XRVJLJYR3BTGGYK3HJGHEHQV7L7QZMTT3ZRBLHBS7F ], "opts": { "natsJetStream": true, + "natsStreamName": "test", "natsJWTFile": %q, "natsJetStreamProcessed": true, "natsSubjectProcessed": "processed_cdrs", diff --git a/ers/s3.go b/ers/s3.go index eee3e71f5..1387513e2 100644 --- a/ers/s3.go +++ b/ers/s3.go @@ -191,7 +191,7 @@ func (rdr *S3ER) readLoop() (err error) { } func (rdr *S3ER) createPoster() { - processedOpt := getProcessOptions(rdr.Config().Opts) + processedOpt := getProcessedOptions(rdr.Config().Opts) if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { return } diff --git a/ers/sql.go b/ers/sql.go index de535054c..5d8813474 100644 --- a/ers/sql.go +++ b/ers/sql.go @@ -297,7 +297,7 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string, opts *config.EventReader } // outURL - processedOpt := getProcessOptions(opts) + processedOpt := getProcessedOptions(opts) if processedOpt == nil { if len(outURL) == 0 { return diff --git a/ers/sqs.go b/ers/sqs.go index 42e37771c..d8ae6e736 100644 --- a/ers/sqs.go +++ b/ers/sqs.go @@ -220,7 +220,7 @@ func (rdr *SQSER) readLoop(scv sqsClient) (err error) { } func (rdr *SQSER) createPoster() { - processedOpt := getProcessOptions(rdr.Config().Opts) + processedOpt := getProcessedOptions(rdr.Config().Opts) if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { return } diff --git a/go.mod b/go.mod index cc5f97efa..9cc8803d6 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/cgrates/ugocodec v0.0.0-20201023092048-df93d0123f60 github.com/creack/pty v1.1.18 github.com/dgrijalva/jwt-go v3.2.0+incompatible + github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6 github.com/elastic/go-elasticsearch/v8 v8.8.0 github.com/ericlagergren/decimal v0.0.0-20211103172832-aca2edc11f73 github.com/fiorix/go-diameter/v4 v4.0.4 @@ -32,15 +33,15 @@ require ( github.com/go-sql-driver/mysql v1.6.0 github.com/mediocregopher/radix/v3 v3.8.0 github.com/miekg/dns v1.1.50 - github.com/nats-io/nats.go v1.16.0 + github.com/nats-io/nats.go v1.30.2 github.com/nyaruka/phonenumbers v1.1.0 github.com/peterh/liner v1.2.2 github.com/rabbitmq/amqp091-go v1.5.0 github.com/segmentio/kafka-go v0.4.32 go.mongodb.org/mongo-driver v1.11.0 - golang.org/x/crypto v0.7.0 - golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb - golang.org/x/net v0.8.0 + golang.org/x/crypto v0.13.0 + golang.org/x/exp v0.0.0-20230905200255-921286631fa9 + golang.org/x/net v0.15.0 golang.org/x/oauth2 v0.0.0-20220622183110-fd043fe589d2 google.golang.org/api v0.85.0 gorm.io/driver/mysql v1.3.4 @@ -80,11 +81,11 @@ require ( github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect - github.com/klauspost/compress v1.15.6 // indirect + github.com/klauspost/compress v1.17.0 // indirect github.com/mattn/go-runewidth v0.0.13 // indirect github.com/mschoch/smat v0.2.0 // indirect - github.com/nats-io/nats-server/v2 v2.2.6 // indirect - github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nats-server/v2 v2.10.1 + github.com/nats-io/nkeys v0.4.5 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.12.2 @@ -97,11 +98,11 @@ require ( github.com/xdg/stringprep v1.0.3 // indirect go.etcd.io/bbolt v1.3.6 // indirect go.opencensus.io v0.23.0 // indirect - golang.org/x/mod v0.8.0 // indirect - golang.org/x/sync v0.1.0 // indirect - golang.org/x/sys v0.6.0 // indirect - golang.org/x/text v0.8.0 // indirect - golang.org/x/tools v0.6.0 // indirect + golang.org/x/mod v0.12.0 // indirect + golang.org/x/sync v0.3.0 // indirect + golang.org/x/sys v0.12.0 // indirect + golang.org/x/text v0.13.0 // indirect + golang.org/x/tools v0.13.0 // indirect golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20220627200112-0a929928cb33 // indirect @@ -114,11 +115,12 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.2.2 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect - github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6 // indirect github.com/google/uuid v1.3.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/minio/highwayhash v1.0.2 // indirect github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect + github.com/nats-io/jwt/v2 v2.5.2 // indirect github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.35.0 // indirect @@ -127,4 +129,5 @@ require ( github.com/xdg-go/scram v1.1.1 // indirect github.com/xdg-go/stringprep v1.0.3 // indirect github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect + golang.org/x/time v0.3.0 // indirect ) diff --git a/go.sum b/go.sum index 0b8129476..e604dd536 100644 --- a/go.sum +++ b/go.sum @@ -185,14 +185,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6 h1:1+44gxLdKRnR/Bx/iAtr+XqNcE4e0oODa63+FABNANI= github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI= -github.com/elastic/elastic-transport-go/v8 v8.3.0 h1:DJGxovyQLXGr62e9nDMPSxRyWION0Bh6d9eCFBriiHo= -github.com/elastic/elastic-transport-go/v8 v8.3.0/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI= -github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWnBO0y+TZaA= -github.com/elastic/go-elasticsearch v0.0.0/go.mod h1:TkBSJBuTyFdBnrNqoPc54FN0vKf5c04IdM4zuStJ7xg= github.com/elastic/go-elasticsearch/v8 v8.8.0 h1:yNBPlXNo6wstMG7I3KiZPbLFgA82RMryYqkh1xBMV3A= github.com/elastic/go-elasticsearch/v8 v8.8.0/go.mod h1:NGmpvohKiRHXI0Sw4fuUGn6hYOmAXlyCphKpzVBiqDE= -github.com/elastic/go-elasticsearch/v8 v8.8.1 h1:/OiP5Yex40q5eWpzFVQIS8jRE7SaEZrFkG9JbE6TXtY= -github.com/elastic/go-elasticsearch/v8 v8.8.1/go.mod h1:GU1BJHO7WeamP7UhuElYwzzHtvf9SDmeVpSSy9+o6Qg= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -409,11 +403,10 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.15.6 h1:6D9PcO8QWu0JyaQ2zUMmu16T1T+zjjEpP91guRsvDfY= -github.com/klauspost/compress v1.15.6/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= +github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/kljensen/snowball v0.6.0/go.mod h1:27N7E8fVU5H68RlUmnWwZCfxgt4POBJfENGMvNRhldw= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -447,8 +440,8 @@ github.com/mediocregopher/radix/v3 v3.8.0 h1:HI8EgkaM7WzsrFpYAkOXIgUKbjNonb2Ne7K github.com/mediocregopher/radix/v3 v3.8.0/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= github.com/miekg/dns v1.1.50 h1:DQUfb9uc6smULcREF09Uc+/Gd46YWqJd5DbpPE9xkcA= github.com/miekg/dns v1.1.50/go.mod h1:e3IlAVfNqAllflbibAZEWOXOQ+Ynzk/dDozDxY7XnME= -github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= -github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -463,18 +456,14 @@ github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= -github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= -github.com/nats-io/jwt/v2 v2.0.2 h1:ejVCLO8gu6/4bOKIHQpmB5UhhUJfAQw55yvLWpfmKjI= -github.com/nats-io/jwt/v2 v2.0.2/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= -github.com/nats-io/nats-server/v2 v2.2.6 h1:FPK9wWx9pagxcw14s8W9rlfzfyHm61uNLnJyybZbn48= -github.com/nats-io/nats-server/v2 v2.2.6/go.mod h1:sEnFaxqe09cDmfMgACxZbziXnhQFhwk+aKkZjBBRYrI= -github.com/nats-io/nats.go v1.11.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= -github.com/nats-io/nats.go v1.16.0 h1:zvLE7fGBQYW6MWaFaRdsgm9qT39PJDQoju+DS8KsO1g= -github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= -github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= -github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= -github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU= +github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= +github.com/nats-io/nats-server/v2 v2.10.1 h1:MIJ614dhOIdo71iSzY8ln78miXwrYvlvXHUyS+XdKZQ= +github.com/nats-io/nats-server/v2 v2.10.1/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto= +github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY= +github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM= +github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk= +github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nyaruka/phonenumbers v1.1.0 h1:OvNAOAl4A9a2kNpzziITbUVH4bBBeKHkHl0llPmkxaA= @@ -643,16 +632,14 @@ golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200320181102-891825fb96df/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= -golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= -golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= +golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck= +golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -663,8 +650,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb h1:PaBZQdo+iSDyHT053FjUCgZQ/9uqVwPOcl7KSWhKn6w= -golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -690,8 +677,8 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= -golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= +golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -743,8 +730,8 @@ golang.org/x/net v0.0.0-20220412020605-290c469a71a5/go.mod h1:CfG3xpIq0wQ8r1q4Su golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220607020251-c690dde0001d/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220617184016-355a448f1bc9/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= -golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= +golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8= +golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -779,8 +766,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -860,8 +847,8 @@ golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -874,13 +861,13 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= -golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= -golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -939,8 +926,8 @@ golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= -golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ= +golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/utils/consts.go b/utils/consts.go index e9a4acdd7..2f4306eb0 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2524,6 +2524,7 @@ const ( NatsSubject = "natsSubject" NatsQueueID = "natsQueueID" NatsConsumerName = "natsConsumerName" + NatsStreamName = "natsStreamName" NatsJWTFile = "natsJWTFile" NatsSeedFile = "natsSeedFile" NatsClientCertificate = "natsClientCertificate"