mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Added full tests for nats ers
This commit is contained in:
committed by
Dan Christian Bogos
parent
09985ce8f9
commit
aa22adfc3c
@@ -439,6 +439,7 @@ const CGRATES_CFG_JSON = `
|
||||
|
||||
// nats
|
||||
// "natsJetStream": false, // controls if the nats reader uses the JetStream
|
||||
// "natsConsumerName": "cgrates", // in case of JetStream the name of the consumer
|
||||
"natsSubject": "cgrates_cdrs", // the subject from were the events are read
|
||||
// "natsQueueID": "", // the queue id the consumer listen to
|
||||
// "natsJWTFile": "", // the path to the JWT file( can be the chained file or the user file)
|
||||
|
||||
@@ -578,6 +578,7 @@ func testCGRConfigReloadERs(t *testing.T) {
|
||||
"csvRowLength": 0.,
|
||||
"partialOrderField": "~*req.AnswerTime",
|
||||
"xmlRootPath": "",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -598,6 +599,7 @@ func testCGRConfigReloadERs(t *testing.T) {
|
||||
"csvRowLength": 0.,
|
||||
"partialOrderField": "~*req.AnswerTime",
|
||||
"xmlRootPath": "",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -785,6 +787,7 @@ func testCgrCfgV1ReloadConfigSection(t *testing.T) {
|
||||
"csvRowLength": 0.,
|
||||
"partialOrderField": "~*req.AnswerTime",
|
||||
"xmlRootPath": "",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
},
|
||||
"partial_commit_fields": []interface{}{},
|
||||
},
|
||||
@@ -807,6 +810,7 @@ func testCgrCfgV1ReloadConfigSection(t *testing.T) {
|
||||
"csvRowLength": 0.,
|
||||
"partialOrderField": "~*req.AnswerTime",
|
||||
"xmlRootPath": "",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
},
|
||||
"partial_commit_fields": []interface{}{},
|
||||
},
|
||||
|
||||
@@ -105,6 +105,7 @@ func (pstr *NatsPoster) parseOpt(opts map[string]interface{}, nodeID string, con
|
||||
return
|
||||
}
|
||||
}
|
||||
pstr.subject = utils.DefaultQueueID
|
||||
if vals, has := opts[utils.NatsSubject]; has {
|
||||
pstr.subject = utils.IfaceAsString(vals)
|
||||
}
|
||||
|
||||
21
ers/nats.go
21
ers/nats.go
@@ -70,10 +70,11 @@ type NatsER struct {
|
||||
rdrErr chan error
|
||||
cap chan struct{}
|
||||
|
||||
subject string
|
||||
queueID string
|
||||
jetStream bool
|
||||
opts []nats.Option
|
||||
subject string
|
||||
queueID string
|
||||
jetStream bool
|
||||
consumerName string
|
||||
opts []nats.Option
|
||||
|
||||
poster *engine.NatsPoster
|
||||
}
|
||||
@@ -104,7 +105,7 @@ func (rdr *NatsER) Serve() (err error) {
|
||||
}
|
||||
if _, err = js.QueueSubscribe(rdr.subject, rdr.queueID, func(msg *nats.Msg) {
|
||||
ch <- msg
|
||||
}); err != nil {
|
||||
}, nats.Durable(rdr.consumerName)); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -118,13 +119,11 @@ func (rdr *NatsER) Serve() (err error) {
|
||||
fmt.Sprintf("<%s> stop monitoring nats path <%s>",
|
||||
utils.ERs, rdr.Config().SourcePath))
|
||||
nc.Drain()
|
||||
if rdr.poster != nil {
|
||||
rdr.poster.Close()
|
||||
}
|
||||
return
|
||||
case msg := <-ch:
|
||||
if err = rdr.processMessage(msg.Data); err != nil {
|
||||
nc.Drain() // ignore this error(if any) in favor of the error processMessage
|
||||
return
|
||||
}
|
||||
|
||||
go func(msg *nats.Msg) {
|
||||
if err := rdr.processMessage(msg.Data); err != nil {
|
||||
utils.Logger.Warning(
|
||||
@@ -196,6 +195,8 @@ func (rdr *NatsER) processOpts() (err error) {
|
||||
rdr.subject = utils.IfaceAsString(rdr.Config().Opts[utils.NatsSubject])
|
||||
rdr.queueID = utils.FirstNonEmpty(utils.IfaceAsString(rdr.Config().Opts[utils.NatsQueueID]),
|
||||
rdr.cgrCfg.GeneralCfg().NodeID)
|
||||
rdr.consumerName = utils.FirstNonEmpty(utils.IfaceAsString(rdr.Config().Opts[utils.NatsConsumerName]),
|
||||
utils.CGRateSLwr)
|
||||
if useJetStreamVal, has := rdr.Config().Opts[utils.NatsJetStream]; has {
|
||||
if rdr.jetStream, err = utils.IfaceAsBool(useJetStreamVal); err != nil {
|
||||
return
|
||||
|
||||
@@ -22,6 +22,7 @@ package ers
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os/exec"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"testing"
|
||||
@@ -33,7 +34,18 @@ import (
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
func TestNatsER(t *testing.T) {
|
||||
func TestNatsERJetStream(t *testing.T) {
|
||||
// start the nats-server
|
||||
exec.Command("pkill", "nats-server")
|
||||
|
||||
cmd := exec.Command("nats-server", "-js")
|
||||
if err := cmd.Start(); err != nil {
|
||||
t.Fatal(err) // most probably not installed
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
defer cmd.Process.Kill()
|
||||
//
|
||||
|
||||
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
|
||||
"ers": { // EventReaderService
|
||||
"enabled": true, // starts the EventReader service: <true|false>
|
||||
@@ -45,13 +57,171 @@ func TestNatsER(t *testing.T) {
|
||||
"run_delay": "-1",
|
||||
"concurrent_requests": 1024,
|
||||
"source_path": "nats://localhost:4222",
|
||||
// "processed_path": "/var/spool/cgrates/ers/out",
|
||||
"processed_path": "",
|
||||
"tenant": "cgrates.org",
|
||||
"filters": [],
|
||||
"flags": [],
|
||||
"fields":[
|
||||
{"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"},
|
||||
],
|
||||
"opts": {
|
||||
"natsJetStream": true,
|
||||
"natsSubjectProcessed": "processed_cdrs",
|
||||
}
|
||||
},
|
||||
],
|
||||
},
|
||||
}`)
|
||||
utils.Logger.SetLogLevel(7)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := cfg.CheckConfigSanity(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rdrEvents = make(chan *erEvent, 1)
|
||||
rdrErr = make(chan error, 1)
|
||||
rdrExit = make(chan struct{}, 1)
|
||||
|
||||
if rdr, err = NewNatsER(cfg, 1, rdrEvents, make(chan *erEvent, 1),
|
||||
rdrErr, new(engine.FilterS), rdrExit); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
nc, err := nats.Connect(rdr.Config().SourcePath, nats.Timeout(time.Second),
|
||||
nats.DrainTimeout(time.Second))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer nc.Drain()
|
||||
|
||||
js, err := nc.JetStream()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for name := range js.StreamNames() {
|
||||
if name == "test" {
|
||||
if err = js.DeleteStream("test"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
break
|
||||
}
|
||||
if name == "test2" {
|
||||
if err = js.DeleteStream("test2"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
if _, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: "test",
|
||||
Subjects: []string{utils.DefaultQueueID},
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err = js.PurgeStream("test"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if _, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: "test2",
|
||||
Subjects: []string{"processed_cdrs"},
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err = js.PurgeStream("test2"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ch := make(chan *nats.Msg, 3)
|
||||
_, err = js.QueueSubscribe("processed_cdrs", "test3", func(msg *nats.Msg) {
|
||||
ch <- msg
|
||||
}, nats.Durable("test4"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
go rdr.Serve()
|
||||
runtime.Gosched()
|
||||
time.Sleep(10 * time.Nanosecond)
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
randomCGRID := utils.UUIDSha1Prefix()
|
||||
expData := fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID)
|
||||
if _, err = js.Publish(utils.DefaultQueueID, []byte(expData)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
nc.FlushTimeout(time.Second)
|
||||
nc.Flush()
|
||||
|
||||
select {
|
||||
case err = <-rdrErr:
|
||||
t.Fatal(err)
|
||||
case ev := <-rdrEvents:
|
||||
if ev.rdrCfg.ID != "nats" {
|
||||
t.Fatalf("Expected 'nats' received `%s`", ev.rdrCfg.ID)
|
||||
}
|
||||
expected := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
ID: ev.cgrEvent.ID,
|
||||
Time: ev.cgrEvent.Time,
|
||||
Event: map[string]interface{}{
|
||||
"CGRID": randomCGRID,
|
||||
},
|
||||
APIOpts: map[string]interface{}{},
|
||||
}
|
||||
if !reflect.DeepEqual(ev.cgrEvent, expected) {
|
||||
t.Fatalf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent))
|
||||
}
|
||||
select {
|
||||
case msg := <-ch:
|
||||
if expData != string(msg.Data) {
|
||||
t.Errorf("Expected %q ,received %q", expData, string(msg.Data))
|
||||
}
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("Timeout")
|
||||
}
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("Timeout")
|
||||
}
|
||||
}
|
||||
close(rdrExit)
|
||||
}
|
||||
|
||||
func TestNatsER(t *testing.T) {
|
||||
// start the nats-server
|
||||
exec.Command("pkill", "nats-server")
|
||||
|
||||
cmd := exec.Command("nats-server")
|
||||
if err := cmd.Start(); err != nil {
|
||||
t.Fatal(err) // most probably not installed
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
defer cmd.Process.Kill()
|
||||
//
|
||||
|
||||
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
|
||||
"ers": { // EventReaderService
|
||||
"enabled": true, // starts the EventReader service: <true|false>
|
||||
"sessions_conns":["*localhost"],
|
||||
"readers": [
|
||||
{
|
||||
"id": "nats",
|
||||
"type": "*nats_json_map",
|
||||
"run_delay": "-1",
|
||||
"concurrent_requests": 1024,
|
||||
"source_path": "nats://localhost:4222",
|
||||
"processed_path": "",
|
||||
"tenant": "cgrates.org",
|
||||
"filters": [],
|
||||
"flags": [],
|
||||
"fields":[
|
||||
{"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"},
|
||||
],
|
||||
"opts": {
|
||||
"natsSubjectProcessed": "processed_cdrs",
|
||||
}
|
||||
},
|
||||
],
|
||||
},
|
||||
@@ -75,46 +245,56 @@ func TestNatsER(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// js, err := nc.JetStream()
|
||||
// if err != nil {
|
||||
// t.Fatal(err)
|
||||
// }
|
||||
go rdr.Serve()
|
||||
runtime.Gosched()
|
||||
time.Sleep(time.Second)
|
||||
randomCGRID := utils.UUIDSha1Prefix()
|
||||
if err = nc.Publish(utils.DefaultQueueID, []byte(fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID))); err != nil {
|
||||
ch := make(chan *nats.Msg, 3)
|
||||
_, err = nc.ChanQueueSubscribe("processed_cdrs", "test3", ch)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// if _, err = js.Publish(utils.DefaultQueueID, []byte(fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID))); err != nil {
|
||||
// t.Fatal(err)
|
||||
// }
|
||||
|
||||
nc.FlushTimeout(time.Second)
|
||||
nc.Flush()
|
||||
nc.Drain()
|
||||
defer nc.Drain()
|
||||
go rdr.Serve()
|
||||
runtime.Gosched()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
for i := 0; i < 3; i++ {
|
||||
randomCGRID := utils.UUIDSha1Prefix()
|
||||
expData := fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID)
|
||||
if err = nc.Publish(utils.DefaultQueueID, []byte(expData)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
select {
|
||||
case err = <-rdrErr:
|
||||
t.Error(err)
|
||||
case ev := <-rdrEvents:
|
||||
if ev.rdrCfg.ID != "nats" {
|
||||
t.Errorf("Expected 'kakfa' received `%s`", ev.rdrCfg.ID)
|
||||
nc.FlushTimeout(time.Second)
|
||||
nc.Flush()
|
||||
|
||||
select {
|
||||
case err = <-rdrErr:
|
||||
t.Fatal(err)
|
||||
case ev := <-rdrEvents:
|
||||
if ev.rdrCfg.ID != "nats" {
|
||||
t.Fatalf("Expected 'nats' received `%s`", ev.rdrCfg.ID)
|
||||
}
|
||||
expected := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
ID: ev.cgrEvent.ID,
|
||||
Time: ev.cgrEvent.Time,
|
||||
Event: map[string]interface{}{
|
||||
"CGRID": randomCGRID,
|
||||
},
|
||||
APIOpts: map[string]interface{}{},
|
||||
}
|
||||
if !reflect.DeepEqual(ev.cgrEvent, expected) {
|
||||
t.Fatalf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent))
|
||||
}
|
||||
select {
|
||||
case msg := <-ch:
|
||||
if expData != string(msg.Data) {
|
||||
t.Errorf("Expected %q ,received %q", expData, string(msg.Data))
|
||||
}
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("Timeout")
|
||||
}
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("Timeout")
|
||||
}
|
||||
expected := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
ID: ev.cgrEvent.ID,
|
||||
Time: ev.cgrEvent.Time,
|
||||
Event: map[string]interface{}{
|
||||
"CGRID": randomCGRID,
|
||||
},
|
||||
APIOpts: map[string]interface{}{},
|
||||
}
|
||||
if !reflect.DeepEqual(ev.cgrEvent, expected) {
|
||||
t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent))
|
||||
}
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("Timeout")
|
||||
}
|
||||
close(rdrExit)
|
||||
}
|
||||
|
||||
@@ -166,6 +166,8 @@ cgrates (0.11.0~dev) UNRELEASED; urgency=medium
|
||||
* [SessionS] The sessions are no longer terminated on shutdown if the replication_conns are set
|
||||
* [FilterS] Added *regex filter
|
||||
* [RSRParsers] Added *len dataconverter
|
||||
* [ERs] Added *nats_json_map
|
||||
* [EEs] Added *nats_json_map
|
||||
|
||||
-- DanB <danb@cgrates.org> Wed, 19 Feb 2020 13:25:52 +0200
|
||||
|
||||
|
||||
@@ -2604,6 +2604,7 @@ const (
|
||||
// nats
|
||||
NatsSubject = "natsSubject"
|
||||
NatsQueueID = "natsQueueID"
|
||||
NatsConsumerName = "natsConsumerName"
|
||||
NatsJWTFile = "natsJWTFile"
|
||||
NatsSeedFile = "natsSeedFile"
|
||||
NatsClientCertificate = "natsClientCertificate"
|
||||
|
||||
Reference in New Issue
Block a user