ApierV1.ReplayFailedPosts

This commit is contained in:
DanB
2017-01-30 20:40:39 +01:00
parent 55980e0536
commit 0fd4ae8b36
7 changed files with 123 additions and 22 deletions

View File

@@ -20,6 +20,7 @@ package v1
import (
"errors"
"fmt"
"io/ioutil"
"os"
"path"
"strconv"
@@ -1668,3 +1669,80 @@ func (v1 *ApierV1) StopService(args servmanager.ArgStartService, reply *string)
func (v1 *ApierV1) ServiceStatus(args servmanager.ArgStartService, reply *string) (err error) {
return v1.ServManager.V1ServiceStatus(args, reply)
}
type ArgsReplyFailedPosts struct {
FailedRequestsInDir *string // if defined it will be our source of requests to be replayed
FailedRequestsOutDir *string // if defined it will become our destination for files failing to be replayed, *none to be discarded
Posters []string // list of modules for which replay the requests, nil for all
}
func (v1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) (err error) {
failedReqsInDir := v1.Config.FailedRequestsDir
if args.FailedRequestsInDir != nil && *args.FailedRequestsInDir != "" {
failedReqsInDir = *args.FailedRequestsInDir
}
failedReqsOutDir := failedReqsInDir
if args.FailedRequestsOutDir != nil && *args.FailedRequestsOutDir != "" {
failedReqsOutDir = *args.FailedRequestsOutDir
}
filesInDir, _ := ioutil.ReadDir(failedReqsInDir)
if len(filesInDir) == 0 {
return utils.ErrNotFound
}
for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config
filePath := path.Join(failedReqsInDir, file.Name())
ffn, err := utils.NewFallbackFileNameFronString(file.Name())
if err != nil {
return utils.NewErrServerError(err)
}
if len(args.Posters) != 0 {
var allowedModule bool
for _, mod := range args.Posters {
if strings.HasPrefix(ffn.Module, mod) {
allowedModule = true
break
}
}
if !allowedModule {
continue // this file is not to be processed due to Modules ACL
}
}
var fileContent []byte
_, err = guardian.Guardian.Guard(func() (interface{}, error) {
if fileContent, err = ioutil.ReadFile(filePath); err != nil {
return 0, err
}
if err := os.Remove(filePath); err != nil {
return 0, err
}
return 0, nil
}, v1.Config.LockingTimeout, utils.FileLockPrefix+filePath)
if err != nil {
return utils.NewErrServerError(err)
}
_, err = utils.NewHTTPPoster(v1.Config.HttpSkipTlsVerify,
v1.Config.ReplyTimeout).Post(ffn.Address, utils.PosterTransportContentTypes[ffn.Transport], fileContent,
v1.Config.HttpPosterAttempts, path.Join(failedReqsOutDir, file.Name()))
if err != nil { // Got error from HTTPPoster could be that content was not written, we need to write it ourselves
fileOutPath := path.Join(failedReqsOutDir, ffn.AsString())
_, err := guardian.Guardian.Guard(func() (interface{}, error) {
if _, err := os.Stat(fileOutPath); err == nil || !os.IsNotExist(err) {
return 0, err
}
fileOut, err := os.Create(fileOutPath)
if err != nil {
return 0, err
}
defer fileOut.Close()
if _, err := fileOut.Write(fileContent); err != nil {
return 0, err
}
return 0, nil
}, v1.Config.LockingTimeout, utils.FileLockPrefix+fileOutPath)
if err != nil {
return utils.NewErrServerError(err)
}
}
}
return nil
}

View File

@@ -75,7 +75,6 @@ func TestApierLoadConfig(t *testing.T) {
func TestApierCreateDirs(t *testing.T) {
for _, pathDir := range []string{cfg.CdreProfiles[utils.META_DEFAULT].ExportDirectory, "/var/log/cgrates/cdrc/in", "/var/log/cgrates/cdrc/out", cfg.HistoryDir} {
if err := os.RemoveAll(pathDir); err != nil {
t.Fatal("Error removing folder: ", pathDir, err)
}

View File

@@ -413,7 +413,8 @@ func callUrl(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error
return err
}
cfg := config.CgrConfig()
ffn := &utils.FallbackFileName{Module: "act/" + a.ActionType, Transport: utils.MetaHTTPjson, Address: a.ExtraParameters,
ffn := &utils.FallbackFileName{Module: fmt.Sprintf("%s/%s", utils.ActionsPoster, a.ActionType),
Transport: utils.MetaHTTPjson, Address: a.ExtraParameters,
RequestID: utils.GenUUID(), FileSuffix: utils.JSNSuffix}
_, err = utils.NewHTTPPoster(config.CgrConfig().HttpSkipTlsVerify,
config.CgrConfig().ReplyTimeout).Post(a.ExtraParameters, utils.CONTENT_JSON, jsn,
@@ -435,7 +436,8 @@ func callUrlAsync(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions)
return err
}
cfg := config.CgrConfig()
ffn := &utils.FallbackFileName{Module: "act/" + a.ActionType, Transport: utils.MetaHTTPjson, Address: a.ExtraParameters,
ffn := &utils.FallbackFileName{Module: fmt.Sprintf("%s/%s", utils.ActionsPoster, a.ActionType),
Transport: utils.MetaHTTPjson, Address: a.ExtraParameters,
RequestID: utils.GenUUID(), FileSuffix: utils.JSNSuffix}
go utils.NewHTTPPoster(config.CgrConfig().HttpSkipTlsVerify,
config.CgrConfig().ReplyTimeout).Post(a.ExtraParameters, utils.CONTENT_JSON, jsn,

View File

@@ -467,14 +467,12 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error {
var content = ""
switch rplCfg.Transport {
case utils.MetaHTTPjsonCDR:
content = utils.CONTENT_JSON
jsn, err := json.Marshal(cdr)
if err != nil {
return err
}
body = jsn
case utils.MetaHTTPjsonMap:
content = utils.CONTENT_JSON
expMp, err := cdr.AsExportMap(rplCfg.ContentFields, self.cgrCfg.HttpSkipTlsVerify, nil)
if err != nil {
return err
@@ -485,7 +483,6 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error {
}
body = jsn
case utils.META_HTTP_POST:
content = utils.CONTENT_FORM
expMp, err := cdr.AsExportMap(rplCfg.ContentFields, self.cgrCfg.HttpSkipTlsVerify, nil)
if err != nil {
return err
@@ -504,7 +501,7 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error {
fallbackPath := path.Join(
self.cgrCfg.FailedRequestsDir,
rplCfg.FallbackFileName())
if _, err := self.httpPoster.Post(rplCfg.Address, content, body, rplCfg.Attempts, fallbackPath); err != nil {
if _, err := self.httpPoster.Post(rplCfg.Address, utils.PosterTransportContentTypes[rplCfg.Transport], body, rplCfg.Attempts, fallbackPath); err != nil {
utils.Logger.Err(fmt.Sprintf(
"<CDRReplicator> Replicating CDR: %+v, got error: %s", cdr, err.Error()))
if rplCfg.Synchronous {

View File

@@ -21,7 +21,13 @@ var (
CdreCdrFormats = []string{CSV, DRYRUN, CDRE_FIXED_WIDTH}
PrimaryCdrFields = []string{CGRID, CDRSOURCE, CDRHOST, ACCID, TOR, REQTYPE, DIRECTION, TENANT, CATEGORY, ACCOUNT, SUBJECT, DESTINATION, SETUP_TIME, PDD, ANSWER_TIME, USAGE,
SUPPLIER, DISCONNECT_CAUSE, COST, RATED, PartialField, MEDI_RUNID}
GitLastLog string // If set, it will be processed as part of versioning
GitLastLog string // If set, it will be processed as part of versioning
PosterTransportContentTypes = map[string]string{
MetaHTTPjsonCDR: CONTENT_JSON,
MetaHTTPjsonMap: CONTENT_JSON,
MetaHTTPjson: CONTENT_JSON,
META_HTTP_POST: CONTENT_FORM,
}
)
const (
@@ -338,4 +344,10 @@ const (
TxtSuffix = ".txt"
JSNSuffix = ".json"
FormSuffix = ".form"
CONTENT_JSON = "json"
CONTENT_FORM = "form"
CONTENT_TEXT = "text"
FileLockPrefix = "file_"
ActionsPoster = "act"
CDRPoster = "cdr"
)

View File

@@ -30,18 +30,19 @@ import (
"time"
)
var (
CONTENT_JSON = "json"
CONTENT_FORM = "form"
CONTENT_TEXT = "text"
)
// NewFallbackFileNameFronString will revert the meta information in the fallback file name into original data
func NewFallbackFileNameFronString(fileName string) (ffn *FallbackFileName, err error) {
ffn = new(FallbackFileName)
moduleIdx := strings.Index(fileName, "_")
moduleIdx := strings.Index(fileName, HandlerArgSep)
ffn.Module = fileName[:moduleIdx]
if !IsSliceMember([]string{"cdr"}, ffn.Module) {
var supportedModule bool
for _, prfx := range []string{ActionsPoster, CDRPoster} {
if strings.HasPrefix(ffn.Module, prfx) {
supportedModule = true
break
}
}
if !supportedModule {
return nil, fmt.Errorf("unsupported module: %s", ffn.Module)
}
fileNameWithoutModule := fileName[moduleIdx+1:]
@@ -55,7 +56,7 @@ func NewFallbackFileNameFronString(fileName string) (ffn *FallbackFileName, err
return nil, fmt.Errorf("unsupported transport in fallback file path: %s", fileName)
}
fileNameWithoutTransport := fileNameWithoutModule[len(ffn.Transport)+1:]
reqIDidx := strings.LastIndex(fileNameWithoutTransport, "_")
reqIDidx := strings.LastIndex(fileNameWithoutTransport, HandlerArgSep)
if reqIDidx == -1 {
return nil, fmt.Errorf("cannot find request ID in fallback file path: %s", fileName)
}
@@ -81,12 +82,12 @@ type FallbackFileName struct {
Module string // name of the module writing the file
Transport string // transport used to send data remotely
Address string // remote address where data should have been sent
RequestID string // unique identifier of the request which should make files unique, should not contain _ character
RequestID string // unique identifier of the request which should make files unique
FileSuffix string // informative file termination suffix
}
func (ffn *FallbackFileName) AsString() string {
return fmt.Sprintf("%s_%s_%s_%s%s", ffn.Module, ffn.Transport, url.QueryEscape(ffn.Address), ffn.RequestID, ffn.FileSuffix)
return fmt.Sprintf("%s%s%s%s%s%s%s%s", ffn.Module, HandlerArgSep, ffn.Transport, HandlerArgSep, url.QueryEscape(ffn.Address), HandlerArgSep, ffn.RequestID, ffn.FileSuffix)
}
// Converts interface to []byte
@@ -179,6 +180,7 @@ func (poster *HTTPPoster) Post(addr string, contentType string, content interfac
}
return respBody, nil
}
Logger.Debug(fmt.Sprintf("<HTTPPoster> Will failover on path: <%s>", fallbackFilePath))
// If we got that far, post was not possible, write it on disk
fileOut, err := os.Create(fallbackFilePath)
if err != nil {

View File

@@ -23,7 +23,7 @@ import (
)
func TestFFNNewFallbackFileNameFronString(t *testing.T) {
fileName := "cdr_*http_json_cdr_http%3A%2F%2F127.0.0.1%3A12080%2Finvalid_json_1acce2c9-3f2d-4774-8662-c28872dad515.json"
fileName := "cdr|*http_json_cdr|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid_json|1acce2c9-3f2d-4774-8662-c28872dad515.json"
eFFN := &FallbackFileName{Module: "cdr",
Transport: MetaHTTPjsonCDR,
Address: "http://127.0.0.1:12080/invalid_json",
@@ -34,7 +34,7 @@ func TestFFNNewFallbackFileNameFronString(t *testing.T) {
} else if !reflect.DeepEqual(eFFN, ffn) {
t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn)
}
fileName = "cdr_*http_post_http%3A%2F%2F127.0.0.1%3A12080%2Finvalid_70c53d6d-dbd7-452e-a5bd-36bab59bb9ff.form"
fileName = "cdr|*http_post|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid|70c53d6d-dbd7-452e-a5bd-36bab59bb9ff.form"
eFFN = &FallbackFileName{Module: "cdr",
Transport: META_HTTP_POST,
Address: "http://127.0.0.1:12080/invalid",
@@ -45,10 +45,21 @@ func TestFFNNewFallbackFileNameFronString(t *testing.T) {
} else if !reflect.DeepEqual(eFFN, ffn) {
t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn)
}
fileName = "act>*call_url|*http_json|http%3A%2F%2Flocalhost%3A2080%2Flog_warning|f52cf23e-da2f-4675-b36b-e8fcc3869270.json"
eFFN = &FallbackFileName{Module: "act>*call_url",
Transport: MetaHTTPjson,
Address: "http://localhost:2080/log_warning",
RequestID: "f52cf23e-da2f-4675-b36b-e8fcc3869270",
FileSuffix: JSNSuffix}
if ffn, err := NewFallbackFileNameFronString(fileName); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eFFN, ffn) {
t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn)
}
}
func TestFFNFallbackFileNameAsString(t *testing.T) {
eFn := "cdr_*http_json_cdr_http%3A%2F%2F127.0.0.1%3A12080%2Finvalid_json_1acce2c9-3f2d-4774-8662-c28872dad515.json"
eFn := "cdr|*http_json_cdr|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid_json|1acce2c9-3f2d-4774-8662-c28872dad515.json"
ffn := &FallbackFileName{
Module: "cdr",
Transport: MetaHTTPjsonCDR,