From 0fd4ae8b36ca6fc9be83042a17b07051c8b8dd58 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 30 Jan 2017 20:40:39 +0100 Subject: [PATCH] ApierV1.ReplayFailedPosts --- apier/v1/apier.go | 78 +++++++++++++++++++++++++++++++++++++++ apier/v1/apier_it_test.go | 1 - engine/action.go | 6 ++- engine/cdrs.go | 5 +-- utils/consts.go | 14 ++++++- utils/httpclient.go | 24 ++++++------ utils/httpclient_test.go | 17 +++++++-- 7 files changed, 123 insertions(+), 22 deletions(-) diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 146972255..8b656370c 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -20,6 +20,7 @@ package v1 import ( "errors" "fmt" + "io/ioutil" "os" "path" "strconv" @@ -1668,3 +1669,80 @@ func (v1 *ApierV1) StopService(args servmanager.ArgStartService, reply *string) func (v1 *ApierV1) ServiceStatus(args servmanager.ArgStartService, reply *string) (err error) { return v1.ServManager.V1ServiceStatus(args, reply) } + +type ArgsReplyFailedPosts struct { + FailedRequestsInDir *string // if defined it will be our source of requests to be replayed + FailedRequestsOutDir *string // if defined it will become our destination for files failing to be replayed, *none to be discarded + Posters []string // list of modules for which replay the requests, nil for all +} + +func (v1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) (err error) { + failedReqsInDir := v1.Config.FailedRequestsDir + if args.FailedRequestsInDir != nil && *args.FailedRequestsInDir != "" { + failedReqsInDir = *args.FailedRequestsInDir + } + failedReqsOutDir := failedReqsInDir + if args.FailedRequestsOutDir != nil && *args.FailedRequestsOutDir != "" { + failedReqsOutDir = *args.FailedRequestsOutDir + } + filesInDir, _ := ioutil.ReadDir(failedReqsInDir) + if len(filesInDir) == 0 { + return utils.ErrNotFound + } + for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config + filePath := path.Join(failedReqsInDir, file.Name()) + ffn, err := utils.NewFallbackFileNameFronString(file.Name()) + if err != nil { + return utils.NewErrServerError(err) + } + if len(args.Posters) != 0 { + var allowedModule bool + for _, mod := range args.Posters { + if strings.HasPrefix(ffn.Module, mod) { + allowedModule = true + break + } + } + if !allowedModule { + continue // this file is not to be processed due to Modules ACL + } + } + var fileContent []byte + _, err = guardian.Guardian.Guard(func() (interface{}, error) { + if fileContent, err = ioutil.ReadFile(filePath); err != nil { + return 0, err + } + if err := os.Remove(filePath); err != nil { + return 0, err + } + return 0, nil + }, v1.Config.LockingTimeout, utils.FileLockPrefix+filePath) + if err != nil { + return utils.NewErrServerError(err) + } + _, err = utils.NewHTTPPoster(v1.Config.HttpSkipTlsVerify, + v1.Config.ReplyTimeout).Post(ffn.Address, utils.PosterTransportContentTypes[ffn.Transport], fileContent, + v1.Config.HttpPosterAttempts, path.Join(failedReqsOutDir, file.Name())) + if err != nil { // Got error from HTTPPoster could be that content was not written, we need to write it ourselves + fileOutPath := path.Join(failedReqsOutDir, ffn.AsString()) + _, err := guardian.Guardian.Guard(func() (interface{}, error) { + if _, err := os.Stat(fileOutPath); err == nil || !os.IsNotExist(err) { + return 0, err + } + fileOut, err := os.Create(fileOutPath) + if err != nil { + return 0, err + } + defer fileOut.Close() + if _, err := fileOut.Write(fileContent); err != nil { + return 0, err + } + return 0, nil + }, v1.Config.LockingTimeout, utils.FileLockPrefix+fileOutPath) + if err != nil { + return utils.NewErrServerError(err) + } + } + } + return nil +} diff --git a/apier/v1/apier_it_test.go b/apier/v1/apier_it_test.go index e37c8674d..a20f4eb08 100644 --- a/apier/v1/apier_it_test.go +++ b/apier/v1/apier_it_test.go @@ -75,7 +75,6 @@ func TestApierLoadConfig(t *testing.T) { func TestApierCreateDirs(t *testing.T) { for _, pathDir := range []string{cfg.CdreProfiles[utils.META_DEFAULT].ExportDirectory, "/var/log/cgrates/cdrc/in", "/var/log/cgrates/cdrc/out", cfg.HistoryDir} { - if err := os.RemoveAll(pathDir); err != nil { t.Fatal("Error removing folder: ", pathDir, err) } diff --git a/engine/action.go b/engine/action.go index 49569c275..a07486949 100644 --- a/engine/action.go +++ b/engine/action.go @@ -413,7 +413,8 @@ func callUrl(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error return err } cfg := config.CgrConfig() - ffn := &utils.FallbackFileName{Module: "act/" + a.ActionType, Transport: utils.MetaHTTPjson, Address: a.ExtraParameters, + ffn := &utils.FallbackFileName{Module: fmt.Sprintf("%s/%s", utils.ActionsPoster, a.ActionType), + Transport: utils.MetaHTTPjson, Address: a.ExtraParameters, RequestID: utils.GenUUID(), FileSuffix: utils.JSNSuffix} _, err = utils.NewHTTPPoster(config.CgrConfig().HttpSkipTlsVerify, config.CgrConfig().ReplyTimeout).Post(a.ExtraParameters, utils.CONTENT_JSON, jsn, @@ -435,7 +436,8 @@ func callUrlAsync(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) return err } cfg := config.CgrConfig() - ffn := &utils.FallbackFileName{Module: "act/" + a.ActionType, Transport: utils.MetaHTTPjson, Address: a.ExtraParameters, + ffn := &utils.FallbackFileName{Module: fmt.Sprintf("%s/%s", utils.ActionsPoster, a.ActionType), + Transport: utils.MetaHTTPjson, Address: a.ExtraParameters, RequestID: utils.GenUUID(), FileSuffix: utils.JSNSuffix} go utils.NewHTTPPoster(config.CgrConfig().HttpSkipTlsVerify, config.CgrConfig().ReplyTimeout).Post(a.ExtraParameters, utils.CONTENT_JSON, jsn, diff --git a/engine/cdrs.go b/engine/cdrs.go index 9647fdd56..8a5042469 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -467,14 +467,12 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error { var content = "" switch rplCfg.Transport { case utils.MetaHTTPjsonCDR: - content = utils.CONTENT_JSON jsn, err := json.Marshal(cdr) if err != nil { return err } body = jsn case utils.MetaHTTPjsonMap: - content = utils.CONTENT_JSON expMp, err := cdr.AsExportMap(rplCfg.ContentFields, self.cgrCfg.HttpSkipTlsVerify, nil) if err != nil { return err @@ -485,7 +483,6 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error { } body = jsn case utils.META_HTTP_POST: - content = utils.CONTENT_FORM expMp, err := cdr.AsExportMap(rplCfg.ContentFields, self.cgrCfg.HttpSkipTlsVerify, nil) if err != nil { return err @@ -504,7 +501,7 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error { fallbackPath := path.Join( self.cgrCfg.FailedRequestsDir, rplCfg.FallbackFileName()) - if _, err := self.httpPoster.Post(rplCfg.Address, content, body, rplCfg.Attempts, fallbackPath); err != nil { + if _, err := self.httpPoster.Post(rplCfg.Address, utils.PosterTransportContentTypes[rplCfg.Transport], body, rplCfg.Attempts, fallbackPath); err != nil { utils.Logger.Err(fmt.Sprintf( " Replicating CDR: %+v, got error: %s", cdr, err.Error())) if rplCfg.Synchronous { diff --git a/utils/consts.go b/utils/consts.go index 466191e3a..da8a6c540 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -21,7 +21,13 @@ var ( CdreCdrFormats = []string{CSV, DRYRUN, CDRE_FIXED_WIDTH} PrimaryCdrFields = []string{CGRID, CDRSOURCE, CDRHOST, ACCID, TOR, REQTYPE, DIRECTION, TENANT, CATEGORY, ACCOUNT, SUBJECT, DESTINATION, SETUP_TIME, PDD, ANSWER_TIME, USAGE, SUPPLIER, DISCONNECT_CAUSE, COST, RATED, PartialField, MEDI_RUNID} - GitLastLog string // If set, it will be processed as part of versioning + GitLastLog string // If set, it will be processed as part of versioning + PosterTransportContentTypes = map[string]string{ + MetaHTTPjsonCDR: CONTENT_JSON, + MetaHTTPjsonMap: CONTENT_JSON, + MetaHTTPjson: CONTENT_JSON, + META_HTTP_POST: CONTENT_FORM, + } ) const ( @@ -338,4 +344,10 @@ const ( TxtSuffix = ".txt" JSNSuffix = ".json" FormSuffix = ".form" + CONTENT_JSON = "json" + CONTENT_FORM = "form" + CONTENT_TEXT = "text" + FileLockPrefix = "file_" + ActionsPoster = "act" + CDRPoster = "cdr" ) diff --git a/utils/httpclient.go b/utils/httpclient.go index b445d23d2..e812fd19e 100644 --- a/utils/httpclient.go +++ b/utils/httpclient.go @@ -30,18 +30,19 @@ import ( "time" ) -var ( - CONTENT_JSON = "json" - CONTENT_FORM = "form" - CONTENT_TEXT = "text" -) - // NewFallbackFileNameFronString will revert the meta information in the fallback file name into original data func NewFallbackFileNameFronString(fileName string) (ffn *FallbackFileName, err error) { ffn = new(FallbackFileName) - moduleIdx := strings.Index(fileName, "_") + moduleIdx := strings.Index(fileName, HandlerArgSep) ffn.Module = fileName[:moduleIdx] - if !IsSliceMember([]string{"cdr"}, ffn.Module) { + var supportedModule bool + for _, prfx := range []string{ActionsPoster, CDRPoster} { + if strings.HasPrefix(ffn.Module, prfx) { + supportedModule = true + break + } + } + if !supportedModule { return nil, fmt.Errorf("unsupported module: %s", ffn.Module) } fileNameWithoutModule := fileName[moduleIdx+1:] @@ -55,7 +56,7 @@ func NewFallbackFileNameFronString(fileName string) (ffn *FallbackFileName, err return nil, fmt.Errorf("unsupported transport in fallback file path: %s", fileName) } fileNameWithoutTransport := fileNameWithoutModule[len(ffn.Transport)+1:] - reqIDidx := strings.LastIndex(fileNameWithoutTransport, "_") + reqIDidx := strings.LastIndex(fileNameWithoutTransport, HandlerArgSep) if reqIDidx == -1 { return nil, fmt.Errorf("cannot find request ID in fallback file path: %s", fileName) } @@ -81,12 +82,12 @@ type FallbackFileName struct { Module string // name of the module writing the file Transport string // transport used to send data remotely Address string // remote address where data should have been sent - RequestID string // unique identifier of the request which should make files unique, should not contain _ character + RequestID string // unique identifier of the request which should make files unique FileSuffix string // informative file termination suffix } func (ffn *FallbackFileName) AsString() string { - return fmt.Sprintf("%s_%s_%s_%s%s", ffn.Module, ffn.Transport, url.QueryEscape(ffn.Address), ffn.RequestID, ffn.FileSuffix) + return fmt.Sprintf("%s%s%s%s%s%s%s%s", ffn.Module, HandlerArgSep, ffn.Transport, HandlerArgSep, url.QueryEscape(ffn.Address), HandlerArgSep, ffn.RequestID, ffn.FileSuffix) } // Converts interface to []byte @@ -179,6 +180,7 @@ func (poster *HTTPPoster) Post(addr string, contentType string, content interfac } return respBody, nil } + Logger.Debug(fmt.Sprintf(" Will failover on path: <%s>", fallbackFilePath)) // If we got that far, post was not possible, write it on disk fileOut, err := os.Create(fallbackFilePath) if err != nil { diff --git a/utils/httpclient_test.go b/utils/httpclient_test.go index ee18fae7e..98711aa98 100644 --- a/utils/httpclient_test.go +++ b/utils/httpclient_test.go @@ -23,7 +23,7 @@ import ( ) func TestFFNNewFallbackFileNameFronString(t *testing.T) { - fileName := "cdr_*http_json_cdr_http%3A%2F%2F127.0.0.1%3A12080%2Finvalid_json_1acce2c9-3f2d-4774-8662-c28872dad515.json" + fileName := "cdr|*http_json_cdr|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid_json|1acce2c9-3f2d-4774-8662-c28872dad515.json" eFFN := &FallbackFileName{Module: "cdr", Transport: MetaHTTPjsonCDR, Address: "http://127.0.0.1:12080/invalid_json", @@ -34,7 +34,7 @@ func TestFFNNewFallbackFileNameFronString(t *testing.T) { } else if !reflect.DeepEqual(eFFN, ffn) { t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn) } - fileName = "cdr_*http_post_http%3A%2F%2F127.0.0.1%3A12080%2Finvalid_70c53d6d-dbd7-452e-a5bd-36bab59bb9ff.form" + fileName = "cdr|*http_post|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid|70c53d6d-dbd7-452e-a5bd-36bab59bb9ff.form" eFFN = &FallbackFileName{Module: "cdr", Transport: META_HTTP_POST, Address: "http://127.0.0.1:12080/invalid", @@ -45,10 +45,21 @@ func TestFFNNewFallbackFileNameFronString(t *testing.T) { } else if !reflect.DeepEqual(eFFN, ffn) { t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn) } + fileName = "act>*call_url|*http_json|http%3A%2F%2Flocalhost%3A2080%2Flog_warning|f52cf23e-da2f-4675-b36b-e8fcc3869270.json" + eFFN = &FallbackFileName{Module: "act>*call_url", + Transport: MetaHTTPjson, + Address: "http://localhost:2080/log_warning", + RequestID: "f52cf23e-da2f-4675-b36b-e8fcc3869270", + FileSuffix: JSNSuffix} + if ffn, err := NewFallbackFileNameFronString(fileName); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eFFN, ffn) { + t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn) + } } func TestFFNFallbackFileNameAsString(t *testing.T) { - eFn := "cdr_*http_json_cdr_http%3A%2F%2F127.0.0.1%3A12080%2Finvalid_json_1acce2c9-3f2d-4774-8662-c28872dad515.json" + eFn := "cdr|*http_json_cdr|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid_json|1acce2c9-3f2d-4774-8662-c28872dad515.json" ffn := &FallbackFileName{ Module: "cdr", Transport: MetaHTTPjsonCDR,