mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Migrate to new jetstream API
Upgraded go.mod nats version due to an issue caused by version mismatch between driver and server (uncertain). Renamed function from getProcessOptions to getProcessedOptions. ## *NatsER.Serve - Replaced ChanQueueSubscribe with QueueSubscribe for Core NATS consumer to handle the message processing directly. - Since QueueSubscribe is now used regardless of jetstream status, the message handler has been assigned to a separate variable that can be reused. - The message handler is now dealing with the message processing directly, therefore the select case listening for the channel which is feeding NATS messages can be removed together with the channel itself and the select. Currently, the goroutine within Serve only has to block until the rdrExit chan is closed. - Moved the resource check inside the handler right before starting the message processing goroutine. ## *NatsEE.parseOpts - Renamed function from parseOpt to parseOpts. - Handled the error coming from GetNatsOpts function. ## *NatsEE.Connect - Updated function to return early in case of non-nil nats.Conn value to reduce nesting. ## *NatsEE.ExportEvent - Use defer to release resources and RUnlock. ## *NatsEE.Close - Use defer to Unlock. - Update function to return early in case of nil nats.Conn value to reduce nesting. ## ees.GetNatsOpts - Chose switch over if else when parsing client certificate and keys opts. - Updated function to return the errors directly instead of assigning them to a separate variable right before returning. ## ers.GetNatsOpts - Chose switch over if else when parsing client certificate and keys opts. - Updated function to return the errors directly instead of assigning them to a separate variable right before returning. Removed tab from commented natsJetStreamMaxWaitProcessed option value in config_defaults.go under ers section. Added integration test for ERs NATS. Updated ees/ers implementation to use the jetstream package which separates the jetstream context from Core NATS. Removed the jsOpts fields from the NatsEE struct. We are now using the jetStreamMaxWait option directly through a timeout context. Added streamName option for NATS reader since it is now required to be specified when creating a consumer (it is not inferred based on subject anymore). Updated nats ers integration tests. Updated tests to also use the new jetstream package. Updated tests to start the nats-server using their official driver instead of using the std go exec package. time.Sleeps are now not required anymore to wait for the server. In test configurations for nats readers, made sure that natsStreamName option is populated. It is now required for consumers to know where to subscribe.
This commit is contained in:
committed by
Dan Christian Bogos
parent
fdadd1ab81
commit
d0a435aa6d
@@ -469,6 +469,7 @@ const CGRATES_CFG_JSON = `
|
||||
// nats
|
||||
// "natsJetStream": false, // controls if the nats reader uses the JetStream
|
||||
// "natsConsumerName": "cgrates", // in case of JetStream the name of the consumer
|
||||
// "natsStreamName": "cdrs", // the name of the NATS JetStream stream from which the consumer will read messages
|
||||
"natsSubject": "cgrates_cdrs", // the subject from were the events are read
|
||||
// "natsQueueID": "", // the queue id the consumer listen to
|
||||
// "natsJWTFile": "", // the path to the JWT file( can be the chained file or the user file)
|
||||
@@ -485,7 +486,7 @@ const CGRATES_CFG_JSON = `
|
||||
// "natsCertificateAuthorityProcessed": "", // the path to a custom certificate authority file( used by tls)
|
||||
// "natsClientCertificateProcessed": "", // the path to a client certificate( used by tls)
|
||||
// "natsClientKeyProcessed": "", // the path to a client key( used by tls)
|
||||
// "natsJetStreamMaxWaitProcessed": "5s ", // the maximum amount of time to wait for a response
|
||||
// "natsJetStreamMaxWaitProcessed": "5s", // the maximum amount of time to wait for a response
|
||||
},
|
||||
"tenant": "", // tenant used by import
|
||||
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
|
||||
|
||||
@@ -181,6 +181,7 @@ type EventReaderOpts struct {
|
||||
S3BucketIDProcessed *string
|
||||
NATSJetStream *bool
|
||||
NATSConsumerName *string
|
||||
NATSStreamName *string
|
||||
NATSSubject *string
|
||||
NATSQueueID *string
|
||||
NATSJWTFile *string
|
||||
@@ -384,6 +385,9 @@ func (erOpts *EventReaderOpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err
|
||||
if jsnCfg.NATSConsumerName != nil {
|
||||
erOpts.NATSConsumerName = jsnCfg.NATSConsumerName
|
||||
}
|
||||
if jsnCfg.NATSStreamName != nil {
|
||||
erOpts.NATSStreamName = jsnCfg.NATSStreamName
|
||||
}
|
||||
if jsnCfg.NATSSubject != nil {
|
||||
erOpts.NATSSubject = jsnCfg.NATSSubject
|
||||
}
|
||||
@@ -731,6 +735,10 @@ func (erOpts *EventReaderOpts) Clone() *EventReaderOpts {
|
||||
cln.NATSConsumerName = new(string)
|
||||
*cln.NATSConsumerName = *erOpts.NATSConsumerName
|
||||
}
|
||||
if erOpts.NATSStreamName != nil {
|
||||
cln.NATSStreamName = new(string)
|
||||
*cln.NATSStreamName = *erOpts.NATSStreamName
|
||||
}
|
||||
if erOpts.NATSSubject != nil {
|
||||
cln.NATSSubject = new(string)
|
||||
*cln.NATSSubject = *erOpts.NATSSubject
|
||||
@@ -999,6 +1007,9 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string
|
||||
if er.Opts.NATSConsumerName != nil {
|
||||
opts[utils.NatsConsumerName] = *er.Opts.NATSConsumerName
|
||||
}
|
||||
if er.Opts.NATSStreamName != nil {
|
||||
opts[utils.NatsStreamName] = *er.Opts.NATSStreamName
|
||||
}
|
||||
if er.Opts.NATSSubject != nil {
|
||||
opts[utils.NatsSubject] = *er.Opts.NATSSubject
|
||||
}
|
||||
@@ -1150,6 +1161,7 @@ type EventReaderOptsJson struct {
|
||||
S3BucketIDProcessed *string `json:"s3BucketIDProcessed"`
|
||||
NATSJetStream *bool `json:"natsJetStream"`
|
||||
NATSConsumerName *string `json:"natsConsumerName"`
|
||||
NATSStreamName *string `json:"natsStreamName"`
|
||||
NATSSubject *string `json:"natsSubject"`
|
||||
NATSQueueID *string `json:"natsQueueID"`
|
||||
NATSJWTFile *string `json:"natsJWTFile"`
|
||||
@@ -1614,6 +1626,14 @@ func diffEventReaderOptsJsonCfg(d *EventReaderOptsJson, v1, v2 *EventReaderOpts)
|
||||
} else {
|
||||
d.NATSConsumerName = nil
|
||||
}
|
||||
if v2.NATSStreamName != nil {
|
||||
if v1.NATSStreamName == nil ||
|
||||
*v1.NATSStreamName != *v2.NATSStreamName {
|
||||
d.NATSStreamName = v2.NATSStreamName
|
||||
}
|
||||
} else {
|
||||
d.NATSStreamName = nil
|
||||
}
|
||||
if v2.NATSSubject != nil {
|
||||
if v1.NATSSubject == nil ||
|
||||
*v1.NATSSubject != *v2.NATSSubject {
|
||||
|
||||
@@ -1781,6 +1781,7 @@ func TestERsLoadFromJSONCfg(t *testing.T) {
|
||||
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"),
|
||||
NATSJetStream: utils.BoolPointer(false),
|
||||
NATSConsumerName: utils.StringPointer("consumer_name"),
|
||||
NATSStreamName: utils.StringPointer("stream_name"),
|
||||
NATSSubject: utils.StringPointer("subject"),
|
||||
NATSQueueID: utils.StringPointer("queue_id"),
|
||||
NATSJWTFile: utils.StringPointer("jsw_file"),
|
||||
@@ -1849,6 +1850,7 @@ func TestERsLoadFromJSONCfg(t *testing.T) {
|
||||
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"),
|
||||
NATSJetStream: utils.BoolPointer(false),
|
||||
NATSConsumerName: utils.StringPointer("consumer_name"),
|
||||
NATSStreamName: utils.StringPointer("stream_name"),
|
||||
NATSSubject: utils.StringPointer("subject"),
|
||||
NATSQueueID: utils.StringPointer("queue_id"),
|
||||
NATSJWTFile: utils.StringPointer("jsw_file"),
|
||||
@@ -1934,6 +1936,7 @@ func TestERsLoadFromJsonCfgParseError(t *testing.T) {
|
||||
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"),
|
||||
NATSJetStream: utils.BoolPointer(false),
|
||||
NATSConsumerName: utils.StringPointer("consumer_name"),
|
||||
NATSStreamName: utils.StringPointer("stream_name"),
|
||||
NATSSubject: utils.StringPointer("subject"),
|
||||
NATSQueueID: utils.StringPointer("queue_id"),
|
||||
NATSJWTFile: utils.StringPointer("jsw_file"),
|
||||
@@ -2022,6 +2025,7 @@ func TestERsClone(t *testing.T) {
|
||||
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"),
|
||||
NATSJetStream: utils.BoolPointer(false),
|
||||
NATSConsumerName: utils.StringPointer("consumer_name"),
|
||||
NATSStreamName: utils.StringPointer("stream_name"),
|
||||
NATSSubject: utils.StringPointer("subject"),
|
||||
NATSQueueID: utils.StringPointer("queue_id"),
|
||||
NATSJWTFile: utils.StringPointer("jsw_file"),
|
||||
@@ -2090,6 +2094,7 @@ func TestERsClone(t *testing.T) {
|
||||
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"),
|
||||
NATSJetStream: utils.BoolPointer(false),
|
||||
NATSConsumerName: utils.StringPointer("consumer_name"),
|
||||
NATSStreamName: utils.StringPointer("stream_name"),
|
||||
NATSSubject: utils.StringPointer("subject"),
|
||||
NATSQueueID: utils.StringPointer("queue_id"),
|
||||
NATSJWTFile: utils.StringPointer("jsw_file"),
|
||||
@@ -2166,6 +2171,7 @@ func TestERsAsMapInterface(t *testing.T) {
|
||||
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"),
|
||||
NATSJetStream: utils.BoolPointer(false),
|
||||
NATSConsumerName: utils.StringPointer("consumer_name"),
|
||||
NATSStreamName: utils.StringPointer("stream_name"),
|
||||
NATSSubject: utils.StringPointer("subject"),
|
||||
NATSQueueID: utils.StringPointer("queue_id"),
|
||||
NATSJWTFile: utils.StringPointer("jsw_file"),
|
||||
@@ -2314,6 +2320,7 @@ func TestDiffEventReaderOptsJsonCfg(t *testing.T) {
|
||||
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed_diff"),
|
||||
NATSJetStream: utils.BoolPointer(true),
|
||||
NATSConsumerName: utils.StringPointer("consumer_name_diff"),
|
||||
NATSStreamName: utils.StringPointer("stream_name_diff"),
|
||||
NATSSubject: utils.StringPointer("subject_diff"),
|
||||
NATSQueueID: utils.StringPointer("queue_id_diff"),
|
||||
NATSJWTFile: utils.StringPointer("jsw_file_diff"),
|
||||
@@ -2382,6 +2389,7 @@ func TestDiffEventReaderOptsJsonCfg(t *testing.T) {
|
||||
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"),
|
||||
NATSJetStream: utils.BoolPointer(false),
|
||||
NATSConsumerName: utils.StringPointer("consumer_name"),
|
||||
NATSStreamName: utils.StringPointer("stream_name"),
|
||||
NATSSubject: utils.StringPointer("subject"),
|
||||
NATSQueueID: utils.StringPointer("queue_id"),
|
||||
NATSJWTFile: utils.StringPointer("jsw_file"),
|
||||
@@ -2450,6 +2458,7 @@ func TestDiffEventReaderOptsJsonCfg(t *testing.T) {
|
||||
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"),
|
||||
NATSJetStream: utils.BoolPointer(false),
|
||||
NATSConsumerName: utils.StringPointer("consumer_name"),
|
||||
NATSStreamName: utils.StringPointer("stream_name"),
|
||||
NATSSubject: utils.StringPointer("subject"),
|
||||
NATSQueueID: utils.StringPointer("queue_id"),
|
||||
NATSJWTFile: utils.StringPointer("jsw_file"),
|
||||
|
||||
75
data/conf/samples/ers_nats/cgrates.json
Normal file
75
data/conf/samples/ers_nats/cgrates.json
Normal file
@@ -0,0 +1,75 @@
|
||||
{
|
||||
|
||||
"general": {
|
||||
"log_level": 7
|
||||
},
|
||||
|
||||
"data_db": {
|
||||
"db_type": "*internal"
|
||||
},
|
||||
|
||||
"stor_db": {
|
||||
"db_type": "*internal"
|
||||
},
|
||||
|
||||
"ers": {
|
||||
"enabled": true,
|
||||
"sessions_conns":[],
|
||||
"readers": [
|
||||
{
|
||||
"id": "nats_reader1",
|
||||
"type": "*natsJSONMap",
|
||||
"source_path": "nats://127.0.0.1:4222",
|
||||
"processed_path": "nats://127.0.0.1:4222",
|
||||
"opts": {
|
||||
"natsJetStream": true,
|
||||
"natsConsumerName": "cgrates",
|
||||
"natsStreamName": "stream",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
"natsQueueID": "queue",
|
||||
"natsJetStreamMaxWait": "5s",
|
||||
|
||||
"natsJetStreamProcessed": true,
|
||||
"natsSubjectProcessed": "cgrates_cdrs_processed",
|
||||
"natsJetStreamMaxWaitProcessed": "5s"
|
||||
},
|
||||
"flags": ["*dryrun"],
|
||||
"fields":[
|
||||
{"tag": "cdr_template", "type": "*template", "value": "cdr_template"}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "nats_reader2",
|
||||
"type": "*natsJSONMap",
|
||||
"source_path": "nats://127.0.0.1:4222",
|
||||
"processed_path": "nats://127.0.0.1:4222",
|
||||
"opts": {
|
||||
"natsJetStream": true,
|
||||
"natsConsumerName": "cgrates",
|
||||
"natsStreamName": "stream",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
"natsQueueID": "queue",
|
||||
"natsJetStreamMaxWait": "5s",
|
||||
|
||||
"natsJetStreamProcessed": true,
|
||||
"natsSubjectProcessed": "cgrates_cdrs_processed",
|
||||
"natsJetStreamMaxWaitProcessed": "5s"
|
||||
},
|
||||
"flags": ["*dryrun"],
|
||||
"fields":[
|
||||
{"tag": "cdr_template", "type": "*template", "value": "cdr_template"}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
|
||||
|
||||
"templates": {
|
||||
"cdr_template": [
|
||||
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.Account", "mandatory": true},
|
||||
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.Subject", "mandatory": true},
|
||||
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.Destination", "mandatory": true}
|
||||
]
|
||||
}
|
||||
|
||||
}
|
||||
117
ees/nats.go
117
ees/nats.go
@@ -22,7 +22,7 @@ import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -30,6 +30,7 @@ import (
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
// NewNatsEE creates a kafka poster
|
||||
@@ -40,7 +41,7 @@ func NewNatsEE(cfg *config.EventExporterCfg, nodeID string, connTimeout time.Dur
|
||||
subject: utils.DefaultQueueID,
|
||||
reqs: newConcReq(cfg.ConcurrentRequests),
|
||||
}
|
||||
err = natsPstr.parseOpt(cfg.Opts, nodeID, connTimeout)
|
||||
err = natsPstr.parseOpts(cfg.Opts, nodeID, connTimeout)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -49,10 +50,9 @@ type NatsEE struct {
|
||||
subject string // identifier of the CDR queue where we publish
|
||||
jetStream bool
|
||||
opts []nats.Option
|
||||
jsOpts []nats.JSOpt
|
||||
|
||||
poster *nats.Conn
|
||||
posterJS nats.JetStreamContext
|
||||
posterJS jetstream.JetStream
|
||||
|
||||
cfg *config.EventExporterCfg
|
||||
dc *utils.SafeMapStorage
|
||||
@@ -61,74 +61,79 @@ type NatsEE struct {
|
||||
bytePreparing
|
||||
}
|
||||
|
||||
func (pstr *NatsEE) parseOpt(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) (err error) {
|
||||
func (pstr *NatsEE) parseOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) error {
|
||||
if opts.NATSJetStream != nil {
|
||||
pstr.jetStream = *opts.NATSJetStream
|
||||
}
|
||||
pstr.subject = utils.DefaultQueueID
|
||||
if opts.NATSSubject != nil {
|
||||
pstr.subject = *opts.NATSSubject
|
||||
}
|
||||
var err error
|
||||
pstr.opts, err = GetNatsOpts(opts, nodeID, connTimeout)
|
||||
if pstr.jetStream {
|
||||
if opts.NATSJetStreamMaxWait != nil {
|
||||
pstr.jsOpts = []nats.JSOpt{nats.MaxWait(*opts.NATSJetStreamMaxWait)}
|
||||
}
|
||||
}
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
func (pstr *NatsEE) Cfg() *config.EventExporterCfg { return pstr.cfg }
|
||||
|
||||
func (pstr *NatsEE) Connect() (err error) {
|
||||
func (pstr *NatsEE) Connect() error {
|
||||
pstr.Lock()
|
||||
defer pstr.Unlock()
|
||||
if pstr.poster == nil {
|
||||
if pstr.poster, err = nats.Connect(pstr.Cfg().ExportPath, pstr.opts...); err != nil {
|
||||
return
|
||||
}
|
||||
if pstr.jetStream {
|
||||
pstr.posterJS, err = pstr.poster.JetStream(pstr.jsOpts...)
|
||||
}
|
||||
if pstr.poster != nil {
|
||||
return nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (pstr *NatsEE) ExportEvent(ctx *context.Context, content, _ any) (err error) {
|
||||
pstr.reqs.get()
|
||||
pstr.RLock()
|
||||
if pstr.poster == nil {
|
||||
pstr.RUnlock()
|
||||
pstr.reqs.done()
|
||||
return utils.ErrDisconnected
|
||||
var err error
|
||||
pstr.poster, err = nats.Connect(pstr.Cfg().ExportPath, pstr.opts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if pstr.jetStream {
|
||||
_, err = pstr.posterJS.Publish(pstr.subject, content.([]byte), nats.Context(ctx))
|
||||
pstr.posterJS, err = jetstream.New(pstr.poster)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (pstr *NatsEE) ExportEvent(ctx *context.Context, content, _ any) error {
|
||||
pstr.reqs.get()
|
||||
defer pstr.reqs.done()
|
||||
pstr.RLock()
|
||||
defer pstr.RUnlock()
|
||||
if pstr.poster == nil {
|
||||
return utils.ErrDisconnected
|
||||
}
|
||||
var err error
|
||||
if pstr.jetStream {
|
||||
ctx := context.TODO()
|
||||
if pstr.cfg.Opts.NATSJetStreamMaxWait != nil {
|
||||
ctx, _ = context.WithTimeout(ctx, *pstr.cfg.Opts.NATSJetStreamMaxWait)
|
||||
}
|
||||
_, err = pstr.posterJS.Publish(ctx, pstr.subject, content.([]byte))
|
||||
} else {
|
||||
err = pstr.poster.Publish(pstr.subject, content.([]byte))
|
||||
}
|
||||
pstr.RUnlock()
|
||||
pstr.reqs.done()
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
func (pstr *NatsEE) Close() (err error) {
|
||||
func (pstr *NatsEE) Close() error {
|
||||
pstr.Lock()
|
||||
if pstr.poster != nil {
|
||||
err = pstr.poster.Drain()
|
||||
pstr.poster = nil
|
||||
defer pstr.Unlock()
|
||||
|
||||
if pstr.poster == nil {
|
||||
return nil
|
||||
}
|
||||
pstr.Unlock()
|
||||
return
|
||||
|
||||
err := pstr.poster.Drain()
|
||||
pstr.poster = nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (pstr *NatsEE) GetMetrics() *utils.SafeMapStorage { return pstr.dc }
|
||||
|
||||
func (pstr *NatsEE) ExtraData(ev *utils.CGREvent) any { return nil }
|
||||
|
||||
func GetNatsOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) {
|
||||
nop = make([]nats.Option, 0, 7)
|
||||
nop = append(nop, nats.Name(utils.CGRateSLwr+nodeID),
|
||||
func GetNatsOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) ([]nats.Option, error) {
|
||||
natsOpts := make([]nats.Option, 0, 7)
|
||||
natsOpts = append(natsOpts, nats.Name(utils.CGRateSLwr+nodeID),
|
||||
nats.Timeout(connTimeout),
|
||||
nats.DrainTimeout(time.Second))
|
||||
if opts.NATSJWTFile != nil {
|
||||
@@ -136,33 +141,33 @@ func GetNatsOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time
|
||||
if opts.NATSSeedFile != nil {
|
||||
keys = append(keys, *opts.NATSSeedFile)
|
||||
}
|
||||
nop = append(nop, nats.UserCredentials(*opts.NATSJWTFile, keys...))
|
||||
natsOpts = append(natsOpts, nats.UserCredentials(*opts.NATSJWTFile, keys...))
|
||||
}
|
||||
if opts.NATSSeedFile != nil {
|
||||
opt, err := nats.NkeyOptionFromSeed(*opts.NATSSeedFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nop = append(nop, opt)
|
||||
natsOpts = append(natsOpts, opt)
|
||||
}
|
||||
if opts.NATSClientCertificate != nil {
|
||||
if opts.NATSClientKey == nil {
|
||||
err = fmt.Errorf("has certificate but no key")
|
||||
return
|
||||
}
|
||||
nop = append(nop, nats.ClientCert(*opts.NATSClientCertificate, *opts.NATSClientKey))
|
||||
} else if opts.NATSClientKey != nil {
|
||||
err = fmt.Errorf("has key but no certificate")
|
||||
return
|
||||
|
||||
switch {
|
||||
case opts.NATSClientCertificate != nil && opts.NATSClientKey != nil:
|
||||
natsOpts = append(natsOpts, nats.ClientCert(*opts.NATSClientCertificate, *opts.NATSClientKey))
|
||||
case opts.NATSClientCertificate != nil:
|
||||
return nil, fmt.Errorf("has certificate but no key")
|
||||
case opts.NATSClientKey != nil:
|
||||
return nil, fmt.Errorf("has key but no certificate")
|
||||
}
|
||||
|
||||
if opts.NATSCertificateAuthority != nil {
|
||||
nop = append(nop,
|
||||
natsOpts = append(natsOpts,
|
||||
func(o *nats.Options) error {
|
||||
pool, err := x509.SystemCertPool()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rootPEM, err := ioutil.ReadFile(*opts.NATSCertificateAuthority)
|
||||
rootPEM, err := os.ReadFile(*opts.NATSCertificateAuthority)
|
||||
if err != nil || rootPEM == nil {
|
||||
return fmt.Errorf("nats: error loading or parsing rootCA file: %v", err)
|
||||
}
|
||||
@@ -179,5 +184,5 @@ func GetNatsOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time
|
||||
return nil
|
||||
})
|
||||
}
|
||||
return
|
||||
return natsOpts, nil
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package ees
|
||||
|
||||
import (
|
||||
"os/exec"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -31,18 +31,25 @@ import (
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/nats-io/nats-server/v2/server"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
func TestNatsEE(t *testing.T) {
|
||||
testCreateDirectory(t)
|
||||
var err error
|
||||
cmd := exec.Command("nats-server", "-js") // Start the nats-server.
|
||||
if err := cmd.Start(); err != nil {
|
||||
t.Fatal(err) // Only if nats-server is not installed.
|
||||
func TestNatsEEJetStream(t *testing.T) {
|
||||
|
||||
natsServer, err := server.NewServer(&server.Options{
|
||||
Host: "127.0.0.1",
|
||||
Port: 4222,
|
||||
JetStream: true,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
defer cmd.Process.Kill()
|
||||
natsServer.Start()
|
||||
defer natsServer.Shutdown()
|
||||
|
||||
testCreateDirectory(t)
|
||||
cgrCfg, err := config.NewCGRConfigFromPath(context.Background(), path.Join(*dataDir, "conf", "samples", "ees"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -63,38 +70,40 @@ func TestNatsEE(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
nc, err := nats.Connect("nats://localhost:4222", nop...)
|
||||
nc, err := nats.Connect(nats.DefaultURL, nop...)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
js, err := nc.JetStream()
|
||||
defer nc.Drain()
|
||||
|
||||
js, err := jetstream.New(nc)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for name := range js.StreamNames() {
|
||||
if name == "test2" {
|
||||
if err = js.DeleteStream("test2"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if _, err = js.AddStream(&nats.StreamConfig{
|
||||
_, err = js.CreateStream(context.Background(), jetstream.StreamConfig{
|
||||
Name: "test2",
|
||||
Subjects: []string{"processed_cdrs"},
|
||||
}); err != nil {
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer js.DeleteStream(context.Background(), "test2")
|
||||
|
||||
ch := make(chan string, 3)
|
||||
var cons jetstream.Consumer
|
||||
cons, err = js.CreateOrUpdateConsumer(context.Background(), "test2", jetstream.ConsumerConfig{
|
||||
Durable: "test4",
|
||||
FilterSubject: "processed_cdrs",
|
||||
AckPolicy: jetstream.AckAllPolicy,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err = js.PurgeStream("test2"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ch := make(chan *nats.Msg, 3)
|
||||
_, err = js.QueueSubscribe("processed_cdrs", "test3", func(msg *nats.Msg) {
|
||||
ch <- msg
|
||||
}, nats.Durable("test4"))
|
||||
_, err = cons.Consume(func(msg jetstream.Msg) {
|
||||
ch <- string(msg.Data())
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -114,24 +123,26 @@ func TestNatsEE(t *testing.T) {
|
||||
// fmt.Println((<-ch).Data)
|
||||
select {
|
||||
case data := <-ch:
|
||||
if expected != string(data.Data) {
|
||||
t.Fatalf("Expected %v \n but received \n %v", expected, string(data.Data))
|
||||
if expected != data {
|
||||
t.Fatalf("Expected %v \n but received \n %v", expected, data)
|
||||
}
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
t.Fatal("Time limit exceeded")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNatsEE2(t *testing.T) {
|
||||
func TestNatsEE(t *testing.T) {
|
||||
testCreateDirectory(t)
|
||||
exec.Command("pkill", "nats-server")
|
||||
|
||||
cmd := exec.Command("nats-server")
|
||||
if err := cmd.Start(); err != nil {
|
||||
natsServer, err := server.NewServer(&server.Options{
|
||||
Host: "127.0.0.1",
|
||||
Port: 4222,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
defer cmd.Process.Kill()
|
||||
natsServer.Start()
|
||||
defer natsServer.Shutdown()
|
||||
|
||||
cgrCfg, err := config.NewCGRConfigFromPath(context.Background(), path.Join(*dataDir, "conf", "samples", "ees"))
|
||||
if err != nil {
|
||||
@@ -187,3 +198,31 @@ func TestNatsEE2(t *testing.T) {
|
||||
t.Fatal("Time limit exceeded")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetNatsOptsSeedFile(t *testing.T) {
|
||||
if _, err := os.Create("/tmp/nkey.txt"); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
defer os.Remove("/tmp/nkey.txt")
|
||||
nkey := "SUACSSL3UAHUDXKFSNVUZRF5UHPMWZ6BFDTJ7M6USDXIEDNPPQYYYCU3VY"
|
||||
os.WriteFile("/tmp/nkey.txt", []byte(nkey), 0777)
|
||||
|
||||
opts := &config.EventExporterOpts{
|
||||
NATSSeedFile: utils.StringPointer("/tmp/nkey.txt"),
|
||||
}
|
||||
|
||||
nodeID := "node_id1"
|
||||
connTimeout := 2 * time.Second
|
||||
|
||||
_, err := GetNatsOpts(opts, nodeID, connTimeout)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
//test error
|
||||
os.WriteFile("/tmp/nkey.txt", []byte(""), 0777)
|
||||
_, err = GetNatsOpts(opts, nodeID, connTimeout)
|
||||
if err == nil || err.Error() != "nkeys: no nkey seed found" {
|
||||
t.Errorf("expected \"%s\" but received \"%s\"", "nkeys: no nkey seed found", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -254,7 +254,7 @@ func (rdr *AMQPER) close() (err error) {
|
||||
}
|
||||
|
||||
func (rdr *AMQPER) createPoster() {
|
||||
processedOpt := getProcessOptions(rdr.Config().Opts)
|
||||
processedOpt := getProcessedOptions(rdr.Config().Opts)
|
||||
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -211,7 +211,7 @@ func (rdr *AMQPv1ER) close() (err error) {
|
||||
}
|
||||
|
||||
func (rdr *AMQPv1ER) createPoster() {
|
||||
processedOpt := getProcessOptions(rdr.Config().Opts)
|
||||
processedOpt := getProcessedOptions(rdr.Config().Opts)
|
||||
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -250,7 +250,7 @@ func (rdr *KafkaER) setOpts(opts *config.EventReaderOpts) (err error) {
|
||||
}
|
||||
|
||||
func (rdr *KafkaER) createPoster() {
|
||||
processedOpt := getProcessOptions(rdr.Config().Opts)
|
||||
processedOpt := getProcessedOptions(rdr.Config().Opts)
|
||||
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -28,8 +28,9 @@ import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// getProcessOptions assigns all non-nil fields ending in "Processed" from EventReaderOpts to their counterparts in EventExporterOpts
|
||||
func getProcessOptions(erOpts *config.EventReaderOpts) (eeOpts *config.EventExporterOpts) {
|
||||
// getProcessedOptions assigns all non-nil fields ending in "Processed" from EventReaderOpts to their counterparts in EventExporterOpts
|
||||
func getProcessedOptions(erOpts *config.EventReaderOpts) *config.EventExporterOpts {
|
||||
var eeOpts *config.EventExporterOpts
|
||||
if erOpts.AMQPExchangeProcessed != nil {
|
||||
if eeOpts == nil {
|
||||
eeOpts = new(config.EventExporterOpts)
|
||||
@@ -198,7 +199,7 @@ func getProcessOptions(erOpts *config.EventReaderOpts) (eeOpts *config.EventExpo
|
||||
}
|
||||
eeOpts.PgSSLMode = erOpts.PgSSLModeProcessed
|
||||
}
|
||||
return
|
||||
return eeOpts
|
||||
}
|
||||
|
||||
// mergePartialEvents will unite the events using the reader configuration
|
||||
|
||||
@@ -31,7 +31,7 @@ func TestGetProcessOptions(t *testing.T) {
|
||||
opts := &config.EventReaderOpts{
|
||||
AWSKeyProcessed: utils.StringPointer("testKey"),
|
||||
}
|
||||
result := getProcessOptions(opts)
|
||||
result := getProcessedOptions(opts)
|
||||
expected := &config.EventExporterOpts{
|
||||
AWSKey: utils.StringPointer("testKey"),
|
||||
}
|
||||
|
||||
214
ers/nats.go
214
ers/nats.go
@@ -23,7 +23,7 @@ import (
|
||||
"crypto/x509"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
@@ -33,12 +33,13 @@ import (
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
// NewNatsER return a new amqp event reader
|
||||
func NewNatsER(cfg *config.CGRConfig, cfgIdx int,
|
||||
rdrEvents, partialEvents chan *erEvent, rdrErr chan error,
|
||||
fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (_ EventReader, err error) {
|
||||
fltrS *engine.FilterS, rdrExit chan struct{}, connMgr *engine.ConnManager) (EventReader, error) {
|
||||
rdr := &NatsER{
|
||||
connMgr: connMgr,
|
||||
cgrCfg: cfg,
|
||||
@@ -55,11 +56,14 @@ func NewNatsER(cfg *config.CGRConfig, cfgIdx int,
|
||||
rdr.cap <- struct{}{}
|
||||
}
|
||||
}
|
||||
if err = rdr.processOpts(); err != nil {
|
||||
return
|
||||
var err error
|
||||
err = rdr.processOpts()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = rdr.createPoster(); err != nil {
|
||||
return
|
||||
err = rdr.createPoster()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rdr, nil
|
||||
}
|
||||
@@ -82,8 +86,8 @@ type NatsER struct {
|
||||
queueID string
|
||||
jetStream bool
|
||||
consumerName string
|
||||
streamName string
|
||||
opts []nats.Option
|
||||
jsOpts []nats.JSOpt
|
||||
|
||||
poster *ees.NatsEE
|
||||
}
|
||||
@@ -93,69 +97,106 @@ func (rdr *NatsER) Config() *config.EventReaderCfg {
|
||||
return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx]
|
||||
}
|
||||
|
||||
// Serve will start the gorutines needed to watch the nats subject
|
||||
func (rdr *NatsER) Serve() (err error) {
|
||||
// Connect to a server
|
||||
var nc *nats.Conn
|
||||
var js nats.JetStreamContext
|
||||
// Serve will subscribe to a NATS subject and process incoming messages until the rdrExit channel
|
||||
// will be closed.
|
||||
func (rdr *NatsER) Serve() error {
|
||||
|
||||
if nc, err = nats.Connect(rdr.Config().SourcePath, rdr.opts...); err != nil {
|
||||
return
|
||||
// Establish a connection to the nats server.
|
||||
nc, err := nats.Connect(rdr.Config().SourcePath, rdr.opts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ch := make(chan *nats.Msg)
|
||||
|
||||
// Define the message handler. Its content will get executed for every received message.
|
||||
handleMessage := func(msgData []byte) {
|
||||
|
||||
// If the rdr.cap channel buffer is empty, block until a resource is available. Otherwise
|
||||
// allocate one resource and start processing the message.
|
||||
if rdr.Config().ConcurrentReqs != -1 {
|
||||
<-rdr.cap
|
||||
}
|
||||
go func() {
|
||||
handlerErr := rdr.processMessage(msgData)
|
||||
if handlerErr != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> processing message %s error: %s",
|
||||
utils.ERs, string(msgData), handlerErr.Error()))
|
||||
}
|
||||
|
||||
// Export the received message if a poster has been defined.
|
||||
if rdr.poster != nil {
|
||||
handlerErr = ees.ExportWithAttempts(context.TODO(), rdr.poster, msgData,
|
||||
utils.EmptyString, rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant)
|
||||
if handlerErr != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> writing message %s error: %s",
|
||||
utils.ERs, string(msgData), handlerErr.Error()))
|
||||
}
|
||||
}
|
||||
|
||||
// Release the resource back to rdr.cap channel.
|
||||
if rdr.Config().ConcurrentReqs != -1 {
|
||||
rdr.cap <- struct{}{}
|
||||
}
|
||||
|
||||
}()
|
||||
}
|
||||
|
||||
// Subscribe to the appropriate NATS subject.
|
||||
if !rdr.jetStream {
|
||||
if _, err = nc.ChanQueueSubscribe(rdr.subject, rdr.queueID, ch); err != nil {
|
||||
return
|
||||
_, err = nc.QueueSubscribe(rdr.subject, rdr.queueID, func(msg *nats.Msg) {
|
||||
handleMessage(msg.Data)
|
||||
})
|
||||
if err != nil {
|
||||
nc.Drain()
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
js, err = nc.JetStream(rdr.jsOpts...)
|
||||
var js jetstream.JetStream
|
||||
js, err = jetstream.New(nc)
|
||||
if err != nil {
|
||||
return
|
||||
nc.Drain()
|
||||
return err
|
||||
}
|
||||
if _, err = js.QueueSubscribe(rdr.subject, rdr.queueID, func(msg *nats.Msg) {
|
||||
ch <- msg
|
||||
}, nats.Durable(rdr.consumerName)); err != nil {
|
||||
return
|
||||
ctx := context.TODO()
|
||||
if jsMaxWait := rdr.Config().Opts.NATSJetStreamMaxWait; jsMaxWait != nil {
|
||||
ctx, _ = context.WithTimeout(ctx, *jsMaxWait)
|
||||
}
|
||||
|
||||
var cons jetstream.Consumer
|
||||
cons, err = js.CreateOrUpdateConsumer(ctx, rdr.streamName, jetstream.ConsumerConfig{
|
||||
FilterSubject: rdr.subject,
|
||||
Durable: rdr.consumerName,
|
||||
AckPolicy: jetstream.AckAllPolicy,
|
||||
})
|
||||
if err != nil {
|
||||
nc.Drain()
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = cons.Consume(func(msg jetstream.Msg) {
|
||||
handleMessage(msg.Data())
|
||||
})
|
||||
if err != nil {
|
||||
nc.Drain()
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
if rdr.Config().ConcurrentReqs != -1 {
|
||||
<-rdr.cap // do not try to read if the limit is reached
|
||||
}
|
||||
select {
|
||||
case <-rdr.rdrExit:
|
||||
utils.Logger.Info(
|
||||
fmt.Sprintf("<%s> stop monitoring nats path <%s>",
|
||||
utils.ERs, rdr.Config().SourcePath))
|
||||
nc.Drain()
|
||||
if rdr.poster != nil {
|
||||
rdr.poster.Close()
|
||||
}
|
||||
return
|
||||
case msg := <-ch:
|
||||
go func(msg *nats.Msg) {
|
||||
if err := rdr.processMessage(msg.Data); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> processing message %s error: %s",
|
||||
utils.ERs, string(msg.Data), err.Error()))
|
||||
}
|
||||
if rdr.poster != nil { // post it
|
||||
if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Data, utils.EmptyString,
|
||||
rdr.connMgr, rdr.cgrCfg.GeneralCfg().DefaultTenant); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> writing message %s error: %s",
|
||||
utils.ERs, string(msg.Data), err.Error()))
|
||||
}
|
||||
}
|
||||
if rdr.Config().ConcurrentReqs != -1 {
|
||||
rdr.cap <- struct{}{}
|
||||
}
|
||||
}(msg)
|
||||
}
|
||||
|
||||
// Wait for exit signal.
|
||||
<-rdr.rdrExit
|
||||
utils.Logger.Info(
|
||||
fmt.Sprintf("<%s> stop monitoring nats path <%s>",
|
||||
utils.ERs, rdr.Config().SourcePath))
|
||||
nc.Drain()
|
||||
if rdr.poster != nil {
|
||||
rdr.poster.Close()
|
||||
}
|
||||
}()
|
||||
return
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rdr *NatsER) processMessage(msg []byte) (err error) {
|
||||
@@ -191,7 +232,7 @@ func (rdr *NatsER) processMessage(msg []byte) (err error) {
|
||||
}
|
||||
|
||||
func (rdr *NatsER) createPoster() (err error) {
|
||||
processedOpt := getProcessOptions(rdr.Config().Opts)
|
||||
processedOpt := getProcessedOptions(rdr.Config().Opts)
|
||||
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
|
||||
return
|
||||
}
|
||||
@@ -202,37 +243,34 @@ func (rdr *NatsER) createPoster() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (rdr *NatsER) processOpts() (err error) {
|
||||
func (rdr *NatsER) processOpts() error {
|
||||
if rdr.Config().Opts.NATSSubject != nil {
|
||||
rdr.subject = *rdr.Config().Opts.NATSSubject
|
||||
}
|
||||
var queueID string
|
||||
rdr.queueID = rdr.cgrCfg.GeneralCfg().NodeID
|
||||
if rdr.Config().Opts.NATSQueueID != nil {
|
||||
queueID = *rdr.Config().Opts.NATSQueueID
|
||||
rdr.queueID = *rdr.Config().Opts.NATSQueueID
|
||||
}
|
||||
rdr.queueID = utils.FirstNonEmpty(queueID, rdr.cgrCfg.GeneralCfg().NodeID)
|
||||
var consumerName string
|
||||
rdr.consumerName = utils.CGRateSLwr
|
||||
if rdr.Config().Opts.NATSConsumerName != nil {
|
||||
consumerName = *rdr.Config().Opts.NATSConsumerName
|
||||
rdr.consumerName = *rdr.Config().Opts.NATSConsumerName
|
||||
}
|
||||
if rdr.Config().Opts.NATSStreamName != nil {
|
||||
rdr.streamName = *rdr.Config().Opts.NATSStreamName
|
||||
}
|
||||
rdr.consumerName = utils.FirstNonEmpty(consumerName, utils.CGRateSLwr)
|
||||
if rdr.Config().Opts.NATSJetStream != nil {
|
||||
rdr.jetStream = *rdr.Config().Opts.NATSJetStream
|
||||
}
|
||||
if rdr.jetStream {
|
||||
if rdr.Config().Opts.NATSJetStreamMaxWait != nil {
|
||||
rdr.jsOpts = []nats.JSOpt{nats.MaxWait(*rdr.Config().Opts.NATSJetStreamMaxWait)}
|
||||
}
|
||||
}
|
||||
var err error
|
||||
rdr.opts, err = GetNatsOpts(rdr.Config().Opts,
|
||||
rdr.cgrCfg.GeneralCfg().NodeID,
|
||||
rdr.cgrCfg.GeneralCfg().ConnectTimeout)
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
func GetNatsOpts(opts *config.EventReaderOpts, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) {
|
||||
nop = make([]nats.Option, 0, 7)
|
||||
nop = append(nop, nats.Name(utils.CGRateSLwr+nodeID),
|
||||
natsOpts := make([]nats.Option, 0, 7)
|
||||
natsOpts = append(natsOpts, nats.Name(utils.CGRateSLwr+nodeID),
|
||||
nats.Timeout(connTimeout),
|
||||
nats.DrainTimeout(time.Second))
|
||||
if opts.NATSJWTFile != nil {
|
||||
@@ -240,33 +278,33 @@ func GetNatsOpts(opts *config.EventReaderOpts, nodeID string, connTimeout time.D
|
||||
if opts.NATSSeedFile != nil {
|
||||
keys = append(keys, *opts.NATSSeedFile)
|
||||
}
|
||||
nop = append(nop, nats.UserCredentials(*opts.NATSJWTFile, keys...))
|
||||
natsOpts = append(natsOpts, nats.UserCredentials(*opts.NATSJWTFile, keys...))
|
||||
}
|
||||
if opts.NATSSeedFile != nil {
|
||||
opt, err := nats.NkeyOptionFromSeed(*opts.NATSSeedFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nop = append(nop, opt)
|
||||
natsOpts = append(natsOpts, opt)
|
||||
}
|
||||
if opts.NATSClientCertificate != nil {
|
||||
if opts.NATSClientKey == nil {
|
||||
err = fmt.Errorf("has certificate but no key")
|
||||
return
|
||||
}
|
||||
nop = append(nop, nats.ClientCert(*opts.NATSClientCertificate, *opts.NATSClientKey))
|
||||
} else if opts.NATSClientKey != nil {
|
||||
err = fmt.Errorf("has key but no certificate")
|
||||
return
|
||||
|
||||
switch {
|
||||
case opts.NATSClientCertificate != nil && opts.NATSClientKey != nil:
|
||||
natsOpts = append(natsOpts, nats.ClientCert(*opts.NATSClientCertificate, *opts.NATSClientKey))
|
||||
case opts.NATSClientCertificate != nil:
|
||||
return nil, fmt.Errorf("has certificate but no key")
|
||||
case opts.NATSClientKey != nil:
|
||||
return nil, fmt.Errorf("has key but no certificate")
|
||||
}
|
||||
|
||||
if opts.NATSCertificateAuthority != nil {
|
||||
nop = append(nop,
|
||||
natsOpts = append(natsOpts,
|
||||
func(o *nats.Options) error {
|
||||
pool, err := x509.SystemCertPool()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rootPEM, err := ioutil.ReadFile(*opts.NATSCertificateAuthority)
|
||||
rootPEM, err := os.ReadFile(*opts.NATSCertificateAuthority)
|
||||
if err != nil || rootPEM == nil {
|
||||
return fmt.Errorf("nats: error loading or parsing rootCA file: %v", err)
|
||||
}
|
||||
@@ -283,5 +321,5 @@ func GetNatsOpts(opts *config.EventReaderOpts, nodeID string, connTimeout time.D
|
||||
return nil
|
||||
})
|
||||
}
|
||||
return
|
||||
return natsOpts, nil
|
||||
}
|
||||
|
||||
@@ -22,22 +22,109 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package ers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/nats-io/nats-server/v2/server"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
func testCheckNatsData(t *testing.T, randomOriginID, expData string, ch chan *nats.Msg) {
|
||||
func TestNatsERIT(t *testing.T) {
|
||||
cfgPath := path.Join(*dataDir, "conf", "samples", "ers_nats")
|
||||
cfg, err := config.NewCGRConfigFromPath(context.Background(), cfgPath)
|
||||
if err != nil {
|
||||
t.Fatal("could not init cfg", err.Error())
|
||||
}
|
||||
|
||||
natsServer, err := server.NewServer(&server.Options{
|
||||
Host: "127.0.0.1",
|
||||
Port: 4222,
|
||||
JetStream: true,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
natsServer.Start()
|
||||
defer natsServer.Shutdown()
|
||||
|
||||
// Establish a connection to nats.
|
||||
nc, err := nats.Connect(cfg.ERsCfg().Readers[1].SourcePath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
// Initialize a stream manager and create a stream.
|
||||
js, err := jetstream.New(nc)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
js.CreateStream(context.Background(), jetstream.StreamConfig{
|
||||
Name: "stream",
|
||||
Subjects: []string{"cgrates_cdrs", "cgrates_cdrs_processed"},
|
||||
})
|
||||
|
||||
// Start the engine.
|
||||
if _, err := engine.StopStartEngine(cfgPath, 100); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer engine.KillEngine(100)
|
||||
|
||||
// Publish CDRs asynchronously to the nats subject.
|
||||
cdr := make(map[string]any)
|
||||
for i := 0; i < 10; i++ {
|
||||
cdr[utils.AccountField] = 1001 + i
|
||||
cdr[utils.Subject] = 1001 + i
|
||||
cdr[utils.Destination] = 2001 + i
|
||||
b, _ := json.Marshal(cdr)
|
||||
js.PublishAsync("cgrates_cdrs", b)
|
||||
}
|
||||
select {
|
||||
case <-js.PublishAsyncComplete():
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("Did not resolve in time")
|
||||
}
|
||||
|
||||
// Define a consumer for the subject where all the processed cdrs were published.
|
||||
var cons jetstream.Consumer
|
||||
cons, err = js.CreateOrUpdateConsumer(context.Background(), "stream", jetstream.ConsumerConfig{
|
||||
FilterSubject: "cgrates_cdrs_processed",
|
||||
Durable: "cgrates_processed",
|
||||
AckPolicy: jetstream.AckAllPolicy,
|
||||
})
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
// Wait for the messages to be consumed and processed.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Retrieve info about the consumer.
|
||||
info, err := cons.Info(context.Background())
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if info.NumPending != 10 {
|
||||
t.Errorf("expected %d pending messages, received %d", 10, info.NumPending)
|
||||
}
|
||||
|
||||
js.DeleteStream(context.Background(), "stream")
|
||||
|
||||
}
|
||||
|
||||
func testCheckNatsData(t *testing.T, randomOriginID, expData string, ch chan string) {
|
||||
select {
|
||||
case err := <-rdrErr:
|
||||
t.Fatal(err)
|
||||
@@ -58,8 +145,8 @@ func testCheckNatsData(t *testing.T, randomOriginID, expData string, ch chan *na
|
||||
}
|
||||
select {
|
||||
case msg := <-ch:
|
||||
if expData != string(msg.Data) {
|
||||
t.Errorf("Expected %q ,received %q", expData, string(msg.Data))
|
||||
if expData != msg {
|
||||
t.Errorf("Expected %q ,received %q", expData, msg)
|
||||
}
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("Timeout2")
|
||||
@@ -89,49 +176,44 @@ func testCheckNatsJetStream(t *testing.T, cfg *config.CGRConfig) {
|
||||
}
|
||||
defer nc.Drain()
|
||||
|
||||
js, err := nc.JetStream()
|
||||
js, err := jetstream.New(nc)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for name := range js.StreamNames() {
|
||||
if name == "test" {
|
||||
if err = js.DeleteStream("test"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
break
|
||||
}
|
||||
if name == "test2" {
|
||||
if err = js.DeleteStream("test2"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
if _, err = js.AddStream(&nats.StreamConfig{
|
||||
|
||||
_, err = js.CreateStream(context.Background(), jetstream.StreamConfig{
|
||||
Name: "test",
|
||||
Subjects: []string{utils.DefaultQueueID},
|
||||
}); err != nil {
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer js.DeleteStream(context.Background(), "test")
|
||||
|
||||
if err = js.PurgeStream("test"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if _, err = js.AddStream(&nats.StreamConfig{
|
||||
_, err = js.CreateStream(context.Background(), jetstream.StreamConfig{
|
||||
Name: "test2",
|
||||
Subjects: []string{"processed_cdrs"},
|
||||
}); err != nil {
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer js.DeleteStream(context.Background(), "test2")
|
||||
|
||||
ch := make(chan string, 3)
|
||||
var cons jetstream.Consumer
|
||||
cons, err = js.CreateOrUpdateConsumer(context.Background(), "test2", jetstream.ConsumerConfig{
|
||||
FilterSubject: "processed_cdrs",
|
||||
Durable: "test4",
|
||||
AckPolicy: jetstream.AckAllPolicy,
|
||||
})
|
||||
if err != nil {
|
||||
nc.Drain()
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err = js.PurgeStream("test2"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ch := make(chan *nats.Msg, 3)
|
||||
_, err = js.QueueSubscribe("processed_cdrs", "test3", func(msg *nats.Msg) {
|
||||
ch <- msg
|
||||
}, nats.Durable("test4"))
|
||||
_, err = cons.Consume(func(msg jetstream.Msg) {
|
||||
ch <- string(msg.Data())
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -143,7 +225,7 @@ func testCheckNatsJetStream(t *testing.T, cfg *config.CGRConfig) {
|
||||
for i := 0; i < 3; i++ {
|
||||
randomOriginID := utils.UUIDSha1Prefix()
|
||||
expData := fmt.Sprintf(`{"OriginID": "%s"}`, randomOriginID)
|
||||
if _, err = js.Publish(utils.DefaultQueueID, []byte(expData)); err != nil {
|
||||
if _, err = js.Publish(context.Background(), utils.DefaultQueueID, []byte(expData)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -174,8 +256,10 @@ func testCheckNatsNormal(t *testing.T, cfg *config.CGRConfig) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ch := make(chan *nats.Msg, 3)
|
||||
_, err = nc.ChanQueueSubscribe("processed_cdrs", "test3", ch)
|
||||
ch := make(chan string, 3)
|
||||
_, err = nc.QueueSubscribe("processed_cdrs", "test3", func(msg *nats.Msg) {
|
||||
ch <- string(msg.Data)
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -200,16 +284,16 @@ func testCheckNatsNormal(t *testing.T, cfg *config.CGRConfig) {
|
||||
}
|
||||
|
||||
func TestNatsERJetStream(t *testing.T) {
|
||||
// start the nats-server
|
||||
exec.Command("pkill", "nats-server")
|
||||
|
||||
cmd := exec.Command("nats-server", "-js")
|
||||
if err := cmd.Start(); err != nil {
|
||||
t.Fatal(err) // most probably not installed
|
||||
natsServer, err := server.NewServer(&server.Options{
|
||||
Host: "127.0.0.1",
|
||||
Port: 4222,
|
||||
JetStream: true,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
defer cmd.Process.Kill()
|
||||
//
|
||||
natsServer.Start()
|
||||
defer natsServer.Shutdown()
|
||||
|
||||
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
|
||||
"ers": { // EventReaderService
|
||||
@@ -231,6 +315,7 @@ func TestNatsERJetStream(t *testing.T) {
|
||||
],
|
||||
"opts": {
|
||||
"natsJetStream": true,
|
||||
"natsStreamName": "test",
|
||||
"natsJetStreamProcessed": true,
|
||||
"natsSubjectProcessed": "processed_cdrs",
|
||||
}
|
||||
@@ -248,16 +333,15 @@ func TestNatsERJetStream(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNatsER(t *testing.T) {
|
||||
// start the nats-server
|
||||
exec.Command("pkill", "nats-server")
|
||||
|
||||
cmd := exec.Command("nats-server")
|
||||
if err := cmd.Start(); err != nil {
|
||||
t.Fatal(err) // most probably not installed
|
||||
natsServer, err := server.NewServer(&server.Options{
|
||||
Host: "127.0.0.1",
|
||||
Port: 4222,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
defer cmd.Process.Kill()
|
||||
//
|
||||
natsServer.Start()
|
||||
defer natsServer.Shutdown()
|
||||
|
||||
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
|
||||
"ers": { // EventReaderService
|
||||
@@ -294,16 +378,18 @@ func TestNatsER(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNatsERJetStreamUser(t *testing.T) {
|
||||
// start the nats-server
|
||||
exec.Command("pkill", "nats-server")
|
||||
|
||||
cmd := exec.Command("nats-server", "-js", "--user", "user", "--pass", "password")
|
||||
if err := cmd.Start(); err != nil {
|
||||
t.Fatal(err) // most probably not installed
|
||||
natsServer, err := server.NewServer(&server.Options{
|
||||
Host: "127.0.0.1",
|
||||
Port: 4222,
|
||||
JetStream: true,
|
||||
Username: "user",
|
||||
Password: "password",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
defer cmd.Process.Kill()
|
||||
//
|
||||
natsServer.Start()
|
||||
defer natsServer.Shutdown()
|
||||
|
||||
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
|
||||
"ers": { // EventReaderService
|
||||
@@ -325,6 +411,7 @@ func TestNatsERJetStreamUser(t *testing.T) {
|
||||
],
|
||||
"opts": {
|
||||
"natsJetStream": true,
|
||||
"natsStreamName": "test",
|
||||
"natsJetStreamProcessed": true,
|
||||
"natsSubjectProcessed": "processed_cdrs",
|
||||
}
|
||||
@@ -342,16 +429,17 @@ func TestNatsERJetStreamUser(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNatsERUser(t *testing.T) {
|
||||
// start the nats-server
|
||||
exec.Command("pkill", "nats-server")
|
||||
|
||||
cmd := exec.Command("nats-server", "--user", "user", "--pass", "password")
|
||||
if err := cmd.Start(); err != nil {
|
||||
t.Fatal(err) // most probably not installed
|
||||
natsServer, err := server.NewServer(&server.Options{
|
||||
Host: "127.0.0.1",
|
||||
Port: 4222,
|
||||
Username: "user",
|
||||
Password: "password",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
defer cmd.Process.Kill()
|
||||
//
|
||||
natsServer.Start()
|
||||
defer natsServer.Shutdown()
|
||||
|
||||
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
|
||||
"ers": { // EventReaderService
|
||||
@@ -388,16 +476,17 @@ func TestNatsERUser(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNatsERJetStreamToken(t *testing.T) {
|
||||
// start the nats-server
|
||||
exec.Command("pkill", "nats-server")
|
||||
|
||||
cmd := exec.Command("nats-server", "-js", "--auth", "token")
|
||||
if err := cmd.Start(); err != nil {
|
||||
t.Fatal(err) // most probably not installed
|
||||
natsServer, err := server.NewServer(&server.Options{
|
||||
Host: "127.0.0.1",
|
||||
Port: 4222,
|
||||
JetStream: true,
|
||||
Authorization: "token",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
defer cmd.Process.Kill()
|
||||
//
|
||||
natsServer.Start()
|
||||
defer natsServer.Shutdown()
|
||||
|
||||
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
|
||||
"ers": { // EventReaderService
|
||||
@@ -419,6 +508,7 @@ func TestNatsERJetStreamToken(t *testing.T) {
|
||||
],
|
||||
"opts": {
|
||||
"natsJetStream": true,
|
||||
"natsStreamName": "test",
|
||||
"natsJetStreamProcessed": true,
|
||||
"natsSubjectProcessed": "processed_cdrs",
|
||||
}
|
||||
@@ -436,16 +526,17 @@ func TestNatsERJetStreamToken(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNatsERToken(t *testing.T) {
|
||||
// start the nats-server
|
||||
exec.Command("pkill", "nats-server")
|
||||
|
||||
cmd := exec.Command("nats-server", "--auth", "token")
|
||||
if err := cmd.Start(); err != nil {
|
||||
t.Fatal(err) // most probably not installed
|
||||
natsServer, err := server.NewServer(&server.Options{
|
||||
Host: "127.0.0.1",
|
||||
Port: 4222,
|
||||
JetStream: true,
|
||||
Authorization: "token",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
defer cmd.Process.Kill()
|
||||
//
|
||||
natsServer.Start()
|
||||
defer natsServer.Shutdown()
|
||||
|
||||
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
|
||||
"ers": { // EventReaderService
|
||||
@@ -493,25 +584,21 @@ func TestNatsERNkey(t *testing.T) {
|
||||
if err := os.WriteFile(seedFilePath, []byte("SUAOUIE5CU47NCO22GHFEZXGCRCJDVTHDLMIP4L7UQNCR5SW4FZICI7O3Q"), 0664); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
natsCfgPath := path.Join(basePath, "nats.cfg")
|
||||
if err := os.WriteFile(natsCfgPath, []byte(`authorization: {
|
||||
users: [
|
||||
{ nkey: UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY }
|
||||
]
|
||||
}
|
||||
`), 0664); err != nil {
|
||||
|
||||
natsServer, err := server.NewServer(&server.Options{
|
||||
Host: "127.0.0.1",
|
||||
Port: 4222,
|
||||
Nkeys: []*server.NkeyUser{
|
||||
{
|
||||
Nkey: "UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY",
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// start the nats-server
|
||||
exec.Command("pkill", "nats-server")
|
||||
|
||||
cmd := exec.Command("nats-server", "-c", natsCfgPath)
|
||||
if err := cmd.Start(); err != nil {
|
||||
t.Fatal(err) // most probably not installed
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
defer cmd.Process.Kill()
|
||||
//
|
||||
natsServer.Start()
|
||||
defer natsServer.Shutdown()
|
||||
|
||||
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{
|
||||
"ers": { // EventReaderService
|
||||
@@ -561,25 +648,22 @@ func TestNatsERJetStreamNKey(t *testing.T) {
|
||||
if err := os.WriteFile(seedFilePath, []byte("SUAOUIE5CU47NCO22GHFEZXGCRCJDVTHDLMIP4L7UQNCR5SW4FZICI7O3Q"), 0664); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
natsCfgPath := path.Join(basePath, "nats.cfg")
|
||||
if err := os.WriteFile(natsCfgPath, []byte(`authorization: {
|
||||
users: [
|
||||
{ nkey: UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY }
|
||||
]
|
||||
}
|
||||
`), 0664); err != nil {
|
||||
|
||||
natsServer, err := server.NewServer(&server.Options{
|
||||
Host: "127.0.0.1",
|
||||
Port: 4222,
|
||||
JetStream: true,
|
||||
Nkeys: []*server.NkeyUser{
|
||||
{
|
||||
Nkey: "UBSNABLSM4Y2KY4ZFWPDOB4NVNYCGVD5YB7ROC4EGSDR7Z7V57PXAIQY",
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// start the nats-server
|
||||
exec.Command("pkill", "nats-server")
|
||||
|
||||
cmd := exec.Command("nats-server", "-c", natsCfgPath, "-js")
|
||||
if err := cmd.Start(); err != nil {
|
||||
t.Fatal(err) // most probably not installed
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
defer cmd.Process.Kill()
|
||||
//
|
||||
natsServer.Start()
|
||||
defer natsServer.Shutdown()
|
||||
|
||||
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{
|
||||
"ers": { // EventReaderService
|
||||
@@ -601,6 +685,7 @@ users: [
|
||||
],
|
||||
"opts": {
|
||||
"natsJetStream": true,
|
||||
"natsStreamName": "test",
|
||||
"natsSeedFile": %q,
|
||||
"natsJetStreamProcessed": true,
|
||||
"natsSubjectProcessed": "processed_cdrs",
|
||||
@@ -657,16 +742,16 @@ resolver_preload: {
|
||||
`), 0664); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// start the nats-server
|
||||
exec.Command("pkill", "nats-server")
|
||||
|
||||
cmd := exec.Command("nats-server", "-c", natsCfgPath)
|
||||
if err := cmd.Start(); err != nil {
|
||||
t.Fatal(err) // most probably not installed
|
||||
natsServer, err := server.NewServer(&server.Options{
|
||||
Host: "127.0.0.1",
|
||||
Port: 4222,
|
||||
ConfigFile: natsCfgPath,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
defer cmd.Process.Kill()
|
||||
//
|
||||
natsServer.Start()
|
||||
defer natsServer.Shutdown()
|
||||
|
||||
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{
|
||||
"ers": { // EventReaderService
|
||||
@@ -746,16 +831,18 @@ system_account:AAFIBB6C56ROU5XRVJLJYR3BTGGYK3HJGHEHQV7L7QZMTT3ZRBLHBS7F
|
||||
`), 0664); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// start the nats-server
|
||||
exec.Command("pkill", "nats-server")
|
||||
|
||||
cmd := exec.Command("nats-server", "-c", natsCfgPath, "-js")
|
||||
if err := cmd.Start(); err != nil {
|
||||
t.Fatal(err) // most probably not installed
|
||||
natsServer, err := server.NewServer(&server.Options{
|
||||
Host: "127.0.0.1",
|
||||
Port: 4222,
|
||||
ConfigFile: natsCfgPath,
|
||||
JetStream: true,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
defer cmd.Process.Kill()
|
||||
//
|
||||
natsServer.Start()
|
||||
defer natsServer.Shutdown()
|
||||
|
||||
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(fmt.Sprintf(`{
|
||||
"ers": { // EventReaderService
|
||||
"enabled": true, // starts the EventReader service: <true|false>
|
||||
@@ -776,6 +863,7 @@ system_account:AAFIBB6C56ROU5XRVJLJYR3BTGGYK3HJGHEHQV7L7QZMTT3ZRBLHBS7F
|
||||
],
|
||||
"opts": {
|
||||
"natsJetStream": true,
|
||||
"natsStreamName": "test",
|
||||
"natsJWTFile": %q,
|
||||
"natsJetStreamProcessed": true,
|
||||
"natsSubjectProcessed": "processed_cdrs",
|
||||
|
||||
@@ -191,7 +191,7 @@ func (rdr *S3ER) readLoop() (err error) {
|
||||
}
|
||||
|
||||
func (rdr *S3ER) createPoster() {
|
||||
processedOpt := getProcessOptions(rdr.Config().Opts)
|
||||
processedOpt := getProcessedOptions(rdr.Config().Opts)
|
||||
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -297,7 +297,7 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string, opts *config.EventReader
|
||||
}
|
||||
|
||||
// outURL
|
||||
processedOpt := getProcessOptions(opts)
|
||||
processedOpt := getProcessedOptions(opts)
|
||||
if processedOpt == nil {
|
||||
if len(outURL) == 0 {
|
||||
return
|
||||
|
||||
@@ -220,7 +220,7 @@ func (rdr *SQSER) readLoop(scv sqsClient) (err error) {
|
||||
}
|
||||
|
||||
func (rdr *SQSER) createPoster() {
|
||||
processedOpt := getProcessOptions(rdr.Config().Opts)
|
||||
processedOpt := getProcessedOptions(rdr.Config().Opts)
|
||||
if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
29
go.mod
29
go.mod
@@ -25,6 +25,7 @@ require (
|
||||
github.com/cgrates/ugocodec v0.0.0-20201023092048-df93d0123f60
|
||||
github.com/creack/pty v1.1.18
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
||||
github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6
|
||||
github.com/elastic/go-elasticsearch/v8 v8.8.0
|
||||
github.com/ericlagergren/decimal v0.0.0-20211103172832-aca2edc11f73
|
||||
github.com/fiorix/go-diameter/v4 v4.0.4
|
||||
@@ -32,15 +33,15 @@ require (
|
||||
github.com/go-sql-driver/mysql v1.6.0
|
||||
github.com/mediocregopher/radix/v3 v3.8.0
|
||||
github.com/miekg/dns v1.1.50
|
||||
github.com/nats-io/nats.go v1.16.0
|
||||
github.com/nats-io/nats.go v1.30.2
|
||||
github.com/nyaruka/phonenumbers v1.1.0
|
||||
github.com/peterh/liner v1.2.2
|
||||
github.com/rabbitmq/amqp091-go v1.5.0
|
||||
github.com/segmentio/kafka-go v0.4.32
|
||||
go.mongodb.org/mongo-driver v1.11.0
|
||||
golang.org/x/crypto v0.7.0
|
||||
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb
|
||||
golang.org/x/net v0.8.0
|
||||
golang.org/x/crypto v0.13.0
|
||||
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
|
||||
golang.org/x/net v0.15.0
|
||||
golang.org/x/oauth2 v0.0.0-20220622183110-fd043fe589d2
|
||||
google.golang.org/api v0.85.0
|
||||
gorm.io/driver/mysql v1.3.4
|
||||
@@ -80,11 +81,11 @@ require (
|
||||
github.com/jinzhu/inflection v1.0.0 // indirect
|
||||
github.com/jinzhu/now v1.1.5 // indirect
|
||||
github.com/jmespath/go-jmespath v0.4.0 // indirect
|
||||
github.com/klauspost/compress v1.15.6 // indirect
|
||||
github.com/klauspost/compress v1.17.0 // indirect
|
||||
github.com/mattn/go-runewidth v0.0.13 // indirect
|
||||
github.com/mschoch/smat v0.2.0 // indirect
|
||||
github.com/nats-io/nats-server/v2 v2.2.6 // indirect
|
||||
github.com/nats-io/nkeys v0.3.0 // indirect
|
||||
github.com/nats-io/nats-server/v2 v2.10.1
|
||||
github.com/nats-io/nkeys v0.4.5 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/prometheus/client_golang v1.12.2
|
||||
@@ -97,11 +98,11 @@ require (
|
||||
github.com/xdg/stringprep v1.0.3 // indirect
|
||||
go.etcd.io/bbolt v1.3.6 // indirect
|
||||
go.opencensus.io v0.23.0 // indirect
|
||||
golang.org/x/mod v0.8.0 // indirect
|
||||
golang.org/x/sync v0.1.0 // indirect
|
||||
golang.org/x/sys v0.6.0 // indirect
|
||||
golang.org/x/text v0.8.0 // indirect
|
||||
golang.org/x/tools v0.6.0 // indirect
|
||||
golang.org/x/mod v0.12.0 // indirect
|
||||
golang.org/x/sync v0.3.0 // indirect
|
||||
golang.org/x/sys v0.12.0 // indirect
|
||||
golang.org/x/text v0.13.0 // indirect
|
||||
golang.org/x/tools v0.13.0 // indirect
|
||||
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
|
||||
google.golang.org/appengine v1.6.7 // indirect
|
||||
google.golang.org/genproto v0.0.0-20220627200112-0a929928cb33 // indirect
|
||||
@@ -114,11 +115,12 @@ require (
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/bits-and-blooms/bitset v1.2.2 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||
github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6 // indirect
|
||||
github.com/google/uuid v1.3.0 // indirect
|
||||
github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
|
||||
github.com/minio/highwayhash v1.0.2 // indirect
|
||||
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
|
||||
github.com/nats-io/jwt/v2 v2.5.2 // indirect
|
||||
github.com/pierrec/lz4/v4 v4.1.15 // indirect
|
||||
github.com/prometheus/client_model v0.2.0 // indirect
|
||||
github.com/prometheus/common v0.35.0 // indirect
|
||||
@@ -127,4 +129,5 @@ require (
|
||||
github.com/xdg-go/scram v1.1.1 // indirect
|
||||
github.com/xdg-go/stringprep v1.0.3 // indirect
|
||||
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
|
||||
golang.org/x/time v0.3.0 // indirect
|
||||
)
|
||||
|
||||
73
go.sum
73
go.sum
@@ -185,14 +185,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
|
||||
github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6 h1:1+44gxLdKRnR/Bx/iAtr+XqNcE4e0oODa63+FABNANI=
|
||||
github.com/elastic/elastic-transport-go/v8 v8.0.0-20230329154755-1a3c63de0db6/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
|
||||
github.com/elastic/elastic-transport-go/v8 v8.3.0 h1:DJGxovyQLXGr62e9nDMPSxRyWION0Bh6d9eCFBriiHo=
|
||||
github.com/elastic/elastic-transport-go/v8 v8.3.0/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
|
||||
github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWnBO0y+TZaA=
|
||||
github.com/elastic/go-elasticsearch v0.0.0/go.mod h1:TkBSJBuTyFdBnrNqoPc54FN0vKf5c04IdM4zuStJ7xg=
|
||||
github.com/elastic/go-elasticsearch/v8 v8.8.0 h1:yNBPlXNo6wstMG7I3KiZPbLFgA82RMryYqkh1xBMV3A=
|
||||
github.com/elastic/go-elasticsearch/v8 v8.8.0/go.mod h1:NGmpvohKiRHXI0Sw4fuUGn6hYOmAXlyCphKpzVBiqDE=
|
||||
github.com/elastic/go-elasticsearch/v8 v8.8.1 h1:/OiP5Yex40q5eWpzFVQIS8jRE7SaEZrFkG9JbE6TXtY=
|
||||
github.com/elastic/go-elasticsearch/v8 v8.8.1/go.mod h1:GU1BJHO7WeamP7UhuElYwzzHtvf9SDmeVpSSy9+o6Qg=
|
||||
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
|
||||
@@ -409,11 +403,10 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV
|
||||
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
|
||||
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
|
||||
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
||||
github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
||||
github.com/klauspost/compress v1.15.6 h1:6D9PcO8QWu0JyaQ2zUMmu16T1T+zjjEpP91guRsvDfY=
|
||||
github.com/klauspost/compress v1.15.6/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
|
||||
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
|
||||
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||
github.com/kljensen/snowball v0.6.0/go.mod h1:27N7E8fVU5H68RlUmnWwZCfxgt4POBJfENGMvNRhldw=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
@@ -447,8 +440,8 @@ github.com/mediocregopher/radix/v3 v3.8.0 h1:HI8EgkaM7WzsrFpYAkOXIgUKbjNonb2Ne7K
|
||||
github.com/mediocregopher/radix/v3 v3.8.0/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
|
||||
github.com/miekg/dns v1.1.50 h1:DQUfb9uc6smULcREF09Uc+/Gd46YWqJd5DbpPE9xkcA=
|
||||
github.com/miekg/dns v1.1.50/go.mod h1:e3IlAVfNqAllflbibAZEWOXOQ+Ynzk/dDozDxY7XnME=
|
||||
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
|
||||
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
|
||||
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
|
||||
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
|
||||
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
|
||||
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
@@ -463,18 +456,14 @@ github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
|
||||
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
|
||||
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
|
||||
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
|
||||
github.com/nats-io/jwt/v2 v2.0.2 h1:ejVCLO8gu6/4bOKIHQpmB5UhhUJfAQw55yvLWpfmKjI=
|
||||
github.com/nats-io/jwt/v2 v2.0.2/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
|
||||
github.com/nats-io/nats-server/v2 v2.2.6 h1:FPK9wWx9pagxcw14s8W9rlfzfyHm61uNLnJyybZbn48=
|
||||
github.com/nats-io/nats-server/v2 v2.2.6/go.mod h1:sEnFaxqe09cDmfMgACxZbziXnhQFhwk+aKkZjBBRYrI=
|
||||
github.com/nats-io/nats.go v1.11.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nats.go v1.16.0 h1:zvLE7fGBQYW6MWaFaRdsgm9qT39PJDQoju+DS8KsO1g=
|
||||
github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
|
||||
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
|
||||
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
|
||||
github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
|
||||
github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
|
||||
github.com/nats-io/nats-server/v2 v2.10.1 h1:MIJ614dhOIdo71iSzY8ln78miXwrYvlvXHUyS+XdKZQ=
|
||||
github.com/nats-io/nats-server/v2 v2.10.1/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto=
|
||||
github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY=
|
||||
github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM=
|
||||
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
|
||||
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
github.com/nyaruka/phonenumbers v1.1.0 h1:OvNAOAl4A9a2kNpzziITbUVH4bBBeKHkHl0llPmkxaA=
|
||||
@@ -643,16 +632,14 @@ golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8U
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20200320181102-891825fb96df/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
|
||||
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
|
||||
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A=
|
||||
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
|
||||
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
|
||||
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
|
||||
@@ -663,8 +650,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
|
||||
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
|
||||
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
|
||||
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
|
||||
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb h1:PaBZQdo+iSDyHT053FjUCgZQ/9uqVwPOcl7KSWhKn6w=
|
||||
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
|
||||
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
|
||||
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
|
||||
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
|
||||
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
|
||||
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||
@@ -690,8 +677,8 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8=
|
||||
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
|
||||
golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
|
||||
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
|
||||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
@@ -743,8 +730,8 @@ golang.org/x/net v0.0.0-20220412020605-290c469a71a5/go.mod h1:CfG3xpIq0wQ8r1q4Su
|
||||
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
|
||||
golang.org/x/net v0.0.0-20220607020251-c690dde0001d/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
|
||||
golang.org/x/net v0.0.0-20220617184016-355a448f1bc9/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
|
||||
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
|
||||
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
|
||||
golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8=
|
||||
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
@@ -779,8 +766,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
|
||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
|
||||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
|
||||
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
@@ -860,8 +847,8 @@ golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBc
|
||||
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
|
||||
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
@@ -874,13 +861,13 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
|
||||
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
|
||||
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
|
||||
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
|
||||
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
|
||||
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
|
||||
@@ -939,8 +926,8 @@ golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM=
|
||||
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
|
||||
golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ=
|
||||
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
|
||||
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
|
||||
@@ -2524,6 +2524,7 @@ const (
|
||||
NatsSubject = "natsSubject"
|
||||
NatsQueueID = "natsQueueID"
|
||||
NatsConsumerName = "natsConsumerName"
|
||||
NatsStreamName = "natsStreamName"
|
||||
NatsJWTFile = "natsJWTFile"
|
||||
NatsSeedFile = "natsSeedFile"
|
||||
NatsClientCertificate = "natsClientCertificate"
|
||||
|
||||
Reference in New Issue
Block a user