From bac73aa2c63b5f9a789ce6d60476d8cd21f2371a Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Tue, 26 Sep 2023 16:18:47 -0400 Subject: [PATCH] Migrate to new jetstream API Updated ees/ers implementation to use the jetstream package which separates the jetstream context from Core NATS. Removed the jsOpts fields from the NatsEE struct. We are now using the jetStreamMaxWait option directly through a timeout context. Added streamName option for NATS reader since it is now required to be specified when creating a consumer (it is not inferred based on subject anymore). Updated nats ers integration test. Removed deprecated birpc unit test. Updated tests to also use the new jetstream package. Updated tests to start the nats-server using their official driver instead of using the std go exec package. time.Sleeps are now not required anymore to wait for the server. In test configurations for nats readers, made sure that natsStreamName option is populated. It is now required for consumers to know where to subscribe. Fixed potential panic that happened when jetstreamMaxWait option would not be set. --- config/config_defaults.go | 1 + config/erscfg.go | 11 + config/erscfg_test.go | 3 + config/libconfig_json.go | 1 + cores/server_it_test.go | 22 -- data/conf/cgrates/cgrates.json | 3 +- data/conf/samples/ers_nats/cgrates.json | 75 ++++ ees/nats.go | 18 +- ees/nats_it_test.go | 85 +++-- ees/nats_test.go | 43 --- ers/nats.go | 71 ++-- ers/nats_it_test.go | 471 ++++++++++-------------- go.mod | 11 +- go.sum | 38 +- utils/consts.go | 1 + 15 files changed, 416 insertions(+), 438 deletions(-) create mode 100644 data/conf/samples/ers_nats/cgrates.json diff --git a/config/config_defaults.go b/config/config_defaults.go index c79f183e7..ed347a6e2 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -447,6 +447,7 @@ const CGRATES_CFG_JSON = ` // nats // "natsJetStream": false, // controls if the nats reader uses the JetStream // "natsConsumerName": "cgrates", // in case of JetStream the name of the consumer + // "natsStreamName": "cdrs", // the name of the NATS JetStream stream from which the consumer will read messages "natsSubject": "cgrates_cdrs", // the subject from were the events are read // "natsQueueID": "", // the queue id the consumer listen to // "natsJWTFile": "", // the path to the JWT file( can be the chained file or the user file) diff --git a/config/erscfg.go b/config/erscfg.go index 381797cf4..1317f95f9 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -315,6 +315,7 @@ func (awsROpts *AWSROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err erro type NATSROpts struct { JetStream *bool ConsumerName *string + StreamName *string Subject *string QueueID *string JWTFile *string @@ -340,6 +341,9 @@ func (natsOpts *NATSROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err err if jsnCfg.NATSConsumerName != nil { natsOpts.ConsumerName = jsnCfg.NATSConsumerName } + if jsnCfg.NATSStreamName != nil { + natsOpts.StreamName = jsnCfg.NATSStreamName + } if jsnCfg.NATSSubject != nil { natsOpts.Subject = jsnCfg.NATSSubject } @@ -769,6 +773,10 @@ func (natOpts *NATSROpts) Clone() *NATSROpts { cln.ConsumerName = new(string) *cln.ConsumerName = *natOpts.ConsumerName } + if natOpts.StreamName != nil { + cln.StreamName = new(string) + *cln.StreamName = *natOpts.StreamName + } if natOpts.Subject != nil { cln.Subject = new(string) *cln.Subject = *natOpts.Subject @@ -1078,6 +1086,9 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string if natsOpts.ConsumerName != nil { opts[utils.NatsConsumerName] = *natsOpts.ConsumerName } + if natsOpts.StreamName != nil { + opts[utils.NatsStreamName] = *natsOpts.StreamName + } if natsOpts.Subject != nil { opts[utils.NatsSubject] = *natsOpts.Subject } diff --git a/config/erscfg_test.go b/config/erscfg_test.go index b19a606c7..79ccd74fc 100644 --- a/config/erscfg_test.go +++ b/config/erscfg_test.go @@ -1070,6 +1070,7 @@ func TestERSCfgAsMapInterfaceCase2(t *testing.T) { "s3BucketIDProcessed":"s3bucketid", "natsJetStream":true, "natsConsumerName": "NATConsumer", + "natsStreamName": "NATStream", "natsQueueID":"NATid", "natsJWTFile":"jwt", "natsCertificateAuthority":"auth", @@ -1204,6 +1205,7 @@ func TestERSCfgAsMapInterfaceCase2(t *testing.T) { utils.S3BucketIDProcessedCfg: "s3bucketid", utils.NatsJetStream: true, utils.NatsConsumerName: "NATConsumer", + utils.NatsStreamName: "NATStream", utils.NatsQueueID: "NATid", utils.NatsJWTFile: "jwt", utils.NatsSeedFile: "seed", @@ -1541,6 +1543,7 @@ func TestEventReaderCfgClone(t *testing.T) { NATSOpts: &NATSROpts{ JetStream: utils.BoolPointer(false), ConsumerName: utils.StringPointer("user"), + StreamName: utils.StringPointer("stream"), QueueID: utils.StringPointer("id"), JWTFile: utils.StringPointer("jwt"), SeedFile: utils.StringPointer("seed"), diff --git a/config/libconfig_json.go b/config/libconfig_json.go index ad06a5161..acebaa845 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -253,6 +253,7 @@ type EventReaderOptsJson struct { S3BucketIDProcessed *string `json:"s3BucketIDProcessed"` NATSJetStream *bool `json:"natsJetStream"` NATSConsumerName *string `json:"natsConsumerName"` + NATSStreamName *string `json:"natsStreamName"` NATSSubject *string `json:"natsSubject"` NATSQueueID *string `json:"natsQueueID"` NATSJWTFile *string `json:"natsJWTFile"` diff --git a/cores/server_it_test.go b/cores/server_it_test.go index 14ed45c2f..62bf61e14 100644 --- a/cores/server_it_test.go +++ b/cores/server_it_test.go @@ -63,7 +63,6 @@ var ( testServeHHTPFail, testServeHHTPFailEnableRpc, testServeBiJSON, - testServeBiJSONEmptyBiRPCServer, testServeBiJSONInvalidPort, testServeBiGoB, testServeBiGoBInvalidPort, @@ -334,27 +333,6 @@ func testServeBiJSON(t *testing.T) { runtime.Gosched() } -func testServeBiJSONEmptyBiRPCServer(t *testing.T) { - cfg := config.NewDefaultCGRConfig() - caps := engine.NewCaps(100, utils.MetaBusy) - server = NewServer(caps) - server.RpcRegister(new(mockRegister)) - - data := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) - dm := engine.NewDataManager(data, cfg.CacheCfg(), nil) - - ss := sessions.NewSessionS(cfg, dm, nil) - - expectedErr := "BiRPCServer should not be nil" - go func() { - if err := server.ServeBiRPC(":3430", "", ss.OnBiJSONConnect, ss.OnBiJSONDisconnect); err == nil || err.Error() != "BiRPCServer should not be nil" { - t.Errorf("Expected %+v, received %+v", expectedErr, err) - } - }() - - runtime.Gosched() -} - func testServeBiJSONInvalidPort(t *testing.T) { cfg := config.NewDefaultCGRConfig() caps := engine.NewCaps(100, utils.MetaBusy) diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index 9217eabb7..fd8073f43 100644 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -410,7 +410,8 @@ // // nats // // "natsJetStream": false, // controls if the nats reader uses the JetStream -// // "natsConsumerName": "cgrates", // in case of JetStream the name of the consumer +// // "natsConsumerName": "cgrates", // the name of the NATS JetStream stream from which the consumer will read messages +// // "natsStreamName": "cgrates", // in case of JetStream the name of the consumer // "natsSubject": "cgrates_cdrs", // the subject from were the events are read // // "natsQueueID": "", // the queue id the consumer listen to // // "natsJWTFile": "", // the path to the JWT file( can be the chained file or the user file) diff --git a/data/conf/samples/ers_nats/cgrates.json b/data/conf/samples/ers_nats/cgrates.json new file mode 100644 index 000000000..cdf44328a --- /dev/null +++ b/data/conf/samples/ers_nats/cgrates.json @@ -0,0 +1,75 @@ +{ + +"general": { + "log_level": 7 +}, + +"data_db": { + "db_type": "*internal" +}, + +"stor_db": { + "db_type": "*internal" +}, + +"ers": { + "enabled": true, + "sessions_conns":[], + "readers": [ + { + "id": "nats_reader1", + "type": "*nats_json_map", + "source_path": "nats://127.0.0.1:4222", + "processed_path": "nats://127.0.0.1:4222", + "opts": { + "natsJetStream": true, + "natsConsumerName": "cgrates", + "natsStreamName": "stream", + "natsSubject": "cgrates_cdrs", + "natsQueueID": "queue", + "natsJetStreamMaxWait": "5s", + + "natsJetStreamProcessed": true, + "natsSubjectProcessed": "cgrates_cdrs_processed", + "natsJetStreamMaxWaitProcessed": "5s" + }, + "flags": ["*dryrun"], + "fields":[ + {"tag": "cdr_template", "type": "*template", "value": "cdr_template"} + ] + }, + { + "id": "nats_reader2", + "type": "*nats_json_map", + "source_path": "nats://127.0.0.1:4222", + "processed_path": "nats://127.0.0.1:4222", + "opts": { + "natsJetStream": true, + "natsConsumerName": "cgrates", + "natsStreamName": "stream", + "natsSubject": "cgrates_cdrs", + "natsQueueID": "queue", + "natsJetStreamMaxWait": "5s", + + "natsJetStreamProcessed": true, + "natsSubjectProcessed": "cgrates_cdrs_processed", + "natsJetStreamMaxWaitProcessed": "5s" + }, + "flags": ["*dryrun"], + "fields":[ + {"tag": "cdr_template", "type": "*template", "value": "cdr_template"} + ] + } + ] +}, + + +"templates": { + "cdr_template": [ + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.Account", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.Subject", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.Destination", "mandatory": true}, + ] +} + +} \ No newline at end of file diff --git a/ees/nats.go b/ees/nats.go index 185997dae..84f741e38 100644 --- a/ees/nats.go +++ b/ees/nats.go @@ -26,9 +26,11 @@ import ( "sync" "time" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" ) // NewNatsEE creates a kafka poster @@ -48,10 +50,9 @@ type NatsEE struct { subject string // identifier of the CDR queue where we publish jetStream bool opts []nats.Option - jsOpts []nats.JSOpt poster *nats.Conn - posterJS nats.JetStreamContext + posterJS jetstream.JetStream cfg *config.EventExporterCfg dc *utils.SafeMapStorage @@ -78,9 +79,6 @@ func (pstr *NatsEE) parseOpts(opts *config.EventExporterOpts, nodeID string, con return err } - if pstr.jetStream && opts.NATS.JetStreamMaxWait != nil { - pstr.jsOpts = []nats.JSOpt{nats.MaxWait(*opts.NATS.JetStreamMaxWait)} - } return nil } @@ -99,7 +97,7 @@ func (pstr *NatsEE) Connect() error { return err } if pstr.jetStream { - pstr.posterJS, err = pstr.poster.JetStream(pstr.jsOpts...) + pstr.posterJS, err = jetstream.New(pstr.poster) } return err } @@ -116,7 +114,13 @@ func (pstr *NatsEE) ExportEvent(content any, _ string) error { var err error if pstr.jetStream { - _, err = pstr.posterJS.Publish(pstr.subject, content.([]byte)) + ctx := context.TODO() + if pstr.cfg.Opts.NATS.JetStreamMaxWait != nil { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, *pstr.cfg.Opts.NATS.JetStreamMaxWait) + defer cancel() + } + _, err = pstr.posterJS.Publish(ctx, pstr.subject, content.([]byte)) } else { err = pstr.poster.Publish(pstr.subject, content.([]byte)) } diff --git a/ees/nats_it_test.go b/ees/nats_it_test.go index cd2cf8e7e..092beb11b 100644 --- a/ees/nats_it_test.go +++ b/ees/nats_it_test.go @@ -23,26 +23,33 @@ package ees import ( "os" - "os/exec" "path" "testing" "time" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" ) func TestNatsEEJetStream(t *testing.T) { - testCreateDirectory(t) - var err error - cmd := exec.Command("nats-server", "-js") // Start the nats-server. - if err := cmd.Start(); err != nil { - t.Fatal(err) // Only if nats-server is not installed. + + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + JetStream: true, + }) + if err != nil { + t.Fatal(err) } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() + natsServer.Start() + defer natsServer.Shutdown() + + testCreateDirectory(t) cgrCfg, err := config.NewCGRConfigFromPath(path.Join(*dataDir, "conf", "samples", "ees")) if err != nil { t.Fatal(err) @@ -63,38 +70,40 @@ func TestNatsEEJetStream(t *testing.T) { t.Fatal(err) } - nc, err := nats.Connect("nats://localhost:4222", nop...) + nc, err := nats.Connect(nats.DefaultURL, nop...) if err != nil { t.Fatal(err) } - js, err := nc.JetStream() + defer nc.Drain() + + js, err := jetstream.New(nc) if err != nil { t.Fatal(err) } - for name := range js.StreamNames() { - if name == "test2" { - if err = js.DeleteStream("test2"); err != nil { - t.Fatal(err) - } - break - } - } - if _, err = js.AddStream(&nats.StreamConfig{ + _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{ Name: "test2", Subjects: []string{"processed_cdrs"}, - }); err != nil { + }) + if err != nil { + t.Fatal(err) + } + defer js.DeleteStream(context.Background(), "test2") + + ch := make(chan string, 3) + var cons jetstream.Consumer + cons, err = js.CreateOrUpdateConsumer(context.Background(), "test2", jetstream.ConsumerConfig{ + Durable: "test4", + FilterSubject: "processed_cdrs", + AckPolicy: jetstream.AckAllPolicy, + }) + if err != nil { t.Fatal(err) } - if err = js.PurgeStream("test2"); err != nil { - t.Fatal(err) - } - - ch := make(chan *nats.Msg, 3) - _, err = js.QueueSubscribe("processed_cdrs", "test3", func(msg *nats.Msg) { - ch <- msg - }, nats.Durable("test4")) + _, err = cons.Consume(func(msg jetstream.Msg) { + ch <- string(msg.Data()) + }) if err != nil { t.Fatal(err) } @@ -114,8 +123,8 @@ func TestNatsEEJetStream(t *testing.T) { // fmt.Println((<-ch).Data) select { case data := <-ch: - if expected != string(data.Data) { - t.Fatalf("Expected %v \n but received \n %v", expected, string(data.Data)) + if expected != data { + t.Fatalf("Expected %v \n but received \n %v", expected, data) } case <-time.After(50 * time.Millisecond): t.Fatal("Time limit exceeded") @@ -124,14 +133,16 @@ func TestNatsEEJetStream(t *testing.T) { func TestNatsEE(t *testing.T) { testCreateDirectory(t) - exec.Command("pkill", "nats-server") - cmd := exec.Command("nats-server") - if err := cmd.Start(); err != nil { + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + }) + if err != nil { t.Fatal(err) } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() + natsServer.Start() + defer natsServer.Shutdown() cgrCfg, err := config.NewCGRConfigFromPath(path.Join(*dataDir, "conf", "samples", "ees")) if err != nil { @@ -211,7 +222,7 @@ func TestGetNatsOptsSeedFile(t *testing.T) { //test error os.WriteFile("/tmp/nkey.txt", []byte(""), 0777) _, err = GetNatsOpts(opts, nodeID, connTimeout) - if err.Error() != "no nkey seed found" { - t.Errorf("Expected %v \n but received \n %v", err.Error(), "no nkey seed found") + if err == nil || err.Error() != "nkeys: no nkey seed found" { + t.Errorf("expected \"%s\" but received \"%s\"", "nkeys: no nkey seed found", err.Error()) } } diff --git a/ees/nats_test.go b/ees/nats_test.go index 8102bdeae..941bd5f68 100644 --- a/ees/nats_test.go +++ b/ees/nats_test.go @@ -19,13 +19,11 @@ along with this program. If not, see package ees import ( - "reflect" "testing" "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" - "github.com/nats-io/nats.go" ) func TestNewNatsEE(t *testing.T) { @@ -148,47 +146,6 @@ func TestParseOptJetStream(t *testing.T) { } } -func TestParseOptJetStreamMaxWait(t *testing.T) { - cfg := &config.EventExporterCfg{ - ID: "nats_exporter", - Type: "nats", - Attempts: 2, - ConcurrentRequests: 2, - Opts: &config.EventExporterOpts{ - AMQP: &config.AMQPOpts{}, - Els: &config.ElsOpts{}, - AWS: &config.AWSOpts{}, - NATS: &config.NATSOpts{}, - Kafka: &config.KafkaOpts{}, - RPC: &config.RPCOpts{}, - }, - } - opts := &config.EventExporterOpts{ - NATS: &config.NATSOpts{ - JetStream: utils.BoolPointer(true), - JetStreamMaxWait: utils.DurationPointer(2), - }} - nodeID := "node_id1" - connTimeout := 2 * time.Second - dc, err := newEEMetrics("Local") - if err != nil { - t.Error(err) - } - pstr, err := NewNatsEE(cfg, nodeID, connTimeout, dc) - if err != nil { - t.Error(err) - } - - err = pstr.parseOpts(opts, nodeID, connTimeout) - if err != nil { - t.Error(err) - } - exp := []nats.JSOpt{nats.MaxWait(2 * time.Nanosecond)} - if !reflect.DeepEqual(pstr.jsOpts, exp) { - t.Errorf("Expected %v \n but received \n %v", exp, pstr.jsOpts) - } -} - func TestParseOptSubject(t *testing.T) { cfg := &config.EventExporterCfg{ ID: "nats_exporter", diff --git a/ers/nats.go b/ers/nats.go index 475bd7245..04f86f4fd 100644 --- a/ers/nats.go +++ b/ers/nats.go @@ -26,12 +26,14 @@ import ( "os" "time" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/ees" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" ) // NewNatsER return a new amqp event reader @@ -79,8 +81,8 @@ type NatsER struct { queueID string jetStream bool consumerName string + streamName string opts []nats.Option - jsOpts []nats.JSOpt poster *ees.NatsEE } @@ -101,28 +103,28 @@ func (rdr *NatsER) Serve() error { } // Define the message handler. Its content will get executed for every received message. - msgHandler := func(msg *nats.Msg) { + handleMessage := func(msgData []byte) { // If the rdr.cap channel buffer is empty, block until a resource is available. Otherwise // allocate one resource and start processing the message. if rdr.Config().ConcurrentReqs != -1 { <-rdr.cap } - go func(msg *nats.Msg) { - handlerErr := rdr.processMessage(msg.Data) + go func() { + handlerErr := rdr.processMessage(msgData) if handlerErr != nil { utils.Logger.Warning( fmt.Sprintf("<%s> processing message %s error: %s", - utils.ERs, string(msg.Data), handlerErr.Error())) + utils.ERs, string(msgData), handlerErr.Error())) } // Export the received message if a poster has been defined. if rdr.poster != nil { - handlerErr = ees.ExportWithAttempts(rdr.poster, msg.Data, utils.EmptyString) + handlerErr = ees.ExportWithAttempts(rdr.poster, msgData, utils.EmptyString) if handlerErr != nil { utils.Logger.Warning( fmt.Sprintf("<%s> writing message %s error: %s", - utils.ERs, string(msg.Data), handlerErr.Error())) + utils.ERs, string(msgData), handlerErr.Error())) } } @@ -130,29 +132,50 @@ func (rdr *NatsER) Serve() error { if rdr.Config().ConcurrentReqs != -1 { rdr.cap <- struct{}{} } - }(msg) + + }() } // Subscribe to the appropriate NATS subject. if !rdr.jetStream { - _, err = nc.QueueSubscribe(rdr.subject, rdr.queueID, msgHandler) + _, err = nc.QueueSubscribe(rdr.subject, rdr.queueID, func(msg *nats.Msg) { + handleMessage(msg.Data) + }) if err != nil { nc.Drain() return err } } else { - var js nats.JetStreamContext - js, err = nc.JetStream(rdr.jsOpts...) + var js jetstream.JetStream + js, err = jetstream.New(nc) if err != nil { nc.Drain() return err } - _, err = js.QueueSubscribe(rdr.subject, rdr.queueID, msgHandler, - nats.Durable(rdr.consumerName)) + ctx := context.TODO() + if jsMaxWait := rdr.Config().Opts.NATSOpts.JetStreamMaxWait; jsMaxWait != nil { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, *jsMaxWait) + defer cancel() + } + + var cons jetstream.Consumer + cons, err = js.CreateOrUpdateConsumer(ctx, rdr.streamName, jetstream.ConsumerConfig{ + FilterSubject: rdr.subject, + Durable: rdr.consumerName, + AckPolicy: jetstream.AckAllPolicy, + }) if err != nil { nc.Drain() return err } + + _, err = cons.Consume(func(msg jetstream.Msg) { + handleMessage(msg.Data()) + }) + if err != nil { + return err + } } go func() { @@ -219,24 +242,24 @@ func (rdr *NatsER) processOpts() (err error) { if rdr.Config().Opts.NATSOpts.Subject != nil { rdr.subject = *rdr.Config().Opts.NATSOpts.Subject } - var queueID string + + rdr.queueID = rdr.cgrCfg.GeneralCfg().NodeID if rdr.Config().Opts.NATSOpts.QueueID != nil { - queueID = *rdr.Config().Opts.NATSOpts.QueueID + rdr.queueID = *rdr.Config().Opts.NATSOpts.QueueID } - rdr.queueID = utils.FirstNonEmpty(queueID, rdr.cgrCfg.GeneralCfg().NodeID) - var consumerName string + + rdr.consumerName = utils.CGRateSLwr if rdr.Config().Opts.NATSOpts.ConsumerName != nil { - consumerName = *rdr.Config().Opts.NATSOpts.ConsumerName + rdr.consumerName = *rdr.Config().Opts.NATSOpts.ConsumerName } - rdr.consumerName = utils.FirstNonEmpty(consumerName, utils.CGRateSLwr) + + if rdr.Config().Opts.NATSOpts.StreamName != nil { + rdr.streamName = *rdr.Config().Opts.NATSOpts.StreamName + } + if rdr.Config().Opts.NATSOpts.JetStream != nil { rdr.jetStream = *rdr.Config().Opts.NATSOpts.JetStream } - if rdr.jetStream { - if rdr.Config().Opts.NATSOpts.JetStreamMaxWait != nil { - rdr.jsOpts = []nats.JSOpt{nats.MaxWait(*rdr.Config().Opts.NATSOpts.JetStreamMaxWait)} - } - } rdr.opts, err = GetNatsOpts(rdr.Config().Opts.NATSOpts, rdr.cgrCfg.GeneralCfg().NodeID, rdr.cgrCfg.GeneralCfg().ConnectTimeout) diff --git a/ers/nats_it_test.go b/ers/nats_it_test.go index 9bb6fc124..f1f7c889d 100644 --- a/ers/nats_it_test.go +++ b/ers/nats_it_test.go @@ -25,174 +25,68 @@ import ( "encoding/json" "fmt" "os" - "os/exec" "path" "reflect" "runtime" "testing" "time" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" ) func TestERsNATSIT(t *testing.T) { - t.Skip() - cfgContent := `{ + cfgPath := path.Join(*dataDir, "conf", "samples", "ers_nats") + cfg, err := config.NewCGRConfigFromPath(cfgPath) + if err != nil { + t.Fatal("could not init cfg", err.Error()) + } -"general": { - "log_level": 7 -}, - -"data_db": { - "db_type": "*internal" -}, - -"stor_db": { - "db_type": "*internal" -}, - -"ers": { - "enabled": true, - "sessions_conns":[], - "readers": [ - { - "id": "nats_consumer1", - "type": "*nats_json_map", - "source_path": "nats://127.0.0.1:4222", - "processed_path": "nats://127.0.0.1:4222", - "opts": { - "natsJetStream": true, - "natsConsumerName": "cgrates_consumer", - "natsSubject": "cgrates_cdrs", - "natsQueueID": "queue", - "natsJetStreamMaxWait": "5s", - - "natsJetStreamProcessed": true, - "natsSubjectProcessed": "cgrates_cdrs_processed", - "natsJetStreamMaxWaitProcessed": "5s" - }, - "flags": ["*dryrun"], - "fields":[ - {"tag": "cdr_template", "type": "*template", "value": "cdr_template"} - ] - }, - { - "id": "nats_consumer2", - "type": "*nats_json_map", - "source_path": "nats://127.0.0.1:4222", - "processed_path": "nats://127.0.0.1:4222", - "opts": { - "natsJetStream": true, - "natsConsumerName": "cgrates_consumer", - "natsSubject": "cgrates_cdrs", - "natsQueueID": "queue", - "natsJetStreamMaxWait": "5s", - - "natsJetStreamProcessed": true, - "natsSubjectProcessed": "cgrates_cdrs_processed", - "natsJetStreamMaxWaitProcessed": "5s" - }, - "flags": ["*dryrun"], - "fields":[ - {"tag": "cdr_template", "type": "*template", "value": "cdr_template"} - ] - }, - { - "id": "nats_consumer3", - "type": "*nats_json_map", - "source_path": "nats://127.0.0.1:4222", - "processed_path": "nats://127.0.0.1:4222", - "opts": { - "natsJetStream": true, - "natsConsumerName": "cgrates_consumer", - "natsSubject": "cgrates_cdrs", - "natsQueueID": "queue", - "natsJetStreamMaxWait": "5s", - - "natsJetStreamProcessed": true, - "natsSubjectProcessed": "cgrates_cdrs_processed", - "natsJetStreamMaxWaitProcessed": "5s" - }, - "flags": ["*dryrun"], - "fields":[ - {"tag": "cdr_template", "type": "*template", "value": "cdr_template"} - ] - }, - { - "id": "nats_consumer4", - "type": "*nats_json_map", - "source_path": "nats://127.0.0.1:4222", - "processed_path": "", - "opts": { - "natsJetStream": true, - "natsConsumerName": "cgrates_consumer4", - "natsSubject": "cgrates_cdrs_processed", - "natsQueueID": "", - "natsJetStreamMaxWait": "5s", - }, - "flags": ["*dryrun"], - "fields":[ - {"tag": "cdr_template", "type": "*template", "value": "cdr_template"} - ] - } - ] -}, - - -"templates": { - "cdr_template": [ - // {"tag": "Source", "path": "*cgreq.Source", "type": "*constant", "value": "ers_template_combined", "mandatory": true}, - // {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.2", "mandatory": true}, - // {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.3", "mandatory": true}, - // {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.4", "mandatory": true}, - // {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.6", "mandatory": true}, - // {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.7", "mandatory": true}, - {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.Account", "mandatory": true}, - // {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.9", "mandatory": true}, - {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.Destination", "mandatory": true}, - // {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.11", "mandatory": true}, - // {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.12", "mandatory": true}, - // {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.13", "mandatory": true} - ] -} - -}` - cfg, cfgPath, clean, err := initTestCfg(cfgContent) + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + JetStream: true, + }) if err != nil { t.Fatal(err) } - defer clean() + natsServer.Start() + defer natsServer.Shutdown() + // Establish a connection to nats. nc, err := nats.Connect(cfg.ERsCfg().Readers[1].SourcePath) if err != nil { t.Fatal(err) } defer nc.Close() - js, err := nc.JetStream(nats.PublishAsyncMaxPending(256)) + // Initialize a stream manager and create a stream. + js, err := jetstream.New(nc) if err != nil { t.Fatal(err) } - - js.AddStream(&nats.StreamConfig{ - Name: "CDRs", + js.CreateStream(context.Background(), jetstream.StreamConfig{ + Name: "stream", Subjects: []string{"cgrates_cdrs", "cgrates_cdrs_processed"}, }) - time.Sleep(2 * time.Second) - + // Start the engine. if _, err := engine.StopStartEngine(cfgPath, 100); err != nil { t.Fatal(err) } + defer engine.KillEngine(100) + // Publish CDRs asynchronously to the nats subject. + cdr := make(map[string]any) for i := 0; i < 10; i++ { - cdr := map[string]any{ - "Account": 1000 + i, - "Destination": 2000 + i, - } + cdr[utils.AccountField] = 1001 + i + cdr[utils.Subject] = 1001 + i + cdr[utils.Destination] = 2001 + i b, _ := json.Marshal(cdr) js.PublishAsync("cgrates_cdrs", b) } @@ -202,15 +96,35 @@ func TestERsNATSIT(t *testing.T) { t.Fatal("Did not resolve in time") } - // Add verification - - err = engine.KillEngine(100) + // Define a consumer for the subject where all the processed cdrs were published. + var cons jetstream.Consumer + cons, err = js.CreateOrUpdateConsumer(context.Background(), "stream", jetstream.ConsumerConfig{ + FilterSubject: "cgrates_cdrs_processed", + Durable: "cgrates_processed", + AckPolicy: jetstream.AckAllPolicy, + }) if err != nil { t.Error(err) } + + // Wait for the messages to be consumed and processed. + time.Sleep(100 * time.Millisecond) + + // Retrieve info about the consumer. + info, err := cons.Info(context.Background()) + if err != nil { + t.Error(err) + } + + if info.NumPending != 10 { + t.Errorf("expected %d pending messages, received %d", 10, info.NumPending) + } + + js.DeleteStream(context.Background(), "stream") + } -func testCheckNatsData(t *testing.T, randomCGRID, expData string, ch chan *nats.Msg) { +func testCheckNatsData(t *testing.T, randomCGRID, expData string, ch chan string) { select { case err := <-rdrErr: t.Fatal(err) @@ -232,8 +146,8 @@ func testCheckNatsData(t *testing.T, randomCGRID, expData string, ch chan *nats. } select { case msg := <-ch: - if expData != string(msg.Data) { - t.Errorf("Expected %q ,received %q", expData, string(msg.Data)) + if expData != msg { + t.Errorf("Expected %q ,received %q", expData, msg) } case <-time.After(10 * time.Second): t.Fatal("Timeout2") @@ -263,49 +177,44 @@ func testCheckNatsJetStream(t *testing.T, cfg *config.CGRConfig) { } defer nc.Drain() - js, err := nc.JetStream() + js, err := jetstream.New(nc) if err != nil { t.Fatal(err) } - for name := range js.StreamNames() { - if name == "test" { - if err = js.DeleteStream("test"); err != nil { - t.Fatal(err) - } - break - } - if name == "test2" { - if err = js.DeleteStream("test2"); err != nil { - t.Fatal(err) - } - break - } - } - if _, err = js.AddStream(&nats.StreamConfig{ + + _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{ Name: "test", Subjects: []string{utils.DefaultQueueID}, - }); err != nil { + }) + if err != nil { t.Fatal(err) } + defer js.DeleteStream(context.Background(), "test") - if err = js.PurgeStream("test"); err != nil { - t.Fatal(err) - } - - if _, err = js.AddStream(&nats.StreamConfig{ + _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{ Name: "test2", Subjects: []string{"processed_cdrs"}, - }); err != nil { + }) + if err != nil { + t.Fatal(err) + } + defer js.DeleteStream(context.Background(), "test2") + + ch := make(chan string, 3) + var cons jetstream.Consumer + cons, err = js.CreateOrUpdateConsumer(context.Background(), "test2", jetstream.ConsumerConfig{ + FilterSubject: "processed_cdrs", + Durable: "test4", + AckPolicy: jetstream.AckAllPolicy, + }) + if err != nil { + nc.Drain() t.Fatal(err) } - if err = js.PurgeStream("test2"); err != nil { - t.Fatal(err) - } - ch := make(chan *nats.Msg, 3) - _, err = js.QueueSubscribe("processed_cdrs", "test3", func(msg *nats.Msg) { - ch <- msg - }, nats.Durable("test4")) + _, err = cons.Consume(func(msg jetstream.Msg) { + ch <- string(msg.Data()) + }) if err != nil { t.Fatal(err) } @@ -317,7 +226,7 @@ func testCheckNatsJetStream(t *testing.T, cfg *config.CGRConfig) { for i := 0; i < 3; i++ { randomCGRID := utils.UUIDSha1Prefix() expData := fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID) - if _, err = js.Publish(utils.DefaultQueueID, []byte(expData)); err != nil { + if _, err = js.Publish(context.Background(), utils.DefaultQueueID, []byte(expData)); err != nil { t.Fatal(err) } @@ -348,8 +257,10 @@ func testCheckNatsNormal(t *testing.T, cfg *config.CGRConfig) { if err != nil { t.Fatal(err) } - ch := make(chan *nats.Msg, 3) - _, err = nc.ChanQueueSubscribe("processed_cdrs", "test3", ch) + ch := make(chan string, 3) + _, err = nc.QueueSubscribe("processed_cdrs", "test3", func(msg *nats.Msg) { + ch <- string(msg.Data) + }) if err != nil { t.Fatal(err) } @@ -374,16 +285,16 @@ func testCheckNatsNormal(t *testing.T, cfg *config.CGRConfig) { } func TestNatsERJetStream(t *testing.T) { - // start the nats-server - exec.Command("pkill", "nats-server") - - cmd := exec.Command("nats-server", "-js") - if err := cmd.Start(); err != nil { - t.Fatal(err) // most probably not installed + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + JetStream: true, + }) + if err != nil { + t.Fatal(err) } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() - // + natsServer.Start() + defer natsServer.Shutdown() cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ "ers": { // EventReaderService @@ -405,6 +316,7 @@ func TestNatsERJetStream(t *testing.T) { ], "opts": { "natsJetStream": true, + "natsStreamName": "test", "natsJetStreamProcessed": true, "natsSubjectProcessed": "processed_cdrs", } @@ -422,16 +334,15 @@ func TestNatsERJetStream(t *testing.T) { } func TestNatsER(t *testing.T) { - // start the nats-server - exec.Command("pkill", "nats-server") - - cmd := exec.Command("nats-server") - if err := cmd.Start(); err != nil { - t.Fatal(err) // most probably not installed + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + }) + if err != nil { + t.Fatal(err) } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() - // + natsServer.Start() + defer natsServer.Shutdown() cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ "ers": { // EventReaderService @@ -468,16 +379,18 @@ func TestNatsER(t *testing.T) { } func TestNatsERJetStreamUser(t *testing.T) { - // start the nats-server - exec.Command("pkill", "nats-server") - - cmd := exec.Command("nats-server", "-js", "--user", "user", "--pass", "password") - if err := cmd.Start(); err != nil { - t.Fatal(err) // most probably not installed + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + JetStream: true, + Username: "user", + Password: "password", + }) + if err != nil { + t.Fatal(err) } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() - // + natsServer.Start() + defer natsServer.Shutdown() cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ "ers": { // EventReaderService @@ -499,6 +412,7 @@ func TestNatsERJetStreamUser(t *testing.T) { ], "opts": { "natsJetStream": true, + "natsStreamName": "test", "natsJetStreamProcessed": true, "natsSubjectProcessed": "processed_cdrs", } @@ -516,16 +430,17 @@ func TestNatsERJetStreamUser(t *testing.T) { } func TestNatsERUser(t *testing.T) { - // start the nats-server - exec.Command("pkill", "nats-server") - - cmd := exec.Command("nats-server", "--user", "user", "--pass", "password") - if err := cmd.Start(); err != nil { - t.Fatal(err) // most probably not installed + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + Username: "user", + Password: "password", + }) + if err != nil { + t.Fatal(err) } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() - // + natsServer.Start() + defer natsServer.Shutdown() cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ "ers": { // EventReaderService @@ -562,16 +477,17 @@ func TestNatsERUser(t *testing.T) { } func TestNatsERJetStreamToken(t *testing.T) { - // start the nats-server - exec.Command("pkill", "nats-server") - - cmd := exec.Command("nats-server", "-js", "--auth", "token") - if err := cmd.Start(); err != nil { - t.Fatal(err) // most probably not installed + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + JetStream: true, + Authorization: "token", + }) + if err != nil { + t.Fatal(err) } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() - // + natsServer.Start() + defer natsServer.Shutdown() cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ "ers": { // EventReaderService @@ -593,6 +509,7 @@ func TestNatsERJetStreamToken(t *testing.T) { ], "opts": { "natsJetStream": true, + "natsStreamName": "test", "natsJetStreamProcessed": true, "natsSubjectProcessed": "processed_cdrs", } @@ -610,16 +527,17 @@ func TestNatsERJetStreamToken(t *testing.T) { } func TestNatsERToken(t *testing.T) { - // start the nats-server - exec.Command("pkill", "nats-server") - - cmd := exec.Command("nats-server", "--auth", "token") - if err := cmd.Start(); err != nil { - t.Fatal(err) // most probably not installed + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + JetStream: true, + Authorization: "token", + }) + if err != nil { + t.Fatal(err) } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() - // + natsServer.Start() + defer natsServer.Shutdown() cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ "ers": { // EventReaderService @@ -667,25 +585,21 @@ func TestNatsERNkey(t *testing.T) { if err := os.WriteFile(seedFilePath, []byte("SUAOUIE5CU47NCO22GHFEZXGCRCJDVTHDLMIP4L7UQNCR5SW4FZICI7O3Q"), 0664); err != nil { t.Fatal(err) } - natsCfgPath := path.Join(basePath, "nats.cfg") - if err := os.WriteFile(natsCfgPath, []byte(`authorization: { - users: [ - { nkey: UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY } - ] - } -`), 0664); err != nil { + + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + Nkeys: []*server.NkeyUser{ + { + Nkey: "UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY", + }, + }, + }) + if err != nil { t.Fatal(err) } - // start the nats-server - exec.Command("pkill", "nats-server") - - cmd := exec.Command("nats-server", "-c", natsCfgPath) - if err := cmd.Start(); err != nil { - t.Fatal(err) // most probably not installed - } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() - // + natsServer.Start() + defer natsServer.Shutdown() cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{ "ers": { // EventReaderService @@ -735,25 +649,22 @@ func TestNatsERJetStreamNKey(t *testing.T) { if err := os.WriteFile(seedFilePath, []byte("SUAOUIE5CU47NCO22GHFEZXGCRCJDVTHDLMIP4L7UQNCR5SW4FZICI7O3Q"), 0664); err != nil { t.Fatal(err) } - natsCfgPath := path.Join(basePath, "nats.cfg") - if err := os.WriteFile(natsCfgPath, []byte(`authorization: { -users: [ - { nkey: UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY } -] -} -`), 0664); err != nil { + + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + JetStream: true, + Nkeys: []*server.NkeyUser{ + { + Nkey: "UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY", + }, + }, + }) + if err != nil { t.Fatal(err) } - // start the nats-server - exec.Command("pkill", "nats-server") - - cmd := exec.Command("nats-server", "-c", natsCfgPath, "-js") - if err := cmd.Start(); err != nil { - t.Fatal(err) // most probably not installed - } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() - // + natsServer.Start() + defer natsServer.Shutdown() cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{ "ers": { // EventReaderService @@ -775,6 +686,7 @@ users: [ ], "opts": { "natsJetStream": true, + "natsStreamName": "test", "natsSeedFile": %q, "natsJetStreamProcessed": true, "natsSubjectProcessed": "processed_cdrs", @@ -831,16 +743,16 @@ resolver_preload: { `), 0664); err != nil { t.Fatal(err) } - // start the nats-server - exec.Command("pkill", "nats-server") - - cmd := exec.Command("nats-server", "-c", natsCfgPath) - if err := cmd.Start(); err != nil { - t.Fatal(err) // most probably not installed + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + ConfigFile: natsCfgPath, + }) + if err != nil { + t.Fatal(err) } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() - // + natsServer.Start() + defer natsServer.Shutdown() cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{ "ers": { // EventReaderService @@ -920,16 +832,18 @@ system_account:AAFIBB6C56ROU5XRVJLJYR3BTGGYK3HJGHEHQV7L7QZMTT3ZRBLHBS7F `), 0664); err != nil { t.Fatal(err) } - // start the nats-server - exec.Command("pkill", "nats-server") - - cmd := exec.Command("nats-server", "-c", natsCfgPath, "-js") - if err := cmd.Start(); err != nil { - t.Fatal(err) // most probably not installed + natsServer, err := server.NewServer(&server.Options{ + Host: "127.0.0.1", + Port: 4222, + ConfigFile: natsCfgPath, + JetStream: true, + }) + if err != nil { + t.Fatal(err) } - time.Sleep(100 * time.Millisecond) - defer cmd.Process.Kill() - // + natsServer.Start() + defer natsServer.Shutdown() + cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{ "ers": { // EventReaderService "enabled": true, // starts the EventReader service: @@ -950,6 +864,7 @@ system_account:AAFIBB6C56ROU5XRVJLJYR3BTGGYK3HJGHEHQV7L7QZMTT3ZRBLHBS7F ], "opts": { "natsJetStream": true, + "natsStreamName": "test", "natsJWTFile": %q, "natsJetStreamProcessed": true, "natsSubjectProcessed": "processed_cdrs", diff --git a/go.mod b/go.mod index 81d7a2643..904a330e0 100644 --- a/go.mod +++ b/go.mod @@ -39,13 +39,14 @@ require ( github.com/mediocregopher/radix/v3 v3.8.1 github.com/miekg/dns v1.1.54 github.com/mitchellh/mapstructure v1.4.0 + github.com/nats-io/nats-server/v2 v2.10.1 github.com/nats-io/nats.go v1.30.0 github.com/nyaruka/phonenumbers v1.0.75 github.com/peterh/liner v1.2.1 github.com/rabbitmq/amqp091-go v1.5.0 github.com/segmentio/kafka-go v0.4.8 go.mongodb.org/mongo-driver v1.11.0 - golang.org/x/crypto v0.6.0 + golang.org/x/crypto v0.13.0 golang.org/x/net v0.10.0 golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5 google.golang.org/api v0.36.0 @@ -93,9 +94,10 @@ require ( github.com/kr/pretty v0.2.1 // indirect github.com/lib/pq v1.8.0 // indirect github.com/mattn/go-runewidth v0.0.10 // indirect + github.com/minio/highwayhash v1.0.2 // indirect github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect github.com/mschoch/smat v0.2.0 // indirect - github.com/nats-io/nats-server/v2 v2.2.6 // indirect + github.com/nats-io/jwt/v2 v2.5.2 // indirect github.com/nats-io/nkeys v0.4.5 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/philhofer/fwd v1.1.1 // indirect @@ -115,8 +117,9 @@ require ( go.opencensus.io v0.22.5 // indirect golang.org/x/mod v0.10.0 // indirect golang.org/x/sync v0.2.0 // indirect - golang.org/x/sys v0.8.0 // indirect - golang.org/x/text v0.9.0 // indirect + golang.org/x/sys v0.12.0 // indirect + golang.org/x/text v0.13.0 // indirect + golang.org/x/time v0.3.0 // indirect golang.org/x/tools v0.9.3 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/appengine v1.6.7 // indirect diff --git a/go.sum b/go.sum index c9e5ffa54..15c78e061 100644 --- a/go.sum +++ b/go.sum @@ -317,7 +317,6 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= -github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= @@ -355,8 +354,8 @@ github.com/mediocregopher/radix/v3 v3.8.1 h1:rOkHflVuulFKlwsLY01/M2cM2tWCjDoETcM github.com/mediocregopher/radix/v3 v3.8.1/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= github.com/miekg/dns v1.1.54 h1:5jon9mWcb0sFJGpnI99tOMhCPyJ+RPVz5b63MQG0VWI= github.com/miekg/dns v1.1.54/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY= -github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= -github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.4.0 h1:7ks8ZkOP5/ujthUsT07rNv+nkLXCQWKNHuwzOAesEks= @@ -366,17 +365,12 @@ github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJ github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae/go.mod h1:qAyveg+e4CE+eKJXWVjKXM4ck2QobLqTDytGJbLLhJg= github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= -github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= -github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= -github.com/nats-io/jwt/v2 v2.0.2 h1:ejVCLO8gu6/4bOKIHQpmB5UhhUJfAQw55yvLWpfmKjI= -github.com/nats-io/jwt/v2 v2.0.2/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= -github.com/nats-io/nats-server/v2 v2.2.6 h1:FPK9wWx9pagxcw14s8W9rlfzfyHm61uNLnJyybZbn48= -github.com/nats-io/nats-server/v2 v2.2.6/go.mod h1:sEnFaxqe09cDmfMgACxZbziXnhQFhwk+aKkZjBBRYrI= -github.com/nats-io/nats.go v1.11.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU= +github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= +github.com/nats-io/nats-server/v2 v2.10.1 h1:MIJ614dhOIdo71iSzY8ln78miXwrYvlvXHUyS+XdKZQ= +github.com/nats-io/nats-server/v2 v2.10.1/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto= github.com/nats-io/nats.go v1.30.0 h1:bj/rVsRCrFXxmm9mJiDhb74UKl2HhKpDwKRBtvCjZjc= github.com/nats-io/nats.go v1.30.0/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM= -github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= -github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk= github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -515,10 +509,9 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200320181102-891825fb96df/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc= -golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck= +golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -529,6 +522,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -588,7 +583,6 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= @@ -659,8 +653,8 @@ golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -670,13 +664,13 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= -golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/utils/consts.go b/utils/consts.go index 9682c8e40..d6c675958 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2670,6 +2670,7 @@ const ( NatsSubject = "natsSubject" NatsQueueID = "natsQueueID" NatsConsumerName = "natsConsumerName" + NatsStreamName = "natsStreamName" NatsJWTFile = "natsJWTFile" NatsSeedFile = "natsSeedFile" NatsClientCertificate = "natsClientCertificate"