From aa22adfc3cbff071ed36a09df0237dc6191308d9 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Fri, 25 Jun 2021 15:30:29 +0300 Subject: [PATCH] Added full tests for nats ers --- config/config_defaults.go | 1 + config/config_it_test.go | 4 + engine/pstr_nats.go | 1 + ers/nats.go | 21 ++-- ers/nats_it_test.go | 254 ++++++++++++++++++++++++++++++++------ packages/debian/changelog | 2 + utils/consts.go | 1 + 7 files changed, 237 insertions(+), 47 deletions(-) diff --git a/config/config_defaults.go b/config/config_defaults.go index 909bbb10c..c536597e9 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -439,6 +439,7 @@ const CGRATES_CFG_JSON = ` // nats // "natsJetStream": false, // controls if the nats reader uses the JetStream + // "natsConsumerName": "cgrates", // in case of JetStream the name of the consumer "natsSubject": "cgrates_cdrs", // the subject from were the events are read // "natsQueueID": "", // the queue id the consumer listen to // "natsJWTFile": "", // the path to the JWT file( can be the chained file or the user file) diff --git a/config/config_it_test.go b/config/config_it_test.go index f423b8810..0e516afcc 100644 --- a/config/config_it_test.go +++ b/config/config_it_test.go @@ -578,6 +578,7 @@ func testCGRConfigReloadERs(t *testing.T) { "csvRowLength": 0., "partialOrderField": "~*req.AnswerTime", "xmlRootPath": "", + "natsSubject": "cgrates_cdrs", }, }, { @@ -598,6 +599,7 @@ func testCGRConfigReloadERs(t *testing.T) { "csvRowLength": 0., "partialOrderField": "~*req.AnswerTime", "xmlRootPath": "", + "natsSubject": "cgrates_cdrs", }, }, }, @@ -785,6 +787,7 @@ func testCgrCfgV1ReloadConfigSection(t *testing.T) { "csvRowLength": 0., "partialOrderField": "~*req.AnswerTime", "xmlRootPath": "", + "natsSubject": "cgrates_cdrs", }, "partial_commit_fields": []interface{}{}, }, @@ -807,6 +810,7 @@ func testCgrCfgV1ReloadConfigSection(t *testing.T) { "csvRowLength": 0., "partialOrderField": "~*req.AnswerTime", "xmlRootPath": "", + "natsSubject": "cgrates_cdrs", }, "partial_commit_fields": []interface{}{}, }, diff --git a/engine/pstr_nats.go b/engine/pstr_nats.go index 0bd511c68..05e8db4d8 100644 --- a/engine/pstr_nats.go +++ b/engine/pstr_nats.go @@ -105,6 +105,7 @@ func (pstr *NatsPoster) parseOpt(opts map[string]interface{}, nodeID string, con return } } + pstr.subject = utils.DefaultQueueID if vals, has := opts[utils.NatsSubject]; has { pstr.subject = utils.IfaceAsString(vals) } diff --git a/ers/nats.go b/ers/nats.go index 732cf600a..90c06e774 100644 --- a/ers/nats.go +++ b/ers/nats.go @@ -70,10 +70,11 @@ type NatsER struct { rdrErr chan error cap chan struct{} - subject string - queueID string - jetStream bool - opts []nats.Option + subject string + queueID string + jetStream bool + consumerName string + opts []nats.Option poster *engine.NatsPoster } @@ -104,7 +105,7 @@ func (rdr *NatsER) Serve() (err error) { } if _, err = js.QueueSubscribe(rdr.subject, rdr.queueID, func(msg *nats.Msg) { ch <- msg - }); err != nil { + }, nats.Durable(rdr.consumerName)); err != nil { return } } @@ -118,13 +119,11 @@ func (rdr *NatsER) Serve() (err error) { fmt.Sprintf("<%s> stop monitoring nats path <%s>", utils.ERs, rdr.Config().SourcePath)) nc.Drain() + if rdr.poster != nil { + rdr.poster.Close() + } return case msg := <-ch: - if err = rdr.processMessage(msg.Data); err != nil { - nc.Drain() // ignore this error(if any) in favor of the error processMessage - return - } - go func(msg *nats.Msg) { if err := rdr.processMessage(msg.Data); err != nil { utils.Logger.Warning( @@ -196,6 +195,8 @@ func (rdr *NatsER) processOpts() (err error) { rdr.subject = utils.IfaceAsString(rdr.Config().Opts[utils.NatsSubject]) rdr.queueID = utils.FirstNonEmpty(utils.IfaceAsString(rdr.Config().Opts[utils.NatsQueueID]), rdr.cgrCfg.GeneralCfg().NodeID) + rdr.consumerName = utils.FirstNonEmpty(utils.IfaceAsString(rdr.Config().Opts[utils.NatsConsumerName]), + utils.CGRateSLwr) if useJetStreamVal, has := rdr.Config().Opts[utils.NatsJetStream]; has { if rdr.jetStream, err = utils.IfaceAsBool(useJetStreamVal); err != nil { return diff --git a/ers/nats_it_test.go b/ers/nats_it_test.go index 2c6d3c89b..27184e4b1 100644 --- a/ers/nats_it_test.go +++ b/ers/nats_it_test.go @@ -22,6 +22,7 @@ package ers import ( "fmt" + "os/exec" "reflect" "runtime" "testing" @@ -33,7 +34,18 @@ import ( "github.com/nats-io/nats.go" ) -func TestNatsER(t *testing.T) { +func TestNatsERJetStream(t *testing.T) { + // start the nats-server + exec.Command("pkill", "nats-server") + + cmd := exec.Command("nats-server", "-js") + if err := cmd.Start(); err != nil { + t.Fatal(err) // most probably not installed + } + time.Sleep(50 * time.Millisecond) + defer cmd.Process.Kill() + // + cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ "ers": { // EventReaderService "enabled": true, // starts the EventReader service: @@ -45,13 +57,171 @@ func TestNatsER(t *testing.T) { "run_delay": "-1", "concurrent_requests": 1024, "source_path": "nats://localhost:4222", - // "processed_path": "/var/spool/cgrates/ers/out", + "processed_path": "", "tenant": "cgrates.org", "filters": [], "flags": [], "fields":[ {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, ], + "opts": { + "natsJetStream": true, + "natsSubjectProcessed": "processed_cdrs", + } + }, + ], +}, +}`) + utils.Logger.SetLogLevel(7) + if err != nil { + t.Fatal(err) + } + if err := cfg.CheckConfigSanity(); err != nil { + t.Fatal(err) + } + rdrEvents = make(chan *erEvent, 1) + rdrErr = make(chan error, 1) + rdrExit = make(chan struct{}, 1) + + if rdr, err = NewNatsER(cfg, 1, rdrEvents, make(chan *erEvent, 1), + rdrErr, new(engine.FilterS), rdrExit); err != nil { + t.Fatal(err) + } + nc, err := nats.Connect(rdr.Config().SourcePath, nats.Timeout(time.Second), + nats.DrainTimeout(time.Second)) + if err != nil { + t.Fatal(err) + } + defer nc.Drain() + + js, err := nc.JetStream() + if err != nil { + t.Fatal(err) + } + for name := range js.StreamNames() { + if name == "test" { + if err = js.DeleteStream("test"); err != nil { + t.Fatal(err) + } + break + } + if name == "test2" { + if err = js.DeleteStream("test2"); err != nil { + t.Fatal(err) + } + break + } + } + if _, err = js.AddStream(&nats.StreamConfig{ + Name: "test", + Subjects: []string{utils.DefaultQueueID}, + }); err != nil { + t.Fatal(err) + } + + if err = js.PurgeStream("test"); err != nil { + t.Fatal(err) + } + + if _, err = js.AddStream(&nats.StreamConfig{ + Name: "test2", + Subjects: []string{"processed_cdrs"}, + }); err != nil { + t.Fatal(err) + } + + if err = js.PurgeStream("test2"); err != nil { + t.Fatal(err) + } + ch := make(chan *nats.Msg, 3) + _, err = js.QueueSubscribe("processed_cdrs", "test3", func(msg *nats.Msg) { + ch <- msg + }, nats.Durable("test4")) + if err != nil { + t.Fatal(err) + } + + go rdr.Serve() + runtime.Gosched() + time.Sleep(10 * time.Nanosecond) + + for i := 0; i < 3; i++ { + randomCGRID := utils.UUIDSha1Prefix() + expData := fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID) + if _, err = js.Publish(utils.DefaultQueueID, []byte(expData)); err != nil { + t.Fatal(err) + } + + nc.FlushTimeout(time.Second) + nc.Flush() + + select { + case err = <-rdrErr: + t.Fatal(err) + case ev := <-rdrEvents: + if ev.rdrCfg.ID != "nats" { + t.Fatalf("Expected 'nats' received `%s`", ev.rdrCfg.ID) + } + expected := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: ev.cgrEvent.ID, + Time: ev.cgrEvent.Time, + Event: map[string]interface{}{ + "CGRID": randomCGRID, + }, + APIOpts: map[string]interface{}{}, + } + if !reflect.DeepEqual(ev.cgrEvent, expected) { + t.Fatalf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent)) + } + select { + case msg := <-ch: + if expData != string(msg.Data) { + t.Errorf("Expected %q ,received %q", expData, string(msg.Data)) + } + case <-time.After(10 * time.Second): + t.Fatal("Timeout") + } + case <-time.After(10 * time.Second): + t.Fatal("Timeout") + } + } + close(rdrExit) +} + +func TestNatsER(t *testing.T) { + // start the nats-server + exec.Command("pkill", "nats-server") + + cmd := exec.Command("nats-server") + if err := cmd.Start(); err != nil { + t.Fatal(err) // most probably not installed + } + time.Sleep(10 * time.Millisecond) + defer cmd.Process.Kill() + // + + cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ +"ers": { // EventReaderService + "enabled": true, // starts the EventReader service: + "sessions_conns":["*localhost"], + "readers": [ + { + "id": "nats", + "type": "*nats_json_map", + "run_delay": "-1", + "concurrent_requests": 1024, + "source_path": "nats://localhost:4222", + "processed_path": "", + "tenant": "cgrates.org", + "filters": [], + "flags": [], + "fields":[ + {"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"}, + ], + "opts": { + "natsSubjectProcessed": "processed_cdrs", + } }, ], }, @@ -75,46 +245,56 @@ func TestNatsER(t *testing.T) { if err != nil { t.Fatal(err) } - // js, err := nc.JetStream() - // if err != nil { - // t.Fatal(err) - // } - go rdr.Serve() - runtime.Gosched() - time.Sleep(time.Second) - randomCGRID := utils.UUIDSha1Prefix() - if err = nc.Publish(utils.DefaultQueueID, []byte(fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID))); err != nil { + ch := make(chan *nats.Msg, 3) + _, err = nc.ChanQueueSubscribe("processed_cdrs", "test3", ch) + if err != nil { t.Fatal(err) } - // if _, err = js.Publish(utils.DefaultQueueID, []byte(fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID))); err != nil { - // t.Fatal(err) - // } - nc.FlushTimeout(time.Second) - nc.Flush() - nc.Drain() + defer nc.Drain() + go rdr.Serve() + runtime.Gosched() + time.Sleep(100 * time.Millisecond) + for i := 0; i < 3; i++ { + randomCGRID := utils.UUIDSha1Prefix() + expData := fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID) + if err = nc.Publish(utils.DefaultQueueID, []byte(expData)); err != nil { + t.Fatal(err) + } - select { - case err = <-rdrErr: - t.Error(err) - case ev := <-rdrEvents: - if ev.rdrCfg.ID != "nats" { - t.Errorf("Expected 'kakfa' received `%s`", ev.rdrCfg.ID) + nc.FlushTimeout(time.Second) + nc.Flush() + + select { + case err = <-rdrErr: + t.Fatal(err) + case ev := <-rdrEvents: + if ev.rdrCfg.ID != "nats" { + t.Fatalf("Expected 'nats' received `%s`", ev.rdrCfg.ID) + } + expected := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: ev.cgrEvent.ID, + Time: ev.cgrEvent.Time, + Event: map[string]interface{}{ + "CGRID": randomCGRID, + }, + APIOpts: map[string]interface{}{}, + } + if !reflect.DeepEqual(ev.cgrEvent, expected) { + t.Fatalf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent)) + } + select { + case msg := <-ch: + if expData != string(msg.Data) { + t.Errorf("Expected %q ,received %q", expData, string(msg.Data)) + } + case <-time.After(10 * time.Second): + t.Fatal("Timeout") + } + case <-time.After(10 * time.Second): + t.Fatal("Timeout") } - expected := &utils.CGREvent{ - Tenant: "cgrates.org", - ID: ev.cgrEvent.ID, - Time: ev.cgrEvent.Time, - Event: map[string]interface{}{ - "CGRID": randomCGRID, - }, - APIOpts: map[string]interface{}{}, - } - if !reflect.DeepEqual(ev.cgrEvent, expected) { - t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent)) - } - case <-time.After(10 * time.Second): - t.Fatal("Timeout") } close(rdrExit) } diff --git a/packages/debian/changelog b/packages/debian/changelog index 81a6c79ce..79295524d 100644 --- a/packages/debian/changelog +++ b/packages/debian/changelog @@ -166,6 +166,8 @@ cgrates (0.11.0~dev) UNRELEASED; urgency=medium * [SessionS] The sessions are no longer terminated on shutdown if the replication_conns are set * [FilterS] Added *regex filter * [RSRParsers] Added *len dataconverter + * [ERs] Added *nats_json_map + * [EEs] Added *nats_json_map -- DanB Wed, 19 Feb 2020 13:25:52 +0200 diff --git a/utils/consts.go b/utils/consts.go index 90bd9e3cf..6ee75d95e 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2604,6 +2604,7 @@ const ( // nats NatsSubject = "natsSubject" NatsQueueID = "natsQueueID" + NatsConsumerName = "natsConsumerName" NatsJWTFile = "natsJWTFile" NatsSeedFile = "natsSeedFile" NatsClientCertificate = "natsClientCertificate"