diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 135ed765f..40c69600c 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -19,6 +19,7 @@ along with this program. If not, see package v1 import ( + "context" "errors" "fmt" "io/ioutil" @@ -35,6 +36,7 @@ import ( "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" "github.com/streadway/amqp" + amqpv1 "pack.ag/amqp" ) const ( @@ -1534,6 +1536,18 @@ func (v1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) ( chn.Close() } } + case utils.MetaAWSjsonMap: + var awsPoster *engine.AWSPoster + awsPoster, err = engine.AMQPPostersCache.GetAWSPoster(ffn.Address, + v1.Config.GeneralCfg().PosterAttempts, failedReqsOutDir) + if err != nil { + break + } + var ses *amqpv1.Session + ses, err = awsPoster.Post(fileContent, file.Name()) + if ses != nil { + ses.Close(context.Background()) + } default: err = fmt.Errorf("unsupported replication transport: %s", ffn.Transport) } diff --git a/engine/cdre.go b/engine/cdre.go index 482046028..8677c989d 100644 --- a/engine/cdre.go +++ b/engine/cdre.go @@ -19,6 +19,7 @@ along with this program. If not, see package engine import ( + "context" "encoding/csv" "encoding/json" "fmt" @@ -34,6 +35,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" "github.com/streadway/amqp" + amqpv1 "pack.ag/amqp" ) const ( @@ -247,7 +249,7 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) { return err } body = jsn - case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap: + case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap, utils.MetaAWSjsonMap: expMp, err := cdr.AsExportMap(cdre.exportTemplate.ContentFields, cdre.httpSkipTlsCheck, nil, cdre.roundingDecimals, cdre.filterS) if err != nil { return err @@ -294,6 +296,17 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) { chn.Close() } } + case utils.MetaAWSjsonMap: + var awsPoster *AWSPoster + awsPoster, err = AMQPPostersCache.GetAWSPoster(cdre.exportPath, cdre.attempts, cdre.fallbackPath) + if err != nil { + break + } + var ses *amqpv1.Session + ses, err = awsPoster.Post(body.([]byte), fallbackFileName) + if ses != nil { + ses.Close(context.Background()) + } } return } diff --git a/engine/poster.go b/engine/poster.go index fbe694e52..77224e0d6 100644 --- a/engine/poster.go +++ b/engine/poster.go @@ -50,7 +50,10 @@ const ( ) func init() { - AMQPPostersCache = &AMQPCachedPosters{cache: make(map[string]*AMQPPoster)} // Initialize the cache for amqpPosters + AMQPPostersCache = &AMQPCachedPosters{ + cache: make(map[string]*AMQPPoster), + cache2: make(map[string]*AWSPoster), + } // Initialize the cache for amqpPosters } var AMQPPostersCache *AMQPCachedPosters @@ -151,7 +154,8 @@ func (poster *HTTPPoster) Post(addr string, contentType string, content interfac // AMQPPosterCache is used to cache mutliple AMQPPoster connections based on the address type AMQPCachedPosters struct { sync.Mutex - cache map[string]*AMQPPoster + cache map[string]*AMQPPoster + cache2 map[string]*AWSPoster } // GetAMQPPoster creates a new poster only if not already cached @@ -170,6 +174,20 @@ func (pc *AMQPCachedPosters) GetAMQPPoster(dialURL string, attempts int, return pc.cache[dialURL], nil } +func (pc *AMQPCachedPosters) GetAWSPoster(dialURL string, attempts int, + fallbackFileDir string) (amqpPoster *AWSPoster, err error) { + pc.Lock() + defer pc.Unlock() + if _, hasIt := pc.cache2[dialURL]; !hasIt { + if pstr, err := NewAWSPoster(dialURL, attempts, fallbackFileDir); err != nil { + return nil, err + } else { + pc.cache2[dialURL] = pstr + } + } + return pc.cache2[dialURL], nil +} + // "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs" func NewAMQPPoster(dialURL string, attempts int, fallbackFileDir string) (*AMQPPoster, error) { amqp := &AMQPPoster{ @@ -425,13 +443,12 @@ func (pstr *AWSPoster) Post(content []byte, fallbackFileName string) (s *amqpv1. time.Sleep(time.Duration(fib()) * time.Second) continue } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx := context.Background() // Send message - err = sender.Send(ctx, amqpv1.NewMessage([]byte(content))) + err = sender.Send(ctx, amqpv1.NewMessage(content)) if err == nil { sender.Close(ctx) - cancel() break } time.Sleep(time.Duration(fib()) * time.Second) diff --git a/utils/consts.go b/utils/consts.go index 32296788a..534c6a00f 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -20,7 +20,7 @@ package utils var ( CDRExportFormats = []string{DRYRUN, MetaFileCSV, MetaFileFWV, MetaHTTPjsonCDR, MetaHTTPjsonMap, - MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap} + MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap, MetaAWSjsonMap} MainCDRFields = []string{CGRID, Source, OriginHost, OriginID, ToR, RequestType, Tenant, Category, Account, Subject, Destination, SetupTime, AnswerTime, Usage, COST, RATED, Partial, RunID, PreRated, CostSource} @@ -34,12 +34,14 @@ var ( META_HTTP_POST: CONTENT_FORM, MetaAMQPjsonCDR: CONTENT_JSON, MetaAMQPjsonMap: CONTENT_JSON, + MetaAWSjsonMap: CONTENT_JSON, } CDREFileSuffixes = map[string]string{ MetaHTTPjsonCDR: JSNSuffix, MetaHTTPjsonMap: JSNSuffix, MetaAMQPjsonCDR: JSNSuffix, MetaAMQPjsonMap: JSNSuffix, + MetaAWSjsonMap: JSNSuffix, META_HTTP_POST: FormSuffix, MetaFileCSV: CSVSuffix, MetaFileFWV: FWVSuffix, @@ -273,6 +275,7 @@ const ( MetaHTTPjsonMap = "*http_json_map" MetaAMQPjsonCDR = "*amqp_json_cdr" MetaAMQPjsonMap = "*amqp_json_map" + MetaAWSjsonMap = "*aws_json_map" NANO_MULTIPLIER = 1000000000 CGR_AUTHORIZE = "CGR_AUTHORIZE" CONFIG_DIR = "/etc/cgrates/" diff --git a/utils/coreutils.go b/utils/coreutils.go index e752206ca..79e1581f9 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -854,7 +854,7 @@ func NewFallbackFileNameFronString(fileName string) (ffn *FallbackFileName, err return nil, fmt.Errorf("unsupported module: %s", ffn.Module) } fileNameWithoutModule := fileName[moduleIdx+1:] - for _, trspt := range []string{MetaHTTPjsonCDR, MetaHTTPjsonMap, MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap} { + for _, trspt := range []string{MetaHTTPjsonCDR, MetaHTTPjsonMap, MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap, MetaAWSjsonMap} { if strings.HasPrefix(fileNameWithoutModule, trspt) { ffn.Transport = trspt break