Adding tests for CDR replication over *amqp_json_map

This commit is contained in:
DanB
2017-02-03 16:30:26 +01:00
parent 7a6c68ded5
commit d2115893b8
3 changed files with 81 additions and 5 deletions

View File

@@ -4,6 +4,11 @@
// Used in apier_local_tests
// Starts rater, cdrs and mediator connecting over internal channel
"general": {
"log_level": 7,
"poster_attempts": 1,
},
"rals": {
"enabled": true, // enable Rater service: <true|false>
},
@@ -36,6 +41,30 @@
{"tag":"Cost", "type": "*composed", "value": "Cost", "field_id": "Cost"},
],
},
{
"transport": "*amqp_json_map",
"address": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs",
"attempts": 1,
"cdr_filter": "",
"content_fields": [
{"tag": "CGRID", "type": "*composed", "value": "CGRID", "field_id": "CGRID"},
{"tag":"RunID", "type": "*composed", "value": "RunID", "field_id": "RunID"},
{"tag":"TOR", "type": "*composed", "value": "ToR", "field_id": "ToR"},
{"tag":"OriginID", "type": "*composed", "value": "OriginID", "field_id": "OriginID"},
{"tag":"OriginHost", "type": "*composed", "value": "OriginHost", "field_id": "OriginHost"},
{"tag":"RequestType", "type": "*composed", "value": "RequestType", "field_id": "RequestType"},
{"tag":"Direction", "type": "*composed", "value": "Direction", "field_id": "Direction"},
{"tag":"Tenant", "type": "*composed", "value": "Tenant", "field_id": "Tenant"},
{"tag":"Category", "type": "*composed", "value": "Category", "field_id": "Category"},
{"tag":"Account", "type": "*composed", "value": "Account", "field_id": "Account"},
{"tag":"Subject", "type": "*composed", "value": "Subject", "field_id": "Subject"},
{"tag":"Destination", "type": "*composed", "value": "Destination", "field_id": "Destination"},
{"tag":"SetupTime", "type": "*composed", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "SetupTime"},
{"tag":"AnswerTime", "type": "*composed", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "AnswerTime"},
{"tag":"Usage", "type": "*composed", "value": "Usage", "field_id": "Usage"},
{"tag":"Cost", "type": "*composed", "value": "Cost", "field_id": "Cost"},
],
},
{
"transport": "*http_post",
"address": "http://127.0.0.1:12080/invalid",
@@ -43,6 +72,14 @@
"content_fields": [
{"tag": "CGRID", "type": "*composed", "value": "CGRID", "field_id": "CGRID"},
],
},
{
"transport": "*amqp_json_map",
"address": "amqp://guest:guest@localhost:25672/?queue_id=cgrates_cdrs",
"attempts": 1,
"content_fields": [
{"tag": "CGRID", "type": "*composed", "value": "CGRID", "field_id": "CGRID"},
],
},
],
},

View File

@@ -21,8 +21,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package general_tests
import (
"encoding/json"
"io/ioutil"
"os"
//"os"
"path"
"reflect"
"strings"
@@ -33,6 +34,7 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
"github.com/streadway/amqp"
)
var cdrsMasterCfgPath, cdrsSlaveCfgPath string
@@ -136,6 +138,41 @@ func TestCdrsHttpCdrReplication(t *testing.T) {
}
}
func TestCdrsAMQPReplication(t *testing.T) {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
t.Fatal(err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
t.Fatal(err)
}
defer ch.Close()
q, err := ch.QueueDeclare("cgrates_cdrs", true, false, false, false, nil)
if err != nil {
t.Fatal(err)
}
msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
if err != nil {
t.Fatal(err)
}
select {
case d := <-msgs:
var rcvCDR map[string]string
if err := json.Unmarshal(d.Body, &rcvCDR); err != nil {
t.Error(err)
}
if rcvCDR[utils.CGRID] != utils.Sha1("httpjsonrpc1", time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC).String()) {
t.Errorf("Unexpected CDR received: %+v", rcvCDR)
}
case <-time.After(time.Duration(100 * time.Millisecond)):
t.Error("No message received from RabbitMQ")
}
}
// Connect rpc client to rater
func TestCdrsFileFailover(t *testing.T) {
time.Sleep(time.Duration(2 * time.Second))
@@ -161,9 +198,11 @@ func TestCdrsFileFailover(t *testing.T) {
} else if !reflect.DeepEqual(failoverContent, readBytes) { // Checking just the prefix should do since some content is dynamic
t.Errorf("Expecting: %q, received: %q", string(failoverContent), string(readBytes))
}
if err := os.Remove(filePath); err != nil {
t.Error("Failed removing file: ", filePath)
}
/*
if err := os.Remove(filePath); err != nil {
t.Error("Failed removing file: ", filePath)
}
*/
}
/*

View File

@@ -230,7 +230,7 @@ func (pc *AMQPCachedPosters) GetAMQPPoster(dialURL string, attempts int, fallbac
return pc.cache[dialURL], nil
}
// "amqp://guest:guest@localhost:5672/?queueID=cgr_cdrs"
// "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs"
func NewAMQPPoster(dialURL string, attempts int, fallbackFileDir string) (*AMQPPoster, error) {
u, err := url.Parse(dialURL)
if err != nil {