From 35d0e9bc710e6c035760c01f7ab2a76b52a5561e Mon Sep 17 00:00:00 2001 From: nickolasdaniel Date: Fri, 5 Nov 2021 17:46:09 +0200 Subject: [PATCH] Unit testing in config --- config/eescfg_test.go | 312 ++++++++++++++++++++++++++ config/erscfg_test.go | 497 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 809 insertions(+) diff --git a/config/eescfg_test.go b/config/eescfg_test.go index 00315e44c..81cb48c69 100644 --- a/config/eescfg_test.go +++ b/config/eescfg_test.go @@ -1360,3 +1360,315 @@ func TestEventExporterOptsClone(t *testing.T) { t.Errorf("Expected %v \n but received \n %v", utils.ToJSON(exp), utils.ToJSON(rcv)) } } + +func TestLoadFromJSONCfg(t *testing.T) { + eeOpts := &EventExporterOpts{} + + eeSJson := &EventExporterOptsJson{ + CSVFieldSeparator: utils.StringPointer(","), + ElsIndex: utils.StringPointer("idx1"), + ElsIfPrimaryTerm: utils.IntPointer(1), + ElsIfSeqNo: utils.IntPointer(2), + ElsOpType: utils.StringPointer("op_type"), + ElsPipeline: utils.StringPointer("pipeline"), + ElsRouting: utils.StringPointer("routing"), + ElsTimeout: utils.StringPointer("2s"), + ElsVersion: utils.IntPointer(1), + ElsVersionType: utils.StringPointer("version_type"), + ElsWaitForActiveShards: utils.StringPointer("wfas"), + SQLMaxIdleConns: utils.IntPointer(5), + SQLMaxOpenConns: utils.IntPointer(10), + SQLConnMaxLifetime: utils.StringPointer("2s"), + SQLTableName: utils.StringPointer("cdrs"), + SQLDBName: utils.StringPointer("cgrates"), + SSLMode: utils.StringPointer("sslm"), + KafkaTopic: utils.StringPointer("topic1"), + AMQPRoutingKey: utils.StringPointer("routing_key"), + AMQPQueueID: utils.StringPointer("queue_id"), + AMQPExchange: utils.StringPointer("amqp_exchange"), + AMQPExchangeType: utils.StringPointer("amqp_exchange_type"), + AWSRegion: utils.StringPointer("utc"), + AWSKey: utils.StringPointer("aws_key"), + AWSSecret: utils.StringPointer("aws_secret"), + AWSToken: utils.StringPointer("aws_token"), + SQSQueueID: utils.StringPointer("sqs_queue_id"), + S3BucketID: utils.StringPointer("s3_bucket_id"), + S3FolderPath: utils.StringPointer("s3_folder_path"), + NATSJetStream: utils.BoolPointer(false), + NATSSubject: utils.StringPointer("ees_nats"), + NATSJWTFile: utils.StringPointer("/path/to/jwt"), + NATSSeedFile: utils.StringPointer("/path/to/seed"), + NATSCertificateAuthority: utils.StringPointer("ca"), + NATSClientCertificate: utils.StringPointer("cc"), + NATSClientKey: utils.StringPointer("ck"), + NATSJetStreamMaxWait: utils.StringPointer("2s"), + RPCCodec: utils.StringPointer("rpccodec"), + ServiceMethod: utils.StringPointer("service_method"), + KeyPath: utils.StringPointer("/path/to/key"), + CertPath: utils.StringPointer("cp"), + CAPath: utils.StringPointer("ca_path"), + TLS: utils.BoolPointer(false), + RPCConnTimeout: utils.StringPointer("2s"), + RPCReplyTimeout: utils.StringPointer("2s"), + } + + exp := &EventExporterOpts{ + CSVFieldSeparator: utils.StringPointer(","), + ElsIndex: utils.StringPointer("idx1"), + ElsIfPrimaryTerm: utils.IntPointer(1), + ElsIfSeqNo: utils.IntPointer(2), + ElsOpType: utils.StringPointer("op_type"), + ElsPipeline: utils.StringPointer("pipeline"), + ElsRouting: utils.StringPointer("routing"), + ElsTimeout: utils.DurationPointer(2 * time.Second), + ElsVersion: utils.IntPointer(1), + ElsVersionType: utils.StringPointer("version_type"), + ElsWaitForActiveShards: utils.StringPointer("wfas"), + SQLMaxIdleConns: utils.IntPointer(5), + SQLMaxOpenConns: utils.IntPointer(10), + SQLConnMaxLifetime: utils.DurationPointer(2 * time.Second), + SQLTableName: utils.StringPointer("cdrs"), + SQLDBName: utils.StringPointer("cgrates"), + SSLMode: utils.StringPointer("sslm"), + KafkaTopic: utils.StringPointer("topic1"), + AMQPRoutingKey: utils.StringPointer("routing_key"), + AMQPQueueID: utils.StringPointer("queue_id"), + AMQPExchange: utils.StringPointer("amqp_exchange"), + AMQPExchangeType: utils.StringPointer("amqp_exchange_type"), + AWSRegion: utils.StringPointer("utc"), + AWSKey: utils.StringPointer("aws_key"), + AWSSecret: utils.StringPointer("aws_secret"), + AWSToken: utils.StringPointer("aws_token"), + SQSQueueID: utils.StringPointer("sqs_queue_id"), + S3BucketID: utils.StringPointer("s3_bucket_id"), + S3FolderPath: utils.StringPointer("s3_folder_path"), + NATSJetStream: utils.BoolPointer(false), + NATSSubject: utils.StringPointer("ees_nats"), + NATSJWTFile: utils.StringPointer("/path/to/jwt"), + NATSSeedFile: utils.StringPointer("/path/to/seed"), + NATSCertificateAuthority: utils.StringPointer("ca"), + NATSClientCertificate: utils.StringPointer("cc"), + NATSClientKey: utils.StringPointer("ck"), + NATSJetStreamMaxWait: utils.DurationPointer(2 * time.Second), + RPCCodec: utils.StringPointer("rpccodec"), + ServiceMethod: utils.StringPointer("service_method"), + KeyPath: utils.StringPointer("/path/to/key"), + CertPath: utils.StringPointer("cp"), + CAPath: utils.StringPointer("ca_path"), + TLS: utils.BoolPointer(false), + RPCConnTimeout: utils.DurationPointer(2 * time.Second), + RPCReplyTimeout: utils.DurationPointer(2 * time.Second), + } + + if err := eeOpts.loadFromJSONCfg(eeSJson); err != nil { + t.Error(err) + } + + if !reflect.DeepEqual(exp, eeOpts) { + t.Errorf("Expected %v \n but received \n %v", utils.ToJSON(exp), utils.ToJSON(eeOpts)) + } + + //check with empty json config + eeSJson = nil + if err := eeOpts.loadFromJSONCfg(eeSJson); err != nil { + t.Error(err) + } +} + +func TestLoadFromJsonParseErrors(t *testing.T) { + eeOpts := &EventExporterOpts{} + + eeSJson := &EventExporterOptsJson{ + CSVFieldSeparator: utils.StringPointer(","), + ElsIndex: utils.StringPointer("idx1"), + ElsIfPrimaryTerm: utils.IntPointer(1), + ElsIfSeqNo: utils.IntPointer(2), + ElsOpType: utils.StringPointer("op_type"), + ElsPipeline: utils.StringPointer("pipeline"), + ElsRouting: utils.StringPointer("routing"), + ElsTimeout: utils.StringPointer("2c"), + ElsVersion: utils.IntPointer(1), + ElsVersionType: utils.StringPointer("version_type"), + ElsWaitForActiveShards: utils.StringPointer("wfas"), + SQLMaxIdleConns: utils.IntPointer(5), + SQLMaxOpenConns: utils.IntPointer(10), + SQLConnMaxLifetime: utils.StringPointer("2s"), + SQLTableName: utils.StringPointer("cdrs"), + SQLDBName: utils.StringPointer("cgrates"), + SSLMode: utils.StringPointer("sslm"), + KafkaTopic: utils.StringPointer("topic1"), + AMQPRoutingKey: utils.StringPointer("routing_key"), + AMQPQueueID: utils.StringPointer("queue_id"), + AMQPExchange: utils.StringPointer("amqp_exchange"), + AMQPExchangeType: utils.StringPointer("amqp_exchange_type"), + AWSRegion: utils.StringPointer("utc"), + AWSKey: utils.StringPointer("aws_key"), + AWSSecret: utils.StringPointer("aws_secret"), + AWSToken: utils.StringPointer("aws_token"), + SQSQueueID: utils.StringPointer("sqs_queue_id"), + S3BucketID: utils.StringPointer("s3_bucket_id"), + S3FolderPath: utils.StringPointer("s3_folder_path"), + NATSJetStream: utils.BoolPointer(false), + NATSSubject: utils.StringPointer("ees_nats"), + NATSJWTFile: utils.StringPointer("/path/to/jwt"), + NATSSeedFile: utils.StringPointer("/path/to/seed"), + NATSCertificateAuthority: utils.StringPointer("ca"), + NATSClientCertificate: utils.StringPointer("cc"), + NATSClientKey: utils.StringPointer("ck"), + NATSJetStreamMaxWait: utils.StringPointer("2s"), + RPCCodec: utils.StringPointer("rpccodec"), + ServiceMethod: utils.StringPointer("service_method"), + KeyPath: utils.StringPointer("/path/to/key"), + CertPath: utils.StringPointer("cp"), + CAPath: utils.StringPointer("ca_path"), + TLS: utils.BoolPointer(false), + RPCConnTimeout: utils.StringPointer("2s"), + RPCReplyTimeout: utils.StringPointer("2s"), + } + + errExp := `time: unknown unit "c" in duration "2c"` + if err := eeOpts.loadFromJSONCfg(eeSJson); err == nil || err.Error() != errExp { + t.Errorf("Expected %v \n but received \n %v", errExp, err.Error()) + } + eeSJson.ElsTimeout = utils.StringPointer("2s") + + /////// + + eeSJson.SQLConnMaxLifetime = utils.StringPointer("2c") + if err := eeOpts.loadFromJSONCfg(eeSJson); err == nil || err.Error() != errExp { + t.Errorf("Expected %v \n but received \n %v", errExp, err.Error()) + } + eeSJson.SQLConnMaxLifetime = utils.StringPointer("2s") + + ////// + + eeSJson.NATSJetStreamMaxWait = utils.StringPointer("2c") + if err := eeOpts.loadFromJSONCfg(eeSJson); err == nil || err.Error() != errExp { + t.Errorf("Expected %v \n but received \n %v", errExp, err.Error()) + } + eeSJson.NATSJetStreamMaxWait = utils.StringPointer("2s") + + ///// + + eeSJson.RPCConnTimeout = utils.StringPointer("2c") + if err := eeOpts.loadFromJSONCfg(eeSJson); err == nil || err.Error() != errExp { + t.Errorf("Expected %v \n but received \n %v", errExp, err.Error()) + } + eeSJson.RPCConnTimeout = utils.StringPointer("2s") + + ///// + + eeSJson.RPCReplyTimeout = utils.StringPointer("2c") + if err := eeOpts.loadFromJSONCfg(eeSJson); err == nil || err.Error() != errExp { + t.Errorf("Expected %v \n but received \n %v", errExp, err.Error()) + } + eeSJson.RPCReplyTimeout = utils.StringPointer("2s") +} + +func TestEEsAsMapInterface(t *testing.T) { + eeCfg := &EventExporterCfg{ + Opts: &EventExporterOpts{ + CSVFieldSeparator: utils.StringPointer(","), + ElsIndex: utils.StringPointer("idx1"), + ElsIfPrimaryTerm: utils.IntPointer(1), + ElsIfSeqNo: utils.IntPointer(2), + ElsOpType: utils.StringPointer("op_type"), + ElsPipeline: utils.StringPointer("pipeline"), + ElsRouting: utils.StringPointer("routing"), + ElsTimeout: utils.DurationPointer(2 * time.Second), + ElsVersion: utils.IntPointer(1), + ElsVersionType: utils.StringPointer("version_type"), + ElsWaitForActiveShards: utils.StringPointer("wfas"), + SQLMaxIdleConns: utils.IntPointer(5), + SQLMaxOpenConns: utils.IntPointer(10), + SQLConnMaxLifetime: utils.DurationPointer(2 * time.Second), + SQLTableName: utils.StringPointer("cdrs"), + SQLDBName: utils.StringPointer("cgrates"), + SSLMode: utils.StringPointer("sslm"), + KafkaTopic: utils.StringPointer("topic1"), + AMQPRoutingKey: utils.StringPointer("routing_key"), + AMQPQueueID: utils.StringPointer("queue_id"), + AMQPExchange: utils.StringPointer("amqp_exchange"), + AMQPExchangeType: utils.StringPointer("amqp_exchange_type"), + AWSRegion: utils.StringPointer("utc"), + AWSKey: utils.StringPointer("aws_key"), + AWSSecret: utils.StringPointer("aws_secret"), + AWSToken: utils.StringPointer("aws_token"), + SQSQueueID: utils.StringPointer("sqs_queue_id"), + S3BucketID: utils.StringPointer("s3_bucket_id"), + S3FolderPath: utils.StringPointer("s3_folder_path"), + NATSJetStream: utils.BoolPointer(false), + NATSSubject: utils.StringPointer("ees_nats"), + NATSJWTFile: utils.StringPointer("/path/to/jwt"), + NATSSeedFile: utils.StringPointer("/path/to/seed"), + NATSCertificateAuthority: utils.StringPointer("ca"), + NATSClientCertificate: utils.StringPointer("cc"), + NATSClientKey: utils.StringPointer("ck"), + NATSJetStreamMaxWait: utils.DurationPointer(2 * time.Second), + RPCCodec: utils.StringPointer("rpccodec"), + ServiceMethod: utils.StringPointer("service_method"), + KeyPath: utils.StringPointer("/path/to/key"), + CertPath: utils.StringPointer("cp"), + CAPath: utils.StringPointer("ca_path"), + TLS: utils.BoolPointer(false), + RPCConnTimeout: utils.DurationPointer(2 * time.Second), + RPCReplyTimeout: utils.DurationPointer(2 * time.Second), + }, + } + + exp := map[string]interface{}{ + "opts": map[string]interface{}{ + "TLS": false, + "amqpExchange": "amqp_exchange", + "amqpExchangeType": "amqp_exchange_type", + "amqpQueueID": "queue_id", + "amqpRoutingKey": "routing_key", + "awsKey": "aws_key", + "awsRegion": "utc", + "awsSecret": "aws_secret", + "awsToken": "aws_token", + "caPath": "ca_path", + "certPath": "cp", + "csvFieldSeparator": ",", + "elsIfPrimaryTerm": 1, + "elsIfSeqNo": 2, + "elsIndex": "idx1", + "elsOpType": "op_type", + "elsPipeline": "pipeline", + "elsRouting": "routing", + "elsTimeout": "2s", + "elsVersion": 1, + "elsVersionType": "version_type", + "elsWaitForActiveShards": "wfas", + "kafkaTopic": "topic1", + "keyPath": "/path/to/key", + "natsCertificateAuthority": "ca", + "natsClientCertificate": "cc", + "natsClientKey": "ck", + "natsJWTFile": "/path/to/jwt", + "natsJetStream": false, + "natsJetStreamMaxWait": "2s", + "natsSeedFile": "/path/to/seed", + "natsSubject": "ees_nats", + "rpcCodec": "rpccodec", + "rpcConnTimeout": "2s", + "rpcReplyTimeout": "2s", + "s3BucketID": "s3_bucket_id", + "s3FolderPath": "s3_folder_path", + "serviceMethod": "service_method", + "sqlConnMaxLifetime": "2s", + "sqlDBName": "cgrates", + "sqlMaxIdleConns": 5, + "sqlMaxOpenConns": 10, + "sqlTableName": "cdrs", + "sqsQueueID": "sqs_queue_id", + "sslMode": "sslm", + }, + } + + rcv := eeCfg.AsMapInterface(",") + + if !reflect.DeepEqual(exp[utils.OptsCfg], rcv[utils.OptsCfg]) { + t.Errorf("Expected %v \n but received \n %v", utils.ToJSON(exp), utils.ToJSON(rcv)) + } +} diff --git a/config/erscfg_test.go b/config/erscfg_test.go index c31a369ca..38a2fa57b 100644 --- a/config/erscfg_test.go +++ b/config/erscfg_test.go @@ -1733,3 +1733,500 @@ func TestErSCloneSection(t *testing.T) { t.Errorf("Expected %+v \n but received \n %+v", utils.ToJSON(exp), utils.ToJSON(rcv)) } } + +func TestERsLoadFromJSONCfg(t *testing.T) { + erOpts := &EventReaderOpts{} + + erJson := &EventReaderOptsJson{ + PartialPath: utils.StringPointer("/tmp/path"), + PartialCacheAction: utils.StringPointer("partial_cache_action"), + PartialOrderField: utils.StringPointer("partial_order_field"), + PartialCSVFieldSeparator: utils.StringPointer(";"), + CSVRowLength: utils.IntPointer(2), + CSVFieldSeparator: utils.StringPointer(","), + CSVHeaderDefineChar: utils.StringPointer("header_define_char"), + CSVLazyQuotes: utils.BoolPointer(false), + XMLRootPath: utils.StringPointer("xml_root_path"), + AMQPQueueID: utils.StringPointer("queue_id"), + AMQPQueueIDProcessed: utils.StringPointer("queue_id_processed"), + AMQPConsumerTag: utils.StringPointer("consumer_tag"), + AMQPExchange: utils.StringPointer("exchange"), + AMQPExchangeType: utils.StringPointer("exchange_type"), + AMQPRoutingKey: utils.StringPointer("routing_key"), + AMQPExchangeProcessed: utils.StringPointer("exchange_processed"), + AMQPExchangeTypeProcessed: utils.StringPointer("excange_type_processed"), + AMQPRoutingKeyProcessed: utils.StringPointer("routing_key_processed"), + KafkaTopic: utils.StringPointer("topic"), + KafkaGroupID: utils.StringPointer("group_id"), + KafkaMaxWait: utils.StringPointer("2s"), + KafkaTopicProcessed: utils.StringPointer("topic_processed"), + SQLDBName: utils.StringPointer("cgrates"), + SQLTableName: utils.StringPointer("cgrates_t1"), + SSLMode: utils.StringPointer("ssl_mode"), + SQLDBNameProcessed: utils.StringPointer("cgrates_processed"), + SQLTableNameProcessed: utils.StringPointer("cgrates_t1_processed"), + SSLModeProcessed: utils.StringPointer("ssl_mode_processed"), + AWSRegion: utils.StringPointer("us-west"), + AWSKey: utils.StringPointer("aws_key"), + AWSSecret: utils.StringPointer("aws_secret"), + AWSToken: utils.StringPointer("aws_token"), + AWSRegionProcessed: utils.StringPointer("region_processed"), + AWSKeyProcessed: utils.StringPointer("aws_key_processed"), + AWSSecretProcessed: utils.StringPointer("aws_secret_processed"), + AWSTokenProcessed: utils.StringPointer("aws_token_processed"), + SQSQueueID: utils.StringPointer("queue_id"), + SQSQueueIDProcessed: utils.StringPointer("queue_id_processed"), + S3BucketID: utils.StringPointer("bucket_id"), + S3FolderPathProcessed: utils.StringPointer("folder_path_processed"), + S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"), + NATSJetStream: utils.BoolPointer(false), + NATSConsumerName: utils.StringPointer("consumer_name"), + NATSSubject: utils.StringPointer("subject"), + NATSQueueID: utils.StringPointer("queue_id"), + NATSJWTFile: utils.StringPointer("jsw_file"), + NATSSeedFile: utils.StringPointer("seed_file"), + NATSCertificateAuthority: utils.StringPointer("ca"), + NATSClientCertificate: utils.StringPointer("cc"), + NATSClientKey: utils.StringPointer("ck"), + NATSJetStreamMaxWait: utils.StringPointer("2s"), + NATSJetStreamProcessed: utils.BoolPointer(false), + NATSSubjectProcessed: utils.StringPointer("subject_processed"), + NATSJWTFileProcessed: utils.StringPointer("jwt_file_processed"), + NATSSeedFileProcessed: utils.StringPointer("seed_file_processed"), + NATSCertificateAuthorityProcessed: utils.StringPointer("ca_processed"), + NATSClientCertificateProcessed: utils.StringPointer("cc_processed"), + NATSClientKeyProcessed: utils.StringPointer("ck_processed"), + NATSJetStreamMaxWaitProcessed: utils.StringPointer("2s"), + } + + exp := &EventReaderOpts{ + PartialPath: utils.StringPointer("/tmp/path"), + PartialCacheAction: utils.StringPointer("partial_cache_action"), + PartialOrderField: utils.StringPointer("partial_order_field"), + PartialCSVFieldSeparator: utils.StringPointer(";"), + CSVRowLength: utils.IntPointer(2), + CSVFieldSeparator: utils.StringPointer(","), + CSVHeaderDefineChar: utils.StringPointer("header_define_char"), + CSVLazyQuotes: utils.BoolPointer(false), + XMLRootPath: utils.StringPointer("xml_root_path"), + AMQPQueueID: utils.StringPointer("queue_id"), + AMQPQueueIDProcessed: utils.StringPointer("queue_id_processed"), + AMQPConsumerTag: utils.StringPointer("consumer_tag"), + AMQPExchange: utils.StringPointer("exchange"), + AMQPExchangeType: utils.StringPointer("exchange_type"), + AMQPRoutingKey: utils.StringPointer("routing_key"), + AMQPExchangeProcessed: utils.StringPointer("exchange_processed"), + AMQPExchangeTypeProcessed: utils.StringPointer("excange_type_processed"), + AMQPRoutingKeyProcessed: utils.StringPointer("routing_key_processed"), + KafkaTopic: utils.StringPointer("topic"), + KafkaGroupID: utils.StringPointer("group_id"), + KafkaMaxWait: utils.DurationPointer(2 * time.Second), + KafkaTopicProcessed: utils.StringPointer("topic_processed"), + SQLDBName: utils.StringPointer("cgrates"), + SQLTableName: utils.StringPointer("cgrates_t1"), + SSLMode: utils.StringPointer("ssl_mode"), + SQLDBNameProcessed: utils.StringPointer("cgrates_processed"), + SQLTableNameProcessed: utils.StringPointer("cgrates_t1_processed"), + SSLModeProcessed: utils.StringPointer("ssl_mode_processed"), + AWSRegion: utils.StringPointer("us-west"), + AWSKey: utils.StringPointer("aws_key"), + AWSSecret: utils.StringPointer("aws_secret"), + AWSToken: utils.StringPointer("aws_token"), + AWSRegionProcessed: utils.StringPointer("region_processed"), + AWSKeyProcessed: utils.StringPointer("aws_key_processed"), + AWSSecretProcessed: utils.StringPointer("aws_secret_processed"), + AWSTokenProcessed: utils.StringPointer("aws_token_processed"), + SQSQueueID: utils.StringPointer("queue_id"), + SQSQueueIDProcessed: utils.StringPointer("queue_id_processed"), + S3BucketID: utils.StringPointer("bucket_id"), + S3FolderPathProcessed: utils.StringPointer("folder_path_processed"), + S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"), + NATSJetStream: utils.BoolPointer(false), + NATSConsumerName: utils.StringPointer("consumer_name"), + NATSSubject: utils.StringPointer("subject"), + NATSQueueID: utils.StringPointer("queue_id"), + NATSJWTFile: utils.StringPointer("jsw_file"), + NATSSeedFile: utils.StringPointer("seed_file"), + NATSCertificateAuthority: utils.StringPointer("ca"), + NATSClientCertificate: utils.StringPointer("cc"), + NATSClientKey: utils.StringPointer("ck"), + NATSJetStreamMaxWait: utils.DurationPointer(2 * time.Second), + NATSJetStreamProcessed: utils.BoolPointer(false), + NATSSubjectProcessed: utils.StringPointer("subject_processed"), + NATSJWTFileProcessed: utils.StringPointer("jwt_file_processed"), + NATSSeedFileProcessed: utils.StringPointer("seed_file_processed"), + NATSCertificateAuthorityProcessed: utils.StringPointer("ca_processed"), + NATSClientCertificateProcessed: utils.StringPointer("cc_processed"), + NATSClientKeyProcessed: utils.StringPointer("ck_processed"), + NATSJetStreamMaxWaitProcessed: utils.DurationPointer(2 * time.Second), + } + + if err := erOpts.loadFromJSONCfg(erJson); err != nil { + t.Error(err) + } + + if !reflect.DeepEqual(erOpts, exp) { + t.Errorf("Expected %+v \n but received \n %+v", utils.ToJSON(exp), utils.ToJSON(erOpts)) + } + + erJson = nil + if err := erOpts.loadFromJSONCfg(erJson); err != nil { + t.Error(err) + } +} + +func TestERsLoadFromJsonCfgParseError(t *testing.T) { + erOpts := &EventReaderOpts{} + + erJson := &EventReaderOptsJson{ + PartialPath: utils.StringPointer("/tmp/path"), + PartialCacheAction: utils.StringPointer("partial_cache_action"), + PartialOrderField: utils.StringPointer("partial_order_field"), + PartialCSVFieldSeparator: utils.StringPointer(";"), + CSVRowLength: utils.IntPointer(2), + CSVFieldSeparator: utils.StringPointer(","), + CSVHeaderDefineChar: utils.StringPointer("header_define_char"), + CSVLazyQuotes: utils.BoolPointer(false), + XMLRootPath: utils.StringPointer("xml_root_path"), + AMQPQueueID: utils.StringPointer("queue_id"), + AMQPQueueIDProcessed: utils.StringPointer("queue_id_processed"), + AMQPConsumerTag: utils.StringPointer("consumer_tag"), + AMQPExchange: utils.StringPointer("exchange"), + AMQPExchangeType: utils.StringPointer("exchange_type"), + AMQPRoutingKey: utils.StringPointer("routing_key"), + AMQPExchangeProcessed: utils.StringPointer("exchange_processed"), + AMQPExchangeTypeProcessed: utils.StringPointer("excange_type_processed"), + AMQPRoutingKeyProcessed: utils.StringPointer("routing_key_processed"), + KafkaTopic: utils.StringPointer("topic"), + KafkaGroupID: utils.StringPointer("group_id"), + KafkaMaxWait: utils.StringPointer("2s"), + KafkaTopicProcessed: utils.StringPointer("topic_processed"), + SQLDBName: utils.StringPointer("cgrates"), + SQLTableName: utils.StringPointer("cgrates_t1"), + SSLMode: utils.StringPointer("ssl_mode"), + SQLDBNameProcessed: utils.StringPointer("cgrates_processed"), + SQLTableNameProcessed: utils.StringPointer("cgrates_t1_processed"), + SSLModeProcessed: utils.StringPointer("ssl_mode_processed"), + AWSRegion: utils.StringPointer("us-west"), + AWSKey: utils.StringPointer("aws_key"), + AWSSecret: utils.StringPointer("aws_secret"), + AWSToken: utils.StringPointer("aws_token"), + AWSRegionProcessed: utils.StringPointer("region_processed"), + AWSKeyProcessed: utils.StringPointer("aws_key_processed"), + AWSSecretProcessed: utils.StringPointer("aws_secret_processed"), + AWSTokenProcessed: utils.StringPointer("aws_token_processed"), + SQSQueueID: utils.StringPointer("queue_id"), + SQSQueueIDProcessed: utils.StringPointer("queue_id_processed"), + S3BucketID: utils.StringPointer("bucket_id"), + S3FolderPathProcessed: utils.StringPointer("folder_path_processed"), + S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"), + NATSJetStream: utils.BoolPointer(false), + NATSConsumerName: utils.StringPointer("consumer_name"), + NATSSubject: utils.StringPointer("subject"), + NATSQueueID: utils.StringPointer("queue_id"), + NATSJWTFile: utils.StringPointer("jsw_file"), + NATSSeedFile: utils.StringPointer("seed_file"), + NATSCertificateAuthority: utils.StringPointer("ca"), + NATSClientCertificate: utils.StringPointer("cc"), + NATSClientKey: utils.StringPointer("ck"), + NATSJetStreamMaxWait: utils.StringPointer("2s"), + NATSJetStreamProcessed: utils.BoolPointer(false), + NATSSubjectProcessed: utils.StringPointer("subject_processed"), + NATSJWTFileProcessed: utils.StringPointer("jwt_file_processed"), + NATSSeedFileProcessed: utils.StringPointer("seed_file_processed"), + NATSCertificateAuthorityProcessed: utils.StringPointer("ca_processed"), + NATSClientCertificateProcessed: utils.StringPointer("cc_processed"), + NATSClientKeyProcessed: utils.StringPointer("ck_processed"), + NATSJetStreamMaxWaitProcessed: utils.StringPointer("2s"), + } + + errExp := `time: unknown unit "c" in duration "2c"` + + erJson.KafkaMaxWait = utils.StringPointer("2c") + if err := erOpts.loadFromJSONCfg(erJson); err == nil || err.Error() != errExp { + t.Errorf("Expected %v \n but received \n %v", errExp, err.Error()) + } + erJson.KafkaMaxWait = utils.StringPointer("2s") + + ///// + + erJson.NATSJetStreamMaxWait = utils.StringPointer("2c") + if err := erOpts.loadFromJSONCfg(erJson); err == nil || err.Error() != errExp { + t.Errorf("Expected %v \n but received \n %v", errExp, err.Error()) + } + erJson.NATSJetStreamMaxWait = utils.StringPointer("2s") + + ///// + + erJson.NATSJetStreamMaxWaitProcessed = utils.StringPointer("2c") + if err := erOpts.loadFromJSONCfg(erJson); err == nil || err.Error() != errExp { + t.Errorf("Expected %v \n but received \n %v", errExp, err.Error()) + } + erJson.NATSJetStreamMaxWaitProcessed = utils.StringPointer("2s") +} + +func TestERsClone(t *testing.T) { + erOpts := &EventReaderOpts{ + PartialPath: utils.StringPointer("/tmp/path"), + PartialCacheAction: utils.StringPointer("partial_cache_action"), + PartialOrderField: utils.StringPointer("partial_order_field"), + PartialCSVFieldSeparator: utils.StringPointer(";"), + CSVRowLength: utils.IntPointer(2), + CSVFieldSeparator: utils.StringPointer(","), + CSVHeaderDefineChar: utils.StringPointer("header_define_char"), + CSVLazyQuotes: utils.BoolPointer(false), + XMLRootPath: utils.StringPointer("xml_root_path"), + AMQPQueueID: utils.StringPointer("queue_id"), + AMQPQueueIDProcessed: utils.StringPointer("queue_id_processed"), + AMQPConsumerTag: utils.StringPointer("consumer_tag"), + AMQPExchange: utils.StringPointer("exchange"), + AMQPExchangeType: utils.StringPointer("exchange_type"), + AMQPRoutingKey: utils.StringPointer("routing_key"), + AMQPExchangeProcessed: utils.StringPointer("exchange_processed"), + AMQPExchangeTypeProcessed: utils.StringPointer("excange_type_processed"), + AMQPRoutingKeyProcessed: utils.StringPointer("routing_key_processed"), + KafkaTopic: utils.StringPointer("topic"), + KafkaGroupID: utils.StringPointer("group_id"), + KafkaMaxWait: utils.DurationPointer(2 * time.Second), + KafkaTopicProcessed: utils.StringPointer("topic_processed"), + SQLDBName: utils.StringPointer("cgrates"), + SQLTableName: utils.StringPointer("cgrates_t1"), + SSLMode: utils.StringPointer("ssl_mode"), + SQLDBNameProcessed: utils.StringPointer("cgrates_processed"), + SQLTableNameProcessed: utils.StringPointer("cgrates_t1_processed"), + SSLModeProcessed: utils.StringPointer("ssl_mode_processed"), + AWSRegion: utils.StringPointer("us-west"), + AWSKey: utils.StringPointer("aws_key"), + AWSSecret: utils.StringPointer("aws_secret"), + AWSToken: utils.StringPointer("aws_token"), + AWSRegionProcessed: utils.StringPointer("region_processed"), + AWSKeyProcessed: utils.StringPointer("aws_key_processed"), + AWSSecretProcessed: utils.StringPointer("aws_secret_processed"), + AWSTokenProcessed: utils.StringPointer("aws_token_processed"), + SQSQueueID: utils.StringPointer("queue_id"), + SQSQueueIDProcessed: utils.StringPointer("queue_id_processed"), + S3BucketID: utils.StringPointer("bucket_id"), + S3FolderPathProcessed: utils.StringPointer("folder_path_processed"), + S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"), + NATSJetStream: utils.BoolPointer(false), + NATSConsumerName: utils.StringPointer("consumer_name"), + NATSSubject: utils.StringPointer("subject"), + NATSQueueID: utils.StringPointer("queue_id"), + NATSJWTFile: utils.StringPointer("jsw_file"), + NATSSeedFile: utils.StringPointer("seed_file"), + NATSCertificateAuthority: utils.StringPointer("ca"), + NATSClientCertificate: utils.StringPointer("cc"), + NATSClientKey: utils.StringPointer("ck"), + NATSJetStreamMaxWait: utils.DurationPointer(2 * time.Second), + NATSJetStreamProcessed: utils.BoolPointer(false), + NATSSubjectProcessed: utils.StringPointer("subject_processed"), + NATSJWTFileProcessed: utils.StringPointer("jwt_file_processed"), + NATSSeedFileProcessed: utils.StringPointer("seed_file_processed"), + NATSCertificateAuthorityProcessed: utils.StringPointer("ca_processed"), + NATSClientCertificateProcessed: utils.StringPointer("cc_processed"), + NATSClientKeyProcessed: utils.StringPointer("ck_processed"), + NATSJetStreamMaxWaitProcessed: utils.DurationPointer(2 * time.Second), + } + + exp := &EventReaderOpts{ + PartialPath: utils.StringPointer("/tmp/path"), + PartialCacheAction: utils.StringPointer("partial_cache_action"), + PartialOrderField: utils.StringPointer("partial_order_field"), + PartialCSVFieldSeparator: utils.StringPointer(";"), + CSVRowLength: utils.IntPointer(2), + CSVFieldSeparator: utils.StringPointer(","), + CSVHeaderDefineChar: utils.StringPointer("header_define_char"), + CSVLazyQuotes: utils.BoolPointer(false), + XMLRootPath: utils.StringPointer("xml_root_path"), + AMQPQueueID: utils.StringPointer("queue_id"), + AMQPQueueIDProcessed: utils.StringPointer("queue_id_processed"), + AMQPConsumerTag: utils.StringPointer("consumer_tag"), + AMQPExchange: utils.StringPointer("exchange"), + AMQPExchangeType: utils.StringPointer("exchange_type"), + AMQPRoutingKey: utils.StringPointer("routing_key"), + AMQPExchangeProcessed: utils.StringPointer("exchange_processed"), + AMQPExchangeTypeProcessed: utils.StringPointer("excange_type_processed"), + AMQPRoutingKeyProcessed: utils.StringPointer("routing_key_processed"), + KafkaTopic: utils.StringPointer("topic"), + KafkaGroupID: utils.StringPointer("group_id"), + KafkaMaxWait: utils.DurationPointer(2 * time.Second), + KafkaTopicProcessed: utils.StringPointer("topic_processed"), + SQLDBName: utils.StringPointer("cgrates"), + SQLTableName: utils.StringPointer("cgrates_t1"), + SSLMode: utils.StringPointer("ssl_mode"), + SQLDBNameProcessed: utils.StringPointer("cgrates_processed"), + SQLTableNameProcessed: utils.StringPointer("cgrates_t1_processed"), + SSLModeProcessed: utils.StringPointer("ssl_mode_processed"), + AWSRegion: utils.StringPointer("us-west"), + AWSKey: utils.StringPointer("aws_key"), + AWSSecret: utils.StringPointer("aws_secret"), + AWSToken: utils.StringPointer("aws_token"), + AWSRegionProcessed: utils.StringPointer("region_processed"), + AWSKeyProcessed: utils.StringPointer("aws_key_processed"), + AWSSecretProcessed: utils.StringPointer("aws_secret_processed"), + AWSTokenProcessed: utils.StringPointer("aws_token_processed"), + SQSQueueID: utils.StringPointer("queue_id"), + SQSQueueIDProcessed: utils.StringPointer("queue_id_processed"), + S3BucketID: utils.StringPointer("bucket_id"), + S3FolderPathProcessed: utils.StringPointer("folder_path_processed"), + S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"), + NATSJetStream: utils.BoolPointer(false), + NATSConsumerName: utils.StringPointer("consumer_name"), + NATSSubject: utils.StringPointer("subject"), + NATSQueueID: utils.StringPointer("queue_id"), + NATSJWTFile: utils.StringPointer("jsw_file"), + NATSSeedFile: utils.StringPointer("seed_file"), + NATSCertificateAuthority: utils.StringPointer("ca"), + NATSClientCertificate: utils.StringPointer("cc"), + NATSClientKey: utils.StringPointer("ck"), + NATSJetStreamMaxWait: utils.DurationPointer(2 * time.Second), + NATSJetStreamProcessed: utils.BoolPointer(false), + NATSSubjectProcessed: utils.StringPointer("subject_processed"), + NATSJWTFileProcessed: utils.StringPointer("jwt_file_processed"), + NATSSeedFileProcessed: utils.StringPointer("seed_file_processed"), + NATSCertificateAuthorityProcessed: utils.StringPointer("ca_processed"), + NATSClientCertificateProcessed: utils.StringPointer("cc_processed"), + NATSClientKeyProcessed: utils.StringPointer("ck_processed"), + NATSJetStreamMaxWaitProcessed: utils.DurationPointer(2 * time.Second), + } + + rcv := erOpts.Clone() + if !reflect.DeepEqual(rcv, exp) { + t.Errorf("Expected %+v \n but received \n %+v", utils.ToJSON(exp), utils.ToJSON(rcv)) + } +} + +func TestERsAsMapInterface(t *testing.T) { + erCfg := &EventReaderCfg{ + Opts: &EventReaderOpts{ + PartialPath: utils.StringPointer("/tmp/path"), + PartialCacheAction: utils.StringPointer("partial_cache_action"), + PartialOrderField: utils.StringPointer("partial_order_field"), + PartialCSVFieldSeparator: utils.StringPointer(";"), + CSVRowLength: utils.IntPointer(2), + CSVFieldSeparator: utils.StringPointer(","), + CSVHeaderDefineChar: utils.StringPointer("header_define_char"), + CSVLazyQuotes: utils.BoolPointer(false), + XMLRootPath: utils.StringPointer("xml_root_path"), + AMQPQueueID: utils.StringPointer("queue_id"), + AMQPQueueIDProcessed: utils.StringPointer("queue_id_processed"), + AMQPConsumerTag: utils.StringPointer("consumer_tag"), + AMQPExchange: utils.StringPointer("exchange"), + AMQPExchangeType: utils.StringPointer("exchange_type"), + AMQPRoutingKey: utils.StringPointer("routing_key"), + AMQPExchangeProcessed: utils.StringPointer("exchange_processed"), + AMQPExchangeTypeProcessed: utils.StringPointer("excange_type_processed"), + AMQPRoutingKeyProcessed: utils.StringPointer("routing_key_processed"), + KafkaTopic: utils.StringPointer("topic"), + KafkaGroupID: utils.StringPointer("group_id"), + KafkaMaxWait: utils.DurationPointer(2 * time.Second), + KafkaTopicProcessed: utils.StringPointer("topic_processed"), + SQLDBName: utils.StringPointer("cgrates"), + SQLTableName: utils.StringPointer("cgrates_t1"), + SSLMode: utils.StringPointer("ssl_mode"), + SQLDBNameProcessed: utils.StringPointer("cgrates_processed"), + SQLTableNameProcessed: utils.StringPointer("cgrates_t1_processed"), + SSLModeProcessed: utils.StringPointer("ssl_mode_processed"), + AWSRegion: utils.StringPointer("us-west"), + AWSKey: utils.StringPointer("aws_key"), + AWSSecret: utils.StringPointer("aws_secret"), + AWSToken: utils.StringPointer("aws_token"), + AWSRegionProcessed: utils.StringPointer("region_processed"), + AWSKeyProcessed: utils.StringPointer("aws_key_processed"), + AWSSecretProcessed: utils.StringPointer("aws_secret_processed"), + AWSTokenProcessed: utils.StringPointer("aws_token_processed"), + SQSQueueID: utils.StringPointer("queue_id"), + SQSQueueIDProcessed: utils.StringPointer("queue_id_processed"), + S3BucketID: utils.StringPointer("bucket_id"), + S3FolderPathProcessed: utils.StringPointer("folder_path_processed"), + S3BucketIDProcessed: utils.StringPointer("bucket_id_processed"), + NATSJetStream: utils.BoolPointer(false), + NATSConsumerName: utils.StringPointer("consumer_name"), + NATSSubject: utils.StringPointer("subject"), + NATSQueueID: utils.StringPointer("queue_id"), + NATSJWTFile: utils.StringPointer("jsw_file"), + NATSSeedFile: utils.StringPointer("seed_file"), + NATSCertificateAuthority: utils.StringPointer("ca"), + NATSClientCertificate: utils.StringPointer("cc"), + NATSClientKey: utils.StringPointer("ck"), + NATSJetStreamMaxWait: utils.DurationPointer(2 * time.Second), + NATSJetStreamProcessed: utils.BoolPointer(false), + NATSSubjectProcessed: utils.StringPointer("subject_processed"), + NATSJWTFileProcessed: utils.StringPointer("jwt_file_processed"), + NATSSeedFileProcessed: utils.StringPointer("seed_file_processed"), + NATSCertificateAuthorityProcessed: utils.StringPointer("ca_processed"), + NATSClientCertificateProcessed: utils.StringPointer("cc_processed"), + NATSClientKeyProcessed: utils.StringPointer("ck_processed"), + NATSJetStreamMaxWaitProcessed: utils.DurationPointer(2 * time.Second), + }, + } + + exp := map[string]interface{}{ + "opts": map[string]interface{}{ + "amqpConsumerTag": "consumer_tag", + "amqpExchange": "exchange", + "amqpExchangeProcessed": "exchange_processed", + "amqpExchangeType": "exchange_type", + "amqpExchangeTypeProcessed": "excange_type_processed", + "amqpQueueID": "queue_id", + "amqpQueueIDProcessed": "queue_id_processed", + "amqpRoutingKey": "routing_key", + "amqpRoutingKeyProcessed": "routing_key_processed", + "awsKey": "aws_key", + "awsKeyProcessed": "aws_key_processed", + "awsRegion": "us-west", + "awsRegionProcessed": "region_processed", + "awsSecret": "aws_secret", + "awsSecretProcessed": "aws_secret_processed", + "awsToken": "aws_token", + "awsTokenProcessed": "aws_token_processed", + "csvFieldSeparator": ",", + "csvHeaderDefineChar": "header_define_char", + "csvLazyQuotes": false, + "csvRowLength": 2, + "kafkaGroupID": "group_id", + "kafkaMaxWait": "2s", + "kafkaTopic": "topic", + "kafkaTopicProcessed": "topic_processed", + "natsCertificateAuthority": "ca", + "natsCertificateAuthorityProcessed": "ca_processed", + "natsClientCertificate": "cc", + "natsClientCertificateProcessed": "cc_processed", + "natsClientKey": "ck", + "natsClientKeyProcessed": "ck_processed", + "natsConsumerName": "consumer_name", + "natsJWTFile": "jsw_file", + "natsJWTFileProcessed": "jwt_file_processed", + "natsJetStream": false, + "natsJetStreamMaxWait": "2s", + "natsJetStreamMaxWaitProcessed": "2s", + "natsJetStreamProcessed": false, + "natsQueueID": "queue_id", + "natsSeedFile": "seed_file", + "natsSeedFileProcessed": "seed_file_processed", + "natsSubject": "subject", + "natsSubjectProcessed": "subject_processed", + "partialCacheAction": "partial_cache_action", + "partialOrderField": "partial_order_field", + "partialPath": "/tmp/path", + "partialcsvFieldSeparator": ";", + "s3BucketID": "bucket_id", + "s3BucketIDProcessed": "bucket_id_processed", + "s3FolderPathProcessed": "folder_path_processed", + "sqlDBName": "cgrates", + "sqlDBNameProcessed": "cgrates_processed", + "sqlTableName": "cgrates_t1", + "sqlTableNameProcessed": "cgrates_t1_processed", + "sqsQueueID": "queue_id", + "sqsQueueIDProcessed": "queue_id_processed", + "sslMode": "ssl_mode", + "sslModeProcessed": "ssl_mode_processed", + "xmlRootPath": "xml_root_path", + }, + } + + rcv := erCfg.AsMapInterface("") + if !reflect.DeepEqual(rcv[utils.OptsCfg], exp[utils.OptsCfg]) { + t.Errorf("Expected %v \n but received \n %v", exp[utils.OptsCfg], rcv[utils.OptsCfg]) + } +}