diff --git a/config/config_defaults.go b/config/config_defaults.go
index c79f183e7..ed347a6e2 100644
--- a/config/config_defaults.go
+++ b/config/config_defaults.go
@@ -447,6 +447,7 @@ const CGRATES_CFG_JSON = `
// nats
// "natsJetStream": false, // controls if the nats reader uses the JetStream
// "natsConsumerName": "cgrates", // in case of JetStream the name of the consumer
+ // "natsStreamName": "cdrs", // the name of the NATS JetStream stream from which the consumer will read messages
"natsSubject": "cgrates_cdrs", // the subject from were the events are read
// "natsQueueID": "", // the queue id the consumer listen to
// "natsJWTFile": "", // the path to the JWT file( can be the chained file or the user file)
diff --git a/config/erscfg.go b/config/erscfg.go
index 381797cf4..1317f95f9 100644
--- a/config/erscfg.go
+++ b/config/erscfg.go
@@ -315,6 +315,7 @@ func (awsROpts *AWSROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err erro
type NATSROpts struct {
JetStream *bool
ConsumerName *string
+ StreamName *string
Subject *string
QueueID *string
JWTFile *string
@@ -340,6 +341,9 @@ func (natsOpts *NATSROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err err
if jsnCfg.NATSConsumerName != nil {
natsOpts.ConsumerName = jsnCfg.NATSConsumerName
}
+ if jsnCfg.NATSStreamName != nil {
+ natsOpts.StreamName = jsnCfg.NATSStreamName
+ }
if jsnCfg.NATSSubject != nil {
natsOpts.Subject = jsnCfg.NATSSubject
}
@@ -769,6 +773,10 @@ func (natOpts *NATSROpts) Clone() *NATSROpts {
cln.ConsumerName = new(string)
*cln.ConsumerName = *natOpts.ConsumerName
}
+ if natOpts.StreamName != nil {
+ cln.StreamName = new(string)
+ *cln.StreamName = *natOpts.StreamName
+ }
if natOpts.Subject != nil {
cln.Subject = new(string)
*cln.Subject = *natOpts.Subject
@@ -1078,6 +1086,9 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string
if natsOpts.ConsumerName != nil {
opts[utils.NatsConsumerName] = *natsOpts.ConsumerName
}
+ if natsOpts.StreamName != nil {
+ opts[utils.NatsStreamName] = *natsOpts.StreamName
+ }
if natsOpts.Subject != nil {
opts[utils.NatsSubject] = *natsOpts.Subject
}
diff --git a/config/erscfg_test.go b/config/erscfg_test.go
index b19a606c7..79ccd74fc 100644
--- a/config/erscfg_test.go
+++ b/config/erscfg_test.go
@@ -1070,6 +1070,7 @@ func TestERSCfgAsMapInterfaceCase2(t *testing.T) {
"s3BucketIDProcessed":"s3bucketid",
"natsJetStream":true,
"natsConsumerName": "NATConsumer",
+ "natsStreamName": "NATStream",
"natsQueueID":"NATid",
"natsJWTFile":"jwt",
"natsCertificateAuthority":"auth",
@@ -1204,6 +1205,7 @@ func TestERSCfgAsMapInterfaceCase2(t *testing.T) {
utils.S3BucketIDProcessedCfg: "s3bucketid",
utils.NatsJetStream: true,
utils.NatsConsumerName: "NATConsumer",
+ utils.NatsStreamName: "NATStream",
utils.NatsQueueID: "NATid",
utils.NatsJWTFile: "jwt",
utils.NatsSeedFile: "seed",
@@ -1541,6 +1543,7 @@ func TestEventReaderCfgClone(t *testing.T) {
NATSOpts: &NATSROpts{
JetStream: utils.BoolPointer(false),
ConsumerName: utils.StringPointer("user"),
+ StreamName: utils.StringPointer("stream"),
QueueID: utils.StringPointer("id"),
JWTFile: utils.StringPointer("jwt"),
SeedFile: utils.StringPointer("seed"),
diff --git a/config/libconfig_json.go b/config/libconfig_json.go
index ad06a5161..acebaa845 100644
--- a/config/libconfig_json.go
+++ b/config/libconfig_json.go
@@ -253,6 +253,7 @@ type EventReaderOptsJson struct {
S3BucketIDProcessed *string `json:"s3BucketIDProcessed"`
NATSJetStream *bool `json:"natsJetStream"`
NATSConsumerName *string `json:"natsConsumerName"`
+ NATSStreamName *string `json:"natsStreamName"`
NATSSubject *string `json:"natsSubject"`
NATSQueueID *string `json:"natsQueueID"`
NATSJWTFile *string `json:"natsJWTFile"`
diff --git a/cores/server_it_test.go b/cores/server_it_test.go
index 14ed45c2f..62bf61e14 100644
--- a/cores/server_it_test.go
+++ b/cores/server_it_test.go
@@ -63,7 +63,6 @@ var (
testServeHHTPFail,
testServeHHTPFailEnableRpc,
testServeBiJSON,
- testServeBiJSONEmptyBiRPCServer,
testServeBiJSONInvalidPort,
testServeBiGoB,
testServeBiGoBInvalidPort,
@@ -334,27 +333,6 @@ func testServeBiJSON(t *testing.T) {
runtime.Gosched()
}
-func testServeBiJSONEmptyBiRPCServer(t *testing.T) {
- cfg := config.NewDefaultCGRConfig()
- caps := engine.NewCaps(100, utils.MetaBusy)
- server = NewServer(caps)
- server.RpcRegister(new(mockRegister))
-
- data := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
- dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
-
- ss := sessions.NewSessionS(cfg, dm, nil)
-
- expectedErr := "BiRPCServer should not be nil"
- go func() {
- if err := server.ServeBiRPC(":3430", "", ss.OnBiJSONConnect, ss.OnBiJSONDisconnect); err == nil || err.Error() != "BiRPCServer should not be nil" {
- t.Errorf("Expected %+v, received %+v", expectedErr, err)
- }
- }()
-
- runtime.Gosched()
-}
-
func testServeBiJSONInvalidPort(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
caps := engine.NewCaps(100, utils.MetaBusy)
diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json
index 9217eabb7..fd8073f43 100644
--- a/data/conf/cgrates/cgrates.json
+++ b/data/conf/cgrates/cgrates.json
@@ -410,7 +410,8 @@
// // nats
// // "natsJetStream": false, // controls if the nats reader uses the JetStream
-// // "natsConsumerName": "cgrates", // in case of JetStream the name of the consumer
+// // "natsConsumerName": "cgrates", // the name of the NATS JetStream stream from which the consumer will read messages
+// // "natsStreamName": "cgrates", // in case of JetStream the name of the consumer
// "natsSubject": "cgrates_cdrs", // the subject from were the events are read
// // "natsQueueID": "", // the queue id the consumer listen to
// // "natsJWTFile": "", // the path to the JWT file( can be the chained file or the user file)
diff --git a/data/conf/samples/ers_nats/cgrates.json b/data/conf/samples/ers_nats/cgrates.json
new file mode 100644
index 000000000..cdf44328a
--- /dev/null
+++ b/data/conf/samples/ers_nats/cgrates.json
@@ -0,0 +1,75 @@
+{
+
+"general": {
+ "log_level": 7
+},
+
+"data_db": {
+ "db_type": "*internal"
+},
+
+"stor_db": {
+ "db_type": "*internal"
+},
+
+"ers": {
+ "enabled": true,
+ "sessions_conns":[],
+ "readers": [
+ {
+ "id": "nats_reader1",
+ "type": "*nats_json_map",
+ "source_path": "nats://127.0.0.1:4222",
+ "processed_path": "nats://127.0.0.1:4222",
+ "opts": {
+ "natsJetStream": true,
+ "natsConsumerName": "cgrates",
+ "natsStreamName": "stream",
+ "natsSubject": "cgrates_cdrs",
+ "natsQueueID": "queue",
+ "natsJetStreamMaxWait": "5s",
+
+ "natsJetStreamProcessed": true,
+ "natsSubjectProcessed": "cgrates_cdrs_processed",
+ "natsJetStreamMaxWaitProcessed": "5s"
+ },
+ "flags": ["*dryrun"],
+ "fields":[
+ {"tag": "cdr_template", "type": "*template", "value": "cdr_template"}
+ ]
+ },
+ {
+ "id": "nats_reader2",
+ "type": "*nats_json_map",
+ "source_path": "nats://127.0.0.1:4222",
+ "processed_path": "nats://127.0.0.1:4222",
+ "opts": {
+ "natsJetStream": true,
+ "natsConsumerName": "cgrates",
+ "natsStreamName": "stream",
+ "natsSubject": "cgrates_cdrs",
+ "natsQueueID": "queue",
+ "natsJetStreamMaxWait": "5s",
+
+ "natsJetStreamProcessed": true,
+ "natsSubjectProcessed": "cgrates_cdrs_processed",
+ "natsJetStreamMaxWaitProcessed": "5s"
+ },
+ "flags": ["*dryrun"],
+ "fields":[
+ {"tag": "cdr_template", "type": "*template", "value": "cdr_template"}
+ ]
+ }
+ ]
+},
+
+
+"templates": {
+ "cdr_template": [
+ {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.Account", "mandatory": true},
+ {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.Subject", "mandatory": true},
+ {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.Destination", "mandatory": true},
+ ]
+}
+
+}
\ No newline at end of file
diff --git a/ees/nats.go b/ees/nats.go
index 185997dae..84f741e38 100644
--- a/ees/nats.go
+++ b/ees/nats.go
@@ -26,9 +26,11 @@ import (
"sync"
"time"
+ "github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/nats-io/nats.go"
+ "github.com/nats-io/nats.go/jetstream"
)
// NewNatsEE creates a kafka poster
@@ -48,10 +50,9 @@ type NatsEE struct {
subject string // identifier of the CDR queue where we publish
jetStream bool
opts []nats.Option
- jsOpts []nats.JSOpt
poster *nats.Conn
- posterJS nats.JetStreamContext
+ posterJS jetstream.JetStream
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
@@ -78,9 +79,6 @@ func (pstr *NatsEE) parseOpts(opts *config.EventExporterOpts, nodeID string, con
return err
}
- if pstr.jetStream && opts.NATS.JetStreamMaxWait != nil {
- pstr.jsOpts = []nats.JSOpt{nats.MaxWait(*opts.NATS.JetStreamMaxWait)}
- }
return nil
}
@@ -99,7 +97,7 @@ func (pstr *NatsEE) Connect() error {
return err
}
if pstr.jetStream {
- pstr.posterJS, err = pstr.poster.JetStream(pstr.jsOpts...)
+ pstr.posterJS, err = jetstream.New(pstr.poster)
}
return err
}
@@ -116,7 +114,13 @@ func (pstr *NatsEE) ExportEvent(content any, _ string) error {
var err error
if pstr.jetStream {
- _, err = pstr.posterJS.Publish(pstr.subject, content.([]byte))
+ ctx := context.TODO()
+ if pstr.cfg.Opts.NATS.JetStreamMaxWait != nil {
+ var cancel context.CancelFunc
+ ctx, cancel = context.WithTimeout(ctx, *pstr.cfg.Opts.NATS.JetStreamMaxWait)
+ defer cancel()
+ }
+ _, err = pstr.posterJS.Publish(ctx, pstr.subject, content.([]byte))
} else {
err = pstr.poster.Publish(pstr.subject, content.([]byte))
}
diff --git a/ees/nats_it_test.go b/ees/nats_it_test.go
index cd2cf8e7e..092beb11b 100644
--- a/ees/nats_it_test.go
+++ b/ees/nats_it_test.go
@@ -23,26 +23,33 @@ package ees
import (
"os"
- "os/exec"
"path"
"testing"
"time"
+ "github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
+ "github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
+ "github.com/nats-io/nats.go/jetstream"
)
func TestNatsEEJetStream(t *testing.T) {
- testCreateDirectory(t)
- var err error
- cmd := exec.Command("nats-server", "-js") // Start the nats-server.
- if err := cmd.Start(); err != nil {
- t.Fatal(err) // Only if nats-server is not installed.
+
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ JetStream: true,
+ })
+ if err != nil {
+ t.Fatal(err)
}
- time.Sleep(50 * time.Millisecond)
- defer cmd.Process.Kill()
+ natsServer.Start()
+ defer natsServer.Shutdown()
+
+ testCreateDirectory(t)
cgrCfg, err := config.NewCGRConfigFromPath(path.Join(*dataDir, "conf", "samples", "ees"))
if err != nil {
t.Fatal(err)
@@ -63,38 +70,40 @@ func TestNatsEEJetStream(t *testing.T) {
t.Fatal(err)
}
- nc, err := nats.Connect("nats://localhost:4222", nop...)
+ nc, err := nats.Connect(nats.DefaultURL, nop...)
if err != nil {
t.Fatal(err)
}
- js, err := nc.JetStream()
+ defer nc.Drain()
+
+ js, err := jetstream.New(nc)
if err != nil {
t.Fatal(err)
}
- for name := range js.StreamNames() {
- if name == "test2" {
- if err = js.DeleteStream("test2"); err != nil {
- t.Fatal(err)
- }
- break
- }
- }
- if _, err = js.AddStream(&nats.StreamConfig{
+ _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{
Name: "test2",
Subjects: []string{"processed_cdrs"},
- }); err != nil {
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer js.DeleteStream(context.Background(), "test2")
+
+ ch := make(chan string, 3)
+ var cons jetstream.Consumer
+ cons, err = js.CreateOrUpdateConsumer(context.Background(), "test2", jetstream.ConsumerConfig{
+ Durable: "test4",
+ FilterSubject: "processed_cdrs",
+ AckPolicy: jetstream.AckAllPolicy,
+ })
+ if err != nil {
t.Fatal(err)
}
- if err = js.PurgeStream("test2"); err != nil {
- t.Fatal(err)
- }
-
- ch := make(chan *nats.Msg, 3)
- _, err = js.QueueSubscribe("processed_cdrs", "test3", func(msg *nats.Msg) {
- ch <- msg
- }, nats.Durable("test4"))
+ _, err = cons.Consume(func(msg jetstream.Msg) {
+ ch <- string(msg.Data())
+ })
if err != nil {
t.Fatal(err)
}
@@ -114,8 +123,8 @@ func TestNatsEEJetStream(t *testing.T) {
// fmt.Println((<-ch).Data)
select {
case data := <-ch:
- if expected != string(data.Data) {
- t.Fatalf("Expected %v \n but received \n %v", expected, string(data.Data))
+ if expected != data {
+ t.Fatalf("Expected %v \n but received \n %v", expected, data)
}
case <-time.After(50 * time.Millisecond):
t.Fatal("Time limit exceeded")
@@ -124,14 +133,16 @@ func TestNatsEEJetStream(t *testing.T) {
func TestNatsEE(t *testing.T) {
testCreateDirectory(t)
- exec.Command("pkill", "nats-server")
- cmd := exec.Command("nats-server")
- if err := cmd.Start(); err != nil {
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ })
+ if err != nil {
t.Fatal(err)
}
- time.Sleep(50 * time.Millisecond)
- defer cmd.Process.Kill()
+ natsServer.Start()
+ defer natsServer.Shutdown()
cgrCfg, err := config.NewCGRConfigFromPath(path.Join(*dataDir, "conf", "samples", "ees"))
if err != nil {
@@ -211,7 +222,7 @@ func TestGetNatsOptsSeedFile(t *testing.T) {
//test error
os.WriteFile("/tmp/nkey.txt", []byte(""), 0777)
_, err = GetNatsOpts(opts, nodeID, connTimeout)
- if err.Error() != "no nkey seed found" {
- t.Errorf("Expected %v \n but received \n %v", err.Error(), "no nkey seed found")
+ if err == nil || err.Error() != "nkeys: no nkey seed found" {
+ t.Errorf("expected \"%s\" but received \"%s\"", "nkeys: no nkey seed found", err.Error())
}
}
diff --git a/ees/nats_test.go b/ees/nats_test.go
index 8102bdeae..941bd5f68 100644
--- a/ees/nats_test.go
+++ b/ees/nats_test.go
@@ -19,13 +19,11 @@ along with this program. If not, see
package ees
import (
- "reflect"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
- "github.com/nats-io/nats.go"
)
func TestNewNatsEE(t *testing.T) {
@@ -148,47 +146,6 @@ func TestParseOptJetStream(t *testing.T) {
}
}
-func TestParseOptJetStreamMaxWait(t *testing.T) {
- cfg := &config.EventExporterCfg{
- ID: "nats_exporter",
- Type: "nats",
- Attempts: 2,
- ConcurrentRequests: 2,
- Opts: &config.EventExporterOpts{
- AMQP: &config.AMQPOpts{},
- Els: &config.ElsOpts{},
- AWS: &config.AWSOpts{},
- NATS: &config.NATSOpts{},
- Kafka: &config.KafkaOpts{},
- RPC: &config.RPCOpts{},
- },
- }
- opts := &config.EventExporterOpts{
- NATS: &config.NATSOpts{
- JetStream: utils.BoolPointer(true),
- JetStreamMaxWait: utils.DurationPointer(2),
- }}
- nodeID := "node_id1"
- connTimeout := 2 * time.Second
- dc, err := newEEMetrics("Local")
- if err != nil {
- t.Error(err)
- }
- pstr, err := NewNatsEE(cfg, nodeID, connTimeout, dc)
- if err != nil {
- t.Error(err)
- }
-
- err = pstr.parseOpts(opts, nodeID, connTimeout)
- if err != nil {
- t.Error(err)
- }
- exp := []nats.JSOpt{nats.MaxWait(2 * time.Nanosecond)}
- if !reflect.DeepEqual(pstr.jsOpts, exp) {
- t.Errorf("Expected %v \n but received \n %v", exp, pstr.jsOpts)
- }
-}
-
func TestParseOptSubject(t *testing.T) {
cfg := &config.EventExporterCfg{
ID: "nats_exporter",
diff --git a/ers/nats.go b/ers/nats.go
index 475bd7245..04f86f4fd 100644
--- a/ers/nats.go
+++ b/ers/nats.go
@@ -26,12 +26,14 @@ import (
"os"
"time"
+ "github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/ees"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/nats-io/nats.go"
+ "github.com/nats-io/nats.go/jetstream"
)
// NewNatsER return a new amqp event reader
@@ -79,8 +81,8 @@ type NatsER struct {
queueID string
jetStream bool
consumerName string
+ streamName string
opts []nats.Option
- jsOpts []nats.JSOpt
poster *ees.NatsEE
}
@@ -101,28 +103,28 @@ func (rdr *NatsER) Serve() error {
}
// Define the message handler. Its content will get executed for every received message.
- msgHandler := func(msg *nats.Msg) {
+ handleMessage := func(msgData []byte) {
// If the rdr.cap channel buffer is empty, block until a resource is available. Otherwise
// allocate one resource and start processing the message.
if rdr.Config().ConcurrentReqs != -1 {
<-rdr.cap
}
- go func(msg *nats.Msg) {
- handlerErr := rdr.processMessage(msg.Data)
+ go func() {
+ handlerErr := rdr.processMessage(msgData)
if handlerErr != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> processing message %s error: %s",
- utils.ERs, string(msg.Data), handlerErr.Error()))
+ utils.ERs, string(msgData), handlerErr.Error()))
}
// Export the received message if a poster has been defined.
if rdr.poster != nil {
- handlerErr = ees.ExportWithAttempts(rdr.poster, msg.Data, utils.EmptyString)
+ handlerErr = ees.ExportWithAttempts(rdr.poster, msgData, utils.EmptyString)
if handlerErr != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> writing message %s error: %s",
- utils.ERs, string(msg.Data), handlerErr.Error()))
+ utils.ERs, string(msgData), handlerErr.Error()))
}
}
@@ -130,29 +132,50 @@ func (rdr *NatsER) Serve() error {
if rdr.Config().ConcurrentReqs != -1 {
rdr.cap <- struct{}{}
}
- }(msg)
+
+ }()
}
// Subscribe to the appropriate NATS subject.
if !rdr.jetStream {
- _, err = nc.QueueSubscribe(rdr.subject, rdr.queueID, msgHandler)
+ _, err = nc.QueueSubscribe(rdr.subject, rdr.queueID, func(msg *nats.Msg) {
+ handleMessage(msg.Data)
+ })
if err != nil {
nc.Drain()
return err
}
} else {
- var js nats.JetStreamContext
- js, err = nc.JetStream(rdr.jsOpts...)
+ var js jetstream.JetStream
+ js, err = jetstream.New(nc)
if err != nil {
nc.Drain()
return err
}
- _, err = js.QueueSubscribe(rdr.subject, rdr.queueID, msgHandler,
- nats.Durable(rdr.consumerName))
+ ctx := context.TODO()
+ if jsMaxWait := rdr.Config().Opts.NATSOpts.JetStreamMaxWait; jsMaxWait != nil {
+ var cancel context.CancelFunc
+ ctx, cancel = context.WithTimeout(ctx, *jsMaxWait)
+ defer cancel()
+ }
+
+ var cons jetstream.Consumer
+ cons, err = js.CreateOrUpdateConsumer(ctx, rdr.streamName, jetstream.ConsumerConfig{
+ FilterSubject: rdr.subject,
+ Durable: rdr.consumerName,
+ AckPolicy: jetstream.AckAllPolicy,
+ })
if err != nil {
nc.Drain()
return err
}
+
+ _, err = cons.Consume(func(msg jetstream.Msg) {
+ handleMessage(msg.Data())
+ })
+ if err != nil {
+ return err
+ }
}
go func() {
@@ -219,24 +242,24 @@ func (rdr *NatsER) processOpts() (err error) {
if rdr.Config().Opts.NATSOpts.Subject != nil {
rdr.subject = *rdr.Config().Opts.NATSOpts.Subject
}
- var queueID string
+
+ rdr.queueID = rdr.cgrCfg.GeneralCfg().NodeID
if rdr.Config().Opts.NATSOpts.QueueID != nil {
- queueID = *rdr.Config().Opts.NATSOpts.QueueID
+ rdr.queueID = *rdr.Config().Opts.NATSOpts.QueueID
}
- rdr.queueID = utils.FirstNonEmpty(queueID, rdr.cgrCfg.GeneralCfg().NodeID)
- var consumerName string
+
+ rdr.consumerName = utils.CGRateSLwr
if rdr.Config().Opts.NATSOpts.ConsumerName != nil {
- consumerName = *rdr.Config().Opts.NATSOpts.ConsumerName
+ rdr.consumerName = *rdr.Config().Opts.NATSOpts.ConsumerName
}
- rdr.consumerName = utils.FirstNonEmpty(consumerName, utils.CGRateSLwr)
+
+ if rdr.Config().Opts.NATSOpts.StreamName != nil {
+ rdr.streamName = *rdr.Config().Opts.NATSOpts.StreamName
+ }
+
if rdr.Config().Opts.NATSOpts.JetStream != nil {
rdr.jetStream = *rdr.Config().Opts.NATSOpts.JetStream
}
- if rdr.jetStream {
- if rdr.Config().Opts.NATSOpts.JetStreamMaxWait != nil {
- rdr.jsOpts = []nats.JSOpt{nats.MaxWait(*rdr.Config().Opts.NATSOpts.JetStreamMaxWait)}
- }
- }
rdr.opts, err = GetNatsOpts(rdr.Config().Opts.NATSOpts,
rdr.cgrCfg.GeneralCfg().NodeID,
rdr.cgrCfg.GeneralCfg().ConnectTimeout)
diff --git a/ers/nats_it_test.go b/ers/nats_it_test.go
index 9bb6fc124..f1f7c889d 100644
--- a/ers/nats_it_test.go
+++ b/ers/nats_it_test.go
@@ -25,174 +25,68 @@ import (
"encoding/json"
"fmt"
"os"
- "os/exec"
"path"
"reflect"
"runtime"
"testing"
"time"
+ "github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
+ "github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
+ "github.com/nats-io/nats.go/jetstream"
)
func TestERsNATSIT(t *testing.T) {
- t.Skip()
- cfgContent := `{
+ cfgPath := path.Join(*dataDir, "conf", "samples", "ers_nats")
+ cfg, err := config.NewCGRConfigFromPath(cfgPath)
+ if err != nil {
+ t.Fatal("could not init cfg", err.Error())
+ }
-"general": {
- "log_level": 7
-},
-
-"data_db": {
- "db_type": "*internal"
-},
-
-"stor_db": {
- "db_type": "*internal"
-},
-
-"ers": {
- "enabled": true,
- "sessions_conns":[],
- "readers": [
- {
- "id": "nats_consumer1",
- "type": "*nats_json_map",
- "source_path": "nats://127.0.0.1:4222",
- "processed_path": "nats://127.0.0.1:4222",
- "opts": {
- "natsJetStream": true,
- "natsConsumerName": "cgrates_consumer",
- "natsSubject": "cgrates_cdrs",
- "natsQueueID": "queue",
- "natsJetStreamMaxWait": "5s",
-
- "natsJetStreamProcessed": true,
- "natsSubjectProcessed": "cgrates_cdrs_processed",
- "natsJetStreamMaxWaitProcessed": "5s"
- },
- "flags": ["*dryrun"],
- "fields":[
- {"tag": "cdr_template", "type": "*template", "value": "cdr_template"}
- ]
- },
- {
- "id": "nats_consumer2",
- "type": "*nats_json_map",
- "source_path": "nats://127.0.0.1:4222",
- "processed_path": "nats://127.0.0.1:4222",
- "opts": {
- "natsJetStream": true,
- "natsConsumerName": "cgrates_consumer",
- "natsSubject": "cgrates_cdrs",
- "natsQueueID": "queue",
- "natsJetStreamMaxWait": "5s",
-
- "natsJetStreamProcessed": true,
- "natsSubjectProcessed": "cgrates_cdrs_processed",
- "natsJetStreamMaxWaitProcessed": "5s"
- },
- "flags": ["*dryrun"],
- "fields":[
- {"tag": "cdr_template", "type": "*template", "value": "cdr_template"}
- ]
- },
- {
- "id": "nats_consumer3",
- "type": "*nats_json_map",
- "source_path": "nats://127.0.0.1:4222",
- "processed_path": "nats://127.0.0.1:4222",
- "opts": {
- "natsJetStream": true,
- "natsConsumerName": "cgrates_consumer",
- "natsSubject": "cgrates_cdrs",
- "natsQueueID": "queue",
- "natsJetStreamMaxWait": "5s",
-
- "natsJetStreamProcessed": true,
- "natsSubjectProcessed": "cgrates_cdrs_processed",
- "natsJetStreamMaxWaitProcessed": "5s"
- },
- "flags": ["*dryrun"],
- "fields":[
- {"tag": "cdr_template", "type": "*template", "value": "cdr_template"}
- ]
- },
- {
- "id": "nats_consumer4",
- "type": "*nats_json_map",
- "source_path": "nats://127.0.0.1:4222",
- "processed_path": "",
- "opts": {
- "natsJetStream": true,
- "natsConsumerName": "cgrates_consumer4",
- "natsSubject": "cgrates_cdrs_processed",
- "natsQueueID": "",
- "natsJetStreamMaxWait": "5s",
- },
- "flags": ["*dryrun"],
- "fields":[
- {"tag": "cdr_template", "type": "*template", "value": "cdr_template"}
- ]
- }
- ]
-},
-
-
-"templates": {
- "cdr_template": [
- // {"tag": "Source", "path": "*cgreq.Source", "type": "*constant", "value": "ers_template_combined", "mandatory": true},
- // {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.2", "mandatory": true},
- // {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.3", "mandatory": true},
- // {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.4", "mandatory": true},
- // {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.6", "mandatory": true},
- // {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.7", "mandatory": true},
- {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.Account", "mandatory": true},
- // {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.9", "mandatory": true},
- {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.Destination", "mandatory": true},
- // {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.11", "mandatory": true},
- // {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.12", "mandatory": true},
- // {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.13", "mandatory": true}
- ]
-}
-
-}`
- cfg, cfgPath, clean, err := initTestCfg(cfgContent)
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ JetStream: true,
+ })
if err != nil {
t.Fatal(err)
}
- defer clean()
+ natsServer.Start()
+ defer natsServer.Shutdown()
+ // Establish a connection to nats.
nc, err := nats.Connect(cfg.ERsCfg().Readers[1].SourcePath)
if err != nil {
t.Fatal(err)
}
defer nc.Close()
- js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
+ // Initialize a stream manager and create a stream.
+ js, err := jetstream.New(nc)
if err != nil {
t.Fatal(err)
}
-
- js.AddStream(&nats.StreamConfig{
- Name: "CDRs",
+ js.CreateStream(context.Background(), jetstream.StreamConfig{
+ Name: "stream",
Subjects: []string{"cgrates_cdrs", "cgrates_cdrs_processed"},
})
- time.Sleep(2 * time.Second)
-
+ // Start the engine.
if _, err := engine.StopStartEngine(cfgPath, 100); err != nil {
t.Fatal(err)
}
+ defer engine.KillEngine(100)
+ // Publish CDRs asynchronously to the nats subject.
+ cdr := make(map[string]any)
for i := 0; i < 10; i++ {
- cdr := map[string]any{
- "Account": 1000 + i,
- "Destination": 2000 + i,
- }
+ cdr[utils.AccountField] = 1001 + i
+ cdr[utils.Subject] = 1001 + i
+ cdr[utils.Destination] = 2001 + i
b, _ := json.Marshal(cdr)
js.PublishAsync("cgrates_cdrs", b)
}
@@ -202,15 +96,35 @@ func TestERsNATSIT(t *testing.T) {
t.Fatal("Did not resolve in time")
}
- // Add verification
-
- err = engine.KillEngine(100)
+ // Define a consumer for the subject where all the processed cdrs were published.
+ var cons jetstream.Consumer
+ cons, err = js.CreateOrUpdateConsumer(context.Background(), "stream", jetstream.ConsumerConfig{
+ FilterSubject: "cgrates_cdrs_processed",
+ Durable: "cgrates_processed",
+ AckPolicy: jetstream.AckAllPolicy,
+ })
if err != nil {
t.Error(err)
}
+
+ // Wait for the messages to be consumed and processed.
+ time.Sleep(100 * time.Millisecond)
+
+ // Retrieve info about the consumer.
+ info, err := cons.Info(context.Background())
+ if err != nil {
+ t.Error(err)
+ }
+
+ if info.NumPending != 10 {
+ t.Errorf("expected %d pending messages, received %d", 10, info.NumPending)
+ }
+
+ js.DeleteStream(context.Background(), "stream")
+
}
-func testCheckNatsData(t *testing.T, randomCGRID, expData string, ch chan *nats.Msg) {
+func testCheckNatsData(t *testing.T, randomCGRID, expData string, ch chan string) {
select {
case err := <-rdrErr:
t.Fatal(err)
@@ -232,8 +146,8 @@ func testCheckNatsData(t *testing.T, randomCGRID, expData string, ch chan *nats.
}
select {
case msg := <-ch:
- if expData != string(msg.Data) {
- t.Errorf("Expected %q ,received %q", expData, string(msg.Data))
+ if expData != msg {
+ t.Errorf("Expected %q ,received %q", expData, msg)
}
case <-time.After(10 * time.Second):
t.Fatal("Timeout2")
@@ -263,49 +177,44 @@ func testCheckNatsJetStream(t *testing.T, cfg *config.CGRConfig) {
}
defer nc.Drain()
- js, err := nc.JetStream()
+ js, err := jetstream.New(nc)
if err != nil {
t.Fatal(err)
}
- for name := range js.StreamNames() {
- if name == "test" {
- if err = js.DeleteStream("test"); err != nil {
- t.Fatal(err)
- }
- break
- }
- if name == "test2" {
- if err = js.DeleteStream("test2"); err != nil {
- t.Fatal(err)
- }
- break
- }
- }
- if _, err = js.AddStream(&nats.StreamConfig{
+
+ _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{
Name: "test",
Subjects: []string{utils.DefaultQueueID},
- }); err != nil {
+ })
+ if err != nil {
t.Fatal(err)
}
+ defer js.DeleteStream(context.Background(), "test")
- if err = js.PurgeStream("test"); err != nil {
- t.Fatal(err)
- }
-
- if _, err = js.AddStream(&nats.StreamConfig{
+ _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{
Name: "test2",
Subjects: []string{"processed_cdrs"},
- }); err != nil {
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer js.DeleteStream(context.Background(), "test2")
+
+ ch := make(chan string, 3)
+ var cons jetstream.Consumer
+ cons, err = js.CreateOrUpdateConsumer(context.Background(), "test2", jetstream.ConsumerConfig{
+ FilterSubject: "processed_cdrs",
+ Durable: "test4",
+ AckPolicy: jetstream.AckAllPolicy,
+ })
+ if err != nil {
+ nc.Drain()
t.Fatal(err)
}
- if err = js.PurgeStream("test2"); err != nil {
- t.Fatal(err)
- }
- ch := make(chan *nats.Msg, 3)
- _, err = js.QueueSubscribe("processed_cdrs", "test3", func(msg *nats.Msg) {
- ch <- msg
- }, nats.Durable("test4"))
+ _, err = cons.Consume(func(msg jetstream.Msg) {
+ ch <- string(msg.Data())
+ })
if err != nil {
t.Fatal(err)
}
@@ -317,7 +226,7 @@ func testCheckNatsJetStream(t *testing.T, cfg *config.CGRConfig) {
for i := 0; i < 3; i++ {
randomCGRID := utils.UUIDSha1Prefix()
expData := fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID)
- if _, err = js.Publish(utils.DefaultQueueID, []byte(expData)); err != nil {
+ if _, err = js.Publish(context.Background(), utils.DefaultQueueID, []byte(expData)); err != nil {
t.Fatal(err)
}
@@ -348,8 +257,10 @@ func testCheckNatsNormal(t *testing.T, cfg *config.CGRConfig) {
if err != nil {
t.Fatal(err)
}
- ch := make(chan *nats.Msg, 3)
- _, err = nc.ChanQueueSubscribe("processed_cdrs", "test3", ch)
+ ch := make(chan string, 3)
+ _, err = nc.QueueSubscribe("processed_cdrs", "test3", func(msg *nats.Msg) {
+ ch <- string(msg.Data)
+ })
if err != nil {
t.Fatal(err)
}
@@ -374,16 +285,16 @@ func testCheckNatsNormal(t *testing.T, cfg *config.CGRConfig) {
}
func TestNatsERJetStream(t *testing.T) {
- // start the nats-server
- exec.Command("pkill", "nats-server")
-
- cmd := exec.Command("nats-server", "-js")
- if err := cmd.Start(); err != nil {
- t.Fatal(err) // most probably not installed
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ JetStream: true,
+ })
+ if err != nil {
+ t.Fatal(err)
}
- time.Sleep(50 * time.Millisecond)
- defer cmd.Process.Kill()
- //
+ natsServer.Start()
+ defer natsServer.Shutdown()
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
"ers": { // EventReaderService
@@ -405,6 +316,7 @@ func TestNatsERJetStream(t *testing.T) {
],
"opts": {
"natsJetStream": true,
+ "natsStreamName": "test",
"natsJetStreamProcessed": true,
"natsSubjectProcessed": "processed_cdrs",
}
@@ -422,16 +334,15 @@ func TestNatsERJetStream(t *testing.T) {
}
func TestNatsER(t *testing.T) {
- // start the nats-server
- exec.Command("pkill", "nats-server")
-
- cmd := exec.Command("nats-server")
- if err := cmd.Start(); err != nil {
- t.Fatal(err) // most probably not installed
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ })
+ if err != nil {
+ t.Fatal(err)
}
- time.Sleep(50 * time.Millisecond)
- defer cmd.Process.Kill()
- //
+ natsServer.Start()
+ defer natsServer.Shutdown()
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
"ers": { // EventReaderService
@@ -468,16 +379,18 @@ func TestNatsER(t *testing.T) {
}
func TestNatsERJetStreamUser(t *testing.T) {
- // start the nats-server
- exec.Command("pkill", "nats-server")
-
- cmd := exec.Command("nats-server", "-js", "--user", "user", "--pass", "password")
- if err := cmd.Start(); err != nil {
- t.Fatal(err) // most probably not installed
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ JetStream: true,
+ Username: "user",
+ Password: "password",
+ })
+ if err != nil {
+ t.Fatal(err)
}
- time.Sleep(50 * time.Millisecond)
- defer cmd.Process.Kill()
- //
+ natsServer.Start()
+ defer natsServer.Shutdown()
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
"ers": { // EventReaderService
@@ -499,6 +412,7 @@ func TestNatsERJetStreamUser(t *testing.T) {
],
"opts": {
"natsJetStream": true,
+ "natsStreamName": "test",
"natsJetStreamProcessed": true,
"natsSubjectProcessed": "processed_cdrs",
}
@@ -516,16 +430,17 @@ func TestNatsERJetStreamUser(t *testing.T) {
}
func TestNatsERUser(t *testing.T) {
- // start the nats-server
- exec.Command("pkill", "nats-server")
-
- cmd := exec.Command("nats-server", "--user", "user", "--pass", "password")
- if err := cmd.Start(); err != nil {
- t.Fatal(err) // most probably not installed
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ Username: "user",
+ Password: "password",
+ })
+ if err != nil {
+ t.Fatal(err)
}
- time.Sleep(50 * time.Millisecond)
- defer cmd.Process.Kill()
- //
+ natsServer.Start()
+ defer natsServer.Shutdown()
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
"ers": { // EventReaderService
@@ -562,16 +477,17 @@ func TestNatsERUser(t *testing.T) {
}
func TestNatsERJetStreamToken(t *testing.T) {
- // start the nats-server
- exec.Command("pkill", "nats-server")
-
- cmd := exec.Command("nats-server", "-js", "--auth", "token")
- if err := cmd.Start(); err != nil {
- t.Fatal(err) // most probably not installed
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ JetStream: true,
+ Authorization: "token",
+ })
+ if err != nil {
+ t.Fatal(err)
}
- time.Sleep(50 * time.Millisecond)
- defer cmd.Process.Kill()
- //
+ natsServer.Start()
+ defer natsServer.Shutdown()
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
"ers": { // EventReaderService
@@ -593,6 +509,7 @@ func TestNatsERJetStreamToken(t *testing.T) {
],
"opts": {
"natsJetStream": true,
+ "natsStreamName": "test",
"natsJetStreamProcessed": true,
"natsSubjectProcessed": "processed_cdrs",
}
@@ -610,16 +527,17 @@ func TestNatsERJetStreamToken(t *testing.T) {
}
func TestNatsERToken(t *testing.T) {
- // start the nats-server
- exec.Command("pkill", "nats-server")
-
- cmd := exec.Command("nats-server", "--auth", "token")
- if err := cmd.Start(); err != nil {
- t.Fatal(err) // most probably not installed
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ JetStream: true,
+ Authorization: "token",
+ })
+ if err != nil {
+ t.Fatal(err)
}
- time.Sleep(50 * time.Millisecond)
- defer cmd.Process.Kill()
- //
+ natsServer.Start()
+ defer natsServer.Shutdown()
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
"ers": { // EventReaderService
@@ -667,25 +585,21 @@ func TestNatsERNkey(t *testing.T) {
if err := os.WriteFile(seedFilePath, []byte("SUAOUIE5CU47NCO22GHFEZXGCRCJDVTHDLMIP4L7UQNCR5SW4FZICI7O3Q"), 0664); err != nil {
t.Fatal(err)
}
- natsCfgPath := path.Join(basePath, "nats.cfg")
- if err := os.WriteFile(natsCfgPath, []byte(`authorization: {
- users: [
- { nkey: UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY }
- ]
- }
-`), 0664); err != nil {
+
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ Nkeys: []*server.NkeyUser{
+ {
+ Nkey: "UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY",
+ },
+ },
+ })
+ if err != nil {
t.Fatal(err)
}
- // start the nats-server
- exec.Command("pkill", "nats-server")
-
- cmd := exec.Command("nats-server", "-c", natsCfgPath)
- if err := cmd.Start(); err != nil {
- t.Fatal(err) // most probably not installed
- }
- time.Sleep(50 * time.Millisecond)
- defer cmd.Process.Kill()
- //
+ natsServer.Start()
+ defer natsServer.Shutdown()
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{
"ers": { // EventReaderService
@@ -735,25 +649,22 @@ func TestNatsERJetStreamNKey(t *testing.T) {
if err := os.WriteFile(seedFilePath, []byte("SUAOUIE5CU47NCO22GHFEZXGCRCJDVTHDLMIP4L7UQNCR5SW4FZICI7O3Q"), 0664); err != nil {
t.Fatal(err)
}
- natsCfgPath := path.Join(basePath, "nats.cfg")
- if err := os.WriteFile(natsCfgPath, []byte(`authorization: {
-users: [
- { nkey: UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY }
-]
-}
-`), 0664); err != nil {
+
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ JetStream: true,
+ Nkeys: []*server.NkeyUser{
+ {
+ Nkey: "UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY",
+ },
+ },
+ })
+ if err != nil {
t.Fatal(err)
}
- // start the nats-server
- exec.Command("pkill", "nats-server")
-
- cmd := exec.Command("nats-server", "-c", natsCfgPath, "-js")
- if err := cmd.Start(); err != nil {
- t.Fatal(err) // most probably not installed
- }
- time.Sleep(50 * time.Millisecond)
- defer cmd.Process.Kill()
- //
+ natsServer.Start()
+ defer natsServer.Shutdown()
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{
"ers": { // EventReaderService
@@ -775,6 +686,7 @@ users: [
],
"opts": {
"natsJetStream": true,
+ "natsStreamName": "test",
"natsSeedFile": %q,
"natsJetStreamProcessed": true,
"natsSubjectProcessed": "processed_cdrs",
@@ -831,16 +743,16 @@ resolver_preload: {
`), 0664); err != nil {
t.Fatal(err)
}
- // start the nats-server
- exec.Command("pkill", "nats-server")
-
- cmd := exec.Command("nats-server", "-c", natsCfgPath)
- if err := cmd.Start(); err != nil {
- t.Fatal(err) // most probably not installed
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ ConfigFile: natsCfgPath,
+ })
+ if err != nil {
+ t.Fatal(err)
}
- time.Sleep(50 * time.Millisecond)
- defer cmd.Process.Kill()
- //
+ natsServer.Start()
+ defer natsServer.Shutdown()
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{
"ers": { // EventReaderService
@@ -920,16 +832,18 @@ system_account:AAFIBB6C56ROU5XRVJLJYR3BTGGYK3HJGHEHQV7L7QZMTT3ZRBLHBS7F
`), 0664); err != nil {
t.Fatal(err)
}
- // start the nats-server
- exec.Command("pkill", "nats-server")
-
- cmd := exec.Command("nats-server", "-c", natsCfgPath, "-js")
- if err := cmd.Start(); err != nil {
- t.Fatal(err) // most probably not installed
+ natsServer, err := server.NewServer(&server.Options{
+ Host: "127.0.0.1",
+ Port: 4222,
+ ConfigFile: natsCfgPath,
+ JetStream: true,
+ })
+ if err != nil {
+ t.Fatal(err)
}
- time.Sleep(100 * time.Millisecond)
- defer cmd.Process.Kill()
- //
+ natsServer.Start()
+ defer natsServer.Shutdown()
+
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{
"ers": { // EventReaderService
"enabled": true, // starts the EventReader service:
@@ -950,6 +864,7 @@ system_account:AAFIBB6C56ROU5XRVJLJYR3BTGGYK3HJGHEHQV7L7QZMTT3ZRBLHBS7F
],
"opts": {
"natsJetStream": true,
+ "natsStreamName": "test",
"natsJWTFile": %q,
"natsJetStreamProcessed": true,
"natsSubjectProcessed": "processed_cdrs",
diff --git a/go.mod b/go.mod
index 81d7a2643..904a330e0 100644
--- a/go.mod
+++ b/go.mod
@@ -39,13 +39,14 @@ require (
github.com/mediocregopher/radix/v3 v3.8.1
github.com/miekg/dns v1.1.54
github.com/mitchellh/mapstructure v1.4.0
+ github.com/nats-io/nats-server/v2 v2.10.1
github.com/nats-io/nats.go v1.30.0
github.com/nyaruka/phonenumbers v1.0.75
github.com/peterh/liner v1.2.1
github.com/rabbitmq/amqp091-go v1.5.0
github.com/segmentio/kafka-go v0.4.8
go.mongodb.org/mongo-driver v1.11.0
- golang.org/x/crypto v0.6.0
+ golang.org/x/crypto v0.13.0
golang.org/x/net v0.10.0
golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5
google.golang.org/api v0.36.0
@@ -93,9 +94,10 @@ require (
github.com/kr/pretty v0.2.1 // indirect
github.com/lib/pq v1.8.0 // indirect
github.com/mattn/go-runewidth v0.0.10 // indirect
+ github.com/minio/highwayhash v1.0.2 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/mschoch/smat v0.2.0 // indirect
- github.com/nats-io/nats-server/v2 v2.2.6 // indirect
+ github.com/nats-io/jwt/v2 v2.5.2 // indirect
github.com/nats-io/nkeys v0.4.5 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/philhofer/fwd v1.1.1 // indirect
@@ -115,8 +117,9 @@ require (
go.opencensus.io v0.22.5 // indirect
golang.org/x/mod v0.10.0 // indirect
golang.org/x/sync v0.2.0 // indirect
- golang.org/x/sys v0.8.0 // indirect
- golang.org/x/text v0.9.0 // indirect
+ golang.org/x/sys v0.12.0 // indirect
+ golang.org/x/text v0.13.0 // indirect
+ golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.9.3 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect
diff --git a/go.sum b/go.sum
index c9e5ffa54..15c78e061 100644
--- a/go.sum
+++ b/go.sum
@@ -317,7 +317,6 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
-github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
@@ -355,8 +354,8 @@ github.com/mediocregopher/radix/v3 v3.8.1 h1:rOkHflVuulFKlwsLY01/M2cM2tWCjDoETcM
github.com/mediocregopher/radix/v3 v3.8.1/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/miekg/dns v1.1.54 h1:5jon9mWcb0sFJGpnI99tOMhCPyJ+RPVz5b63MQG0VWI=
github.com/miekg/dns v1.1.54/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY=
-github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
-github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
+github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
+github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.4.0 h1:7ks8ZkOP5/ujthUsT07rNv+nkLXCQWKNHuwzOAesEks=
@@ -366,17 +365,12 @@ github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJ
github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae/go.mod h1:qAyveg+e4CE+eKJXWVjKXM4ck2QobLqTDytGJbLLhJg=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
-github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
-github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
-github.com/nats-io/jwt/v2 v2.0.2 h1:ejVCLO8gu6/4bOKIHQpmB5UhhUJfAQw55yvLWpfmKjI=
-github.com/nats-io/jwt/v2 v2.0.2/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
-github.com/nats-io/nats-server/v2 v2.2.6 h1:FPK9wWx9pagxcw14s8W9rlfzfyHm61uNLnJyybZbn48=
-github.com/nats-io/nats-server/v2 v2.2.6/go.mod h1:sEnFaxqe09cDmfMgACxZbziXnhQFhwk+aKkZjBBRYrI=
-github.com/nats-io/nats.go v1.11.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
+github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
+github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
+github.com/nats-io/nats-server/v2 v2.10.1 h1:MIJ614dhOIdo71iSzY8ln78miXwrYvlvXHUyS+XdKZQ=
+github.com/nats-io/nats-server/v2 v2.10.1/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto=
github.com/nats-io/nats.go v1.30.0 h1:bj/rVsRCrFXxmm9mJiDhb74UKl2HhKpDwKRBtvCjZjc=
github.com/nats-io/nats.go v1.30.0/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM=
-github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
-github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
@@ -515,10 +509,9 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200320181102-891825fb96df/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
-golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
-golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc=
-golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
+golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
+golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -529,6 +522,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
+golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
+golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@@ -588,7 +583,6 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
-golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
@@ -659,8 +653,8 @@ golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
-golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
+golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -670,13 +664,13 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
-golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
-golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
+golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
+golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
-golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
-golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
+golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
diff --git a/utils/consts.go b/utils/consts.go
index 9682c8e40..d6c675958 100644
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -2670,6 +2670,7 @@ const (
NatsSubject = "natsSubject"
NatsQueueID = "natsQueueID"
NatsConsumerName = "natsConsumerName"
+ NatsStreamName = "natsStreamName"
NatsJWTFile = "natsJWTFile"
NatsSeedFile = "natsSeedFile"
NatsClientCertificate = "natsClientCertificate"