diff --git a/apier/v1/apier.go b/apier/v1/apier.go index f27cc38dd..6fc0eb9a3 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -1069,6 +1069,9 @@ func (v1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) ( case utils.MetaKafkajsonMap: err = engine.PostersCache.PostKafka(ffn.Address, v1.Config.GeneralCfg().PosterAttempts, fileContent, failedReqsOutDir, file.Name()) + case utils.MetaS3jsonMap: + err = engine.PostersCache.PostS3(ffn.Address, v1.Config.GeneralCfg().PosterAttempts, + fileContent, failedReqsOutDir, file.Name()) default: err = fmt.Errorf("unsupported replication transport: %s", ffn.Transport) } diff --git a/data/conf/samples/cdrsonexpmaster/cdrsreplicationmaster.json b/data/conf/samples/cdrsonexpmaster/cdrsreplicationmaster.json index d440e4bea..f3d48fc2f 100644 --- a/data/conf/samples/cdrsonexpmaster/cdrsreplicationmaster.json +++ b/data/conf/samples/cdrsonexpmaster/cdrsreplicationmaster.json @@ -22,7 +22,7 @@ "cdrs": { "enabled": true, // start the CDR Server service: "store_cdrs": false, // store cdrs in storDb - "online_cdr_exports": ["http_localhost", "amqp_localhost", "http_test_file", "amqp_test_file","aws_test_file","sqs_test_file","kafka_localhost"], + "online_cdr_exports": ["http_localhost", "amqp_localhost", "http_test_file", "amqp_test_file","aws_test_file","sqs_test_file","kafka_localhost","s3_test_file"], }, "chargers": { @@ -112,6 +112,14 @@ {"tag": "CGRID", "type": "*composed", "value": "~CGRID", "field_id": "CGRID"}, ], }, + "s3_test_file": { + "export_format": "*s3_json_map", + // export_path for s3: "endpoint?aws_region=region&aws_key=IDkey&aws_secret=secret&aws_token=sessionToken&queue_id=cgrates-cdrs" + "export_path": "http://s3.us-east-2.amazonaws.com/?aws_region=eu-west-2&aws_key=testkey&aws_secret=testsecret&queue_id=cgrates-cdrs", + "content_fields": [ + {"tag": "CGRID", "type": "*composed", "value": "~CGRID", "field_id": "CGRID"}, + ], + }, }, } \ No newline at end of file diff --git a/engine/action.go b/engine/action.go index 4d1a9ef83..6827c6509 100644 --- a/engine/action.go +++ b/engine/action.go @@ -130,6 +130,7 @@ func getActionFunc(typ string) (actionTypeFunc, bool) { utils.MetaAMQPV1jsonMap: sendAWS, utils.MetaSQSjsonMap: sendSQS, utils.MetaKafkajsonMap: sendKafka, + utils.MetaS3jsonMap: sendS3, MetaRemoveSessionCosts: removeSessionCosts, MetaRemoveExpired: removeExpired, } @@ -467,6 +468,24 @@ func sendKafka(ub *Account, a *Action, acs Actions, extraData interface{}) error body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName) } +func sendS3(ub *Account, a *Action, acs Actions, extraData interface{}) error { + body, err := getOneData(ub, extraData) + if err != nil { + return err + } + cfg := config.CgrConfig() + fallbackFileName := (&utils.FallbackFileName{ + Module: fmt.Sprintf("%s>%s", utils.ActionsPoster, a.ActionType), + Transport: utils.MetaS3jsonMap, + Address: a.ExtraParameters, + RequestID: utils.GenUUID(), + FileSuffix: utils.JSNSuffix, + }).AsString() + + return PostersCache.PostS3(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, + body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName) +} + func callUrl(ub *Account, a *Action, acs Actions, extraData interface{}) error { jsn, err := getOneData(ub, extraData) if err != nil { diff --git a/engine/cdre.go b/engine/cdre.go index 494e3d351..d36e6dbc7 100644 --- a/engine/cdre.go +++ b/engine/cdre.go @@ -244,7 +244,7 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) { return err } body = jsn - case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap, utils.MetaKafkajsonMap: + case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap, utils.MetaKafkajsonMap, utils.MetaS3jsonMap: expMp, err := cdr.AsExportMap(cdre.exportTemplate.ContentFields, cdre.httpSkipTlsCheck, nil, cdre.filterS) if err != nil { return err @@ -289,6 +289,8 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) { err = PostersCache.PostSQS(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName) case utils.MetaKafkajsonMap: err = PostersCache.PostKafka(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName) + case utils.MetaS3jsonMap: + err = PostersCache.PostS3(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName) } return } diff --git a/engine/poster.go b/engine/poster.go index 1b2f608d8..78ef599bf 100644 --- a/engine/poster.go +++ b/engine/poster.go @@ -52,6 +52,7 @@ func init() { amqpv1Cache: make(map[string]Poster), sqsCache: make(map[string]Poster), kafkaCache: make(map[string]Poster), + s3Cache: make(map[string]Poster), } // Initialize the cache for amqpPosters } @@ -63,6 +64,7 @@ type PosterCache struct { amqpv1Cache map[string]Poster sqsCache map[string]Poster kafkaCache map[string]Poster + s3Cache map[string]Poster } type Poster interface { @@ -157,7 +159,7 @@ func (pc *PosterCache) GetSQSPoster(dialURL string, attempts int, fallbackFileDi func (pc *PosterCache) GetKafkaPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) { pc.Lock() defer pc.Unlock() - if _, hasIt := pc.sqsCache[dialURL]; !hasIt { + if _, hasIt := pc.kafkaCache[dialURL]; !hasIt { if pstr, err := NewKafkaPoster(dialURL, attempts, fallbackFileDir); err != nil { return nil, err } else { @@ -167,6 +169,19 @@ func (pc *PosterCache) GetKafkaPoster(dialURL string, attempts int, fallbackFile return pc.kafkaCache[dialURL], nil } +func (pc *PosterCache) GetS3Poster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) { + pc.Lock() + defer pc.Unlock() + if _, hasIt := pc.s3Cache[dialURL]; !hasIt { + if pstr, err := NewS3Poster(dialURL, attempts, fallbackFileDir); err != nil { + return nil, err + } else { + pc.s3Cache[dialURL] = pstr + } + } + return pc.s3Cache[dialURL], nil +} + func (pc *PosterCache) PostAMQP(dialURL string, attempts int, content []byte, contentType, fallbackFileDir, fallbackFileName string) error { amqpPoster, err := pc.GetAMQPPoster(dialURL, attempts, fallbackFileDir) @@ -202,3 +217,12 @@ func (pc *PosterCache) PostKafka(dialURL string, attempts int, } return kafkaPoster.Post(content, fallbackFileName) } + +func (pc *PosterCache) PostS3(dialURL string, attempts int, + content []byte, fallbackFileDir, fallbackFileName string) error { + sqsPoster, err := pc.GetS3Poster(dialURL, attempts, fallbackFileDir) + if err != nil { + return err + } + return sqsPoster.Post(content, fallbackFileName) +} diff --git a/engine/pstr_s3.go b/engine/pstr_s3.go new file mode 100644 index 000000000..bbdec5f1b --- /dev/null +++ b/engine/pstr_s3.go @@ -0,0 +1,168 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "bytes" + "fmt" + "net/url" + "strings" + "sync" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/cgrates/cgrates/utils" +) + +func NewS3Poster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) { + pstr := &S3Poster{ + attempts: attempts, + fallbackFileDir: fallbackFileDir, + } + err := pstr.parseURL(dialURL) + if err != nil { + return nil, err + } + return pstr, nil +} + +type S3Poster struct { + sync.Mutex + dialURL string + awsRegion string + awsID string + awsKey string + awsToken string + attempts int + fallbackFileDir string + queueURL *string + queueID string + // getQueueOnce sync.Once + session *session.Session +} + +func (pstr *S3Poster) Close() {} + +func (pstr *S3Poster) parseURL(dialURL string) (err error) { + u, err := url.Parse(dialURL) + if err != nil { + return err + } + qry := u.Query() + + pstr.dialURL = strings.Split(dialURL, "?")[0] + pstr.dialURL = strings.TrimSuffix(pstr.dialURL, "/") // used to remove / to point to correct endpoint + pstr.queueID = defaultQueueID + if vals, has := qry[queueID]; has && len(vals) != 0 { + pstr.queueID = vals[0] + } + if vals, has := qry[awsRegion]; has && len(vals) != 0 { + pstr.awsRegion = vals[0] + } else { + utils.Logger.Warning(" No region present for AWS.") + } + if vals, has := qry[awsID]; has && len(vals) != 0 { + pstr.awsID = vals[0] + } else { + utils.Logger.Warning(" No access key ID present for AWS.") + } + if vals, has := qry[awsSecret]; has && len(vals) != 0 { + pstr.awsKey = vals[0] + } else { + utils.Logger.Warning(" No secret access key present for AWS.") + } + if vals, has := qry[awsToken]; has && len(vals) != 0 { + pstr.awsToken = vals[0] + } else { + utils.Logger.Warning(" No session token present for AWS.") + } + return nil +} + +func (pstr *S3Poster) Post(message []byte, fallbackFileName string) (err error) { + var svc *s3manager.Uploader + fib := utils.Fib() + + for i := 0; i < pstr.attempts; i++ { + if svc, err = pstr.newPosterSession(); err == nil { + break + } + time.Sleep(time.Duration(fib()) * time.Second) + } + if err != nil { + if fallbackFileName != utils.META_NONE { + utils.Logger.Warning(fmt.Sprintf(" creating new session, err: %s", err.Error())) + err = writeToFile(pstr.fallbackFileDir, fallbackFileName, message) + } + return err + } + + for i := 0; i < pstr.attempts; i++ { + if _, err = svc.Upload(&s3manager.UploadInput{ + Bucket: aws.String(pstr.queueID), + + // Can also use the `filepath` standard library package to modify the + // filename as need for an S3 object key. Such as turning absolute path + // to a relative path. + Key: aws.String(fallbackFileName), + + // The file to be uploaded. io.ReadSeeker is preferred as the Uploader + // will be able to optimize memory when uploading large content. io.Reader + // is supported, but will require buffering of the reader's bytes for + // each part. + Body: bytes.NewReader(message), + }); err == nil { + break + } + } + if err != nil && fallbackFileName != utils.META_NONE { + utils.Logger.Warning(fmt.Sprintf(" posting new message, err: %s", err.Error())) + err = writeToFile(pstr.fallbackFileDir, fallbackFileName, message) + } + return err +} + +func (pstr *S3Poster) newPosterSession() (s *s3manager.Uploader, err error) { + pstr.Lock() + defer pstr.Unlock() + if pstr.session == nil { + var ses *session.Session + cfg := aws.Config{Endpoint: aws.String(pstr.dialURL)} + if len(pstr.awsRegion) != 0 { + cfg.Region = aws.String(pstr.awsRegion) + } + if len(pstr.awsID) != 0 && + len(pstr.awsKey) != 0 { + cfg.Credentials = credentials.NewStaticCredentials(pstr.awsID, pstr.awsKey, pstr.awsToken) + } + ses, err = session.NewSessionWithOptions( + session.Options{ + Config: cfg, + }, + ) + if err != nil { + return nil, err + } + pstr.session = ses + } + return s3manager.NewUploader(pstr.session), nil +} diff --git a/engine/pstr_sqs.go b/engine/pstr_sqs.go index 5f682ee4f..c081df477 100644 --- a/engine/pstr_sqs.go +++ b/engine/pstr_sqs.go @@ -76,22 +76,22 @@ func (pstr *SQSPoster) parseURL(dialURL string) (err error) { pstr.queueID = vals[0] } if vals, has := qry[awsRegion]; has && len(vals) != 0 { - pstr.awsRegion = url.QueryEscape(vals[0]) + pstr.awsRegion = vals[0] } else { utils.Logger.Warning(" No region present for AWS.") } if vals, has := qry[awsID]; has && len(vals) != 0 { - pstr.awsID = url.QueryEscape(vals[0]) + pstr.awsID = vals[0] } else { utils.Logger.Warning(" No access key ID present for AWS.") } if vals, has := qry[awsSecret]; has && len(vals) != 0 { - pstr.awsKey = url.QueryEscape(vals[0]) + pstr.awsKey = vals[0] } else { utils.Logger.Warning(" No secret access key present for AWS.") } if vals, has := qry[awsToken]; has && len(vals) != 0 { - pstr.awsToken = url.QueryEscape(vals[0]) + pstr.awsToken = vals[0] } else { utils.Logger.Warning(" No session token present for AWS.") } diff --git a/general_tests/cdrs_onlexp_it_test.go b/general_tests/cdrs_onlexp_it_test.go index 3bd8d2a16..2df5bb417 100644 --- a/general_tests/cdrs_onlexp_it_test.go +++ b/general_tests/cdrs_onlexp_it_test.go @@ -496,6 +496,64 @@ func TestCDRsOnExpKafkaPosterFileFailover(t *testing.T) { } } +func TestCDRsOnExpSQSPosterFileFailover(t *testing.T) { + time.Sleep(time.Duration(10 * time.Second)) + failoverContent := [][]byte{[]byte(`{"CGRID":"57548d485d61ebcba55afbe5d939c82a8e9ff670"}`), []byte(`{"CGRID":"88ed9c38005f07576a1e1af293063833b60edcc6"}`)} + filesInDir, _ := ioutil.ReadDir(cdrsMasterCfg.GeneralCfg().FailedPostsDir) + if len(filesInDir) == 0 { + t.Fatalf("No files in directory: %s", cdrsMasterCfg.GeneralCfg().FailedPostsDir) + } + var foundFile bool + var fileName string + for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config + fileName = file.Name() + if strings.HasPrefix(fileName, "cdr|*sqs_json_map") { + foundFile = true + filePath := path.Join(cdrsMasterCfg.GeneralCfg().FailedPostsDir, fileName) + if readBytes, err := ioutil.ReadFile(filePath); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(failoverContent[0], readBytes) && !reflect.DeepEqual(failoverContent[1], readBytes) { // Checking just the prefix should do since some content is dynamic + t.Errorf("Expecting: %v or %v, received: %v", string(failoverContent[0]), string(failoverContent[1]), string(readBytes)) + } + if err := os.Remove(filePath); err != nil { + t.Error("Failed removing file: ", filePath) + } + } + } + if !foundFile { + t.Fatal("Could not find the file in folder") + } +} + +func TestCDRsOnExpS3PosterFileFailover(t *testing.T) { + time.Sleep(time.Duration(10 * time.Second)) + failoverContent := [][]byte{[]byte(`{"CGRID":"57548d485d61ebcba55afbe5d939c82a8e9ff670"}`), []byte(`{"CGRID":"88ed9c38005f07576a1e1af293063833b60edcc6"}`)} + filesInDir, _ := ioutil.ReadDir(cdrsMasterCfg.GeneralCfg().FailedPostsDir) + if len(filesInDir) == 0 { + t.Fatalf("No files in directory: %s", cdrsMasterCfg.GeneralCfg().FailedPostsDir) + } + var foundFile bool + var fileName string + for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config + fileName = file.Name() + if strings.HasPrefix(fileName, "cdr|*s3_json_map") { + foundFile = true + filePath := path.Join(cdrsMasterCfg.GeneralCfg().FailedPostsDir, fileName) + if readBytes, err := ioutil.ReadFile(filePath); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(failoverContent[0], readBytes) && !reflect.DeepEqual(failoverContent[1], readBytes) { // Checking just the prefix should do since some content is dynamic + t.Errorf("Expecting: %v or %v, received: %v", string(failoverContent[0]), string(failoverContent[1]), string(readBytes)) + } + if err := os.Remove(filePath); err != nil { + t.Error("Failed removing file: ", filePath) + } + } + } + if !foundFile { + t.Fatal("Could not find the file in folder") + } +} + /* // Performance test, check `lsof -a -p 8427 | wc -l` diff --git a/glide.lock b/glide.lock index 18fb4a974..bd393a9df 100644 --- a/glide.lock +++ b/glide.lock @@ -122,7 +122,7 @@ imports: - name: github.com/antchfx/xpath version: 3de91f3991a1af6e495d49c9218318b5544b20e3 - name: github.com/aws/aws-sdk-go - version: 38d16b00d959591df03ffcc08fa5d463ffc15e2a + version: 68afe677298a894afc385165f93ec26895a50c9e subpackages: - aws - service diff --git a/utils/consts.go b/utils/consts.go index eb7307d29..b4843b028 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -23,7 +23,7 @@ import "sort" var ( CDRExportFormats = []string{DRYRUN, MetaFileCSV, MetaFileFWV, MetaHTTPjsonCDR, MetaHTTPjsonMap, MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap, MetaAMQPV1jsonMap, MetaSQSjsonMap, - MetaKafkajsonMap} + MetaKafkajsonMap, MetaS3jsonMap} MainCDRFields = []string{CGRID, Source, OriginHost, OriginID, ToR, RequestType, Tenant, Category, Account, Subject, Destination, SetupTime, AnswerTime, Usage, COST, RATED, Partial, RunID, PreRated, CostSource, CostDetails, ExtraInfo, OrderID} @@ -43,6 +43,7 @@ var ( MetaAMQPV1jsonMap: CONTENT_JSON, MetaSQSjsonMap: CONTENT_JSON, MetaKafkajsonMap: CONTENT_JSON, + MetaS3jsonMap: CONTENT_JSON, } CDREFileSuffixes = map[string]string{ MetaHTTPjsonCDR: JSNSuffix, @@ -52,6 +53,7 @@ var ( MetaAMQPV1jsonMap: JSNSuffix, MetaSQSjsonMap: JSNSuffix, MetaKafkajsonMap: JSNSuffix, + MetaS3jsonMap: JSNSuffix, META_HTTP_POST: FormSuffix, MetaFileCSV: CSVSuffix, MetaFileFWV: FWVSuffix, @@ -285,6 +287,7 @@ const ( MetaAMQPV1jsonMap = "*amqpv1_json_map" MetaSQSjsonMap = "*sqs_json_map" MetaKafkajsonMap = "*kafka_json_map" + MetaS3jsonMap = "*s3_json_map" NANO_MULTIPLIER = 1000000000 CGR_AUTHORIZE = "CGR_AUTHORIZE" CONFIG_PATH = "/etc/cgrates/" diff --git a/utils/coreutils.go b/utils/coreutils.go index c80b366df..3db3493d2 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -898,7 +898,7 @@ func NewFallbackFileNameFronString(fileName string) (ffn *FallbackFileName, err return nil, fmt.Errorf("unsupported module: %s", ffn.Module) } fileNameWithoutModule := fileName[moduleIdx+1:] - for _, trspt := range []string{MetaHTTPjsonCDR, MetaHTTPjsonMap, MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap, MetaAMQPV1jsonMap, MetaSQSjsonMap, MetaKafkajsonMap} { + for _, trspt := range []string{MetaHTTPjsonCDR, MetaHTTPjsonMap, MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap, MetaAMQPV1jsonMap, MetaSQSjsonMap, MetaKafkajsonMap, MetaS3jsonMap} { if strings.HasPrefix(fileNameWithoutModule, trspt) { ffn.Transport = trspt break