diff --git a/ees/nats.go b/ees/nats.go index f2e646ad1..cf757c773 100644 --- a/ees/nats.go +++ b/ees/nats.go @@ -167,7 +167,7 @@ func GetNatsOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time ok := pool.AppendCertsFromPEM(rootPEM) if !ok { return fmt.Errorf("nats: failed to parse root certificate from %q", - opts.NATSCertificateAuthority) + *opts.NATSCertificateAuthority) } if o.TLSConfig == nil { o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12} diff --git a/ers/nats.go b/ers/nats.go index ee35cf9f0..05952412c 100644 --- a/ers/nats.go +++ b/ers/nats.go @@ -19,8 +19,12 @@ along with this program. If not, see package ers import ( + "crypto/tls" + "crypto/x509" "encoding/json" "fmt" + "io/ioutil" + "time" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/agents" @@ -223,8 +227,64 @@ func (rdr *NatsER) processOpts() (err error) { rdr.jsOpts = []nats.JSOpt{nats.MaxWait(*rdr.Config().Opts.NATSJetStreamMaxWait)} } } - rdr.opts, err = ees.GetNatsOpts(rdr.Config().Opts, + rdr.opts, err = GetNatsOpts(rdr.Config().Opts, rdr.cgrCfg.GeneralCfg().NodeID, rdr.cgrCfg.GeneralCfg().ConnectTimeout) return } + +func GetNatsOpts(opts *config.EventReaderOpts, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) { + nop = make([]nats.Option, 0, 7) + nop = append(nop, nats.Name(utils.CGRateSLwr+nodeID), + nats.Timeout(connTimeout), + nats.DrainTimeout(time.Second)) + if opts.NATSJWTFile != nil { + keys := make([]string, 0, 1) + if opts.NATSSeedFile != nil { + keys = append(keys, *opts.NATSSeedFile) + } + nop = append(nop, nats.UserCredentials(*opts.NATSJWTFile, keys...)) + } + if opts.NATSSeedFile != nil { + opt, err := nats.NkeyOptionFromSeed(*opts.NATSSeedFile) + if err != nil { + return nil, err + } + nop = append(nop, opt) + } + if opts.NATSClientCertificate != nil { + if opts.NATSClientKey == nil { + err = fmt.Errorf("has certificate but no key") + return + } + nop = append(nop, nats.ClientCert(*opts.NATSClientCertificate, *opts.NATSClientKey)) + } else if opts.NATSClientKey != nil { + err = fmt.Errorf("has key but no certificate") + return + } + if opts.NATSCertificateAuthority != nil { + nop = append(nop, + func(o *nats.Options) error { + pool, err := x509.SystemCertPool() + if err != nil { + return err + } + rootPEM, err := ioutil.ReadFile(*opts.NATSCertificateAuthority) + if err != nil || rootPEM == nil { + return fmt.Errorf("nats: error loading or parsing rootCA file: %v", err) + } + ok := pool.AppendCertsFromPEM(rootPEM) + if !ok { + return fmt.Errorf("nats: failed to parse root certificate from %q", + *opts.NATSCertificateAuthority) + } + if o.TLSConfig == nil { + o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12} + } + o.TLSConfig.RootCAs = pool + o.Secure = true + return nil + }) + } + return +} diff --git a/ers/nats_it_test.go b/ers/nats_it_test.go index 26fc5d24d..0e00c0552 100644 --- a/ers/nats_it_test.go +++ b/ers/nats_it_test.go @@ -32,7 +32,6 @@ import ( "time" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/ees" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/nats-io/nats.go" @@ -80,7 +79,7 @@ func testCheckNatsJetStream(t *testing.T, cfg *config.CGRConfig) { t.Fatal(err) } - nop, err := ees.GetNatsOpts(rdr.Config().Opts, "testExp", time.Second) + nop, err := GetNatsOpts(rdr.Config().Opts, "testExp", time.Second) if err != nil { t.Fatal(err) } @@ -167,7 +166,7 @@ func testCheckNatsNormal(t *testing.T, cfg *config.CGRConfig) { t.Fatal(err) } - nop, err := ees.GetNatsOpts(rdr.Config().Opts, "testExp", time.Second) + nop, err := GetNatsOpts(rdr.Config().Opts, "testExp", time.Second) if err != nil { t.Fatal(err) }