diff --git a/ers/nats_it_test.go b/ers/nats_it_test.go index 2266cc6a4..ff26f16f0 100644 --- a/ers/nats_it_test.go +++ b/ers/nats_it_test.go @@ -30,8 +30,8 @@ import ( "testing" "time" + "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" - "github.com/cgrates/birpc/jsonrpc" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -48,45 +48,39 @@ func TestNatsConcurrentReaders(t *testing.T) { t.Fatal("unsupported dbtype value") } - cfgPath := path.Join(*utils.DataDir, "conf", "samples", "ers_nats") - cfg, err := config.NewCGRConfigFromPath(cfgPath) - if err != nil { - t.Fatal("could not init cfg", err.Error()) - } - - exec.Command("pkill", "nats-server") cmd := exec.Command("nats-server", "-js") if err := cmd.Start(); err != nil { t.Fatal(err) // most probably not installed } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() + t.Cleanup(func() { cmd.Process.Kill() }) - // Establish a connection to nats. - nc, err := nats.Connect(cfg.ERsCfg().Readers[1].SourcePath) - if err != nil { - t.Fatal(err) - } - defer nc.Close() + var js jetstream.JetStream // to reuse jetstream instance - // Initialize a stream manager and create a stream. - js, err := jetstream.New(nc) - if err != nil { - t.Fatal(err) - } - if _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{ - Name: "stream", - Subjects: []string{"cgrates_cdrs", "cgrates_cdrs_processed"}, - }); err != nil { - t.Fatal(err) - } - defer js.DeleteStream(context.Background(), "stream") + ng := engine.TestEngine{ + ConfigPath: filepath.Join(*utils.DataDir, "conf/samples/ers_nats"), + PreStartHook: func(t *testing.T, c *config.CGRConfig) { + nc := connectToNATSServer(t, "nats://127.0.0.1:4222") - // Start the engine. - if _, err := engine.StopStartEngine(cfgPath, 100); err != nil { - t.Fatal(err) + // Initialize a stream manager and create a stream. + var err error + js, err = jetstream.New(nc) + if err != nil { + t.Fatal(err) + } + if _, err := js.CreateStream(context.Background(), jetstream.StreamConfig{ + Name: "stream", + Subjects: []string{"cgrates_cdrs", "cgrates_cdrs_processed"}, + }); err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + if err := js.DeleteStream(context.Background(), "stream"); err != nil { + t.Errorf("failed to clean up stream: %v", err) + } + }) + }, } - defer engine.KillEngine(100) + ng.Run(t) // Publish CDRs asynchronously to the nats subject. cdr := make(map[string]any) @@ -104,8 +98,7 @@ func TestNatsConcurrentReaders(t *testing.T) { } // Define a consumer for the subject where all the processed cdrs were published. - var cons jetstream.Consumer - cons, err = js.CreateOrUpdateConsumer(context.Background(), "stream", jetstream.ConsumerConfig{ + cons, err := js.CreateOrUpdateConsumer(context.Background(), "stream", jetstream.ConsumerConfig{ FilterSubject: "cgrates_cdrs_processed", Durable: "cgrates_processed", AckPolicy: jetstream.AckAllPolicy, @@ -115,7 +108,7 @@ func TestNatsConcurrentReaders(t *testing.T) { } // Wait for the messages to be consumed and processed. - time.Sleep(100 * time.Millisecond) + time.Sleep(20 * time.Millisecond) // Retrieve info about the consumer. info, err := cons.Info(context.Background()) @@ -130,20 +123,12 @@ func TestNatsConcurrentReaders(t *testing.T) { } var natsCfg string = `{ - -"general": { - "node_id": "nats_test", - "log_level": 7 -}, - "data_db": { "db_type": "*internal" }, - "stor_db": { "db_type": "*internal" }, - "ees": { "enabled": true, "exporters": [ @@ -151,19 +136,18 @@ var natsCfg string = `{ "id": "nats_processed", "type": "*virt", "fields": [ - {"tag": "CGRID", "type": "*variable", "value": "~*req.CGRID", "path": "*uch.CGRID"} + {"tag": "Key", "type": "*variable", "value": "~*req.Key", "path": "*uch.Key"} ] } ] }, - "ers": { "enabled": true, "sessions_conns":[], "ees_conns": ["*internal"], "readers": [ { - "id": "nats_reader1", + "id": "nats_reader", "type": "*nats_json_map", "source_path": "%s", "ees_success_ids": ["nats_processed"], @@ -172,13 +156,12 @@ var natsCfg string = `{ %s }, "fields":[ - {"tag": "CGRID", "type": "*variable", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, + {"tag": "Key", "type": "*variable", "value": "~*req.Key", "path": "*cgreq.Key"}, {"tag": "readerId", "type": "*variable", "value": "~*vars.*readerID", "path": "*cgreq.ReaderID"}, ] } ] } - }` func TestNatsNormalTT(t *testing.T) { @@ -284,73 +267,37 @@ resolver_preload: { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - exec.Command("pkill", "nats-server") cmd := exec.Command("nats-server", tc.serverFlags...) if err := cmd.Start(); err != nil { t.Fatal(err) // most probably not installed } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() + t.Cleanup(func() { cmd.Process.Kill() }) + var nc *nats.Conn // to reuse nats conn - configJSON := fmt.Sprintf(natsCfg, tc.sourcePath, tc.readerOpts) - cfgPath := t.TempDir() - filePath := filepath.Join(cfgPath, "cgrates.json") - err := os.WriteFile(filePath, []byte(configJSON), 0644) - if err != nil { - t.Fatal(err) - } - cfg, err := config.NewCGRConfigFromPath(cfgPath) - if err != nil { - t.Fatal(err) + ng := engine.TestEngine{ + ConfigJSON: fmt.Sprintf(natsCfg, tc.sourcePath, tc.readerOpts), + PreStartHook: func(t *testing.T, c *config.CGRConfig) { + rdrOpts := c.ERsCfg().ReaderCfg("nats_reader").Opts.NATS + nop, err := GetNatsOpts(rdrOpts, c.GeneralCfg().NodeID, time.Second) + if err != nil { + t.Fatal(err) + } + nc = connectToNATSServer(t, tc.sourcePath, nop...) + }, } + client, _ := ng.Run(t) - rdrCfgOpts := cfg.ERsCfg().Readers[1].Opts.NATS - nop, err := GetNatsOpts(rdrCfgOpts, cfg.GeneralCfg().NodeID, time.Second) - if err != nil { - t.Fatal(err) - } - - // Establish a connection to nats. - nc, err := nats.Connect(tc.sourcePath, nop...) - if err != nil { - t.Fatal(err) - } - - if _, err = engine.StartEngine(cfgPath, *utils.WaitRater); err != nil { - t.Fatal(err) - } - - t.Cleanup(func() { - engine.KillEngine(*utils.WaitRater) - nc.Close() - }) - - client, err := jsonrpc.Dial(utils.TCP, cfg.ListenCfg().RPCJSONListen) - if err != nil { - t.Fatal(err) - } + // For non-jetstream connections, we need to make sure the + // engine is ready to read published messages right away. + time.Sleep(2 * time.Millisecond) for i := 0; i < 3; i++ { - randomCGRID := utils.UUIDSha1Prefix() - expData := fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID) - if err = nc.Publish("cgrates_cdrs", []byte(expData)); err != nil { + key := fmt.Sprintf("key%d", i+1) + expData := fmt.Sprintf(`{"Key": "%s"}`, key) + if err := nc.Publish("cgrates_cdrs", []byte(expData)); err != nil { t.Error(err) } - - time.Sleep(20 * time.Millisecond) // wait for exports - - var cgrID any - if err = client.Call(context.Background(), utils.CacheSv1GetItem, &utils.ArgsGetCacheItemWithAPIOpts{ - Tenant: "cgrates.org", - ArgsGetCacheItem: utils.ArgsGetCacheItem{ - CacheID: utils.CacheUCH, - ItemID: "CGRID", - }, - }, &cgrID); err != nil { - t.Error(err) - } else if cgrID != randomCGRID { - t.Errorf("expected %v, received %v", randomCGRID, cgrID) - } + checkNATSExports(t, client, key) } }) } @@ -382,6 +329,8 @@ func TestNatsJetStreamTT(t *testing.T) { // JWTFile setup baseJWTPath := t.TempDir() + // baseJWTPath := "/tmp/natsCfg3" + // os.Mkdir("/tmp/natsCfg3", 0644) jwtFilePath := path.Join(baseJWTPath, "u.creds") if err := os.WriteFile(jwtFilePath, []byte(`-----BEGIN NATS USER JWT----- eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJIMkwzWEtSTVoyMklDSFBGSDRXQzM1U0hLRVY3RVZUTEJERlpESVhTN0xOWEhCNkhUV0ZBIiwiaWF0IjoxNzI2NzUyOTAzLCJpc3MiOiJBQktCWlJVVFY0M1NZN1E2VlA3TVBYTldBQzdOTVZCUzJGUEpRMzZHQ1dWNVZIRzVCVVlFNTRGSSIsInN1YiI6IlVCSUtHTlRPRFJPTU8yWEdZTk5TQ1hQNUNLSlBUSVRWUTY1TjdZVEZQWVFKNFdFWVY3T0xPTkE1IiwibmF0cyI6eyJwdWIiOnt9LCJzdWIiOnt9LCJzdWJzIjotMSwiZGF0YSI6LTEsInBheWxvYWQiOi0xLCJpc3N1ZXJfYWNjb3VudCI6IkFCVFZITzJNUkVOQlcyTk81N0tXSzZMWENaU0g0M09IUEtYRFlHWEJENlFZWlFDSkhGRURNNVFYIiwidHlwZSI6InVzZXIiLCJ2ZXJzaW9uIjoyfX0.rIFeciJthv_V4OfRc1wQXxk7-E3Wa6-87suJI_sn808Az7psEdvFagNosCqgdGd_d7AUDhY2eCipcIEZxnPeBA @@ -463,88 +412,109 @@ resolver_preload: { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - exec.Command("pkill", "nats-server") cmd := exec.Command("nats-server", tc.serverFlags...) if err := cmd.Start(); err != nil { t.Fatal(err) // most probably not installed } - time.Sleep(50 * time.Millisecond) - defer cmd.Process.Kill() + t.Cleanup(func() { cmd.Process.Kill() }) + var js jetstream.JetStream // to reuse jetstream instance - configJSON := fmt.Sprintf(natsCfg, tc.sourcePath, tc.readerOpts) - cfgPath := t.TempDir() - filePath := filepath.Join(cfgPath, "cgrates.json") - err := os.WriteFile(filePath, []byte(configJSON), 0644) - if err != nil { - t.Fatal(err) - } - cfg, err := config.NewCGRConfigFromPath(cfgPath) - if err != nil { - t.Fatal(err) - } + ng := engine.TestEngine{ + ConfigJSON: fmt.Sprintf(natsCfg, tc.sourcePath, tc.readerOpts), + PreStartHook: func(t *testing.T, c *config.CGRConfig) { + rdrOpts := c.ERsCfg().ReaderCfg("nats_reader").Opts.NATS + nop, err := GetNatsOpts(rdrOpts, c.GeneralCfg().NodeID, time.Second) + if err != nil { + t.Fatal(err) + } + nc := connectToNATSServer(t, tc.sourcePath, nop...) - rdrCfgOpts := cfg.ERsCfg().Readers[1].Opts.NATS - nop, err := GetNatsOpts(rdrCfgOpts, cfg.GeneralCfg().NodeID, 2*time.Second) - if err != nil { - t.Fatal(err) - } - - // Establish a connection to nats. - nc, err := nats.Connect(tc.sourcePath, nop...) - if err != nil { - t.Fatal(err) - } - defer nc.Close() - - // Initialize a stream manager and create a stream. - js, err := jetstream.New(nc) - if err != nil { - t.Fatal(err) - } - if _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{ - Name: "stream", - Subjects: []string{"cgrates_cdrs", "cgrates_cdrs_processed"}, - }); err != nil { - t.Fatal(err) - } - defer js.DeleteStream(context.Background(), "stream") - - if _, err = engine.StartEngine(cfgPath, *utils.WaitRater); err != nil { - t.Fatal(err) - } - defer engine.KillEngine(*utils.WaitRater) - - client, err := jsonrpc.Dial(utils.TCP, cfg.ListenCfg().RPCJSONListen) - if err != nil { - t.Fatal(err) + // Initialize a stream manager and create a stream. + js, err = jetstream.New(nc) + if err != nil { + t.Fatal(err) + } + if _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{ + Name: "stream", + Subjects: []string{"cgrates_cdrs", "cgrates_cdrs_processed"}, + }); err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + if err := js.DeleteStream(context.Background(), "stream"); err != nil { + t.Error(err) + } + }) + }, } + client, _ := ng.Run(t) for i := 0; i < 3; i++ { - randomCGRID := utils.UUIDSha1Prefix() - expData := fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID) + key := fmt.Sprintf("key%d", i+1) + expData := fmt.Sprintf(`{"Key": "%s"}`, key) if _, err := js.Publish(context.Background(), "cgrates_cdrs", []byte(expData)); err != nil { t.Error(err) } - - time.Sleep(20 * time.Millisecond) // wait for exports - - var cgrID any - if err = client.Call(context.Background(), utils.CacheSv1GetItem, &utils.ArgsGetCacheItemWithAPIOpts{ - Tenant: "cgrates.org", - ArgsGetCacheItem: utils.ArgsGetCacheItem{ - CacheID: utils.CacheUCH, - ItemID: "CGRID", - }, - }, &cgrID); err != nil { - t.Error(err) - } else if cgrID != randomCGRID { - t.Errorf("expected %v, received %v", randomCGRID, cgrID) - } + checkNATSExports(t, client, key) } }) } } +func connectToNATSServer(t *testing.T, url string, opts ...nats.Option) *nats.Conn { + t.Helper() + deadline := time.Now().Add(500 * time.Millisecond) + time.Sleep(5 * time.Millisecond) // takes around 5ms for the server to be available + fib := utils.FibDuration(time.Millisecond, 0) + for time.Now().Before(deadline) { + nc, err := nats.Connect(url, opts...) + if err == nil { // successfully connected + t.Cleanup(func() { + nc.Close() + }) + return nc + } + time.Sleep(fib()) + } + + t.Fatalf("NATS server did not become available within %s", time.Second) + return nil +} + +func checkNATSExports(t *testing.T, client *birpc.Client, wantKey any) { + t.Helper() + deadline := time.Now().Add(500 * time.Millisecond) + time.Sleep(2 * time.Millisecond) // takes around 1-2ms for the export to happen + fib := utils.FibDuration(time.Millisecond, 0) + + itemID := "Key" + var err error + var key any + for time.Now().Before(deadline) { + err = client.Call(context.Background(), utils.CacheSv1GetItem, + &utils.ArgsGetCacheItemWithAPIOpts{ + Tenant: "cgrates.org", + ArgsGetCacheItem: utils.ArgsGetCacheItem{ + CacheID: utils.CacheUCH, + ItemID: itemID, + }, + }, &key) + + if err == nil && key == wantKey { + return + } + time.Sleep(fib()) + } + + if err != nil { + t.Errorf("CacheSv1.GetItem(%q) unexpected err: %q", itemID, err) + return + } + if key != wantKey { + t.Errorf("CacheSv1.GetItem(%q)=%q, want %q", itemID, key, wantKey) + } +} + /* In order to generate the resolver.conf and u.creds for the jetstream test, run the following: