diff --git a/apier/v1/apier.go b/apier/v1/apier.go index bdb4bde6c..62dda9272 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -34,6 +34,7 @@ import ( "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" + "github.com/streadway/amqp" ) const ( @@ -1673,9 +1674,11 @@ func (v1 *ApierV1) ServiceStatus(args servmanager.ArgStartService, reply *string type ArgsReplyFailedPosts struct { FailedRequestsInDir *string // if defined it will be our source of requests to be replayed FailedRequestsOutDir *string // if defined it will become our destination for files failing to be replayed, *none to be discarded - Posters []string // list of modules for which replay the requests, nil for all + Modules []string // list of modules for which replay the requests, nil for all + Transports []string // list of transports } +// ReplayFailedPosts will repost failed requests found in the FailedRequestsInDir func (v1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) (err error) { failedReqsInDir := v1.Config.FailedPostsDir if args.FailedRequestsInDir != nil && *args.FailedRequestsInDir != "" { @@ -1695,9 +1698,9 @@ func (v1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) ( if err != nil { return utils.NewErrServerError(err) } - if len(args.Posters) != 0 { + if len(args.Modules) != 0 { var allowedModule bool - for _, mod := range args.Posters { + for _, mod := range args.Modules { if strings.HasPrefix(ffn.Module, mod) { allowedModule = true break @@ -1707,6 +1710,9 @@ func (v1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) ( continue // this file is not to be processed due to Modules ACL } } + if len(args.Transports) != 0 && !utils.IsSliceMember(args.Transports, ffn.Transport) { + continue // this file is not to be processed due to Transports ACL + } var fileContent []byte _, err = guardian.Guardian.Guard(func() (interface{}, error) { if fileContent, err = ioutil.ReadFile(filePath); err != nil { @@ -1724,9 +1730,25 @@ func (v1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) ( if failedReqsOutDir != utils.META_NONE { failoverPath = path.Join(failedReqsOutDir, file.Name()) } - _, err = utils.NewHTTPPoster(v1.Config.HttpSkipTlsVerify, - v1.Config.ReplyTimeout).Post(ffn.Address, utils.PosterTransportContentTypes[ffn.Transport], fileContent, - v1.Config.PosterAttempts, failoverPath) + switch ffn.Transport { + case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST: + _, err = utils.NewHTTPPoster(v1.Config.HttpSkipTlsVerify, + v1.Config.ReplyTimeout).Post(ffn.Address, utils.PosterTransportContentTypes[ffn.Transport], fileContent, + v1.Config.PosterAttempts, failoverPath) + case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap: + var amqpPoster *utils.AMQPPoster + amqpPoster, err = utils.AMQPPostersCache.GetAMQPPoster(ffn.Address, v1.Config.PosterAttempts, failedReqsOutDir) + if err == nil { // error will be checked bellow + var chn *amqp.Channel + chn, err = amqpPoster.Post( + nil, utils.PosterTransportContentTypes[ffn.Transport], fileContent, file.Name()) + if chn != nil { + chn.Close() + } + } + default: + err = fmt.Errorf("unsupported replication transport: %s", ffn.Transport) + } if err != nil && failedReqsOutDir != utils.META_NONE { // Got error from HTTPPoster could be that content was not written, we need to write it ourselves _, err := guardian.Guardian.Guard(func() (interface{}, error) { if _, err := os.Stat(failoverPath); err == nil || !os.IsNotExist(err) { diff --git a/apier/v1/apier_it_test.go b/apier/v1/apier_it_test.go index c2b78a83c..40ebeca3c 100644 --- a/apier/v1/apier_it_test.go +++ b/apier/v1/apier_it_test.go @@ -40,6 +40,7 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" + "github.com/streadway/amqp" ) // ToDo: Replace rpc.Client with internal rpc server and Apier using internal map as both data and stor so we can run the tests non-local @@ -1698,10 +1699,10 @@ func TestApierReplayFailedPosts(t *testing.T) { if err != nil { t.Error(err) } - defer fileOut.Close() if _, err := fileOut.Write(fileContent); err != nil { t.Error(err) } + fileOut.Close() var reply string if err := rater.Call("ApierV1.ReplayFailedPosts", args, &reply); err != nil { t.Error(err) @@ -1714,11 +1715,65 @@ func TestApierReplayFailedPosts(t *testing.T) { } else if !reflect.DeepEqual(fileContent, outContent) { t.Errorf("Expecting: %q, received: %q", string(fileContent), string(outContent)) } + fileName = "cdr|*amqp_json_map|amqp%3A%2F%2Fguest%3Aguest%40localhost%3A5672%2F%3Fqueue_id%3Dcgrates_cdrs|ae8cc4b3-5e60-4396-b82a-64b96a72a03c.json" + fileContent = []byte(`{"CGRID":"88ed9c38005f07576a1e1af293063833b60edcc6"}`) + fileInPath := path.Join(*args.FailedRequestsInDir, fileName) + fileOut, err = os.Create(fileInPath) + if err != nil { + t.Error(err) + } + if _, err := fileOut.Write(fileContent); err != nil { + t.Error(err) + } + fileOut.Close() + if err := rater.Call("ApierV1.ReplayFailedPosts", args, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Error("Unexpected reply: ", reply) + } + if _, err := os.Stat(fileInPath); !os.IsNotExist(err) { + t.Error("InFile still exists") + } + if _, err := os.Stat(path.Join(*args.FailedRequestsOutDir, fileName)); !os.IsNotExist(err) { + t.Error("OutFile created") + } + // connect to RabbitMQ server and check if the content was posted there + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + if err != nil { + t.Fatal(err) + } + defer conn.Close() + ch, err := conn.Channel() + if err != nil { + t.Fatal(err) + } + defer ch.Close() + q, err := ch.QueueDeclare("cgrates_cdrs", true, false, false, false, nil) + if err != nil { + t.Fatal(err) + } + msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil) + if err != nil { + t.Fatal(err) + } + select { + case d := <-msgs: + var rcvCDR map[string]string + if err := json.Unmarshal(d.Body, &rcvCDR); err != nil { + t.Error(err) + } + if rcvCDR[utils.CGRID] != "88ed9c38005f07576a1e1af293063833b60edcc6" { + t.Errorf("Unexpected CDR received: %+v", rcvCDR) + } + case <-time.After(time.Duration(100 * time.Millisecond)): + t.Error("No message received from RabbitMQ") + } for _, dir := range []string{*args.FailedRequestsInDir, *args.FailedRequestsOutDir} { if err := os.RemoveAll(dir); err != nil { t.Errorf("Error %s removing folder: %s", err, dir) } } + } // Simply kill the engine after we are done with tests within this file diff --git a/config/config_defaults.go b/config/config_defaults.go index ba3ccb150..6d01a8c49 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -154,27 +154,27 @@ const CGRATES_CFG_JSON = ` "aliases_conns": [], // address where to reach the aliases service, empty to disable aliases functionality: <""|*internal|x.y.z.y:1234> "cdrstats_conns": [], // address where to reach the cdrstats service, empty to disable stats functionality<""|*internal|x.y.z.y:1234> "cdr_replication":[ -// { -// "transport": "*http_post", // mechanism to use when replicating -// "address": "http://127.0.0.1:12080/cdr_http", // address where to replicate -// "attempts": 1, // number of attempts for POST before failing on file -// "cdr_filter": "", // filter the CDRs being replicated -// "content_fields": [ // template of the replicated content fields -// {"tag": "CGRID", "type": "*composed", "value": "CGRID"}, -// {"tag":"RunID", "type": "*composed", "value": "RunID"}, -// {"tag":"TOR", "type": "*composed", "value": "ToR"}, -// {"tag":"OriginID", "type": "*composed", "value": "OriginID"}, -// {"tag":"RequestType", "type": "*composed", "value": "RequestType"}, -// {"tag":"Direction", "type": "*composed", "value": "Direction"}, -// {"tag":"Tenant", "type": "*composed", "value": "Tenant"}, -// {"tag":"Category", "type": "*composed", "value": "Category"}, -// {"tag":"Account", "type": "*composed", "value": "Account"}, -// {"tag":"Subject", "type": "*composed", "value": "Subject"}, -// {"tag":"Destination", "type": "*composed", "value": "Destination"}, -// {"tag":"SetupTime", "type": "*composed", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00"}, -// {"tag":"AnswerTime", "type": "*composed", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"}, -// {"tag":"Usage", "type": "*composed", "value": "Usage"}, -// {"tag":"Cost", "type": "*composed", "value": "Cost"}, +// { // sample replication, not configured by default +// "transport": "*amqp_json_map", // mechanism to use when replicating +// "address": "http://127.0.0.1:12080/cdr_json_map", // address where to replicate +// "attempts": 1, // number of attempts for POST before failing on file +// "cdr_filter": "", // filter the CDRs being replicated +// "content_fields": [ // template of the replicated content fields +// {"tag": "CGRID", "type": "*composed", "value": "CGRID", "field_id": "CGRID"}, +// {"tag":"RunID", "type": "*composed", "value": "RunID", "field_id": "RunID"}, +// {"tag":"TOR", "type": "*composed", "value": "ToR", "field_id": "ToR"}, +// {"tag":"OriginID", "type": "*composed", "value": "OriginID", "field_id": "OriginID"}, +// {"tag":"RequestType", "type": "*composed", "value": "RequestType", "field_id": "RequestType"}, +// {"tag":"Direction", "type": "*composed", "value": "Direction", "field_id": "Direction"}, +// {"tag":"Tenant", "type": "*composed", "value": "Tenant", "field_id": "Tenant"}, +// {"tag":"Category", "type": "*composed", "value": "Category", "field_id": "Category"}, +// {"tag":"Account", "type": "*composed", "value": "Account", "field_id": "Account"}, +// {"tag":"Subject", "type": "*composed", "value": "Subject", "field_id": "Subject"}, +// {"tag":"Destination", "type": "*composed", "value": "Destination", "field_id": "Destination"}, +// {"tag":"SetupTime", "type": "*composed", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "SetupTime"}, +// {"tag":"AnswerTime", "type": "*composed", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "AnswerTime"}, +// {"tag":"Usage", "type": "*composed", "value": "Usage", "field_id": "Usage"}, +// {"tag":"Cost", "type": "*composed", "value": "Cost", "field_id": "Cost"}, // ], // }, ] diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index e4660bce0..dfea2aea6 100644 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -134,27 +134,27 @@ // "aliases_conns": [], // address where to reach the aliases service, empty to disable aliases functionality: <""|*internal|x.y.z.y:1234> // "cdrstats_conns": [], // address where to reach the cdrstats service, empty to disable stats functionality<""|*internal|x.y.z.y:1234> // "cdr_replication":[ -// // { -// // "transport": "*http_post", // mechanism to use when replicating -// // "address": "http://127.0.0.1:12080/cdr_http", // address where to replicate -// // "attempts": 1, // number of attempts for POST before failing on file -// // "cdr_filter": "", // filter the CDRs being replicated -// // "content_fields": [ // template of the replicated content fields -// // {"tag": "CGRID", "type": "*composed", "value": "CGRID"}, -// // {"tag":"RunID", "type": "*composed", "value": "RunID"}, -// // {"tag":"TOR", "type": "*composed", "value": "ToR"}, -// // {"tag":"OriginID", "type": "*composed", "value": "OriginID"}, -// // {"tag":"RequestType", "type": "*composed", "value": "RequestType"}, -// // {"tag":"Direction", "type": "*composed", "value": "Direction"}, -// // {"tag":"Tenant", "type": "*composed", "value": "Tenant"}, -// // {"tag":"Category", "type": "*composed", "value": "Category"}, -// // {"tag":"Account", "type": "*composed", "value": "Account"}, -// // {"tag":"Subject", "type": "*composed", "value": "Subject"}, -// // {"tag":"Destination", "type": "*composed", "value": "Destination"}, -// // {"tag":"SetupTime", "type": "*composed", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00"}, -// // {"tag":"AnswerTime", "type": "*composed", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"}, -// // {"tag":"Usage", "type": "*composed", "value": "Usage"}, -// // {"tag":"Cost", "type": "*composed", "value": "Cost"}, +// // { // sample replication, not configured by default +// // "transport": "*amqp_json_map", // mechanism to use when replicating +// // "address": "http://127.0.0.1:12080/cdr_json_map", // address where to replicate +// // "attempts": 1, // number of attempts for POST before failing on file +// // "cdr_filter": "", // filter the CDRs being replicated +// // "content_fields": [ // template of the replicated content fields +// // {"tag": "CGRID", "type": "*composed", "value": "CGRID", "field_id": "CGRID"}, +// // {"tag":"RunID", "type": "*composed", "value": "RunID", "field_id": "RunID"}, +// // {"tag":"TOR", "type": "*composed", "value": "ToR", "field_id": "ToR"}, +// // {"tag":"OriginID", "type": "*composed", "value": "OriginID", "field_id": "OriginID"}, +// // {"tag":"RequestType", "type": "*composed", "value": "RequestType", "field_id": "RequestType"}, +// // {"tag":"Direction", "type": "*composed", "value": "Direction", "field_id": "Direction"}, +// // {"tag":"Tenant", "type": "*composed", "value": "Tenant", "field_id": "Tenant"}, +// // {"tag":"Category", "type": "*composed", "value": "Category", "field_id": "Category"}, +// // {"tag":"Account", "type": "*composed", "value": "Account", "field_id": "Account"}, +// // {"tag":"Subject", "type": "*composed", "value": "Subject", "field_id": "Subject"}, +// // {"tag":"Destination", "type": "*composed", "value": "Destination", "field_id": "Destination"}, +// // {"tag":"SetupTime", "type": "*composed", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "SetupTime"}, +// // {"tag":"AnswerTime", "type": "*composed", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "AnswerTime"}, +// // {"tag":"Usage", "type": "*composed", "value": "Usage", "field_id": "Usage"}, +// // {"tag":"Cost", "type": "*composed", "value": "Cost", "field_id": "Cost"}, // // ], // // }, // ] diff --git a/engine/cdrs.go b/engine/cdrs.go index d31bdc960..8ec54b2e0 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -501,9 +501,10 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error { } go func(body interface{}, rplCfg *config.CDRReplicationCfg, content string, errChan chan error) { var err error - fallbackPath := path.Join( - self.cgrCfg.FailedPostsDir, - rplCfg.FallbackFileName()) + fallbackPath := utils.META_NONE + if rplCfg.FallbackFileName() != utils.META_NONE { + fallbackPath = path.Join(self.cgrCfg.FailedPostsDir, rplCfg.FallbackFileName()) + } switch rplCfg.Transport { case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST: _, err = self.httpPoster.Post(rplCfg.Address, utils.PosterTransportContentTypes[rplCfg.Transport], body, rplCfg.Attempts, fallbackPath) @@ -519,8 +520,7 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error { } } default: - utils.Logger.Warning(fmt.Sprintf(" Unsupported replication transport: %s", rplCfg.Transport)) - return + err = fmt.Errorf("unsupported replication transport: %s", rplCfg.Transport) } if err != nil { utils.Logger.Err(fmt.Sprintf(