diff --git a/config/config_defaults.go b/config/config_defaults.go index 45add5355..22b0449bc 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -414,6 +414,9 @@ const CGRATES_CFG_JSON = ` // "kafkaTopic": "cgrates", // the topic from were the events are read // "kafkaGroupID": "cgrates", // the group that reads the events // "kafkaMaxWait": "1ms", // the maximum amount of time to wait for new data to come + // "kafkaTLS": false, // if true it will try to authenticate the client + // "kafkaCAPath": "", // path to certificate authority pem + // "kafkaSkipTLSVerify": false, // if true it will skip certificate verification // SQL // "sqlDBName": "cgrates", // the name of the database from were the events are read @@ -534,6 +537,9 @@ const CGRATES_CFG_JSON = ` // Kafka // "kafkaTopic": "cgrates", // the topic from where the events are exported + // "kafkaTLS": false, // if true, it will try to authenticate the server + // "kafkaCAPath": "", // path to certificate authority pem + // "kafkaSkipTLSVerify: false, // if true it will skip certificate verification // AMQP diff --git a/config/eescfg.go b/config/eescfg.go index 360e7db7f..e9e968eb1 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -242,7 +242,10 @@ type RPCOpts struct { } type KafkaOpts struct { - KafkaTopic *string + Topic *string + TLS *bool + CAPath *string + SkipTLSVerify *bool } type EventExporterOpts struct { @@ -381,7 +384,16 @@ func (elsOpts *ElsOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err erro func (kafkaOpts *KafkaOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) (err error) { if jsnCfg.KafkaTopic != nil { - kafkaOpts.KafkaTopic = jsnCfg.KafkaTopic + kafkaOpts.Topic = jsnCfg.KafkaTopic + } + if jsnCfg.KafkaTLS != nil { + kafkaOpts.TLS = jsnCfg.KafkaTLS + } + if jsnCfg.KafkaCAPath != nil { + kafkaOpts.CAPath = jsnCfg.KafkaCAPath + } + if jsnCfg.KafkaSkipTLSVerify != nil { + kafkaOpts.SkipTLSVerify = jsnCfg.KafkaSkipTLSVerify } return } @@ -711,10 +723,21 @@ func (elsOpts *ElsOpts) Clone() *ElsOpts { func (kafkaOpts *KafkaOpts) Clone() *KafkaOpts { cln := &KafkaOpts{} - - if kafkaOpts.KafkaTopic != nil { - cln.KafkaTopic = new(string) - *cln.KafkaTopic = *kafkaOpts.KafkaTopic + if kafkaOpts.Topic != nil { + cln.Topic = new(string) + *cln.Topic = *kafkaOpts.Topic + } + if kafkaOpts.TLS != nil { + cln.TLS = new(bool) + *cln.TLS = *kafkaOpts.TLS + } + if kafkaOpts.CAPath != nil { + cln.CAPath = new(string) + *cln.CAPath = *kafkaOpts.CAPath + } + if kafkaOpts.SkipTLSVerify != nil { + cln.SkipTLSVerify = new(bool) + *cln.SkipTLSVerify = *kafkaOpts.SkipTLSVerify } return cln } @@ -1032,8 +1055,17 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str } } if kafkaOpts := eeC.Opts.Kafka; kafkaOpts != nil { - if kafkaOpts.KafkaTopic != nil { - opts[utils.KafkaTopic] = *kafkaOpts.KafkaTopic + if kafkaOpts.Topic != nil { + opts[utils.KafkaTopic] = *kafkaOpts.Topic + } + if kafkaOpts.TLS != nil { + opts[utils.KafkaTLS] = *kafkaOpts.TLS + } + if kafkaOpts.CAPath != nil { + opts[utils.KafkaCAPath] = *kafkaOpts.CAPath + } + if kafkaOpts.SkipTLSVerify != nil { + opts[utils.KafkaSkipTLSVerify] = *kafkaOpts.SkipTLSVerify } } if amOpts := eeC.Opts.AMQP; amOpts != nil { diff --git a/config/eescfg_test.go b/config/eescfg_test.go index 3b8749afa..c0ee8326e 100644 --- a/config/eescfg_test.go +++ b/config/eescfg_test.go @@ -255,7 +255,7 @@ func TestEESClone(t *testing.T) { WaitForActiveShards: utils.StringPointer("test6"), }, Kafka: &KafkaOpts{ - KafkaTopic: utils.StringPointer("kafka"), + Topic: utils.StringPointer("kafka"), }, AWS: &AWSOpts{ Token: utils.StringPointer("token"), diff --git a/config/erscfg.go b/config/erscfg.go index 2b10d8230..94d04f05c 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -197,9 +197,12 @@ func (amqpr *AMQPROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error) } type KafkaROpts struct { - Topic *string - GroupID *string - MaxWait *time.Duration + Topic *string + GroupID *string + MaxWait *time.Duration + TLS *bool + CAPath *string + SkipTLSVerify *bool } func (kafkaROpts *KafkaROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error) { @@ -216,6 +219,15 @@ func (kafkaROpts *KafkaROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err } kafkaROpts.MaxWait = utils.DurationPointer(kafkaMaxWait) } + if jsnCfg.KafkaTLS != nil { + kafkaROpts.TLS = jsnCfg.KafkaTLS + } + if jsnCfg.KafkaCAPath != nil { + kafkaROpts.CAPath = jsnCfg.KafkaCAPath + } + if jsnCfg.KafkaSkipTLSVerify != nil { + kafkaROpts.SkipTLSVerify = jsnCfg.KafkaSkipTLSVerify + } return } @@ -592,6 +604,18 @@ func (kafkaOpts *KafkaROpts) Clone() *KafkaROpts { cln.MaxWait = new(time.Duration) *cln.MaxWait = *kafkaOpts.MaxWait } + if kafkaOpts.TLS != nil { + cln.TLS = new(bool) + *cln.TLS = *kafkaOpts.TLS + } + if kafkaOpts.CAPath != nil { + cln.CAPath = new(string) + *cln.CAPath = *kafkaOpts.CAPath + } + if kafkaOpts.SkipTLSVerify != nil { + cln.SkipTLSVerify = new(bool) + *cln.SkipTLSVerify = *kafkaOpts.SkipTLSVerify + } return cln } @@ -837,6 +861,15 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string if kafkaOpts.MaxWait != nil { opts[utils.KafkaMaxWait] = kafkaOpts.MaxWait.String() } + if kafkaOpts.TLS != nil { + opts[utils.KafkaTLS] = *kafkaOpts.TLS + } + if kafkaOpts.CAPath != nil { + opts[utils.KafkaCAPath] = *kafkaOpts.CAPath + } + if kafkaOpts.SkipTLSVerify != nil { + opts[utils.KafkaSkipTLSVerify] = *kafkaOpts.SkipTLSVerify + } } if sqlOpts := er.Opts.SQL; sqlOpts != nil { diff --git a/config/libconfig_json.go b/config/libconfig_json.go index b333c98f4..71b09b149 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -229,6 +229,9 @@ type EventReaderOptsJson struct { KafkaTopic *string `json:"kafkaTopic"` KafkaGroupID *string `json:"kafkaGroupID"` KafkaMaxWait *string `json:"kafkaMaxWait"` + KafkaTLS *bool `json:"kafkaTLS"` + KafkaCAPath *string `json:"kafkaCAPath"` + KafkaSkipTLSVerify *bool `json:"kafkaSkipTLSVerify"` SQLDBName *string `json:"sqlDBName"` SQLTableName *string `json:"sqlTableName"` PgSSLMode *string `json:"pgSSLMode"` @@ -316,6 +319,9 @@ type EventExporterOptsJson struct { SQLDBName *string `json:"sqlDBName"` PgSSLMode *string `json:"pgSSLMode"` KafkaTopic *string `json:"kafkaTopic"` + KafkaTLS *bool `json:"kafkaTLS"` + KafkaCAPath *string `json:"kafkaCAPath"` + KafkaSkipTLSVerify *bool `json:"kafkaSkipTLSVerify"` AMQPQueueID *string `json:"amqpQueueID"` AMQPRoutingKey *string `json:"amqpRoutingKey"` AMQPExchange *string `json:"amqpExchange"` diff --git a/data/conf/samples/kafka_ssl/cgrates.json b/data/conf/samples/kafka_ssl/cgrates.json new file mode 100644 index 000000000..8f115570b --- /dev/null +++ b/data/conf/samples/kafka_ssl/cgrates.json @@ -0,0 +1,85 @@ +{ + +"logger": { + "type": "*syslog", + "level": 7 +}, + +"listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080" +}, + +"data_db": { + "db_type": "redis", + "db_port": 6379, + "db_name": "10" +}, + +"ees": { + "enabled": true, + "exporters": [ + { + "id": "*default", + "type": "*kafka_json_map", + "export_path": "localhost:9093", + "opts": { + "kafkaTopic": "ssl-topic", + "kafkaTLS": true, + "kafkaCAPath": "/home/kafka/kafka/ssl/ca.pem", + "kafkaSkipTLSVerify": false + }, + "failed_posts_dir": "/var/spool/cgrates/failed_posts" + }, + { + "id": "kafka_processed", + "type": "*kafka_json_map", + "export_path": "localhost:9092", + "opts": { + "kafkaTopic": "processed-topic", + "kafkaTLS": false, + "kafkaCAPath": "/home/kafka/kafka/ssl/ca.pem", + "kafkaSkipTLSVerify": false + }, + "failed_posts_dir": "/var/spool/cgrates/failed_posts" + } + ] +}, + +"ers": { + "enabled": true, + "sessions_conns":[], + "ees_conns": ["*internal"], + "readers": [ + { + "id": "*default", + "type": "*kafka_json_map", + "run_delay": "-1", + "flags": ["*dryRun"], + "source_path": "localhost:9093", + "ees_success_ids": ["kafka_processed"], + "opts": { + "kafkaTopic": "ssl-topic", + "kafkaTLS": true, + "kafkaCAPath": "/home/kafka/kafka/ssl/ca.pem", + "kafkaSkipTLSVerify": false + }, + "fields": [ + {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.ToR", "mandatory": true}, + {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.OriginID", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.RequestType", "mandatory": true}, + {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*variable", "value": "~*req.Tenant", "mandatory": true}, + {"tag": "Category", "path": "*cgreq.Category", "type": "*variable", "value": "~*req.Category", "mandatory": true}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.Account", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.Subject", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.Destination", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.SetupTime", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.Usage", "mandatory": true} + ] + } + ] +} + +} \ No newline at end of file diff --git a/ees/kafka.go b/ees/kafka.go index dd681fdf9..10c7ca14a 100644 --- a/ees/kafka.go +++ b/ees/kafka.go @@ -19,7 +19,12 @@ package ees import ( "context" + "crypto/tls" + "crypto/x509" + "net" + "os" "sync" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" @@ -34,18 +39,28 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *KafkaEE topic: utils.DefaultQueueID, reqs: newConcReq(cfg.ConcurrentRequests), } - if kafkaOpts := cfg.Opts.Kafka; kafkaOpts != nil { - if kafkaOpts.KafkaTopic != nil { - kfkPstr.topic = *cfg.Opts.Kafka.KafkaTopic - } + if cfg.Opts.Kafka.Topic != nil { + kfkPstr.topic = *cfg.Opts.Kafka.Topic + } + if cfg.Opts.Kafka.TLS != nil && *cfg.Opts.Kafka.TLS { + kfkPstr.tls = true + } + if cfg.Opts.Kafka.CAPath != nil { + kfkPstr.caPath = *cfg.Opts.Kafka.CAPath + } + if cfg.Opts.Kafka.SkipTLSVerify != nil && *cfg.Opts.Kafka.SkipTLSVerify { + kfkPstr.skipTLSVerify = true } return kfkPstr } // KafkaEE is a kafka poster type KafkaEE struct { - topic string // identifier of the CDR queue where we publish - writer *kafka.Writer + topic string // identifier of the CDR queue where we publish + tls bool // if true, it will attempt to authenticate the server + caPath string // path to CA pem file + skipTLSVerify bool // if true, it skips certificate verification + writer *kafka.Writer cfg *config.EventExporterCfg dc *utils.SafeMapStorage @@ -58,6 +73,7 @@ func (pstr *KafkaEE) Cfg() *config.EventExporterCfg { return pstr.cfg } func (pstr *KafkaEE) Connect() (_ error) { pstr.Lock() + defer pstr.Unlock() if pstr.writer == nil { pstr.writer = &kafka.Writer{ Addr: kafka.TCP(pstr.Cfg().ExportPath), @@ -65,7 +81,35 @@ func (pstr *KafkaEE) Connect() (_ error) { MaxAttempts: pstr.Cfg().Attempts, } } - pstr.Unlock() + if pstr.tls { + rootCAs, err := x509.SystemCertPool() + if err != nil { + return + } + if rootCAs == nil { + rootCAs = x509.NewCertPool() + } + if pstr.caPath != "" { + ca, err := os.ReadFile(pstr.caPath) + if err != nil { + return + } + if !rootCAs.AppendCertsFromPEM(ca) { + return + } + } + pstr.writer.Transport = &kafka.Transport{ + Dial: (&net.Dialer{ + Timeout: 3 * time.Second, + DualStack: true, + }).DialContext, + TLS: &tls.Config{ + RootCAs: rootCAs, + InsecureSkipVerify: pstr.skipTLSVerify, + }, + } + } + return } diff --git a/ees/libcdre.go b/ees/libcdre.go index 06dfcde7b..f5f0e8b85 100644 --- a/ees/libcdre.go +++ b/ees/libcdre.go @@ -79,8 +79,8 @@ func AddFailedPost(failedPostsDir, expPath, format string, ev any, opts *config. } } if kfkOpts := opts.Kafka; kfkOpts != nil { - if opts.Kafka.KafkaTopic != nil { - kafkaTopic = *opts.Kafka.KafkaTopic + if opts.Kafka.Topic != nil { + kafkaTopic = *opts.Kafka.Topic } } if qID := utils.FirstNonEmpty(amqpQueueID, s3BucketID, sqsQueueID, diff --git a/ees/poster_test.go b/ees/poster_test.go index 2282a90c4..4578d4740 100644 --- a/ees/poster_test.go +++ b/ees/poster_test.go @@ -56,7 +56,7 @@ func TestKafkaParseURL(t *testing.T) { Attempts: 10, Opts: &config.EventExporterOpts{ Kafka: &config.KafkaOpts{ - KafkaTopic: utils.StringPointer("cdr_billing"), + Topic: utils.StringPointer("cdr_billing"), }, }, } diff --git a/engine/account_test.go b/engine/account_test.go index 23cb01551..018ab0707 100644 --- a/engine/account_test.go +++ b/engine/account_test.go @@ -274,15 +274,15 @@ func TestDebitCreditBlocker(t *testing.T) { var err error cc, err = rifsBalance.debitCreditBalance(cd, false, true, true, nil) if err != nil { - t.Error("Error debiting balance: ", err) + t.Fatal("Error debiting balance: ", err) } if len(cc.Timespans) != 0 { - t.Error("Wrong call cost: ", utils.ToIJSON(cc)) + t.Error("Wrong call cost: ", utils.ToJSON(cc)) } if rifsBalance.BalanceMap[utils.MetaMonetary][0].GetValue() != 0.1152 || rifsBalance.BalanceMap[utils.MetaMonetary][1].GetValue() != 1.5 { t.Error("should not have touched the balances: ", - utils.ToIJSON(rifsBalance.BalanceMap[utils.MetaMonetary])) + utils.ToJSON(rifsBalance.BalanceMap[utils.MetaMonetary])) } } diff --git a/ers/kafka.go b/ers/kafka.go index 9ede6c64f..ad01de723 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -20,9 +20,12 @@ package ers import ( "context" + "crypto/tls" + "crypto/x509" "encoding/json" "fmt" "io" + "os" "time" "github.com/cgrates/cgrates/agents" @@ -67,10 +70,13 @@ type KafkaER struct { cfgIdx int // index of config instance within ERsCfg.Readers fltrS *engine.FilterS - dialURL string - topic string - groupID string - maxWait time.Duration + dialURL string + topic string + groupID string + maxWait time.Duration + tls bool // if true it will attempt to authenticate the server it connects to + caPath string // path to CA pem file + skipTLSVerify bool // if true it skips certificate validation rdrEvents chan *erEvent // channel to dispatch the events created to partialEvents chan *erEvent // channel to dispatch the partial events created to @@ -86,12 +92,41 @@ func (rdr *KafkaER) Config() *config.EventReaderCfg { // Serve will start the gorutines needed to watch the kafka topic func (rdr *KafkaER) Serve() (err error) { - r := kafka.NewReader(kafka.ReaderConfig{ + readerCfg := kafka.ReaderConfig{ Brokers: []string{rdr.dialURL}, GroupID: rdr.groupID, Topic: rdr.topic, MaxWait: rdr.maxWait, - }) + } + + if rdr.tls { + var rootCAs *x509.CertPool + if rootCAs, err = x509.SystemCertPool(); err != nil { + return + } + if rootCAs == nil { + rootCAs = x509.NewCertPool() + } + if rdr.caPath != "" { + var ca []byte + if ca, err = os.ReadFile(rdr.caPath); err != nil { + return + } + if !rootCAs.AppendCertsFromPEM(ca) { + return + } + } + readerCfg.Dialer = &kafka.Dialer{ + Timeout: 10 * time.Second, + DualStack: true, + TLS: &tls.Config{ + RootCAs: rootCAs, + InsecureSkipVerify: rdr.skipTLSVerify, + }, + } + } + + r := kafka.NewReader(readerCfg) if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API return @@ -184,6 +219,15 @@ func (rdr *KafkaER) setOpts(opts *config.EventReaderOpts) (err error) { if kfkOpts.MaxWait != nil { rdr.maxWait = *kfkOpts.MaxWait } + if kfkOpts.TLS != nil && *kfkOpts.TLS { + rdr.tls = true + } + if kfkOpts.CAPath != nil { + rdr.caPath = *kfkOpts.CAPath + } + if kfkOpts.SkipTLSVerify != nil && *kfkOpts.SkipTLSVerify { + rdr.skipTLSVerify = true + } } return } diff --git a/general_tests/kafka_ssl_it_test.go b/general_tests/kafka_ssl_it_test.go new file mode 100644 index 000000000..00436847b --- /dev/null +++ b/general_tests/kafka_ssl_it_test.go @@ -0,0 +1,180 @@ +//go:build kafka +// +build kafka + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package general_tests + +import ( + "path" + "testing" + "time" + + "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" + "github.com/segmentio/kafka-go" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + kafkaSSLConfigDir string + kafkaSSLCfgPath string + kafkaSSLCfg *config.CGRConfig + kafkaSSLRpc *birpc.Client + + sTestsKafkaSSL = []func(t *testing.T){ + testKafkaSSLLoadConfig, + testKafkaSSLResetDataDB, + + testKafkaSSLStartEngine, + testKafkaSSLRPCConn, + testKafkaSSLExportEvent, // exports event to ssl-topic, then the reader will consume said event and export it to processed-topic + testKafkaSSLVerifyProcessedExport, // checks whether ERs managed to successfully read and export the events served by Kafka server + testKafkaSSLStopEngine, + } +) + +// The test is exporting and reading from a kafka broker with the following configuration + +/* +listeners=PLAINTEXT://:9092,SSL://localhost:9093 +... +advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093 +... +ssl.truststore.location=/home/kafka/kafka/ssl/kafka.server.truststore.jks +ssl.truststore.password=123456 +ssl.keystore.type=PKCS12 +ssl.keystore.location=/home/kafka/kafka/ssl/kafka.server.keystore.p12 +ssl.keystore.password=123456 +ssl.key.password=123456 +ssl.client.auth=none +ssl.protocol=TLSv1.2 +security.inter.broker.protocol=SSL +*/ + +// How to create TLS keys and certificates: + +/* +1. Generate CA if needed (openssl req -new -x509 -keyout ca-key.pem -out ca.pem -days 365); +2. Add the generated CA to the brokers’ truststore; +3. Generate key-certificate pair using the CA from step 1 to sign it and convert the pem files to p12 format; +4. Import both the certificate of the CA and the signed certificate into the broker keystore. +*/ + +func TestKafkaSSL(t *testing.T) { + kafkaSSLConfigDir = "kafka_ssl" + for _, stest := range sTestsKafkaSSL { + t.Run(kafkaSSLConfigDir, stest) + } +} + +func testKafkaSSLLoadConfig(t *testing.T) { + var err error + kafkaSSLCfgPath = path.Join(*dataDir, "conf", "samples", kafkaSSLConfigDir) + if kafkaSSLCfg, err = config.NewCGRConfigFromPath(kafkaSSLCfgPath); err != nil { + t.Error(err) + } +} + +func testKafkaSSLResetDataDB(t *testing.T) { + if err := engine.InitDataDb(kafkaSSLCfg); err != nil { + t.Fatal(err) + } +} + +func testKafkaSSLStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(kafkaSSLCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +func testKafkaSSLRPCConn(t *testing.T) { + var err error + kafkaSSLRpc, err = newRPCClient(kafkaSSLCfg.ListenCfg()) + if err != nil { + t.Fatal(err) + } +} + +func testKafkaSSLExportEvent(t *testing.T) { + event := &engine.CGREventWithEeIDs{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "KafkaEvent", + Event: map[string]interface{}{ + utils.ToR: utils.MetaVoice, + utils.OriginID: "abcdef", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.MetaRated, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.AccountField: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813748, 0).UTC(), + utils.Usage: 10 * time.Second, + utils.RunID: utils.MetaDefault, + utils.Cost: 1.01, + }, + }, + } + + var reply map[string]map[string]interface{} + if err := kafkaSSLRpc.Call(context.Background(), utils.EeSv1ProcessEvent, event, &reply); err != nil { + t.Error(err) + } + time.Sleep(time.Second) +} + +func testKafkaSSLVerifyProcessedExport(t *testing.T) { + r := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: "processed-topic", + // MinBytes: 10e3, // 10KB + // MaxBytes: 10e6, // 10MB + }) + + ctx, cancel := context.WithCancel(context.Background()) + var rcv string + for { + m, err := r.ReadMessage(ctx) + if err != nil { + break + } + rcv = string(m.Value) + cancel() + } + + exp := `{"Account":"1001","AnswerTime":"2013-11-07T08:42:28Z","Category":"call","Cost":1.01,"Destination":"1002","OriginHost":"192.168.1.1","OriginID":"abcdef","RequestType":"*rated","RunID":"*default","SetupTime":"2013-11-07T08:42:25Z","Subject":"1001","Tenant":"cgrates.org","ToR":"*voice","Usage":10000000000}` + + if rcv != exp { + t.Errorf("expected: <%+v>, \nreceived: <%+v>", exp, rcv) + } + + if err := r.Close(); err != nil { + t.Fatal("failed to close reader:", err) + } +} + +func testKafkaSSLStopEngine(t *testing.T) { + if err := engine.KillEngine(*waitRater); err != nil { + t.Error(err) + } +} diff --git a/utils/consts.go b/utils/consts.go index 55c215f2d..11813eb9d 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2655,9 +2655,12 @@ const ( KafkaDefaultGroupID = "cgrates" KafkaDefaultMaxWait = time.Millisecond - KafkaTopic = "kafkaTopic" - KafkaGroupID = "kafkaGroupID" - KafkaMaxWait = "kafkaMaxWait" + KafkaTopic = "kafkaTopic" + KafkaTLS = "kafkaTLS" + KafkaCAPath = "kafkaCAPath" + KafkaSkipTLSVerify = "kafkaSkipTLSVerify" + KafkaGroupID = "kafkaGroupID" + KafkaMaxWait = "kafkaMaxWait" // partial PartialOpt = "*partial"