Completed NATS ees integration test

This commit is contained in:
nickolasdaniel
2021-07-19 12:11:25 +03:00
committed by Dan Christian Bogos
parent cda6aeb32e
commit 1baa2cc381
2 changed files with 95 additions and 3 deletions

View File

@@ -29,7 +29,7 @@ import (
"github.com/nats-io/nats.go"
)
func TestNatsEE(t *testing.T) {
func TestNatsEEJetStream(t *testing.T) {
testCreateDirectory(t)
var err error
cmd := exec.Command("nats-server", "-js") // Start the nats-server.
@@ -54,7 +54,7 @@ func TestNatsEE(t *testing.T) {
t.Fatal(err)
}
nop, err := engine.GetNatsOpts(cfg.EEsCfg().Exporters[5].Opts, "natsTest", time.Second)
nop, err := engine.GetNatsOpts(cfg.EEsCfg().Exporters[idx].Opts, "natsTest", time.Second)
if err != nil {
t.Fatal(err)
}
@@ -106,5 +106,88 @@ func TestNatsEE(t *testing.T) {
t.Fatal(err)
}
testCleanDirectory(t)
// fmt.Println(string((<-ch).Data))
expected := `{"Account":"1001","Destination":"1002"}`
// fmt.Println((<-ch).Data)
select {
case data := <-ch:
if expected != string(data.Data) {
t.Fatalf("Expected %v \n but received \n %v", expected, string(data.Data))
}
case <-time.After(50 * time.Millisecond):
t.Fatal("Time limit exceeded")
}
}
func TestNatsEE(t *testing.T) {
testCreateDirectory(t)
exec.Command("pkill", "nats-server")
cmd := exec.Command("nats-server")
if err := cmd.Start(); err != nil {
t.Fatal(err)
}
time.Sleep(50 * time.Millisecond)
defer cmd.Process.Kill()
cfgPath := path.Join(*dataDir, "conf", "samples", "ees")
cfg, err := config.NewCGRConfigFromPath(cfgPath)
if err != nil {
t.Fatal(err)
}
var idx int
for idx = range cfg.EEsCfg().Exporters {
if cfg.EEsCfg().Exporters[idx].ID == "NatsJsonMapExporter2" {
break
}
}
evExp, err := NewEventExporter(cfg, idx, new(engine.FilterS))
if err != nil {
t.Fatal(err)
}
nop, err := engine.GetNatsOpts(cfg.EEsCfg().Exporters[idx].Opts, "natsTest", time.Second)
if err != nil {
t.Fatal(err)
}
nc, err := nats.Connect("nats://localhost:4222", nop...)
if err != nil {
t.Fatal(err)
}
ch := make(chan *nats.Msg, 3)
_, err = nc.ChanQueueSubscribe("processed_cdrs", "test3", ch)
if err != nil {
t.Fatal(err)
}
defer nc.Drain()
cgrEv := &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]interface{}{
"Account": "1001",
"Destination": "1002",
},
}
if err := evExp.ExportEvent(cgrEv); err != nil {
t.Fatal(err)
}
testCleanDirectory(t)
expected := `{"Account":"1001","Destination":"1002"}`
// fmt.Println((<-ch).Data)
select {
case data := <-ch:
if expected != string(data.Data) {
t.Fatalf("Expected %v \n but received \n %v", expected, string(data.Data))
}
case <-time.After(50 * time.Millisecond):
t.Fatal("Time limit exceeded")
}
}
func TestNatsEEJetStreamRepeat(t *testing.T) {
for i := 0; i < 20; i++ {
time.Sleep(1 * time.Second)
t.Run("TestNatsEEJetStream", TestNatsEEJetStream)
}
}