mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-16 05:39:54 +05:00
Added nats ees
This commit is contained in:
committed by
Dan Christian Bogos
parent
82f08b3fec
commit
81f2d722f6
@@ -322,11 +322,11 @@ var posibleLoaderTypes = utils.NewStringSet([]string{utils.MetaAttributes,
|
||||
var possibleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV,
|
||||
utils.MetaKafkajsonMap, utils.MetaFileXML, utils.MetaSQL, utils.MetaFileFWV,
|
||||
utils.MetaFileJSON, utils.MetaNone, utils.MetaAMQPjsonMap, utils.MetaS3jsonMap,
|
||||
utils.MetaSQSjsonMap, utils.MetaAMQPV1jsonMap})
|
||||
utils.MetaSQSjsonMap, utils.MetaAMQPV1jsonMap, utils.MetaNatsjsonMap})
|
||||
|
||||
var possibleExporterTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.MetaNone, utils.MetaFileFWV,
|
||||
utils.MetaHTTPPost, utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap,
|
||||
utils.MetaKafkajsonMap, utils.MetaS3jsonMap, utils.MetaElastic, utils.MetaVirt, utils.MetaSQL})
|
||||
utils.MetaKafkajsonMap, utils.MetaS3jsonMap, utils.MetaElastic, utils.MetaVirt, utils.MetaSQL, utils.MetaNatsjsonMap})
|
||||
|
||||
// LazySanityCheck used after check config sanity to display warnings related to the config
|
||||
func (cfg *CGRConfig) LazySanityCheck() {
|
||||
|
||||
@@ -377,6 +377,24 @@ const CGRATES_CFG_JSON = `
|
||||
// "s3FolderPathProcessed": "", // only for S3 event posting
|
||||
|
||||
// "s3BucketIDProcessed": "cgrates_cdrs", // the bucket id for S3 readers were the events are sent after they are processed
|
||||
|
||||
// nats
|
||||
// "natsJetStream": false, // controls if the nats reader uses the JetStream
|
||||
"natsSubject": "cgrates_cdrs", // the subject from were the events are read
|
||||
// "natsQueueID": "", // the queue id the consumer listen to
|
||||
// "natsJWTFile": "", // the path to the JWT file( can be the chained file or the user file)
|
||||
// "natsSeedFile": "", // the path to the seed files( if the JWT file is mention this is used as seedFile for the JWT user mentioned above)
|
||||
// "natsCertificateAuthority": "", // the path to a custom certificate authority file( used by tls)
|
||||
// "natsClientCertificate": "", // the path to a client certificate( used by tls)
|
||||
// "natsClientKey": "", // the path to a client key( used by tls)
|
||||
|
||||
// "natsJetStreamProcessed": false, // controls if the nats poster uses the JetStream
|
||||
// "natsSubjectProcessed": "cgrates_cdrs", // the subject were the events are posted
|
||||
// "natsJWTFileProcessed": "", // the path to the JWT file( can be the chained file or the user file)
|
||||
// "natsSeedFileProcessed": "", // the path to the seed files( if the JWT file is mention this is used as seedFile for the JWT user mentioned above)
|
||||
// "natsCertificateAuthorityProcessed": "", // the path to a custom certificate authority file( used by tls)
|
||||
// "natsClientCertificateProcessed": "", // the path to a client certificate( used by tls)
|
||||
// "natsClientKeyProcessed": "", // the path to a client key( used by tls)
|
||||
},
|
||||
"tenant": "", // tenant used by import
|
||||
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
|
||||
@@ -467,6 +485,14 @@ const CGRATES_CFG_JSON = `
|
||||
// "s3BucketID": "cgrates_cdrs", // the bucket id for S3 readers from where the events that are exported
|
||||
// "s3FolderPath": "", // S3FolderPath
|
||||
|
||||
// Nats
|
||||
// "natsJetStream": false, // controls if the nats poster uses the JetStream
|
||||
// "natsSubject": "cgrates_cdrs", // the subject were the events are exported
|
||||
// "natsJWTFile": "", // the path to the JWT file( can be the chained file or the user file)
|
||||
// "natsSeedFile": "", // the path to the seed files( if the JWT file is mention this is used as seedFile for the JWT user mentioned above)
|
||||
// "natsCertificateAuthority": "", // the path to a custom certificate authority file( used by tls)
|
||||
// "natsClientCertificate": "", // the path to a client certificate( used by tls)
|
||||
// "natsClientKey": "", // the path to a client key( used by tls)
|
||||
}, // extra options for exporter
|
||||
"tenant": "", // tenant used in filterS.Pass
|
||||
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
|
||||
|
||||
@@ -1812,6 +1812,7 @@ func TestDfEventReaderCfg(t *testing.T) {
|
||||
"csvRowLength": 0.,
|
||||
"xmlRootPath": "",
|
||||
"partialOrderField": "~*req.AnswerTime",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -114,6 +114,7 @@ func TestERSClone(t *testing.T) {
|
||||
"csvRowLength": 0.,
|
||||
"xmlRootPath": "",
|
||||
"partialOrderField": "~*req.AnswerTime",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -154,6 +155,7 @@ func TestERSClone(t *testing.T) {
|
||||
"csvRowLength": 0.,
|
||||
"xmlRootPath": "",
|
||||
"partialOrderField": "~*req.AnswerTime",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -279,6 +281,7 @@ func TestERSLoadFromjsonCfg(t *testing.T) {
|
||||
"csvRowLength": 0.,
|
||||
"xmlRootPath": "",
|
||||
"partialOrderField": "~*req.AnswerTime",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -324,6 +327,7 @@ func TestERSLoadFromjsonCfg(t *testing.T) {
|
||||
"csvRowLength": 0.,
|
||||
"xmlRootPath": "",
|
||||
"partialOrderField": "~*req.AnswerTime",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -579,6 +583,7 @@ func TestERSloadFromJsonCase3(t *testing.T) {
|
||||
"csvRowLength": 0.,
|
||||
"xmlRootPath": "",
|
||||
"partialOrderField": "~*req.AnswerTime",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -608,6 +613,7 @@ func TestERSloadFromJsonCase3(t *testing.T) {
|
||||
"csvRowLength": 0.,
|
||||
"xmlRootPath": "",
|
||||
"partialOrderField": "~*req.AnswerTime",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -711,6 +717,7 @@ func TestERSloadFromJsonCase4(t *testing.T) {
|
||||
"csvRowLength": 0.,
|
||||
"xmlRootPath": "",
|
||||
"partialOrderField": "~*req.AnswerTime",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -740,6 +747,7 @@ func TestERSloadFromJsonCase4(t *testing.T) {
|
||||
"csvRowLength": 0.,
|
||||
"xmlRootPath": "",
|
||||
"partialOrderField": "~*req.AnswerTime",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -838,6 +846,7 @@ func TestEventReaderSameID(t *testing.T) {
|
||||
"csvRowLength": 0.,
|
||||
"xmlRootPath": "",
|
||||
"partialOrderField": "~*req.AnswerTime",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -863,6 +872,7 @@ func TestEventReaderSameID(t *testing.T) {
|
||||
"csvRowLength": 0.,
|
||||
"xmlRootPath": "",
|
||||
"partialOrderField": "~*req.AnswerTime",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -969,6 +979,7 @@ func TestERsCfgAsMapInterfaceCase1(t *testing.T) {
|
||||
"csvRowLength": 0.,
|
||||
"xmlRootPath": "",
|
||||
"partialOrderField": "~*req.AnswerTime",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -1003,6 +1014,7 @@ func TestERsCfgAsMapInterfaceCase1(t *testing.T) {
|
||||
"csvRowLength": 0.,
|
||||
"xmlRootPath": "",
|
||||
"partialOrderField": "~*req.AnswerTime",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -1086,6 +1098,7 @@ func TestERSCfgAsMapInterfaceCase2(t *testing.T) {
|
||||
"csvRowLength": 0.,
|
||||
"xmlRootPath": "",
|
||||
"partialOrderField": "~*req.AnswerTime",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -1131,6 +1144,7 @@ func TestERSCfgAsMapInterfaceCase2(t *testing.T) {
|
||||
"csvRowLength": 0.,
|
||||
"xmlRootPath": "",
|
||||
"partialOrderField": "~*req.AnswerTime",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -1221,6 +1235,7 @@ func TestERsloadFromJsonCfg(t *testing.T) {
|
||||
"csvRowLength": 0.,
|
||||
"xmlRootPath": "",
|
||||
"partialOrderField": "~*req.AnswerTime",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -1252,6 +1267,7 @@ func TestERsloadFromJsonCfg(t *testing.T) {
|
||||
"csvRowLength": 0.,
|
||||
"xmlRootPath": "",
|
||||
"partialOrderField": "~*req.AnswerTime",
|
||||
"natsSubject": "cgrates_cdrs",
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
@@ -50,7 +50,9 @@ func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Filt
|
||||
return NewHTTPPostEe(cgrCfg, cfgIdx, filterS, dc)
|
||||
case utils.MetaHTTPjsonMap:
|
||||
return NewHTTPjsonMapEE(cgrCfg, cfgIdx, filterS, dc)
|
||||
case utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap, utils.MetaKafkajsonMap, utils.MetaS3jsonMap:
|
||||
case utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap,
|
||||
utils.MetaSQSjsonMap, utils.MetaKafkajsonMap,
|
||||
utils.MetaS3jsonMap, utils.MetaNatsjsonMap:
|
||||
return NewPosterJSONMapEE(cgrCfg, cfgIdx, filterS, dc)
|
||||
case utils.MetaVirt:
|
||||
return NewVirtualExporter(cgrCfg, cfgIdx, filterS, dc)
|
||||
|
||||
@@ -53,6 +53,10 @@ func NewPosterJSONMapEE(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Fi
|
||||
case utils.MetaS3jsonMap:
|
||||
pstrJSON.poster = engine.NewS3Poster(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath,
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts, cgrCfg.EEsCfg().Exporters[cfgIdx].Opts)
|
||||
case utils.MetaNatsjsonMap:
|
||||
pstrJSON.poster, err = engine.NewNatsPoster(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath,
|
||||
cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts, cgrCfg.EEsCfg().Exporters[cfgIdx].Opts,
|
||||
cgrCfg.GeneralCfg().NodeID, cgrCfg.GeneralCfg().ConnectTimeout)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -187,6 +187,12 @@ func (expEv *ExportEvents) ReplayFailedPosts(attempts int) (failedEvents *Export
|
||||
case utils.MetaS3jsonMap:
|
||||
pstr = NewS3Poster(expEv.Path, attempts, expEv.Opts)
|
||||
keyFunc = utils.UUIDSha1Prefix
|
||||
case utils.MetaNatsjsonMap:
|
||||
if pstr, err = NewNatsPoster(expEv.Path, attempts, expEv.Opts,
|
||||
config.CgrConfig().GeneralCfg().NodeID,
|
||||
config.CgrConfig().GeneralCfg().ConnectTimeout); err != nil {
|
||||
return expEv, err
|
||||
}
|
||||
}
|
||||
for _, ev := range expEv.Events {
|
||||
if err = pstr.Post(ev.([]byte), keyFunc()); err != nil {
|
||||
|
||||
184
engine/pstr_nats.go
Normal file
184
engine/pstr_nats.go
Normal file
@@ -0,0 +1,184 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package engine
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
// NewNatsPoster creates a kafka poster
|
||||
func NewNatsPoster(dialURL string, attempts int, opts map[string]interface{}, nodeID string, connTimeout time.Duration) (natsPstr *NatsPoster, err error) {
|
||||
natsPstr = &NatsPoster{
|
||||
dialURL: dialURL,
|
||||
subject: utils.DefaultQueueID,
|
||||
attempts: attempts,
|
||||
}
|
||||
err = natsPstr.parseOpt(opts, nodeID, connTimeout)
|
||||
return
|
||||
}
|
||||
|
||||
// NatsPoster is a kafka poster
|
||||
type NatsPoster struct {
|
||||
dialURL string
|
||||
subject string // identifier of the CDR queue where we publish
|
||||
attempts int
|
||||
jetStream bool
|
||||
opts []nats.Option
|
||||
sync.Mutex // protect writer
|
||||
|
||||
poster *nats.Conn
|
||||
posterJS nats.JetStreamContext
|
||||
}
|
||||
|
||||
// Post is the method being called when we need to post anything in the queue
|
||||
// the optional chn will permits channel caching
|
||||
func (pstr *NatsPoster) Post(content []byte, _ string) (err error) {
|
||||
fib := utils.Fib()
|
||||
for i := 0; i < pstr.attempts; i++ {
|
||||
if err = pstr.newPostWriter(); err == nil {
|
||||
break
|
||||
}
|
||||
if i+1 < pstr.attempts {
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<NatsPoster> connecting to nats server, err: %s", err.Error()))
|
||||
return
|
||||
}
|
||||
for i := 0; i < pstr.attempts; i++ {
|
||||
pstr.Lock()
|
||||
|
||||
if pstr.jetStream {
|
||||
_, err = pstr.posterJS.Publish(pstr.subject, content)
|
||||
} else {
|
||||
err = pstr.poster.Publish(pstr.subject, content)
|
||||
}
|
||||
pstr.Unlock()
|
||||
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
if i+1 < pstr.attempts {
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Close closes the kafka writer
|
||||
func (pstr *NatsPoster) Close() {
|
||||
pstr.Lock()
|
||||
if pstr.poster != nil {
|
||||
pstr.poster.Drain()
|
||||
}
|
||||
pstr.poster = nil
|
||||
pstr.Unlock()
|
||||
}
|
||||
|
||||
func (pstr *NatsPoster) parseOpt(opts map[string]interface{}, nodeID string, connTimeout time.Duration) (err error) {
|
||||
if useJetStreamVal, has := opts[utils.NatsJetStream]; has {
|
||||
if pstr.jetStream, err = utils.IfaceAsBool(useJetStreamVal); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
if vals, has := opts[utils.NatsSubject]; has {
|
||||
pstr.subject = utils.IfaceAsString(vals)
|
||||
}
|
||||
pstr.opts, err = GetNatsOpts(opts, nodeID, connTimeout)
|
||||
return
|
||||
}
|
||||
|
||||
func (pstr *NatsPoster) newPostWriter() (err error) {
|
||||
pstr.Lock()
|
||||
defer pstr.Unlock()
|
||||
if pstr.poster == nil {
|
||||
if pstr.poster, err = nats.Connect(pstr.dialURL, pstr.opts...); err != nil {
|
||||
return
|
||||
}
|
||||
if pstr.jetStream {
|
||||
pstr.posterJS, err = pstr.poster.JetStream()
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func GetNatsOpts(opts map[string]interface{}, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) {
|
||||
nop = make([]nats.Option, 0, 7)
|
||||
nop = append(nop, nats.Name(utils.CGRateSLwr+nodeID),
|
||||
nats.Timeout(connTimeout),
|
||||
nats.DrainTimeout(time.Second))
|
||||
if userFile, has := opts[utils.NatsJWTFile]; has {
|
||||
keys := make([]string, 0, 1)
|
||||
if keyFile, has := opts[utils.NatsSeedFile]; has {
|
||||
keys = append(keys, utils.IfaceAsString(keyFile))
|
||||
}
|
||||
nop = append(nop, nats.UserCredentials(utils.IfaceAsString(userFile), keys...))
|
||||
}
|
||||
if nkeyFile, has := opts[utils.NatsSeedFile]; has {
|
||||
opt, err := nats.NkeyOptionFromSeed(utils.IfaceAsString(nkeyFile))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nop = append(nop, opt)
|
||||
}
|
||||
if certFile, has := opts[utils.NatsClientCertificate]; has {
|
||||
clientFile, has := opts[utils.NatsClientKey]
|
||||
if !has {
|
||||
err = fmt.Errorf("has certificate but no key")
|
||||
return
|
||||
}
|
||||
nop = append(nop, nats.ClientCert(utils.IfaceAsString(certFile), utils.IfaceAsString(clientFile)))
|
||||
} else if _, has := opts[utils.NatsClientKey]; has {
|
||||
err = fmt.Errorf("has key but no certificate")
|
||||
return
|
||||
}
|
||||
|
||||
if caFile, has := opts[utils.NatsCertificateAuthority]; has {
|
||||
nop = append(nop,
|
||||
func(o *nats.Options) error {
|
||||
pool, err := x509.SystemCertPool()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rootPEM, err := ioutil.ReadFile(utils.IfaceAsString(caFile))
|
||||
if err != nil || rootPEM == nil {
|
||||
return fmt.Errorf("nats: error loading or parsing rootCA file: %v", err)
|
||||
}
|
||||
ok := pool.AppendCertsFromPEM(rootPEM)
|
||||
if !ok {
|
||||
return fmt.Errorf("nats: failed to parse root certificate from %q", caFile)
|
||||
}
|
||||
if o.TLSConfig == nil {
|
||||
o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
|
||||
}
|
||||
o.TLSConfig.RootCAs = pool
|
||||
o.Secure = true
|
||||
return nil
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
75
ers/nats.go
75
ers/nats.go
@@ -22,6 +22,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/agents"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -32,7 +33,7 @@ import (
|
||||
// NewNatsER return a new amqp event reader
|
||||
func NewNatsER(cfg *config.CGRConfig, cfgIdx int,
|
||||
rdrEvents, partialEvents chan *erEvent, rdrErr chan error,
|
||||
fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) {
|
||||
fltrS *engine.FilterS, rdrExit chan struct{}) (_ EventReader, err error) {
|
||||
rdr := &NatsER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: cfgIdx,
|
||||
@@ -48,7 +49,12 @@ func NewNatsER(cfg *config.CGRConfig, cfgIdx int,
|
||||
rdr.cap <- struct{}{}
|
||||
}
|
||||
}
|
||||
rdr.dialURL = rdr.Config().SourcePath
|
||||
if err = rdr.processOpts(); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rdr.createPoster(); err != nil {
|
||||
return
|
||||
}
|
||||
return rdr, nil
|
||||
}
|
||||
|
||||
@@ -59,22 +65,18 @@ type NatsER struct {
|
||||
cfgIdx int // index of config instance within ERsCfg.Readers
|
||||
fltrS *engine.FilterS
|
||||
|
||||
dialURL string
|
||||
queueID string
|
||||
subject string
|
||||
|
||||
jetStream bool
|
||||
|
||||
rdrEvents chan *erEvent // channel to dispatch the events created to
|
||||
partialEvents chan *erEvent // channel to dispatch the partial events created to
|
||||
rdrExit chan struct{}
|
||||
rdrErr chan error
|
||||
cap chan struct{}
|
||||
|
||||
poster *nats.Conn
|
||||
posterJS nats.JetStream
|
||||
subject string
|
||||
queueID string
|
||||
jetStream bool
|
||||
opts []nats.Option
|
||||
|
||||
posterSubject string
|
||||
poster *engine.NatsPoster
|
||||
}
|
||||
|
||||
// Config returns the curent configuration
|
||||
@@ -86,19 +88,19 @@ func (rdr *NatsER) Config() *config.EventReaderCfg {
|
||||
func (rdr *NatsER) Serve() (err error) {
|
||||
// Connect to a server
|
||||
var nc *nats.Conn
|
||||
if nc, err = nats.Connect(rdr.dialURL); err != nil {
|
||||
var js nats.JetStreamContext
|
||||
|
||||
if nc, err = nats.Connect(rdr.Config().SourcePath, rdr.opts...); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
ch := make(chan *nats.Msg)
|
||||
if !rdr.jetStream {
|
||||
if _, err = nc.ChanQueueSubscribe(rdr.subject, rdr.queueID, ch); err != nil {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
// Create JetStream Context
|
||||
var js nats.JetStreamContext
|
||||
if js, err = nc.JetStream(); err != nil {
|
||||
js, err = nc.JetStream()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if _, err = js.QueueSubscribe(rdr.subject, rdr.queueID, func(msg *nats.Msg) {
|
||||
@@ -115,7 +117,7 @@ func (rdr *NatsER) Serve() (err error) {
|
||||
case <-rdr.rdrExit:
|
||||
utils.Logger.Info(
|
||||
fmt.Sprintf("<%s> stop monitoring nats path <%s>",
|
||||
utils.ERs, rdr.dialURL))
|
||||
utils.ERs, rdr.Config().SourcePath))
|
||||
nc.Drain()
|
||||
return
|
||||
case msg := <-ch:
|
||||
@@ -131,7 +133,7 @@ func (rdr *NatsER) Serve() (err error) {
|
||||
utils.ERs, string(msg.Data), err.Error()))
|
||||
}
|
||||
if rdr.poster != nil { // post it
|
||||
if err := rdr.postMessage(msg.Data); err != nil {
|
||||
if err := rdr.poster.Post(msg.Data, utils.EmptyString); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> writing message %s error: %s",
|
||||
utils.ERs, string(msg.Data), err.Error()))
|
||||
@@ -158,7 +160,7 @@ func (rdr *NatsER) processMessage(msg []byte) (err error) {
|
||||
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
|
||||
rdr.fltrS, nil) // create an AgentRequest
|
||||
var pass bool
|
||||
if pass, err = rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
|
||||
if pass, err = rdr.fltrS.Pass(context.TODO(), agReq.Tenant, rdr.Config().Filters,
|
||||
agReq); err != nil || !pass {
|
||||
return
|
||||
}
|
||||
@@ -177,28 +179,31 @@ func (rdr *NatsER) processMessage(msg []byte) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (rdr *NatsER) postMessage(msg []byte) (err error) {
|
||||
if rdr.posterJS != nil {
|
||||
_, err = rdr.posterJS.Publish(rdr.posterSubject, msg)
|
||||
func (rdr *NatsER) createPoster() (err error) {
|
||||
processedOpt := getProcessOptions(rdr.Config().Opts)
|
||||
if len(processedOpt) == 0 &&
|
||||
len(rdr.Config().ProcessedPath) == 0 {
|
||||
return
|
||||
}
|
||||
return rdr.poster.Publish(rdr.posterSubject, msg)
|
||||
rdr.poster, err = engine.NewNatsPoster(utils.FirstNonEmpty(
|
||||
rdr.Config().ProcessedPath, rdr.Config().SourcePath),
|
||||
rdr.cgrCfg.GeneralCfg().PosterAttempts,
|
||||
processedOpt, rdr.cgrCfg.GeneralCfg().NodeID,
|
||||
rdr.cgrCfg.GeneralCfg().ConnectTimeout)
|
||||
return
|
||||
}
|
||||
|
||||
func (rdr *NatsER) createPoster(opts map[string]interface{}) (err error) {
|
||||
if rdr.poster, err = nats.Connect(utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath)); err != nil {
|
||||
return
|
||||
}
|
||||
jsOpt := rdr.jetStream
|
||||
if jsOptVal, has := opts["natsJetStreamProcessed"]; has {
|
||||
if jsOpt, err = utils.IfaceAsBool(jsOptVal); err != nil {
|
||||
func (rdr *NatsER) processOpts() (err error) {
|
||||
rdr.subject = utils.IfaceAsString(rdr.Config().Opts[utils.NatsSubject])
|
||||
rdr.queueID = utils.FirstNonEmpty(utils.IfaceAsString(rdr.Config().Opts[utils.NatsQueueID]),
|
||||
rdr.cgrCfg.GeneralCfg().NodeID)
|
||||
if useJetStreamVal, has := rdr.Config().Opts[utils.NatsJetStream]; has {
|
||||
if rdr.jetStream, err = utils.IfaceAsBool(useJetStreamVal); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if jsOpt {
|
||||
// Create JetStream Context
|
||||
rdr.posterJS, err = rdr.poster.JetStream()
|
||||
}
|
||||
rdr.opts, err = engine.GetNatsOpts(rdr.Config().Opts,
|
||||
rdr.cgrCfg.GeneralCfg().NodeID,
|
||||
rdr.cgrCfg.GeneralCfg().ConnectTimeout)
|
||||
return
|
||||
}
|
||||
|
||||
119
ers/nats_it_test.go
Normal file
119
ers/nats_it_test.go
Normal file
@@ -0,0 +1,119 @@
|
||||
// +build integration
|
||||
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package ers
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
func TestNatsER(t *testing.T) {
|
||||
cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{
|
||||
"ers": { // EventReaderService
|
||||
"enabled": true, // starts the EventReader service: <true|false>
|
||||
"sessions_conns":["*localhost"],
|
||||
"readers": [
|
||||
{
|
||||
"id": "nats",
|
||||
"type": "*nats_json_map",
|
||||
"run_delay": "-1",
|
||||
"concurrent_requests": 1024,
|
||||
"source_path": "nats://localhost:4222",
|
||||
// "processed_path": "/var/spool/cgrates/ers/out",
|
||||
"tenant": "cgrates.org",
|
||||
"filters": [],
|
||||
"flags": [],
|
||||
"fields":[
|
||||
{"tag": "CGRID", "type": "*composed", "value": "~*req.CGRID", "path": "*cgreq.CGRID"},
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
}`)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := cfg.CheckConfigSanity(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rdrEvents = make(chan *erEvent, 1)
|
||||
rdrErr = make(chan error, 1)
|
||||
rdrExit = make(chan struct{}, 1)
|
||||
|
||||
if rdr, err = NewNatsER(cfg, 1, rdrEvents, make(chan *erEvent, 1),
|
||||
rdrErr, new(engine.FilterS), rdrExit); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
nc, err := nats.Connect(rdr.Config().SourcePath, nats.Timeout(time.Second),
|
||||
nats.DrainTimeout(time.Second))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// js, err := nc.JetStream()
|
||||
// if err != nil {
|
||||
// t.Fatal(err)
|
||||
// }
|
||||
go rdr.Serve()
|
||||
runtime.Gosched()
|
||||
time.Sleep(time.Second)
|
||||
randomCGRID := utils.UUIDSha1Prefix()
|
||||
if err = nc.Publish(utils.DefaultQueueID, []byte(fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID))); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// if _, err = js.Publish(utils.DefaultQueueID, []byte(fmt.Sprintf(`{"CGRID": "%s"}`, randomCGRID))); err != nil {
|
||||
// t.Fatal(err)
|
||||
// }
|
||||
|
||||
nc.FlushTimeout(time.Second)
|
||||
nc.Flush()
|
||||
nc.Drain()
|
||||
|
||||
select {
|
||||
case err = <-rdrErr:
|
||||
t.Error(err)
|
||||
case ev := <-rdrEvents:
|
||||
if ev.rdrCfg.ID != "nats" {
|
||||
t.Errorf("Expected 'kakfa' received `%s`", ev.rdrCfg.ID)
|
||||
}
|
||||
expected := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
ID: ev.cgrEvent.ID,
|
||||
Event: map[string]interface{}{
|
||||
"CGRID": randomCGRID,
|
||||
},
|
||||
APIOpts: map[string]interface{}{},
|
||||
}
|
||||
if !reflect.DeepEqual(ev.cgrEvent, expected) {
|
||||
t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent))
|
||||
}
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("Timeout")
|
||||
}
|
||||
close(rdrExit)
|
||||
}
|
||||
@@ -59,6 +59,8 @@ func NewEventReader(cfg *config.CGRConfig, cfgIdx int,
|
||||
return NewSQSER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
|
||||
case utils.MetaAMQPV1jsonMap:
|
||||
return NewAMQPv1ER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
|
||||
case utils.MetaNatsjsonMap:
|
||||
return NewNatsER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
1
go.sum
1
go.sum
@@ -85,7 +85,6 @@ github.com/cgrates/aringo v0.0.0-20201113143849-3b299e4e636d h1:1PLz/t3XZy5KF8EY
|
||||
github.com/cgrates/aringo v0.0.0-20201113143849-3b299e4e636d/go.mod h1:mMAzSIjK11XfRMrOIa7DXYl64REdPldRCbAgzKB47XQ=
|
||||
github.com/cgrates/baningo v0.0.0-20210413080722-004ffd5e429f h1:dCp5BflGB8I8wlhWn4R5g0o4ok2pZRmcYHyzIks9Pbc=
|
||||
github.com/cgrates/baningo v0.0.0-20210413080722-004ffd5e429f/go.mod h1:3SwVROaS1Iml5lqEhj0gRhDRtmbBgypZpKcEkVTSleU=
|
||||
github.com/cgrates/birpc v1.3.1-0.20210413080448-f81834a37fd3 h1:AJrOcYMIQ/8X1i/kfqwOkQghxfJcPUloSRfK0n38JhI=
|
||||
github.com/cgrates/birpc v1.3.1-0.20210413080448-f81834a37fd3/go.mod h1:z/PmNnDPqSQALedKJv5T8+eXIq6XHa9J0St1YsvAVns=
|
||||
github.com/cgrates/birpc v1.3.1-0.20210517105830-c9cc855bcec5 h1:Pn9VGy13xCMm3zW5QaUCoIa3dsPbTIajgnCt4rcXJ2w=
|
||||
github.com/cgrates/birpc v1.3.1-0.20210517105830-c9cc855bcec5/go.mod h1:z/PmNnDPqSQALedKJv5T8+eXIq6XHa9J0St1YsvAVns=
|
||||
|
||||
@@ -40,6 +40,7 @@ var (
|
||||
MetaSQSjsonMap: ContentJSON,
|
||||
MetaKafkajsonMap: ContentJSON,
|
||||
MetaS3jsonMap: ContentJSON,
|
||||
MetaNatsjsonMap: ContentJSON,
|
||||
}
|
||||
|
||||
extraDBPartition = NewStringSet([]string{CacheDispatchers,
|
||||
@@ -365,6 +366,7 @@ const (
|
||||
MetaAMQPV1jsonMap = "*amqpv1_json_map"
|
||||
MetaSQSjsonMap = "*sqs_json_map"
|
||||
MetaKafkajsonMap = "*kafka_json_map"
|
||||
MetaNatsjsonMap = "*nats_json_map"
|
||||
MetaSQL = "*sql"
|
||||
MetaMySQL = "*mysql"
|
||||
MetaS3jsonMap = "*s3_json_map"
|
||||
@@ -2331,6 +2333,15 @@ const (
|
||||
ElsVersionLow = "elsVersion"
|
||||
ElsVersionType = "elsVersionType"
|
||||
ElsWaitForActiveShards = "elsWaitForActiveShards"
|
||||
// nats
|
||||
NatsSubject = "natsSubject"
|
||||
NatsQueueID = "natsQueueID"
|
||||
NatsJWTFile = "natsJWTFile"
|
||||
NatsSeedFile = "natsSeedFile"
|
||||
NatsClientCertificate = "natsClientCertificate"
|
||||
NatsClientKey = "natsClientKey"
|
||||
NatsCertificateAuthority = "natsCertificateAuthority"
|
||||
NatsJetStream = "natsJetStream"
|
||||
)
|
||||
|
||||
// Analyzers constants
|
||||
|
||||
Reference in New Issue
Block a user