diff --git a/apier/v1/apier.go b/apier/v1/apier.go
index 5213096ce..28b3a8f4f 100644
--- a/apier/v1/apier.go
+++ b/apier/v1/apier.go
@@ -51,7 +51,7 @@ type ApierV1 struct {
Users rpcclient.RpcClientConnection
CDRs rpcclient.RpcClientConnection // FixMe: populate it from cgr-engine
ServManager *servmanager.ServiceManager // Need to have them capitalize so we can export in V2
- HTTPPoster *utils.HTTPPoster
+ HTTPPoster *engine.HTTPPoster
}
func (self *ApierV1) GetDestination(dstId string, reply *engine.Destination) error {
@@ -2106,12 +2106,12 @@ func (v1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) (
}
switch ffn.Transport {
case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST:
- _, err = utils.NewHTTPPoster(v1.Config.HttpSkipTlsVerify,
+ _, err = engine.NewHTTPPoster(v1.Config.HttpSkipTlsVerify,
v1.Config.ReplyTimeout).Post(ffn.Address, utils.PosterTransportContentTypes[ffn.Transport], fileContent,
v1.Config.PosterAttempts, failoverPath)
case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap:
- var amqpPoster *utils.AMQPPoster
- amqpPoster, err = utils.AMQPPostersCache.GetAMQPPoster(ffn.Address, v1.Config.PosterAttempts, failedReqsOutDir)
+ var amqpPoster *engine.AMQPPoster
+ amqpPoster, err = engine.AMQPPostersCache.GetAMQPPoster(ffn.Address, v1.Config.PosterAttempts, failedReqsOutDir)
if err == nil { // error will be checked bellow
var chn *amqp.Channel
chn, err = amqpPoster.Post(
diff --git a/apier/v1/filter_indexes.go b/apier/v1/filter_indexes.go
index fcfeb5269..5d7cc3a8b 100644
--- a/apier/v1/filter_indexes.go
+++ b/apier/v1/filter_indexes.go
@@ -295,36 +295,38 @@ func (self *ApierV1) ComputeFilterIndexes(args utils.ArgsComputeFilterIndexes, r
transactionID := utils.GenUUID()
//ThresholdProfile Indexes
thdsIndexers, err := self.computeThresholdIndexes(args.Tenant, args.ThresholdIDs, transactionID)
- if err != nil {
+ if err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
//StatQueueProfile Indexes
sqpIndexers, err := self.computeStatIndexes(args.Tenant, args.StatIDs, transactionID)
- if err != nil {
+ if err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
//ResourceProfile Indexes
rsIndexes, err := self.computeResourceIndexes(args.Tenant, args.ResourceIDs, transactionID)
- if err != nil {
+ if err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
//SupplierProfile Indexes
sppIndexes, err := self.computeSupplierIndexes(args.Tenant, args.SupplierIDs, transactionID)
- if err != nil {
+ if err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
//AttributeProfile Indexes
attrIndexes, err := self.computeAttributeIndexes(args.Tenant, args.AttributeIDs, transactionID)
- if err != nil {
+ if err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
//Now we move from tmpKey to the right key for each type
//ThresholdProfile Indexes
if thdsIndexers != nil {
if err := thdsIndexers.StoreIndexes(true, transactionID); err != nil {
- for _, id := range *args.ThresholdIDs {
- if err := thdsIndexers.RemoveItemFromIndex(id); err != nil {
- return err
+ if args.ThresholdIDs != nil {
+ for _, id := range *args.ThresholdIDs {
+ if err := thdsIndexers.RemoveItemFromIndex(id); err != nil {
+ return err
+ }
}
}
return err
diff --git a/cdrc/csv.go b/cdrc/csv.go
index 02e2894e5..e11240aed 100644
--- a/cdrc/csv.go
+++ b/cdrc/csv.go
@@ -223,7 +223,7 @@ func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcCfg *con
if err != nil {
return nil, err
}
- if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory {
+ if outValByte, err = engine.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory {
return nil, err
} else {
fieldVal = string(outValByte)
diff --git a/cdrc/fwv.go b/cdrc/fwv.go
index 585020714..0e38263ee 100644
--- a/cdrc/fwv.go
+++ b/cdrc/fwv.go
@@ -236,7 +236,7 @@ func (self *FwvRecordsProcessor) recordToStoredCdr(record string, cdrcCfg *confi
if err != nil {
return nil, err
}
- if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory {
+ if outValByte, err = engine.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory {
return nil, err
} else {
fieldVal = string(outValByte)
diff --git a/cdrc/xml.go b/cdrc/xml.go
index 6c7cf2ff6..ac7a5088b 100644
--- a/cdrc/xml.go
+++ b/cdrc/xml.go
@@ -221,7 +221,7 @@ func (xmlProc *XMLRecordsProcessor) recordToCDR(xmlEntity tree.Res, cdrcCfg *con
if err != nil {
return nil, err
}
- if outValByte, err = utils.HttpJsonPost(httpAddr, xmlProc.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory {
+ if outValByte, err = engine.HttpJsonPost(httpAddr, xmlProc.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory {
return nil, err
} else {
fieldVal = string(outValByte)
diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go
index 0b8dd4388..9b600169d 100755
--- a/cmd/cgr-engine/rater.go
+++ b/cmd/cgr-engine/rater.go
@@ -188,7 +188,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en
responder.SetTimeToLive(cfg.ResponseCacheTTL, nil)
apierRpcV1 := &v1.ApierV1{StorDb: loadDb, DataManager: dm, CdrDb: cdrDb,
Config: cfg, Responder: responder, ServManager: serviceManager,
- HTTPPoster: utils.NewHTTPPoster(cfg.HttpSkipTlsVerify, cfg.ReplyTimeout)}
+ HTTPPoster: engine.NewHTTPPoster(cfg.HttpSkipTlsVerify, cfg.ReplyTimeout)}
if thdS != nil {
engine.SetThresholdS(thdS) // temporary architectural fix until we will have separate AccountS
}
diff --git a/engine/action.go b/engine/action.go
index 8bef3b130..5152bdcf2 100644
--- a/engine/action.go
+++ b/engine/action.go
@@ -510,7 +510,7 @@ func callUrl(ub *Account, sq *CDRStatsQueueTriggered, a *Action, acs Actions) er
ffn := &utils.FallbackFileName{Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType),
Transport: utils.MetaHTTPjson, Address: a.ExtraParameters,
RequestID: utils.GenUUID(), FileSuffix: utils.JSNSuffix}
- _, err = utils.NewHTTPPoster(config.CgrConfig().HttpSkipTlsVerify,
+ _, err = NewHTTPPoster(config.CgrConfig().HttpSkipTlsVerify,
config.CgrConfig().ReplyTimeout).Post(a.ExtraParameters, utils.CONTENT_JSON, jsn,
config.CgrConfig().PosterAttempts, path.Join(cfg.FailedPostsDir, ffn.AsString()))
return err
@@ -533,7 +533,7 @@ func callUrlAsync(ub *Account, sq *CDRStatsQueueTriggered, a *Action, acs Action
ffn := &utils.FallbackFileName{Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType),
Transport: utils.MetaHTTPjson, Address: a.ExtraParameters,
RequestID: utils.GenUUID(), FileSuffix: utils.JSNSuffix}
- go utils.NewHTTPPoster(config.CgrConfig().HttpSkipTlsVerify,
+ go NewHTTPPoster(config.CgrConfig().HttpSkipTlsVerify,
config.CgrConfig().ReplyTimeout).Post(a.ExtraParameters, utils.CONTENT_JSON, jsn,
config.CgrConfig().PosterAttempts, path.Join(cfg.FailedPostsDir, ffn.AsString()))
return nil
diff --git a/engine/cdr.go b/engine/cdr.go
index ba0e5832b..4a06713b6 100644
--- a/engine/cdr.go
+++ b/engine/cdr.go
@@ -519,7 +519,7 @@ func (cdr *CDR) formatField(cfgFld *config.CfgCdrField, httpSkipTlsCheck bool,
}
if len(httpAddr) == 0 {
err = fmt.Errorf("Empty http address for field %s type %s", cfgFld.Tag, cfgFld.Type)
- } else if outValByte, err = utils.HttpJsonPost(httpAddr, httpSkipTlsCheck, jsn); err == nil {
+ } else if outValByte, err = HttpJsonPost(httpAddr, httpSkipTlsCheck, jsn); err == nil {
outVal = string(outValByte)
if len(outVal) == 0 && cfgFld.Mandatory {
err = fmt.Errorf("Empty result for http_post field: %s", cfgFld.Tag)
diff --git a/engine/cdre.go b/engine/cdre.go
index af13d9619..ff93795d3 100644
--- a/engine/cdre.go
+++ b/engine/cdre.go
@@ -52,7 +52,7 @@ const (
func NewCDRExporter(cdrs []*CDR, exportTemplate *config.CdreConfig, exportFormat, exportPath, fallbackPath, exportID string,
synchronous bool, attempts int, fieldSeparator rune, usageMultiplyFactor utils.FieldMultiplyFactor,
- costMultiplyFactor float64, roundingDecimals int, httpSkipTlsCheck bool, httpPoster *utils.HTTPPoster) (*CDRExporter, error) {
+ costMultiplyFactor float64, roundingDecimals int, httpSkipTlsCheck bool, httpPoster *HTTPPoster) (*CDRExporter, error) {
if len(cdrs) == 0 { // Nothing to export
return nil, nil
}
@@ -91,7 +91,7 @@ type CDRExporter struct {
costMultiplyFactor float64
roundingDecimals int
httpSkipTlsCheck bool
- httpPoster *utils.HTTPPoster
+ httpPoster *HTTPPoster
header, trailer []string // Header and Trailer fields
content [][]string // Rows of cdr fields
@@ -250,8 +250,8 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) {
case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST:
_, err = cdre.httpPoster.Post(cdre.exportPath, utils.PosterTransportContentTypes[cdre.exportFormat], body, cdre.attempts, fallbackPath)
case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap:
- var amqpPoster *utils.AMQPPoster
- amqpPoster, err = utils.AMQPPostersCache.GetAMQPPoster(cdre.exportPath, cdre.attempts, cdre.fallbackPath)
+ var amqpPoster *AMQPPoster
+ amqpPoster, err = AMQPPostersCache.GetAMQPPoster(cdre.exportPath, cdre.attempts, cdre.fallbackPath)
if err == nil { // error will be checked bellow
var chn *amqp.Channel
chn, err = amqpPoster.Post(
diff --git a/engine/cdrs.go b/engine/cdrs.go
index 30f416d47..c805acb98 100644
--- a/engine/cdrs.go
+++ b/engine/cdrs.go
@@ -97,7 +97,7 @@ func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, r
return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, dm: dm,
rals: rater, pubsub: pubsub, users: users, aliases: aliases,
cdrstats: cdrstats, stats: stats, thdS: thdS, guard: guardian.Guardian,
- httpPoster: utils.NewHTTPPoster(cgrCfg.HttpSkipTlsVerify, cgrCfg.ReplyTimeout)}, nil
+ httpPoster: NewHTTPPoster(cgrCfg.HttpSkipTlsVerify, cgrCfg.ReplyTimeout)}, nil
}
type CdrServer struct {
@@ -114,7 +114,7 @@ type CdrServer struct {
stats rpcclient.RpcClientConnection
guard *guardian.GuardianLocker
responseCache *utils.ResponseCache
- httpPoster *utils.HTTPPoster // used for replication
+ httpPoster *HTTPPoster // used for replication
}
func (self *CdrServer) Timezone() string {
diff --git a/utils/poster.go b/engine/poster.go
similarity index 66%
rename from utils/poster.go
rename to engine/poster.go
index 44d83f8a1..8f30dc706 100644
--- a/utils/poster.go
+++ b/engine/poster.go
@@ -16,7 +16,7 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see
*/
-package utils
+package engine
import (
"bytes"
@@ -32,6 +32,7 @@ import (
"time"
"github.com/cgrates/cgrates/guardian"
+ "github.com/cgrates/cgrates/utils"
"github.com/streadway/amqp"
)
@@ -63,69 +64,6 @@ func HttpJsonPost(url string, skipTlsVerify bool, content []byte) ([]byte, error
return respBody, nil
}
-// NewFallbackFileNameFronString will revert the meta information in the fallback file name into original data
-func NewFallbackFileNameFronString(fileName string) (ffn *FallbackFileName, err error) {
- ffn = new(FallbackFileName)
- moduleIdx := strings.Index(fileName, HandlerArgSep)
- ffn.Module = fileName[:moduleIdx]
- var supportedModule bool
- for _, prfx := range []string{ActionsPoster, CDRPoster} {
- if strings.HasPrefix(ffn.Module, prfx) {
- supportedModule = true
- break
- }
- }
- if !supportedModule {
- return nil, fmt.Errorf("unsupported module: %s", ffn.Module)
- }
- fileNameWithoutModule := fileName[moduleIdx+1:]
- for _, trspt := range []string{MetaHTTPjsonCDR, MetaHTTPjsonMap, MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap} {
- if strings.HasPrefix(fileNameWithoutModule, trspt) {
- ffn.Transport = trspt
- break
- }
- }
- if ffn.Transport == "" {
- return nil, fmt.Errorf("unsupported transport in fallback file path: %s", fileName)
- }
- fileNameWithoutTransport := fileNameWithoutModule[len(ffn.Transport)+1:]
- reqIDidx := strings.LastIndex(fileNameWithoutTransport, HandlerArgSep)
- if reqIDidx == -1 {
- return nil, fmt.Errorf("cannot find request ID in fallback file path: %s", fileName)
- }
- if ffn.Address, err = url.QueryUnescape(fileNameWithoutTransport[:reqIDidx]); err != nil {
- return nil, err
- }
- fileNameWithoutAddress := fileNameWithoutTransport[reqIDidx+1:]
- for _, suffix := range []string{TxtSuffix, JSNSuffix, FormSuffix} {
- if strings.HasSuffix(fileNameWithoutAddress, suffix) {
- ffn.FileSuffix = suffix
- break
- }
- }
- if ffn.FileSuffix == "" {
- return nil, fmt.Errorf("unsupported suffix in fallback file path: %s", fileName)
- }
- ffn.RequestID = fileNameWithoutAddress[:len(fileNameWithoutAddress)-len(ffn.FileSuffix)]
- return
-}
-
-// FallbackFileName is the struct defining the name of a file where CGRateS will dump data which fails to be sent remotely
-type FallbackFileName struct {
- Module string // name of the module writing the file
- Transport string // transport used to send data remotely
- Address string // remote address where data should have been sent
- RequestID string // unique identifier of the request which should make files unique
- FileSuffix string // informative file termination suffix
-}
-
-func (ffn *FallbackFileName) AsString() string {
- if ffn.FileSuffix == "" { // Autopopulate FileSuffix based on the transport used
- ffn.FileSuffix = CDREFileSuffixes[ffn.Transport]
- }
- return fmt.Sprintf("%s%s%s%s%s%s%s%s", ffn.Module, HandlerArgSep, ffn.Transport, HandlerArgSep, url.QueryEscape(ffn.Address), HandlerArgSep, ffn.RequestID, ffn.FileSuffix)
-}
-
func NewHTTPPoster(skipTLSVerify bool, replyTimeout time.Duration) *HTTPPoster {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTLSVerify},
@@ -140,49 +78,49 @@ type HTTPPoster struct {
// Post with built-in failover
// Returns also reference towards client so we can close it's connections when done
func (poster *HTTPPoster) Post(addr string, contentType string, content interface{}, attempts int, fallbackFilePath string) (respBody []byte, err error) {
- if !IsSliceMember([]string{CONTENT_JSON, CONTENT_FORM, CONTENT_TEXT}, contentType) {
+ if !utils.IsSliceMember([]string{utils.CONTENT_JSON, utils.CONTENT_FORM, utils.CONTENT_TEXT}, contentType) {
return nil, fmt.Errorf("unsupported ContentType: %s", contentType)
}
var body []byte // Used to write in file and send over http
var urlVals url.Values // Used when posting form
- if IsSliceMember([]string{CONTENT_JSON, CONTENT_TEXT}, contentType) {
+ if utils.IsSliceMember([]string{utils.CONTENT_JSON, utils.CONTENT_TEXT}, contentType) {
body = content.([]byte)
- } else if contentType == CONTENT_FORM {
+ } else if contentType == utils.CONTENT_FORM {
urlVals = content.(url.Values)
body = []byte(urlVals.Encode())
}
- fib := Fib()
+ fib := utils.Fib()
bodyType := "application/x-www-form-urlencoded"
- if contentType == CONTENT_JSON {
+ if contentType == utils.CONTENT_JSON {
bodyType = "application/json"
}
for i := 0; i < attempts; i++ {
var resp *http.Response
- if IsSliceMember([]string{CONTENT_JSON, CONTENT_TEXT}, contentType) {
+ if utils.IsSliceMember([]string{utils.CONTENT_JSON, utils.CONTENT_TEXT}, contentType) {
resp, err = poster.httpClient.Post(addr, bodyType, bytes.NewBuffer(body))
- } else if contentType == CONTENT_FORM {
+ } else if contentType == utils.CONTENT_FORM {
resp, err = poster.httpClient.PostForm(addr, urlVals)
}
if err != nil {
- Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error()))
+ utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error()))
time.Sleep(time.Duration(fib()) * time.Second)
continue
}
defer resp.Body.Close()
respBody, err = ioutil.ReadAll(resp.Body)
if err != nil {
- Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error()))
+ utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error()))
time.Sleep(time.Duration(fib()) * time.Second)
continue
}
if resp.StatusCode > 299 {
- Logger.Warning(fmt.Sprintf(" Posting to : <%s>, unexpected status code received: <%d>", addr, resp.StatusCode))
+ utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, unexpected status code received: <%d>", addr, resp.StatusCode))
time.Sleep(time.Duration(fib()) * time.Second)
continue
}
return respBody, nil
}
- if fallbackFilePath != META_NONE {
+ if fallbackFilePath != utils.META_NONE {
// If we got that far, post was not possible, write it on disk
_, err = guardian.Guardian.Guard(func() (interface{}, error) {
fileOut, err := os.Create(fallbackFilePath)
@@ -194,7 +132,7 @@ func (poster *HTTPPoster) Post(addr string, contentType string, content interfac
return nil, err
}
return nil, nil
- }, time.Duration(2*time.Second), FileLockPrefix+fallbackFilePath)
+ }, time.Duration(2*time.Second), utils.FileLockPrefix+fallbackFilePath)
}
return
}
@@ -250,7 +188,7 @@ type AMQPPoster struct {
// the optional chn will permits channel caching
func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []byte, fallbackFileName string) (*amqp.Channel, error) {
var err error
- fib := Fib()
+ fib := utils.Fib()
if chn == nil {
for i := 0; i < pstr.attempts; i++ {
if chn, err = pstr.NewPostChannel(); err == nil {
@@ -258,7 +196,7 @@ func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []by
}
time.Sleep(time.Duration(fib()) * time.Second)
}
- if err != nil && fallbackFileName != META_NONE {
+ if err != nil && fallbackFileName != utils.META_NONE {
err = pstr.writeToFile(fallbackFileName, content)
return nil, err
}
@@ -278,7 +216,7 @@ func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []by
}
time.Sleep(time.Duration(fib()) * time.Second)
}
- if err != nil && fallbackFileName != META_NONE {
+ if err != nil && fallbackFileName != utils.META_NONE {
err = pstr.writeToFile(fallbackFileName, content)
return nil, err
}
@@ -303,7 +241,7 @@ func (pstr *AMQPPoster) NewPostChannel() (postChan *amqp.Channel, err error) {
pstr.conn = conn
go func() { // monitor connection errors so we can restart
if err := <-pstr.conn.NotifyClose(make(chan *amqp.Error)); err != nil {
- Logger.Err(fmt.Sprintf("Connection error received: %s", err.Error()))
+ utils.Logger.Err(fmt.Sprintf("Connection error received: %s", err.Error()))
pstr.Close()
}
}()
@@ -333,6 +271,6 @@ func (pstr *AMQPPoster) writeToFile(fileName string, content []byte) (err error)
return nil, err
}
return nil, nil
- }, time.Duration(2*time.Second), FileLockPrefix+fallbackFilePath)
+ }, time.Duration(2*time.Second), utils.FileLockPrefix+fallbackFilePath)
return
}
diff --git a/utils/poster_it_test.go b/engine/poster_it_test.go
similarity index 99%
rename from utils/poster_it_test.go
rename to engine/poster_it_test.go
index 4ec89924e..d290b6241 100644
--- a/utils/poster_it_test.go
+++ b/engine/poster_it_test.go
@@ -17,7 +17,7 @@ GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see
*/
-package utils
+package engine
import (
"encoding/json"
diff --git a/engine/poster_test.go b/engine/poster_test.go
new file mode 100644
index 000000000..c607b0784
--- /dev/null
+++ b/engine/poster_test.go
@@ -0,0 +1,18 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+package engine
diff --git a/engine/pubsub.go b/engine/pubsub.go
index 3c55b07cc..a4890a913 100644
--- a/engine/pubsub.go
+++ b/engine/pubsub.go
@@ -64,7 +64,7 @@ func NewPubSub(dm *DataManager, ttlVerify bool) (*PubSub, error) {
ps := &PubSub{
ttlVerify: ttlVerify,
subscribers: make(map[string]*SubscriberData),
- pubFunc: utils.HttpJsonPost,
+ pubFunc: HttpJsonPost,
mux: &sync.Mutex{},
dm: dm,
}
diff --git a/engine/thresholds.go b/engine/thresholds.go
index e7505fc4e..2dc1bcb6b 100644
--- a/engine/thresholds.go
+++ b/engine/thresholds.go
@@ -308,7 +308,6 @@ func (tS *ThresholdService) processEvent(args *ArgsProcessEvent) (thresholdsIDs
fmt.Sprintf(" failed removing non-recurrent threshold: %s, error: %s",
t.TenantID(), err.Error()))
withErrors = true
-
}
continue
}
@@ -325,8 +324,6 @@ func (tS *ThresholdService) processEvent(args *ArgsProcessEvent) (thresholdsIDs
}
if len(tIDs) != 0 {
thresholdsIDs = append(thresholdsIDs, tIDs...)
- } else {
- thresholdsIDs = []string{}
}
if withErrors {
err = utils.ErrPartiallyExecuted
diff --git a/utils/coreutils.go b/utils/coreutils.go
index 865a1ec99..ca43fd293 100644
--- a/utils/coreutils.go
+++ b/utils/coreutils.go
@@ -32,6 +32,7 @@ import (
"io/ioutil"
"log"
"math"
+ "net/url"
"os"
"path/filepath"
"reflect"
@@ -843,3 +844,66 @@ type AuthStruct struct {
Tenant string
ApiKey string
}
+
+// NewFallbackFileNameFronString will revert the meta information in the fallback file name into original data
+func NewFallbackFileNameFronString(fileName string) (ffn *FallbackFileName, err error) {
+ ffn = new(FallbackFileName)
+ moduleIdx := strings.Index(fileName, HandlerArgSep)
+ ffn.Module = fileName[:moduleIdx]
+ var supportedModule bool
+ for _, prfx := range []string{ActionsPoster, CDRPoster} {
+ if strings.HasPrefix(ffn.Module, prfx) {
+ supportedModule = true
+ break
+ }
+ }
+ if !supportedModule {
+ return nil, fmt.Errorf("unsupported module: %s", ffn.Module)
+ }
+ fileNameWithoutModule := fileName[moduleIdx+1:]
+ for _, trspt := range []string{MetaHTTPjsonCDR, MetaHTTPjsonMap, MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap} {
+ if strings.HasPrefix(fileNameWithoutModule, trspt) {
+ ffn.Transport = trspt
+ break
+ }
+ }
+ if ffn.Transport == "" {
+ return nil, fmt.Errorf("unsupported transport in fallback file path: %s", fileName)
+ }
+ fileNameWithoutTransport := fileNameWithoutModule[len(ffn.Transport)+1:]
+ reqIDidx := strings.LastIndex(fileNameWithoutTransport, HandlerArgSep)
+ if reqIDidx == -1 {
+ return nil, fmt.Errorf("cannot find request ID in fallback file path: %s", fileName)
+ }
+ if ffn.Address, err = url.QueryUnescape(fileNameWithoutTransport[:reqIDidx]); err != nil {
+ return nil, err
+ }
+ fileNameWithoutAddress := fileNameWithoutTransport[reqIDidx+1:]
+ for _, suffix := range []string{TxtSuffix, JSNSuffix, FormSuffix} {
+ if strings.HasSuffix(fileNameWithoutAddress, suffix) {
+ ffn.FileSuffix = suffix
+ break
+ }
+ }
+ if ffn.FileSuffix == "" {
+ return nil, fmt.Errorf("unsupported suffix in fallback file path: %s", fileName)
+ }
+ ffn.RequestID = fileNameWithoutAddress[:len(fileNameWithoutAddress)-len(ffn.FileSuffix)]
+ return
+}
+
+// FallbackFileName is the struct defining the name of a file where CGRateS will dump data which fails to be sent remotely
+type FallbackFileName struct {
+ Module string // name of the module writing the file
+ Transport string // transport used to send data remotely
+ Address string // remote address where data should have been sent
+ RequestID string // unique identifier of the request which should make files unique
+ FileSuffix string // informative file termination suffix
+}
+
+func (ffn *FallbackFileName) AsString() string {
+ if ffn.FileSuffix == "" { // Autopopulate FileSuffix based on the transport used
+ ffn.FileSuffix = CDREFileSuffixes[ffn.Transport]
+ }
+ return fmt.Sprintf("%s%s%s%s%s%s%s%s", ffn.Module, HandlerArgSep, ffn.Transport, HandlerArgSep, url.QueryEscape(ffn.Address), HandlerArgSep, ffn.RequestID, ffn.FileSuffix)
+}
diff --git a/utils/coreutils_test.go b/utils/coreutils_test.go
index 720417106..49d464276 100644
--- a/utils/coreutils_test.go
+++ b/utils/coreutils_test.go
@@ -820,3 +820,52 @@ func TestGZIPGUnZIP(t *testing.T) {
t.Error("not matching initial source")
}
}
+
+func TestFFNNewFallbackFileNameFronString(t *testing.T) {
+ fileName := "cdr|*http_json_cdr|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid_json|1acce2c9-3f2d-4774-8662-c28872dad515.json"
+ eFFN := &FallbackFileName{Module: "cdr",
+ Transport: MetaHTTPjsonCDR,
+ Address: "http://127.0.0.1:12080/invalid_json",
+ RequestID: "1acce2c9-3f2d-4774-8662-c28872dad515",
+ FileSuffix: JSNSuffix}
+ if ffn, err := NewFallbackFileNameFronString(fileName); err != nil {
+ t.Error(err)
+ } else if !reflect.DeepEqual(eFFN, ffn) {
+ t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn)
+ }
+ fileName = "cdr|*http_post|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid|70c53d6d-dbd7-452e-a5bd-36bab59bb9ff.form"
+ eFFN = &FallbackFileName{Module: "cdr",
+ Transport: META_HTTP_POST,
+ Address: "http://127.0.0.1:12080/invalid",
+ RequestID: "70c53d6d-dbd7-452e-a5bd-36bab59bb9ff",
+ FileSuffix: FormSuffix}
+ if ffn, err := NewFallbackFileNameFronString(fileName); err != nil {
+ t.Error(err)
+ } else if !reflect.DeepEqual(eFFN, ffn) {
+ t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn)
+ }
+ fileName = "act>*call_url|*http_json|http%3A%2F%2Flocalhost%3A2080%2Flog_warning|f52cf23e-da2f-4675-b36b-e8fcc3869270.json"
+ eFFN = &FallbackFileName{Module: "act>*call_url",
+ Transport: MetaHTTPjson,
+ Address: "http://localhost:2080/log_warning",
+ RequestID: "f52cf23e-da2f-4675-b36b-e8fcc3869270",
+ FileSuffix: JSNSuffix}
+ if ffn, err := NewFallbackFileNameFronString(fileName); err != nil {
+ t.Error(err)
+ } else if !reflect.DeepEqual(eFFN, ffn) {
+ t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn)
+ }
+}
+
+func TestFFNFallbackFileNameAsString(t *testing.T) {
+ eFn := "cdr|*http_json_cdr|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid_json|1acce2c9-3f2d-4774-8662-c28872dad515.json"
+ ffn := &FallbackFileName{
+ Module: "cdr",
+ Transport: MetaHTTPjsonCDR,
+ Address: "http://127.0.0.1:12080/invalid_json",
+ RequestID: "1acce2c9-3f2d-4774-8662-c28872dad515",
+ FileSuffix: JSNSuffix}
+ if ffnStr := ffn.AsString(); ffnStr != eFn {
+ t.Errorf("Expecting: <%q>, received: <%q>", eFn, ffnStr)
+ }
+}
diff --git a/utils/poster_test.go b/utils/poster_test.go
deleted file mode 100644
index 98711aa98..000000000
--- a/utils/poster_test.go
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
-Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
-Copyright (C) ITsysCOM GmbH
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see
-*/
-package utils
-
-import (
- "reflect"
- "testing"
-)
-
-func TestFFNNewFallbackFileNameFronString(t *testing.T) {
- fileName := "cdr|*http_json_cdr|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid_json|1acce2c9-3f2d-4774-8662-c28872dad515.json"
- eFFN := &FallbackFileName{Module: "cdr",
- Transport: MetaHTTPjsonCDR,
- Address: "http://127.0.0.1:12080/invalid_json",
- RequestID: "1acce2c9-3f2d-4774-8662-c28872dad515",
- FileSuffix: JSNSuffix}
- if ffn, err := NewFallbackFileNameFronString(fileName); err != nil {
- t.Error(err)
- } else if !reflect.DeepEqual(eFFN, ffn) {
- t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn)
- }
- fileName = "cdr|*http_post|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid|70c53d6d-dbd7-452e-a5bd-36bab59bb9ff.form"
- eFFN = &FallbackFileName{Module: "cdr",
- Transport: META_HTTP_POST,
- Address: "http://127.0.0.1:12080/invalid",
- RequestID: "70c53d6d-dbd7-452e-a5bd-36bab59bb9ff",
- FileSuffix: FormSuffix}
- if ffn, err := NewFallbackFileNameFronString(fileName); err != nil {
- t.Error(err)
- } else if !reflect.DeepEqual(eFFN, ffn) {
- t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn)
- }
- fileName = "act>*call_url|*http_json|http%3A%2F%2Flocalhost%3A2080%2Flog_warning|f52cf23e-da2f-4675-b36b-e8fcc3869270.json"
- eFFN = &FallbackFileName{Module: "act>*call_url",
- Transport: MetaHTTPjson,
- Address: "http://localhost:2080/log_warning",
- RequestID: "f52cf23e-da2f-4675-b36b-e8fcc3869270",
- FileSuffix: JSNSuffix}
- if ffn, err := NewFallbackFileNameFronString(fileName); err != nil {
- t.Error(err)
- } else if !reflect.DeepEqual(eFFN, ffn) {
- t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn)
- }
-}
-
-func TestFFNFallbackFileNameAsString(t *testing.T) {
- eFn := "cdr|*http_json_cdr|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid_json|1acce2c9-3f2d-4774-8662-c28872dad515.json"
- ffn := &FallbackFileName{
- Module: "cdr",
- Transport: MetaHTTPjsonCDR,
- Address: "http://127.0.0.1:12080/invalid_json",
- RequestID: "1acce2c9-3f2d-4774-8662-c28872dad515",
- FileSuffix: JSNSuffix}
- if ffnStr := ffn.AsString(); ffnStr != eFn {
- t.Errorf("Expecting: <%q>, received: <%q>", eFn, ffnStr)
- }
-}