Added AWS AMQP support

This commit is contained in:
Trial97
2019-01-24 10:43:27 +02:00
committed by Dan Christian Bogos
parent 85868c16e4
commit 86570e199a
5 changed files with 55 additions and 8 deletions

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package v1
import (
"context"
"errors"
"fmt"
"io/ioutil"
@@ -35,6 +36,7 @@ import (
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
"github.com/streadway/amqp"
amqpv1 "pack.ag/amqp"
)
const (
@@ -1534,6 +1536,18 @@ func (v1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) (
chn.Close()
}
}
case utils.MetaAWSjsonMap:
var awsPoster *engine.AWSPoster
awsPoster, err = engine.AMQPPostersCache.GetAWSPoster(ffn.Address,
v1.Config.GeneralCfg().PosterAttempts, failedReqsOutDir)
if err != nil {
break
}
var ses *amqpv1.Session
ses, err = awsPoster.Post(fileContent, file.Name())
if ses != nil {
ses.Close(context.Background())
}
default:
err = fmt.Errorf("unsupported replication transport: %s", ffn.Transport)
}

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"context"
"encoding/csv"
"encoding/json"
"fmt"
@@ -34,6 +35,7 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/streadway/amqp"
amqpv1 "pack.ag/amqp"
)
const (
@@ -247,7 +249,7 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) {
return err
}
body = jsn
case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap:
case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap, utils.MetaAWSjsonMap:
expMp, err := cdr.AsExportMap(cdre.exportTemplate.ContentFields, cdre.httpSkipTlsCheck, nil, cdre.roundingDecimals, cdre.filterS)
if err != nil {
return err
@@ -294,6 +296,17 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) {
chn.Close()
}
}
case utils.MetaAWSjsonMap:
var awsPoster *AWSPoster
awsPoster, err = AMQPPostersCache.GetAWSPoster(cdre.exportPath, cdre.attempts, cdre.fallbackPath)
if err != nil {
break
}
var ses *amqpv1.Session
ses, err = awsPoster.Post(body.([]byte), fallbackFileName)
if ses != nil {
ses.Close(context.Background())
}
}
return
}

View File

@@ -50,7 +50,10 @@ const (
)
func init() {
AMQPPostersCache = &AMQPCachedPosters{cache: make(map[string]*AMQPPoster)} // Initialize the cache for amqpPosters
AMQPPostersCache = &AMQPCachedPosters{
cache: make(map[string]*AMQPPoster),
cache2: make(map[string]*AWSPoster),
} // Initialize the cache for amqpPosters
}
var AMQPPostersCache *AMQPCachedPosters
@@ -151,7 +154,8 @@ func (poster *HTTPPoster) Post(addr string, contentType string, content interfac
// AMQPPosterCache is used to cache mutliple AMQPPoster connections based on the address
type AMQPCachedPosters struct {
sync.Mutex
cache map[string]*AMQPPoster
cache map[string]*AMQPPoster
cache2 map[string]*AWSPoster
}
// GetAMQPPoster creates a new poster only if not already cached
@@ -170,6 +174,20 @@ func (pc *AMQPCachedPosters) GetAMQPPoster(dialURL string, attempts int,
return pc.cache[dialURL], nil
}
func (pc *AMQPCachedPosters) GetAWSPoster(dialURL string, attempts int,
fallbackFileDir string) (amqpPoster *AWSPoster, err error) {
pc.Lock()
defer pc.Unlock()
if _, hasIt := pc.cache2[dialURL]; !hasIt {
if pstr, err := NewAWSPoster(dialURL, attempts, fallbackFileDir); err != nil {
return nil, err
} else {
pc.cache2[dialURL] = pstr
}
}
return pc.cache2[dialURL], nil
}
// "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs"
func NewAMQPPoster(dialURL string, attempts int, fallbackFileDir string) (*AMQPPoster, error) {
amqp := &AMQPPoster{
@@ -425,13 +443,12 @@ func (pstr *AWSPoster) Post(content []byte, fallbackFileName string) (s *amqpv1.
time.Sleep(time.Duration(fib()) * time.Second)
continue
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx := context.Background()
// Send message
err = sender.Send(ctx, amqpv1.NewMessage([]byte(content)))
err = sender.Send(ctx, amqpv1.NewMessage(content))
if err == nil {
sender.Close(ctx)
cancel()
break
}
time.Sleep(time.Duration(fib()) * time.Second)

View File

@@ -20,7 +20,7 @@ package utils
var (
CDRExportFormats = []string{DRYRUN, MetaFileCSV, MetaFileFWV, MetaHTTPjsonCDR, MetaHTTPjsonMap,
MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap}
MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap, MetaAWSjsonMap}
MainCDRFields = []string{CGRID, Source, OriginHost, OriginID, ToR, RequestType, Tenant, Category,
Account, Subject, Destination, SetupTime, AnswerTime, Usage, COST, RATED, Partial, RunID,
PreRated, CostSource}
@@ -34,12 +34,14 @@ var (
META_HTTP_POST: CONTENT_FORM,
MetaAMQPjsonCDR: CONTENT_JSON,
MetaAMQPjsonMap: CONTENT_JSON,
MetaAWSjsonMap: CONTENT_JSON,
}
CDREFileSuffixes = map[string]string{
MetaHTTPjsonCDR: JSNSuffix,
MetaHTTPjsonMap: JSNSuffix,
MetaAMQPjsonCDR: JSNSuffix,
MetaAMQPjsonMap: JSNSuffix,
MetaAWSjsonMap: JSNSuffix,
META_HTTP_POST: FormSuffix,
MetaFileCSV: CSVSuffix,
MetaFileFWV: FWVSuffix,
@@ -273,6 +275,7 @@ const (
MetaHTTPjsonMap = "*http_json_map"
MetaAMQPjsonCDR = "*amqp_json_cdr"
MetaAMQPjsonMap = "*amqp_json_map"
MetaAWSjsonMap = "*aws_json_map"
NANO_MULTIPLIER = 1000000000
CGR_AUTHORIZE = "CGR_AUTHORIZE"
CONFIG_DIR = "/etc/cgrates/"

View File

@@ -854,7 +854,7 @@ func NewFallbackFileNameFronString(fileName string) (ffn *FallbackFileName, err
return nil, fmt.Errorf("unsupported module: %s", ffn.Module)
}
fileNameWithoutModule := fileName[moduleIdx+1:]
for _, trspt := range []string{MetaHTTPjsonCDR, MetaHTTPjsonMap, MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap} {
for _, trspt := range []string{MetaHTTPjsonCDR, MetaHTTPjsonMap, MetaHTTPjson, META_HTTP_POST, MetaAMQPjsonCDR, MetaAMQPjsonMap, MetaAWSjsonMap} {
if strings.HasPrefix(fileNameWithoutModule, trspt) {
ffn.Transport = trspt
break