From 55c862d286eba4bda6d6be97f529703c0205ffe3 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 1 Jul 2018 11:50:21 +0200 Subject: [PATCH] Moving Poster into engine so we can free up guardian in utils --- apier/v1/apier.go | 8 +-- apier/v1/filter_indexes.go | 18 ++--- cdrc/csv.go | 2 +- cdrc/fwv.go | 2 +- cdrc/xml.go | 2 +- cmd/cgr-engine/rater.go | 2 +- engine/action.go | 4 +- engine/cdr.go | 2 +- engine/cdre.go | 8 +-- engine/cdrs.go | 4 +- {utils => engine}/poster.go | 100 ++++++---------------------- {utils => engine}/poster_it_test.go | 2 +- engine/poster_test.go | 18 +++++ engine/pubsub.go | 2 +- engine/thresholds.go | 3 - utils/coreutils.go | 64 ++++++++++++++++++ utils/coreutils_test.go | 49 ++++++++++++++ utils/poster_test.go | 72 -------------------- 18 files changed, 179 insertions(+), 183 deletions(-) rename {utils => engine}/poster.go (66%) rename {utils => engine}/poster_it_test.go (99%) create mode 100644 engine/poster_test.go delete mode 100644 utils/poster_test.go diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 5213096ce..28b3a8f4f 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -51,7 +51,7 @@ type ApierV1 struct { Users rpcclient.RpcClientConnection CDRs rpcclient.RpcClientConnection // FixMe: populate it from cgr-engine ServManager *servmanager.ServiceManager // Need to have them capitalize so we can export in V2 - HTTPPoster *utils.HTTPPoster + HTTPPoster *engine.HTTPPoster } func (self *ApierV1) GetDestination(dstId string, reply *engine.Destination) error { @@ -2106,12 +2106,12 @@ func (v1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) ( } switch ffn.Transport { case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST: - _, err = utils.NewHTTPPoster(v1.Config.HttpSkipTlsVerify, + _, err = engine.NewHTTPPoster(v1.Config.HttpSkipTlsVerify, v1.Config.ReplyTimeout).Post(ffn.Address, utils.PosterTransportContentTypes[ffn.Transport], fileContent, v1.Config.PosterAttempts, failoverPath) case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap: - var amqpPoster *utils.AMQPPoster - amqpPoster, err = utils.AMQPPostersCache.GetAMQPPoster(ffn.Address, v1.Config.PosterAttempts, failedReqsOutDir) + var amqpPoster *engine.AMQPPoster + amqpPoster, err = engine.AMQPPostersCache.GetAMQPPoster(ffn.Address, v1.Config.PosterAttempts, failedReqsOutDir) if err == nil { // error will be checked bellow var chn *amqp.Channel chn, err = amqpPoster.Post( diff --git a/apier/v1/filter_indexes.go b/apier/v1/filter_indexes.go index fcfeb5269..5d7cc3a8b 100644 --- a/apier/v1/filter_indexes.go +++ b/apier/v1/filter_indexes.go @@ -295,36 +295,38 @@ func (self *ApierV1) ComputeFilterIndexes(args utils.ArgsComputeFilterIndexes, r transactionID := utils.GenUUID() //ThresholdProfile Indexes thdsIndexers, err := self.computeThresholdIndexes(args.Tenant, args.ThresholdIDs, transactionID) - if err != nil { + if err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } //StatQueueProfile Indexes sqpIndexers, err := self.computeStatIndexes(args.Tenant, args.StatIDs, transactionID) - if err != nil { + if err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } //ResourceProfile Indexes rsIndexes, err := self.computeResourceIndexes(args.Tenant, args.ResourceIDs, transactionID) - if err != nil { + if err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } //SupplierProfile Indexes sppIndexes, err := self.computeSupplierIndexes(args.Tenant, args.SupplierIDs, transactionID) - if err != nil { + if err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } //AttributeProfile Indexes attrIndexes, err := self.computeAttributeIndexes(args.Tenant, args.AttributeIDs, transactionID) - if err != nil { + if err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } //Now we move from tmpKey to the right key for each type //ThresholdProfile Indexes if thdsIndexers != nil { if err := thdsIndexers.StoreIndexes(true, transactionID); err != nil { - for _, id := range *args.ThresholdIDs { - if err := thdsIndexers.RemoveItemFromIndex(id); err != nil { - return err + if args.ThresholdIDs != nil { + for _, id := range *args.ThresholdIDs { + if err := thdsIndexers.RemoveItemFromIndex(id); err != nil { + return err + } } } return err diff --git a/cdrc/csv.go b/cdrc/csv.go index 02e2894e5..e11240aed 100644 --- a/cdrc/csv.go +++ b/cdrc/csv.go @@ -223,7 +223,7 @@ func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcCfg *con if err != nil { return nil, err } - if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory { + if outValByte, err = engine.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory { return nil, err } else { fieldVal = string(outValByte) diff --git a/cdrc/fwv.go b/cdrc/fwv.go index 585020714..0e38263ee 100644 --- a/cdrc/fwv.go +++ b/cdrc/fwv.go @@ -236,7 +236,7 @@ func (self *FwvRecordsProcessor) recordToStoredCdr(record string, cdrcCfg *confi if err != nil { return nil, err } - if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory { + if outValByte, err = engine.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory { return nil, err } else { fieldVal = string(outValByte) diff --git a/cdrc/xml.go b/cdrc/xml.go index 6c7cf2ff6..ac7a5088b 100644 --- a/cdrc/xml.go +++ b/cdrc/xml.go @@ -221,7 +221,7 @@ func (xmlProc *XMLRecordsProcessor) recordToCDR(xmlEntity tree.Res, cdrcCfg *con if err != nil { return nil, err } - if outValByte, err = utils.HttpJsonPost(httpAddr, xmlProc.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory { + if outValByte, err = engine.HttpJsonPost(httpAddr, xmlProc.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory { return nil, err } else { fieldVal = string(outValByte) diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index 0b8dd4388..9b600169d 100755 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -188,7 +188,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en responder.SetTimeToLive(cfg.ResponseCacheTTL, nil) apierRpcV1 := &v1.ApierV1{StorDb: loadDb, DataManager: dm, CdrDb: cdrDb, Config: cfg, Responder: responder, ServManager: serviceManager, - HTTPPoster: utils.NewHTTPPoster(cfg.HttpSkipTlsVerify, cfg.ReplyTimeout)} + HTTPPoster: engine.NewHTTPPoster(cfg.HttpSkipTlsVerify, cfg.ReplyTimeout)} if thdS != nil { engine.SetThresholdS(thdS) // temporary architectural fix until we will have separate AccountS } diff --git a/engine/action.go b/engine/action.go index 8bef3b130..5152bdcf2 100644 --- a/engine/action.go +++ b/engine/action.go @@ -510,7 +510,7 @@ func callUrl(ub *Account, sq *CDRStatsQueueTriggered, a *Action, acs Actions) er ffn := &utils.FallbackFileName{Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType), Transport: utils.MetaHTTPjson, Address: a.ExtraParameters, RequestID: utils.GenUUID(), FileSuffix: utils.JSNSuffix} - _, err = utils.NewHTTPPoster(config.CgrConfig().HttpSkipTlsVerify, + _, err = NewHTTPPoster(config.CgrConfig().HttpSkipTlsVerify, config.CgrConfig().ReplyTimeout).Post(a.ExtraParameters, utils.CONTENT_JSON, jsn, config.CgrConfig().PosterAttempts, path.Join(cfg.FailedPostsDir, ffn.AsString())) return err @@ -533,7 +533,7 @@ func callUrlAsync(ub *Account, sq *CDRStatsQueueTriggered, a *Action, acs Action ffn := &utils.FallbackFileName{Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType), Transport: utils.MetaHTTPjson, Address: a.ExtraParameters, RequestID: utils.GenUUID(), FileSuffix: utils.JSNSuffix} - go utils.NewHTTPPoster(config.CgrConfig().HttpSkipTlsVerify, + go NewHTTPPoster(config.CgrConfig().HttpSkipTlsVerify, config.CgrConfig().ReplyTimeout).Post(a.ExtraParameters, utils.CONTENT_JSON, jsn, config.CgrConfig().PosterAttempts, path.Join(cfg.FailedPostsDir, ffn.AsString())) return nil diff --git a/engine/cdr.go b/engine/cdr.go index ba0e5832b..4a06713b6 100644 --- a/engine/cdr.go +++ b/engine/cdr.go @@ -519,7 +519,7 @@ func (cdr *CDR) formatField(cfgFld *config.CfgCdrField, httpSkipTlsCheck bool, } if len(httpAddr) == 0 { err = fmt.Errorf("Empty http address for field %s type %s", cfgFld.Tag, cfgFld.Type) - } else if outValByte, err = utils.HttpJsonPost(httpAddr, httpSkipTlsCheck, jsn); err == nil { + } else if outValByte, err = HttpJsonPost(httpAddr, httpSkipTlsCheck, jsn); err == nil { outVal = string(outValByte) if len(outVal) == 0 && cfgFld.Mandatory { err = fmt.Errorf("Empty result for http_post field: %s", cfgFld.Tag) diff --git a/engine/cdre.go b/engine/cdre.go index af13d9619..ff93795d3 100644 --- a/engine/cdre.go +++ b/engine/cdre.go @@ -52,7 +52,7 @@ const ( func NewCDRExporter(cdrs []*CDR, exportTemplate *config.CdreConfig, exportFormat, exportPath, fallbackPath, exportID string, synchronous bool, attempts int, fieldSeparator rune, usageMultiplyFactor utils.FieldMultiplyFactor, - costMultiplyFactor float64, roundingDecimals int, httpSkipTlsCheck bool, httpPoster *utils.HTTPPoster) (*CDRExporter, error) { + costMultiplyFactor float64, roundingDecimals int, httpSkipTlsCheck bool, httpPoster *HTTPPoster) (*CDRExporter, error) { if len(cdrs) == 0 { // Nothing to export return nil, nil } @@ -91,7 +91,7 @@ type CDRExporter struct { costMultiplyFactor float64 roundingDecimals int httpSkipTlsCheck bool - httpPoster *utils.HTTPPoster + httpPoster *HTTPPoster header, trailer []string // Header and Trailer fields content [][]string // Rows of cdr fields @@ -250,8 +250,8 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) { case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST: _, err = cdre.httpPoster.Post(cdre.exportPath, utils.PosterTransportContentTypes[cdre.exportFormat], body, cdre.attempts, fallbackPath) case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap: - var amqpPoster *utils.AMQPPoster - amqpPoster, err = utils.AMQPPostersCache.GetAMQPPoster(cdre.exportPath, cdre.attempts, cdre.fallbackPath) + var amqpPoster *AMQPPoster + amqpPoster, err = AMQPPostersCache.GetAMQPPoster(cdre.exportPath, cdre.attempts, cdre.fallbackPath) if err == nil { // error will be checked bellow var chn *amqp.Channel chn, err = amqpPoster.Post( diff --git a/engine/cdrs.go b/engine/cdrs.go index 30f416d47..c805acb98 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -97,7 +97,7 @@ func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, r return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, dm: dm, rals: rater, pubsub: pubsub, users: users, aliases: aliases, cdrstats: cdrstats, stats: stats, thdS: thdS, guard: guardian.Guardian, - httpPoster: utils.NewHTTPPoster(cgrCfg.HttpSkipTlsVerify, cgrCfg.ReplyTimeout)}, nil + httpPoster: NewHTTPPoster(cgrCfg.HttpSkipTlsVerify, cgrCfg.ReplyTimeout)}, nil } type CdrServer struct { @@ -114,7 +114,7 @@ type CdrServer struct { stats rpcclient.RpcClientConnection guard *guardian.GuardianLocker responseCache *utils.ResponseCache - httpPoster *utils.HTTPPoster // used for replication + httpPoster *HTTPPoster // used for replication } func (self *CdrServer) Timezone() string { diff --git a/utils/poster.go b/engine/poster.go similarity index 66% rename from utils/poster.go rename to engine/poster.go index 44d83f8a1..8f30dc706 100644 --- a/utils/poster.go +++ b/engine/poster.go @@ -16,7 +16,7 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -package utils +package engine import ( "bytes" @@ -32,6 +32,7 @@ import ( "time" "github.com/cgrates/cgrates/guardian" + "github.com/cgrates/cgrates/utils" "github.com/streadway/amqp" ) @@ -63,69 +64,6 @@ func HttpJsonPost(url string, skipTlsVerify bool, content []byte) ([]byte, error return respBody, nil } -// NewFallbackFileNameFronString will revert the meta information in the fallback file name into original data -func NewFallbackFileNameFronString(fileName string) (ffn *FallbackFileName, err error) { - ffn = new(FallbackFileName) - moduleIdx := strings.Index(fileName, HandlerArgSep) - ffn.Module = fileName[:moduleIdx] - var supportedModule bool - for _, prfx := range []string{ActionsPoster, CDRPoster} { - if strings.HasPrefix(ffn.Module, prfx) { - supportedModule = true - break - } - } - if !supportedModule { - return nil, fmt.Errorf("unsupported module: %s", ffn.Module) - } - fileNameWithoutModule := fileName[moduleIdx+1:] - for _, trspt := range []string{MetaHTTPjsonCDR, MetaHTTPjsonMap, MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap} { - if strings.HasPrefix(fileNameWithoutModule, trspt) { - ffn.Transport = trspt - break - } - } - if ffn.Transport == "" { - return nil, fmt.Errorf("unsupported transport in fallback file path: %s", fileName) - } - fileNameWithoutTransport := fileNameWithoutModule[len(ffn.Transport)+1:] - reqIDidx := strings.LastIndex(fileNameWithoutTransport, HandlerArgSep) - if reqIDidx == -1 { - return nil, fmt.Errorf("cannot find request ID in fallback file path: %s", fileName) - } - if ffn.Address, err = url.QueryUnescape(fileNameWithoutTransport[:reqIDidx]); err != nil { - return nil, err - } - fileNameWithoutAddress := fileNameWithoutTransport[reqIDidx+1:] - for _, suffix := range []string{TxtSuffix, JSNSuffix, FormSuffix} { - if strings.HasSuffix(fileNameWithoutAddress, suffix) { - ffn.FileSuffix = suffix - break - } - } - if ffn.FileSuffix == "" { - return nil, fmt.Errorf("unsupported suffix in fallback file path: %s", fileName) - } - ffn.RequestID = fileNameWithoutAddress[:len(fileNameWithoutAddress)-len(ffn.FileSuffix)] - return -} - -// FallbackFileName is the struct defining the name of a file where CGRateS will dump data which fails to be sent remotely -type FallbackFileName struct { - Module string // name of the module writing the file - Transport string // transport used to send data remotely - Address string // remote address where data should have been sent - RequestID string // unique identifier of the request which should make files unique - FileSuffix string // informative file termination suffix -} - -func (ffn *FallbackFileName) AsString() string { - if ffn.FileSuffix == "" { // Autopopulate FileSuffix based on the transport used - ffn.FileSuffix = CDREFileSuffixes[ffn.Transport] - } - return fmt.Sprintf("%s%s%s%s%s%s%s%s", ffn.Module, HandlerArgSep, ffn.Transport, HandlerArgSep, url.QueryEscape(ffn.Address), HandlerArgSep, ffn.RequestID, ffn.FileSuffix) -} - func NewHTTPPoster(skipTLSVerify bool, replyTimeout time.Duration) *HTTPPoster { tr := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTLSVerify}, @@ -140,49 +78,49 @@ type HTTPPoster struct { // Post with built-in failover // Returns also reference towards client so we can close it's connections when done func (poster *HTTPPoster) Post(addr string, contentType string, content interface{}, attempts int, fallbackFilePath string) (respBody []byte, err error) { - if !IsSliceMember([]string{CONTENT_JSON, CONTENT_FORM, CONTENT_TEXT}, contentType) { + if !utils.IsSliceMember([]string{utils.CONTENT_JSON, utils.CONTENT_FORM, utils.CONTENT_TEXT}, contentType) { return nil, fmt.Errorf("unsupported ContentType: %s", contentType) } var body []byte // Used to write in file and send over http var urlVals url.Values // Used when posting form - if IsSliceMember([]string{CONTENT_JSON, CONTENT_TEXT}, contentType) { + if utils.IsSliceMember([]string{utils.CONTENT_JSON, utils.CONTENT_TEXT}, contentType) { body = content.([]byte) - } else if contentType == CONTENT_FORM { + } else if contentType == utils.CONTENT_FORM { urlVals = content.(url.Values) body = []byte(urlVals.Encode()) } - fib := Fib() + fib := utils.Fib() bodyType := "application/x-www-form-urlencoded" - if contentType == CONTENT_JSON { + if contentType == utils.CONTENT_JSON { bodyType = "application/json" } for i := 0; i < attempts; i++ { var resp *http.Response - if IsSliceMember([]string{CONTENT_JSON, CONTENT_TEXT}, contentType) { + if utils.IsSliceMember([]string{utils.CONTENT_JSON, utils.CONTENT_TEXT}, contentType) { resp, err = poster.httpClient.Post(addr, bodyType, bytes.NewBuffer(body)) - } else if contentType == CONTENT_FORM { + } else if contentType == utils.CONTENT_FORM { resp, err = poster.httpClient.PostForm(addr, urlVals) } if err != nil { - Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error())) + utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error())) time.Sleep(time.Duration(fib()) * time.Second) continue } defer resp.Body.Close() respBody, err = ioutil.ReadAll(resp.Body) if err != nil { - Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error())) + utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error())) time.Sleep(time.Duration(fib()) * time.Second) continue } if resp.StatusCode > 299 { - Logger.Warning(fmt.Sprintf(" Posting to : <%s>, unexpected status code received: <%d>", addr, resp.StatusCode)) + utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, unexpected status code received: <%d>", addr, resp.StatusCode)) time.Sleep(time.Duration(fib()) * time.Second) continue } return respBody, nil } - if fallbackFilePath != META_NONE { + if fallbackFilePath != utils.META_NONE { // If we got that far, post was not possible, write it on disk _, err = guardian.Guardian.Guard(func() (interface{}, error) { fileOut, err := os.Create(fallbackFilePath) @@ -194,7 +132,7 @@ func (poster *HTTPPoster) Post(addr string, contentType string, content interfac return nil, err } return nil, nil - }, time.Duration(2*time.Second), FileLockPrefix+fallbackFilePath) + }, time.Duration(2*time.Second), utils.FileLockPrefix+fallbackFilePath) } return } @@ -250,7 +188,7 @@ type AMQPPoster struct { // the optional chn will permits channel caching func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []byte, fallbackFileName string) (*amqp.Channel, error) { var err error - fib := Fib() + fib := utils.Fib() if chn == nil { for i := 0; i < pstr.attempts; i++ { if chn, err = pstr.NewPostChannel(); err == nil { @@ -258,7 +196,7 @@ func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []by } time.Sleep(time.Duration(fib()) * time.Second) } - if err != nil && fallbackFileName != META_NONE { + if err != nil && fallbackFileName != utils.META_NONE { err = pstr.writeToFile(fallbackFileName, content) return nil, err } @@ -278,7 +216,7 @@ func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []by } time.Sleep(time.Duration(fib()) * time.Second) } - if err != nil && fallbackFileName != META_NONE { + if err != nil && fallbackFileName != utils.META_NONE { err = pstr.writeToFile(fallbackFileName, content) return nil, err } @@ -303,7 +241,7 @@ func (pstr *AMQPPoster) NewPostChannel() (postChan *amqp.Channel, err error) { pstr.conn = conn go func() { // monitor connection errors so we can restart if err := <-pstr.conn.NotifyClose(make(chan *amqp.Error)); err != nil { - Logger.Err(fmt.Sprintf("Connection error received: %s", err.Error())) + utils.Logger.Err(fmt.Sprintf("Connection error received: %s", err.Error())) pstr.Close() } }() @@ -333,6 +271,6 @@ func (pstr *AMQPPoster) writeToFile(fileName string, content []byte) (err error) return nil, err } return nil, nil - }, time.Duration(2*time.Second), FileLockPrefix+fallbackFilePath) + }, time.Duration(2*time.Second), utils.FileLockPrefix+fallbackFilePath) return } diff --git a/utils/poster_it_test.go b/engine/poster_it_test.go similarity index 99% rename from utils/poster_it_test.go rename to engine/poster_it_test.go index 4ec89924e..d290b6241 100644 --- a/utils/poster_it_test.go +++ b/engine/poster_it_test.go @@ -17,7 +17,7 @@ GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see */ -package utils +package engine import ( "encoding/json" diff --git a/engine/poster_test.go b/engine/poster_test.go new file mode 100644 index 000000000..c607b0784 --- /dev/null +++ b/engine/poster_test.go @@ -0,0 +1,18 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package engine diff --git a/engine/pubsub.go b/engine/pubsub.go index 3c55b07cc..a4890a913 100644 --- a/engine/pubsub.go +++ b/engine/pubsub.go @@ -64,7 +64,7 @@ func NewPubSub(dm *DataManager, ttlVerify bool) (*PubSub, error) { ps := &PubSub{ ttlVerify: ttlVerify, subscribers: make(map[string]*SubscriberData), - pubFunc: utils.HttpJsonPost, + pubFunc: HttpJsonPost, mux: &sync.Mutex{}, dm: dm, } diff --git a/engine/thresholds.go b/engine/thresholds.go index e7505fc4e..2dc1bcb6b 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -308,7 +308,6 @@ func (tS *ThresholdService) processEvent(args *ArgsProcessEvent) (thresholdsIDs fmt.Sprintf(" failed removing non-recurrent threshold: %s, error: %s", t.TenantID(), err.Error())) withErrors = true - } continue } @@ -325,8 +324,6 @@ func (tS *ThresholdService) processEvent(args *ArgsProcessEvent) (thresholdsIDs } if len(tIDs) != 0 { thresholdsIDs = append(thresholdsIDs, tIDs...) - } else { - thresholdsIDs = []string{} } if withErrors { err = utils.ErrPartiallyExecuted diff --git a/utils/coreutils.go b/utils/coreutils.go index 865a1ec99..ca43fd293 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -32,6 +32,7 @@ import ( "io/ioutil" "log" "math" + "net/url" "os" "path/filepath" "reflect" @@ -843,3 +844,66 @@ type AuthStruct struct { Tenant string ApiKey string } + +// NewFallbackFileNameFronString will revert the meta information in the fallback file name into original data +func NewFallbackFileNameFronString(fileName string) (ffn *FallbackFileName, err error) { + ffn = new(FallbackFileName) + moduleIdx := strings.Index(fileName, HandlerArgSep) + ffn.Module = fileName[:moduleIdx] + var supportedModule bool + for _, prfx := range []string{ActionsPoster, CDRPoster} { + if strings.HasPrefix(ffn.Module, prfx) { + supportedModule = true + break + } + } + if !supportedModule { + return nil, fmt.Errorf("unsupported module: %s", ffn.Module) + } + fileNameWithoutModule := fileName[moduleIdx+1:] + for _, trspt := range []string{MetaHTTPjsonCDR, MetaHTTPjsonMap, MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap} { + if strings.HasPrefix(fileNameWithoutModule, trspt) { + ffn.Transport = trspt + break + } + } + if ffn.Transport == "" { + return nil, fmt.Errorf("unsupported transport in fallback file path: %s", fileName) + } + fileNameWithoutTransport := fileNameWithoutModule[len(ffn.Transport)+1:] + reqIDidx := strings.LastIndex(fileNameWithoutTransport, HandlerArgSep) + if reqIDidx == -1 { + return nil, fmt.Errorf("cannot find request ID in fallback file path: %s", fileName) + } + if ffn.Address, err = url.QueryUnescape(fileNameWithoutTransport[:reqIDidx]); err != nil { + return nil, err + } + fileNameWithoutAddress := fileNameWithoutTransport[reqIDidx+1:] + for _, suffix := range []string{TxtSuffix, JSNSuffix, FormSuffix} { + if strings.HasSuffix(fileNameWithoutAddress, suffix) { + ffn.FileSuffix = suffix + break + } + } + if ffn.FileSuffix == "" { + return nil, fmt.Errorf("unsupported suffix in fallback file path: %s", fileName) + } + ffn.RequestID = fileNameWithoutAddress[:len(fileNameWithoutAddress)-len(ffn.FileSuffix)] + return +} + +// FallbackFileName is the struct defining the name of a file where CGRateS will dump data which fails to be sent remotely +type FallbackFileName struct { + Module string // name of the module writing the file + Transport string // transport used to send data remotely + Address string // remote address where data should have been sent + RequestID string // unique identifier of the request which should make files unique + FileSuffix string // informative file termination suffix +} + +func (ffn *FallbackFileName) AsString() string { + if ffn.FileSuffix == "" { // Autopopulate FileSuffix based on the transport used + ffn.FileSuffix = CDREFileSuffixes[ffn.Transport] + } + return fmt.Sprintf("%s%s%s%s%s%s%s%s", ffn.Module, HandlerArgSep, ffn.Transport, HandlerArgSep, url.QueryEscape(ffn.Address), HandlerArgSep, ffn.RequestID, ffn.FileSuffix) +} diff --git a/utils/coreutils_test.go b/utils/coreutils_test.go index 720417106..49d464276 100644 --- a/utils/coreutils_test.go +++ b/utils/coreutils_test.go @@ -820,3 +820,52 @@ func TestGZIPGUnZIP(t *testing.T) { t.Error("not matching initial source") } } + +func TestFFNNewFallbackFileNameFronString(t *testing.T) { + fileName := "cdr|*http_json_cdr|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid_json|1acce2c9-3f2d-4774-8662-c28872dad515.json" + eFFN := &FallbackFileName{Module: "cdr", + Transport: MetaHTTPjsonCDR, + Address: "http://127.0.0.1:12080/invalid_json", + RequestID: "1acce2c9-3f2d-4774-8662-c28872dad515", + FileSuffix: JSNSuffix} + if ffn, err := NewFallbackFileNameFronString(fileName); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eFFN, ffn) { + t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn) + } + fileName = "cdr|*http_post|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid|70c53d6d-dbd7-452e-a5bd-36bab59bb9ff.form" + eFFN = &FallbackFileName{Module: "cdr", + Transport: META_HTTP_POST, + Address: "http://127.0.0.1:12080/invalid", + RequestID: "70c53d6d-dbd7-452e-a5bd-36bab59bb9ff", + FileSuffix: FormSuffix} + if ffn, err := NewFallbackFileNameFronString(fileName); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eFFN, ffn) { + t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn) + } + fileName = "act>*call_url|*http_json|http%3A%2F%2Flocalhost%3A2080%2Flog_warning|f52cf23e-da2f-4675-b36b-e8fcc3869270.json" + eFFN = &FallbackFileName{Module: "act>*call_url", + Transport: MetaHTTPjson, + Address: "http://localhost:2080/log_warning", + RequestID: "f52cf23e-da2f-4675-b36b-e8fcc3869270", + FileSuffix: JSNSuffix} + if ffn, err := NewFallbackFileNameFronString(fileName); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eFFN, ffn) { + t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn) + } +} + +func TestFFNFallbackFileNameAsString(t *testing.T) { + eFn := "cdr|*http_json_cdr|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid_json|1acce2c9-3f2d-4774-8662-c28872dad515.json" + ffn := &FallbackFileName{ + Module: "cdr", + Transport: MetaHTTPjsonCDR, + Address: "http://127.0.0.1:12080/invalid_json", + RequestID: "1acce2c9-3f2d-4774-8662-c28872dad515", + FileSuffix: JSNSuffix} + if ffnStr := ffn.AsString(); ffnStr != eFn { + t.Errorf("Expecting: <%q>, received: <%q>", eFn, ffnStr) + } +} diff --git a/utils/poster_test.go b/utils/poster_test.go deleted file mode 100644 index 98711aa98..000000000 --- a/utils/poster_test.go +++ /dev/null @@ -1,72 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ -package utils - -import ( - "reflect" - "testing" -) - -func TestFFNNewFallbackFileNameFronString(t *testing.T) { - fileName := "cdr|*http_json_cdr|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid_json|1acce2c9-3f2d-4774-8662-c28872dad515.json" - eFFN := &FallbackFileName{Module: "cdr", - Transport: MetaHTTPjsonCDR, - Address: "http://127.0.0.1:12080/invalid_json", - RequestID: "1acce2c9-3f2d-4774-8662-c28872dad515", - FileSuffix: JSNSuffix} - if ffn, err := NewFallbackFileNameFronString(fileName); err != nil { - t.Error(err) - } else if !reflect.DeepEqual(eFFN, ffn) { - t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn) - } - fileName = "cdr|*http_post|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid|70c53d6d-dbd7-452e-a5bd-36bab59bb9ff.form" - eFFN = &FallbackFileName{Module: "cdr", - Transport: META_HTTP_POST, - Address: "http://127.0.0.1:12080/invalid", - RequestID: "70c53d6d-dbd7-452e-a5bd-36bab59bb9ff", - FileSuffix: FormSuffix} - if ffn, err := NewFallbackFileNameFronString(fileName); err != nil { - t.Error(err) - } else if !reflect.DeepEqual(eFFN, ffn) { - t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn) - } - fileName = "act>*call_url|*http_json|http%3A%2F%2Flocalhost%3A2080%2Flog_warning|f52cf23e-da2f-4675-b36b-e8fcc3869270.json" - eFFN = &FallbackFileName{Module: "act>*call_url", - Transport: MetaHTTPjson, - Address: "http://localhost:2080/log_warning", - RequestID: "f52cf23e-da2f-4675-b36b-e8fcc3869270", - FileSuffix: JSNSuffix} - if ffn, err := NewFallbackFileNameFronString(fileName); err != nil { - t.Error(err) - } else if !reflect.DeepEqual(eFFN, ffn) { - t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn) - } -} - -func TestFFNFallbackFileNameAsString(t *testing.T) { - eFn := "cdr|*http_json_cdr|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid_json|1acce2c9-3f2d-4774-8662-c28872dad515.json" - ffn := &FallbackFileName{ - Module: "cdr", - Transport: MetaHTTPjsonCDR, - Address: "http://127.0.0.1:12080/invalid_json", - RequestID: "1acce2c9-3f2d-4774-8662-c28872dad515", - FileSuffix: JSNSuffix} - if ffnStr := ffn.AsString(); ffnStr != eFn { - t.Errorf("Expecting: <%q>, received: <%q>", eFn, ffnStr) - } -}