diff --git a/data/conf/samples/cdrsreplicationmaster/cdrsreplicationmaster.json b/data/conf/samples/cdrsreplicationmaster/cdrsreplicationmaster.json index 900209bc5..950c14935 100644 --- a/data/conf/samples/cdrsreplicationmaster/cdrsreplicationmaster.json +++ b/data/conf/samples/cdrsreplicationmaster/cdrsreplicationmaster.json @@ -67,7 +67,8 @@ }, { "transport": "*http_post", - "address": "http://127.0.0.1:12080/invalid", + "address": "http://127.0.0.1:12080/invalid", + "cdr_filter": "OriginID(httpjsonrpc1)", "attempts": 1, "content_fields": [ {"tag": "OriginID", "type": "*composed", "value": "OriginID", "field_id": "OriginID"}, diff --git a/engine/cdrs.go b/engine/cdrs.go index 07cc08fca..0dc50cbac 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -33,6 +33,7 @@ import ( "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" + "github.com/streadway/amqp" ) var cdrServer *CdrServer // Share the server so we can use it in http handlers @@ -509,8 +510,12 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error { var amqpPoster *utils.AMQPPoster amqpPoster, err = utils.AMQPPostersCache.GetAMQPPoster(rplCfg.Address, rplCfg.Attempts, self.cgrCfg.FailedPostsDir) if err == nil { // error will be checked bellow - _, err = amqpPoster.Post( + var chn *amqp.Channel + chn, err = amqpPoster.Post( nil, utils.PosterTransportContentTypes[rplCfg.Transport], body.([]byte), rplCfg.FallbackFileName()) + if chn != nil { + chn.Close() + } } default: utils.Logger.Warning(fmt.Sprintf(" Unsupported replication transport: %s", rplCfg.Transport)) diff --git a/general_tests/cdrs_replication_it_test.go b/general_tests/cdrs_replication_it_test.go index fb08eb9c6..323158ee0 100644 --- a/general_tests/cdrs_replication_it_test.go +++ b/general_tests/cdrs_replication_it_test.go @@ -24,6 +24,7 @@ import ( "encoding/json" "io/ioutil" "os" + "os/exec" "path" "reflect" "strings" @@ -145,7 +146,6 @@ func TestCdrsAMQPReplication(t *testing.T) { if err != nil { t.Fatal(err) } - defer conn.Close() ch, err := conn.Channel() if err != nil { @@ -173,6 +173,54 @@ func TestCdrsAMQPReplication(t *testing.T) { case <-time.After(time.Duration(100 * time.Millisecond)): t.Error("No message received from RabbitMQ") } + conn.Close() + // restart RabbitMQ server so we can test reconnects + if err := exec.Command("service", "rabbitmq-server", "restart").Run(); err != nil { + t.Error(err) + } + time.Sleep(time.Duration(5 * time.Second)) + testCdr := &engine.CDR{CGRID: utils.Sha1("amqpreconnect", time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC).String()), + ToR: utils.VOICE, OriginID: "amqpreconnect", OriginHost: "192.168.1.1", Source: "UNKNOWN", RequestType: utils.META_PSEUDOPREPAID, + Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1001", Subject: "1001", Destination: "1002", + SetupTime: time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC), AnswerTime: time.Date(2013, 12, 7, 8, 42, 26, 0, time.UTC), + Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, + RunID: utils.DEFAULT_RUNID, Cost: 1.201, Rated: true} + var reply string + if err := cdrsMasterRpc.Call("CdrsV2.ProcessCdr", testCdr, &reply); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if reply != utils.OK { + t.Error("Unexpected reply received: ", reply) + } + time.Sleep(time.Duration(*waitRater) * time.Millisecond) + if conn, err = amqp.Dial("amqp://guest:guest@localhost:5672/"); err != nil { + t.Fatal(err) + } + defer conn.Close() + + if ch, err = conn.Channel(); err != nil { + t.Fatal(err) + } + defer ch.Close() + + if q, err = ch.QueueDeclare("cgrates_cdrs", true, false, false, false, nil); err != nil { + t.Fatal(err) + } + if msgs, err = ch.Consume(q.Name, "", true, false, false, false, nil); err != nil { + t.Fatal(err) + } + select { + case d := <-msgs: + var rcvCDR map[string]string + if err := json.Unmarshal(d.Body, &rcvCDR); err != nil { + t.Error(err) + } + if rcvCDR[utils.CGRID] != testCdr.CGRID { + t.Errorf("Unexpected CDR received: %+v", rcvCDR) + } + case <-time.After(time.Duration(100 * time.Millisecond)): + t.Error("No message received from RabbitMQ") + } + } func TestCdrsHTTPPosterFileFailover(t *testing.T) { @@ -206,13 +254,15 @@ func TestCdrsHTTPPosterFileFailover(t *testing.T) { } else if !reflect.DeepEqual(failoverContent, readBytes) { // Checking just the prefix should do since some content is dynamic t.Errorf("Expecting: %q, received: %q", string(failoverContent), string(readBytes)) } - if err := os.Remove(filePath); err != nil { - t.Error("Failed removing file: ", filePath) - } + /* + if err := os.Remove(filePath); err != nil { + t.Error("Failed removing file: ", filePath) + } + */ } func TestCdrsAMQPPosterFileFailover(t *testing.T) { - time.Sleep(time.Duration(6 * time.Second)) + time.Sleep(time.Duration(10 * time.Second)) failoverContent := []byte(`{"CGRID":"57548d485d61ebcba55afbe5d939c82a8e9ff670"}`) var rplCfg *config.CDRReplicationCfg var foundFile bool @@ -229,22 +279,29 @@ func TestCdrsAMQPPosterFileFailover(t *testing.T) { if len(filesInDir) == 0 { t.Fatalf("No files in directory: %s", cdrsMasterCfg.FailedPostsDir) } + foundFile = false var fileName string for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config fileName = file.Name() if strings.HasPrefix(fileName, "cdr|*amqp_json_map") { + foundFile = true break } } + if !foundFile { + t.Fatal("Could not find the file in folder") + } filePath := path.Join(cdrsMasterCfg.FailedPostsDir, fileName) if readBytes, err := ioutil.ReadFile(filePath); err != nil { t.Error(err) } else if !reflect.DeepEqual(failoverContent, readBytes) { // Checking just the prefix should do since some content is dynamic t.Errorf("Expecting: %q, received: %q", string(failoverContent), string(readBytes)) } - if err := os.Remove(filePath); err != nil { - t.Error("Failed removing file: ", filePath) - } + /* + if err := os.Remove(filePath); err != nil { + t.Error("Failed removing file: ", filePath) + } + */ } /* diff --git a/glide.lock b/glide.lock index 9ad053a23..f7003cd40 100644 --- a/glide.lock +++ b/glide.lock @@ -71,7 +71,7 @@ imports: - name: github.com/mitchellh/mapstructure version: ca63d7c062ee3c9f34db231e352b60012b4fd0c1 - name: github.com/streadway/amqp - version: 63795daa9a446c920826655f26ba31c81c860fd6 + version: d75c3a341ff43309ad0cb69ac8bdbd1d8772775f - name: github.com/peterh/liner version: 8975875355a81d612fafb9f5a6037bdcc2d9b073 - name: github.com/ugorji/go diff --git a/utils/poster.go b/utils/poster.go index e77388af1..36b09ceeb 100644 --- a/utils/poster.go +++ b/utils/poster.go @@ -296,7 +296,9 @@ func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []by func (pstr *AMQPPoster) Close() { pstr.Lock() - pstr.conn.Close() + if pstr.conn != nil { + pstr.conn.Close() + } pstr.conn = nil pstr.Unlock() } @@ -304,7 +306,17 @@ func (pstr *AMQPPoster) Close() { func (pstr *AMQPPoster) NewPostChannel() (postChan *amqp.Channel, err error) { pstr.Lock() if pstr.conn == nil { - pstr.conn, err = amqp.Dial(pstr.dialURL) + var conn *amqp.Connection + conn, err = amqp.Dial(pstr.dialURL) + if err == nil { + pstr.conn = conn + go func() { // monitor connection errors so we can restart + if err := <-pstr.conn.NotifyClose(make(chan *amqp.Error)); err != nil { + Logger.Err(fmt.Sprintf("Connection error received: %s", err.Error())) + pstr.Close() + } + }() + } } pstr.Unlock() if err != nil {