AMQP connection restarts

This commit is contained in:
DanB
2017-02-06 16:04:36 +01:00
parent ece7dabe67
commit 775bc810bd
5 changed files with 88 additions and 13 deletions

View File

@@ -67,7 +67,8 @@
},
{
"transport": "*http_post",
"address": "http://127.0.0.1:12080/invalid",
"address": "http://127.0.0.1:12080/invalid",
"cdr_filter": "OriginID(httpjsonrpc1)",
"attempts": 1,
"content_fields": [
{"tag": "OriginID", "type": "*composed", "value": "OriginID", "field_id": "OriginID"},

View File

@@ -33,6 +33,7 @@ import (
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
"github.com/streadway/amqp"
)
var cdrServer *CdrServer // Share the server so we can use it in http handlers
@@ -509,8 +510,12 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error {
var amqpPoster *utils.AMQPPoster
amqpPoster, err = utils.AMQPPostersCache.GetAMQPPoster(rplCfg.Address, rplCfg.Attempts, self.cgrCfg.FailedPostsDir)
if err == nil { // error will be checked bellow
_, err = amqpPoster.Post(
var chn *amqp.Channel
chn, err = amqpPoster.Post(
nil, utils.PosterTransportContentTypes[rplCfg.Transport], body.([]byte), rplCfg.FallbackFileName())
if chn != nil {
chn.Close()
}
}
default:
utils.Logger.Warning(fmt.Sprintf("<CDRReplicator> Unsupported replication transport: %s", rplCfg.Transport))

View File

@@ -24,6 +24,7 @@ import (
"encoding/json"
"io/ioutil"
"os"
"os/exec"
"path"
"reflect"
"strings"
@@ -145,7 +146,6 @@ func TestCdrsAMQPReplication(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
@@ -173,6 +173,54 @@ func TestCdrsAMQPReplication(t *testing.T) {
case <-time.After(time.Duration(100 * time.Millisecond)):
t.Error("No message received from RabbitMQ")
}
conn.Close()
// restart RabbitMQ server so we can test reconnects
if err := exec.Command("service", "rabbitmq-server", "restart").Run(); err != nil {
t.Error(err)
}
time.Sleep(time.Duration(5 * time.Second))
testCdr := &engine.CDR{CGRID: utils.Sha1("amqpreconnect", time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC).String()),
ToR: utils.VOICE, OriginID: "amqpreconnect", OriginHost: "192.168.1.1", Source: "UNKNOWN", RequestType: utils.META_PSEUDOPREPAID,
Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1001", Subject: "1001", Destination: "1002",
SetupTime: time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC), AnswerTime: time.Date(2013, 12, 7, 8, 42, 26, 0, time.UTC),
Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"},
RunID: utils.DEFAULT_RUNID, Cost: 1.201, Rated: true}
var reply string
if err := cdrsMasterRpc.Call("CdrsV2.ProcessCdr", testCdr, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
if conn, err = amqp.Dial("amqp://guest:guest@localhost:5672/"); err != nil {
t.Fatal(err)
}
defer conn.Close()
if ch, err = conn.Channel(); err != nil {
t.Fatal(err)
}
defer ch.Close()
if q, err = ch.QueueDeclare("cgrates_cdrs", true, false, false, false, nil); err != nil {
t.Fatal(err)
}
if msgs, err = ch.Consume(q.Name, "", true, false, false, false, nil); err != nil {
t.Fatal(err)
}
select {
case d := <-msgs:
var rcvCDR map[string]string
if err := json.Unmarshal(d.Body, &rcvCDR); err != nil {
t.Error(err)
}
if rcvCDR[utils.CGRID] != testCdr.CGRID {
t.Errorf("Unexpected CDR received: %+v", rcvCDR)
}
case <-time.After(time.Duration(100 * time.Millisecond)):
t.Error("No message received from RabbitMQ")
}
}
func TestCdrsHTTPPosterFileFailover(t *testing.T) {
@@ -206,13 +254,15 @@ func TestCdrsHTTPPosterFileFailover(t *testing.T) {
} else if !reflect.DeepEqual(failoverContent, readBytes) { // Checking just the prefix should do since some content is dynamic
t.Errorf("Expecting: %q, received: %q", string(failoverContent), string(readBytes))
}
if err := os.Remove(filePath); err != nil {
t.Error("Failed removing file: ", filePath)
}
/*
if err := os.Remove(filePath); err != nil {
t.Error("Failed removing file: ", filePath)
}
*/
}
func TestCdrsAMQPPosterFileFailover(t *testing.T) {
time.Sleep(time.Duration(6 * time.Second))
time.Sleep(time.Duration(10 * time.Second))
failoverContent := []byte(`{"CGRID":"57548d485d61ebcba55afbe5d939c82a8e9ff670"}`)
var rplCfg *config.CDRReplicationCfg
var foundFile bool
@@ -229,22 +279,29 @@ func TestCdrsAMQPPosterFileFailover(t *testing.T) {
if len(filesInDir) == 0 {
t.Fatalf("No files in directory: %s", cdrsMasterCfg.FailedPostsDir)
}
foundFile = false
var fileName string
for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config
fileName = file.Name()
if strings.HasPrefix(fileName, "cdr|*amqp_json_map") {
foundFile = true
break
}
}
if !foundFile {
t.Fatal("Could not find the file in folder")
}
filePath := path.Join(cdrsMasterCfg.FailedPostsDir, fileName)
if readBytes, err := ioutil.ReadFile(filePath); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(failoverContent, readBytes) { // Checking just the prefix should do since some content is dynamic
t.Errorf("Expecting: %q, received: %q", string(failoverContent), string(readBytes))
}
if err := os.Remove(filePath); err != nil {
t.Error("Failed removing file: ", filePath)
}
/*
if err := os.Remove(filePath); err != nil {
t.Error("Failed removing file: ", filePath)
}
*/
}
/*

2
glide.lock generated
View File

@@ -71,7 +71,7 @@ imports:
- name: github.com/mitchellh/mapstructure
version: ca63d7c062ee3c9f34db231e352b60012b4fd0c1
- name: github.com/streadway/amqp
version: 63795daa9a446c920826655f26ba31c81c860fd6
version: d75c3a341ff43309ad0cb69ac8bdbd1d8772775f
- name: github.com/peterh/liner
version: 8975875355a81d612fafb9f5a6037bdcc2d9b073
- name: github.com/ugorji/go

View File

@@ -296,7 +296,9 @@ func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []by
func (pstr *AMQPPoster) Close() {
pstr.Lock()
pstr.conn.Close()
if pstr.conn != nil {
pstr.conn.Close()
}
pstr.conn = nil
pstr.Unlock()
}
@@ -304,7 +306,17 @@ func (pstr *AMQPPoster) Close() {
func (pstr *AMQPPoster) NewPostChannel() (postChan *amqp.Channel, err error) {
pstr.Lock()
if pstr.conn == nil {
pstr.conn, err = amqp.Dial(pstr.dialURL)
var conn *amqp.Connection
conn, err = amqp.Dial(pstr.dialURL)
if err == nil {
pstr.conn = conn
go func() { // monitor connection errors so we can restart
if err := <-pstr.conn.NotifyClose(make(chan *amqp.Error)); err != nil {
Logger.Err(fmt.Sprintf("Connection error received: %s", err.Error()))
pstr.Close()
}
}()
}
}
pstr.Unlock()
if err != nil {