Added full tests for nats ers

This commit is contained in:
Trial97
2021-06-25 15:30:29 +03:00
committed by Dan Christian Bogos
parent 81f2d722f6
commit e7bbbbdb2d
7 changed files with 236 additions and 49 deletions

View File

@@ -380,6 +380,7 @@ const CGRATES_CFG_JSON = `
// nats
// "natsJetStream": false, // controls if the nats reader uses the JetStream
// "natsConsumerName": "cgrates", // in case of JetStream the name of the consumer
"natsSubject": "cgrates_cdrs", // the subject from were the events are read
// "natsQueueID": "", // the queue id the consumer listen to
// "natsJWTFile": "", // the path to the JWT file( can be the chained file or the user file)

View File

@@ -457,6 +457,7 @@ func testCGRConfigReloadERs(t *testing.T) {
"csvRowLength": 0.,
"partialOrderField": "~*req.AnswerTime",
"xmlRootPath": "",
"natsSubject": "cgrates_cdrs",
},
},
{
@@ -477,6 +478,7 @@ func testCGRConfigReloadERs(t *testing.T) {
"csvRowLength": 0.,
"partialOrderField": "~*req.AnswerTime",
"xmlRootPath": "",
"natsSubject": "cgrates_cdrs",
},
},
},
@@ -664,6 +666,7 @@ func testCgrCfgV1ReloadConfigSection(t *testing.T) {
"csvRowLength": 0.,
"partialOrderField": "~*req.AnswerTime",
"xmlRootPath": "",
"natsSubject": "cgrates_cdrs",
},
"partial_commit_fields": []interface{}{},
},
@@ -686,6 +689,7 @@ func testCgrCfgV1ReloadConfigSection(t *testing.T) {
"csvRowLength": 0.,
"partialOrderField": "~*req.AnswerTime",
"xmlRootPath": "",
"natsSubject": "cgrates_cdrs",
},
"partial_commit_fields": []interface{}{},
},

View File

@@ -105,6 +105,7 @@ func (pstr *NatsPoster) parseOpt(opts map[string]interface{}, nodeID string, con
return
}
}
pstr.subject = utils.DefaultQueueID
if vals, has := opts[utils.NatsSubject]; has {
pstr.subject = utils.IfaceAsString(vals)
}

View File

@@ -71,10 +71,11 @@ type NatsER struct {
rdrErr chan error
cap chan struct{}
subject string
queueID string
jetStream bool
opts []nats.Option
subject string
queueID string
jetStream bool
consumerName string
opts []nats.Option
poster *engine.NatsPoster
}
@@ -105,7 +106,7 @@ func (rdr *NatsER) Serve() (err error) {
}
if _, err = js.QueueSubscribe(rdr.subject, rdr.queueID, func(msg *nats.Msg) {
ch <- msg
}); err != nil {
}, nats.Durable(rdr.consumerName)); err != nil {
return
}
}
@@ -119,13 +120,11 @@ func (rdr *NatsER) Serve() (err error) {
fmt.Sprintf("<%s> stop monitoring nats path <%s>",
utils.ERs, rdr.Config().SourcePath))
nc.Drain()
if rdr.poster != nil {
rdr.poster.Close()
}
return
case msg := <-ch:
if err = rdr.processMessage(msg.Data); err != nil {
nc.Drain() // ignore this error(if any) in favor of the error processMessage
return
}
go func(msg *nats.Msg) {
if err := rdr.processMessage(msg.Data); err != nil {
utils.Logger.Warning(
@@ -197,6 +196,8 @@ func (rdr *NatsER) processOpts() (err error) {
rdr.subject = utils.IfaceAsString(rdr.Config().Opts[utils.NatsSubject])
rdr.queueID = utils.FirstNonEmpty(utils.IfaceAsString(rdr.Config().Opts[utils.NatsQueueID]),
rdr.cgrCfg.GeneralCfg().NodeID)
rdr.consumerName = utils.FirstNonEmpty(utils.IfaceAsString(rdr.Config().Opts[utils.NatsConsumerName]),
utils.CGRateSLwr)
if useJetStreamVal, has := rdr.Config().Opts[utils.NatsJetStream]; has {
if rdr.jetStream, err = utils.IfaceAsBool(useJetStreamVal); err != nil {
return

View File

@@ -22,6 +22,7 @@ package ers
import (
"fmt"
"os/exec"
"reflect"
"runtime"
"testing"
@@ -33,7 +34,18 @@ import (
"github.com/nats-io/nats.go"
)
func TestNatsER(t *testing.T) {
func TestNatsERJetStream(t *testing.T) {
// start the nats-server
exec.Command("pkill", "nats-server")
cmd := exec.Command("nats-server", "-js")
if err := cmd.Start(); err != nil {
t.Fatal(err) // most probably not installed
}
time.Sleep(50 * time.Millisecond)
defer cmd.Process.Kill()
//
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
"ers": { // EventReaderService
"enabled": true, // starts the EventReader service: <true|false>
@@ -45,13 +57,170 @@ func TestNatsER(t *testing.T) {
"run_delay": "-1",
"concurrent_requests": 1024,
"source_path": "nats://localhost:4222",
// "processed_path": "/var/spool/cgrates/ers/out",
"processed_path": "",
"tenant": "cgrates.org",
"filters": [],
"flags": [],
"fields":[
{"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"},
],
"opts": {
"natsJetStream": true,
"natsSubjectProcessed": "processed_cdrs",
}
},
],
},
}`)
utils.Logger.SetLogLevel(7)
if err != nil {
t.Fatal(err)
}
if err := cfg.CheckConfigSanity(); err != nil {
t.Fatal(err)
}
rdrEvents = make(chan *erEvent, 1)
rdrErr = make(chan error, 1)
rdrExit = make(chan struct{}, 1)
if rdr, err = NewNatsER(cfg, 1, rdrEvents, make(chan *erEvent, 1),
rdrErr, new(engine.FilterS), rdrExit); err != nil {
t.Fatal(err)
}
nc, err := nats.Connect(rdr.Config().SourcePath, nats.Timeout(time.Second),
nats.DrainTimeout(time.Second))
if err != nil {
t.Fatal(err)
}
defer nc.Drain()
js, err := nc.JetStream()
if err != nil {
t.Fatal(err)
}
for name := range js.StreamNames() {
if name == "test" {
if err = js.DeleteStream("test"); err != nil {
t.Fatal(err)
}
break
}
if name == "test2" {
if err = js.DeleteStream("test2"); err != nil {
t.Fatal(err)
}
break
}
}
if _, err = js.AddStream(&nats.StreamConfig{
Name: "test",
Subjects: []string{utils.DefaultQueueID},
}); err != nil {
t.Fatal(err)
}
if err = js.PurgeStream("test"); err != nil {
t.Fatal(err)
}
if _, err = js.AddStream(&nats.StreamConfig{
Name: "test2",
Subjects: []string{"processed_cdrs"},
}); err != nil {
t.Fatal(err)
}
if err = js.PurgeStream("test2"); err != nil {
t.Fatal(err)
}
ch := make(chan *nats.Msg, 3)
_, err = js.QueueSubscribe("processed_cdrs", "test3", func(msg *nats.Msg) {
ch <- msg
}, nats.Durable("test4"))
if err != nil {
t.Fatal(err)
}
go rdr.Serve()
runtime.Gosched()
time.Sleep(10 * time.Nanosecond)
for i := 0; i < 3; i++ {
randomCGRID := utils.UUIDSha1Prefix()
expData := fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID)
if _, err = js.Publish(utils.DefaultQueueID, []byte(expData)); err != nil {
t.Fatal(err)
}
nc.FlushTimeout(time.Second)
nc.Flush()
select {
case err = <-rdrErr:
t.Fatal(err)
case ev := <-rdrEvents:
if ev.rdrCfg.ID != "nats" {
t.Fatalf("Expected 'nats' received `%s`", ev.rdrCfg.ID)
}
expected := &utils.CGREvent{
Tenant: "cgrates.org",
ID: ev.cgrEvent.ID,
Event: map[string]interface{}{
"CGRID": randomCGRID,
},
APIOpts: map[string]interface{}{},
}
if !reflect.DeepEqual(ev.cgrEvent, expected) {
t.Fatalf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent))
}
select {
case msg := <-ch:
if expData != string(msg.Data) {
t.Errorf("Expected %q ,received %q", expData, string(msg.Data))
}
case <-time.After(10 * time.Second):
t.Fatal("Timeout")
}
case <-time.After(10 * time.Second):
t.Fatal("Timeout")
}
}
close(rdrExit)
}
func TestNatsER(t *testing.T) {
// start the nats-server
exec.Command("pkill", "nats-server")
cmd := exec.Command("nats-server")
if err := cmd.Start(); err != nil {
t.Fatal(err) // most probably not installed
}
time.Sleep(10 * time.Millisecond)
defer cmd.Process.Kill()
//
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
"ers": { // EventReaderService
"enabled": true, // starts the EventReader service: <true|false>
"sessions_conns":["*localhost"],
"readers": [
{
"id": "nats",
"type": "*nats_json_map",
"run_delay": "-1",
"concurrent_requests": 1024,
"source_path": "nats://localhost:4222",
"processed_path": "",
"tenant": "cgrates.org",
"filters": [],
"flags": [],
"fields":[
{"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"},
],
"opts": {
"natsSubjectProcessed": "processed_cdrs",
}
},
],
},
@@ -75,45 +244,55 @@ func TestNatsER(t *testing.T) {
if err != nil {
t.Fatal(err)
}
// js, err := nc.JetStream()
// if err != nil {
// t.Fatal(err)
// }
go rdr.Serve()
runtime.Gosched()
time.Sleep(time.Second)
randomCGRID := utils.UUIDSha1Prefix()
if err = nc.Publish(utils.DefaultQueueID, []byte(fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID))); err != nil {
ch := make(chan *nats.Msg, 3)
_, err = nc.ChanQueueSubscribe("processed_cdrs", "test3", ch)
if err != nil {
t.Fatal(err)
}
// if _, err = js.Publish(utils.DefaultQueueID, []byte(fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID))); err != nil {
// t.Fatal(err)
// }
nc.FlushTimeout(time.Second)
nc.Flush()
nc.Drain()
defer nc.Drain()
go rdr.Serve()
runtime.Gosched()
time.Sleep(100 * time.Millisecond)
for i := 0; i < 3; i++ {
randomCGRID := utils.UUIDSha1Prefix()
expData := fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID)
if err = nc.Publish(utils.DefaultQueueID, []byte(expData)); err != nil {
t.Fatal(err)
}
select {
case err = <-rdrErr:
t.Error(err)
case ev := <-rdrEvents:
if ev.rdrCfg.ID != "nats" {
t.Errorf("Expected 'kakfa' received `%s`", ev.rdrCfg.ID)
nc.FlushTimeout(time.Second)
nc.Flush()
select {
case err = <-rdrErr:
t.Fatal(err)
case ev := <-rdrEvents:
if ev.rdrCfg.ID != "nats" {
t.Fatalf("Expected 'nats' received `%s`", ev.rdrCfg.ID)
}
expected := &utils.CGREvent{
Tenant: "cgrates.org",
ID: ev.cgrEvent.ID,
Event: map[string]interface{}{
"CGRID": randomCGRID,
},
APIOpts: map[string]interface{}{},
}
if !reflect.DeepEqual(ev.cgrEvent, expected) {
t.Fatalf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent))
}
select {
case msg := <-ch:
if expData != string(msg.Data) {
t.Errorf("Expected %q ,received %q", expData, string(msg.Data))
}
case <-time.After(10 * time.Second):
t.Fatal("Timeout")
}
case <-time.After(10 * time.Second):
t.Fatal("Timeout")
}
expected := &utils.CGREvent{
Tenant: "cgrates.org",
ID: ev.cgrEvent.ID,
Event: map[string]interface{}{
"CGRID": randomCGRID,
},
APIOpts: map[string]interface{}{},
}
if !reflect.DeepEqual(ev.cgrEvent, expected) {
t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent))
}
case <-time.After(10 * time.Second):
t.Fatal("Timeout")
}
close(rdrExit)
}

View File

@@ -10,7 +10,9 @@ cgrates (1.0) UNRELEASED; urgency=medium
* [SessionS] The sessions are no longer terminated on shutdown if the replication_conns are set
* [FilterS] Added *regex filter
* [DispatcherS] Removed Subsystems field in favor of filters
* [RSRParsers] Added *len dataconverter
* [RSRParsers] Added *len dataconverter
* [ERs] Added *nats_json_map
* [EEs] Added *nats_json_map
-- DanB <danb@cgrates.org> Thu, 4 May 2021 12:05:00 +0200
@@ -174,8 +176,6 @@ cgrates (0.11.0) UNRELEASED; urgency=medium
* [DataDB] Updated config options
* [StorDB] Updated config options
-- DanB <danb@cgrates.org> Wed, 19 Feb 2020 13:25:52 +0200
cgrates (0.10.0) UNRELEASED; urgency=medium
* Creating first stable branch.

View File

@@ -2336,6 +2336,7 @@ const (
// nats
NatsSubject = "natsSubject"
NatsQueueID = "natsQueueID"
NatsConsumerName = "natsConsumerName"
NatsJWTFile = "natsJWTFile"
NatsSeedFile = "natsSeedFile"
NatsClientCertificate = "natsClientCertificate"