mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-16 05:39:54 +05:00
Moving Poster into engine so we can free up guardian in utils
This commit is contained in:
@@ -51,7 +51,7 @@ type ApierV1 struct {
|
||||
Users rpcclient.RpcClientConnection
|
||||
CDRs rpcclient.RpcClientConnection // FixMe: populate it from cgr-engine
|
||||
ServManager *servmanager.ServiceManager // Need to have them capitalize so we can export in V2
|
||||
HTTPPoster *utils.HTTPPoster
|
||||
HTTPPoster *engine.HTTPPoster
|
||||
}
|
||||
|
||||
func (self *ApierV1) GetDestination(dstId string, reply *engine.Destination) error {
|
||||
@@ -2106,12 +2106,12 @@ func (v1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) (
|
||||
}
|
||||
switch ffn.Transport {
|
||||
case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST:
|
||||
_, err = utils.NewHTTPPoster(v1.Config.HttpSkipTlsVerify,
|
||||
_, err = engine.NewHTTPPoster(v1.Config.HttpSkipTlsVerify,
|
||||
v1.Config.ReplyTimeout).Post(ffn.Address, utils.PosterTransportContentTypes[ffn.Transport], fileContent,
|
||||
v1.Config.PosterAttempts, failoverPath)
|
||||
case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap:
|
||||
var amqpPoster *utils.AMQPPoster
|
||||
amqpPoster, err = utils.AMQPPostersCache.GetAMQPPoster(ffn.Address, v1.Config.PosterAttempts, failedReqsOutDir)
|
||||
var amqpPoster *engine.AMQPPoster
|
||||
amqpPoster, err = engine.AMQPPostersCache.GetAMQPPoster(ffn.Address, v1.Config.PosterAttempts, failedReqsOutDir)
|
||||
if err == nil { // error will be checked bellow
|
||||
var chn *amqp.Channel
|
||||
chn, err = amqpPoster.Post(
|
||||
|
||||
@@ -295,36 +295,38 @@ func (self *ApierV1) ComputeFilterIndexes(args utils.ArgsComputeFilterIndexes, r
|
||||
transactionID := utils.GenUUID()
|
||||
//ThresholdProfile Indexes
|
||||
thdsIndexers, err := self.computeThresholdIndexes(args.Tenant, args.ThresholdIDs, transactionID)
|
||||
if err != nil {
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
return utils.APIErrorHandler(err)
|
||||
}
|
||||
//StatQueueProfile Indexes
|
||||
sqpIndexers, err := self.computeStatIndexes(args.Tenant, args.StatIDs, transactionID)
|
||||
if err != nil {
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
return utils.APIErrorHandler(err)
|
||||
}
|
||||
//ResourceProfile Indexes
|
||||
rsIndexes, err := self.computeResourceIndexes(args.Tenant, args.ResourceIDs, transactionID)
|
||||
if err != nil {
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
return utils.APIErrorHandler(err)
|
||||
}
|
||||
//SupplierProfile Indexes
|
||||
sppIndexes, err := self.computeSupplierIndexes(args.Tenant, args.SupplierIDs, transactionID)
|
||||
if err != nil {
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
return utils.APIErrorHandler(err)
|
||||
}
|
||||
//AttributeProfile Indexes
|
||||
attrIndexes, err := self.computeAttributeIndexes(args.Tenant, args.AttributeIDs, transactionID)
|
||||
if err != nil {
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
return utils.APIErrorHandler(err)
|
||||
}
|
||||
//Now we move from tmpKey to the right key for each type
|
||||
//ThresholdProfile Indexes
|
||||
if thdsIndexers != nil {
|
||||
if err := thdsIndexers.StoreIndexes(true, transactionID); err != nil {
|
||||
for _, id := range *args.ThresholdIDs {
|
||||
if err := thdsIndexers.RemoveItemFromIndex(id); err != nil {
|
||||
return err
|
||||
if args.ThresholdIDs != nil {
|
||||
for _, id := range *args.ThresholdIDs {
|
||||
if err := thdsIndexers.RemoveItemFromIndex(id); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return err
|
||||
|
||||
@@ -223,7 +223,7 @@ func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcCfg *con
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory {
|
||||
if outValByte, err = engine.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory {
|
||||
return nil, err
|
||||
} else {
|
||||
fieldVal = string(outValByte)
|
||||
|
||||
@@ -236,7 +236,7 @@ func (self *FwvRecordsProcessor) recordToStoredCdr(record string, cdrcCfg *confi
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory {
|
||||
if outValByte, err = engine.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory {
|
||||
return nil, err
|
||||
} else {
|
||||
fieldVal = string(outValByte)
|
||||
|
||||
@@ -221,7 +221,7 @@ func (xmlProc *XMLRecordsProcessor) recordToCDR(xmlEntity tree.Res, cdrcCfg *con
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if outValByte, err = utils.HttpJsonPost(httpAddr, xmlProc.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory {
|
||||
if outValByte, err = engine.HttpJsonPost(httpAddr, xmlProc.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory {
|
||||
return nil, err
|
||||
} else {
|
||||
fieldVal = string(outValByte)
|
||||
|
||||
@@ -188,7 +188,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en
|
||||
responder.SetTimeToLive(cfg.ResponseCacheTTL, nil)
|
||||
apierRpcV1 := &v1.ApierV1{StorDb: loadDb, DataManager: dm, CdrDb: cdrDb,
|
||||
Config: cfg, Responder: responder, ServManager: serviceManager,
|
||||
HTTPPoster: utils.NewHTTPPoster(cfg.HttpSkipTlsVerify, cfg.ReplyTimeout)}
|
||||
HTTPPoster: engine.NewHTTPPoster(cfg.HttpSkipTlsVerify, cfg.ReplyTimeout)}
|
||||
if thdS != nil {
|
||||
engine.SetThresholdS(thdS) // temporary architectural fix until we will have separate AccountS
|
||||
}
|
||||
|
||||
@@ -510,7 +510,7 @@ func callUrl(ub *Account, sq *CDRStatsQueueTriggered, a *Action, acs Actions) er
|
||||
ffn := &utils.FallbackFileName{Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType),
|
||||
Transport: utils.MetaHTTPjson, Address: a.ExtraParameters,
|
||||
RequestID: utils.GenUUID(), FileSuffix: utils.JSNSuffix}
|
||||
_, err = utils.NewHTTPPoster(config.CgrConfig().HttpSkipTlsVerify,
|
||||
_, err = NewHTTPPoster(config.CgrConfig().HttpSkipTlsVerify,
|
||||
config.CgrConfig().ReplyTimeout).Post(a.ExtraParameters, utils.CONTENT_JSON, jsn,
|
||||
config.CgrConfig().PosterAttempts, path.Join(cfg.FailedPostsDir, ffn.AsString()))
|
||||
return err
|
||||
@@ -533,7 +533,7 @@ func callUrlAsync(ub *Account, sq *CDRStatsQueueTriggered, a *Action, acs Action
|
||||
ffn := &utils.FallbackFileName{Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType),
|
||||
Transport: utils.MetaHTTPjson, Address: a.ExtraParameters,
|
||||
RequestID: utils.GenUUID(), FileSuffix: utils.JSNSuffix}
|
||||
go utils.NewHTTPPoster(config.CgrConfig().HttpSkipTlsVerify,
|
||||
go NewHTTPPoster(config.CgrConfig().HttpSkipTlsVerify,
|
||||
config.CgrConfig().ReplyTimeout).Post(a.ExtraParameters, utils.CONTENT_JSON, jsn,
|
||||
config.CgrConfig().PosterAttempts, path.Join(cfg.FailedPostsDir, ffn.AsString()))
|
||||
return nil
|
||||
|
||||
@@ -519,7 +519,7 @@ func (cdr *CDR) formatField(cfgFld *config.CfgCdrField, httpSkipTlsCheck bool,
|
||||
}
|
||||
if len(httpAddr) == 0 {
|
||||
err = fmt.Errorf("Empty http address for field %s type %s", cfgFld.Tag, cfgFld.Type)
|
||||
} else if outValByte, err = utils.HttpJsonPost(httpAddr, httpSkipTlsCheck, jsn); err == nil {
|
||||
} else if outValByte, err = HttpJsonPost(httpAddr, httpSkipTlsCheck, jsn); err == nil {
|
||||
outVal = string(outValByte)
|
||||
if len(outVal) == 0 && cfgFld.Mandatory {
|
||||
err = fmt.Errorf("Empty result for http_post field: %s", cfgFld.Tag)
|
||||
|
||||
@@ -52,7 +52,7 @@ const (
|
||||
|
||||
func NewCDRExporter(cdrs []*CDR, exportTemplate *config.CdreConfig, exportFormat, exportPath, fallbackPath, exportID string,
|
||||
synchronous bool, attempts int, fieldSeparator rune, usageMultiplyFactor utils.FieldMultiplyFactor,
|
||||
costMultiplyFactor float64, roundingDecimals int, httpSkipTlsCheck bool, httpPoster *utils.HTTPPoster) (*CDRExporter, error) {
|
||||
costMultiplyFactor float64, roundingDecimals int, httpSkipTlsCheck bool, httpPoster *HTTPPoster) (*CDRExporter, error) {
|
||||
if len(cdrs) == 0 { // Nothing to export
|
||||
return nil, nil
|
||||
}
|
||||
@@ -91,7 +91,7 @@ type CDRExporter struct {
|
||||
costMultiplyFactor float64
|
||||
roundingDecimals int
|
||||
httpSkipTlsCheck bool
|
||||
httpPoster *utils.HTTPPoster
|
||||
httpPoster *HTTPPoster
|
||||
|
||||
header, trailer []string // Header and Trailer fields
|
||||
content [][]string // Rows of cdr fields
|
||||
@@ -250,8 +250,8 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) {
|
||||
case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST:
|
||||
_, err = cdre.httpPoster.Post(cdre.exportPath, utils.PosterTransportContentTypes[cdre.exportFormat], body, cdre.attempts, fallbackPath)
|
||||
case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap:
|
||||
var amqpPoster *utils.AMQPPoster
|
||||
amqpPoster, err = utils.AMQPPostersCache.GetAMQPPoster(cdre.exportPath, cdre.attempts, cdre.fallbackPath)
|
||||
var amqpPoster *AMQPPoster
|
||||
amqpPoster, err = AMQPPostersCache.GetAMQPPoster(cdre.exportPath, cdre.attempts, cdre.fallbackPath)
|
||||
if err == nil { // error will be checked bellow
|
||||
var chn *amqp.Channel
|
||||
chn, err = amqpPoster.Post(
|
||||
|
||||
@@ -97,7 +97,7 @@ func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, r
|
||||
return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, dm: dm,
|
||||
rals: rater, pubsub: pubsub, users: users, aliases: aliases,
|
||||
cdrstats: cdrstats, stats: stats, thdS: thdS, guard: guardian.Guardian,
|
||||
httpPoster: utils.NewHTTPPoster(cgrCfg.HttpSkipTlsVerify, cgrCfg.ReplyTimeout)}, nil
|
||||
httpPoster: NewHTTPPoster(cgrCfg.HttpSkipTlsVerify, cgrCfg.ReplyTimeout)}, nil
|
||||
}
|
||||
|
||||
type CdrServer struct {
|
||||
@@ -114,7 +114,7 @@ type CdrServer struct {
|
||||
stats rpcclient.RpcClientConnection
|
||||
guard *guardian.GuardianLocker
|
||||
responseCache *utils.ResponseCache
|
||||
httpPoster *utils.HTTPPoster // used for replication
|
||||
httpPoster *HTTPPoster // used for replication
|
||||
}
|
||||
|
||||
func (self *CdrServer) Timezone() string {
|
||||
|
||||
@@ -16,7 +16,7 @@ You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package utils
|
||||
package engine
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
@@ -32,6 +32,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/guardian"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
@@ -63,69 +64,6 @@ func HttpJsonPost(url string, skipTlsVerify bool, content []byte) ([]byte, error
|
||||
return respBody, nil
|
||||
}
|
||||
|
||||
// NewFallbackFileNameFronString will revert the meta information in the fallback file name into original data
|
||||
func NewFallbackFileNameFronString(fileName string) (ffn *FallbackFileName, err error) {
|
||||
ffn = new(FallbackFileName)
|
||||
moduleIdx := strings.Index(fileName, HandlerArgSep)
|
||||
ffn.Module = fileName[:moduleIdx]
|
||||
var supportedModule bool
|
||||
for _, prfx := range []string{ActionsPoster, CDRPoster} {
|
||||
if strings.HasPrefix(ffn.Module, prfx) {
|
||||
supportedModule = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !supportedModule {
|
||||
return nil, fmt.Errorf("unsupported module: %s", ffn.Module)
|
||||
}
|
||||
fileNameWithoutModule := fileName[moduleIdx+1:]
|
||||
for _, trspt := range []string{MetaHTTPjsonCDR, MetaHTTPjsonMap, MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap} {
|
||||
if strings.HasPrefix(fileNameWithoutModule, trspt) {
|
||||
ffn.Transport = trspt
|
||||
break
|
||||
}
|
||||
}
|
||||
if ffn.Transport == "" {
|
||||
return nil, fmt.Errorf("unsupported transport in fallback file path: %s", fileName)
|
||||
}
|
||||
fileNameWithoutTransport := fileNameWithoutModule[len(ffn.Transport)+1:]
|
||||
reqIDidx := strings.LastIndex(fileNameWithoutTransport, HandlerArgSep)
|
||||
if reqIDidx == -1 {
|
||||
return nil, fmt.Errorf("cannot find request ID in fallback file path: %s", fileName)
|
||||
}
|
||||
if ffn.Address, err = url.QueryUnescape(fileNameWithoutTransport[:reqIDidx]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fileNameWithoutAddress := fileNameWithoutTransport[reqIDidx+1:]
|
||||
for _, suffix := range []string{TxtSuffix, JSNSuffix, FormSuffix} {
|
||||
if strings.HasSuffix(fileNameWithoutAddress, suffix) {
|
||||
ffn.FileSuffix = suffix
|
||||
break
|
||||
}
|
||||
}
|
||||
if ffn.FileSuffix == "" {
|
||||
return nil, fmt.Errorf("unsupported suffix in fallback file path: %s", fileName)
|
||||
}
|
||||
ffn.RequestID = fileNameWithoutAddress[:len(fileNameWithoutAddress)-len(ffn.FileSuffix)]
|
||||
return
|
||||
}
|
||||
|
||||
// FallbackFileName is the struct defining the name of a file where CGRateS will dump data which fails to be sent remotely
|
||||
type FallbackFileName struct {
|
||||
Module string // name of the module writing the file
|
||||
Transport string // transport used to send data remotely
|
||||
Address string // remote address where data should have been sent
|
||||
RequestID string // unique identifier of the request which should make files unique
|
||||
FileSuffix string // informative file termination suffix
|
||||
}
|
||||
|
||||
func (ffn *FallbackFileName) AsString() string {
|
||||
if ffn.FileSuffix == "" { // Autopopulate FileSuffix based on the transport used
|
||||
ffn.FileSuffix = CDREFileSuffixes[ffn.Transport]
|
||||
}
|
||||
return fmt.Sprintf("%s%s%s%s%s%s%s%s", ffn.Module, HandlerArgSep, ffn.Transport, HandlerArgSep, url.QueryEscape(ffn.Address), HandlerArgSep, ffn.RequestID, ffn.FileSuffix)
|
||||
}
|
||||
|
||||
func NewHTTPPoster(skipTLSVerify bool, replyTimeout time.Duration) *HTTPPoster {
|
||||
tr := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTLSVerify},
|
||||
@@ -140,49 +78,49 @@ type HTTPPoster struct {
|
||||
// Post with built-in failover
|
||||
// Returns also reference towards client so we can close it's connections when done
|
||||
func (poster *HTTPPoster) Post(addr string, contentType string, content interface{}, attempts int, fallbackFilePath string) (respBody []byte, err error) {
|
||||
if !IsSliceMember([]string{CONTENT_JSON, CONTENT_FORM, CONTENT_TEXT}, contentType) {
|
||||
if !utils.IsSliceMember([]string{utils.CONTENT_JSON, utils.CONTENT_FORM, utils.CONTENT_TEXT}, contentType) {
|
||||
return nil, fmt.Errorf("unsupported ContentType: %s", contentType)
|
||||
}
|
||||
var body []byte // Used to write in file and send over http
|
||||
var urlVals url.Values // Used when posting form
|
||||
if IsSliceMember([]string{CONTENT_JSON, CONTENT_TEXT}, contentType) {
|
||||
if utils.IsSliceMember([]string{utils.CONTENT_JSON, utils.CONTENT_TEXT}, contentType) {
|
||||
body = content.([]byte)
|
||||
} else if contentType == CONTENT_FORM {
|
||||
} else if contentType == utils.CONTENT_FORM {
|
||||
urlVals = content.(url.Values)
|
||||
body = []byte(urlVals.Encode())
|
||||
}
|
||||
fib := Fib()
|
||||
fib := utils.Fib()
|
||||
bodyType := "application/x-www-form-urlencoded"
|
||||
if contentType == CONTENT_JSON {
|
||||
if contentType == utils.CONTENT_JSON {
|
||||
bodyType = "application/json"
|
||||
}
|
||||
for i := 0; i < attempts; i++ {
|
||||
var resp *http.Response
|
||||
if IsSliceMember([]string{CONTENT_JSON, CONTENT_TEXT}, contentType) {
|
||||
if utils.IsSliceMember([]string{utils.CONTENT_JSON, utils.CONTENT_TEXT}, contentType) {
|
||||
resp, err = poster.httpClient.Post(addr, bodyType, bytes.NewBuffer(body))
|
||||
} else if contentType == CONTENT_FORM {
|
||||
} else if contentType == utils.CONTENT_FORM {
|
||||
resp, err = poster.httpClient.PostForm(addr, urlVals)
|
||||
}
|
||||
if err != nil {
|
||||
Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error: <%s>", addr, err.Error()))
|
||||
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error: <%s>", addr, err.Error()))
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
continue
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
respBody, err = ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error: <%s>", addr, err.Error()))
|
||||
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error: <%s>", addr, err.Error()))
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
continue
|
||||
}
|
||||
if resp.StatusCode > 299 {
|
||||
Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, unexpected status code received: <%d>", addr, resp.StatusCode))
|
||||
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, unexpected status code received: <%d>", addr, resp.StatusCode))
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
continue
|
||||
}
|
||||
return respBody, nil
|
||||
}
|
||||
if fallbackFilePath != META_NONE {
|
||||
if fallbackFilePath != utils.META_NONE {
|
||||
// If we got that far, post was not possible, write it on disk
|
||||
_, err = guardian.Guardian.Guard(func() (interface{}, error) {
|
||||
fileOut, err := os.Create(fallbackFilePath)
|
||||
@@ -194,7 +132,7 @@ func (poster *HTTPPoster) Post(addr string, contentType string, content interfac
|
||||
return nil, err
|
||||
}
|
||||
return nil, nil
|
||||
}, time.Duration(2*time.Second), FileLockPrefix+fallbackFilePath)
|
||||
}, time.Duration(2*time.Second), utils.FileLockPrefix+fallbackFilePath)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -250,7 +188,7 @@ type AMQPPoster struct {
|
||||
// the optional chn will permits channel caching
|
||||
func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []byte, fallbackFileName string) (*amqp.Channel, error) {
|
||||
var err error
|
||||
fib := Fib()
|
||||
fib := utils.Fib()
|
||||
if chn == nil {
|
||||
for i := 0; i < pstr.attempts; i++ {
|
||||
if chn, err = pstr.NewPostChannel(); err == nil {
|
||||
@@ -258,7 +196,7 @@ func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []by
|
||||
}
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
if err != nil && fallbackFileName != META_NONE {
|
||||
if err != nil && fallbackFileName != utils.META_NONE {
|
||||
err = pstr.writeToFile(fallbackFileName, content)
|
||||
return nil, err
|
||||
}
|
||||
@@ -278,7 +216,7 @@ func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []by
|
||||
}
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
if err != nil && fallbackFileName != META_NONE {
|
||||
if err != nil && fallbackFileName != utils.META_NONE {
|
||||
err = pstr.writeToFile(fallbackFileName, content)
|
||||
return nil, err
|
||||
}
|
||||
@@ -303,7 +241,7 @@ func (pstr *AMQPPoster) NewPostChannel() (postChan *amqp.Channel, err error) {
|
||||
pstr.conn = conn
|
||||
go func() { // monitor connection errors so we can restart
|
||||
if err := <-pstr.conn.NotifyClose(make(chan *amqp.Error)); err != nil {
|
||||
Logger.Err(fmt.Sprintf("Connection error received: %s", err.Error()))
|
||||
utils.Logger.Err(fmt.Sprintf("Connection error received: %s", err.Error()))
|
||||
pstr.Close()
|
||||
}
|
||||
}()
|
||||
@@ -333,6 +271,6 @@ func (pstr *AMQPPoster) writeToFile(fileName string, content []byte) (err error)
|
||||
return nil, err
|
||||
}
|
||||
return nil, nil
|
||||
}, time.Duration(2*time.Second), FileLockPrefix+fallbackFilePath)
|
||||
}, time.Duration(2*time.Second), utils.FileLockPrefix+fallbackFilePath)
|
||||
return
|
||||
}
|
||||
@@ -17,7 +17,7 @@ GNU General Public License for more details.
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package utils
|
||||
package engine
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
18
engine/poster_test.go
Normal file
18
engine/poster_test.go
Normal file
@@ -0,0 +1,18 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package engine
|
||||
@@ -64,7 +64,7 @@ func NewPubSub(dm *DataManager, ttlVerify bool) (*PubSub, error) {
|
||||
ps := &PubSub{
|
||||
ttlVerify: ttlVerify,
|
||||
subscribers: make(map[string]*SubscriberData),
|
||||
pubFunc: utils.HttpJsonPost,
|
||||
pubFunc: HttpJsonPost,
|
||||
mux: &sync.Mutex{},
|
||||
dm: dm,
|
||||
}
|
||||
|
||||
@@ -308,7 +308,6 @@ func (tS *ThresholdService) processEvent(args *ArgsProcessEvent) (thresholdsIDs
|
||||
fmt.Sprintf("<ThresholdService> failed removing non-recurrent threshold: %s, error: %s",
|
||||
t.TenantID(), err.Error()))
|
||||
withErrors = true
|
||||
|
||||
}
|
||||
continue
|
||||
}
|
||||
@@ -325,8 +324,6 @@ func (tS *ThresholdService) processEvent(args *ArgsProcessEvent) (thresholdsIDs
|
||||
}
|
||||
if len(tIDs) != 0 {
|
||||
thresholdsIDs = append(thresholdsIDs, tIDs...)
|
||||
} else {
|
||||
thresholdsIDs = []string{}
|
||||
}
|
||||
if withErrors {
|
||||
err = utils.ErrPartiallyExecuted
|
||||
|
||||
@@ -32,6 +32,7 @@ import (
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
@@ -843,3 +844,66 @@ type AuthStruct struct {
|
||||
Tenant string
|
||||
ApiKey string
|
||||
}
|
||||
|
||||
// NewFallbackFileNameFronString will revert the meta information in the fallback file name into original data
|
||||
func NewFallbackFileNameFronString(fileName string) (ffn *FallbackFileName, err error) {
|
||||
ffn = new(FallbackFileName)
|
||||
moduleIdx := strings.Index(fileName, HandlerArgSep)
|
||||
ffn.Module = fileName[:moduleIdx]
|
||||
var supportedModule bool
|
||||
for _, prfx := range []string{ActionsPoster, CDRPoster} {
|
||||
if strings.HasPrefix(ffn.Module, prfx) {
|
||||
supportedModule = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !supportedModule {
|
||||
return nil, fmt.Errorf("unsupported module: %s", ffn.Module)
|
||||
}
|
||||
fileNameWithoutModule := fileName[moduleIdx+1:]
|
||||
for _, trspt := range []string{MetaHTTPjsonCDR, MetaHTTPjsonMap, MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap} {
|
||||
if strings.HasPrefix(fileNameWithoutModule, trspt) {
|
||||
ffn.Transport = trspt
|
||||
break
|
||||
}
|
||||
}
|
||||
if ffn.Transport == "" {
|
||||
return nil, fmt.Errorf("unsupported transport in fallback file path: %s", fileName)
|
||||
}
|
||||
fileNameWithoutTransport := fileNameWithoutModule[len(ffn.Transport)+1:]
|
||||
reqIDidx := strings.LastIndex(fileNameWithoutTransport, HandlerArgSep)
|
||||
if reqIDidx == -1 {
|
||||
return nil, fmt.Errorf("cannot find request ID in fallback file path: %s", fileName)
|
||||
}
|
||||
if ffn.Address, err = url.QueryUnescape(fileNameWithoutTransport[:reqIDidx]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fileNameWithoutAddress := fileNameWithoutTransport[reqIDidx+1:]
|
||||
for _, suffix := range []string{TxtSuffix, JSNSuffix, FormSuffix} {
|
||||
if strings.HasSuffix(fileNameWithoutAddress, suffix) {
|
||||
ffn.FileSuffix = suffix
|
||||
break
|
||||
}
|
||||
}
|
||||
if ffn.FileSuffix == "" {
|
||||
return nil, fmt.Errorf("unsupported suffix in fallback file path: %s", fileName)
|
||||
}
|
||||
ffn.RequestID = fileNameWithoutAddress[:len(fileNameWithoutAddress)-len(ffn.FileSuffix)]
|
||||
return
|
||||
}
|
||||
|
||||
// FallbackFileName is the struct defining the name of a file where CGRateS will dump data which fails to be sent remotely
|
||||
type FallbackFileName struct {
|
||||
Module string // name of the module writing the file
|
||||
Transport string // transport used to send data remotely
|
||||
Address string // remote address where data should have been sent
|
||||
RequestID string // unique identifier of the request which should make files unique
|
||||
FileSuffix string // informative file termination suffix
|
||||
}
|
||||
|
||||
func (ffn *FallbackFileName) AsString() string {
|
||||
if ffn.FileSuffix == "" { // Autopopulate FileSuffix based on the transport used
|
||||
ffn.FileSuffix = CDREFileSuffixes[ffn.Transport]
|
||||
}
|
||||
return fmt.Sprintf("%s%s%s%s%s%s%s%s", ffn.Module, HandlerArgSep, ffn.Transport, HandlerArgSep, url.QueryEscape(ffn.Address), HandlerArgSep, ffn.RequestID, ffn.FileSuffix)
|
||||
}
|
||||
|
||||
@@ -820,3 +820,52 @@ func TestGZIPGUnZIP(t *testing.T) {
|
||||
t.Error("not matching initial source")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFFNNewFallbackFileNameFronString(t *testing.T) {
|
||||
fileName := "cdr|*http_json_cdr|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid_json|1acce2c9-3f2d-4774-8662-c28872dad515.json"
|
||||
eFFN := &FallbackFileName{Module: "cdr",
|
||||
Transport: MetaHTTPjsonCDR,
|
||||
Address: "http://127.0.0.1:12080/invalid_json",
|
||||
RequestID: "1acce2c9-3f2d-4774-8662-c28872dad515",
|
||||
FileSuffix: JSNSuffix}
|
||||
if ffn, err := NewFallbackFileNameFronString(fileName); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(eFFN, ffn) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn)
|
||||
}
|
||||
fileName = "cdr|*http_post|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid|70c53d6d-dbd7-452e-a5bd-36bab59bb9ff.form"
|
||||
eFFN = &FallbackFileName{Module: "cdr",
|
||||
Transport: META_HTTP_POST,
|
||||
Address: "http://127.0.0.1:12080/invalid",
|
||||
RequestID: "70c53d6d-dbd7-452e-a5bd-36bab59bb9ff",
|
||||
FileSuffix: FormSuffix}
|
||||
if ffn, err := NewFallbackFileNameFronString(fileName); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(eFFN, ffn) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn)
|
||||
}
|
||||
fileName = "act>*call_url|*http_json|http%3A%2F%2Flocalhost%3A2080%2Flog_warning|f52cf23e-da2f-4675-b36b-e8fcc3869270.json"
|
||||
eFFN = &FallbackFileName{Module: "act>*call_url",
|
||||
Transport: MetaHTTPjson,
|
||||
Address: "http://localhost:2080/log_warning",
|
||||
RequestID: "f52cf23e-da2f-4675-b36b-e8fcc3869270",
|
||||
FileSuffix: JSNSuffix}
|
||||
if ffn, err := NewFallbackFileNameFronString(fileName); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(eFFN, ffn) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFFNFallbackFileNameAsString(t *testing.T) {
|
||||
eFn := "cdr|*http_json_cdr|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid_json|1acce2c9-3f2d-4774-8662-c28872dad515.json"
|
||||
ffn := &FallbackFileName{
|
||||
Module: "cdr",
|
||||
Transport: MetaHTTPjsonCDR,
|
||||
Address: "http://127.0.0.1:12080/invalid_json",
|
||||
RequestID: "1acce2c9-3f2d-4774-8662-c28872dad515",
|
||||
FileSuffix: JSNSuffix}
|
||||
if ffnStr := ffn.AsString(); ffnStr != eFn {
|
||||
t.Errorf("Expecting: <%q>, received: <%q>", eFn, ffnStr)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,72 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package utils
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestFFNNewFallbackFileNameFronString(t *testing.T) {
|
||||
fileName := "cdr|*http_json_cdr|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid_json|1acce2c9-3f2d-4774-8662-c28872dad515.json"
|
||||
eFFN := &FallbackFileName{Module: "cdr",
|
||||
Transport: MetaHTTPjsonCDR,
|
||||
Address: "http://127.0.0.1:12080/invalid_json",
|
||||
RequestID: "1acce2c9-3f2d-4774-8662-c28872dad515",
|
||||
FileSuffix: JSNSuffix}
|
||||
if ffn, err := NewFallbackFileNameFronString(fileName); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(eFFN, ffn) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn)
|
||||
}
|
||||
fileName = "cdr|*http_post|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid|70c53d6d-dbd7-452e-a5bd-36bab59bb9ff.form"
|
||||
eFFN = &FallbackFileName{Module: "cdr",
|
||||
Transport: META_HTTP_POST,
|
||||
Address: "http://127.0.0.1:12080/invalid",
|
||||
RequestID: "70c53d6d-dbd7-452e-a5bd-36bab59bb9ff",
|
||||
FileSuffix: FormSuffix}
|
||||
if ffn, err := NewFallbackFileNameFronString(fileName); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(eFFN, ffn) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn)
|
||||
}
|
||||
fileName = "act>*call_url|*http_json|http%3A%2F%2Flocalhost%3A2080%2Flog_warning|f52cf23e-da2f-4675-b36b-e8fcc3869270.json"
|
||||
eFFN = &FallbackFileName{Module: "act>*call_url",
|
||||
Transport: MetaHTTPjson,
|
||||
Address: "http://localhost:2080/log_warning",
|
||||
RequestID: "f52cf23e-da2f-4675-b36b-e8fcc3869270",
|
||||
FileSuffix: JSNSuffix}
|
||||
if ffn, err := NewFallbackFileNameFronString(fileName); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(eFFN, ffn) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", eFFN, ffn)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFFNFallbackFileNameAsString(t *testing.T) {
|
||||
eFn := "cdr|*http_json_cdr|http%3A%2F%2F127.0.0.1%3A12080%2Finvalid_json|1acce2c9-3f2d-4774-8662-c28872dad515.json"
|
||||
ffn := &FallbackFileName{
|
||||
Module: "cdr",
|
||||
Transport: MetaHTTPjsonCDR,
|
||||
Address: "http://127.0.0.1:12080/invalid_json",
|
||||
RequestID: "1acce2c9-3f2d-4774-8662-c28872dad515",
|
||||
FileSuffix: JSNSuffix}
|
||||
if ffnStr := ffn.AsString(); ffnStr != eFn {
|
||||
t.Errorf("Expecting: <%q>, received: <%q>", eFn, ffnStr)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user