Added Opts to ERs

This commit is contained in:
Trial97
2020-09-02 17:07:42 +03:00
committed by Dan Christian Bogos
parent 8f638b5b85
commit 6c10cf0960
29 changed files with 375 additions and 443 deletions

View File

@@ -1946,7 +1946,10 @@ func testApierReplayFldPosts(t *testing.T) {
bev = []byte(`{"CGRID":"88ed9c38005f07576a1e1af293063833b60edcc6"}`)
fileInPath := path.Join(*args.FailedRequestsInDir, fileName)
ev = &engine.ExportEvents{
Path: "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs",
Path: "amqp://guest:guest@localhost:5672/",
Opts: map[string]interface{}{
"queue_id": "cgrates_cdrs",
},
Format: utils.MetaAMQPjsonMap,
Events: []interface{}{bev},
}

View File

@@ -338,6 +338,7 @@ const CGRATES_CFG_JSON = `
"concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited
"source_path": "/var/spool/cgrates/ers/in", // read data from this path
"processed_path": "/var/spool/cgrates/ers/out", // move processed data here
"opts": {},
"xml_root_path": "", // path towards one event in case of XML CDRs
"tenant": "", // tenant used by import
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>

View File

@@ -58,27 +58,27 @@ func (ers *ERsCfg) appendERsReaders(jsnReaders *[]*EventReaderJsonCfg, msgTempla
return
}
for _, jsnReader := range *jsnReaders {
rdr := new(EventReaderCfg)
if dfltRdrCfg != nil {
rdr = dfltRdrCfg.Clone()
}
var haveID bool
var rdr *EventReaderCfg
if jsnReader.Id != nil {
for _, reader := range ers.Readers {
if reader.ID == *jsnReader.Id {
rdr = reader
haveID = true
break
}
}
}
if rdr == nil {
if dfltRdrCfg != nil {
rdr = dfltRdrCfg.Clone()
} else {
rdr = new(EventReaderCfg)
rdr.Opts = make(map[string]interface{})
}
ers.Readers = append(ers.Readers, rdr)
}
if err := rdr.loadFromJsonCfg(jsnReader, msgTemplates, sep); err != nil {
return err
}
if !haveID {
ers.Readers = append(ers.Readers, rdr)
}
}
return nil
@@ -121,6 +121,7 @@ type EventReaderCfg struct {
ConcurrentReqs int
SourcePath string
ProcessedPath string
Opts map[string]interface{}
XmlRootPath utils.HierarchyPath
Tenant RSRParsers
Timezone string
@@ -222,16 +223,18 @@ func (er *EventReaderCfg) loadFromJsonCfg(jsnCfg *EventReaderJsonCfg, msgTemplat
//Clone itself into a new EventReaderCfg
func (er *EventReaderCfg) Clone() (cln *EventReaderCfg) {
cln = new(EventReaderCfg)
cln.ID = er.ID
cln.Type = er.Type
cln.FieldSep = er.FieldSep
cln.HeaderDefineChar = er.HeaderDefineChar
cln.RunDelay = er.RunDelay
cln.ConcurrentReqs = er.ConcurrentReqs
cln.SourcePath = er.SourcePath
cln.ProcessedPath = er.ProcessedPath
cln.XmlRootPath = er.XmlRootPath
cln = &EventReaderCfg{
ID: er.ID,
Type: er.Type,
FieldSep: er.FieldSep,
HeaderDefineChar: er.HeaderDefineChar,
RunDelay: er.RunDelay,
ConcurrentReqs: er.ConcurrentReqs,
SourcePath: er.SourcePath,
ProcessedPath: er.ProcessedPath,
XmlRootPath: er.XmlRootPath,
Opts: make(map[string]interface{}),
}
if len(er.Tenant) != 0 {
cln.Tenant = make(RSRParsers, len(er.Tenant))
for idx, val := range er.Tenant {
@@ -256,6 +259,9 @@ func (er *EventReaderCfg) Clone() (cln *EventReaderCfg) {
for idx, fld := range er.CacheDumpFields {
cln.CacheDumpFields[idx] = fld.Clone()
}
for k, v := range er.Opts {
cln.Opts[k] = v
}
return
}
@@ -315,5 +321,6 @@ func (er *EventReaderCfg) AsMapInterface(separator string) map[string]interface{
utils.PartialCacheExpiryActionCfg: er.PartialCacheExpiryAction,
utils.FieldsCfg: fields,
utils.CacheDumpFieldsCfg: cacheDumpFields,
utils.OptsCfg: er.Opts,
}
}

View File

@@ -170,6 +170,7 @@ type EventReaderJsonCfg struct {
Concurrent_requests *int
Source_path *string
Processed_path *string
Opts map[string]interface{}
Xml_root_path *string
Tenant *string
Timezone *string

View File

@@ -62,7 +62,13 @@
{
"id": "amqp_localhost",
"type": "*amqp_json_map",
"export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs&exchange=exchangename&exchange_type=fanout&routing_key=cgr_cdrs",
"export_path": "amqp://guest:guest@localhost:5672/",
"opts": {
"queue_id": "cgrates_cdrs",
"exchange": "exchangename",
"exchange_type": "fanout",
"routing_key": "cgr_cdrs",
},
"tenant": "cgrates.org",
"attempts": 3,
"fields":[
@@ -72,7 +78,10 @@
{
"id": "aws_test_file",
"type": "*amqpv1_json_map",
"export_path": "amqps://guest:guest@localhost:25672/?queue_id=cgrates_cdrs",
"export_path": "amqps://guest:guest@localhost:25672/",
"opts": {
"queue_id": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
@@ -82,8 +91,15 @@
{
"id": "sqs_test_file",
"type": "*sqs_json_map",
// export_path for sqs: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs"
"export_path": "http://sqs.eu-west-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs",
// export_path for sqs: "endpoint"
"export_path": "http://sqs.eu-west-2.amazonaws.com/",
"opts": {
// posible options for sqs:
"aws_region": "eu-west-2",
"aws_key": "testkey",
"aws_secret": "testsecret",
"queue_id": "cgrates-cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
@@ -93,7 +109,10 @@
{
"id": "kafka_localhost",
"type": "*kafka_json_map",
"export_path": "127.0.0.1:9092?topic=cgrates_cdrs",
"export_path": "127.0.0.1:9092",
"opts":{
"topic": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
@@ -103,8 +122,15 @@
{
"id": "s3_test_file",
"type": "*s3_json_map",
// export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs"
"export_path": "http://s3.us-east-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs",
// export_path for s3: "endpoint"
"export_path": "http://s3.us-east-2.amazonaws.com/",
"opts": {
// posible options for s3:
"aws_region": "eu-west-2",
"aws_key": "testkey",
"aws_secret": "testsecret",
"queue_id": "cgrates-cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[

View File

@@ -67,7 +67,13 @@
{
"id": "amqp_localhost",
"type": "*amqp_json_map",
"export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs&exchange=exchangename&exchange_type=fanout&routing_key=cgr_cdrs",
"export_path": "amqp://guest:guest@localhost:5672/",
"opts": {
"queue_id": "cgrates_cdrs",
"exchange": "exchangename",
"exchange_type": "fanout",
"routing_key": "cgr_cdrs",
},
"tenant": "cgrates.org",
"attempts": 3,
"fields":[
@@ -77,7 +83,10 @@
{
"id": "aws_test_file",
"type": "*amqpv1_json_map",
"export_path": "amqps://guest:guest@localhost:25672/?queue_id=cgrates_cdrs",
"export_path": "amqps://guest:guest@localhost:25672/",
"opts": {
"queue_id": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
@@ -87,8 +96,15 @@
{
"id": "sqs_test_file",
"type": "*sqs_json_map",
// export_path for sqs: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs"
"export_path": "http://sqs.eu-west-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs",
// export_path for sqs: "endpoint"
"export_path": "http://sqs.eu-west-2.amazonaws.com/",
"opts": {
// posible options for sqs:
"aws_region": "eu-west-2",
"aws_key": "testkey",
"aws_secret": "testsecret",
"queue_id": "cgrates-cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
@@ -98,7 +114,10 @@
{
"id": "kafka_localhost",
"type": "*kafka_json_map",
"export_path": "127.0.0.1:9092?topic=cgrates_cdrs",
"export_path": "127.0.0.1:9092",
"opts":{
"topic": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
@@ -108,8 +127,15 @@
{
"id": "s3_test_file",
"type": "*s3_json_map",
// export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs"
"export_path": "http://s3.us-east-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs",
// export_path for s3: "endpoint"
"export_path": "http://s3.us-east-2.amazonaws.com/",
"opts": {
// posible options for s3:
"aws_region": "eu-west-2",
"aws_key": "testkey",
"aws_secret": "testsecret",
"queue_id": "cgrates-cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[

View File

@@ -64,7 +64,13 @@
{
"id": "amqp_localhost",
"type": "*amqp_json_map",
"export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs&exchange=exchangename&exchange_type=fanout&routing_key=cgr_cdrs",
"export_path": "amqp://guest:guest@localhost:5672/",
"opts": {
"queue_id": "cgrates_cdrs",
"exchange": "exchangename",
"exchange_type": "fanout",
"routing_key": "cgr_cdrs",
},
"tenant": "cgrates.org",
"attempts": 3,
"fields":[
@@ -74,7 +80,10 @@
{
"id": "aws_test_file",
"type": "*amqpv1_json_map",
"export_path": "amqps://guest:guest@localhost:25672/?queue_id=cgrates_cdrs",
"export_path": "amqps://guest:guest@localhost:25672/",
"opts": {
"queue_id": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
@@ -84,8 +93,14 @@
{
"id": "sqs_test_file",
"type": "*sqs_json_map",
// export_path for sqs: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs"
"export_path": "http://sqs.eu-west-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs",
// export_path for sqs: "endpoint"
"export_path": "http://sqs.eu-west-2.amazonaws.com/",
"opts": {
"aws_region": "eu-west-2",
"aws_key": "testkey",
"aws_secret": "testsecret",
"queue_id": "cgrates-cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
@@ -95,7 +110,10 @@
{
"id": "kafka_localhost",
"type": "*kafka_json_map",
"export_path": "127.0.0.1:9092?topic=cgrates_cdrs",
"export_path": "127.0.0.1:9092",
"opts": {
"topic": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
@@ -105,8 +123,14 @@
{
"id": "s3_test_file",
"type": "*s3_json_map",
// export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs"
"export_path": "http://s3.us-east-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs",
// export_path for s3: "endpoint"
"export_path": "http://s3.us-east-2.amazonaws.com/",
"opts": {
"aws_region": "eu-west-2",
"aws_key": "testkey",
"aws_secret": "testsecret",
"queue_id": "cgrates-cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[

View File

@@ -73,7 +73,13 @@
{
"id": "amqp_localhost",
"type": "*amqp_json_map",
"export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs&exchange=exchangename&exchange_type=fanout&routing_key=cgr_cdrs",
"export_path": "amqp://guest:guest@localhost:5672/",
"opts": {
"queue_id": "cgrates_cdrs",
"exchange": "exchangename",
"exchange_type": "fanout",
"routing_key": "cgr_cdrs",
},
"tenant": "cgrates.org",
"attempts": 3,
"fields":[
@@ -106,7 +112,10 @@
{
"id": "aws_test_file",
"type": "*amqpv1_json_map",
"export_path": "amqps://guest:guest@localhost:25672/?queue_id=cgrates_cdrs",
"export_path": "amqps://guest:guest@localhost:25672/",
"opts": {
"queue_id": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
@@ -116,8 +125,14 @@
{
"id": "sqs_test_file",
"type": "*sqs_json_map",
// export_path for sqs: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs"
"export_path": "http://sqs.eu-west-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs",
// export_path for sqs: "endpoint"
"export_path": "http://sqs.eu-west-2.amazonaws.com/",
"opts": {
"aws_region": "eu-west-2",
"aws_key": "testkey",
"aws_secret": "testsecret",
"queue_id": "cgrates-cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
@@ -127,7 +142,10 @@
{
"id": "amqp_test_file",
"type": "*amqp_json_map",
"export_path": "amqp://guest:guest@localhost:25672/?queue_id=cgrates_cdrs",
"export_path": "amqp://guest:guest@localhost:25672/",
"opts": {
"queue_id": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
@@ -137,7 +155,10 @@
{
"id": "kafka_localhost",
"type": "*kafka_json_map",
"export_path": "localhost:9092?topic=cgrates_cdrs",
"export_path": "localhost:9092",
"opts": {
"topic": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
@@ -147,8 +168,14 @@
{
"id": "s3_test_file",
"type": "*s3_json_map",
// export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs"
"export_path": "http://s3.us-east-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs",
// export_path for s3: "endpoint"
"export_path": "http://s3.us-east-2.amazonaws.com/",
"opts": {
"aws_region": "eu-west-2",
"aws_key": "testkey",
"aws_secret": "testsecret",
"queue_id": "cgrates-cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
@@ -158,7 +185,10 @@
{
"id": "eventcost_filter",
"type": "*amqp_json_map",
"export_path": "amqp://guest:guest@wrongurl:25672/?queue_id=cgrates_cdrs",
"export_path": "amqp://guest:guest@wrongurl:25672/",
"opts": {
"queue_id": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"filters":["*string:~*ec.Cost:100"],
"attempts": 1,

View File

@@ -72,7 +72,13 @@
{
"id": "amqp_localhost",
"type": "*amqp_json_map",
"export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs&exchange=exchangename&exchange_type=fanout&routing_key=cgr_cdrs",
"export_path": "amqp://guest:guest@localhost:5672/",
"opts": {
"queue_id": "cgrates_cdrs",
"exchange": "exchangename",
"exchange_type": "fanout",
"routing_key": "cgr_cdrs",
},
"tenant": "cgrates.org",
"attempts": 3,
"fields":[
@@ -105,7 +111,10 @@
{
"id": "aws_test_file",
"type": "*amqpv1_json_map",
"export_path": "amqps://guest:guest@localhost:25672/?queue_id=cgrates_cdrs",
"export_path": "amqps://guest:guest@localhost:25672/",
"opts": {
"queue_id": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
@@ -115,8 +124,14 @@
{
"id": "sqs_test_file",
"type": "*sqs_json_map",
// export_path for sqs: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs"
"export_path": "http://sqs.eu-west-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs",
// export_path for sqs: "endpoint"
"export_path": "http://sqs.eu-west-2.amazonaws.com/",
"opts": {
"aws_region": "eu-west-2",
"aws_key": "testkey",
"aws_secret": "testsecret",
"queue_id": "cgrates-cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
@@ -126,7 +141,10 @@
{
"id": "amqp_test_file",
"type": "*amqp_json_map",
"export_path": "amqp://guest:guest@localhost:25672/?queue_id=cgrates_cdrs",
"export_path": "amqp://guest:guest@localhost:25672/",
"opts": {
"queue_id": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
@@ -136,7 +154,10 @@
{
"id": "kafka_localhost",
"type": "*kafka_json_map",
"export_path": "localhost:9092?topic=cgrates_cdrs",
"export_path": "localhost:9092",
"opts": {
"topic": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
@@ -146,8 +167,14 @@
{
"id": "s3_test_file",
"type": "*s3_json_map",
// export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs"
"export_path": "http://s3.us-east-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs",
// export_path for s3: "endpoint"
"export_path": "http://s3.us-east-2.amazonaws.com/",
"opts": {
"aws_region": "eu-west-2",
"aws_key": "testkey",
"aws_secret": "testsecret",
"queue_id": "cgrates-cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
@@ -157,7 +184,10 @@
{
"id": "eventcost_filter",
"type": "*amqp_json_map",
"export_path": "amqp://guest:guest@wrongurl:25672/?queue_id=cgrates_cdrs",
"export_path": "amqp://guest:guest@wrongurl:25672/",
"opts": {
"queue_id": "cgrates_cdrs",
},
"tenant": "cgrates.org",
"filters":["*string:~*ec.Cost:100"],
"attempts": 1,

View File

@@ -68,8 +68,16 @@
{
"id": "s3_test_file",
"type": "*s3_json_map",
// export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs"
"export_path": "http://s3.us-east-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs",
// export_path for s3: "endpoint"
"export_path": "http://s3.us-east-2.amazonaws.com/",
"opts": {
// posible options for s3:
"aws_region": "eu-west-2",
"aws_key": "testkey",
"aws_secret": "testsecret",
"queue_id": "cgrates-cdrs",
// "aws_token": "sessionToken",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[

View File

@@ -77,8 +77,16 @@
{
"id": "s3_test_file",
"type": "*s3_json_map",
// export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs"
"export_path": "http://s3.us-east-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs",
// export_path for s3: "endpoint"
"export_path": "http://s3.us-east-2.amazonaws.com/",
"opts": {
// posible options for s3:
"aws_region": "eu-west-2",
"aws_key": "testkey",
"aws_secret": "testsecret",
"queue_id": "cgrates-cdrs",
// "aws_token": "sessionToken",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[

View File

@@ -74,8 +74,16 @@
{
"id": "s3_test_file",
"type": "*s3_json_map",
// export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs"
"export_path": "http://s3.us-east-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs",
// export_path for s3: "endpoint"
"export_path": "http://s3.us-east-2.amazonaws.com/",
"opts": {
// posible options for s3:
"aws_region": "eu-west-2",
"aws_key": "testkey",
"aws_secret": "testsecret",
"queue_id": "cgrates-cdrs",
// "aws_token": "sessionToken",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[

View File

@@ -59,7 +59,10 @@
{
"id": "amqp_localhost",
"type": "*amqp_json_map",
"export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs",
"export_path": "amqp://guest:guest@localhost:5672/",
"opts": {
"queue_id":"cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[

View File

@@ -66,7 +66,10 @@
{
"id": "amqp_localhost",
"type": "*amqp_json_map",
"export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs",
"export_path": "amqp://guest:guest@localhost:5672/",
"opts": {
"queue_id":"cgrates_cdrs",
},
"tenant": "cgrates.org",
"attempts": 1,
"fields":[

View File

@@ -131,7 +131,8 @@ func (pstrEE *PosterJSONMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
if err = pstrEE.poster.Post(body, utils.ConcatenatedKey(cgrID, runID)); err != nil &&
pstrEE.cgrCfg.GeneralCfg().FailedPostsDir != utils.META_NONE {
engine.AddFailedPost(pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].ExportPath,
pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Type, utils.EventExporterS, body)
pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Type, utils.EventExporterS, body,
pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Opts)
}
return
}

View File

@@ -103,7 +103,8 @@ func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
if err = httpPost.httpPoster.PostValues(urlVals); err != nil &&
httpPost.cgrCfg.GeneralCfg().FailedPostsDir != utils.META_NONE {
engine.AddFailedPost(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ExportPath,
httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Type, utils.EventExporterS, urlVals)
httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Type, utils.EventExporterS, urlVals,
httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Opts)
}
return
}

View File

@@ -99,11 +99,6 @@ func getActionFunc(typ string) (actionTypeFunc, bool) {
utils.SetExpiry: setExpiryAction,
utils.MetaPublishAccount: publishAccount,
utils.MetaPublishBalance: publishBalance,
utils.MetaAMQPjsonMap: sendAMQP,
utils.MetaAMQPV1jsonMap: sendAWS,
utils.MetaSQSjsonMap: sendSQS,
utils.MetaKafkajsonMap: sendKafka,
utils.MetaS3jsonMap: sendS3,
utils.MetaRemoveSessionCosts: removeSessionCosts,
utils.MetaRemoveExpired: removeExpired,
utils.MetaPostEvent: postEvent,
@@ -384,71 +379,6 @@ func getOneData(ub *Account, extraData interface{}) ([]byte, error) {
return nil, nil
}
func sendAMQP(ub *Account, a *Action, acs Actions, extraData interface{}) error {
body, err := getOneData(ub, extraData)
if err != nil {
return err
}
err = PostersCache.PostAMQP(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body)
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE {
AddFailedPost(a.ExtraParameters, utils.MetaAMQPjsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
err = nil
}
return err
}
func sendAWS(ub *Account, a *Action, acs Actions, extraData interface{}) error {
body, err := getOneData(ub, extraData)
if err != nil {
return err
}
err = PostersCache.PostAMQPv1(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body)
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE {
AddFailedPost(a.ExtraParameters, utils.MetaAMQPV1jsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
err = nil
}
return err
}
func sendSQS(ub *Account, a *Action, acs Actions, extraData interface{}) error {
body, err := getOneData(ub, extraData)
if err != nil {
return err
}
err = PostersCache.PostSQS(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body)
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE {
AddFailedPost(a.ExtraParameters, utils.MetaSQSjsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
err = nil
}
return err
}
func sendKafka(ub *Account, a *Action, acs Actions, extraData interface{}) error {
body, err := getOneData(ub, extraData)
if err != nil {
return err
}
err = PostersCache.PostKafka(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body, utils.UUIDSha1Prefix())
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE {
AddFailedPost(a.ExtraParameters, utils.MetaKafkajsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
err = nil
}
return err
}
func sendS3(ub *Account, a *Action, acs Actions, extraData interface{}) error {
body, err := getOneData(ub, extraData)
if err != nil {
return err
}
err = PostersCache.PostS3(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body, utils.UUIDSha1Prefix())
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE {
AddFailedPost(a.ExtraParameters, utils.MetaS3jsonMap, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
err = nil
}
return err
}
func callURL(ub *Account, a *Action, acs Actions, extraData interface{}) error {
body, err := getOneData(ub, extraData)
if err != nil {
@@ -462,7 +392,7 @@ func callURL(ub *Account, a *Action, acs Actions, extraData interface{}) error {
}
err = pstr.PostValues(body)
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE {
AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body, make(map[string]interface{}))
err = nil
}
return err
@@ -483,7 +413,7 @@ func callURLAsync(ub *Account, a *Action, acs Actions, extraData interface{}) er
go func() {
err := pstr.PostValues(body)
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE {
AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body, make(map[string]interface{}))
}
}()
return nil
@@ -1042,7 +972,7 @@ func postEvent(ub *Account, a *Action, acs Actions, extraData interface{}) error
}
err = pstr.PostValues(body)
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.META_NONE {
AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body)
AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HIERARCHY_SEP+a.ActionType, body, make(map[string]interface{}))
err = nil
}
return err

View File

@@ -58,8 +58,13 @@ func writeFailedPosts(itmID string, value interface{}) {
return
}
func AddFailedPost(expPath, format, module string, ev interface{}) {
func AddFailedPost(expPath, format, module string, ev interface{}, opts map[string]interface{}) {
key := utils.ConcatenatedKey(expPath, format, module)
// also in case of amqp,amqpv1,s3,sqs and kafka also separe them after queue id
if qID := utils.FirstNonEmpty(utils.IfaceAsString(opts[QueueID]),
utils.IfaceAsString(opts[utils.KafkaTopic])); len(qID) != 0 {
key = utils.ConcatenatedKey(key, qID)
}
var failedPost *ExportEvents
if x, ok := failedPostCache.Get(key); ok {
if x != nil {
@@ -70,6 +75,7 @@ func AddFailedPost(expPath, format, module string, ev interface{}) {
failedPost = &ExportEvents{
Path: expPath,
Format: format,
Opts: opts,
module: module,
}
}
@@ -101,6 +107,7 @@ func NewExportEventsFromFile(filePath string) (expEv *ExportEvents, err error) {
type ExportEvents struct {
lk sync.RWMutex
Path string
Opts map[string]interface{}
Format string
Events []interface{}
module string
@@ -142,8 +149,11 @@ func (expEv *ExportEvents) AddEvent(ev interface{}) {
func (expEv *ExportEvents) ReplayFailedPosts(attempts int) (failedEvents *ExportEvents, err error) {
failedEvents = &ExportEvents{
Path: expEv.Path,
Opts: expEv.Opts,
Format: expEv.Format,
}
var pstr Poster
keyFunc := func() string { return utils.EmptyString }
switch expEv.Format {
case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.MetaHTTPPost:
var pstr *HTTPPoster
@@ -160,42 +170,31 @@ func (expEv *ExportEvents) ReplayFailedPosts(attempts int) (failedEvents *Export
failedEvents.AddEvent(ev)
}
}
if len(failedEvents.Events) > 0 {
err = utils.ErrPartiallyExecuted
} else {
failedEvents = nil
}
return
case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap:
for _, ev := range expEv.Events {
err = PostersCache.PostAMQP(expEv.Path, attempts, ev.([]byte))
if err != nil {
failedEvents.AddEvent(ev)
}
}
pstr = NewAMQPPoster(expEv.Path, attempts, expEv.Opts)
case utils.MetaAMQPV1jsonMap:
for _, ev := range expEv.Events {
err = PostersCache.PostAMQPv1(expEv.Path, attempts, ev.([]byte))
if err != nil {
failedEvents.AddEvent(ev)
}
}
pstr = NewAMQPv1Poster(expEv.Path, attempts, expEv.Opts)
case utils.MetaSQSjsonMap:
for _, ev := range expEv.Events {
err = PostersCache.PostSQS(expEv.Path, attempts, ev.([]byte))
if err != nil {
failedEvents.AddEvent(ev)
}
}
pstr = NewSQSPoster(expEv.Path, attempts, expEv.Opts)
case utils.MetaKafkajsonMap:
for _, ev := range expEv.Events {
err = PostersCache.PostKafka(expEv.Path, attempts, ev.([]byte), utils.UUIDSha1Prefix())
if err != nil {
failedEvents.AddEvent(ev)
}
}
pstr = NewKafkaPoster(expEv.Path, attempts, expEv.Opts)
keyFunc = utils.UUIDSha1Prefix
case utils.MetaS3jsonMap:
for _, ev := range expEv.Events {
err = PostersCache.PostS3(expEv.Path, attempts, ev.([]byte), utils.UUIDSha1Prefix())
if err != nil {
failedEvents.AddEvent(ev)
}
pstr = NewS3Poster(expEv.Path, attempts, expEv.Opts)
keyFunc = utils.UUIDSha1Prefix
}
for _, ev := range expEv.Events {
if err = pstr.Post(ev.([]byte), keyFunc()); err != nil {
failedEvents.AddEvent(ev)
}
}
pstr.Close()
if len(failedEvents.Events) > 0 {
err = utils.ErrPartiallyExecuted
} else {

View File

@@ -37,7 +37,7 @@ func TestSetFldPostCacheTTL(t *testing.T) {
func TestAddFldPost(t *testing.T) {
SetFailedPostCacheTTL(time.Duration(5 * time.Second))
AddFailedPost("path1", "format1", "module1", "1")
AddFailedPost("path1", "format1", "module1", "1", make(map[string]interface{}))
x, ok := failedPostCache.Get(utils.ConcatenatedKey("path1", "format1", "module1"))
if !ok {
t.Error("Error reading from cache")
@@ -55,12 +55,13 @@ func TestAddFldPost(t *testing.T) {
Format: "format1",
module: "module1",
Events: []interface{}{"1"},
Opts: make(map[string]interface{}),
}
if !reflect.DeepEqual(eOut, failedPost) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost))
}
AddFailedPost("path1", "format1", "module1", "2")
AddFailedPost("path2", "format2", "module2", "3")
AddFailedPost("path1", "format1", "module1", "2", make(map[string]interface{}))
AddFailedPost("path2", "format2", "module2", "3", map[string]interface{}{QueueID: "qID"})
x, ok = failedPostCache.Get(utils.ConcatenatedKey("path1", "format1", "module1"))
if !ok {
t.Error("Error reading from cache")
@@ -77,11 +78,12 @@ func TestAddFldPost(t *testing.T) {
Format: "format1",
module: "module1",
Events: []interface{}{"1", "2"},
Opts: make(map[string]interface{}),
}
if !reflect.DeepEqual(eOut, failedPost) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost))
}
x, ok = failedPostCache.Get(utils.ConcatenatedKey("path2", "format2", "module2"))
x, ok = failedPostCache.Get(utils.ConcatenatedKey("path2", "format2", "module2", "qID"))
if !ok {
t.Error("Error reading from cache")
}
@@ -97,6 +99,7 @@ func TestAddFldPost(t *testing.T) {
Format: "format2",
module: "module2",
Events: []interface{}{"3"},
Opts: map[string]interface{}{QueueID: "qID"},
}
if !reflect.DeepEqual(eOut, failedPost) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost))

View File

@@ -18,10 +18,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"sync"
)
// General constants for posters
const (
DefaultQueueID = "cgrates_cdrs"
@@ -35,125 +31,7 @@ const (
folderPath = "folder_path"
)
func init() {
PostersCache = &PosterCache{
amqpCache: make(map[string]Poster),
amqpv1Cache: make(map[string]Poster),
sqsCache: make(map[string]Poster),
kafkaCache: make(map[string]Poster),
s3Cache: make(map[string]Poster),
} // Initialize the cache for amqpPosters
}
var PostersCache *PosterCache
type PosterCache struct {
sync.Mutex
amqpCache map[string]Poster
amqpv1Cache map[string]Poster
sqsCache map[string]Poster
kafkaCache map[string]Poster
s3Cache map[string]Poster
}
type Poster interface {
Post(body []byte, key string) error
Close()
}
// Close closes all cached posters
func (pc *PosterCache) Close() {
for _, v := range pc.amqpCache {
v.Close()
}
for _, v := range pc.amqpv1Cache {
v.Close()
}
for _, v := range pc.sqsCache {
v.Close()
}
for _, v := range pc.kafkaCache {
v.Close()
}
}
// GetAMQPPoster creates a new poster only if not already cached
// uses dialURL as cache key
func (pc *PosterCache) GetAMQPPoster(dialURL string, attempts int) (pstr Poster) {
pc.Lock()
defer pc.Unlock()
if _, hasIt := pc.amqpCache[dialURL]; !hasIt {
pstr = NewAMQPPoster(dialURL, attempts, nil)
pc.amqpCache[dialURL] = pstr
}
return pc.amqpCache[dialURL]
}
// GetAMQPv1Poster creates a new poster only if not already cached
func (pc *PosterCache) GetAMQPv1Poster(dialURL string, attempts int) (pstr Poster) {
pc.Lock()
defer pc.Unlock()
if _, hasIt := pc.amqpv1Cache[dialURL]; !hasIt {
pstr = NewAMQPv1Poster(dialURL, attempts, nil)
pc.amqpv1Cache[dialURL] = pstr
}
return pc.amqpv1Cache[dialURL]
}
// GetSQSPoster creates a new poster only if not already cached
func (pc *PosterCache) GetSQSPoster(dialURL string, attempts int) (pstr Poster) {
pc.Lock()
defer pc.Unlock()
if _, hasIt := pc.sqsCache[dialURL]; !hasIt {
pstr = NewSQSPoster(dialURL, attempts, nil)
pc.sqsCache[dialURL] = pstr
}
return pc.sqsCache[dialURL]
}
// GetKafkaPoster creates a new poster only if not already cached
func (pc *PosterCache) GetKafkaPoster(dialURL string, attempts int) (pstr Poster) {
pc.Lock()
defer pc.Unlock()
if _, hasIt := pc.kafkaCache[dialURL]; !hasIt {
pstr = NewKafkaPoster(dialURL, attempts, nil)
pc.kafkaCache[dialURL] = pstr
}
return pc.kafkaCache[dialURL]
}
// GetS3Poster creates a new poster only if not already cached
func (pc *PosterCache) GetS3Poster(dialURL string, attempts int) (pstr Poster) {
pc.Lock()
defer pc.Unlock()
if _, hasIt := pc.s3Cache[dialURL]; !hasIt {
pstr = NewS3Poster(dialURL, attempts, nil)
pc.s3Cache[dialURL] = pstr
}
return pc.s3Cache[dialURL]
}
func (pc *PosterCache) PostAMQP(dialURL string, attempts int,
content []byte) error {
return pc.GetAMQPPoster(dialURL, attempts).Post(content, "")
}
func (pc *PosterCache) PostAMQPv1(dialURL string, attempts int,
content []byte) error {
return pc.GetAMQPv1Poster(dialURL, attempts).Post(content, "")
}
func (pc *PosterCache) PostSQS(dialURL string, attempts int,
content []byte) error {
return pc.GetSQSPoster(dialURL, attempts).Post(content, "")
}
func (pc *PosterCache) PostKafka(dialURL string, attempts int,
content []byte, key string) error {
return pc.GetKafkaPoster(dialURL, attempts).Post(content, key)
}
func (pc *PosterCache) PostS3(dialURL string, attempts int,
content []byte, key string) error {
return pc.GetS3Poster(dialURL, attempts).Post(content, key)
}

View File

@@ -26,7 +26,9 @@ import (
)
func TestAMQPPosterParseURL(t *testing.T) {
amqp := &AMQPPoster{}
amqp := &AMQPPoster{
dialURL: "amqp://guest:guest@localhost:5672/?heartbeat=5",
}
expected := &AMQPPoster{
dialURL: "amqp://guest:guest@localhost:5672/?heartbeat=5",
queueID: "q1",
@@ -34,35 +36,35 @@ func TestAMQPPosterParseURL(t *testing.T) {
exchangeType: "fanout",
routingKey: "CGRCDR",
}
dialURL := "amqp://guest:guest@localhost:5672/?queue_id=q1&exchange=E1&routing_key=CGRCDR&heartbeat=5&exchange_type=fanout"
if err := amqp.parseOpts(dialURL); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expected, amqp) {
opts := map[string]interface{}{
"queue_id": "q1",
"exchange": "E1",
"routing_key": "CGRCDR",
"exchange_type": "fanout",
}
amqp.parseOpts(opts)
if !reflect.DeepEqual(expected, amqp) {
t.Errorf("Expected: %s ,recived: %s", utils.ToJSON(expected), utils.ToJSON(amqp))
}
}
func TestKafkaParseURL(t *testing.T) {
u := "127.0.0.1:9092?topic=cdr_billing"
u := "127.0.0.1:9092"
exp := &KafkaPoster{
dialURL: "127.0.0.1:9092",
topic: "cdr_billing",
attempts: 10,
}
if kfk, err := NewKafkaPoster(u, 10, nil); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(exp, kfk) {
if kfk := NewKafkaPoster(u, 10, map[string]interface{}{"topic": "cdr_billing"}); !reflect.DeepEqual(exp, kfk) {
t.Errorf("Expected: %s ,recived: %s", utils.ToJSON(exp), utils.ToJSON(kfk))
}
u = "localhost:9092?topic=cdr_billing"
u = "localhost:9092"
exp = &KafkaPoster{
dialURL: "localhost:9092",
topic: "cdr_billing",
attempts: 10,
}
if kfk, err := NewKafkaPoster(u, 10, nil); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(exp, kfk) {
if kfk := NewKafkaPoster(u, 10, map[string]interface{}{"topic": "cdr_billing"}); !reflect.DeepEqual(exp, kfk) {
t.Errorf("Expected: %s ,recived: %s", utils.ToJSON(exp), utils.ToJSON(kfk))
}
}

View File

@@ -23,7 +23,6 @@ import (
"context"
"encoding/json"
"flag"
"fmt"
"path/filepath"
"reflect"
"testing"
@@ -75,7 +74,7 @@ func TestHttpJsonPoster(t *testing.T) {
if err = pstr.PostValues(jsn); err == nil {
t.Error("Expected error")
}
AddFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test1", jsn)
AddFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test1", jsn, make(map[string]interface{}))
time.Sleep(2)
fs, err := filepath.Glob("/tmp/test1*")
if err != nil {
@@ -108,7 +107,7 @@ func TestHttpBytesPoster(t *testing.T) {
if err = pstr.PostValues(content); err == nil {
t.Error("Expected error")
}
AddFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test2", content)
AddFailedPost("http://localhost:8080/invalid", utils.CONTENT_JSON, "test2", content, make(map[string]interface{}))
time.Sleep(2)
fs, err := filepath.Glob("/tmp/test2*")
if err != nil {
@@ -145,17 +144,18 @@ func TestSQSPoster(t *testing.T) {
awsKey := "replace-this-with-your-secret-key"
awsSecret := "replace-this-with-your-secret"
qname := "cgrates-cdrs"
//#####################################
// export_path for sqs: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs"
dialURL := fmt.Sprintf("%s?aws_region=%s&aws_key=%s&aws_secret=%s&queue_id=%s", endpoint, region, awsKey, awsSecret, qname)
opts := map[string]interface{}{
"aws_region": region,
"aws_key": awsKey,
"aws_secret": awsSecret,
"queue_id": qname,
}
//#####################################
body := "testString"
pstr, err := PostersCache.GetSQSPoster(dialURL, 5)
if err != nil {
t.Fatal(err)
}
pstr := NewSQSPoster(endpoint, 5, opts)
if err := pstr.Post([]byte(body), ""); err != nil {
t.Fatal(err)
}
@@ -225,17 +225,18 @@ func TestS3Poster(t *testing.T) {
awsKey := "replace-this-with-your-secret-key"
awsSecret := "replace-this-with-your-secret"
qname := "cgrates-cdrs"
//#####################################
// export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs"
dialURL := fmt.Sprintf("%s?aws_region=%s&aws_key=%s&aws_secret=%s&queue_id=%s", endpoint, region, awsKey, awsSecret, qname)
opts := map[string]interface{}{
"aws_region": region,
"aws_key": awsKey,
"aws_secret": awsSecret,
"queue_id": qname,
}
//#####################################
body := "testString"
key := "key1234"
pstr, err := PostersCache.GetS3Poster(dialURL, 5)
if err != nil {
t.Fatal(err)
}
pstr := NewS3Poster(endpoint, 5, opts)
if err := pstr.Post([]byte(body), key); err != nil {
t.Fatal(err)
}
@@ -284,17 +285,14 @@ func TestAMQPv1Poster(t *testing.T) {
// update this variables
endpoint := "amqps://RootManageSharedAccessKey:UlfIJ%2But11L0ZzA%2Fgpje8biFJeQihpWibJsUhaOi1DU%3D@cdrscgrates.servicebus.windows.net"
qname := "cgrates-cdrs"
opts := map[string]interface{}{
"queue_id": qname,
}
//#####################################
// export_path for amqpv1: "amqps://admin:admin@endpoint?queue_id=cgrates_cdrs",
dialURL := fmt.Sprintf("%s?queue_id=%s", endpoint, qname)
body := "testString"
pstr, err := PostersCache.GetAMQPv1Poster(dialURL, 5)
if err != nil {
t.Fatal(err)
}
pstr := NewAMQPv1Poster(endpoint, 5, opts)
if err := pstr.Post([]byte(body), ""); err != nil {
t.Fatal(err)
}

View File

@@ -21,8 +21,6 @@ package ers
import (
"encoding/json"
"fmt"
"net/url"
"strings"
"time"
"github.com/cgrates/cgrates/agents"
@@ -41,7 +39,6 @@ const (
func NewAMQPER(cfg *config.CGRConfig, cfgIdx int,
rdrEvents chan *erEvent, rdrErr chan error,
fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) {
rdr := &AMQPER{
cgrCfg: cfg,
cfgIdx: cfgIdx,
@@ -56,9 +53,9 @@ func NewAMQPER(cfg *config.CGRConfig, cfgIdx int,
rdr.cap <- struct{}{}
}
}
er = rdr
err = rdr.setURL(rdr.Config().SourcePath)
return
rdr.dialURL = rdr.Config().SourcePath
rdr.setOpts(rdr.Config().Opts)
return rdr, nil
}
// AMQPER implements EventReader interface for kafka message
@@ -171,12 +168,12 @@ func (rdr *AMQPER) readLoop(msgChan <-chan amqp.Delivery) {
utils.ERs, msg.MessageId, err.Error()))
}
if rdr.Config().ProcessedPath != utils.EmptyString { // post it
if err := engine.PostersCache.PostAMQP(rdr.Config().ProcessedPath,
rdr.cgrCfg.GeneralCfg().PosterAttempts, msg.Body); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> writing message %s error: %s",
utils.ERs, msg.MessageId, err.Error()))
}
// if err := engine.PostersCache.PostAMQP(rdr.Config().ProcessedPath,
// rdr.cgrCfg.GeneralCfg().PosterAttempts, msg.Body); err != nil {
// utils.Logger.Warning(
// fmt.Sprintf("<%s> writing message %s error: %s",
// utils.ERs, msg.MessageId, err.Error()))
// }
}
if rdr.Config().ConcurrentReqs != -1 {
rdr.cap <- struct{}{}
@@ -214,44 +211,26 @@ func (rdr *AMQPER) processMessage(msg []byte) (err error) {
return
}
func (rdr *AMQPER) setURL(dialURL string) (err error) {
var u *url.URL
if u, err = url.Parse(dialURL); err != nil {
return
}
qry := u.Query()
q := url.Values{}
for _, key := range engine.AMQPPosibleQuery {
if vals, has := qry[key]; has && len(vals) != 0 {
q.Add(key, vals[0])
}
}
rdr.dialURL = strings.Split(dialURL, "?")[0]
if params := q.Encode(); params != utils.EmptyString {
rdr.dialURL += "?" + params
}
func (rdr *AMQPER) setOpts(opts map[string]interface{}) {
rdr.queueID = engine.DefaultQueueID
if vals, has := qry[engine.QueueID]; has && len(vals) != 0 {
rdr.queueID = vals[0]
if vals, has := opts[engine.QueueID]; has {
rdr.queueID = utils.IfaceAsString(vals)
}
rdr.tag = defaultConsumerTag
if vals, has := qry[consumerTag]; has && len(vals) != 0 {
rdr.tag = vals[0]
if vals, has := opts[consumerTag]; has {
rdr.tag = utils.IfaceAsString(vals)
}
if vals, has := qry[engine.RoutingKey]; has && len(vals) != 0 {
rdr.routingKey = vals[0]
if vals, has := opts[engine.RoutingKey]; has {
rdr.routingKey = utils.IfaceAsString(vals)
}
if vals, has := qry[engine.Exchange]; has && len(vals) != 0 {
rdr.exchange = vals[0]
if vals, has := opts[engine.Exchange]; has {
rdr.exchange = utils.IfaceAsString(vals)
rdr.exchangeType = engine.DefaultExchangeType
}
if vals, has := qry[engine.ExchangeType]; has && len(vals) != 0 {
rdr.exchangeType = vals[0]
if vals, has := opts[engine.ExchangeType]; has {
rdr.exchangeType = utils.IfaceAsString(vals)
}
return nil
}
func (rdr *AMQPER) close() (err error) {

View File

@@ -22,17 +22,15 @@ import (
"testing"
)
func TestAMQPSetURL(t *testing.T) {
func TestAMQPSetOpts(t *testing.T) {
k := new(AMQPER)
k.dialURL = "amqp://localhost:2013"
expKafka := &AMQPER{
dialURL: "amqp://localhost:2013",
queueID: "cdrs",
tag: "new",
}
url := "amqp://localhost:2013?queue_id=cdrs&consumer_tag=new"
if err := k.setURL(url); err != nil {
t.Fatal(err)
} else if expKafka.dialURL != k.dialURL {
if k.setOpts(map[string]interface{}{"queue_id": "cdrs", "consumer_tag": "new"}); expKafka.dialURL != k.dialURL {
t.Errorf("Expected: %s ,received: %s", expKafka.dialURL, k.dialURL)
} else if expKafka.queueID != k.queueID {
t.Errorf("Expected: %s ,received: %s", expKafka.queueID, k.queueID)
@@ -40,28 +38,17 @@ func TestAMQPSetURL(t *testing.T) {
t.Errorf("Expected: %s ,received: %s", expKafka.tag, k.tag)
}
k = new(AMQPER)
k.dialURL = "amqp://localhost:2013"
expKafka = &AMQPER{
dialURL: "amqp://localhost:2013",
queueID: "cgrates_cdrs",
tag: "cgrates",
}
url = "amqp://localhost:2013"
if err := k.setURL(url); err != nil {
t.Fatal(err)
} else if expKafka.dialURL != k.dialURL {
if k.setOpts(map[string]interface{}{}); expKafka.dialURL != k.dialURL {
t.Errorf("Expected: %s ,received: %s", expKafka.dialURL, k.dialURL)
} else if expKafka.queueID != k.queueID {
t.Errorf("Expected: %s ,received: %s", expKafka.queueID, k.queueID)
} else if expKafka.tag != k.tag {
t.Errorf("Expected: %s ,received: %s", expKafka.tag, k.tag)
}
k = new(AMQPER)
expKafka = &AMQPER{
dialURL: "amqp://localhost:2013",
queueID: "cgrates",
tag: "cgrates",
}
if err := k.setURL("127.0.0.1:2013?queue_id=cdrs&consumer_tag=new"); err == nil {
t.Errorf("Expected error received: %v", err)
}
}

View File

@@ -23,8 +23,6 @@ import (
"encoding/json"
"fmt"
"io"
"net/url"
"strings"
"time"
"github.com/cgrates/cgrates/agents"
@@ -60,9 +58,11 @@ func NewKafkaER(cfg *config.CGRConfig, cfgIdx int,
rdr.cap <- struct{}{}
}
}
rdr.dialURL = rdr.Config().SourcePath
er = rdr
err = rdr.setURL(rdr.Config().SourcePath)
err = rdr.setOpts(rdr.Config().Opts)
return
}
// KafkaER implements EventReader interface for kafka message
@@ -138,12 +138,12 @@ func (rdr *KafkaER) readLoop(r *kafka.Reader) {
utils.ERs, string(msg.Key), err.Error()))
}
if rdr.Config().ProcessedPath != utils.EmptyString { // post it
if err := engine.PostersCache.PostKafka(rdr.Config().ProcessedPath,
rdr.cgrCfg.GeneralCfg().PosterAttempts, msg.Value, string(msg.Key)); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> writing message %s error: %s",
utils.ERs, string(msg.Key), err.Error()))
}
// if err := engine.PostersCache.PostKafka(rdr.Config().ProcessedPath,
// rdr.cgrCfg.GeneralCfg().PosterAttempts, msg.Value, string(msg.Key)); err != nil {
// utils.Logger.Warning(
// fmt.Sprintf("<%s> writing message %s error: %s",
// utils.ERs, string(msg.Key), err.Error()))
// }
}
if rdr.Config().ConcurrentReqs != -1 {
rdr.cap <- struct{}{}
@@ -181,31 +181,19 @@ func (rdr *KafkaER) processMessage(msg []byte) (err error) {
return
}
func (rdr *KafkaER) setURL(dialURL string) (err error) {
func (rdr *KafkaER) setOpts(opts map[string]interface{}) (err error) {
rdr.topic = defaultTopic
rdr.groupID = defaultGroupID
rdr.maxWait = defaultMaxWait
i := strings.IndexByte(dialURL, '?')
if i < 0 {
rdr.dialURL = dialURL
return
if vals, has := opts[utils.KafkaTopic]; has {
rdr.topic = utils.IfaceAsString(vals)
}
rdr.dialURL = dialURL[:i]
rawQuery := dialURL[i+1:]
var qry url.Values
if qry, err = url.ParseQuery(rawQuery); err != nil {
return
if vals, has := opts[utils.KafkaGroupID]; has {
rdr.groupID = utils.IfaceAsString(vals)
}
if vals, has := qry[utils.KafkaTopic]; has && len(vals) != 0 {
rdr.topic = vals[0]
}
if vals, has := qry[utils.KafkaGroupID]; has && len(vals) != 0 {
rdr.groupID = vals[0]
}
if vals, has := qry[utils.KafkaMaxWait]; has && len(vals) != 0 {
rdr.maxWait, err = time.ParseDuration(vals[0])
if vals, has := opts[utils.KafkaMaxWait]; has {
rdr.maxWait, err = utils.IfaceAsDuration(vals)
}
return
}

View File

@@ -19,20 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package general_tests
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/rpc"
"path"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
/*
var (
pstrCfg *config.CGRConfig
pstrRpc *rpc.Client
@@ -337,3 +324,4 @@ func testPosterITStopCgrEngine(t *testing.T) {
t.Error(err)
}
}
*/

View File

@@ -270,11 +270,11 @@ func BenchmarkGuardian(b *testing.B) {
// BenchmarkGuardIDs-8 1000000 8732 ns/op
func BenchmarkGuardIDs(b *testing.B) {
for n := 0; n < b.N; n++ {
go func() {
if refID := Guardian.GuardIDs("", 0, strconv.Itoa(n)); refID != "" {
go func(i int) {
if refID := Guardian.GuardIDs("", 0, strconv.Itoa(i)); refID != "" {
time.Sleep(time.Microsecond)
Guardian.UnguardIDs(refID)
}
}()
}(n)
}
}

View File

@@ -87,14 +87,14 @@ func (v1ms *mongoStorDBMigrator) renameV1SMCosts() (err error) {
return err
}
return v1ms.mgoDB.DB().RunCommand(v1ms.mgoDB.GetContext(),
bson.D{{"create", utils.SessionCostsTBL}}).Err()
bson.D{{Key: "create", Value: utils.SessionCostsTBL}}).Err()
}
func (v1ms *mongoStorDBMigrator) createV1SMCosts() (err error) {
v1ms.mgoDB.DB().Collection(utils.OldSMCosts).Drop(v1ms.mgoDB.GetContext())
v1ms.mgoDB.DB().Collection(utils.SessionCostsTBL).Drop(v1ms.mgoDB.GetContext())
return v1ms.mgoDB.DB().RunCommand(v1ms.mgoDB.GetContext(),
bson.D{{"create", utils.OldSMCosts}, {"size", 1024}}).Err()
bson.D{{Key: "create", Value: utils.OldSMCosts}, {Key: "size", Value: 1024}}).Err()
}
//get

View File

@@ -66,7 +66,7 @@ func Testv2ActionTriggerAsThreshold(t *testing.T) {
Tenant: config.CgrConfig().GeneralCfg().DefaultTenant,
Blocker: false,
Weight: v2ATR.Weight,
ActivationInterval: &utils.ActivationInterval{v2ATR.ExpirationDate, v2ATR.ActivationDate},
ActivationInterval: &utils.ActivationInterval{ExpiryTime: v2ATR.ExpirationDate, ActivationTime: v2ATR.ActivationDate},
MinSleep: v2ATR.MinSleep,
}
th := &engine.Threshold{