Fix all compilation errors related to ers after making ees opts of type struct

This commit is contained in:
ionutboangiu
2021-11-25 17:20:33 +02:00
committed by Dan Christian Bogos
parent aa0ae292a2
commit 40eb832060
27 changed files with 355 additions and 459 deletions

View File

@@ -55,21 +55,21 @@ type AMQPee struct {
bytePreparing
}
func (pstr *AMQPee) parseOpts(dialURL map[string]interface{}) {
func (pstr *AMQPee) parseOpts(dialURL *config.EventExporterOpts) {
pstr.queueID = utils.DefaultQueueID
pstr.routingKey = utils.DefaultQueueID
if vals, has := dialURL[utils.AMQPQueueID]; has {
pstr.queueID = utils.IfaceAsString(vals)
if dialURL.AMQPQueueID != nil {
pstr.queueID = *dialURL.AMQPQueueID
}
if vals, has := dialURL[utils.AMQPRoutingKey]; has {
pstr.routingKey = utils.IfaceAsString(vals)
if dialURL.AMQPRoutingKey != nil {
pstr.routingKey = *dialURL.AMQPRoutingKey
}
if vals, has := dialURL[utils.AMQPExchange]; has {
pstr.exchange = utils.IfaceAsString(vals)
if dialURL.AMQPExchange != nil {
pstr.exchange = *dialURL.AMQPExchange
pstr.exchangeType = utils.DefaultExchangeType
}
if vals, has := dialURL[utils.AMQPExchangeType]; has {
pstr.exchangeType = utils.IfaceAsString(vals)
if dialURL.AMQPExchangeType != nil {
pstr.exchangeType = *dialURL.AMQPExchangeType
}
}

View File

@@ -35,8 +35,8 @@ func NewAMQPv1EE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *AMQPv1
queueID: "/" + utils.DefaultQueueID,
reqs: newConcReq(cfg.ConcurrentRequests),
}
if vals, has := cfg.Opts[utils.AMQPQueueID]; has {
pstr.queueID = "/" + utils.IfaceAsString(vals)
if cfg.Opts.AMQPQueueID != nil {
pstr.queueID = "/" + *cfg.Opts.AMQPQueueID
}
return pstr
}

View File

@@ -58,49 +58,29 @@ type ElasticEE struct {
func (eEe *ElasticEE) prepareOpts() (err error) {
//parse opts
eEe.opts.Index = utils.CDRsTBL
if val, has := eEe.Cfg().Opts[utils.ElsIndex]; has {
eEe.opts.Index = utils.IfaceAsString(val)
if eEe.Cfg().Opts.ElsIndex != nil {
eEe.opts.Index = *eEe.Cfg().Opts.ElsIndex
}
if val, has := eEe.Cfg().Opts[utils.ElsIfPrimaryTerm]; has {
var intVal int64
if intVal, err = utils.IfaceAsTInt64(val); err != nil {
return
}
eEe.opts.IfPrimaryTerm = utils.IntPointer(int(intVal))
eEe.opts.IfPrimaryTerm = eEe.Cfg().Opts.ElsIfPrimaryTerm
eEe.opts.IfSeqNo = eEe.Cfg().Opts.ElsIfSeqNo
if eEe.Cfg().Opts.ElsOpType != nil {
eEe.opts.OpType = *eEe.Cfg().Opts.ElsOpType
}
if val, has := eEe.Cfg().Opts[utils.ElsIfSeqNo]; has {
var intVal int64
if intVal, err = utils.IfaceAsTInt64(val); err != nil {
return
}
eEe.opts.IfSeqNo = utils.IntPointer(int(intVal))
if eEe.Cfg().Opts.ElsPipeline != nil {
eEe.opts.Pipeline = *eEe.Cfg().Opts.ElsPipeline
}
if val, has := eEe.Cfg().Opts[utils.ElsOpType]; has {
eEe.opts.OpType = utils.IfaceAsString(val)
if eEe.Cfg().Opts.ElsRouting != nil {
eEe.opts.Routing = *eEe.Cfg().Opts.ElsRouting
}
if val, has := eEe.Cfg().Opts[utils.ElsPipeline]; has {
eEe.opts.Pipeline = utils.IfaceAsString(val)
if eEe.Cfg().Opts.ElsTimeout != nil {
eEe.opts.Timeout = *eEe.Cfg().Opts.ElsTimeout
}
if val, has := eEe.Cfg().Opts[utils.ElsRouting]; has {
eEe.opts.Routing = utils.IfaceAsString(val)
eEe.opts.Version = eEe.Cfg().Opts.ElsVersion
if eEe.Cfg().Opts.ElsVersionType != nil {
eEe.opts.VersionType = *eEe.Cfg().Opts.ElsVersionType
}
if val, has := eEe.Cfg().Opts[utils.ElsTimeout]; has {
if eEe.opts.Timeout, err = utils.IfaceAsDuration(val); err != nil {
return
}
}
if val, has := eEe.Cfg().Opts[utils.ElsVersionLow]; has {
var intVal int64
if intVal, err = utils.IfaceAsTInt64(val); err != nil {
return
}
eEe.opts.Version = utils.IntPointer(int(intVal))
}
if val, has := eEe.Cfg().Opts[utils.ElsVersionType]; has {
eEe.opts.VersionType = utils.IfaceAsString(val)
}
if val, has := eEe.Cfg().Opts[utils.ElsWaitForActiveShards]; has {
eEe.opts.WaitForActiveShards = utils.IfaceAsString(val)
if eEe.Cfg().Opts.ElsWaitForActiveShards != nil {
eEe.opts.WaitForActiveShards = *eEe.Cfg().Opts.ElsWaitForActiveShards
}
return
}

View File

@@ -56,7 +56,9 @@ func TestInitClient(t *testing.T) {
func TestInitCase1(t *testing.T) {
ee := &ElasticEE{
cfg: &config.EventExporterCfg{
Opts: map[string]interface{}{utils.ElsIndex: "test"},
Opts: &config.EventExporterOpts{
ElsIndex: utils.StringPointer("test"),
},
},
}
if err := ee.prepareOpts(); err != nil {
@@ -71,7 +73,9 @@ func TestInitCase1(t *testing.T) {
func TestInitCase2(t *testing.T) {
ee := &ElasticEE{
cfg: &config.EventExporterCfg{
Opts: map[string]interface{}{utils.ElsIfPrimaryTerm: 20},
Opts: &config.EventExporterOpts{
ElsIfPrimaryTerm: utils.IntPointer(20),
},
},
}
if err := ee.prepareOpts(); err != nil {
@@ -83,22 +87,12 @@ func TestInitCase2(t *testing.T) {
}
}
func TestInitCase2Err(t *testing.T) {
ee := &ElasticEE{
cfg: &config.EventExporterCfg{
Opts: map[string]interface{}{utils.ElsIfPrimaryTerm: "test"},
},
}
errExpect := "strconv.ParseInt: parsing \"test\": invalid syntax"
if err := ee.prepareOpts(); err == nil || err.Error() != errExpect {
t.Errorf("Expected %+v \n but got %+v", errExpect, err)
}
}
func TestInitCase3(t *testing.T) {
ee := &ElasticEE{
cfg: &config.EventExporterCfg{
Opts: map[string]interface{}{utils.ElsIfSeqNo: 20},
Opts: &config.EventExporterOpts{
ElsIfSeqNo: utils.IntPointer(20),
},
},
}
if err := ee.prepareOpts(); err != nil {
@@ -110,22 +104,12 @@ func TestInitCase3(t *testing.T) {
}
}
func TestInitCase3Err(t *testing.T) {
ee := &ElasticEE{
cfg: &config.EventExporterCfg{
Opts: map[string]interface{}{utils.ElsIfSeqNo: "test"},
},
}
errExpect := "strconv.ParseInt: parsing \"test\": invalid syntax"
if err := ee.prepareOpts(); err == nil || err.Error() != errExpect {
t.Errorf("Expected %+v \n but got %+v", errExpect, err)
}
}
func TestInitCase4(t *testing.T) {
ee := &ElasticEE{
cfg: &config.EventExporterCfg{
Opts: map[string]interface{}{utils.ElsOpType: "test"},
Opts: &config.EventExporterOpts{
ElsOpType: utils.StringPointer("test"),
},
},
}
if err := ee.prepareOpts(); err != nil {
@@ -140,7 +124,9 @@ func TestInitCase4(t *testing.T) {
func TestInitCase5(t *testing.T) {
ee := &ElasticEE{
cfg: &config.EventExporterCfg{
Opts: map[string]interface{}{utils.ElsPipeline: "test"},
Opts: &config.EventExporterOpts{
ElsPipeline: utils.StringPointer("test"),
},
},
}
if err := ee.prepareOpts(); err != nil {
@@ -155,7 +141,9 @@ func TestInitCase5(t *testing.T) {
func TestInitCase6(t *testing.T) {
ee := &ElasticEE{
cfg: &config.EventExporterCfg{
Opts: map[string]interface{}{utils.ElsRouting: "test"},
Opts: &config.EventExporterOpts{
ElsRouting: utils.StringPointer("test"),
},
},
}
if err := ee.prepareOpts(); err != nil {
@@ -167,22 +155,12 @@ func TestInitCase6(t *testing.T) {
}
}
func TestInitCase7(t *testing.T) {
ee := &ElasticEE{
cfg: &config.EventExporterCfg{
Opts: map[string]interface{}{utils.ElsTimeout: "test"},
},
}
errExpect := "time: invalid duration \"test\""
if err := ee.prepareOpts(); err == nil || err.Error() != errExpect {
t.Errorf("Expected %+v \n but got %+v", errExpect, err)
}
}
func TestInitCase8(t *testing.T) {
ee := &ElasticEE{
cfg: &config.EventExporterCfg{
Opts: map[string]interface{}{utils.ElsVersionLow: 20},
Opts: &config.EventExporterOpts{
ElsVersion: utils.IntPointer(20),
},
},
}
if err := ee.prepareOpts(); err != nil {
@@ -194,22 +172,12 @@ func TestInitCase8(t *testing.T) {
}
}
func TestInitCase8Err(t *testing.T) {
ee := &ElasticEE{
cfg: &config.EventExporterCfg{
Opts: map[string]interface{}{utils.ElsVersionLow: "test"},
},
}
errExpect := "strconv.ParseInt: parsing \"test\": invalid syntax"
if err := ee.prepareOpts(); err == nil || err.Error() != errExpect {
t.Errorf("Expected %+v \n but got %+v", errExpect, err)
}
}
func TestInitCase9(t *testing.T) {
ee := &ElasticEE{
cfg: &config.EventExporterCfg{
Opts: map[string]interface{}{utils.ElsVersionType: "test"},
Opts: &config.EventExporterOpts{
ElsVersionType: utils.StringPointer("test"),
},
},
}
if err := ee.prepareOpts(); err != nil {
@@ -224,7 +192,9 @@ func TestInitCase9(t *testing.T) {
func TestInitCase10(t *testing.T) {
ee := &ElasticEE{
cfg: &config.EventExporterCfg{
Opts: map[string]interface{}{utils.ElsWaitForActiveShards: "test"},
Opts: &config.EventExporterOpts{
ElsWaitForActiveShards: utils.StringPointer("test"),
},
},
}
if err := ee.prepareOpts(); err != nil {

View File

@@ -73,8 +73,8 @@ func (fCsv *FileCSVee) init() (err error) {
}
fCsv.csvWriter = csv.NewWriter(fCsv.file)
fCsv.csvWriter.Comma = utils.CSVSep
if fieldSep, has := fCsv.Cfg().Opts[utils.CSVFieldSepOpt]; has {
fCsv.csvWriter.Comma = rune(utils.IfaceAsString(fieldSep)[0])
if fCsv.Cfg().Opts.CSVFieldSeparator != nil {
fCsv.csvWriter.Comma = rune((*fCsv.Cfg().Opts.CSVFieldSeparator)[0])
}
return fCsv.composeHeader()
}

View File

@@ -34,8 +34,8 @@ func NewKafkaEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *KafkaEE
topic: utils.DefaultQueueID,
reqs: newConcReq(cfg.ConcurrentRequests),
}
if vals, has := cfg.Opts[utils.KafkaTopic]; has {
kfkPstr.topic = utils.IfaceAsString(vals)
if cfg.Opts.KafkaTopic != nil {
kfkPstr.topic = *cfg.Opts.KafkaTopic
}
return kfkPstr
}

View File

@@ -56,14 +56,27 @@ func writeFailedPosts(_ string, value interface{}) {
}
}
func AddFailedPost(failedPostsDir, expPath, format string, ev interface{}, opts map[string]interface{}) {
func AddFailedPost(failedPostsDir, expPath, format string, ev interface{}, opts *config.EventExporterOpts) {
key := utils.ConcatenatedKey(failedPostsDir, expPath, format)
// also in case of amqp,amqpv1,s3,sqs and kafka also separe them after queue id
if qID := utils.FirstNonEmpty(
utils.IfaceAsString(opts[utils.AMQPQueueID]),
utils.IfaceAsString(opts[utils.S3Bucket]),
utils.IfaceAsString(opts[utils.SQSQueueID]),
utils.IfaceAsString(opts[utils.KafkaTopic])); len(qID) != 0 {
var amqpQueueID string
var s3BucketID string
var sqsQueueID string
var kafkaTopic string
if opts.AMQPQueueID != nil {
amqpQueueID = *opts.AMQPQueueID
}
if opts.S3BucketID != nil {
s3BucketID = *opts.S3BucketID
}
if opts.SQSQueueID != nil {
sqsQueueID = *opts.SQSQueueID
}
if opts.KafkaTopic != nil {
kafkaTopic = *opts.KafkaTopic
}
if qID := utils.FirstNonEmpty(amqpQueueID, s3BucketID, sqsQueueID,
kafkaTopic); len(qID) != 0 {
key = utils.ConcatenatedKey(key, qID)
}
var failedPost *ExportEvents
@@ -107,7 +120,7 @@ func NewExportEventsFromFile(filePath string) (expEv *ExportEvents, err error) {
type ExportEvents struct {
lk sync.RWMutex
Path string
Opts map[string]interface{}
Opts *config.EventExporterOpts
Format string
Events []interface{}
failedPostsDir string

View File

@@ -24,6 +24,7 @@ import (
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
@@ -38,7 +39,7 @@ func TestSetFldPostCacheTTL(t *testing.T) {
func TestAddFldPost(t *testing.T) {
SetFailedPostCacheTTL(5 * time.Second)
AddFailedPost("", "path1", "format1", "1", make(map[string]interface{}))
AddFailedPost("", "path1", "format1", "1", &config.EventExporterOpts{})
x, ok := failedPostCache.Get(utils.ConcatenatedKey("", "path1", "format1"))
if !ok {
t.Error("Error reading from cache")
@@ -55,13 +56,15 @@ func TestAddFldPost(t *testing.T) {
Path: "path1",
Format: "format1",
Events: []interface{}{"1"},
Opts: make(map[string]interface{}),
Opts: &config.EventExporterOpts{},
}
if !reflect.DeepEqual(eOut, failedPost) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost))
}
AddFailedPost("", "path1", "format1", "2", make(map[string]interface{}))
AddFailedPost("", "path2", "format2", "3", map[string]interface{}{utils.SQSQueueID: "qID"})
AddFailedPost("", "path1", "format1", "2", &config.EventExporterOpts{})
AddFailedPost("", "path2", "format2", "3", &config.EventExporterOpts{
SQSQueueID: utils.StringPointer("qID"),
})
x, ok = failedPostCache.Get(utils.ConcatenatedKey("", "path1", "format1"))
if !ok {
t.Error("Error reading from cache")
@@ -77,7 +80,7 @@ func TestAddFldPost(t *testing.T) {
Path: "path1",
Format: "format1",
Events: []interface{}{"1", "2"},
Opts: make(map[string]interface{}),
Opts: &config.EventExporterOpts{},
}
if !reflect.DeepEqual(eOut, failedPost) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost))
@@ -97,7 +100,9 @@ func TestAddFldPost(t *testing.T) {
Path: "path2",
Format: "format2",
Events: []interface{}{"3"},
Opts: map[string]interface{}{utils.SQSQueueID: "qID"},
Opts: &config.EventExporterOpts{
SQSQueueID: utils.StringPointer("qID"),
},
}
if !reflect.DeepEqual(eOut, failedPost) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost))

View File

@@ -60,24 +60,18 @@ type NatsEE struct {
bytePreparing
}
func (pstr *NatsEE) parseOpt(opts map[string]interface{}, nodeID string, connTimeout time.Duration) (err error) {
if useJetStreamVal, has := opts[utils.NatsJetStream]; has {
if pstr.jetStream, err = utils.IfaceAsBool(useJetStreamVal); err != nil {
return
}
func (pstr *NatsEE) parseOpt(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) (err error) {
if opts.NATSJetStream != nil {
pstr.jetStream = *opts.NATSJetStream
}
pstr.subject = utils.DefaultQueueID
if vals, has := opts[utils.NatsSubject]; has {
pstr.subject = utils.IfaceAsString(vals)
if opts.NATSSubject != nil {
pstr.subject = *opts.NATSSubject
}
pstr.opts, err = GetNatsOpts(opts, nodeID, connTimeout)
if pstr.jetStream {
if maxWaitVal, has := opts[utils.NatsJetStreamMaxWait]; has {
var maxWait time.Duration
if maxWait, err = utils.IfaceAsDuration(maxWaitVal); err != nil {
return
}
pstr.jsOpts = []nats.JSOpt{nats.MaxWait(maxWait)}
if opts.NATSJetStreamMaxWait != nil {
pstr.jsOpts = []nats.JSOpt{nats.MaxWait(*opts.NATSJetStreamMaxWait)}
}
}
return
@@ -129,51 +123,51 @@ func (pstr *NatsEE) Close() (err error) {
func (pstr *NatsEE) GetMetrics() *utils.SafeMapStorage { return pstr.dc }
func GetNatsOpts(opts map[string]interface{}, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) {
func GetNatsOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) {
nop = make([]nats.Option, 0, 7)
nop = append(nop, nats.Name(utils.CGRateSLwr+nodeID),
nats.Timeout(connTimeout),
nats.DrainTimeout(time.Second))
if userFile, has := opts[utils.NatsJWTFile]; has {
if opts.NATSJWTFile != nil {
keys := make([]string, 0, 1)
if keyFile, has := opts[utils.NatsSeedFile]; has {
keys = append(keys, utils.IfaceAsString(keyFile))
if opts.NATSSeedFile != nil {
keys = append(keys, *opts.NATSSeedFile)
}
nop = append(nop, nats.UserCredentials(utils.IfaceAsString(userFile), keys...))
nop = append(nop, nats.UserCredentials(*opts.NATSJWTFile, keys...))
}
if nkeyFile, has := opts[utils.NatsSeedFile]; has {
opt, err := nats.NkeyOptionFromSeed(utils.IfaceAsString(nkeyFile))
if opts.NATSSeedFile != nil {
opt, err := nats.NkeyOptionFromSeed(*opts.NATSSeedFile)
if err != nil {
return nil, err
}
nop = append(nop, opt)
}
if certFile, has := opts[utils.NatsClientCertificate]; has {
clientFile, has := opts[utils.NatsClientKey]
if !has {
if opts.NATSClientCertificate != nil {
if opts.NATSClientKey == nil {
err = fmt.Errorf("has certificate but no key")
return
}
nop = append(nop, nats.ClientCert(utils.IfaceAsString(certFile), utils.IfaceAsString(clientFile)))
} else if _, has := opts[utils.NatsClientKey]; has {
nop = append(nop, nats.ClientCert(*opts.NATSClientCertificate, *opts.NATSClientKey))
} else if opts.NATSClientKey != nil {
err = fmt.Errorf("has key but no certificate")
return
}
if caFile, has := opts[utils.NatsCertificateAuthority]; has {
if opts.NATSCertificateAuthority != nil {
nop = append(nop,
func(o *nats.Options) error {
pool, err := x509.SystemCertPool()
if err != nil {
return err
}
rootPEM, err := ioutil.ReadFile(utils.IfaceAsString(caFile))
rootPEM, err := ioutil.ReadFile(*opts.NATSCertificateAuthority)
if err != nil || rootPEM == nil {
return fmt.Errorf("nats: error loading or parsing rootCA file: %v", err)
}
ok := pool.AppendCertsFromPEM(rootPEM)
if !ok {
return fmt.Errorf("nats: failed to parse root certificate from %q", caFile)
return fmt.Errorf("nats: failed to parse root certificate from %q",
*opts.NATSCertificateAuthority)
}
if o.TLSConfig == nil {
o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}

View File

@@ -196,9 +196,8 @@ func TestGetNatsOptsSeedFile(t *testing.T) {
nkey := "SUACSSL3UAHUDXKFSNVUZRF5UHPMWZ6BFDTJ7M6USDXIEDNPPQYYYCU3VY"
os.WriteFile("/tmp/nkey.txt", []byte(nkey), 0777)
opts := map[string]interface{}{
utils.NatsSeedFile: "/tmp/nkey.txt",
// utils.NatsSeedFile: "file",
opts := &config.EventExporterOpts{
NATSSeedFile: utils.StringPointer("/tmp/nkey.txt"),
}
nodeID := "node_id1"

View File

@@ -73,7 +73,7 @@ func TestParseOpt(t *testing.T) {
Attempts: 2,
ConcurrentRequests: 2,
}
opts := map[string]interface{}{}
opts := &config.EventExporterOpts{}
nodeID := "node_id1"
connTimeout := 2 * time.Second
dc, err := newEEMetrics("Local")
@@ -99,8 +99,8 @@ func TestParseOptJetStream(t *testing.T) {
Attempts: 2,
ConcurrentRequests: 2,
}
opts := map[string]interface{}{
utils.NatsJetStream: true,
opts := &config.EventExporterOpts{
NATSJetStream: utils.BoolPointer(true),
}
nodeID := "node_id1"
connTimeout := 2 * time.Second
@@ -121,17 +121,6 @@ func TestParseOptJetStream(t *testing.T) {
if !pstr.jetStream {
t.Error("Expected jetStream to be true")
}
//test error on converson
opts = map[string]interface{}{
utils.NatsJetStream: uint16(2),
}
err = pstr.parseOpt(opts, nodeID, connTimeout)
if err.Error() != "cannot convert field: 2 to bool" {
t.Error("The conversion shouldn't have been possible")
}
}
func TestParseOptJetStreamMaxWait(t *testing.T) {
@@ -141,9 +130,9 @@ func TestParseOptJetStreamMaxWait(t *testing.T) {
Attempts: 2,
ConcurrentRequests: 2,
}
opts := map[string]interface{}{
utils.NatsJetStream: true,
utils.NatsJetStreamMaxWait: "2ns",
opts := &config.EventExporterOpts{
NATSJetStream: utils.BoolPointer(true),
NATSJetStreamMaxWait: utils.DurationPointer(2),
}
nodeID := "node_id1"
connTimeout := 2 * time.Second
@@ -164,17 +153,6 @@ func TestParseOptJetStreamMaxWait(t *testing.T) {
if !reflect.DeepEqual(pstr.jsOpts, exp) {
t.Errorf("Expected %v \n but received \n %v", exp, pstr.jsOpts)
}
//test conversion error
opts = map[string]interface{}{
utils.NatsJetStream: true,
utils.NatsJetStreamMaxWait: true,
}
err = pstr.parseOpt(opts, nodeID, connTimeout)
if err.Error() != "cannot convert field: true to time.Duration" {
t.Errorf("The conversion shouldn't have been possible: %v", err.Error())
}
}
func TestParseOptSubject(t *testing.T) {
@@ -184,8 +162,8 @@ func TestParseOptSubject(t *testing.T) {
Attempts: 2,
ConcurrentRequests: 2,
}
opts := map[string]interface{}{
utils.NatsSubject: "nats_subject",
opts := &config.EventExporterOpts{
NATSSubject: utils.StringPointer("nats_subject"),
}
nodeID := "node_id1"
connTimeout := 2 * time.Second
@@ -203,15 +181,14 @@ func TestParseOptSubject(t *testing.T) {
t.Error(err)
}
if pstr.subject != opts[utils.NatsSubject] {
t.Errorf("Expected %v \n but received \n %v", opts[utils.NatsSubject], pstr.subject)
if opts.NATSSubject == nil || pstr.subject != *opts.NATSSubject {
t.Errorf("Expected %v \n but received \n %v", *opts.NATSSubject, pstr.subject)
}
}
func TestGetNatsOptsJWT(t *testing.T) {
opts := map[string]interface{}{
utils.NatsJWTFile: "jwtfile",
// utils.NatsSeedFile: "file",
opts := &config.EventExporterOpts{
NATSJWTFile: utils.StringPointer("jwtfile"),
}
nodeID := "node_id1"
@@ -224,9 +201,9 @@ func TestGetNatsOptsJWT(t *testing.T) {
}
func TestGetNatsOptsClientCert(t *testing.T) {
opts := map[string]interface{}{
utils.NatsClientCertificate: "client_cert",
utils.NatsClientKey: "client_key",
opts := &config.EventExporterOpts{
NATSClientCertificate: utils.StringPointer("client_cert"),
NATSClientKey: utils.StringPointer("client_key"),
}
nodeID := "node_id1"
connTimeout := 2 * time.Second
@@ -245,8 +222,8 @@ func TestGetNatsOptsClientCert(t *testing.T) {
// }
// no key error
opts = map[string]interface{}{
utils.NatsClientCertificate: "client_cert",
opts = &config.EventExporterOpts{
NATSClientCertificate: utils.StringPointer("client_cert"),
}
_, err = GetNatsOpts(opts, nodeID, connTimeout)
if err.Error() != "has certificate but no key" {
@@ -254,8 +231,8 @@ func TestGetNatsOptsClientCert(t *testing.T) {
}
// no certificate error
opts = map[string]interface{}{
utils.NatsClientKey: "client_key",
opts = &config.EventExporterOpts{
NATSClientKey: utils.StringPointer("client_key"),
}
_, err = GetNatsOpts(opts, nodeID, connTimeout)
if err.Error() != "has key but no certificate" {

View File

@@ -80,7 +80,7 @@ func TestHttpJsonPoster(t *testing.T) {
if err = ExportWithAttempts(pstr, &HTTPPosterRequest{Body: jsn, Header: make(http.Header)}, ""); err == nil {
t.Error("Expected error")
}
AddFailedPost("/tmp", "http://localhost:8080/invalid", utils.MetaHTTPjsonMap, jsn, make(map[string]interface{}))
AddFailedPost("/tmp", "http://localhost:8080/invalid", utils.MetaHTTPjsonMap, jsn, &config.EventExporterOpts{})
time.Sleep(5 * time.Millisecond)
fs, err := filepath.Glob("/tmp/EEs*")
if err != nil {
@@ -117,7 +117,7 @@ func TestHttpBytesPoster(t *testing.T) {
if err = ExportWithAttempts(pstr, &HTTPPosterRequest{Body: content, Header: make(http.Header)}, ""); err == nil {
t.Error("Expected error")
}
AddFailedPost("/tmp", "http://localhost:8080/invalid", utils.ContentJSON, content, make(map[string]interface{}))
AddFailedPost("/tmp", "http://localhost:8080/invalid", utils.ContentJSON, content, &config.EventExporterOpts{})
time.Sleep(5 * time.Millisecond)
fs, err := filepath.Glob("/tmp/test2*")
if err != nil {
@@ -154,11 +154,11 @@ func TestSQSPoster(t *testing.T) {
awsSecret := "replace-this-with-your-secret"
qname := "cgrates-cdrs"
opts := map[string]interface{}{
utils.AWSRegion: region,
utils.AWSKey: awsKey,
utils.AWSSecret: awsSecret,
utils.SQSQueueID: qname,
opts := &config.EventExporterOpts{
AWSRegion: utils.StringPointer(region),
AWSKey: utils.StringPointer(awsKey),
AWSSecret: utils.StringPointer(awsSecret),
SQSQueueID: utils.StringPointer(qname),
}
//#####################################
@@ -237,11 +237,11 @@ func TestS3Poster(t *testing.T) {
awsSecret := "replace-this-with-your-secret"
qname := "cgrates-cdrs"
opts := map[string]interface{}{
utils.AWSRegion: region,
utils.AWSKey: awsKey,
utils.AWSSecret: awsSecret,
utils.S3Bucket: qname,
opts := &config.EventExporterOpts{
AWSRegion: utils.StringPointer(region),
AWSKey: utils.StringPointer(awsKey),
AWSSecret: utils.StringPointer(awsSecret),
SQSQueueID: utils.StringPointer(qname),
}
//#####################################
@@ -299,8 +299,8 @@ func TestAMQPv1Poster(t *testing.T) {
// update this variables
endpoint := "amqps://RootManageSharedAccessKey:UlfIJ%2But11L0ZzA%2Fgpje8biFJeQihpWibJsUhaOi1DU%3D@cdrscgrates.servicebus.windows.net"
qname := "cgrates-cdrs"
opts := map[string]interface{}{
utils.AMQPQueueID: qname,
opts := &config.EventExporterOpts{
AMQPQueueID: utils.StringPointer(qname),
}
//#####################################

View File

@@ -37,11 +37,11 @@ func TestAMQPeeParseURL(t *testing.T) {
exchangeType: "fanout",
routingKey: "CGRCDR",
}
opts := map[string]interface{}{
utils.AMQPQueueID: "q1",
utils.AMQPExchange: "E1",
utils.AMQPRoutingKey: "CGRCDR",
utils.AMQPExchangeType: "fanout",
opts := &config.EventExporterOpts{
AMQPQueueID: utils.StringPointer("q1"),
AMQPExchange: utils.StringPointer("E1"),
AMQPRoutingKey: utils.StringPointer("CGRCDR"),
AMQPExchangeType: utils.StringPointer("fanout"),
}
amqp.parseOpts(opts)
if !reflect.DeepEqual(expected, amqp) {
@@ -53,7 +53,9 @@ func TestKafkaParseURL(t *testing.T) {
cfg := &config.EventExporterCfg{
ExportPath: "127.0.0.1:9092",
Attempts: 10,
Opts: map[string]interface{}{utils.KafkaTopic: "cdr_billing"},
Opts: &config.EventExporterOpts{
KafkaTopic: utils.StringPointer("cdr_billing"),
},
}
exp := &KafkaEE{
cfg: cfg,

View File

@@ -60,25 +60,25 @@ type S3EE struct {
bytePreparing
}
func (pstr *S3EE) parseOpts(opts map[string]interface{}) {
func (pstr *S3EE) parseOpts(opts *config.EventExporterOpts) {
pstr.bucket = utils.DefaultQueueID
if val, has := opts[utils.S3Bucket]; has {
pstr.bucket = utils.IfaceAsString(val)
if opts.S3BucketID != nil {
pstr.bucket = *opts.S3BucketID
}
if val, has := opts[utils.S3FolderPath]; has {
pstr.folderPath = utils.IfaceAsString(val)
if opts.S3FolderPath != nil {
pstr.folderPath = *opts.S3FolderPath
}
if val, has := opts[utils.AWSRegion]; has {
pstr.awsRegion = utils.IfaceAsString(val)
if opts.AWSRegion != nil {
pstr.awsRegion = *opts.AWSRegion
}
if val, has := opts[utils.AWSKey]; has {
pstr.awsID = utils.IfaceAsString(val)
if opts.AWSKey != nil {
pstr.awsID = *opts.AWSKey
}
if val, has := opts[utils.AWSSecret]; has {
pstr.awsKey = utils.IfaceAsString(val)
if opts.AWSSecret != nil {
pstr.awsKey = *opts.AWSSecret
}
if val, has := opts[utils.AWSToken]; has {
pstr.awsToken = utils.IfaceAsString(val)
if opts.AWSToken != nil {
pstr.awsToken = *opts.AWSToken
}
}

View File

@@ -71,18 +71,18 @@ func (sqlEe *SQLEe) initDialector() (err error) {
password, _ := u.User.Password()
dbname := utils.SQLDefaultDBName
if vals, has := sqlEe.Cfg().Opts[utils.SQLDBNameOpt]; has {
dbname = utils.IfaceAsString(vals)
if sqlEe.Cfg().Opts.SQLDBName != nil {
dbname = *sqlEe.Cfg().Opts.SQLDBName
}
ssl := utils.SQLDefaultSSLMode
if vals, has := sqlEe.Cfg().Opts[utils.SSLModeCfg]; has {
ssl = utils.IfaceAsString(vals)
if sqlEe.Cfg().Opts.SSLMode != nil {
ssl = *sqlEe.Cfg().Opts.SSLMode
}
// tableName is mandatory in opts
if iface, has := sqlEe.Cfg().Opts[utils.SQLTableNameOpt]; !has {
return utils.NewErrMandatoryIeMissing(utils.SQLTableNameOpt)
if sqlEe.Cfg().Opts.SQLTableName != nil {
sqlEe.tableName = *sqlEe.Cfg().Opts.SQLTableName
} else {
sqlEe.tableName = utils.IfaceAsString(iface)
return utils.NewErrMandatoryIeMissing(utils.SQLTableNameOpt)
}
// var dialect gorm.Dialector
@@ -98,7 +98,7 @@ func (sqlEe *SQLEe) initDialector() (err error) {
return
}
func openDB(dialect gorm.Dialector, opts map[string]interface{}) (db *gorm.DB, sqlDB *sql.DB, err error) {
func openDB(dialect gorm.Dialector, opts *config.EventExporterOpts) (db *gorm.DB, sqlDB *sql.DB, err error) {
if db, err = gorm.Open(dialect, &gorm.Config{AllowGlobalUpdate: true}); err != nil {
return
}
@@ -106,26 +106,14 @@ func openDB(dialect gorm.Dialector, opts map[string]interface{}) (db *gorm.DB, s
return
}
if iface, has := opts[utils.SQLMaxIdleConnsCfg]; has {
val, err := utils.IfaceAsTInt64(iface)
if err != nil {
return nil, nil, err
}
sqlDB.SetMaxIdleConns(int(val))
if opts.SQLMaxIdleConns != nil {
sqlDB.SetMaxIdleConns(*opts.SQLMaxIdleConns)
}
if iface, has := opts[utils.SQLMaxOpenConns]; has {
val, err := utils.IfaceAsTInt64(iface)
if err != nil {
return nil, nil, err
}
sqlDB.SetMaxOpenConns(int(val))
if opts.SQLMaxOpenConns != nil {
sqlDB.SetMaxOpenConns(*opts.SQLMaxOpenConns)
}
if iface, has := opts[utils.SQLConnMaxLifetime]; has {
val, err := utils.IfaceAsDuration(iface)
if err != nil {
return nil, nil, err
}
sqlDB.SetConnMaxLifetime(val)
if opts.SQLConnMaxLifetime != nil {
sqlDB.SetConnMaxLifetime(*opts.SQLConnMaxLifetime)
}
return

View File

@@ -233,64 +233,40 @@ func testSqlEeVerifyExportedEvent2(t *testing.T) {
func TestOpenDB1(t *testing.T) {
dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
"cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates"))
_, _, err := openDB(dialect, map[string]interface{}{utils.SQLMaxIdleConnsCfg: 2})
_, _, err := openDB(dialect, &config.EventExporterOpts{
SQLMaxIdleConns: utils.IntPointer(2),
})
if err != nil {
t.Error(err)
}
}
func TestOpenDB1Err(t *testing.T) {
dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
"cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates"))
_, _, err := openDB(dialect, map[string]interface{}{utils.SQLMaxIdleConnsCfg: "test"})
errExpect := "strconv.ParseInt: parsing \"test\": invalid syntax"
if err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
}
func TestOpenDB2(t *testing.T) {
dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
"cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates"))
_, _, err := openDB(dialect, map[string]interface{}{utils.SQLMaxOpenConns: 2})
_, _, err := openDB(dialect, &config.EventExporterOpts{
SQLMaxOpenConns: utils.IntPointer(2),
})
if err != nil {
t.Error(err)
}
}
func TestOpenDB2Err(t *testing.T) {
dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
"cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates"))
_, _, err := openDB(dialect, map[string]interface{}{utils.SQLMaxOpenConns: "test"})
errExpect := "strconv.ParseInt: parsing \"test\": invalid syntax"
if err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
}
func TestOpenDB3(t *testing.T) {
dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
"cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates"))
_, _, err := openDB(dialect, map[string]interface{}{utils.SQLConnMaxLifetime: 2})
_, _, err := openDB(dialect, &config.EventExporterOpts{
SQLConnMaxLifetime: utils.DurationPointer(2),
})
if err != nil {
t.Error(err)
}
}
func TestOpenDB3Err(t *testing.T) {
dialect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
"cgrates", "CGRateS.org", "127.0.0.1", "3306", "cgrates"))
_, _, err := openDB(dialect, map[string]interface{}{utils.SQLConnMaxLifetime: "test"})
errExpect := "time: invalid duration \"test\""
if err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
}
func TestSQLExportEvent1(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable"
cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "cgrates"
cgrCfg.EEsCfg().Exporters[0].Opts.SQLTableName = utils.StringPointer("expTable")
cgrCfg.EEsCfg().Exporters[0].Opts.SQLDBName = utils.StringPointer("cgrates")
cgrCfg.EEsCfg().Exporters[0].ExportPath = `mysql://cgrates:CGRateS.org@127.0.0.1:3306`
sqlEe, err := NewSQLEe(cgrCfg.EEsCfg().Exporters[0], nil)
if err != nil {

View File

@@ -46,9 +46,9 @@ func TestSqlGetMetrics(t *testing.T) {
func TestNewSQLeUrl(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable"
cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "postgres"
cgrCfg.EEsCfg().Exporters[0].Opts[utils.SSLModeCfg] = "test"
cgrCfg.EEsCfg().Exporters[0].Opts.SQLTableName = utils.StringPointer("expTable")
cgrCfg.EEsCfg().Exporters[0].Opts.SQLDBName = utils.StringPointer("postgres")
cgrCfg.EEsCfg().Exporters[0].Opts.SSLMode = utils.StringPointer("test")
sqlEe := &SQLEe{
cfg: cgrCfg.EEsCfg().Exporters[0],
reqs: newConcReq(0),
@@ -61,8 +61,8 @@ func TestNewSQLeUrl(t *testing.T) {
func TestNewSQLeUrlSQL(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable"
cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "mysql"
cgrCfg.EEsCfg().Exporters[0].Opts.SQLTableName = utils.StringPointer("expTable")
cgrCfg.EEsCfg().Exporters[0].Opts.SQLDBName = utils.StringPointer("mysql")
cgrCfg.EEsCfg().Exporters[0].ExportPath = `mysql://cgrates:CGRateS.org@127.0.0.1:3306`
sqlEe := &SQLEe{
cfg: cgrCfg.EEsCfg().Exporters[0],
@@ -79,8 +79,8 @@ func TestNewSQLeUrlSQL(t *testing.T) {
func TestNewSQLeUrlPostgres(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable"
cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "postgres"
cgrCfg.EEsCfg().Exporters[0].Opts.SQLTableName = utils.StringPointer("expTable")
cgrCfg.EEsCfg().Exporters[0].Opts.SQLDBName = utils.StringPointer("postgres")
cgrCfg.EEsCfg().Exporters[0].ExportPath = `postgres://cgrates:CGRateS.org@127.0.0.1:3306`
sqlEe := &SQLEe{
cfg: cgrCfg.EEsCfg().Exporters[0],
@@ -97,8 +97,8 @@ func TestNewSQLeUrlPostgres(t *testing.T) {
func TestNewSQLeExportPathError(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable"
cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "postgres"
cgrCfg.EEsCfg().Exporters[0].Opts.SQLTableName = utils.StringPointer("expTable")
cgrCfg.EEsCfg().Exporters[0].Opts.SQLDBName = utils.StringPointer("postgres")
cgrCfg.EEsCfg().Exporters[0].ExportPath = ":foo"
sqlEe := &SQLEe{
cfg: cgrCfg.EEsCfg().Exporters[0],
@@ -120,7 +120,7 @@ func TestOpenDBError2(t *testing.T) {
tmp := logger.Default
logger.Default = logger.Default.LogMode(logger.Silent)
mckDialect := new(mockDialect2)
_, _, err := openDB(mckDialect, make(map[string]interface{}))
_, _, err := openDB(mckDialect, &config.EventExporterOpts{})
errExpect := "invalid db"
if err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
@@ -140,7 +140,7 @@ func TestOpenDBError3(t *testing.T) {
tmp := logger.Default
logger.Default = logger.Default.LogMode(logger.Silent)
mckDialect := new(mockDialectErr)
_, _, err := openDB(mckDialect, make(map[string]interface{}))
_, _, err := openDB(mckDialect, &config.EventExporterOpts{})
errExpect := "NOT_FOUND"
if err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)

View File

@@ -59,22 +59,22 @@ type SQSee struct {
bytePreparing
}
func (pstr *SQSee) parseOpts(opts map[string]interface{}) {
func (pstr *SQSee) parseOpts(opts *config.EventExporterOpts) {
pstr.queueID = utils.DefaultQueueID
if val, has := opts[utils.SQSQueueID]; has {
pstr.queueID = utils.IfaceAsString(val)
if opts.SQSQueueID != nil {
pstr.queueID = *opts.SQSQueueID
}
if val, has := opts[utils.AWSRegion]; has {
pstr.awsRegion = utils.IfaceAsString(val)
if opts.AWSRegion != nil {
pstr.awsRegion = *opts.AWSRegion
}
if val, has := opts[utils.AWSKey]; has {
pstr.awsID = utils.IfaceAsString(val)
if opts.AWSKey != nil {
pstr.awsID = *opts.AWSKey
}
if val, has := opts[utils.AWSSecret]; has {
pstr.awsKey = utils.IfaceAsString(val)
if opts.AWSSecret != nil {
pstr.awsKey = *opts.AWSSecret
}
if val, has := opts[utils.AWSToken]; has {
pstr.awsToken = utils.IfaceAsString(val)
if opts.AWSToken != nil {
pstr.awsToken = *opts.AWSToken
}
}