diff --git a/data/conf/samples/cdrsv1processevent/cgrates.json b/data/conf/samples/cdrsv1processevent/cgrates.json index a32cccbab..3ed6dec4b 100644 --- a/data/conf/samples/cdrsv1processevent/cgrates.json +++ b/data/conf/samples/cdrsv1processevent/cgrates.json @@ -5,6 +5,7 @@ "general": { "log_level": 7, + "poster_attempts": 1, }, "data_db":{ @@ -27,8 +28,10 @@ "cdre":{ "aws_test_file": { - "export_format": "*amqpv1_json_map", - "export_path": "amqps://guest:guest@localhost:25672/?queue_id=cgrates_cdrswrong", + "export_format": "*amqp_json_map", + "export_path": "amqps://guest:guest@localhost:256733/", + "attempts": 1, // export attempts + "synchronous": true, // block processing until export has a result "content_fields": [ {"tag": "CGRID", "type": "*composed", "value": "~CGRID", "field_id": "CGRID"}, ], diff --git a/data/conf/samples/cdrsv1processeventmongo/cgrates.json b/data/conf/samples/cdrsv1processeventmongo/cgrates.json index 21c60b657..2cfbaed85 100644 --- a/data/conf/samples/cdrsv1processeventmongo/cgrates.json +++ b/data/conf/samples/cdrsv1processeventmongo/cgrates.json @@ -3,6 +3,11 @@ // // Used in general_tests +"general": { + "log_level": 7, + "poster_attempts": 1, +}, + "data_db": { "db_type": "mongo", "db_name": "10", @@ -27,8 +32,10 @@ "cdre":{ "aws_test_file": { - "export_format": "*amqpv1_json_map", - "export_path": "amqps://guest:guest@localhost:25672/?queue_id=cgrates_cdrswrong", + "export_format": "*amqp_json_map", + "export_path": "amqps://guest:guest@localhost:256733/", + "attempts": 1, // export attempts + "synchronous": true, // block processing until export has a result "content_fields": [ {"tag": "CGRID", "type": "*composed", "value": "~CGRID", "field_id": "CGRID"}, ], diff --git a/data/conf/samples/cdrsv1processeventmysql/cgrates.json b/data/conf/samples/cdrsv1processeventmysql/cgrates.json index 5f6122ba7..6324fac0f 100644 --- a/data/conf/samples/cdrsv1processeventmysql/cgrates.json +++ b/data/conf/samples/cdrsv1processeventmysql/cgrates.json @@ -3,6 +3,10 @@ // // Used in general_tests +"general": { + "log_level": 7, + "poster_attempts": 1, +}, "data_db": { // database used to store runtime data (eg: accounts, cdr stats) "db_type": "redis", // data_db type: @@ -25,8 +29,10 @@ "cdre":{ "aws_test_file": { - "export_format": "*amqpv1_json_map", - "export_path": "amqps://guest:guest@localhost:25672/?queue_id=cgrates_cdrswrong", + "export_format": "*amqp_json_map", + "export_path": "amqps://guest:guest@localhost:256733/", + "attempts": 1, // export attempts + "synchronous": true, // block processing until export has a result "content_fields": [ {"tag": "CGRID", "type": "*composed", "value": "~CGRID", "field_id": "CGRID"}, ], diff --git a/engine/poster.go b/engine/poster.go index ebb82ab45..c5d570b02 100644 --- a/engine/poster.go +++ b/engine/poster.go @@ -24,8 +24,8 @@ import ( "path" "strings" "sync" - "time" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/utils" ) @@ -78,7 +78,7 @@ func writeToFile(fileDir, fileName string, content []byte) (err error) { _, err = fileOut.Write(content) fileOut.Close() return nil, err - }, time.Duration(2*time.Second), utils.FileLockPrefix+fallbackFilePath) + }, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+fallbackFilePath) return } diff --git a/engine/pstr_amqp.go b/engine/pstr_amqp.go index 38acc8d47..430893ed1 100644 --- a/engine/pstr_amqp.go +++ b/engine/pstr_amqp.go @@ -95,7 +95,9 @@ func (pstr *AMQPPoster) Post(content []byte, fallbackFileName, _ string) (err er if chn, err = pstr.newPostChannel(); err == nil { break } - time.Sleep(time.Duration(fib()) * time.Second) + if i+1 < pstr.attempts { + time.Sleep(time.Duration(fib()) * time.Second) + } } if err != nil { if fallbackFileName != utils.META_NONE { @@ -117,7 +119,9 @@ func (pstr *AMQPPoster) Post(content []byte, fallbackFileName, _ string) (err er }); err == nil { break } - time.Sleep(time.Duration(fib()) * time.Second) + if i+1 < pstr.attempts { + time.Sleep(time.Duration(fib()) * time.Second) + } } if err != nil && fallbackFileName != utils.META_NONE { err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content) diff --git a/general_tests/cdrs_processevent_it_test.go b/general_tests/cdrs_processevent_it_test.go index 9b6f2c13a..082a7777d 100644 --- a/general_tests/cdrs_processevent_it_test.go +++ b/general_tests/cdrs_processevent_it_test.go @@ -494,7 +494,7 @@ func testV1CDRsProcessEventThreshold(t *testing.T) { utils.OriginID: "testV2CDRsProcessCDRWithThreshold", utils.OriginHost: "OriginHost6", utils.Source: "testV2CDRsProcessCDRWithThreshold", - utils.RequestType: utils.META_PREPAID, + utils.RequestType: utils.META_PSEUDOPREPAID, utils.Category: "call", utils.Account: "1005", utils.Subject: "ANY2CNT", @@ -529,6 +529,7 @@ func testV1CDRsProcessEventThreshold(t *testing.T) { acntAttrs := &utils.AttrGetAccount{ Tenant: "cgrates.org", Account: "1005"} + time.Sleep(50 * time.Millisecond) expectedVoice := 10.0 if err := pecdrsRpc.Call(utils.ApierV2GetAccount, acntAttrs, &acnt); err != nil { t.Error(err) @@ -560,12 +561,6 @@ func testV1CDRsProcessEventExport(t *testing.T) { } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) } - // var cdrs []*engine.CDR - // if err := pecdrsRpc.Call(utils.CDRsV1GetCDRs, utils.RPCCDRsFilter{OriginHosts: []string{"OriginHost7"}}, &cdrs); err != nil { - // t.Error("Unexpected error: ", err) - // } else if len(cdrs) != 1 { - // t.Errorf("Expecting: 1, received: %+v", len(cdrs)) - // } } func testV1CDRsProcessEventExportCheck(t *testing.T) { failoverContent := []byte(fmt.Sprintf(`{"CGRID":"%s"}`, utils.Sha1("test7_processEvent", "OriginHost7"))) @@ -578,7 +573,7 @@ func testV1CDRsProcessEventExportCheck(t *testing.T) { for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config fileName = file.Name() filePath := path.Join(pecdrsCfg.GeneralCfg().FailedPostsDir, fileName) - if strings.HasPrefix(fileName, "cdr|*amqpv1_json_map") { + if strings.HasPrefix(fileName, "cdr|*amqp_json_map") { foundFile = true if readBytes, err := ioutil.ReadFile(filePath); err != nil { t.Error(err)