ApierV1.ReplayFailedPosts with support for *amqp, fix default replication teplate

This commit is contained in:
DanB
2017-02-08 17:06:34 +01:00
parent 495bb03814
commit c6d07d6701
5 changed files with 131 additions and 54 deletions

View File

@@ -34,6 +34,7 @@ import (
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
"github.com/streadway/amqp"
)
const (
@@ -1673,9 +1674,11 @@ func (v1 *ApierV1) ServiceStatus(args servmanager.ArgStartService, reply *string
type ArgsReplyFailedPosts struct {
FailedRequestsInDir *string // if defined it will be our source of requests to be replayed
FailedRequestsOutDir *string // if defined it will become our destination for files failing to be replayed, *none to be discarded
Posters []string // list of modules for which replay the requests, nil for all
Modules []string // list of modules for which replay the requests, nil for all
Transports []string // list of transports
}
// ReplayFailedPosts will repost failed requests found in the FailedRequestsInDir
func (v1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) (err error) {
failedReqsInDir := v1.Config.FailedPostsDir
if args.FailedRequestsInDir != nil && *args.FailedRequestsInDir != "" {
@@ -1695,9 +1698,9 @@ func (v1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) (
if err != nil {
return utils.NewErrServerError(err)
}
if len(args.Posters) != 0 {
if len(args.Modules) != 0 {
var allowedModule bool
for _, mod := range args.Posters {
for _, mod := range args.Modules {
if strings.HasPrefix(ffn.Module, mod) {
allowedModule = true
break
@@ -1707,6 +1710,9 @@ func (v1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) (
continue // this file is not to be processed due to Modules ACL
}
}
if len(args.Transports) != 0 && !utils.IsSliceMember(args.Transports, ffn.Transport) {
continue // this file is not to be processed due to Transports ACL
}
var fileContent []byte
_, err = guardian.Guardian.Guard(func() (interface{}, error) {
if fileContent, err = ioutil.ReadFile(filePath); err != nil {
@@ -1724,9 +1730,25 @@ func (v1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) (
if failedReqsOutDir != utils.META_NONE {
failoverPath = path.Join(failedReqsOutDir, file.Name())
}
_, err = utils.NewHTTPPoster(v1.Config.HttpSkipTlsVerify,
v1.Config.ReplyTimeout).Post(ffn.Address, utils.PosterTransportContentTypes[ffn.Transport], fileContent,
v1.Config.PosterAttempts, failoverPath)
switch ffn.Transport {
case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST:
_, err = utils.NewHTTPPoster(v1.Config.HttpSkipTlsVerify,
v1.Config.ReplyTimeout).Post(ffn.Address, utils.PosterTransportContentTypes[ffn.Transport], fileContent,
v1.Config.PosterAttempts, failoverPath)
case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap:
var amqpPoster *utils.AMQPPoster
amqpPoster, err = utils.AMQPPostersCache.GetAMQPPoster(ffn.Address, v1.Config.PosterAttempts, failedReqsOutDir)
if err == nil { // error will be checked bellow
var chn *amqp.Channel
chn, err = amqpPoster.Post(
nil, utils.PosterTransportContentTypes[ffn.Transport], fileContent, file.Name())
if chn != nil {
chn.Close()
}
}
default:
err = fmt.Errorf("unsupported replication transport: %s", ffn.Transport)
}
if err != nil && failedReqsOutDir != utils.META_NONE { // Got error from HTTPPoster could be that content was not written, we need to write it ourselves
_, err := guardian.Guardian.Guard(func() (interface{}, error) {
if _, err := os.Stat(failoverPath); err == nil || !os.IsNotExist(err) {

View File

@@ -40,6 +40,7 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
"github.com/streadway/amqp"
)
// ToDo: Replace rpc.Client with internal rpc server and Apier using internal map as both data and stor so we can run the tests non-local
@@ -1698,10 +1699,10 @@ func TestApierReplayFailedPosts(t *testing.T) {
if err != nil {
t.Error(err)
}
defer fileOut.Close()
if _, err := fileOut.Write(fileContent); err != nil {
t.Error(err)
}
fileOut.Close()
var reply string
if err := rater.Call("ApierV1.ReplayFailedPosts", args, &reply); err != nil {
t.Error(err)
@@ -1714,11 +1715,65 @@ func TestApierReplayFailedPosts(t *testing.T) {
} else if !reflect.DeepEqual(fileContent, outContent) {
t.Errorf("Expecting: %q, received: %q", string(fileContent), string(outContent))
}
fileName = "cdr|*amqp_json_map|amqp%3A%2F%2Fguest%3Aguest%40localhost%3A5672%2F%3Fqueue_id%3Dcgrates_cdrs|ae8cc4b3-5e60-4396-b82a-64b96a72a03c.json"
fileContent = []byte(`{"CGRID":"88ed9c38005f07576a1e1af293063833b60edcc6"}`)
fileInPath := path.Join(*args.FailedRequestsInDir, fileName)
fileOut, err = os.Create(fileInPath)
if err != nil {
t.Error(err)
}
if _, err := fileOut.Write(fileContent); err != nil {
t.Error(err)
}
fileOut.Close()
if err := rater.Call("ApierV1.ReplayFailedPosts", args, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Error("Unexpected reply: ", reply)
}
if _, err := os.Stat(fileInPath); !os.IsNotExist(err) {
t.Error("InFile still exists")
}
if _, err := os.Stat(path.Join(*args.FailedRequestsOutDir, fileName)); !os.IsNotExist(err) {
t.Error("OutFile created")
}
// connect to RabbitMQ server and check if the content was posted there
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
t.Fatal(err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
t.Fatal(err)
}
defer ch.Close()
q, err := ch.QueueDeclare("cgrates_cdrs", true, false, false, false, nil)
if err != nil {
t.Fatal(err)
}
msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
if err != nil {
t.Fatal(err)
}
select {
case d := <-msgs:
var rcvCDR map[string]string
if err := json.Unmarshal(d.Body, &rcvCDR); err != nil {
t.Error(err)
}
if rcvCDR[utils.CGRID] != "88ed9c38005f07576a1e1af293063833b60edcc6" {
t.Errorf("Unexpected CDR received: %+v", rcvCDR)
}
case <-time.After(time.Duration(100 * time.Millisecond)):
t.Error("No message received from RabbitMQ")
}
for _, dir := range []string{*args.FailedRequestsInDir, *args.FailedRequestsOutDir} {
if err := os.RemoveAll(dir); err != nil {
t.Errorf("Error %s removing folder: %s", err, dir)
}
}
}
// Simply kill the engine after we are done with tests within this file

View File

@@ -154,27 +154,27 @@ const CGRATES_CFG_JSON = `
"aliases_conns": [], // address where to reach the aliases service, empty to disable aliases functionality: <""|*internal|x.y.z.y:1234>
"cdrstats_conns": [], // address where to reach the cdrstats service, empty to disable stats functionality<""|*internal|x.y.z.y:1234>
"cdr_replication":[
// {
// "transport": "*http_post", // mechanism to use when replicating
// "address": "http://127.0.0.1:12080/cdr_http", // address where to replicate
// "attempts": 1, // number of attempts for POST before failing on file
// "cdr_filter": "", // filter the CDRs being replicated
// "content_fields": [ // template of the replicated content fields
// {"tag": "CGRID", "type": "*composed", "value": "CGRID"},
// {"tag":"RunID", "type": "*composed", "value": "RunID"},
// {"tag":"TOR", "type": "*composed", "value": "ToR"},
// {"tag":"OriginID", "type": "*composed", "value": "OriginID"},
// {"tag":"RequestType", "type": "*composed", "value": "RequestType"},
// {"tag":"Direction", "type": "*composed", "value": "Direction"},
// {"tag":"Tenant", "type": "*composed", "value": "Tenant"},
// {"tag":"Category", "type": "*composed", "value": "Category"},
// {"tag":"Account", "type": "*composed", "value": "Account"},
// {"tag":"Subject", "type": "*composed", "value": "Subject"},
// {"tag":"Destination", "type": "*composed", "value": "Destination"},
// {"tag":"SetupTime", "type": "*composed", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00"},
// {"tag":"AnswerTime", "type": "*composed", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
// {"tag":"Usage", "type": "*composed", "value": "Usage"},
// {"tag":"Cost", "type": "*composed", "value": "Cost"},
// { // sample replication, not configured by default
// "transport": "*amqp_json_map", // mechanism to use when replicating
// "address": "http://127.0.0.1:12080/cdr_json_map", // address where to replicate
// "attempts": 1, // number of attempts for POST before failing on file
// "cdr_filter": "", // filter the CDRs being replicated
// "content_fields": [ // template of the replicated content fields
// {"tag": "CGRID", "type": "*composed", "value": "CGRID", "field_id": "CGRID"},
// {"tag":"RunID", "type": "*composed", "value": "RunID", "field_id": "RunID"},
// {"tag":"TOR", "type": "*composed", "value": "ToR", "field_id": "ToR"},
// {"tag":"OriginID", "type": "*composed", "value": "OriginID", "field_id": "OriginID"},
// {"tag":"RequestType", "type": "*composed", "value": "RequestType", "field_id": "RequestType"},
// {"tag":"Direction", "type": "*composed", "value": "Direction", "field_id": "Direction"},
// {"tag":"Tenant", "type": "*composed", "value": "Tenant", "field_id": "Tenant"},
// {"tag":"Category", "type": "*composed", "value": "Category", "field_id": "Category"},
// {"tag":"Account", "type": "*composed", "value": "Account", "field_id": "Account"},
// {"tag":"Subject", "type": "*composed", "value": "Subject", "field_id": "Subject"},
// {"tag":"Destination", "type": "*composed", "value": "Destination", "field_id": "Destination"},
// {"tag":"SetupTime", "type": "*composed", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "SetupTime"},
// {"tag":"AnswerTime", "type": "*composed", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "AnswerTime"},
// {"tag":"Usage", "type": "*composed", "value": "Usage", "field_id": "Usage"},
// {"tag":"Cost", "type": "*composed", "value": "Cost", "field_id": "Cost"},
// ],
// },
]

View File

@@ -134,27 +134,27 @@
// "aliases_conns": [], // address where to reach the aliases service, empty to disable aliases functionality: <""|*internal|x.y.z.y:1234>
// "cdrstats_conns": [], // address where to reach the cdrstats service, empty to disable stats functionality<""|*internal|x.y.z.y:1234>
// "cdr_replication":[
// // {
// // "transport": "*http_post", // mechanism to use when replicating
// // "address": "http://127.0.0.1:12080/cdr_http", // address where to replicate
// // "attempts": 1, // number of attempts for POST before failing on file
// // "cdr_filter": "", // filter the CDRs being replicated
// // "content_fields": [ // template of the replicated content fields
// // {"tag": "CGRID", "type": "*composed", "value": "CGRID"},
// // {"tag":"RunID", "type": "*composed", "value": "RunID"},
// // {"tag":"TOR", "type": "*composed", "value": "ToR"},
// // {"tag":"OriginID", "type": "*composed", "value": "OriginID"},
// // {"tag":"RequestType", "type": "*composed", "value": "RequestType"},
// // {"tag":"Direction", "type": "*composed", "value": "Direction"},
// // {"tag":"Tenant", "type": "*composed", "value": "Tenant"},
// // {"tag":"Category", "type": "*composed", "value": "Category"},
// // {"tag":"Account", "type": "*composed", "value": "Account"},
// // {"tag":"Subject", "type": "*composed", "value": "Subject"},
// // {"tag":"Destination", "type": "*composed", "value": "Destination"},
// // {"tag":"SetupTime", "type": "*composed", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00"},
// // {"tag":"AnswerTime", "type": "*composed", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
// // {"tag":"Usage", "type": "*composed", "value": "Usage"},
// // {"tag":"Cost", "type": "*composed", "value": "Cost"},
// // { // sample replication, not configured by default
// // "transport": "*amqp_json_map", // mechanism to use when replicating
// // "address": "http://127.0.0.1:12080/cdr_json_map", // address where to replicate
// // "attempts": 1, // number of attempts for POST before failing on file
// // "cdr_filter": "", // filter the CDRs being replicated
// // "content_fields": [ // template of the replicated content fields
// // {"tag": "CGRID", "type": "*composed", "value": "CGRID", "field_id": "CGRID"},
// // {"tag":"RunID", "type": "*composed", "value": "RunID", "field_id": "RunID"},
// // {"tag":"TOR", "type": "*composed", "value": "ToR", "field_id": "ToR"},
// // {"tag":"OriginID", "type": "*composed", "value": "OriginID", "field_id": "OriginID"},
// // {"tag":"RequestType", "type": "*composed", "value": "RequestType", "field_id": "RequestType"},
// // {"tag":"Direction", "type": "*composed", "value": "Direction", "field_id": "Direction"},
// // {"tag":"Tenant", "type": "*composed", "value": "Tenant", "field_id": "Tenant"},
// // {"tag":"Category", "type": "*composed", "value": "Category", "field_id": "Category"},
// // {"tag":"Account", "type": "*composed", "value": "Account", "field_id": "Account"},
// // {"tag":"Subject", "type": "*composed", "value": "Subject", "field_id": "Subject"},
// // {"tag":"Destination", "type": "*composed", "value": "Destination", "field_id": "Destination"},
// // {"tag":"SetupTime", "type": "*composed", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "SetupTime"},
// // {"tag":"AnswerTime", "type": "*composed", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "AnswerTime"},
// // {"tag":"Usage", "type": "*composed", "value": "Usage", "field_id": "Usage"},
// // {"tag":"Cost", "type": "*composed", "value": "Cost", "field_id": "Cost"},
// // ],
// // },
// ]

View File

@@ -501,9 +501,10 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error {
}
go func(body interface{}, rplCfg *config.CDRReplicationCfg, content string, errChan chan error) {
var err error
fallbackPath := path.Join(
self.cgrCfg.FailedPostsDir,
rplCfg.FallbackFileName())
fallbackPath := utils.META_NONE
if rplCfg.FallbackFileName() != utils.META_NONE {
fallbackPath = path.Join(self.cgrCfg.FailedPostsDir, rplCfg.FallbackFileName())
}
switch rplCfg.Transport {
case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST:
_, err = self.httpPoster.Post(rplCfg.Address, utils.PosterTransportContentTypes[rplCfg.Transport], body, rplCfg.Attempts, fallbackPath)
@@ -519,8 +520,7 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error {
}
}
default:
utils.Logger.Warning(fmt.Sprintf("<CDRReplicator> Unsupported replication transport: %s", rplCfg.Transport))
return
err = fmt.Errorf("unsupported replication transport: %s", rplCfg.Transport)
}
if err != nil {
utils.Logger.Err(fmt.Sprintf(