From edfe2b5ef77857a7f39b1b29b4ca97225aa4f968 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Thu, 19 Sep 2024 17:18:56 +0300 Subject: [PATCH] Revise nats reader integration tests They were failing after the ERs structure changes. Added a commented helper program at the end of the test to help generate credentials for testing nats JWT functionality --- data/conf/samples/ers_nats/cgrates.json | 93 +- ers/nats_it_test.go | 1283 ++++++++++------------- 2 files changed, 612 insertions(+), 764 deletions(-) diff --git a/data/conf/samples/ers_nats/cgrates.json b/data/conf/samples/ers_nats/cgrates.json index 931d00786..515e16b35 100644 --- a/data/conf/samples/ers_nats/cgrates.json +++ b/data/conf/samples/ers_nats/cgrates.json @@ -1,9 +1,4 @@ { - -"logger": { - "level": 7 -}, - "data_db": { "db_type": "*internal" }, @@ -12,58 +7,68 @@ "db_type": "*internal" }, -"ers": { - "enabled": true, - "sessions_conns":[], +"ees": { + "enabled": true, + "exporters": [ + { + "id": "nats_processed", + "type": "*natsJSONMap", + "export_path": "nats://localhost:4222", + "attempts": 1, + "opts": { + "natsJetStream": true, + "natsSubject": "cgrates_cdrs_processed" + } + } + ] +}, + +"ers": { + "enabled": true, + "sessions_conns":[], + "ees_conns": ["*localhost"], "readers": [ { - "id": "nats_reader1", - "type": "*natsJSONMap", - "source_path": "nats://127.0.0.1:4222", - "processed_path": "nats://127.0.0.1:4222", + "id": "nats_reader1", + "type": "*natsJSONMap", + "source_path": "nats://127.0.0.1:4222", + "processed_path": "nats://127.0.0.1:4222", + "ees_success_ids": ["nats_processed"], "opts": { - "natsJetStream": true, - "natsConsumerName": "cgrates", - "natsStreamName": "stream", - "natsSubject": "cgrates_cdrs", - "natsQueueID": "queue", - "natsJetStreamMaxWait": "5s", - - "natsJetStreamProcessed": true, - "natsSubjectProcessed": "cgrates_cdrs_processed", - "natsJetStreamMaxWaitProcessed": "5s" + "natsJetStream": true, + "natsConsumerName": "cgrates", + "natsStreamName": "stream", + "natsSubject": "cgrates_cdrs", + "natsQueueID": "queue", + "natsJetStreamMaxWait": "5s" }, - "flags": ["*dryRun"], - "fields":[ + "flags": ["*dryRun"], + "fields":[ {"tag": "cdr_template", "type": "*template", "value": "cdr_template"} ] }, { - "id": "nats_reader2", - "type": "*natsJSONMap", - "source_path": "nats://127.0.0.1:4222", - "processed_path": "nats://127.0.0.1:4222", + "id": "nats_reader2", + "type": "*natsJSONMap", + "source_path": "nats://127.0.0.1:4222", + "processed_path": "nats://127.0.0.1:4222", + "ees_success_ids": ["nats_processed"], "opts": { - "natsJetStream": true, - "natsConsumerName": "cgrates", - "natsStreamName": "stream", - "natsSubject": "cgrates_cdrs", - "natsQueueID": "queue", - "natsJetStreamMaxWait": "5s", - - "natsJetStreamProcessed": true, - "natsSubjectProcessed": "cgrates_cdrs_processed", - "natsJetStreamMaxWaitProcessed": "5s" + "natsJetStream": true, + "natsConsumerName": "cgrates", + "natsStreamName": "stream", + "natsSubject": "cgrates_cdrs", + "natsQueueID": "queue", + "natsJetStreamMaxWait": "5s" }, - "flags": ["*dryRun"], - "fields":[ + "flags": ["*dryRun"], + "fields":[ {"tag": "cdr_template", "type": "*template", "value": "cdr_template"} ] } ] }, - - + "templates": { "cdr_template": [ {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.Account", "mandatory": true}, @@ -71,5 +76,5 @@ {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.Destination", "mandatory": true} ] } - -} \ No newline at end of file + +} diff --git a/ers/nats_it_test.go b/ers/nats_it_test.go index b0001437a..e8dbd3635 100644 --- a/ers/nats_it_test.go +++ b/ers/nats_it_test.go @@ -1,5 +1,4 @@ //go:build integration -// +build integration /* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments @@ -25,39 +24,44 @@ import ( "encoding/json" "fmt" "os" + "os/exec" "path" - "reflect" - "runtime" + "path/filepath" "testing" "time" "github.com/cgrates/birpc/context" + "github.com/cgrates/birpc/jsonrpc" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" - "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" ) -func TestNatsERIT(t *testing.T) { +func TestNatsConcurrentReaders(t *testing.T) { + switch *utils.DBType { + case utils.MetaInternal: + case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("unsupported dbtype value") + } + + exec.Command("pkill", "nats-server") + cmd := exec.Command("nats-server", "-js") + if err := cmd.Start(); err != nil { + t.Fatal(err) // most probably not installed + } + time.Sleep(50 * time.Millisecond) + defer cmd.Process.Kill() + cfgPath := path.Join(*utils.DataDir, "conf", "samples", "ers_nats") cfg, err := config.NewCGRConfigFromPath(context.Background(), cfgPath) if err != nil { t.Fatal("could not init cfg", err.Error()) } - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - JetStream: true, - }) - if err != nil { - t.Fatal(err) - } - natsServer.Start() - defer natsServer.Shutdown() - // Establish a connection to nats. nc, err := nats.Connect(cfg.ERsCfg().Readers[1].SourcePath) if err != nil { @@ -70,16 +74,19 @@ func TestNatsERIT(t *testing.T) { if err != nil { t.Fatal(err) } - js.CreateStream(context.Background(), jetstream.StreamConfig{ + if _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{ Name: "stream", Subjects: []string{"cgrates_cdrs", "cgrates_cdrs_processed"}, - }) - - // Start the engine. - if _, err := engine.StopStartEngine(cfgPath, 100); err != nil { + }); err != nil { t.Fatal(err) } - defer engine.KillEngine(100) + defer js.DeleteStream(context.Background(), "stream") + + // Start the engine. + if _, err := engine.StopStartEngine(cfgPath, 20); err != nil { + t.Fatal(err) + } + defer engine.KillEngine(20) // Publish CDRs asynchronously to the nats subject. cdr := make(map[string]any) @@ -120,609 +127,78 @@ func TestNatsERIT(t *testing.T) { t.Errorf("expected %d pending messages, received %d", 10, info.NumPending) } - js.DeleteStream(context.Background(), "stream") - } -func testCheckNatsData(t *testing.T, randomOriginID, expData string, ch chan string) { - select { - case err := <-rdrErr: - t.Fatal(err) - case ev := <-rdrEvents: - if ev.rdrCfg.ID != "nats" { - t.Fatalf("Expected 'nats' received `%s`", ev.rdrCfg.ID) +var natsCfg string = `{ +"data_db": { + "db_type": "*internal" +}, +"stor_db": { + "db_type": "*internal" +}, +"ees": { + "enabled": true, + "exporters": [ + { + "id": "nats_processed", + "type": "*virt", + "fields": [ + {"tag": "CGRID", "type": "*variable", "value": "~*req.CGRID", "path": "*uch.CGRID"} + ] } - expected := &utils.CGREvent{ - Tenant: "cgrates.org", - ID: ev.cgrEvent.ID, - Event: map[string]any{ - "OriginID": randomOriginID, - "ReaderID": "nats", + ] +}, +"ers": { + "enabled": true, + "sessions_conns":[], + "ees_conns": ["*internal"], + "readers": [ + { + "id": "nats_reader1", + "type": "*natsJSONMap", + "source_path": "%s", + "ees_success_ids": ["nats_processed"], + "flags": ["*dryRun"], + "opts": { + %s }, - APIOpts: map[string]any{}, + "fields":[ + {"tag": "CGRID", "type": "*variable", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, + {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, + ] } - if !reflect.DeepEqual(ev.cgrEvent, expected) { - t.Fatalf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent)) - } - select { - case msg := <-ch: - if expData != msg { - t.Errorf("Expected %q ,received %q", expData, msg) - } - case <-time.After(10 * time.Second): - t.Fatal("Timeout2") - } - case <-time.After(10 * time.Second): - t.Fatal("Timeout") - } + ] } +}` -func testCheckNatsJetStream(t *testing.T, cfg *config.CGRConfig) { - rdrEvents = make(chan *erEvent, 1) - rdrErr = make(chan error, 1) - rdrExit = make(chan struct{}, 1) - var err error - if rdr, err = NewNatsER(cfg, 1, rdrEvents, make(chan *erEvent, 1), - rdrErr, new(engine.FilterS), rdrExit); err != nil { - t.Fatal(err) +func TestNatsNormalTT(t *testing.T) { + switch *utils.DBType { + case utils.MetaInternal: + case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("unsupported dbtype value") } - nop, err := GetNatsOpts(rdr.Config().Opts, "testExp", time.Second) - if err != nil { - t.Fatal(err) - } - nc, err := nats.Connect(rdr.Config().SourcePath, nop...) - if err != nil { - t.Fatal(err) - } - defer nc.Drain() - - js, err := jetstream.New(nc) - if err != nil { - t.Fatal(err) - } - - _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{ - Name: "test", - Subjects: []string{utils.DefaultQueueID}, - }) - if err != nil { - t.Fatal(err) - } - defer js.DeleteStream(context.Background(), "test") - - _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{ - Name: "test2", - Subjects: []string{"processed_cdrs"}, - }) - if err != nil { - t.Fatal(err) - } - defer js.DeleteStream(context.Background(), "test2") - - ch := make(chan string, 3) - var cons jetstream.Consumer - cons, err = js.CreateOrUpdateConsumer(context.Background(), "test2", jetstream.ConsumerConfig{ - FilterSubject: "processed_cdrs", - Durable: "test4", - AckPolicy: jetstream.AckAllPolicy, - }) - if err != nil { - nc.Drain() - t.Fatal(err) - } - - _, err = cons.Consume(func(msg jetstream.Msg) { - ch <- string(msg.Data()) - }) - if err != nil { - t.Fatal(err) - } - - go rdr.Serve() - runtime.Gosched() - time.Sleep(10 * time.Nanosecond) - - for i := 0; i < 3; i++ { - randomOriginID := utils.UUIDSha1Prefix() - expData := fmt.Sprintf(`{"OriginID": "%s"}`, randomOriginID) - if _, err = js.Publish(context.Background(), utils.DefaultQueueID, []byte(expData)); err != nil { - t.Fatal(err) - } - - nc.FlushTimeout(time.Second) - nc.Flush() - - testCheckNatsData(t, randomOriginID, expData, ch) - } - close(rdrExit) -} - -func testCheckNatsNormal(t *testing.T, cfg *config.CGRConfig) { - rdrEvents = make(chan *erEvent, 1) - rdrErr = make(chan error, 1) - rdrExit = make(chan struct{}, 1) - - var err error - if rdr, err = NewNatsER(cfg, 1, rdrEvents, make(chan *erEvent, 1), - rdrErr, new(engine.FilterS), rdrExit); err != nil { - t.Fatal(err) - } - - nop, err := GetNatsOpts(rdr.Config().Opts, "testExp", time.Second) - if err != nil { - t.Fatal(err) - } - nc, err := nats.Connect(rdr.Config().SourcePath, nop...) - if err != nil { - t.Fatal(err) - } - ch := make(chan string, 3) - _, err = nc.QueueSubscribe("processed_cdrs", "test3", func(msg *nats.Msg) { - ch <- string(msg.Data) - }) - if err != nil { - t.Fatal(err) - } - - defer nc.Drain() - go rdr.Serve() - runtime.Gosched() - time.Sleep(100 * time.Millisecond) - for i := 0; i < 3; i++ { - randomOriginID := utils.UUIDSha1Prefix() - expData := fmt.Sprintf(`{"OriginID": "%s"}`, randomOriginID) - if err = nc.Publish(utils.DefaultQueueID, []byte(expData)); err != nil { - t.Fatal(err) - } - - nc.FlushTimeout(time.Second) - nc.Flush() - - testCheckNatsData(t, randomOriginID, expData, ch) - } - close(rdrExit) -} - -func TestNatsERJetStream(t *testing.T) { - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - JetStream: true, - }) - if err != nil { - t.Fatal(err) - } - natsServer.Start() - defer natsServer.Shutdown() - - cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ -"ers": { // EventReaderService - "enabled": true, // starts the EventReader service: - "sessions_conns":["*localhost"], - "readers": [ - { - "id": "nats", - "type": "*natsJSONMap", - "run_delay": "-1", - "concurrent_requests": 1024, - "source_path": "nats://localhost:4222", - "processed_path": "", - "tenant": "cgrates.org", - "filters": [], - "flags": [], - "fields":[ - {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, - {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, - ], - "opts": { - "natsJetStream": true, - "natsStreamName": "test", - "natsJetStreamProcessed": true, - "natsSubjectProcessed": "processed_cdrs", - } - }, - ], -}, -}`) - if err != nil { - t.Fatal(err) - } - if err := cfg.CheckConfigSanity(); err != nil { - t.Fatal(err) - } - testCheckNatsJetStream(t, cfg) -} - -func TestNatsER(t *testing.T) { - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - }) - if err != nil { - t.Fatal(err) - } - natsServer.Start() - defer natsServer.Shutdown() - - cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ -"ers": { // EventReaderService - "enabled": true, // starts the EventReader service: - "sessions_conns":["*localhost"], - "readers": [ - { - "id": "nats", - "type": "*natsJSONMap", - "run_delay": "-1", - "concurrent_requests": 1024, - "source_path": "nats://localhost:4222", - "processed_path": "", - "tenant": "cgrates.org", - "filters": [], - "flags": [], - "fields":[ - {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, - {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, - ], - "opts": { - "natsSubjectProcessed": "processed_cdrs", - } - }, - ], -}, -}`) - if err != nil { - t.Fatal(err) - } - if err := cfg.CheckConfigSanity(); err != nil { - t.Fatal(err) - } - testCheckNatsNormal(t, cfg) -} - -func TestNatsERJetStreamUser(t *testing.T) { - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - JetStream: true, - Username: "user", - Password: "password", - }) - if err != nil { - t.Fatal(err) - } - natsServer.Start() - defer natsServer.Shutdown() - - cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ -"ers": { // EventReaderService - "enabled": true, // starts the EventReader service: - "sessions_conns":["*localhost"], - "readers": [ - { - "id": "nats", - "type": "*natsJSONMap", - "run_delay": "-1", - "concurrent_requests": 1024, - "source_path": "nats://user:password@localhost:4222", - "processed_path": "", - "tenant": "cgrates.org", - "filters": [], - "flags": [], - "fields":[ - {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, - {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, - ], - "opts": { - "natsJetStream": true, - "natsStreamName": "test", - "natsJetStreamProcessed": true, - "natsSubjectProcessed": "processed_cdrs", - } - }, - ], -}, -}`) - if err != nil { - t.Fatal(err) - } - if err := cfg.CheckConfigSanity(); err != nil { - t.Fatal(err) - } - testCheckNatsJetStream(t, cfg) -} - -func TestNatsERUser(t *testing.T) { - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - Username: "user", - Password: "password", - }) - if err != nil { - t.Fatal(err) - } - natsServer.Start() - defer natsServer.Shutdown() - - cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ -"ers": { // EventReaderService - "enabled": true, // starts the EventReader service: - "sessions_conns":["*localhost"], - "readers": [ - { - "id": "nats", - "type": "*natsJSONMap", - "run_delay": "-1", - "concurrent_requests": 1024, - "source_path": "nats://user:password@localhost:4222", - "processed_path": "", - "tenant": "cgrates.org", - "filters": [], - "flags": [], - "fields":[ - {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, - {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, - ], - "opts": { - "natsSubjectProcessed": "processed_cdrs", - } - }, - ], -}, -}`) - if err != nil { - t.Fatal(err) - } - if err := cfg.CheckConfigSanity(); err != nil { - t.Fatal(err) - } - testCheckNatsNormal(t, cfg) -} - -func TestNatsERJetStreamToken(t *testing.T) { - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - JetStream: true, - Authorization: "token", - }) - if err != nil { - t.Fatal(err) - } - natsServer.Start() - defer natsServer.Shutdown() - - cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ -"ers": { // EventReaderService - "enabled": true, // starts the EventReader service: - "sessions_conns":["*localhost"], - "readers": [ - { - "id": "nats", - "type": "*natsJSONMap", - "run_delay": "-1", - "concurrent_requests": 1024, - "source_path": "nats://token@localhost:4222", - "processed_path": "", - "tenant": "cgrates.org", - "filters": [], - "flags": [], - "fields":[ - {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, - {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, - ], - "opts": { - "natsJetStream": true, - "natsStreamName": "test", - "natsJetStreamProcessed": true, - "natsSubjectProcessed": "processed_cdrs", - } - }, - ], -}, -}`) - if err != nil { - t.Fatal(err) - } - if err := cfg.CheckConfigSanity(); err != nil { - t.Fatal(err) - } - testCheckNatsJetStream(t, cfg) -} - -func TestNatsERToken(t *testing.T) { - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - JetStream: true, - Authorization: "token", - }) - if err != nil { - t.Fatal(err) - } - natsServer.Start() - defer natsServer.Shutdown() - - cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ -"ers": { // EventReaderService - "enabled": true, // starts the EventReader service: - "sessions_conns":["*localhost"], - "readers": [ - { - "id": "nats", - "type": "*natsJSONMap", - "run_delay": "-1", - "concurrent_requests": 1024, - "source_path": "nats://token@localhost:4222", - "processed_path": "", - "tenant": "cgrates.org", - "filters": [], - "flags": [], - "fields":[ - {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, - {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, - ], - "opts": { - "natsSubjectProcessed": "processed_cdrs", - } - }, - ], -}, -}`) - if err != nil { - t.Fatal(err) - } - if err := cfg.CheckConfigSanity(); err != nil { - t.Fatal(err) - } - testCheckNatsNormal(t, cfg) -} - -func TestNatsERNkey(t *testing.T) { - // prepare - basePath := "/tmp/natsCfg" - if err := os.MkdirAll(basePath, 0755); err != nil { - t.Fatal(err) - } - - defer os.RemoveAll(basePath) - seedFilePath := path.Join(basePath, "seed.txt") + // SeedFile setup + baseNKeyPath := t.TempDir() + seedFilePath := path.Join(baseNKeyPath, "seed.txt") if err := os.WriteFile(seedFilePath, []byte("SUAOUIE5CU47NCO22GHFEZXGCRCJDVTHDLMIP4L7UQNCR5SW4FZICI7O3Q"), 0664); err != nil { t.Fatal(err) } - - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - Nkeys: []*server.NkeyUser{ - { - Nkey: "UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY", - }, - }, - }) - if err != nil { - t.Fatal(err) - } - natsServer.Start() - defer natsServer.Shutdown() - - cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{ -"ers": { // EventReaderService - "enabled": true, // starts the EventReader service: - "sessions_conns":["*localhost"], - "readers": [ - { - "id": "nats", - "type": "*natsJSONMap", - "run_delay": "-1", - "concurrent_requests": 1024, - "source_path": "nats://localhost:4222", - "processed_path": "", - "tenant": "cgrates.org", - "filters": [], - "flags": [], - "fields":[ - {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, - {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, - ], - "opts": { - "natsSubjectProcessed": "processed_cdrs", - "natsSeedFile": %q, - "natsSeedFileProcessed": %q, - } - }, - ], -}, -}`, seedFilePath, seedFilePath)) - if err != nil { - t.Fatal(err) - } - if err := cfg.CheckConfigSanity(); err != nil { - t.Fatal(err) - } - testCheckNatsNormal(t, cfg) -} - -func TestNatsERJetStreamNKey(t *testing.T) { - // prepare - basePath := "/tmp/natsCfg" - if err := os.MkdirAll(basePath, 0755); err != nil { + nkeyCfgPath := path.Join(baseNKeyPath, "nats.cfg") + if err := os.WriteFile(nkeyCfgPath, []byte(`authorization: { + users: [ + { nkey: UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY } + ] + }`), 0664); err != nil { t.Fatal(err) } - defer os.RemoveAll(basePath) - seedFilePath := path.Join(basePath, "seed.txt") - if err := os.WriteFile(seedFilePath, []byte("SUAOUIE5CU47NCO22GHFEZXGCRCJDVTHDLMIP4L7UQNCR5SW4FZICI7O3Q"), 0664); err != nil { - t.Fatal(err) - } - - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - JetStream: true, - Nkeys: []*server.NkeyUser{ - { - Nkey: "UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY", - }, - }, - }) - if err != nil { - t.Fatal(err) - } - natsServer.Start() - defer natsServer.Shutdown() - - cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{ -"ers": { // EventReaderService - "enabled": true, // starts the EventReader service: - "sessions_conns":["*localhost"], - "readers": [ - { - "id": "nats", - "type": "*natsJSONMap", - "run_delay": "-1", - "concurrent_requests": 1024, - "source_path": "nats://localhost:4222", - "processed_path": "", - "tenant": "cgrates.org", - "filters": [], - "flags": [], - "fields":[ - {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, - {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, - ], - "opts": { - "natsJetStream": true, - "natsStreamName": "test", - "natsSeedFile": %q, - "natsJetStreamProcessed": true, - "natsSubjectProcessed": "processed_cdrs", - "natsSeedFileProcessed": %q, - } - }, - ], -}, -}`, seedFilePath, seedFilePath)) - if err != nil { - t.Fatal(err) - } - if err := cfg.CheckConfigSanity(); err != nil { - t.Fatal(err) - } - testCheckNatsJetStream(t, cfg) -} - -func TestNatsERJWT(t *testing.T) { - // prepare - basePath := "/tmp/natsCfg" - if err := os.MkdirAll(basePath, 0755); err != nil { - t.Fatal(err) - } - - defer os.RemoveAll(basePath) - seedFilePath := path.Join(basePath, "jwt.txt") - if err := os.WriteFile(seedFilePath, []byte(`-----BEGIN NATS USER JWT----- + // JWTFile setup + baseJWTPath := t.TempDir() + jwtFilePath := path.Join(baseJWTPath, "u.creds") + if err := os.WriteFile(jwtFilePath, []byte(`-----BEGIN NATS USER JWT----- eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJETFRGNFpLVVdNNFRPRkVNQko0UEUzTVFFVlhIUkJJN0xZNDdZNEMzNTNMWlJKSU5CUkJRIiwiaWF0IjoxNjI0ODg1NzMwLCJpc3MiOiJBQVlKRFZMWkdXTjdZM0ZCUENWWENSVlFaREZNWUdIVTRZWExHU1hYN1UyNTRLSDVTQzNSNVFLTSIsIm5hbWUiOiJ1c2VyIiwic3ViIjoiVUQzQkdXSUJQVTNDV0w0SE9VVTRWSkVRV1RVQVNBRUc2T0ozWEhNM0VFSk5BMlBBM1VWTllUWk4iLCJuYXRzIjp7InB1YiI6e30sInN1YiI6e30sInN1YnMiOi0xLCJkYXRhIjotMSwicGF5bG9hZCI6LTEsInR5cGUiOiJ1c2VyIiwidmVyc2lvbiI6Mn19.YmFL5nRMkEOXe77sQJPPRv_vwi89tzhVVl0AVjE4sXWyoWIHiCepNw28DbpJ0p_MlT8Qf0SY2cjAhIm-Qi7lDw ------END NATS USER JWT------ @@ -737,8 +213,8 @@ SUADIH32XQYWC2MI2YGM4AUQ3NMKZSZ5V2BZXQ237XXMLO7FFHDF5CTUDE *************************************************************`), 0664); err != nil { t.Fatal(err) } - natsCfgPath := path.Join(basePath, "nats.cfg") - if err := os.WriteFile(natsCfgPath, []byte(`// Operator "memory" + jwtCfgPath := path.Join(baseJWTPath, "resolver.conf") + if err := os.WriteFile(jwtCfgPath, []byte(`// Operator "memory" operator: eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJFRk5ERUdSNU1aUEw1VElQTFVKMlNMTFdZV0VDU0NJSEhVU1lISE5IR1BZVUpaWE5XUlNRIiwiaWF0IjoxNjI0ODc1NzYwLCJpc3MiOiJPQ0VSUlQ2WFNEQ1dBWTNFWVNTTjQ2UUxGQko3RFJHNTIzU1hIMkg0UjQ3WFZVWFYyUlJCSVNMSyIsIm5hbWUiOiJtZW1vcnkiLCJzdWIiOiJPQ0VSUlQ2WFNEQ1dBWTNFWVNTTjQ2UUxGQko3RFJHNTIzU1hIMkg0UjQ3WFZVWFYyUlJCSVNMSyIsIm5hdHMiOnsidHlwZSI6Im9wZXJhdG9yIiwidmVyc2lvbiI6Mn19.MZfwcw5j6zY8SfFQppGIa3VjYYZK2_n1kV16Nk5jVCgwS8dKWzRQK_XjFYWwQ15Cq9YY73jcTA6LO0DmQGsdBA resolver: MEMORY @@ -751,65 +227,156 @@ resolver_preload: { `), 0664); err != nil { t.Fatal(err) } - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - ConfigFile: natsCfgPath, - }) - if err != nil { - t.Fatal(err) - } - natsServer.Start() - defer natsServer.Shutdown() - cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{ -"ers": { // EventReaderService - "enabled": true, // starts the EventReader service: - "sessions_conns":["*localhost"], - "readers": [ + testcases := []struct { + name string + serverFlags []string + sourcePath string + readerOpts string + }{ { - "id": "nats", - "type": "*natsJSONMap", - "run_delay": "-1", - "concurrent_requests": 1024, - "source_path": "nats://localhost:4222", - "processed_path": "", - "tenant": "cgrates.org", - "filters": [], - "flags": [], - "fields":[ - {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, - {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, - ], - "opts": { - "natsSubjectProcessed": "processed_cdrs", - "natsJWTFile": %q, - "natsJWTFileProcessed": %q, - } + name: "NoAuth", + serverFlags: []string{}, + sourcePath: "nats://127.0.0.1:4222", + readerOpts: "", + }, + { + name: "UsernameAndPassword", + serverFlags: []string{"--user", "user", "--pass", "password"}, + sourcePath: "nats://user:password@127.0.0.1:4222", + readerOpts: "", + }, + { + name: "TokenAuth", + serverFlags: []string{"--auth", "token"}, + sourcePath: "nats://token@127.0.0.1:4222", + readerOpts: "", + }, + { + name: "NkeyAuth", + serverFlags: []string{"-c", nkeyCfgPath}, + sourcePath: "nats://127.0.0.1:4222", + readerOpts: fmt.Sprintf( + `"natsSeedFile": "%s"`, + seedFilePath, + ), + }, + { + name: "JWTAuth", + serverFlags: []string{"-c", jwtCfgPath}, + sourcePath: "nats://127.0.0.1:4222", + readerOpts: fmt.Sprintf( + `"natsJWTFile": "%s"`, + jwtFilePath, + ), }, - ], -}, -}`, seedFilePath, seedFilePath)) - if err != nil { - t.Fatal(err) } - if err := cfg.CheckConfigSanity(); err != nil { - t.Fatal(err) + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + exec.Command("pkill", "nats-server") + cmd := exec.Command("nats-server", tc.serverFlags...) + if err := cmd.Start(); err != nil { + t.Fatal(err) // most probably not installed + } + time.Sleep(50 * time.Millisecond) + defer cmd.Process.Kill() + + configJSON := fmt.Sprintf(natsCfg, tc.sourcePath, tc.readerOpts) + cfgPath := t.TempDir() + filePath := filepath.Join(cfgPath, "cgrates.json") + err := os.WriteFile(filePath, []byte(configJSON), 0644) + if err != nil { + t.Fatal(err) + } + cfg, err := config.NewCGRConfigFromPath(context.Background(), cfgPath) + if err != nil { + t.Fatal(err) + } + + rdrCfgOpts := cfg.ERsCfg().Readers[1].Opts + nop, err := GetNatsOpts(rdrCfgOpts, cfg.GeneralCfg().NodeID, time.Second) + if err != nil { + t.Fatal(err) + } + + // Establish a connection to nats. + nc, err := nats.Connect(tc.sourcePath, nop...) + if err != nil { + t.Fatal(err) + } + + if _, err = engine.StartEngine(cfgPath, 0); err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + engine.KillEngine(0) + nc.Close() + }) + + client, err := jsonrpc.Dial(utils.TCP, cfg.ListenCfg().RPCJSONListen) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 3; i++ { + randomCGRID := utils.UUIDSha1Prefix() + expData := fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID) + if err = nc.Publish("cgrates_cdrs", []byte(expData)); err != nil { + t.Error(err) + } + + time.Sleep(20 * time.Millisecond) // wait for exports + + var cgrID any + if err = client.Call(context.Background(), utils.CacheSv1GetItem, &utils.ArgsGetCacheItemWithAPIOpts{ + Tenant: "cgrates.org", + ArgsGetCacheItem: utils.ArgsGetCacheItem{ + CacheID: utils.CacheUCH, + ItemID: "CGRID", + }, + }, &cgrID); err != nil { + t.Error(err) + } else if cgrID != randomCGRID { + t.Errorf("expected %v, received %v", randomCGRID, cgrID) + } + } + }) } - testCheckNatsNormal(t, cfg) } -func TestNatsERJetStreamJWT(t *testing.T) { - // prepare - basePath := "/tmp/natsCfg" - if err := os.MkdirAll(basePath, 0755); err != nil { +func TestNatsJetStreamTT(t *testing.T) { + switch *utils.DBType { + case utils.MetaInternal: + case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("unsupported dbtype value") + } + + // SeedFile setup + baseNKeyPath := t.TempDir() + seedFilePath := path.Join(baseNKeyPath, "seed.txt") + if err := os.WriteFile(seedFilePath, []byte("SUAOUIE5CU47NCO22GHFEZXGCRCJDVTHDLMIP4L7UQNCR5SW4FZICI7O3Q"), 0664); err != nil { + t.Fatal(err) + } + nkeyCfgPath := path.Join(baseNKeyPath, "nats.cfg") + if err := os.WriteFile(nkeyCfgPath, []byte(`authorization: { + users: [ + { nkey: UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY } + ] + }`), 0664); err != nil { t.Fatal(err) } - defer os.RemoveAll(basePath) - seedFilePath := path.Join(basePath, "jwt.txt") - if err := os.WriteFile(seedFilePath, []byte(`-----BEGIN NATS USER JWT----- -eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJXTUUyUkhMWEU1R0FZS0hITk5aNkhLSDQ2Q0VSUFNEUExPN1BMT0ZEWTZaNUdZM09aVkFRIiwiaWF0IjoxNjI0OTUzNTE5LCJpc3MiOiJBQVlKRFZMWkdXTjdZM0ZCUENWWENSVlFaREZNWUdIVTRZWExHU1hYN1UyNTRLSDVTQzNSNVFLTSIsIm5hbWUiOiJ1c2VyMiIsInN1YiI6IlVERVJVRElMMlZORVNKUzVGNUpDQVJHWUNSUVM1M0NWSUVBSllHU0hFR0dZSEI2R01BQ1AzM1VDIiwibmF0cyI6eyJwdWIiOnt9LCJzdWIiOnt9LCJzdWJzIjotMSwiZGF0YSI6LTEsInBheWxvYWQiOi0xLCJ0eXBlIjoidXNlciIsInZlcnNpb24iOjJ9fQ.YitrKhIlU45Q1m6A_HFsaxDgUUiIyjLJuHNKnG1cjzj5H6n697Iv3pDsIZVTh6pBYROg1aRV42bD3PpEZna2AA + // JWTFile setup + baseJWTPath := t.TempDir() + // baseJWTPath := "/tmp/natsCfg3" + // os.Mkdir("/tmp/natsCfg3", 0644) + jwtFilePath := path.Join(baseJWTPath, "u.creds") + if err := os.WriteFile(jwtFilePath, []byte(`-----BEGIN NATS USER JWT----- +eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJIMkwzWEtSTVoyMklDSFBGSDRXQzM1U0hLRVY3RVZUTEJERlpESVhTN0xOWEhCNkhUV0ZBIiwiaWF0IjoxNzI2NzUyOTAzLCJpc3MiOiJBQktCWlJVVFY0M1NZN1E2VlA3TVBYTldBQzdOTVZCUzJGUEpRMzZHQ1dWNVZIRzVCVVlFNTRGSSIsInN1YiI6IlVCSUtHTlRPRFJPTU8yWEdZTk5TQ1hQNUNLSlBUSVRWUTY1TjdZVEZQWVFKNFdFWVY3T0xPTkE1IiwibmF0cyI6eyJwdWIiOnt9LCJzdWIiOnt9LCJzdWJzIjotMSwiZGF0YSI6LTEsInBheWxvYWQiOi0xLCJpc3N1ZXJfYWNjb3VudCI6IkFCVFZITzJNUkVOQlcyTk81N0tXSzZMWENaU0g0M09IUEtYRFlHWEJENlFZWlFDSkhGRURNNVFYIiwidHlwZSI6InVzZXIiLCJ2ZXJzaW9uIjoyfX0.rIFeciJthv_V4OfRc1wQXxk7-E3Wa6-87suJI_sn808Az7psEdvFagNosCqgdGd_d7AUDhY2eCipcIEZxnPeBA ------END NATS USER JWT------ ************************* IMPORTANT ************************* @@ -817,78 +384,354 @@ NKEY Seed printed below can be used to sign and prove identity. NKEYs are sensitive and should be treated as secrets. -----BEGIN USER NKEY SEED----- -SUAGM22ETPOJNZGYSGJL3HRLZ6R35FSVOYNINYN5Z5UPLP5K4SQJ753WU4 +SUAD3BD2VMG6RJZVODS5BKYHOY6HGM7U5VDFYEEDHXIQXBX5R7RMKVQS4Y ------END USER NKEY SEED------ *************************************************************`), 0664); err != nil { t.Fatal(err) } - natsCfgPath := path.Join(basePath, "nats.cfg") - if err := os.WriteFile(natsCfgPath, []byte(`// Operator "memory" -operator: eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJFRk5ERUdSNU1aUEw1VElQTFVKMlNMTFdZV0VDU0NJSEhVU1lISE5IR1BZVUpaWE5XUlNRIiwiaWF0IjoxNjI0ODc1NzYwLCJpc3MiOiJPQ0VSUlQ2WFNEQ1dBWTNFWVNTTjQ2UUxGQko3RFJHNTIzU1hIMkg0UjQ3WFZVWFYyUlJCSVNMSyIsIm5hbWUiOiJtZW1vcnkiLCJzdWIiOiJPQ0VSUlQ2WFNEQ1dBWTNFWVNTTjQ2UUxGQko3RFJHNTIzU1hIMkg0UjQ3WFZVWFYyUlJCSVNMSyIsIm5hdHMiOnsidHlwZSI6Im9wZXJhdG9yIiwidmVyc2lvbiI6Mn19.MZfwcw5j6zY8SfFQppGIa3VjYYZK2_n1kV16Nk5jVCgwS8dKWzRQK_XjFYWwQ15Cq9YY73jcTA6LO0DmQGsdBA + jwtCfgPath := path.Join(baseJWTPath, "resolver.conf") + if err := os.WriteFile(jwtCfgPath, []byte(`operator: eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJQRVBMTFM3WFQyUzQ2NUZIS0JDSjdTNDdWS1VPRjJEVFhWRE9QVzZTWjY3R0JTU0I3SllBIiwiaWF0IjoxNzI2NzUyOTAzLCJpc3MiOiJPQzZJRFFQVTZJNFFOTVZYUEpSR1RSVFpHRFNTSEVSMzVMSkQ3V0k3RlJEUUpPT1ZUUjVKRjczWiIsIm5hbWUiOiJPIiwic3ViIjoiT0M2SURRUFU2STRRTk1WWFBKUkdUUlRaR0RTU0hFUjM1TEpEN1dJN0ZSRFFKT09WVFI1SkY3M1oiLCJuYXRzIjp7InNpZ25pbmdfa2V5cyI6WyJPQUdPWkNFRjNWR1FFV1RRTkFZTllJRFpFREVZSU03WDI3S0xKQzdaU1c3WDVQSjZJSllEWEgyUyJdLCJ0eXBlIjoib3BlcmF0b3IiLCJ2ZXJzaW9uIjoyfX0.KS5aRNmSKIcIDrEIZvpLPgmrwKGMExSTOnsp579ihwqRLFt5ZrDXEl8I81F-lgvRGZ0_yX4fUIQ9-J4trZDkDw + +system_account: ADXMVBEVF2FHZTELG3CMW4VT4WRN6JK3SSXSS34G77BUX7YD6KWU2GMI resolver: MEMORY - resolver_preload: { - // Account "js" - AAFIBB6C56ROU5XRVJLJYR3BTGGYK3HJGHEHQV7L7QZMTT3ZRBLHBS7F: eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJQNEZOWllRQkNKWERZT09ITzRNVU5BQTRHR0w2UTVIRkxKQUJXVEc3WVFIRFVNUVlHUldRIiwiaWF0IjoxNjI0OTU0MjQ0LCJpc3MiOiJPQ0VSUlQ2WFNEQ1dBWTNFWVNTTjQ2UUxGQko3RFJHNTIzU1hIMkg0UjQ3WFZVWFYyUlJCSVNMSyIsIm5hbWUiOiJqcyIsInN1YiI6IkFBRklCQjZDNTZST1U1WFJWSkxKWVIzQlRHR1lLM0hKR0hFSFFWN0w3UVpNVFQzWlJCTEhCUzdGIiwibmF0cyI6eyJsaW1pdHMiOnsic3VicyI6LTEsImRhdGEiOi0xLCJwYXlsb2FkIjotMSwiaW1wb3J0cyI6LTEsImV4cG9ydHMiOi0xLCJ3aWxkY2FyZHMiOnRydWUsImNvbm4iOi0xLCJsZWFmIjotMX0sImRlZmF1bHRfcGVybWlzc2lvbnMiOnsicHViIjp7fSwic3ViIjp7fX0sInR5cGUiOiJhY2NvdW50IiwidmVyc2lvbiI6Mn19.tGaVbpNXuSFxk3RDxicbi62nupiTv_-vTgps0t-LmvxKoNuzjvrnhyARwdh3qknMP54pDqzlUfldqubmEYLFBg - - // Account "account" - AAYJDVLZGWN7Y3FBPCVXCRVQZDFMYGHU4YXLGSXX7U254KH5SC3R5QKM: eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJTUkVHMkdLUVg1RlJKQ0lTUlFITVlNUU9CU09DSkRYMjVaUUpGTllDMkxMQkZBRlNOQU9BIiwiaWF0IjoxNjI0OTUzNzIzLCJpc3MiOiJPQ0VSUlQ2WFNEQ1dBWTNFWVNTTjQ2UUxGQko3RFJHNTIzU1hIMkg0UjQ3WFZVWFYyUlJCSVNMSyIsIm5hbWUiOiJhY2NvdW50Iiwic3ViIjoiQUFZSkRWTFpHV043WTNGQlBDVlhDUlZRWkRGTVlHSFU0WVhMR1NYWDdVMjU0S0g1U0MzUjVRS00iLCJuYXRzIjp7ImxpbWl0cyI6eyJzdWJzIjotMSwiZGF0YSI6LTEsInBheWxvYWQiOi0xLCJpbXBvcnRzIjotMSwiZXhwb3J0cyI6LTEsIndpbGRjYXJkcyI6dHJ1ZSwiY29ubiI6LTEsImxlYWYiOi0xLCJtZW1fc3RvcmFnZSI6LTEsInN0cmVhbXMiOi0xLCJjb25zdW1lciI6LTF9LCJkZWZhdWx0X3Blcm1pc3Npb25zIjp7InB1YiI6e30sInN1YiI6e319LCJ0eXBlIjoiYWNjb3VudCIsInZlcnNpb24iOjJ9fQ.rcOqLmWL77kgoDS4GPK5qs-rpG1mQCkQ5FoCzT3VGqsIXNdpn72d38jbCeV40_6l8dI49IRtRHySv8k7VwaaAA - -} -system_account:AAFIBB6C56ROU5XRVJLJYR3BTGGYK3HJGHEHQV7L7QZMTT3ZRBLHBS7F -`), 0664); err != nil { + ADXMVBEVF2FHZTELG3CMW4VT4WRN6JK3SSXSS34G77BUX7YD6KWU2GMI: eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiI0UzJRVU1JWDdGTjZJR0lUM0xTWERMN1ZaQTVJQlUyNjJFTkY3WTZGR1JMNDJWNE5YUkdBIiwiaWF0IjoxNzI2NzUyOTAzLCJpc3MiOiJPQUdPWkNFRjNWR1FFV1RRTkFZTllJRFpFREVZSU03WDI3S0xKQzdaU1c3WDVQSjZJSllEWEgyUyIsIm5hbWUiOiJTWVMiLCJzdWIiOiJBRFhNVkJFVkYyRkhaVEVMRzNDTVc0VlQ0V1JONkpLM1NTWFNTMzRHNzdCVVg3WUQ2S1dVMkdNSSIsIm5hdHMiOnsibGltaXRzIjp7InN1YnMiOi0xLCJkYXRhIjotMSwicGF5bG9hZCI6LTEsImltcG9ydHMiOi0xLCJleHBvcnRzIjotMSwid2lsZGNhcmRzIjp0cnVlLCJjb25uIjotMSwibGVhZiI6LTF9LCJkZWZhdWx0X3Blcm1pc3Npb25zIjp7InB1YiI6eyJhbGxvdyI6WyIkU1lTLlx1MDAzZSJdfSwic3ViIjp7ImFsbG93IjpbIiRTWVMuXHUwMDNlIl19fSwiYXV0aG9yaXphdGlvbiI6e30sInR5cGUiOiJhY2NvdW50IiwidmVyc2lvbiI6Mn19.nZFNLl_sfCsaX2jTPFkCsHRbDNt0WGlpR0tx3K8J9KP8Ds8VmiQl7OvEmYjZflVKDVJgXvIwICIT-aY56klsAg + ABTVHO2MRENBW2NO57KWK6LXCZSH43OHPKXDYGXBD6QYZQCJHFEDM5QX: eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJKWFZETkU2NVhITFg0Qk03M0EzUDNIWkFZT0QyTFlZUUc3SElIMjQ1QUlHN1Y0NVBVSTRBIiwiaWF0IjoxNzI2NzUyOTAzLCJpc3MiOiJPQUdPWkNFRjNWR1FFV1RRTkFZTllJRFpFREVZSU03WDI3S0xKQzdaU1c3WDVQSjZJSllEWEgyUyIsIm5hbWUiOiJBIiwic3ViIjoiQUJUVkhPMk1SRU5CVzJOTzU3S1dLNkxYQ1pTSDQzT0hQS1hEWUdYQkQ2UVlaUUNKSEZFRE01UVgiLCJuYXRzIjp7ImxpbWl0cyI6eyJzdWJzIjotMSwiZGF0YSI6LTEsInBheWxvYWQiOi0xLCJpbXBvcnRzIjotMSwiZXhwb3J0cyI6LTEsIndpbGRjYXJkcyI6dHJ1ZSwiY29ubiI6LTEsImxlYWYiOi0xLCJtZW1fc3RvcmFnZSI6LTEsImRpc2tfc3RvcmFnZSI6LTEsInN0cmVhbXMiOi0xLCJjb25zdW1lciI6LTF9LCJzaWduaW5nX2tleXMiOlsiQUJLQlpSVVRWNDNTWTdRNlZQN01QWE5XQUM3Tk1WQlMyRlBKUTM2R0NXVjVWSEc1QlVZRTU0RkkiXSwiZGVmYXVsdF9wZXJtaXNzaW9ucyI6eyJwdWIiOnt9LCJzdWIiOnt9fSwiYXV0aG9yaXphdGlvbiI6e30sInR5cGUiOiJhY2NvdW50IiwidmVyc2lvbiI6Mn19.GWvAWjlECPNT5afsc96NVIEOJLFQi5fL9gBEzY4-z7mGyJN41qjJpx7l_LvLb8icToW2nca81J9NFwT5yf-NAA +}`), 0664); err != nil { t.Fatal(err) } - natsServer, err := server.NewServer(&server.Options{ - Host: "127.0.0.1", - Port: 4222, - ConfigFile: natsCfgPath, - JetStream: true, - }) - if err != nil { - t.Fatal(err) - } - natsServer.Start() - defer natsServer.Shutdown() - cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{ -"ers": { // EventReaderService - "enabled": true, // starts the EventReader service: - "sessions_conns":["*localhost"], - "readers": [ + testcases := []struct { + name string + serverFlags []string + sourcePath string + readerOpts string + }{ { - "id": "nats", - "type": "*natsJSONMap", - "run_delay": "-1", - "concurrent_requests": 1024, - "source_path": "nats://localhost:4222", - "processed_path": "", - "tenant": "cgrates.org", - "filters": [], - "flags": [], - "fields":[ - {"tag": "OriginID", "type": "*composed", "value": "~*req.OriginID", "path": "*cgreq.OriginID"}, - {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, - ], - "opts": { - "natsJetStream": true, - "natsStreamName": "test", - "natsJWTFile": %q, - "natsJetStreamProcessed": true, - "natsSubjectProcessed": "processed_cdrs", - "natsJWTFileProcessed": %q, - } + name: "NoAuth", + serverFlags: []string{"-js"}, + sourcePath: "nats://127.0.0.1:4222", + readerOpts: ` + "natsJetStream": true, + "natsStreamName": "stream"`, + }, + { + name: "UsernameAndPassword", + serverFlags: []string{"-js", "--user", "user", "--pass", "password"}, + sourcePath: "nats://user:password@127.0.0.1:4222", + readerOpts: ` + "natsJetStream": true, + "natsStreamName": "stream"`, + }, + { + name: "TokenAuth", + serverFlags: []string{"-js", "--auth", "token"}, + sourcePath: "nats://token@127.0.0.1:4222", + readerOpts: ` + "natsJetStream": true, + "natsStreamName": "stream"`, + }, + { + name: "NkeyAuth", + serverFlags: []string{"-js", "-c", nkeyCfgPath}, + sourcePath: "nats://127.0.0.1:4222", + readerOpts: fmt.Sprintf(` + "natsJetStream": true, + "natsStreamName": "stream", + "natsSeedFile": "%s"`, seedFilePath), + }, + { + name: "JWTAuth", + serverFlags: []string{"-js", "-c", jwtCfgPath}, + sourcePath: "nats://127.0.0.1:4222", + readerOpts: fmt.Sprintf(` + "natsJetStream": true, + "natsStreamName": "stream", + "natsJWTFile": "%s"`, jwtFilePath), }, - ], -}, -}`, seedFilePath, seedFilePath)) - if err != nil { - t.Fatal(err) } - if err := cfg.CheckConfigSanity(); err != nil { - t.Fatal(err) + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + exec.Command("pkill", "nats-server") + cmd := exec.Command("nats-server", tc.serverFlags...) + if err := cmd.Start(); err != nil { + t.Fatal(err) // most probably not installed + } + time.Sleep(50 * time.Millisecond) + defer cmd.Process.Kill() + + configJSON := fmt.Sprintf(natsCfg, tc.sourcePath, tc.readerOpts) + cfgPath := t.TempDir() + filePath := filepath.Join(cfgPath, "cgrates.json") + err := os.WriteFile(filePath, []byte(configJSON), 0644) + if err != nil { + t.Fatal(err) + } + cfg, err := config.NewCGRConfigFromPath(context.Background(), cfgPath) + if err != nil { + t.Fatal(err) + } + + rdrCfgOpts := cfg.ERsCfg().Readers[1].Opts + nop, err := GetNatsOpts(rdrCfgOpts, cfg.GeneralCfg().NodeID, 2*time.Second) + if err != nil { + t.Fatal(err) + } + + // Establish a connection to nats. + nc, err := nats.Connect(tc.sourcePath, nop...) + if err != nil { + t.Fatal(err) + } + defer nc.Close() + + // Initialize a stream manager and create a stream. + js, err := jetstream.New(nc) + if err != nil { + t.Fatal(err) + } + if _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{ + Name: "stream", + Subjects: []string{"cgrates_cdrs", "cgrates_cdrs_processed"}, + }); err != nil { + t.Fatal(err) + } + defer js.DeleteStream(context.Background(), "stream") + + if _, err = engine.StartEngine(cfgPath, 0); err != nil { + t.Fatal(err) + } + defer engine.KillEngine(0) + + client, err := jsonrpc.Dial(utils.TCP, cfg.ListenCfg().RPCJSONListen) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 3; i++ { + randomCGRID := utils.UUIDSha1Prefix() + expData := fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID) + if _, err := js.Publish(context.Background(), "cgrates_cdrs", []byte(expData)); err != nil { + t.Error(err) + } + + time.Sleep(20 * time.Millisecond) // wait for exports + + var cgrID any + if err = client.Call(context.Background(), utils.CacheSv1GetItem, &utils.ArgsGetCacheItemWithAPIOpts{ + Tenant: "cgrates.org", + ArgsGetCacheItem: utils.ArgsGetCacheItem{ + CacheID: utils.CacheUCH, + ItemID: "CGRID", + }, + }, &cgrID); err != nil { + t.Error(err) + } else if cgrID != randomCGRID { + t.Errorf("expected %v, received %v", randomCGRID, cgrID) + } + } + }) } - testCheckNatsJetStream(t, cfg) } + +/* + +In order to generate the resolver.conf and u.creds for the jetstream test, run the following: + +package main + +import ( + "fmt" + "log" + "os" + "path" + + "github.com/nats-io/jwt/v2" + "github.com/nats-io/nkeys" +) + +func main() { + // create an operator key pair (private key) + okp, err := nkeys.CreateOperator() + if err != nil { + log.Fatal(err) + } + // extract the public key + opk, err := okp.PublicKey() + if err != nil { + log.Fatal(err) + } + + // create an operator claim using the public key for the identifier + oc := jwt.NewOperatorClaims(opk) + oc.Name = "O" + // add an operator signing key to sign accounts + oskp, err := nkeys.CreateOperator() + if err != nil { + log.Fatal(err) + } + // get the public key for the signing key + ospk, err := oskp.PublicKey() + if err != nil { + log.Fatal(err) + } + // add the signing key to the operator - this makes any account + // issued by the signing key to be valid for the operator + oc.SigningKeys.Add(ospk) + + // self-sign the operator JWT - the operator trusts itself + operatorJWT, err := oc.Encode(okp) + if err != nil { + log.Fatal(err) + } + + // Create system account + sysAkp, err := nkeys.CreateAccount() + if err != nil { + log.Fatal(err) + } + sysApk, err := sysAkp.PublicKey() + if err != nil { + log.Fatal(err) + } + sysAc := jwt.NewAccountClaims(sysApk) + sysAc.Name = "SYS" + // Add necessary system permissions + sysAc.DefaultPermissions.Pub.Allow.Add("$SYS.>") + sysAc.DefaultPermissions.Sub.Allow.Add("$SYS.>") + sysAccountJWT, err := sysAc.Encode(oskp) + if err != nil { + log.Fatal(err) + } + + // create an account keypair + akp, err := nkeys.CreateAccount() + if err != nil { + log.Fatal(err) + } + // extract the public key for the account + apk, err := akp.PublicKey() + if err != nil { + log.Fatal(err) + } + // create the claim for the account using the public key of the account + ac := jwt.NewAccountClaims(apk) + ac.Name = "A" + + // enable jetstream for account + ac.Limits.JetStreamLimits = jwt.JetStreamLimits{ + MemoryStorage: -1, + DiskStorage: -1, + Streams: -1, + Consumer: -1, + } + + // create a signing key that we can use for issuing users + askp, err := nkeys.CreateAccount() + if err != nil { + log.Fatal(err) + } + // extract the public key + aspk, err := askp.PublicKey() + if err != nil { + log.Fatal(err) + } + // add the signing key (public) to the account + ac.SigningKeys.Add(aspk) + + // now we could encode an issue the account using the operator + // key that we generated above, but this will illustrate that + // the account could be self-signed, and given to the operator + // who can then re-sign it + accountJWT, err := ac.Encode(akp) + if err != nil { + log.Fatal(err) + } + + // the operator would decode the provided token, if the token + // is not self-signed or signed by an operator or tampered with + // the decoding would fail + ac, err = jwt.DecodeAccountClaims(accountJWT) + if err != nil { + log.Fatal(err) + } + // here the operator is going to use its private signing key to + // re-issue the account + accountJWT, err = ac.Encode(oskp) + if err != nil { + log.Fatal(err) + } + + // now back to the account, the account can issue users + // need not be known to the operator - the users are trusted + // because they will be signed by the account. The server will + // look up the account get a list of keys the account has and + // verify that the user was issued by one of those keys + ukp, err := nkeys.CreateUser() + if err != nil { + log.Fatal(err) + } + upk, err := ukp.PublicKey() + if err != nil { + log.Fatal(err) + } + uc := jwt.NewUserClaims(upk) + // since the jwt will be issued by a signing key, the issuer account + // must be set to the public ID of the account + uc.IssuerAccount = apk + userJwt, err := uc.Encode(askp) + if err != nil { + log.Fatal(err) + } + // the seed is a version of the keypair that is stored as text + useed, err := ukp.Seed() + if err != nil { + log.Fatal(err) + } + // generate a creds formatted file that can be used by a NATS client + creds, err := jwt.FormatUserConfig(userJwt, useed) + if err != nil { + log.Fatal(err) + } + + // now we are going to put it together into something that can be run + // we create a directory to store the server configuration, the creds + // file and a small go program that uses the creds file + dir, err := os.MkdirTemp(os.TempDir(), "jwt_example") + if err != nil { + log.Fatal(err) + } + // print where we generated the file + fmt.Printf("cfg path: %s\n", dir) + + // we are generating a memory resolver server configuration + // it lists the operator and all account jwts the server should + // know about + resolver := fmt.Sprintf(`operator: %s + +system_account: %s + +resolver: MEMORY +resolver_preload: { + %s: %s + %s: %s +} + +jetstream: enabled +`, operatorJWT, sysApk, sysApk, sysAccountJWT, apk, accountJWT) + if err := os.WriteFile(path.Join(dir, "resolver.conf"), + []byte(resolver), 0644); err != nil { + log.Fatal(err) + } + + // store the creds + credsPath := path.Join(dir, "u.creds") + if err := os.WriteFile(credsPath, creds, 0644); err != nil { + log.Fatal(err) + } +} + +*/