From 1baa2cc38141597c75b4ea7dfbae3da8bd26de61 Mon Sep 17 00:00:00 2001 From: nickolasdaniel Date: Mon, 19 Jul 2021 12:11:25 +0300 Subject: [PATCH] Completed NATS ees integration test --- data/conf/samples/ees/cgrates.json | 9 +++ ees/nats_it_test.go | 89 +++++++++++++++++++++++++++++- 2 files changed, 95 insertions(+), 3 deletions(-) diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json index 605858261..3cd215d85 100644 --- a/data/conf/samples/ees/cgrates.json +++ b/data/conf/samples/ees/cgrates.json @@ -422,6 +422,15 @@ "natsJetStream": true, "natsSubject": "processed_cdrs", } + }, + { + "id": "NatsJsonMapExporter2", + "type": "*nats_json_map", + "export_path": "nats://localhost:4222", + "attempts": 1, + "opts": { + "natsSubject": "processed_cdrs", + } } ] }, diff --git a/ees/nats_it_test.go b/ees/nats_it_test.go index 1556b3f95..ae3a5c77d 100644 --- a/ees/nats_it_test.go +++ b/ees/nats_it_test.go @@ -29,7 +29,7 @@ import ( "github.com/nats-io/nats.go" ) -func TestNatsEE(t *testing.T) { +func TestNatsEEJetStream(t *testing.T) { testCreateDirectory(t) var err error cmd := exec.Command("nats-server", "-js") // Start the nats-server. @@ -54,7 +54,7 @@ func TestNatsEE(t *testing.T) { t.Fatal(err) } - nop, err := engine.GetNatsOpts(cfg.EEsCfg().Exporters[5].Opts, "natsTest", time.Second) + nop, err := engine.GetNatsOpts(cfg.EEsCfg().Exporters[idx].Opts, "natsTest", time.Second) if err != nil { t.Fatal(err) } @@ -106,5 +106,88 @@ func TestNatsEE(t *testing.T) { t.Fatal(err) } testCleanDirectory(t) - // fmt.Println(string((<-ch).Data)) + expected := `{"Account":"1001","Destination":"1002"}` + // fmt.Println((<-ch).Data) + select { + case data := <-ch: + if expected != string(data.Data) { + t.Fatalf("Expected %v \n but received \n %v", expected, string(data.Data)) + } + case <-time.After(50 * time.Millisecond): + t.Fatal("Time limit exceeded") + } +} + +func TestNatsEE(t *testing.T) { + testCreateDirectory(t) + exec.Command("pkill", "nats-server") + + cmd := exec.Command("nats-server") + if err := cmd.Start(); err != nil { + t.Fatal(err) + } + time.Sleep(50 * time.Millisecond) + defer cmd.Process.Kill() + + cfgPath := path.Join(*dataDir, "conf", "samples", "ees") + cfg, err := config.NewCGRConfigFromPath(cfgPath) + if err != nil { + t.Fatal(err) + } + var idx int + for idx = range cfg.EEsCfg().Exporters { + if cfg.EEsCfg().Exporters[idx].ID == "NatsJsonMapExporter2" { + break + } + } + evExp, err := NewEventExporter(cfg, idx, new(engine.FilterS)) + if err != nil { + t.Fatal(err) + } + + nop, err := engine.GetNatsOpts(cfg.EEsCfg().Exporters[idx].Opts, "natsTest", time.Second) + if err != nil { + t.Fatal(err) + } + nc, err := nats.Connect("nats://localhost:4222", nop...) + if err != nil { + t.Fatal(err) + } + + ch := make(chan *nats.Msg, 3) + _, err = nc.ChanQueueSubscribe("processed_cdrs", "test3", ch) + if err != nil { + t.Fatal(err) + } + + defer nc.Drain() + + cgrEv := &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + "Account": "1001", + "Destination": "1002", + }, + } + if err := evExp.ExportEvent(cgrEv); err != nil { + t.Fatal(err) + } + testCleanDirectory(t) + expected := `{"Account":"1001","Destination":"1002"}` + // fmt.Println((<-ch).Data) + select { + case data := <-ch: + if expected != string(data.Data) { + t.Fatalf("Expected %v \n but received \n %v", expected, string(data.Data)) + } + case <-time.After(50 * time.Millisecond): + t.Fatal("Time limit exceeded") + } +} + +func TestNatsEEJetStreamRepeat(t *testing.T) { + for i := 0; i < 20; i++ { + time.Sleep(1 * time.Second) + t.Run("TestNatsEEJetStream", TestNatsEEJetStream) + } }