mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-24 00:28:44 +05:00
Added S3 as cdre
This commit is contained in:
committed by
Dan Christian Bogos
parent
8d6b518869
commit
fcab2792ba
@@ -130,6 +130,7 @@ func getActionFunc(typ string) (actionTypeFunc, bool) {
|
||||
utils.MetaAMQPV1jsonMap: sendAWS,
|
||||
utils.MetaSQSjsonMap: sendSQS,
|
||||
utils.MetaKafkajsonMap: sendKafka,
|
||||
utils.MetaS3jsonMap: sendS3,
|
||||
MetaRemoveSessionCosts: removeSessionCosts,
|
||||
MetaRemoveExpired: removeExpired,
|
||||
}
|
||||
@@ -467,6 +468,24 @@ func sendKafka(ub *Account, a *Action, acs Actions, extraData interface{}) error
|
||||
body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName)
|
||||
}
|
||||
|
||||
func sendS3(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
body, err := getOneData(ub, extraData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg := config.CgrConfig()
|
||||
fallbackFileName := (&utils.FallbackFileName{
|
||||
Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType),
|
||||
Transport: utils.MetaS3jsonMap,
|
||||
Address: a.ExtraParameters,
|
||||
RequestID: utils.GenUUID(),
|
||||
FileSuffix: utils.JSNSuffix,
|
||||
}).AsString()
|
||||
|
||||
return PostersCache.PostS3(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts,
|
||||
body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName)
|
||||
}
|
||||
|
||||
func callUrl(ub *Account, a *Action, acs Actions, extraData interface{}) error {
|
||||
jsn, err := getOneData(ub, extraData)
|
||||
if err != nil {
|
||||
|
||||
@@ -244,7 +244,7 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) {
|
||||
return err
|
||||
}
|
||||
body = jsn
|
||||
case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap, utils.MetaKafkajsonMap:
|
||||
case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap, utils.MetaKafkajsonMap, utils.MetaS3jsonMap:
|
||||
expMp, err := cdr.AsExportMap(cdre.exportTemplate.ContentFields, cdre.httpSkipTlsCheck, nil, cdre.filterS)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -289,6 +289,8 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) {
|
||||
err = PostersCache.PostSQS(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName)
|
||||
case utils.MetaKafkajsonMap:
|
||||
err = PostersCache.PostKafka(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName)
|
||||
case utils.MetaS3jsonMap:
|
||||
err = PostersCache.PostS3(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -52,6 +52,7 @@ func init() {
|
||||
amqpv1Cache: make(map[string]Poster),
|
||||
sqsCache: make(map[string]Poster),
|
||||
kafkaCache: make(map[string]Poster),
|
||||
s3Cache: make(map[string]Poster),
|
||||
} // Initialize the cache for amqpPosters
|
||||
}
|
||||
|
||||
@@ -63,6 +64,7 @@ type PosterCache struct {
|
||||
amqpv1Cache map[string]Poster
|
||||
sqsCache map[string]Poster
|
||||
kafkaCache map[string]Poster
|
||||
s3Cache map[string]Poster
|
||||
}
|
||||
|
||||
type Poster interface {
|
||||
@@ -157,7 +159,7 @@ func (pc *PosterCache) GetSQSPoster(dialURL string, attempts int, fallbackFileDi
|
||||
func (pc *PosterCache) GetKafkaPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) {
|
||||
pc.Lock()
|
||||
defer pc.Unlock()
|
||||
if _, hasIt := pc.sqsCache[dialURL]; !hasIt {
|
||||
if _, hasIt := pc.kafkaCache[dialURL]; !hasIt {
|
||||
if pstr, err := NewKafkaPoster(dialURL, attempts, fallbackFileDir); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
@@ -167,6 +169,19 @@ func (pc *PosterCache) GetKafkaPoster(dialURL string, attempts int, fallbackFile
|
||||
return pc.kafkaCache[dialURL], nil
|
||||
}
|
||||
|
||||
func (pc *PosterCache) GetS3Poster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) {
|
||||
pc.Lock()
|
||||
defer pc.Unlock()
|
||||
if _, hasIt := pc.s3Cache[dialURL]; !hasIt {
|
||||
if pstr, err := NewS3Poster(dialURL, attempts, fallbackFileDir); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
pc.s3Cache[dialURL] = pstr
|
||||
}
|
||||
}
|
||||
return pc.s3Cache[dialURL], nil
|
||||
}
|
||||
|
||||
func (pc *PosterCache) PostAMQP(dialURL string, attempts int,
|
||||
content []byte, contentType, fallbackFileDir, fallbackFileName string) error {
|
||||
amqpPoster, err := pc.GetAMQPPoster(dialURL, attempts, fallbackFileDir)
|
||||
@@ -202,3 +217,12 @@ func (pc *PosterCache) PostKafka(dialURL string, attempts int,
|
||||
}
|
||||
return kafkaPoster.Post(content, fallbackFileName)
|
||||
}
|
||||
|
||||
func (pc *PosterCache) PostS3(dialURL string, attempts int,
|
||||
content []byte, fallbackFileDir, fallbackFileName string) error {
|
||||
sqsPoster, err := pc.GetS3Poster(dialURL, attempts, fallbackFileDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return sqsPoster.Post(content, fallbackFileName)
|
||||
}
|
||||
|
||||
168
engine/pstr_s3.go
Normal file
168
engine/pstr_s3.go
Normal file
@@ -0,0 +1,168 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package engine
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func NewS3Poster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) {
|
||||
pstr := &S3Poster{
|
||||
attempts: attempts,
|
||||
fallbackFileDir: fallbackFileDir,
|
||||
}
|
||||
err := pstr.parseURL(dialURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return pstr, nil
|
||||
}
|
||||
|
||||
type S3Poster struct {
|
||||
sync.Mutex
|
||||
dialURL string
|
||||
awsRegion string
|
||||
awsID string
|
||||
awsKey string
|
||||
awsToken string
|
||||
attempts int
|
||||
fallbackFileDir string
|
||||
queueURL *string
|
||||
queueID string
|
||||
// getQueueOnce sync.Once
|
||||
session *session.Session
|
||||
}
|
||||
|
||||
func (pstr *S3Poster) Close() {}
|
||||
|
||||
func (pstr *S3Poster) parseURL(dialURL string) (err error) {
|
||||
u, err := url.Parse(dialURL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
qry := u.Query()
|
||||
|
||||
pstr.dialURL = strings.Split(dialURL, "?")[0]
|
||||
pstr.dialURL = strings.TrimSuffix(pstr.dialURL, "/") // used to remove / to point to correct endpoint
|
||||
pstr.queueID = defaultQueueID
|
||||
if vals, has := qry[queueID]; has && len(vals) != 0 {
|
||||
pstr.queueID = vals[0]
|
||||
}
|
||||
if vals, has := qry[awsRegion]; has && len(vals) != 0 {
|
||||
pstr.awsRegion = vals[0]
|
||||
} else {
|
||||
utils.Logger.Warning("<S3Poster> No region present for AWS.")
|
||||
}
|
||||
if vals, has := qry[awsID]; has && len(vals) != 0 {
|
||||
pstr.awsID = vals[0]
|
||||
} else {
|
||||
utils.Logger.Warning("<S3Poster> No access key ID present for AWS.")
|
||||
}
|
||||
if vals, has := qry[awsSecret]; has && len(vals) != 0 {
|
||||
pstr.awsKey = vals[0]
|
||||
} else {
|
||||
utils.Logger.Warning("<S3Poster> No secret access key present for AWS.")
|
||||
}
|
||||
if vals, has := qry[awsToken]; has && len(vals) != 0 {
|
||||
pstr.awsToken = vals[0]
|
||||
} else {
|
||||
utils.Logger.Warning("<S3Poster> No session token present for AWS.")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pstr *S3Poster) Post(message []byte, fallbackFileName string) (err error) {
|
||||
var svc *s3manager.Uploader
|
||||
fib := utils.Fib()
|
||||
|
||||
for i := 0; i < pstr.attempts; i++ {
|
||||
if svc, err = pstr.newPosterSession(); err == nil {
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
if err != nil {
|
||||
if fallbackFileName != utils.META_NONE {
|
||||
utils.Logger.Warning(fmt.Sprintf("<S3Poster> creating new session, err: %s", err.Error()))
|
||||
err = writeToFile(pstr.fallbackFileDir, fallbackFileName, message)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
for i := 0; i < pstr.attempts; i++ {
|
||||
if _, err = svc.Upload(&s3manager.UploadInput{
|
||||
Bucket: aws.String(pstr.queueID),
|
||||
|
||||
// Can also use the `filepath` standard library package to modify the
|
||||
// filename as need for an S3 object key. Such as turning absolute path
|
||||
// to a relative path.
|
||||
Key: aws.String(fallbackFileName),
|
||||
|
||||
// The file to be uploaded. io.ReadSeeker is preferred as the Uploader
|
||||
// will be able to optimize memory when uploading large content. io.Reader
|
||||
// is supported, but will require buffering of the reader's bytes for
|
||||
// each part.
|
||||
Body: bytes.NewReader(message),
|
||||
}); err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil && fallbackFileName != utils.META_NONE {
|
||||
utils.Logger.Warning(fmt.Sprintf("<S3Poster> posting new message, err: %s", err.Error()))
|
||||
err = writeToFile(pstr.fallbackFileDir, fallbackFileName, message)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (pstr *S3Poster) newPosterSession() (s *s3manager.Uploader, err error) {
|
||||
pstr.Lock()
|
||||
defer pstr.Unlock()
|
||||
if pstr.session == nil {
|
||||
var ses *session.Session
|
||||
cfg := aws.Config{Endpoint: aws.String(pstr.dialURL)}
|
||||
if len(pstr.awsRegion) != 0 {
|
||||
cfg.Region = aws.String(pstr.awsRegion)
|
||||
}
|
||||
if len(pstr.awsID) != 0 &&
|
||||
len(pstr.awsKey) != 0 {
|
||||
cfg.Credentials = credentials.NewStaticCredentials(pstr.awsID, pstr.awsKey, pstr.awsToken)
|
||||
}
|
||||
ses, err = session.NewSessionWithOptions(
|
||||
session.Options{
|
||||
Config: cfg,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pstr.session = ses
|
||||
}
|
||||
return s3manager.NewUploader(pstr.session), nil
|
||||
}
|
||||
@@ -76,22 +76,22 @@ func (pstr *SQSPoster) parseURL(dialURL string) (err error) {
|
||||
pstr.queueID = vals[0]
|
||||
}
|
||||
if vals, has := qry[awsRegion]; has && len(vals) != 0 {
|
||||
pstr.awsRegion = url.QueryEscape(vals[0])
|
||||
pstr.awsRegion = vals[0]
|
||||
} else {
|
||||
utils.Logger.Warning("<SQSPoster> No region present for AWS.")
|
||||
}
|
||||
if vals, has := qry[awsID]; has && len(vals) != 0 {
|
||||
pstr.awsID = url.QueryEscape(vals[0])
|
||||
pstr.awsID = vals[0]
|
||||
} else {
|
||||
utils.Logger.Warning("<SQSPoster> No access key ID present for AWS.")
|
||||
}
|
||||
if vals, has := qry[awsSecret]; has && len(vals) != 0 {
|
||||
pstr.awsKey = url.QueryEscape(vals[0])
|
||||
pstr.awsKey = vals[0]
|
||||
} else {
|
||||
utils.Logger.Warning("<SQSPoster> No secret access key present for AWS.")
|
||||
}
|
||||
if vals, has := qry[awsToken]; has && len(vals) != 0 {
|
||||
pstr.awsToken = url.QueryEscape(vals[0])
|
||||
pstr.awsToken = vals[0]
|
||||
} else {
|
||||
utils.Logger.Warning("<SQSPoster> No session token present for AWS.")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user