From 619a1efa505c26ce75dc85ce04230ee1995b6aab Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Fri, 15 Mar 2024 13:49:41 -0400 Subject: [PATCH] Revise ers integration tests They also do not depend on nats server dependency anymore. --- ees/nats_it_test.go | 35 +- ers/nats_it_test.go | 1191 +++++++++++++------------------------------ go.mod | 4 - go.sum | 9 - 4 files changed, 372 insertions(+), 867 deletions(-) diff --git a/ees/nats_it_test.go b/ees/nats_it_test.go index 092beb11b..bd5344f8d 100644 --- a/ees/nats_it_test.go +++ b/ees/nats_it_test.go @@ -23,6 +23,7 @@ package ees import ( "os" + "os/exec" "path" "testing" "time" @@ -31,23 +32,18 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" - "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" ) func TestNatsEEJetStream(t *testing.T) { - - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - JetStream: true, - }) - if err != nil { - t.Fatal(err) + exec.Command("pkill", "nats-server") + cmd := exec.Command("nats-server", "-js") + if err := cmd.Start(); err != nil { + t.Fatal(err) // most probably not installed } - natsServer.Start() - defer natsServer.Shutdown() + time.Sleep(50 * time.Millisecond) + defer cmd.Process.Kill() testCreateDirectory(t) cgrCfg, err := config.NewCGRConfigFromPath(path.Join(*dataDir, "conf", "samples", "ees")) @@ -132,18 +128,15 @@ func TestNatsEEJetStream(t *testing.T) { } func TestNatsEE(t *testing.T) { - testCreateDirectory(t) - - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - }) - if err != nil { - t.Fatal(err) + exec.Command("pkill", "nats-server") + cmd := exec.Command("nats-server") + if err := cmd.Start(); err != nil { + t.Fatal(err) // most probably not installed } - natsServer.Start() - defer natsServer.Shutdown() + time.Sleep(50 * time.Millisecond) + defer cmd.Process.Kill() + testCreateDirectory(t) cgrCfg, err := config.NewCGRConfigFromPath(path.Join(*dataDir, "conf", "samples", "ees")) if err != nil { t.Fatal(err) diff --git a/ers/nats_it_test.go b/ers/nats_it_test.go index d87cc88a6..e0a887977 100644 --- a/ers/nats_it_test.go +++ b/ers/nats_it_test.go @@ -25,9 +25,9 @@ import ( "encoding/json" "fmt" "os" + "os/exec" "path" - "reflect" - "runtime" + "path/filepath" "testing" "time" @@ -35,28 +35,32 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" - "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" ) -func TestERsNATSIT(t *testing.T) { +func TestNatsConcurrentReaders(t *testing.T) { + switch *dbType { + case utils.MetaInternal: + case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("unsupported dbtype value") + } + cfgPath := path.Join(*dataDir, "conf", "samples", "ers_nats") cfg, err := config.NewCGRConfigFromPath(cfgPath) if err != nil { t.Fatal("could not init cfg", err.Error()) } - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - JetStream: true, - }) - if err != nil { - t.Fatal(err) + exec.Command("pkill", "nats-server") + cmd := exec.Command("nats-server", "-js") + if err := cmd.Start(); err != nil { + t.Fatal(err) // most probably not installed } - natsServer.Start() - defer natsServer.Shutdown() + time.Sleep(50 * time.Millisecond) + defer cmd.Process.Kill() // Establish a connection to nats. nc, err := nats.Connect(cfg.ERsCfg().Readers[1].SourcePath) @@ -70,10 +74,13 @@ func TestERsNATSIT(t *testing.T) { if err != nil { t.Fatal(err) } - js.CreateStream(context.Background(), jetstream.StreamConfig{ + if _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{ Name: "stream", Subjects: []string{"cgrates_cdrs", "cgrates_cdrs_processed"}, - }) + }); err != nil { + t.Fatal(err) + } + defer js.DeleteStream(context.Background(), "stream") // Start the engine. if _, err := engine.StopStartEngine(cfgPath, 100); err != nil { @@ -120,715 +127,87 @@ func TestERsNATSIT(t *testing.T) { t.Errorf("expected %d pending messages, received %d", 10, info.NumPending) } - js.DeleteStream(context.Background(), "stream") - } -func testCheckNatsData(t *testing.T, randomCGRID, expData string, ch chan string) { - select { - case err := <-rdrErr: - t.Fatal(err) - case ev := <-rdrEvents: - if ev.rdrCfg.ID != "nats" { - t.Fatalf("Expected 'nats' received `%s`", ev.rdrCfg.ID) +var natsCfg string = `{ + +"general": { + "node_id": "nats_test", + "log_level": 7 +}, + +"data_db": { + "db_type": "*internal" +}, + +"stor_db": { + "db_type": "*internal" +}, + +"ees": { + "enabled": true, + "exporters": [ + { + "id": "nats_processed", + "type": "*virt", + "fields": [ + {"tag": "CGRID", "type": "*variable", "value": "~*req.CGRID", "path": "*uch.CGRID"} + ] } - expected := &utils.CGREvent{ - Tenant: "cgrates.org", - ID: ev.cgrEvent.ID, - Time: ev.cgrEvent.Time, - Event: map[string]any{ - "CGRID": randomCGRID, + ] +}, + +"ers": { + "enabled": true, + "sessions_conns":[], + "ees_conns": ["*internal"], + "readers": [ + { + "id": "nats_reader1", + "type": "*nats_json_map", + "source_path": "%s", + "ees_success_ids": ["nats_processed"], + "flags": ["*dryrun"], + "opts": { + %s }, - APIOpts: map[string]any{}, - } - if !reflect.DeepEqual(ev.cgrEvent, expected) { - t.Fatalf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent)) - } - select { - case msg := <-ch: - if expData != msg { - t.Errorf("Expected %q ,received %q", expData, msg) - } - case <-time.After(10 * time.Second): - t.Fatal("Timeout2") - } - case <-time.After(10 * time.Second): - t.Fatal("Timeout") - } -} - -func testCheckNatsJetStream(t *testing.T, cfg *config.CGRConfig) { - rdrEvents = make(chan *erEvent, 1) - rdrErr = make(chan error, 1) - rdrExit = make(chan struct{}, 1) - var err error - if rdr, err = NewNatsER(cfg, 1, rdrEvents, make(chan *erEvent, 1), - rdrErr, new(engine.FilterS), rdrExit); err != nil { - t.Fatal(err) - } - - nop, err := GetNatsOpts(rdr.Config().Opts.NATS, "testExp", time.Second) - if err != nil { - t.Fatal(err) - } - nc, err := nats.Connect(rdr.Config().SourcePath, nop...) - if err != nil { - t.Fatal(err) - } - defer nc.Drain() - - js, err := jetstream.New(nc) - if err != nil { - t.Fatal(err) - } - - _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{ - Name: "test", - Subjects: []string{utils.DefaultQueueID}, - }) - if err != nil { - t.Fatal(err) - } - defer js.DeleteStream(context.Background(), "test") - - _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{ - Name: "test2", - Subjects: []string{"processed_cdrs"}, - }) - if err != nil { - t.Fatal(err) - } - defer js.DeleteStream(context.Background(), "test2") - - ch := make(chan string, 3) - var cons jetstream.Consumer - cons, err = js.CreateOrUpdateConsumer(context.Background(), "test2", jetstream.ConsumerConfig{ - FilterSubject: "processed_cdrs", - Durable: "test4", - AckPolicy: jetstream.AckAllPolicy, - }) - if err != nil { - nc.Drain() - t.Fatal(err) - } - - _, err = cons.Consume(func(msg jetstream.Msg) { - ch <- string(msg.Data()) - }) - if err != nil { - t.Fatal(err) - } - - go rdr.Serve() - runtime.Gosched() - time.Sleep(10 * time.Nanosecond) - - for i := 0; i < 3; i++ { - randomCGRID := utils.UUIDSha1Prefix() - expData := fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID) - if _, err = js.Publish(context.Background(), utils.DefaultQueueID, []byte(expData)); err != nil { - t.Fatal(err) - } - - nc.FlushTimeout(time.Second) - nc.Flush() - - testCheckNatsData(t, randomCGRID, expData, ch) - } - close(rdrExit) -} - -func testCheckNatsNormal(t *testing.T, cfg *config.CGRConfig) { - rdrEvents = make(chan *erEvent, 1) - rdrErr = make(chan error, 1) - rdrExit = make(chan struct{}, 1) - - var err error - if rdr, err = NewNatsER(cfg, 1, rdrEvents, make(chan *erEvent, 1), - rdrErr, new(engine.FilterS), rdrExit); err != nil { - t.Fatal(err) - } - - nop, err := GetNatsOpts(rdr.Config().Opts.NATS, "testExp", time.Second) - if err != nil { - t.Fatal(err) - } - nc, err := nats.Connect(rdr.Config().SourcePath, nop...) - if err != nil { - t.Fatal(err) - } - ch := make(chan string, 3) - _, err = nc.QueueSubscribe("processed_cdrs", "test3", func(msg *nats.Msg) { - ch <- string(msg.Data) - }) - if err != nil { - t.Fatal(err) - } - - defer nc.Drain() - go rdr.Serve() - runtime.Gosched() - time.Sleep(100 * time.Millisecond) - for i := 0; i < 3; i++ { - randomCGRID := utils.UUIDSha1Prefix() - expData := fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID) - if err = nc.Publish(utils.DefaultQueueID, []byte(expData)); err != nil { - t.Fatal(err) - } - - nc.FlushTimeout(time.Second) - nc.Flush() - - testCheckNatsData(t, randomCGRID, expData, ch) - } - close(rdrExit) -} - -func TestNatsERJetStream(t *testing.T) { - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - JetStream: true, - }) - if err != nil { - t.Fatal(err) - } - natsServer.Start() - defer natsServer.Shutdown() - - cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ -"ees": { - "enabled": true, - "exporters": [ - { - "id": "nats_processed", - "type": "*nats_json_map", - "export_path": "nats://localhost:4222", - "attempts": 1, - "opts": { - "natsJetStream": true, - "natsSubject": "processed_cdrs" - } + "fields":[ + {"tag": "CGRID", "type": "*variable", "value": "~*req.CGRID", "path": "*cgreq.CGRID"} + ] } ] -}, -"ers": { // EventReaderService - "enabled": true, // starts the EventReader service: - "sessions_conns":["*localhost"], - "ees_conns": ["*internal"], - "readers": [ - { - "id": "nats", - "type": "*nats_json_map", - "run_delay": "-1", - "concurrent_requests": 1024, - "source_path": "nats://localhost:4222", - "processed_path": "", - "ees_success_ids": ["nats_processed"], - "tenant": "cgrates.org", - "filters": [], - "flags": [], - "fields":[ - {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, - ], - "opts": { - "natsJetStream": true, - "natsStreamName": "test", - } - }, - ], -}, -}`) - if err != nil { - t.Fatal(err) - } - if err := cfg.CheckConfigSanity(); err != nil { - t.Fatal(err) - } - testCheckNatsJetStream(t, cfg) } + +}` -func TestNatsER(t *testing.T) { - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - }) - if err != nil { - t.Fatal(err) - } - natsServer.Start() - defer natsServer.Shutdown() - - cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ -"ees": { - "enabled": true, - "exporters": [ - { - "id": "nats_processed", - "type": "*nats_json_map", - "export_path": "nats://localhost:4222", - "attempts": 1, - "opts": { - "natsSubject": "processed_cdrs" - } - } - ] -}, -"ers": { // EventReaderService - "enabled": true, // starts the EventReader service: - "sessions_conns":["*localhost"], - "ees_conns": ["*internal"], - "readers": [ - { - "id": "nats", - "type": "*nats_json_map", - "run_delay": "-1", - "concurrent_requests": 1024, - "source_path": "nats://localhost:4222", - "processed_path": "", - "ees_success_ids": ["nats_processed"], - "tenant": "cgrates.org", - "filters": [], - "flags": [], - "fields":[ - {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, - ], - }, - ], -}, -}`) - if err != nil { - t.Fatal(err) - } - if err := cfg.CheckConfigSanity(); err != nil { - t.Fatal(err) - } - testCheckNatsNormal(t, cfg) -} - -func TestNatsERJetStreamUser(t *testing.T) { - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - JetStream: true, - Username: "user", - Password: "password", - }) - if err != nil { - t.Fatal(err) - } - natsServer.Start() - defer natsServer.Shutdown() - - cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ -"ees": { - "enabled": true, - "exporters": [ - { - "id": "nats_processed", - "type": "*nats_json_map", - "export_path": "nats://localhost:4222", - "attempts": 1, - "opts": { - "natsJetStream": true, - "natsSubject": "processed_cdrs" - } - } - ] -}, -"ers": { // EventReaderService - "enabled": true, // starts the EventReader service: - "sessions_conns":["*localhost"], - "ees_conns": ["*internal"], - "readers": [ - { - "id": "nats", - "type": "*nats_json_map", - "run_delay": "-1", - "concurrent_requests": 1024, - "source_path": "nats://user:password@localhost:4222", - "processed_path": "", - "ees_success_ids": ["nats_processed"], - "tenant": "cgrates.org", - "filters": [], - "flags": [], - "fields":[ - {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, - ], - "opts": { - "natsJetStream": true, - "natsStreamName": "test", - } - }, - ], -}, -}`) - if err != nil { - t.Fatal(err) - } - if err := cfg.CheckConfigSanity(); err != nil { - t.Fatal(err) - } - testCheckNatsJetStream(t, cfg) -} - -func TestNatsERUser(t *testing.T) { - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - Username: "user", - Password: "password", - }) - if err != nil { - t.Fatal(err) - } - natsServer.Start() - defer natsServer.Shutdown() - - cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ -"ees": { - "enabled": true, - "exporters": [ - { - "id": "nats_processed", - "type": "*nats_json_map", - "export_path": "nats://localhost:4222", - "attempts": 1, - "opts": { - "natsSubject": "processed_cdrs" - } - } - ] -}, -"ers": { // EventReaderService - "enabled": true, // starts the EventReader service: - "sessions_conns":["*localhost"], - "ees_conns": ["*internal"], - "readers": [ - { - "id": "nats", - "type": "*nats_json_map", - "run_delay": "-1", - "concurrent_requests": 1024, - "source_path": "nats://user:password@localhost:4222", - "processed_path": "", - "ees_success_ids": ["nats_processed"], - "tenant": "cgrates.org", - "filters": [], - "flags": [], - "fields":[ - {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, - ], - }, - ], -}, -}`) - if err != nil { - t.Fatal(err) - } - if err := cfg.CheckConfigSanity(); err != nil { - t.Fatal(err) - } - testCheckNatsNormal(t, cfg) -} - -func TestNatsERJetStreamToken(t *testing.T) { - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - JetStream: true, - Authorization: "token", - }) - if err != nil { - t.Fatal(err) - } - natsServer.Start() - defer natsServer.Shutdown() - - cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ -"ees": { - "enabled": true, - "exporters": [ - { - "id": "nats_processed", - "type": "*nats_json_map", - "export_path": "nats://localhost:4222", - "attempts": 1, - "opts": { - "natsJetStream": true, - "natsSubject": "processed_cdrs" - } - } - ] -}, -"ers": { // EventReaderService - "enabled": true, // starts the EventReader service: - "sessions_conns":["*localhost"], - "ees_conns": ["*internal"], - "readers": [ - { - "id": "nats", - "type": "*nats_json_map", - "run_delay": "-1", - "concurrent_requests": 1024, - "source_path": "nats://token@localhost:4222", - "processed_path": "", - "ees_success_ids": ["nats_processed"], - "tenant": "cgrates.org", - "filters": [], - "flags": [], - "fields":[ - {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, - ], - "opts": { - "natsJetStream": true, - "natsStreamName": "test", - } - }, - ], -}, -}`) - if err != nil { - t.Fatal(err) - } - if err := cfg.CheckConfigSanity(); err != nil { - t.Fatal(err) - } - testCheckNatsJetStream(t, cfg) -} - -func TestNatsERToken(t *testing.T) { - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - JetStream: true, - Authorization: "token", - }) - if err != nil { - t.Fatal(err) - } - natsServer.Start() - defer natsServer.Shutdown() - - cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ -"ees": { - "enabled": true, - "exporters": [ - { - "id": "nats_processed", - "type": "*nats_json_map", - "export_path": "nats://localhost:4222", - "attempts": 1, - "opts": { - "natsSubject": "processed_cdrs" - } - } - ] -}, -"ers": { // EventReaderService - "enabled": true, // starts the EventReader service: - "sessions_conns":["*localhost"], - "ees_conns": ["*internal"], - "readers": [ - { - "id": "nats", - "type": "*nats_json_map", - "run_delay": "-1", - "concurrent_requests": 1024, - "source_path": "nats://token@localhost:4222", - "processed_path": "", - "ees_success_ids": ["nats_processed"], - "tenant": "cgrates.org", - "filters": [], - "flags": [], - "fields":[ - {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, - ], - }, - ], -}, -}`) - if err != nil { - t.Fatal(err) - } - if err := cfg.CheckConfigSanity(); err != nil { - t.Fatal(err) - } - testCheckNatsNormal(t, cfg) -} - -func TestNatsERNkey(t *testing.T) { - // prepare - basePath := "/tmp/natsCfg" - if err := os.MkdirAll(basePath, 0755); err != nil { - t.Fatal(err) +func TestNatsNormalTT(t *testing.T) { + switch *dbType { + case utils.MetaInternal: + case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("unsupported dbtype value") } - defer os.RemoveAll(basePath) - seedFilePath := path.Join(basePath, "seed.txt") + // SeedFile setup + baseNKeyPath := t.TempDir() + seedFilePath := path.Join(baseNKeyPath, "seed.txt") if err := os.WriteFile(seedFilePath, []byte("SUAOUIE5CU47NCO22GHFEZXGCRCJDVTHDLMIP4L7UQNCR5SW4FZICI7O3Q"), 0664); err != nil { t.Fatal(err) } - - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - Nkeys: []*server.NkeyUser{ - { - Nkey: "UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY", - }, - }, - }) - if err != nil { - t.Fatal(err) - } - natsServer.Start() - defer natsServer.Shutdown() - - cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{ -"ees": { - "enabled": true, - "exporters": [ - { - "id": "nats_processed", - "type": "*nats_json_map", - "export_path": "nats://localhost:4222", - "attempts": 1, - "opts": { - "natsSubject": "processed_cdrs", - "natsSeedFile": %q - } - } - ] -}, -"ers": { // EventReaderService - "enabled": true, // starts the EventReader service: - "sessions_conns":["*localhost"], - "ees_conns": ["*internal"], - "readers": [ - { - "id": "nats", - "type": "*nats_json_map", - "run_delay": "-1", - "concurrent_requests": 1024, - "source_path": "nats://localhost:4222", - "processed_path": "", - "ees_success_ids": ["nats_processed"], - "tenant": "cgrates.org", - "filters": [], - "flags": [], - "fields":[ - {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, - ], - "opts": { - "natsSeedFile": %q - } - }, - ], -}, -}`, seedFilePath, seedFilePath)) - if err != nil { - t.Fatal(err) - } - if err := cfg.CheckConfigSanity(); err != nil { - t.Fatal(err) - } - testCheckNatsNormal(t, cfg) -} - -func TestNatsERJetStreamNKey(t *testing.T) { - // prepare - basePath := "/tmp/natsCfg" - if err := os.MkdirAll(basePath, 0755); err != nil { + nkeyCfgPath := path.Join(baseNKeyPath, "nats.cfg") + if err := os.WriteFile(nkeyCfgPath, []byte(`authorization: { + users: [ + { nkey: UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY } + ] + }`), 0664); err != nil { t.Fatal(err) } - defer os.RemoveAll(basePath) - seedFilePath := path.Join(basePath, "seed.txt") - if err := os.WriteFile(seedFilePath, []byte("SUAOUIE5CU47NCO22GHFEZXGCRCJDVTHDLMIP4L7UQNCR5SW4FZICI7O3Q"), 0664); err != nil { - t.Fatal(err) - } - - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - JetStream: true, - Nkeys: []*server.NkeyUser{ - { - Nkey: "UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY", - }, - }, - }) - if err != nil { - t.Fatal(err) - } - natsServer.Start() - defer natsServer.Shutdown() - - cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{ -"ees": { - "enabled": true, - "exporters": [ - { - "id": "nats_processed", - "type": "*nats_json_map", - "export_path": "nats://localhost:4222", - "attempts": 1, - "opts": { - "natsJetStream": true, - "natsSubject": "processed_cdrs", - "natsSeedFile": %q - } - } - ] -}, -"ers": { // EventReaderService - "enabled": true, // starts the EventReader service: - "sessions_conns":["*localhost"], - "ees_conns": ["*internal"], - "readers": [ - { - "id": "nats", - "type": "*nats_json_map", - "run_delay": "-1", - "concurrent_requests": 1024, - "source_path": "nats://localhost:4222", - "processed_path": "", - "ees_success_ids": ["nats_processed"], - "tenant": "cgrates.org", - "filters": [], - "flags": [], - "fields":[ - {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, - ], - "opts": { - "natsJetStream": true, - "natsStreamName": "test", - "natsSeedFile": %q - } - }, - ], -}, -}`, seedFilePath, seedFilePath)) - if err != nil { - t.Fatal(err) - } - if err := cfg.CheckConfigSanity(); err != nil { - t.Fatal(err) - } - testCheckNatsJetStream(t, cfg) -} - -func TestNatsERJWT(t *testing.T) { - // prepare - basePath := "/tmp/natsCfg" - if err := os.MkdirAll(basePath, 0755); err != nil { - t.Fatal(err) - } - - defer os.RemoveAll(basePath) - seedFilePath := path.Join(basePath, "jwt.txt") - if err := os.WriteFile(seedFilePath, []byte(`-----BEGIN NATS USER JWT----- + // JWTFile setup + baseJWTPath := t.TempDir() + jwtFilePath := path.Join(baseJWTPath, "jwt.txt") + if err := os.WriteFile(jwtFilePath, []byte(`-----BEGIN NATS USER JWT----- eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJETFRGNFpLVVdNNFRPRkVNQko0UEUzTVFFVlhIUkJJN0xZNDdZNEMzNTNMWlJKSU5CUkJRIiwiaWF0IjoxNjI0ODg1NzMwLCJpc3MiOiJBQVlKRFZMWkdXTjdZM0ZCUENWWENSVlFaREZNWUdIVTRZWExHU1hYN1UyNTRLSDVTQzNSNVFLTSIsIm5hbWUiOiJ1c2VyIiwic3ViIjoiVUQzQkdXSUJQVTNDV0w0SE9VVTRWSkVRV1RVQVNBRUc2T0ozWEhNM0VFSk5BMlBBM1VWTllUWk4iLCJuYXRzIjp7InB1YiI6e30sInN1YiI6e30sInN1YnMiOi0xLCJkYXRhIjotMSwicGF5bG9hZCI6LTEsInR5cGUiOiJ1c2VyIiwidmVyc2lvbiI6Mn19.YmFL5nRMkEOXe77sQJPPRv_vwi89tzhVVl0AVjE4sXWyoWIHiCepNw28DbpJ0p_MlT8Qf0SY2cjAhIm-Qi7lDw ------END NATS USER JWT------ @@ -843,8 +222,8 @@ SUADIH32XQYWC2MI2YGM4AUQ3NMKZSZ5V2BZXQ237XXMLO7FFHDF5CTUDE *************************************************************`), 0664); err != nil { t.Fatal(err) } - natsCfgPath := path.Join(basePath, "nats.cfg") - if err := os.WriteFile(natsCfgPath, []byte(`// Operator "memory" + jwtCfgPath := path.Join(baseJWTPath, "nats.cfg") + if err := os.WriteFile(jwtCfgPath, []byte(`// Operator "memory" operator: eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJFRk5ERUdSNU1aUEw1VElQTFVKMlNMTFdZV0VDU0NJSEhVU1lISE5IR1BZVUpaWE5XUlNRIiwiaWF0IjoxNjI0ODc1NzYwLCJpc3MiOiJPQ0VSUlQ2WFNEQ1dBWTNFWVNTTjQ2UUxGQko3RFJHNTIzU1hIMkg0UjQ3WFZVWFYyUlJCSVNMSyIsIm5hbWUiOiJtZW1vcnkiLCJzdWIiOiJPQ0VSUlQ2WFNEQ1dBWTNFWVNTTjQ2UUxGQko3RFJHNTIzU1hIMkg0UjQ3WFZVWFYyUlJCSVNMSyIsIm5hdHMiOnsidHlwZSI6Im9wZXJhdG9yIiwidmVyc2lvbiI6Mn19.MZfwcw5j6zY8SfFQppGIa3VjYYZK2_n1kV16Nk5jVCgwS8dKWzRQK_XjFYWwQ15Cq9YY73jcTA6LO0DmQGsdBA resolver: MEMORY @@ -857,78 +236,155 @@ resolver_preload: { `), 0664); err != nil { t.Fatal(err) } - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - ConfigFile: natsCfgPath, - }) - if err != nil { - t.Fatal(err) - } - natsServer.Start() - defer natsServer.Shutdown() - cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{ -"ees": { - "enabled": true, - "exporters": [ + testcases := []struct { + name string + serverFlags []string + sourcePath string + readerOpts string + }{ { - "id": "nats_processed", - "type": "*nats_json_map", - "export_path": "nats://localhost:4222", - "attempts": 1, - "opts": { - "natsSubject": "processed_cdrs", - "natsJWTFile": %q - } - } - ] -}, -"ers": { // EventReaderService - "enabled": true, // starts the EventReader service: - "sessions_conns":["*localhost"], - "ees_conns": ["*internal"], - "readers": [ - { - "id": "nats", - "type": "*nats_json_map", - "run_delay": "-1", - "concurrent_requests": 1024, - "source_path": "nats://localhost:4222", - "processed_path": "", - "ees_success_ids": ["nats_processed"], - "tenant": "cgrates.org", - "filters": [], - "flags": [], - "fields":[ - {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, - ], - "opts": { - "natsJWTFile": %q, - } + name: "NoAuth", + serverFlags: []string{}, + sourcePath: "nats://127.0.0.1:4222", + readerOpts: "", + }, + { + name: "UsernameAndPassword", + serverFlags: []string{"--user", "user", "--pass", "password"}, + sourcePath: "nats://user:password@127.0.0.1:4222", + readerOpts: "", + }, + { + name: "TokenAuth", + serverFlags: []string{"--auth", "token"}, + sourcePath: "nats://token@127.0.0.1:4222", + readerOpts: "", + }, + { + name: "NkeyAuth", + serverFlags: []string{"-c", nkeyCfgPath}, + sourcePath: "nats://127.0.0.1:4222", + readerOpts: fmt.Sprintf( + `"natsSeedFile": "%s"`, + seedFilePath, + ), + }, + { + name: "JWTAuth", + serverFlags: []string{"-c", jwtCfgPath}, + sourcePath: "nats://127.0.0.1:4222", + readerOpts: fmt.Sprintf( + `"natsJWTFile": "%s"`, + jwtFilePath, + ), }, - ], -}, -}`, seedFilePath, seedFilePath)) - if err != nil { - t.Fatal(err) } - if err := cfg.CheckConfigSanity(); err != nil { - t.Fatal(err) + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + exec.Command("pkill", "nats-server") + cmd := exec.Command("nats-server", tc.serverFlags...) + if err := cmd.Start(); err != nil { + t.Fatal(err) // most probably not installed + } + time.Sleep(50 * time.Millisecond) + defer cmd.Process.Kill() + + configJSON := fmt.Sprintf(natsCfg, tc.sourcePath, tc.readerOpts) + cfgPath := t.TempDir() + filePath := filepath.Join(cfgPath, "cgrates.json") + err := os.WriteFile(filePath, []byte(configJSON), 0644) + if err != nil { + t.Fatal(err) + } + cfg, err := config.NewCGRConfigFromPath(cfgPath) + if err != nil { + t.Fatal(err) + } + + rdrCfgOpts := cfg.ERsCfg().Readers[1].Opts.NATS + nop, err := GetNatsOpts(rdrCfgOpts, cfg.GeneralCfg().NodeID, time.Second) + if err != nil { + t.Fatal(err) + } + + // Establish a connection to nats. + nc, err := nats.Connect(tc.sourcePath, nop...) + if err != nil { + t.Fatal(err) + } + + if _, err = engine.StartEngine(cfgPath, *waitRater); err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + engine.KillEngine(*waitRater) + nc.Close() + }) + + client, err := newRPCClient(cfg.ListenCfg()) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 3; i++ { + randomCGRID := utils.UUIDSha1Prefix() + expData := fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID) + if err = nc.Publish("cgrates_cdrs", []byte(expData)); err != nil { + t.Error(err) + } + + time.Sleep(20 * time.Millisecond) // wait for exports + + var cgrID any + if err = client.Call(context.Background(), utils.CacheSv1GetItem, &utils.ArgsGetCacheItemWithAPIOpts{ + Tenant: "cgrates.org", + ArgsGetCacheItem: utils.ArgsGetCacheItem{ + CacheID: utils.CacheUCH, + ItemID: "CGRID", + }, + }, &cgrID); err != nil { + t.Error(err) + } else if cgrID != randomCGRID { + t.Errorf("expected %v, received %v", randomCGRID, cgrID) + } + } + }) } - testCheckNatsNormal(t, cfg) } -func TestNatsERJetStreamJWT(t *testing.T) { - // prepare - basePath := "/tmp/natsCfg" - if err := os.MkdirAll(basePath, 0755); err != nil { +func TestNatsJetStreamTT(t *testing.T) { + switch *dbType { + case utils.MetaInternal: + case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("unsupported dbtype value") + } + + // SeedFile setup + baseNKeyPath := t.TempDir() + seedFilePath := path.Join(baseNKeyPath, "seed.txt") + if err := os.WriteFile(seedFilePath, []byte("SUAOUIE5CU47NCO22GHFEZXGCRCJDVTHDLMIP4L7UQNCR5SW4FZICI7O3Q"), 0664); err != nil { + t.Fatal(err) + } + nkeyCfgPath := path.Join(baseNKeyPath, "nats.cfg") + if err := os.WriteFile(nkeyCfgPath, []byte(`authorization: { + users: [ + { nkey: UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY } + ] + }`), 0664); err != nil { t.Fatal(err) } - defer os.RemoveAll(basePath) - seedFilePath := path.Join(basePath, "jwt.txt") - if err := os.WriteFile(seedFilePath, []byte(`-----BEGIN NATS USER JWT----- + // JWTFile setup + baseJWTPath := t.TempDir() + // baseJWTPath := "/tmp/natsCfg3" + // os.Mkdir("/tmp/natsCfg3", 0644) + jwtFilePath := path.Join(baseJWTPath, "jwt.txt") + if err := os.WriteFile(jwtFilePath, []byte(`-----BEGIN NATS USER JWT----- eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJXTUUyUkhMWEU1R0FZS0hITk5aNkhLSDQ2Q0VSUFNEUExPN1BMT0ZEWTZaNUdZM09aVkFRIiwiaWF0IjoxNjI0OTUzNTE5LCJpc3MiOiJBQVlKRFZMWkdXTjdZM0ZCUENWWENSVlFaREZNWUdIVTRZWExHU1hYN1UyNTRLSDVTQzNSNVFLTSIsIm5hbWUiOiJ1c2VyMiIsInN1YiI6IlVERVJVRElMMlZORVNKUzVGNUpDQVJHWUNSUVM1M0NWSUVBSllHU0hFR0dZSEI2R01BQ1AzM1VDIiwibmF0cyI6eyJwdWIiOnt9LCJzdWIiOnt9LCJzdWJzIjotMSwiZGF0YSI6LTEsInBheWxvYWQiOi0xLCJ0eXBlIjoidXNlciIsInZlcnNpb24iOjJ9fQ.YitrKhIlU45Q1m6A_HFsaxDgUUiIyjLJuHNKnG1cjzj5H6n697Iv3pDsIZVTh6pBYROg1aRV42bD3PpEZna2AA ------END NATS USER JWT------ @@ -943,8 +399,8 @@ SUAGM22ETPOJNZGYSGJL3HRLZ6R35FSVOYNINYN5Z5UPLP5K4SQJ753WU4 *************************************************************`), 0664); err != nil { t.Fatal(err) } - natsCfgPath := path.Join(basePath, "nats.cfg") - if err := os.WriteFile(natsCfgPath, []byte(`// Operator "memory" + jwtCfgPath := path.Join(baseJWTPath, "nats.cfg") + if err := os.WriteFile(jwtCfgPath, []byte(`// Operator "memory" operator: eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJFRk5ERUdSNU1aUEw1VElQTFVKMlNMTFdZV0VDU0NJSEhVU1lISE5IR1BZVUpaWE5XUlNRIiwiaWF0IjoxNjI0ODc1NzYwLCJpc3MiOiJPQ0VSUlQ2WFNEQ1dBWTNFWVNTTjQ2UUxGQko3RFJHNTIzU1hIMkg0UjQ3WFZVWFYyUlJCSVNMSyIsIm5hbWUiOiJtZW1vcnkiLCJzdWIiOiJPQ0VSUlQ2WFNEQ1dBWTNFWVNTTjQ2UUxGQko3RFJHNTIzU1hIMkg0UjQ3WFZVWFYyUlJCSVNMSyIsIm5hdHMiOnsidHlwZSI6Im9wZXJhdG9yIiwidmVyc2lvbiI6Mn19.MZfwcw5j6zY8SfFQppGIa3VjYYZK2_n1kV16Nk5jVCgwS8dKWzRQK_XjFYWwQ15Cq9YY73jcTA6LO0DmQGsdBA resolver: MEMORY @@ -961,68 +417,137 @@ system_account:AAFIBB6C56ROU5XRVJLJYR3BTGGYK3HJGHEHQV7L7QZMTT3ZRBLHBS7F `), 0664); err != nil { t.Fatal(err) } - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - ConfigFile: natsCfgPath, - JetStream: true, - }) - if err != nil { - t.Fatal(err) - } - natsServer.Start() - defer natsServer.Shutdown() - cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{ -"ees": { - "enabled": true, - "exporters": [ + testcases := []struct { + name string + serverFlags []string + sourcePath string + readerOpts string + }{ { - "id": "nats_processed", - "type": "*nats_json_map", - "export_path": "nats://localhost:4222", - "attempts": 1, - "opts": { - "natsJetStream": true, - "natsSubject": "processed_cdrs", - "natsJWTFile": %q - } - } - ] -}, -"ers": { // EventReaderService - "enabled": true, // starts the EventReader service: - "sessions_conns":["*localhost"], - "ees_conns": ["*internal"], - "readers": [ - { - "id": "nats", - "type": "*nats_json_map", - "run_delay": "-1", - "concurrent_requests": 1024, - "source_path": "nats://localhost:4222", - "processed_path": "", - "ees_success_ids": ["nats_processed"], - "tenant": "cgrates.org", - "filters": [], - "flags": [], - "fields":[ - {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, - ], - "opts": { - "natsJetStream": true, - "natsStreamName": "test", - "natsJWTFile": %q, - } + name: "NoAuth", + serverFlags: []string{"-js"}, + sourcePath: "nats://127.0.0.1:4222", + readerOpts: ` + "natsJetStream": true, + "natsStreamName": "stream"`, + }, + { + name: "UsernameAndPassword", + serverFlags: []string{"-js", "--user", "user", "--pass", "password"}, + sourcePath: "nats://user:password@127.0.0.1:4222", + readerOpts: ` + "natsJetStream": true, + "natsStreamName": "stream"`, + }, + { + name: "TokenAuth", + serverFlags: []string{"-js", "--auth", "token"}, + sourcePath: "nats://token@127.0.0.1:4222", + readerOpts: ` + "natsJetStream": true, + "natsStreamName": "stream"`, + }, + { + name: "NkeyAuth", + serverFlags: []string{"-js", "-c", nkeyCfgPath}, + sourcePath: "nats://127.0.0.1:4222", + readerOpts: fmt.Sprintf(` + "natsJetStream": true, + "natsStreamName": "stream", + "natsSeedFile": "%s"`, seedFilePath), + }, + { + name: "JWTAuth", + serverFlags: []string{"-js", "-c", jwtCfgPath}, + sourcePath: "nats://127.0.0.1:4222", + readerOpts: fmt.Sprintf(` + "natsJetStream": true, + "natsStreamName": "stream", + "natsJWTFile": "%s"`, jwtFilePath), }, - ], -}, -}`, seedFilePath, seedFilePath)) - if err != nil { - t.Fatal(err) } - if err := cfg.CheckConfigSanity(); err != nil { - t.Fatal(err) + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + exec.Command("pkill", "nats-server") + cmd := exec.Command("nats-server", tc.serverFlags...) + if err := cmd.Start(); err != nil { + t.Fatal(err) // most probably not installed + } + time.Sleep(50 * time.Millisecond) + defer cmd.Process.Kill() + + configJSON := fmt.Sprintf(natsCfg, tc.sourcePath, tc.readerOpts) + cfgPath := t.TempDir() + filePath := filepath.Join(cfgPath, "cgrates.json") + err := os.WriteFile(filePath, []byte(configJSON), 0644) + if err != nil { + t.Fatal(err) + } + cfg, err := config.NewCGRConfigFromPath(cfgPath) + if err != nil { + t.Fatal(err) + } + + rdrCfgOpts := cfg.ERsCfg().Readers[1].Opts.NATS + nop, err := GetNatsOpts(rdrCfgOpts, cfg.GeneralCfg().NodeID, 2*time.Second) + if err != nil { + t.Fatal(err) + } + + // Establish a connection to nats. + nc, err := nats.Connect(tc.sourcePath, nop...) + if err != nil { + t.Fatal(err) + } + defer nc.Close() + + // Initialize a stream manager and create a stream. + js, err := jetstream.New(nc) + if err != nil { + t.Fatal(err) + } + if _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{ + Name: "stream", + Subjects: []string{"cgrates_cdrs", "cgrates_cdrs_processed"}, + }); err != nil { + t.Fatal(err) + } + defer js.DeleteStream(context.Background(), "stream") + + if _, err = engine.StartEngine(cfgPath, *waitRater); err != nil { + t.Fatal(err) + } + defer engine.KillEngine(*waitRater) + + client, err := newRPCClient(cfg.ListenCfg()) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 3; i++ { + randomCGRID := utils.UUIDSha1Prefix() + expData := fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID) + if _, err := js.Publish(context.Background(), "cgrates_cdrs", []byte(expData)); err != nil { + t.Error(err) + } + + time.Sleep(20 * time.Millisecond) // wait for exports + + var cgrID any + if err = client.Call(context.Background(), utils.CacheSv1GetItem, &utils.ArgsGetCacheItemWithAPIOpts{ + Tenant: "cgrates.org", + ArgsGetCacheItem: utils.ArgsGetCacheItem{ + CacheID: utils.CacheUCH, + ItemID: "CGRID", + }, + }, &cgrID); err != nil { + t.Error(err) + } else if cgrID != randomCGRID { + t.Errorf("expected %v, received %v", randomCGRID, cgrID) + } + } + }) } - testCheckNatsJetStream(t, cfg) } diff --git a/go.mod b/go.mod index 9f0bad36b..513015f08 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,6 @@ require ( github.com/mediocregopher/radix/v3 v3.8.1 github.com/miekg/dns v1.1.56 github.com/mitchellh/mapstructure v1.5.0 - github.com/nats-io/nats-server/v2 v2.10.4 github.com/nats-io/nats.go v1.31.0 github.com/nyaruka/phonenumbers v1.1.8 github.com/peterh/liner v1.2.2 @@ -91,10 +90,8 @@ require ( github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/klauspost/compress v1.17.2 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect - github.com/minio/highwayhash v1.0.2 // indirect github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect github.com/mschoch/smat v0.2.0 // indirect - github.com/nats-io/jwt/v2 v2.5.3 // indirect github.com/nats-io/nkeys v0.4.6 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/philhofer/fwd v1.1.1 // indirect @@ -114,7 +111,6 @@ require ( golang.org/x/sync v0.5.0 // indirect golang.org/x/sys v0.16.0 // indirect golang.org/x/text v0.14.0 // indirect - golang.org/x/time v0.4.0 // indirect golang.org/x/tools v0.14.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/appengine v1.6.8 // indirect diff --git a/go.sum b/go.sum index 13760a4dc..14e0e01c2 100644 --- a/go.sum +++ b/go.sum @@ -209,8 +209,6 @@ github.com/mediocregopher/radix/v3 v3.8.1 h1:rOkHflVuulFKlwsLY01/M2cM2tWCjDoETcM github.com/mediocregopher/radix/v3 v3.8.1/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= github.com/miekg/dns v1.1.56 h1:5imZaSeoRNvpM9SzWNhEcP9QliKiz20/dA2QabIGVnE= github.com/miekg/dns v1.1.56/go.mod h1:cRm6Oo2C8TY9ZS/TqsSrseAcncm74lfK5G+ikN2SWWY= -github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= -github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= @@ -220,10 +218,6 @@ github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJ github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae/go.mod h1:qAyveg+e4CE+eKJXWVjKXM4ck2QobLqTDytGJbLLhJg= github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= -github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo= -github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4= -github.com/nats-io/nats-server/v2 v2.10.4 h1:uB9xcwon3tPXWAdmTJqqqC6cie3yuPWHJjjTBgaPNus= -github.com/nats-io/nats-server/v2 v2.10.4/go.mod h1:eWm2JmHP9Lqm2oemB6/XGi0/GwsZwtWf8HIPUsh+9ns= github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= @@ -370,7 +364,6 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -403,8 +396,6 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/time v0.4.0 h1:Z81tqI5ddIoXDPvVQ7/7CC9TnLM7ubaFG2qXYd5BbYY= -golang.org/x/time v0.4.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=