Unit testing in config

This commit is contained in:
nickolasdaniel
2021-11-05 17:46:09 +02:00
committed by Dan Christian Bogos
parent 907b9476b1
commit 35d0e9bc71
2 changed files with 809 additions and 0 deletions

View File

@@ -1360,3 +1360,315 @@ func TestEventExporterOptsClone(t *testing.T) {
t.Errorf("Expected %v \n but received \n %v", utils.ToJSON(exp), utils.ToJSON(rcv))
}
}
func TestLoadFromJSONCfg(t *testing.T) {
eeOpts := &EventExporterOpts{}
eeSJson := &EventExporterOptsJson{
CSVFieldSeparator: utils.StringPointer(","),
ElsIndex: utils.StringPointer("idx1"),
ElsIfPrimaryTerm: utils.IntPointer(1),
ElsIfSeqNo: utils.IntPointer(2),
ElsOpType: utils.StringPointer("op_type"),
ElsPipeline: utils.StringPointer("pipeline"),
ElsRouting: utils.StringPointer("routing"),
ElsTimeout: utils.StringPointer("2s"),
ElsVersion: utils.IntPointer(1),
ElsVersionType: utils.StringPointer("version_type"),
ElsWaitForActiveShards: utils.StringPointer("wfas"),
SQLMaxIdleConns: utils.IntPointer(5),
SQLMaxOpenConns: utils.IntPointer(10),
SQLConnMaxLifetime: utils.StringPointer("2s"),
SQLTableName: utils.StringPointer("cdrs"),
SQLDBName: utils.StringPointer("cgrates"),
SSLMode: utils.StringPointer("sslm"),
KafkaTopic: utils.StringPointer("topic1"),
AMQPRoutingKey: utils.StringPointer("routing_key"),
AMQPQueueID: utils.StringPointer("queue_id"),
AMQPExchange: utils.StringPointer("amqp_exchange"),
AMQPExchangeType: utils.StringPointer("amqp_exchange_type"),
AWSRegion: utils.StringPointer("utc"),
AWSKey: utils.StringPointer("aws_key"),
AWSSecret: utils.StringPointer("aws_secret"),
AWSToken: utils.StringPointer("aws_token"),
SQSQueueID: utils.StringPointer("sqs_queue_id"),
S3BucketID: utils.StringPointer("s3_bucket_id"),
S3FolderPath: utils.StringPointer("s3_folder_path"),
NATSJetStream: utils.BoolPointer(false),
NATSSubject: utils.StringPointer("ees_nats"),
NATSJWTFile: utils.StringPointer("/path/to/jwt"),
NATSSeedFile: utils.StringPointer("/path/to/seed"),
NATSCertificateAuthority: utils.StringPointer("ca"),
NATSClientCertificate: utils.StringPointer("cc"),
NATSClientKey: utils.StringPointer("ck"),
NATSJetStreamMaxWait: utils.StringPointer("2s"),
RPCCodec: utils.StringPointer("rpccodec"),
ServiceMethod: utils.StringPointer("service_method"),
KeyPath: utils.StringPointer("/path/to/key"),
CertPath: utils.StringPointer("cp"),
CAPath: utils.StringPointer("ca_path"),
TLS: utils.BoolPointer(false),
RPCConnTimeout: utils.StringPointer("2s"),
RPCReplyTimeout: utils.StringPointer("2s"),
}
exp := &EventExporterOpts{
CSVFieldSeparator: utils.StringPointer(","),
ElsIndex: utils.StringPointer("idx1"),
ElsIfPrimaryTerm: utils.IntPointer(1),
ElsIfSeqNo: utils.IntPointer(2),
ElsOpType: utils.StringPointer("op_type"),
ElsPipeline: utils.StringPointer("pipeline"),
ElsRouting: utils.StringPointer("routing"),
ElsTimeout: utils.DurationPointer(2 * time.Second),
ElsVersion: utils.IntPointer(1),
ElsVersionType: utils.StringPointer("version_type"),
ElsWaitForActiveShards: utils.StringPointer("wfas"),
SQLMaxIdleConns: utils.IntPointer(5),
SQLMaxOpenConns: utils.IntPointer(10),
SQLConnMaxLifetime: utils.DurationPointer(2 * time.Second),
SQLTableName: utils.StringPointer("cdrs"),
SQLDBName: utils.StringPointer("cgrates"),
SSLMode: utils.StringPointer("sslm"),
KafkaTopic: utils.StringPointer("topic1"),
AMQPRoutingKey: utils.StringPointer("routing_key"),
AMQPQueueID: utils.StringPointer("queue_id"),
AMQPExchange: utils.StringPointer("amqp_exchange"),
AMQPExchangeType: utils.StringPointer("amqp_exchange_type"),
AWSRegion: utils.StringPointer("utc"),
AWSKey: utils.StringPointer("aws_key"),
AWSSecret: utils.StringPointer("aws_secret"),
AWSToken: utils.StringPointer("aws_token"),
SQSQueueID: utils.StringPointer("sqs_queue_id"),
S3BucketID: utils.StringPointer("s3_bucket_id"),
S3FolderPath: utils.StringPointer("s3_folder_path"),
NATSJetStream: utils.BoolPointer(false),
NATSSubject: utils.StringPointer("ees_nats"),
NATSJWTFile: utils.StringPointer("/path/to/jwt"),
NATSSeedFile: utils.StringPointer("/path/to/seed"),
NATSCertificateAuthority: utils.StringPointer("ca"),
NATSClientCertificate: utils.StringPointer("cc"),
NATSClientKey: utils.StringPointer("ck"),
NATSJetStreamMaxWait: utils.DurationPointer(2 * time.Second),
RPCCodec: utils.StringPointer("rpccodec"),
ServiceMethod: utils.StringPointer("service_method"),
KeyPath: utils.StringPointer("/path/to/key"),
CertPath: utils.StringPointer("cp"),
CAPath: utils.StringPointer("ca_path"),
TLS: utils.BoolPointer(false),
RPCConnTimeout: utils.DurationPointer(2 * time.Second),
RPCReplyTimeout: utils.DurationPointer(2 * time.Second),
}
if err := eeOpts.loadFromJSONCfg(eeSJson); err != nil {
t.Error(err)
}
if !reflect.DeepEqual(exp, eeOpts) {
t.Errorf("Expected %v \n but received \n %v", utils.ToJSON(exp), utils.ToJSON(eeOpts))
}
//check with empty json config
eeSJson = nil
if err := eeOpts.loadFromJSONCfg(eeSJson); err != nil {
t.Error(err)
}
}
func TestLoadFromJsonParseErrors(t *testing.T) {
eeOpts := &EventExporterOpts{}
eeSJson := &EventExporterOptsJson{
CSVFieldSeparator: utils.StringPointer(","),
ElsIndex: utils.StringPointer("idx1"),
ElsIfPrimaryTerm: utils.IntPointer(1),
ElsIfSeqNo: utils.IntPointer(2),
ElsOpType: utils.StringPointer("op_type"),
ElsPipeline: utils.StringPointer("pipeline"),
ElsRouting: utils.StringPointer("routing"),
ElsTimeout: utils.StringPointer("2c"),
ElsVersion: utils.IntPointer(1),
ElsVersionType: utils.StringPointer("version_type"),
ElsWaitForActiveShards: utils.StringPointer("wfas"),
SQLMaxIdleConns: utils.IntPointer(5),
SQLMaxOpenConns: utils.IntPointer(10),
SQLConnMaxLifetime: utils.StringPointer("2s"),
SQLTableName: utils.StringPointer("cdrs"),
SQLDBName: utils.StringPointer("cgrates"),
SSLMode: utils.StringPointer("sslm"),
KafkaTopic: utils.StringPointer("topic1"),
AMQPRoutingKey: utils.StringPointer("routing_key"),
AMQPQueueID: utils.StringPointer("queue_id"),
AMQPExchange: utils.StringPointer("amqp_exchange"),
AMQPExchangeType: utils.StringPointer("amqp_exchange_type"),
AWSRegion: utils.StringPointer("utc"),
AWSKey: utils.StringPointer("aws_key"),
AWSSecret: utils.StringPointer("aws_secret"),
AWSToken: utils.StringPointer("aws_token"),
SQSQueueID: utils.StringPointer("sqs_queue_id"),
S3BucketID: utils.StringPointer("s3_bucket_id"),
S3FolderPath: utils.StringPointer("s3_folder_path"),
NATSJetStream: utils.BoolPointer(false),
NATSSubject: utils.StringPointer("ees_nats"),
NATSJWTFile: utils.StringPointer("/path/to/jwt"),
NATSSeedFile: utils.StringPointer("/path/to/seed"),
NATSCertificateAuthority: utils.StringPointer("ca"),
NATSClientCertificate: utils.StringPointer("cc"),
NATSClientKey: utils.StringPointer("ck"),
NATSJetStreamMaxWait: utils.StringPointer("2s"),
RPCCodec: utils.StringPointer("rpccodec"),
ServiceMethod: utils.StringPointer("service_method"),
KeyPath: utils.StringPointer("/path/to/key"),
CertPath: utils.StringPointer("cp"),
CAPath: utils.StringPointer("ca_path"),
TLS: utils.BoolPointer(false),
RPCConnTimeout: utils.StringPointer("2s"),
RPCReplyTimeout: utils.StringPointer("2s"),
}
errExp := `time: unknown unit "c" in duration "2c"`
if err := eeOpts.loadFromJSONCfg(eeSJson); err == nil || err.Error() != errExp {
t.Errorf("Expected %v \n but received \n %v", errExp, err.Error())
}
eeSJson.ElsTimeout = utils.StringPointer("2s")
///////
eeSJson.SQLConnMaxLifetime = utils.StringPointer("2c")
if err := eeOpts.loadFromJSONCfg(eeSJson); err == nil || err.Error() != errExp {
t.Errorf("Expected %v \n but received \n %v", errExp, err.Error())
}
eeSJson.SQLConnMaxLifetime = utils.StringPointer("2s")
//////
eeSJson.NATSJetStreamMaxWait = utils.StringPointer("2c")
if err := eeOpts.loadFromJSONCfg(eeSJson); err == nil || err.Error() != errExp {
t.Errorf("Expected %v \n but received \n %v", errExp, err.Error())
}
eeSJson.NATSJetStreamMaxWait = utils.StringPointer("2s")
/////
eeSJson.RPCConnTimeout = utils.StringPointer("2c")
if err := eeOpts.loadFromJSONCfg(eeSJson); err == nil || err.Error() != errExp {
t.Errorf("Expected %v \n but received \n %v", errExp, err.Error())
}
eeSJson.RPCConnTimeout = utils.StringPointer("2s")
/////
eeSJson.RPCReplyTimeout = utils.StringPointer("2c")
if err := eeOpts.loadFromJSONCfg(eeSJson); err == nil || err.Error() != errExp {
t.Errorf("Expected %v \n but received \n %v", errExp, err.Error())
}
eeSJson.RPCReplyTimeout = utils.StringPointer("2s")
}
func TestEEsAsMapInterface(t *testing.T) {
eeCfg := &EventExporterCfg{
Opts: &EventExporterOpts{
CSVFieldSeparator: utils.StringPointer(","),
ElsIndex: utils.StringPointer("idx1"),
ElsIfPrimaryTerm: utils.IntPointer(1),
ElsIfSeqNo: utils.IntPointer(2),
ElsOpType: utils.StringPointer("op_type"),
ElsPipeline: utils.StringPointer("pipeline"),
ElsRouting: utils.StringPointer("routing"),
ElsTimeout: utils.DurationPointer(2 * time.Second),
ElsVersion: utils.IntPointer(1),
ElsVersionType: utils.StringPointer("version_type"),
ElsWaitForActiveShards: utils.StringPointer("wfas"),
SQLMaxIdleConns: utils.IntPointer(5),
SQLMaxOpenConns: utils.IntPointer(10),
SQLConnMaxLifetime: utils.DurationPointer(2 * time.Second),
SQLTableName: utils.StringPointer("cdrs"),
SQLDBName: utils.StringPointer("cgrates"),
SSLMode: utils.StringPointer("sslm"),
KafkaTopic: utils.StringPointer("topic1"),
AMQPRoutingKey: utils.StringPointer("routing_key"),
AMQPQueueID: utils.StringPointer("queue_id"),
AMQPExchange: utils.StringPointer("amqp_exchange"),
AMQPExchangeType: utils.StringPointer("amqp_exchange_type"),
AWSRegion: utils.StringPointer("utc"),
AWSKey: utils.StringPointer("aws_key"),
AWSSecret: utils.StringPointer("aws_secret"),
AWSToken: utils.StringPointer("aws_token"),
SQSQueueID: utils.StringPointer("sqs_queue_id"),
S3BucketID: utils.StringPointer("s3_bucket_id"),
S3FolderPath: utils.StringPointer("s3_folder_path"),
NATSJetStream: utils.BoolPointer(false),
NATSSubject: utils.StringPointer("ees_nats"),
NATSJWTFile: utils.StringPointer("/path/to/jwt"),
NATSSeedFile: utils.StringPointer("/path/to/seed"),
NATSCertificateAuthority: utils.StringPointer("ca"),
NATSClientCertificate: utils.StringPointer("cc"),
NATSClientKey: utils.StringPointer("ck"),
NATSJetStreamMaxWait: utils.DurationPointer(2 * time.Second),
RPCCodec: utils.StringPointer("rpccodec"),
ServiceMethod: utils.StringPointer("service_method"),
KeyPath: utils.StringPointer("/path/to/key"),
CertPath: utils.StringPointer("cp"),
CAPath: utils.StringPointer("ca_path"),
TLS: utils.BoolPointer(false),
RPCConnTimeout: utils.DurationPointer(2 * time.Second),
RPCReplyTimeout: utils.DurationPointer(2 * time.Second),
},
}
exp := map[string]interface{}{
"opts": map[string]interface{}{
"TLS": false,
"amqpExchange": "amqp_exchange",
"amqpExchangeType": "amqp_exchange_type",
"amqpQueueID": "queue_id",
"amqpRoutingKey": "routing_key",
"awsKey": "aws_key",
"awsRegion": "utc",
"awsSecret": "aws_secret",
"awsToken": "aws_token",
"caPath": "ca_path",
"certPath": "cp",
"csvFieldSeparator": ",",
"elsIfPrimaryTerm": 1,
"elsIfSeqNo": 2,
"elsIndex": "idx1",
"elsOpType": "op_type",
"elsPipeline": "pipeline",
"elsRouting": "routing",
"elsTimeout": "2s",
"elsVersion": 1,
"elsVersionType": "version_type",
"elsWaitForActiveShards": "wfas",
"kafkaTopic": "topic1",
"keyPath": "/path/to/key",
"natsCertificateAuthority": "ca",
"natsClientCertificate": "cc",
"natsClientKey": "ck",
"natsJWTFile": "/path/to/jwt",
"natsJetStream": false,
"natsJetStreamMaxWait": "2s",
"natsSeedFile": "/path/to/seed",
"natsSubject": "ees_nats",
"rpcCodec": "rpccodec",
"rpcConnTimeout": "2s",
"rpcReplyTimeout": "2s",
"s3BucketID": "s3_bucket_id",
"s3FolderPath": "s3_folder_path",
"serviceMethod": "service_method",
"sqlConnMaxLifetime": "2s",
"sqlDBName": "cgrates",
"sqlMaxIdleConns": 5,
"sqlMaxOpenConns": 10,
"sqlTableName": "cdrs",
"sqsQueueID": "sqs_queue_id",
"sslMode": "sslm",
},
}
rcv := eeCfg.AsMapInterface(",")
if !reflect.DeepEqual(exp[utils.OptsCfg], rcv[utils.OptsCfg]) {
t.Errorf("Expected %v \n but received \n %v", utils.ToJSON(exp), utils.ToJSON(rcv))
}
}

View File

@@ -1733,3 +1733,500 @@ func TestErSCloneSection(t *testing.T) {
t.Errorf("Expected %+v \n but received \n %+v", utils.ToJSON(exp), utils.ToJSON(rcv))
}
}
func TestERsLoadFromJSONCfg(t *testing.T) {
erOpts := &EventReaderOpts{}
erJson := &EventReaderOptsJson{
PartialPath: utils.StringPointer("/tmp/path"),
PartialCacheAction: utils.StringPointer("partial_cache_action"),
PartialOrderField: utils.StringPointer("partial_order_field"),
PartialCSVFieldSeparator: utils.StringPointer(";"),
CSVRowLength: utils.IntPointer(2),
CSVFieldSeparator: utils.StringPointer(","),
CSVHeaderDefineChar: utils.StringPointer("header_define_char"),
CSVLazyQuotes: utils.BoolPointer(false),
XMLRootPath: utils.StringPointer("xml_root_path"),
AMQPQueueID: utils.StringPointer("queue_id"),
AMQPQueueIDProcessed: utils.StringPointer("queue_id_processed"),
AMQPConsumerTag: utils.StringPointer("consumer_tag"),
AMQPExchange: utils.StringPointer("exchange"),
AMQPExchangeType: utils.StringPointer("exchange_type"),
AMQPRoutingKey: utils.StringPointer("routing_key"),
AMQPExchangeProcessed: utils.StringPointer("exchange_processed"),
AMQPExchangeTypeProcessed: utils.StringPointer("excange_type_processed"),
AMQPRoutingKeyProcessed: utils.StringPointer("routing_key_processed"),
KafkaTopic: utils.StringPointer("topic"),
KafkaGroupID: utils.StringPointer("group_id"),
KafkaMaxWait: utils.StringPointer("2s"),
KafkaTopicProcessed: utils.StringPointer("topic_processed"),
SQLDBName: utils.StringPointer("cgrates"),
SQLTableName: utils.StringPointer("cgrates_t1"),
SSLMode: utils.StringPointer("ssl_mode"),
SQLDBNameProcessed: utils.StringPointer("cgrates_processed"),
SQLTableNameProcessed: utils.StringPointer("cgrates_t1_processed"),
SSLModeProcessed: utils.StringPointer("ssl_mode_processed"),
AWSRegion: utils.StringPointer("us-west"),
AWSKey: utils.StringPointer("aws_key"),
AWSSecret: utils.StringPointer("aws_secret"),
AWSToken: utils.StringPointer("aws_token"),
AWSRegionProcessed: utils.StringPointer("region_processed"),
AWSKeyProcessed: utils.StringPointer("aws_key_processed"),
AWSSecretProcessed: utils.StringPointer("aws_secret_processed"),
AWSTokenProcessed: utils.StringPointer("aws_token_processed"),
SQSQueueID: utils.StringPointer("queue_id"),
SQSQueueIDProcessed: utils.StringPointer("queue_id_processed"),
S3BucketID: utils.StringPointer("bucket_id"),
S3FolderPathProcessed: utils.StringPointer("folder_path_processed"),
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"),
NATSJetStream: utils.BoolPointer(false),
NATSConsumerName: utils.StringPointer("consumer_name"),
NATSSubject: utils.StringPointer("subject"),
NATSQueueID: utils.StringPointer("queue_id"),
NATSJWTFile: utils.StringPointer("jsw_file"),
NATSSeedFile: utils.StringPointer("seed_file"),
NATSCertificateAuthority: utils.StringPointer("ca"),
NATSClientCertificate: utils.StringPointer("cc"),
NATSClientKey: utils.StringPointer("ck"),
NATSJetStreamMaxWait: utils.StringPointer("2s"),
NATSJetStreamProcessed: utils.BoolPointer(false),
NATSSubjectProcessed: utils.StringPointer("subject_processed"),
NATSJWTFileProcessed: utils.StringPointer("jwt_file_processed"),
NATSSeedFileProcessed: utils.StringPointer("seed_file_processed"),
NATSCertificateAuthorityProcessed: utils.StringPointer("ca_processed"),
NATSClientCertificateProcessed: utils.StringPointer("cc_processed"),
NATSClientKeyProcessed: utils.StringPointer("ck_processed"),
NATSJetStreamMaxWaitProcessed: utils.StringPointer("2s"),
}
exp := &EventReaderOpts{
PartialPath: utils.StringPointer("/tmp/path"),
PartialCacheAction: utils.StringPointer("partial_cache_action"),
PartialOrderField: utils.StringPointer("partial_order_field"),
PartialCSVFieldSeparator: utils.StringPointer(";"),
CSVRowLength: utils.IntPointer(2),
CSVFieldSeparator: utils.StringPointer(","),
CSVHeaderDefineChar: utils.StringPointer("header_define_char"),
CSVLazyQuotes: utils.BoolPointer(false),
XMLRootPath: utils.StringPointer("xml_root_path"),
AMQPQueueID: utils.StringPointer("queue_id"),
AMQPQueueIDProcessed: utils.StringPointer("queue_id_processed"),
AMQPConsumerTag: utils.StringPointer("consumer_tag"),
AMQPExchange: utils.StringPointer("exchange"),
AMQPExchangeType: utils.StringPointer("exchange_type"),
AMQPRoutingKey: utils.StringPointer("routing_key"),
AMQPExchangeProcessed: utils.StringPointer("exchange_processed"),
AMQPExchangeTypeProcessed: utils.StringPointer("excange_type_processed"),
AMQPRoutingKeyProcessed: utils.StringPointer("routing_key_processed"),
KafkaTopic: utils.StringPointer("topic"),
KafkaGroupID: utils.StringPointer("group_id"),
KafkaMaxWait: utils.DurationPointer(2 * time.Second),
KafkaTopicProcessed: utils.StringPointer("topic_processed"),
SQLDBName: utils.StringPointer("cgrates"),
SQLTableName: utils.StringPointer("cgrates_t1"),
SSLMode: utils.StringPointer("ssl_mode"),
SQLDBNameProcessed: utils.StringPointer("cgrates_processed"),
SQLTableNameProcessed: utils.StringPointer("cgrates_t1_processed"),
SSLModeProcessed: utils.StringPointer("ssl_mode_processed"),
AWSRegion: utils.StringPointer("us-west"),
AWSKey: utils.StringPointer("aws_key"),
AWSSecret: utils.StringPointer("aws_secret"),
AWSToken: utils.StringPointer("aws_token"),
AWSRegionProcessed: utils.StringPointer("region_processed"),
AWSKeyProcessed: utils.StringPointer("aws_key_processed"),
AWSSecretProcessed: utils.StringPointer("aws_secret_processed"),
AWSTokenProcessed: utils.StringPointer("aws_token_processed"),
SQSQueueID: utils.StringPointer("queue_id"),
SQSQueueIDProcessed: utils.StringPointer("queue_id_processed"),
S3BucketID: utils.StringPointer("bucket_id"),
S3FolderPathProcessed: utils.StringPointer("folder_path_processed"),
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"),
NATSJetStream: utils.BoolPointer(false),
NATSConsumerName: utils.StringPointer("consumer_name"),
NATSSubject: utils.StringPointer("subject"),
NATSQueueID: utils.StringPointer("queue_id"),
NATSJWTFile: utils.StringPointer("jsw_file"),
NATSSeedFile: utils.StringPointer("seed_file"),
NATSCertificateAuthority: utils.StringPointer("ca"),
NATSClientCertificate: utils.StringPointer("cc"),
NATSClientKey: utils.StringPointer("ck"),
NATSJetStreamMaxWait: utils.DurationPointer(2 * time.Second),
NATSJetStreamProcessed: utils.BoolPointer(false),
NATSSubjectProcessed: utils.StringPointer("subject_processed"),
NATSJWTFileProcessed: utils.StringPointer("jwt_file_processed"),
NATSSeedFileProcessed: utils.StringPointer("seed_file_processed"),
NATSCertificateAuthorityProcessed: utils.StringPointer("ca_processed"),
NATSClientCertificateProcessed: utils.StringPointer("cc_processed"),
NATSClientKeyProcessed: utils.StringPointer("ck_processed"),
NATSJetStreamMaxWaitProcessed: utils.DurationPointer(2 * time.Second),
}
if err := erOpts.loadFromJSONCfg(erJson); err != nil {
t.Error(err)
}
if !reflect.DeepEqual(erOpts, exp) {
t.Errorf("Expected %+v \n but received \n %+v", utils.ToJSON(exp), utils.ToJSON(erOpts))
}
erJson = nil
if err := erOpts.loadFromJSONCfg(erJson); err != nil {
t.Error(err)
}
}
func TestERsLoadFromJsonCfgParseError(t *testing.T) {
erOpts := &EventReaderOpts{}
erJson := &EventReaderOptsJson{
PartialPath: utils.StringPointer("/tmp/path"),
PartialCacheAction: utils.StringPointer("partial_cache_action"),
PartialOrderField: utils.StringPointer("partial_order_field"),
PartialCSVFieldSeparator: utils.StringPointer(";"),
CSVRowLength: utils.IntPointer(2),
CSVFieldSeparator: utils.StringPointer(","),
CSVHeaderDefineChar: utils.StringPointer("header_define_char"),
CSVLazyQuotes: utils.BoolPointer(false),
XMLRootPath: utils.StringPointer("xml_root_path"),
AMQPQueueID: utils.StringPointer("queue_id"),
AMQPQueueIDProcessed: utils.StringPointer("queue_id_processed"),
AMQPConsumerTag: utils.StringPointer("consumer_tag"),
AMQPExchange: utils.StringPointer("exchange"),
AMQPExchangeType: utils.StringPointer("exchange_type"),
AMQPRoutingKey: utils.StringPointer("routing_key"),
AMQPExchangeProcessed: utils.StringPointer("exchange_processed"),
AMQPExchangeTypeProcessed: utils.StringPointer("excange_type_processed"),
AMQPRoutingKeyProcessed: utils.StringPointer("routing_key_processed"),
KafkaTopic: utils.StringPointer("topic"),
KafkaGroupID: utils.StringPointer("group_id"),
KafkaMaxWait: utils.StringPointer("2s"),
KafkaTopicProcessed: utils.StringPointer("topic_processed"),
SQLDBName: utils.StringPointer("cgrates"),
SQLTableName: utils.StringPointer("cgrates_t1"),
SSLMode: utils.StringPointer("ssl_mode"),
SQLDBNameProcessed: utils.StringPointer("cgrates_processed"),
SQLTableNameProcessed: utils.StringPointer("cgrates_t1_processed"),
SSLModeProcessed: utils.StringPointer("ssl_mode_processed"),
AWSRegion: utils.StringPointer("us-west"),
AWSKey: utils.StringPointer("aws_key"),
AWSSecret: utils.StringPointer("aws_secret"),
AWSToken: utils.StringPointer("aws_token"),
AWSRegionProcessed: utils.StringPointer("region_processed"),
AWSKeyProcessed: utils.StringPointer("aws_key_processed"),
AWSSecretProcessed: utils.StringPointer("aws_secret_processed"),
AWSTokenProcessed: utils.StringPointer("aws_token_processed"),
SQSQueueID: utils.StringPointer("queue_id"),
SQSQueueIDProcessed: utils.StringPointer("queue_id_processed"),
S3BucketID: utils.StringPointer("bucket_id"),
S3FolderPathProcessed: utils.StringPointer("folder_path_processed"),
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"),
NATSJetStream: utils.BoolPointer(false),
NATSConsumerName: utils.StringPointer("consumer_name"),
NATSSubject: utils.StringPointer("subject"),
NATSQueueID: utils.StringPointer("queue_id"),
NATSJWTFile: utils.StringPointer("jsw_file"),
NATSSeedFile: utils.StringPointer("seed_file"),
NATSCertificateAuthority: utils.StringPointer("ca"),
NATSClientCertificate: utils.StringPointer("cc"),
NATSClientKey: utils.StringPointer("ck"),
NATSJetStreamMaxWait: utils.StringPointer("2s"),
NATSJetStreamProcessed: utils.BoolPointer(false),
NATSSubjectProcessed: utils.StringPointer("subject_processed"),
NATSJWTFileProcessed: utils.StringPointer("jwt_file_processed"),
NATSSeedFileProcessed: utils.StringPointer("seed_file_processed"),
NATSCertificateAuthorityProcessed: utils.StringPointer("ca_processed"),
NATSClientCertificateProcessed: utils.StringPointer("cc_processed"),
NATSClientKeyProcessed: utils.StringPointer("ck_processed"),
NATSJetStreamMaxWaitProcessed: utils.StringPointer("2s"),
}
errExp := `time: unknown unit "c" in duration "2c"`
erJson.KafkaMaxWait = utils.StringPointer("2c")
if err := erOpts.loadFromJSONCfg(erJson); err == nil || err.Error() != errExp {
t.Errorf("Expected %v \n but received \n %v", errExp, err.Error())
}
erJson.KafkaMaxWait = utils.StringPointer("2s")
/////
erJson.NATSJetStreamMaxWait = utils.StringPointer("2c")
if err := erOpts.loadFromJSONCfg(erJson); err == nil || err.Error() != errExp {
t.Errorf("Expected %v \n but received \n %v", errExp, err.Error())
}
erJson.NATSJetStreamMaxWait = utils.StringPointer("2s")
/////
erJson.NATSJetStreamMaxWaitProcessed = utils.StringPointer("2c")
if err := erOpts.loadFromJSONCfg(erJson); err == nil || err.Error() != errExp {
t.Errorf("Expected %v \n but received \n %v", errExp, err.Error())
}
erJson.NATSJetStreamMaxWaitProcessed = utils.StringPointer("2s")
}
func TestERsClone(t *testing.T) {
erOpts := &EventReaderOpts{
PartialPath: utils.StringPointer("/tmp/path"),
PartialCacheAction: utils.StringPointer("partial_cache_action"),
PartialOrderField: utils.StringPointer("partial_order_field"),
PartialCSVFieldSeparator: utils.StringPointer(";"),
CSVRowLength: utils.IntPointer(2),
CSVFieldSeparator: utils.StringPointer(","),
CSVHeaderDefineChar: utils.StringPointer("header_define_char"),
CSVLazyQuotes: utils.BoolPointer(false),
XMLRootPath: utils.StringPointer("xml_root_path"),
AMQPQueueID: utils.StringPointer("queue_id"),
AMQPQueueIDProcessed: utils.StringPointer("queue_id_processed"),
AMQPConsumerTag: utils.StringPointer("consumer_tag"),
AMQPExchange: utils.StringPointer("exchange"),
AMQPExchangeType: utils.StringPointer("exchange_type"),
AMQPRoutingKey: utils.StringPointer("routing_key"),
AMQPExchangeProcessed: utils.StringPointer("exchange_processed"),
AMQPExchangeTypeProcessed: utils.StringPointer("excange_type_processed"),
AMQPRoutingKeyProcessed: utils.StringPointer("routing_key_processed"),
KafkaTopic: utils.StringPointer("topic"),
KafkaGroupID: utils.StringPointer("group_id"),
KafkaMaxWait: utils.DurationPointer(2 * time.Second),
KafkaTopicProcessed: utils.StringPointer("topic_processed"),
SQLDBName: utils.StringPointer("cgrates"),
SQLTableName: utils.StringPointer("cgrates_t1"),
SSLMode: utils.StringPointer("ssl_mode"),
SQLDBNameProcessed: utils.StringPointer("cgrates_processed"),
SQLTableNameProcessed: utils.StringPointer("cgrates_t1_processed"),
SSLModeProcessed: utils.StringPointer("ssl_mode_processed"),
AWSRegion: utils.StringPointer("us-west"),
AWSKey: utils.StringPointer("aws_key"),
AWSSecret: utils.StringPointer("aws_secret"),
AWSToken: utils.StringPointer("aws_token"),
AWSRegionProcessed: utils.StringPointer("region_processed"),
AWSKeyProcessed: utils.StringPointer("aws_key_processed"),
AWSSecretProcessed: utils.StringPointer("aws_secret_processed"),
AWSTokenProcessed: utils.StringPointer("aws_token_processed"),
SQSQueueID: utils.StringPointer("queue_id"),
SQSQueueIDProcessed: utils.StringPointer("queue_id_processed"),
S3BucketID: utils.StringPointer("bucket_id"),
S3FolderPathProcessed: utils.StringPointer("folder_path_processed"),
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"),
NATSJetStream: utils.BoolPointer(false),
NATSConsumerName: utils.StringPointer("consumer_name"),
NATSSubject: utils.StringPointer("subject"),
NATSQueueID: utils.StringPointer("queue_id"),
NATSJWTFile: utils.StringPointer("jsw_file"),
NATSSeedFile: utils.StringPointer("seed_file"),
NATSCertificateAuthority: utils.StringPointer("ca"),
NATSClientCertificate: utils.StringPointer("cc"),
NATSClientKey: utils.StringPointer("ck"),
NATSJetStreamMaxWait: utils.DurationPointer(2 * time.Second),
NATSJetStreamProcessed: utils.BoolPointer(false),
NATSSubjectProcessed: utils.StringPointer("subject_processed"),
NATSJWTFileProcessed: utils.StringPointer("jwt_file_processed"),
NATSSeedFileProcessed: utils.StringPointer("seed_file_processed"),
NATSCertificateAuthorityProcessed: utils.StringPointer("ca_processed"),
NATSClientCertificateProcessed: utils.StringPointer("cc_processed"),
NATSClientKeyProcessed: utils.StringPointer("ck_processed"),
NATSJetStreamMaxWaitProcessed: utils.DurationPointer(2 * time.Second),
}
exp := &EventReaderOpts{
PartialPath: utils.StringPointer("/tmp/path"),
PartialCacheAction: utils.StringPointer("partial_cache_action"),
PartialOrderField: utils.StringPointer("partial_order_field"),
PartialCSVFieldSeparator: utils.StringPointer(";"),
CSVRowLength: utils.IntPointer(2),
CSVFieldSeparator: utils.StringPointer(","),
CSVHeaderDefineChar: utils.StringPointer("header_define_char"),
CSVLazyQuotes: utils.BoolPointer(false),
XMLRootPath: utils.StringPointer("xml_root_path"),
AMQPQueueID: utils.StringPointer("queue_id"),
AMQPQueueIDProcessed: utils.StringPointer("queue_id_processed"),
AMQPConsumerTag: utils.StringPointer("consumer_tag"),
AMQPExchange: utils.StringPointer("exchange"),
AMQPExchangeType: utils.StringPointer("exchange_type"),
AMQPRoutingKey: utils.StringPointer("routing_key"),
AMQPExchangeProcessed: utils.StringPointer("exchange_processed"),
AMQPExchangeTypeProcessed: utils.StringPointer("excange_type_processed"),
AMQPRoutingKeyProcessed: utils.StringPointer("routing_key_processed"),
KafkaTopic: utils.StringPointer("topic"),
KafkaGroupID: utils.StringPointer("group_id"),
KafkaMaxWait: utils.DurationPointer(2 * time.Second),
KafkaTopicProcessed: utils.StringPointer("topic_processed"),
SQLDBName: utils.StringPointer("cgrates"),
SQLTableName: utils.StringPointer("cgrates_t1"),
SSLMode: utils.StringPointer("ssl_mode"),
SQLDBNameProcessed: utils.StringPointer("cgrates_processed"),
SQLTableNameProcessed: utils.StringPointer("cgrates_t1_processed"),
SSLModeProcessed: utils.StringPointer("ssl_mode_processed"),
AWSRegion: utils.StringPointer("us-west"),
AWSKey: utils.StringPointer("aws_key"),
AWSSecret: utils.StringPointer("aws_secret"),
AWSToken: utils.StringPointer("aws_token"),
AWSRegionProcessed: utils.StringPointer("region_processed"),
AWSKeyProcessed: utils.StringPointer("aws_key_processed"),
AWSSecretProcessed: utils.StringPointer("aws_secret_processed"),
AWSTokenProcessed: utils.StringPointer("aws_token_processed"),
SQSQueueID: utils.StringPointer("queue_id"),
SQSQueueIDProcessed: utils.StringPointer("queue_id_processed"),
S3BucketID: utils.StringPointer("bucket_id"),
S3FolderPathProcessed: utils.StringPointer("folder_path_processed"),
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"),
NATSJetStream: utils.BoolPointer(false),
NATSConsumerName: utils.StringPointer("consumer_name"),
NATSSubject: utils.StringPointer("subject"),
NATSQueueID: utils.StringPointer("queue_id"),
NATSJWTFile: utils.StringPointer("jsw_file"),
NATSSeedFile: utils.StringPointer("seed_file"),
NATSCertificateAuthority: utils.StringPointer("ca"),
NATSClientCertificate: utils.StringPointer("cc"),
NATSClientKey: utils.StringPointer("ck"),
NATSJetStreamMaxWait: utils.DurationPointer(2 * time.Second),
NATSJetStreamProcessed: utils.BoolPointer(false),
NATSSubjectProcessed: utils.StringPointer("subject_processed"),
NATSJWTFileProcessed: utils.StringPointer("jwt_file_processed"),
NATSSeedFileProcessed: utils.StringPointer("seed_file_processed"),
NATSCertificateAuthorityProcessed: utils.StringPointer("ca_processed"),
NATSClientCertificateProcessed: utils.StringPointer("cc_processed"),
NATSClientKeyProcessed: utils.StringPointer("ck_processed"),
NATSJetStreamMaxWaitProcessed: utils.DurationPointer(2 * time.Second),
}
rcv := erOpts.Clone()
if !reflect.DeepEqual(rcv, exp) {
t.Errorf("Expected %+v \n but received \n %+v", utils.ToJSON(exp), utils.ToJSON(rcv))
}
}
func TestERsAsMapInterface(t *testing.T) {
erCfg := &EventReaderCfg{
Opts: &EventReaderOpts{
PartialPath: utils.StringPointer("/tmp/path"),
PartialCacheAction: utils.StringPointer("partial_cache_action"),
PartialOrderField: utils.StringPointer("partial_order_field"),
PartialCSVFieldSeparator: utils.StringPointer(";"),
CSVRowLength: utils.IntPointer(2),
CSVFieldSeparator: utils.StringPointer(","),
CSVHeaderDefineChar: utils.StringPointer("header_define_char"),
CSVLazyQuotes: utils.BoolPointer(false),
XMLRootPath: utils.StringPointer("xml_root_path"),
AMQPQueueID: utils.StringPointer("queue_id"),
AMQPQueueIDProcessed: utils.StringPointer("queue_id_processed"),
AMQPConsumerTag: utils.StringPointer("consumer_tag"),
AMQPExchange: utils.StringPointer("exchange"),
AMQPExchangeType: utils.StringPointer("exchange_type"),
AMQPRoutingKey: utils.StringPointer("routing_key"),
AMQPExchangeProcessed: utils.StringPointer("exchange_processed"),
AMQPExchangeTypeProcessed: utils.StringPointer("excange_type_processed"),
AMQPRoutingKeyProcessed: utils.StringPointer("routing_key_processed"),
KafkaTopic: utils.StringPointer("topic"),
KafkaGroupID: utils.StringPointer("group_id"),
KafkaMaxWait: utils.DurationPointer(2 * time.Second),
KafkaTopicProcessed: utils.StringPointer("topic_processed"),
SQLDBName: utils.StringPointer("cgrates"),
SQLTableName: utils.StringPointer("cgrates_t1"),
SSLMode: utils.StringPointer("ssl_mode"),
SQLDBNameProcessed: utils.StringPointer("cgrates_processed"),
SQLTableNameProcessed: utils.StringPointer("cgrates_t1_processed"),
SSLModeProcessed: utils.StringPointer("ssl_mode_processed"),
AWSRegion: utils.StringPointer("us-west"),
AWSKey: utils.StringPointer("aws_key"),
AWSSecret: utils.StringPointer("aws_secret"),
AWSToken: utils.StringPointer("aws_token"),
AWSRegionProcessed: utils.StringPointer("region_processed"),
AWSKeyProcessed: utils.StringPointer("aws_key_processed"),
AWSSecretProcessed: utils.StringPointer("aws_secret_processed"),
AWSTokenProcessed: utils.StringPointer("aws_token_processed"),
SQSQueueID: utils.StringPointer("queue_id"),
SQSQueueIDProcessed: utils.StringPointer("queue_id_processed"),
S3BucketID: utils.StringPointer("bucket_id"),
S3FolderPathProcessed: utils.StringPointer("folder_path_processed"),
S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"),
NATSJetStream: utils.BoolPointer(false),
NATSConsumerName: utils.StringPointer("consumer_name"),
NATSSubject: utils.StringPointer("subject"),
NATSQueueID: utils.StringPointer("queue_id"),
NATSJWTFile: utils.StringPointer("jsw_file"),
NATSSeedFile: utils.StringPointer("seed_file"),
NATSCertificateAuthority: utils.StringPointer("ca"),
NATSClientCertificate: utils.StringPointer("cc"),
NATSClientKey: utils.StringPointer("ck"),
NATSJetStreamMaxWait: utils.DurationPointer(2 * time.Second),
NATSJetStreamProcessed: utils.BoolPointer(false),
NATSSubjectProcessed: utils.StringPointer("subject_processed"),
NATSJWTFileProcessed: utils.StringPointer("jwt_file_processed"),
NATSSeedFileProcessed: utils.StringPointer("seed_file_processed"),
NATSCertificateAuthorityProcessed: utils.StringPointer("ca_processed"),
NATSClientCertificateProcessed: utils.StringPointer("cc_processed"),
NATSClientKeyProcessed: utils.StringPointer("ck_processed"),
NATSJetStreamMaxWaitProcessed: utils.DurationPointer(2 * time.Second),
},
}
exp := map[string]interface{}{
"opts": map[string]interface{}{
"amqpConsumerTag": "consumer_tag",
"amqpExchange": "exchange",
"amqpExchangeProcessed": "exchange_processed",
"amqpExchangeType": "exchange_type",
"amqpExchangeTypeProcessed": "excange_type_processed",
"amqpQueueID": "queue_id",
"amqpQueueIDProcessed": "queue_id_processed",
"amqpRoutingKey": "routing_key",
"amqpRoutingKeyProcessed": "routing_key_processed",
"awsKey": "aws_key",
"awsKeyProcessed": "aws_key_processed",
"awsRegion": "us-west",
"awsRegionProcessed": "region_processed",
"awsSecret": "aws_secret",
"awsSecretProcessed": "aws_secret_processed",
"awsToken": "aws_token",
"awsTokenProcessed": "aws_token_processed",
"csvFieldSeparator": ",",
"csvHeaderDefineChar": "header_define_char",
"csvLazyQuotes": false,
"csvRowLength": 2,
"kafkaGroupID": "group_id",
"kafkaMaxWait": "2s",
"kafkaTopic": "topic",
"kafkaTopicProcessed": "topic_processed",
"natsCertificateAuthority": "ca",
"natsCertificateAuthorityProcessed": "ca_processed",
"natsClientCertificate": "cc",
"natsClientCertificateProcessed": "cc_processed",
"natsClientKey": "ck",
"natsClientKeyProcessed": "ck_processed",
"natsConsumerName": "consumer_name",
"natsJWTFile": "jsw_file",
"natsJWTFileProcessed": "jwt_file_processed",
"natsJetStream": false,
"natsJetStreamMaxWait": "2s",
"natsJetStreamMaxWaitProcessed": "2s",
"natsJetStreamProcessed": false,
"natsQueueID": "queue_id",
"natsSeedFile": "seed_file",
"natsSeedFileProcessed": "seed_file_processed",
"natsSubject": "subject",
"natsSubjectProcessed": "subject_processed",
"partialCacheAction": "partial_cache_action",
"partialOrderField": "partial_order_field",
"partialPath": "/tmp/path",
"partialcsvFieldSeparator": ";",
"s3BucketID": "bucket_id",
"s3BucketIDProcessed": "bucket_id_processed",
"s3FolderPathProcessed": "folder_path_processed",
"sqlDBName": "cgrates",
"sqlDBNameProcessed": "cgrates_processed",
"sqlTableName": "cgrates_t1",
"sqlTableNameProcessed": "cgrates_t1_processed",
"sqsQueueID": "queue_id",
"sqsQueueIDProcessed": "queue_id_processed",
"sslMode": "ssl_mode",
"sslModeProcessed": "ssl_mode_processed",
"xmlRootPath": "xml_root_path",
},
}
rcv := erCfg.AsMapInterface("")
if !reflect.DeepEqual(rcv[utils.OptsCfg], exp[utils.OptsCfg]) {
t.Errorf("Expected %v \n but received \n %v", exp[utils.OptsCfg], rcv[utils.OptsCfg])
}
}