diff --git a/apier/v1/apier.go b/apier/v1/apier.go index b8be8dbe0..78a26e0b5 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -1061,7 +1061,7 @@ func (v1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) ( utils.PosterTransportContentTypes[ffn.Transport], failedReqsOutDir, file.Name()) case utils.MetaAMQPV1jsonMap: - err = engine.PostersCache.PostAWS(ffn.Address, v1.Config.GeneralCfg().PosterAttempts, + err = engine.PostersCache.PostAMQPv1(ffn.Address, v1.Config.GeneralCfg().PosterAttempts, fileContent, failedReqsOutDir, file.Name()) case utils.MetaSQSjsonMap: err = engine.PostersCache.PostSQS(ffn.Address, v1.Config.GeneralCfg().PosterAttempts, diff --git a/engine/action.go b/engine/action.go index bcc477e45..db275348a 100644 --- a/engine/action.go +++ b/engine/action.go @@ -426,7 +426,7 @@ func sendAWS(ub *Account, a *Action, acs Actions, extraData interface{}) error { FileSuffix: utils.JSNSuffix, }).AsString() - return PostersCache.PostAWS(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, + return PostersCache.PostAMQPv1(a.ExtraParameters, config.CgrConfig().GeneralCfg().PosterAttempts, body, cfg.GeneralCfg().FailedPostsDir, fallbackFileName) } diff --git a/engine/cdre.go b/engine/cdre.go index 621eae9ba..0870ff11a 100644 --- a/engine/cdre.go +++ b/engine/cdre.go @@ -284,7 +284,7 @@ func (cdre *CDRExporter) postCdr(cdr *CDR) (err error) { err = PostersCache.PostAMQP(cdre.exportPath, cdre.attempts, body.([]byte), utils.PosterTransportContentTypes[cdre.exportFormat], cdre.fallbackPath, fallbackFileName) case utils.MetaAMQPV1jsonMap: - err = PostersCache.PostAWS(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName) + err = PostersCache.PostAMQPv1(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName) case utils.MetaSQSjsonMap: err = PostersCache.PostSQS(cdre.exportPath, cdre.attempts, body.([]byte), cdre.fallbackPath, fallbackFileName) } diff --git a/engine/poster.go b/engine/poster.go index 7f934bcfe..1a8033eeb 100644 --- a/engine/poster.go +++ b/engine/poster.go @@ -19,13 +19,6 @@ along with this program. If not, see package engine import ( - "bytes" - "context" - "crypto/tls" - "fmt" - "io/ioutil" - "net" - "net/http" "net/url" "os" "path" @@ -33,19 +26,10 @@ import ( "sync" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/sqs" "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/utils" - "github.com/streadway/amqp" - amqpv1 "github.com/vcabbage/amqp" ) -var AMQPQuery = []string{"cacertfile", "certfile", "keyfile", "verify", "server_name_indication", "auth_mechanism", "heartbeat", "connection_timeout", "channel_max"} - const ( defaultQueueID = "cgrates_cdrs" defaultExchangeType = "direct" @@ -63,105 +47,19 @@ const ( func init() { PostersCache = &PosterCache{ - amqpCache: make(map[string]Poster), - awsCache: make(map[string]Poster), - sqsCache: make(map[string]Poster), + amqpCache: make(map[string]Poster), + amqpv1Cache: make(map[string]Poster), + sqsCache: make(map[string]Poster), } // Initialize the cache for amqpPosters } var PostersCache *PosterCache -// Post without automatic failover -func HttpJsonPost(url string, skipTlsVerify bool, content []byte) ([]byte, error) { - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTlsVerify}, - DisableKeepAlives: true, - } - client := &http.Client{Transport: tr} - resp, err := client.Post(url, "application/json", bytes.NewBuffer(content)) - if err != nil { - return nil, err - } - defer resp.Body.Close() - respBody, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, err - } - if resp.StatusCode > 299 { - return respBody, fmt.Errorf("Unexpected status code received: %d", resp.StatusCode) - } - return respBody, nil -} - -func NewHTTPPoster(skipTLSVerify bool, replyTimeout time.Duration) *HTTPPoster { - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTLSVerify}, - } - return &HTTPPoster{httpClient: &http.Client{Transport: tr, Timeout: replyTimeout}} -} - -type HTTPPoster struct { - httpClient *http.Client -} - -// Post with built-in failover -// Returns also reference towards client so we can close it's connections when done -func (poster *HTTPPoster) Post(addr string, contentType string, content interface{}, attempts int, fallbackFilePath string) (respBody []byte, err error) { - if !utils.IsSliceMember([]string{utils.CONTENT_JSON, utils.CONTENT_FORM, utils.CONTENT_TEXT}, contentType) { - return nil, fmt.Errorf("unsupported ContentType: %s", contentType) - } - var body []byte // Used to write in file and send over http - var urlVals url.Values // Used when posting form - if utils.IsSliceMember([]string{utils.CONTENT_JSON, utils.CONTENT_TEXT}, contentType) { - body = content.([]byte) - } else if contentType == utils.CONTENT_FORM { - urlVals = content.(url.Values) - body = []byte(urlVals.Encode()) - } - fib := utils.Fib() - bodyType := "application/x-www-form-urlencoded" - if contentType == utils.CONTENT_JSON { - bodyType = "application/json" - } - for i := 0; i < attempts; i++ { - var resp *http.Response - if utils.IsSliceMember([]string{utils.CONTENT_JSON, utils.CONTENT_TEXT}, contentType) { - resp, err = poster.httpClient.Post(addr, bodyType, bytes.NewBuffer(body)) - } else if contentType == utils.CONTENT_FORM { - resp, err = poster.httpClient.PostForm(addr, urlVals) - } - if err != nil { - utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error())) - time.Sleep(time.Duration(fib()) * time.Second) - continue - } - defer resp.Body.Close() - respBody, err = ioutil.ReadAll(resp.Body) - if err != nil { - utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error())) - time.Sleep(time.Duration(fib()) * time.Second) - continue - } - if resp.StatusCode > 299 { - utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, unexpected status code received: <%d>", addr, resp.StatusCode)) - time.Sleep(time.Duration(fib()) * time.Second) - continue - } - return respBody, nil - } - if fallbackFilePath != utils.META_NONE { - // If we got that far, post was not possible, write it on disk - _, err = guardian.Guardian.Guard(func() (interface{}, error) { - fileOut, err := os.Create(fallbackFilePath) - if err != nil { - return nil, err - } - _, err = fileOut.Write(body) - fileOut.Close() - return nil, err - }, time.Duration(2*time.Second), utils.FileLockPrefix+fallbackFilePath) - } - return +type PosterCache struct { + sync.Mutex + amqpCache map[string]Poster + amqpv1Cache map[string]Poster + sqsCache map[string]Poster } type Poster interface { @@ -197,18 +95,11 @@ func parseURL(dialURL string) (URL string, qID string, err error) { return } -type PosterCache struct { - sync.Mutex - amqpCache map[string]Poster - awsCache map[string]Poster - sqsCache map[string]Poster -} - func (pc *PosterCache) Close() { for _, v := range pc.amqpCache { v.Close() } - for _, v := range pc.awsCache { + for _, v := range pc.amqpv1Cache { v.Close() } for _, v := range pc.sqsCache { @@ -231,17 +122,17 @@ func (pc *PosterCache) GetAMQPPoster(dialURL string, attempts int, fallbackFileD return pc.amqpCache[dialURL], nil } -func (pc *PosterCache) GetAWSPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) { +func (pc *PosterCache) GetAMQPv1Poster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) { pc.Lock() defer pc.Unlock() - if _, hasIt := pc.awsCache[dialURL]; !hasIt { - if pstr, err := NewAWSPoster(dialURL, attempts, fallbackFileDir); err != nil { + if _, hasIt := pc.amqpv1Cache[dialURL]; !hasIt { + if pstr, err := NewAMQPv1Poster(dialURL, attempts, fallbackFileDir); err != nil { return nil, err } else { - pc.awsCache[dialURL] = pstr + pc.amqpv1Cache[dialURL] = pstr } } - return pc.awsCache[dialURL], nil + return pc.amqpv1Cache[dialURL], nil } func (pc *PosterCache) GetSQSPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) { @@ -266,13 +157,13 @@ func (pc *PosterCache) PostAMQP(dialURL string, attempts int, return amqpPoster.Post(content, fallbackFileName) } -func (pc *PosterCache) PostAWS(dialURL string, attempts int, +func (pc *PosterCache) PostAMQPv1(dialURL string, attempts int, content []byte, fallbackFileDir, fallbackFileName string) error { - awsPoster, err := pc.GetAWSPoster(dialURL, attempts, fallbackFileDir) + AMQPv1Poster, err := pc.GetAMQPv1Poster(dialURL, attempts, fallbackFileDir) if err != nil { return err } - return awsPoster.Post(content, fallbackFileName) + return AMQPv1Poster.Post(content, fallbackFileName) } func (pc *PosterCache) PostSQS(dialURL string, attempts int, @@ -283,469 +174,3 @@ func (pc *PosterCache) PostSQS(dialURL string, attempts int, } return sqsPoster.Post(content, fallbackFileName) } - -// "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs" -func NewAMQPPoster(dialURL string, attempts int, fallbackFileDir string) (*AMQPPoster, error) { - amqp := &AMQPPoster{ - attempts: attempts, - fallbackFileDir: fallbackFileDir, - } - if err := amqp.parseURL(dialURL); err != nil { - return nil, err - } - return amqp, nil -} - -type AMQPPoster struct { - dialURL string - queueID string // identifier of the CDR queue where we publish - exchange string - exchangeType string - routingKey string - attempts int - fallbackFileDir string - sync.Mutex // protect connection - conn *amqp.Connection -} - -func (pstr *AMQPPoster) parseURL(dialURL string) error { - u, err := url.Parse(dialURL) - if err != nil { - return err - } - qry := u.Query() - q := url.Values{} - for _, key := range AMQPQuery { - if vals, has := qry[key]; has && len(vals) != 0 { - q.Add(key, vals[0]) - } - } - pstr.dialURL = strings.Split(dialURL, "?")[0] + "?" + q.Encode() - pstr.queueID = defaultQueueID - pstr.routingKey = defaultQueueID - if vals, has := qry[queueID]; has && len(vals) != 0 { - pstr.queueID = vals[0] - } - if vals, has := qry[routingKey]; has && len(vals) != 0 { - pstr.routingKey = vals[0] - } - if vals, has := qry[exchange]; has && len(vals) != 0 { - pstr.exchange = vals[0] - pstr.exchangeType = defaultExchangeType - } - if vals, has := qry[exchangeType]; has && len(vals) != 0 { - pstr.exchangeType = vals[0] - } - return nil -} - -// Post is the method being called when we need to post anything in the queue -// the optional chn will permits channel caching -func (pstr *AMQPPoster) Post(content []byte, fallbackFileName string) (err error) { - var chn *amqp.Channel - fib := utils.Fib() - - for i := 0; i < pstr.attempts; i++ { - if chn, err = pstr.newPostChannel(); err == nil { - break - } - time.Sleep(time.Duration(fib()) * time.Second) - } - if err != nil { - if fallbackFileName != utils.META_NONE { - utils.Logger.Warning(fmt.Sprintf(" creating new post channel, err: %s", err.Error())) - err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content) - } - return err - } - for i := 0; i < pstr.attempts; i++ { - if err = chn.Publish( - pstr.exchange, // exchange - pstr.routingKey, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - DeliveryMode: amqp.Persistent, - ContentType: utils.CONTENT_JSON, - Body: content, - }); err == nil { - break - } - time.Sleep(time.Duration(fib()) * time.Second) - } - if err != nil && fallbackFileName != utils.META_NONE { - err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content) - return err - } - if chn != nil { - chn.Close() - } - return -} - -func (pstr *AMQPPoster) Close() { - pstr.Lock() - if pstr.conn != nil { - pstr.conn.Close() - } - pstr.conn = nil - pstr.Unlock() -} - -func (pstr *AMQPPoster) newPostChannel() (postChan *amqp.Channel, err error) { - pstr.Lock() - if pstr.conn == nil { - var conn *amqp.Connection - conn, err = amqp.Dial(pstr.dialURL) - if err == nil { - pstr.conn = conn - go func() { // monitor connection errors so we can restart - if err := <-pstr.conn.NotifyClose(make(chan *amqp.Error)); err != nil { - utils.Logger.Err(fmt.Sprintf("Connection error received: %s", err.Error())) - pstr.Close() - } - }() - } - } - pstr.Unlock() - if err != nil { - return nil, err - } - if postChan, err = pstr.conn.Channel(); err != nil { - return - } - - if pstr.exchange != "" { - if err = postChan.ExchangeDeclare( - pstr.exchange, // name - pstr.exchangeType, // type - true, // durable - false, // audo-delete - false, // internal - false, // no-wait - nil, // args - ); err != nil { - return - } - } - - if _, err = postChan.QueueDeclare( - pstr.queueID, // name - true, // durable - false, // auto-delete - false, // exclusive - false, // no-wait - nil, // args - ); err != nil { - return - } - - if pstr.exchange != "" { - if err = postChan.QueueBind( - pstr.queueID, // queue - pstr.routingKey, // key - pstr.exchange, // exchange - false, // no-wait - nil, // args - ); err != nil { - return - } - } - return -} - -func NewAWSPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) { - URL, qID, err := parseURL(dialURL) - if err != nil { - return nil, err - } - return &AWSPoster{ - dialURL: URL, - queueID: "/" + qID, - attempts: attempts, - fallbackFileDir: fallbackFileDir, - }, nil -} - -type AWSPoster struct { - sync.Mutex - dialURL string - queueID string // identifier of the CDR queue where we publish - attempts int - fallbackFileDir string - client *amqpv1.Client -} - -func (pstr *AWSPoster) Close() { - pstr.Lock() - if pstr.client != nil { - pstr.client.Close() - } - pstr.client = nil - pstr.Unlock() -} - -func (pstr *AWSPoster) Post(content []byte, fallbackFileName string) (err error) { - var s *amqpv1.Session - fib := utils.Fib() - - for i := 0; i < pstr.attempts; i++ { - if s, err = pstr.newPosterSession(); err == nil { - break - } - // reset client and try again - // used in case of closed conection because of idle time - if pstr.client != nil { - pstr.client.Close() // Make shure the connection is closed before reseting it - } - pstr.client = nil - time.Sleep(time.Duration(fib()) * time.Second) - } - if err != nil { - if fallbackFileName != utils.META_NONE { - utils.Logger.Warning(fmt.Sprintf(" creating new post channel, err: %s", err.Error())) - err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content) - } - return err - } - - ctx := context.Background() - for i := 0; i < pstr.attempts; i++ { - sender, err := s.NewSender( - amqpv1.LinkTargetAddress(pstr.queueID), - ) - if err != nil { - time.Sleep(time.Duration(fib()) * time.Second) - // if pstr.isRecoverableError(err) { - // s.Close(ctx) - // pstr.client.Close() - // pstr.client = nil - // stmp, err := pstr.newPosterSession() - // if err == nil { - // s = stmp - // } - // } - continue - } - // Send message - err = sender.Send(ctx, amqpv1.NewMessage(content)) - sender.Close(ctx) - if err == nil { - break - } - time.Sleep(time.Duration(fib()) * time.Second) - // if pstr.isRecoverableError(err) { - // s.Close(ctx) - // pstr.client.Close() - // pstr.client = nil - // stmp, err := pstr.newPosterSession() - // if err == nil { - // s = stmp - // } - // } - } - if err != nil && fallbackFileName != utils.META_NONE { - err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content) - return err - } - if s != nil { - s.Close(ctx) - } - return -} - -func (pstr *AWSPoster) newPosterSession() (s *amqpv1.Session, err error) { - pstr.Lock() - defer pstr.Unlock() - if pstr.client == nil { - var client *amqpv1.Client - client, err = amqpv1.Dial(pstr.dialURL) - if err != nil { - return nil, err - } - pstr.client = client - } - return pstr.client.NewSession() -} - -func isRecoverableCloseError(err error) bool { - return err == amqpv1.ErrConnClosed || - err == amqpv1.ErrLinkClosed || - err == amqpv1.ErrSessionClosed -} - -func (pstr *AWSPoster) isRecoverableError(err error) bool { - switch err.(type) { - case *amqpv1.Error, *amqpv1.DetachError, net.Error: - if netErr, ok := err.(net.Error); ok { - if !netErr.Temporary() { - return false - } - } - default: - if !isRecoverableCloseError(err) { - return false - } - } - return true -} - -func NewSQSPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) { - pstr := &SQSPoster{ - attempts: attempts, - fallbackFileDir: fallbackFileDir, - } - err := pstr.parseURL(dialURL) - if err != nil { - return nil, err - } - return pstr, nil -} - -type SQSPoster struct { - sync.Mutex - dialURL string - awsRegion string - awsID string - awsKey string - awsToken string - attempts int - fallbackFileDir string - queueURL *string - queueID string - // getQueueOnce sync.Once - session *session.Session -} - -func (pstr *SQSPoster) Close() {} - -func (pstr *SQSPoster) parseURL(dialURL string) (err error) { - u, err := url.Parse(dialURL) - if err != nil { - return err - } - qry := u.Query() - - pstr.dialURL = strings.Split(dialURL, "?")[0] - pstr.dialURL = strings.TrimSuffix(pstr.dialURL, "/") // used to remove / to point to correct endpoint - pstr.queueID = defaultQueueID - if vals, has := qry[queueID]; has && len(vals) != 0 { - pstr.queueID = vals[0] - } - if vals, has := qry[awsRegion]; has && len(vals) != 0 { - pstr.awsRegion = url.QueryEscape(vals[0]) - } else { - utils.Logger.Warning(" No region present for AWS.") - } - if vals, has := qry[awsID]; has && len(vals) != 0 { - pstr.awsID = url.QueryEscape(vals[0]) - } else { - utils.Logger.Warning(" No access key ID present for AWS.") - } - if vals, has := qry[awsSecret]; has && len(vals) != 0 { - pstr.awsKey = url.QueryEscape(vals[0]) - } else { - utils.Logger.Warning(" No secret access key present for AWS.") - } - if vals, has := qry[awsToken]; has && len(vals) != 0 { - pstr.awsToken = url.QueryEscape(vals[0]) - } else { - utils.Logger.Warning(" No session token present for AWS.") - } - pstr.getQueueURL() - return nil -} - -func (pstr *SQSPoster) getQueueURL() (err error) { - if pstr.queueURL != nil { - return nil - } - // pstr.getQueueOnce.Do(func() { - var svc *sqs.SQS - if svc, err = pstr.newPosterSession(); err != nil { - return - } - var result *sqs.GetQueueUrlOutput - if result, err = svc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: aws.String(pstr.queueID), - }); err == nil { - pstr.queueURL = new(string) - *(pstr.queueURL) = *result.QueueUrl - return - } - if aerr, ok := err.(awserr.Error); ok && aerr.Code() == sqs.ErrCodeQueueDoesNotExist { - // For CreateQueue - var createResult *sqs.CreateQueueOutput - if createResult, err = svc.CreateQueue(&sqs.CreateQueueInput{ - QueueName: aws.String(pstr.queueID), - }); err == nil { - pstr.queueURL = new(string) - *(pstr.queueURL) = *createResult.QueueUrl - return - } - } - utils.Logger.Warning(fmt.Sprintf(" can not get url for queue with ID=%s because err: %v", pstr.queueID, err)) - // }) - return err -} - -func (pstr *SQSPoster) Post(message []byte, fallbackFileName string) (err error) { - var svc *sqs.SQS - fib := utils.Fib() - - for i := 0; i < pstr.attempts; i++ { - if svc, err = pstr.newPosterSession(); err == nil { - break - } - time.Sleep(time.Duration(fib()) * time.Second) - } - if err != nil { - if fallbackFileName != utils.META_NONE { - utils.Logger.Warning(fmt.Sprintf(" creating new session, err: %s", err.Error())) - err = writeToFile(pstr.fallbackFileDir, fallbackFileName, message) - } - return err - } - - for i := 0; i < pstr.attempts; i++ { - if _, err = svc.SendMessage( - &sqs.SendMessageInput{ - MessageBody: aws.String(string(message)), - QueueUrl: pstr.queueURL, - }, - ); err == nil { - break - } - } - if err != nil && fallbackFileName != utils.META_NONE { - utils.Logger.Warning(fmt.Sprintf(" posting new message, err: %s", err.Error())) - err = writeToFile(pstr.fallbackFileDir, fallbackFileName, message) - } - return err -} - -func (pstr *SQSPoster) newPosterSession() (s *sqs.SQS, err error) { - pstr.Lock() - defer pstr.Unlock() - if pstr.session == nil { - var ses *session.Session - cfg := aws.Config{Endpoint: aws.String(pstr.dialURL)} - if len(pstr.awsRegion) != 0 { - cfg.Region = aws.String(pstr.awsRegion) - } - if len(pstr.awsID) != 0 && - len(pstr.awsKey) != 0 { - cfg.Credentials = credentials.NewStaticCredentials(pstr.awsID, pstr.awsKey, pstr.awsToken) - } - ses, err = session.NewSessionWithOptions( - session.Options{ - Config: cfg, - }, - ) - if err != nil { - return nil, err - } - pstr.session = ses - } - return sqs.New(pstr.session), nil -} diff --git a/engine/pstr_amqp.go b/engine/pstr_amqp.go new file mode 100644 index 000000000..41c2f2db0 --- /dev/null +++ b/engine/pstr_amqp.go @@ -0,0 +1,201 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package engine + +import ( + "fmt" + "net/url" + "strings" + "sync" + "time" + + "github.com/cgrates/cgrates/utils" + "github.com/streadway/amqp" +) + +var AMQPQuery = []string{"cacertfile", "certfile", "keyfile", "verify", "server_name_indication", "auth_mechanism", "heartbeat", "connection_timeout", "channel_max"} + +// "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs" +func NewAMQPPoster(dialURL string, attempts int, fallbackFileDir string) (*AMQPPoster, error) { + amqp := &AMQPPoster{ + attempts: attempts, + fallbackFileDir: fallbackFileDir, + } + if err := amqp.parseURL(dialURL); err != nil { + return nil, err + } + return amqp, nil +} + +type AMQPPoster struct { + dialURL string + queueID string // identifier of the CDR queue where we publish + exchange string + exchangeType string + routingKey string + attempts int + fallbackFileDir string + sync.Mutex // protect connection + conn *amqp.Connection +} + +func (pstr *AMQPPoster) parseURL(dialURL string) error { + u, err := url.Parse(dialURL) + if err != nil { + return err + } + qry := u.Query() + q := url.Values{} + for _, key := range AMQPQuery { + if vals, has := qry[key]; has && len(vals) != 0 { + q.Add(key, vals[0]) + } + } + pstr.dialURL = strings.Split(dialURL, "?")[0] + "?" + q.Encode() + pstr.queueID = defaultQueueID + pstr.routingKey = defaultQueueID + if vals, has := qry[queueID]; has && len(vals) != 0 { + pstr.queueID = vals[0] + } + if vals, has := qry[routingKey]; has && len(vals) != 0 { + pstr.routingKey = vals[0] + } + if vals, has := qry[exchange]; has && len(vals) != 0 { + pstr.exchange = vals[0] + pstr.exchangeType = defaultExchangeType + } + if vals, has := qry[exchangeType]; has && len(vals) != 0 { + pstr.exchangeType = vals[0] + } + return nil +} + +// Post is the method being called when we need to post anything in the queue +// the optional chn will permits channel caching +func (pstr *AMQPPoster) Post(content []byte, fallbackFileName string) (err error) { + var chn *amqp.Channel + fib := utils.Fib() + + for i := 0; i < pstr.attempts; i++ { + if chn, err = pstr.newPostChannel(); err == nil { + break + } + time.Sleep(time.Duration(fib()) * time.Second) + } + if err != nil { + if fallbackFileName != utils.META_NONE { + utils.Logger.Warning(fmt.Sprintf(" creating new post channel, err: %s", err.Error())) + err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content) + } + return err + } + for i := 0; i < pstr.attempts; i++ { + if err = chn.Publish( + pstr.exchange, // exchange + pstr.routingKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + DeliveryMode: amqp.Persistent, + ContentType: utils.CONTENT_JSON, + Body: content, + }); err == nil { + break + } + time.Sleep(time.Duration(fib()) * time.Second) + } + if err != nil && fallbackFileName != utils.META_NONE { + err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content) + return err + } + if chn != nil { + chn.Close() + } + return +} + +func (pstr *AMQPPoster) Close() { + pstr.Lock() + if pstr.conn != nil { + pstr.conn.Close() + } + pstr.conn = nil + pstr.Unlock() +} + +func (pstr *AMQPPoster) newPostChannel() (postChan *amqp.Channel, err error) { + pstr.Lock() + if pstr.conn == nil { + var conn *amqp.Connection + conn, err = amqp.Dial(pstr.dialURL) + if err == nil { + pstr.conn = conn + go func() { // monitor connection errors so we can restart + if err := <-pstr.conn.NotifyClose(make(chan *amqp.Error)); err != nil { + utils.Logger.Err(fmt.Sprintf("Connection error received: %s", err.Error())) + pstr.Close() + } + }() + } + } + pstr.Unlock() + if err != nil { + return nil, err + } + if postChan, err = pstr.conn.Channel(); err != nil { + return + } + + if pstr.exchange != "" { + if err = postChan.ExchangeDeclare( + pstr.exchange, // name + pstr.exchangeType, // type + true, // durable + false, // audo-delete + false, // internal + false, // no-wait + nil, // args + ); err != nil { + return + } + } + + if _, err = postChan.QueueDeclare( + pstr.queueID, // name + true, // durable + false, // auto-delete + false, // exclusive + false, // no-wait + nil, // args + ); err != nil { + return + } + + if pstr.exchange != "" { + if err = postChan.QueueBind( + pstr.queueID, // queue + pstr.routingKey, // key + pstr.exchange, // exchange + false, // no-wait + nil, // args + ); err != nil { + return + } + } + return +} diff --git a/engine/pstr_amqpv1.go b/engine/pstr_amqpv1.go new file mode 100644 index 000000000..fc1f25aef --- /dev/null +++ b/engine/pstr_amqpv1.go @@ -0,0 +1,166 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "context" + "fmt" + "net" + "sync" + "time" + + "github.com/cgrates/cgrates/utils" + amqpv1 "github.com/vcabbage/amqp" +) + +func NewAMQPv1Poster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) { + URL, qID, err := parseURL(dialURL) + if err != nil { + return nil, err + } + return &AMQPv1Poster{ + dialURL: URL, + queueID: "/" + qID, + attempts: attempts, + fallbackFileDir: fallbackFileDir, + }, nil +} + +type AMQPv1Poster struct { + sync.Mutex + dialURL string + queueID string // identifier of the CDR queue where we publish + attempts int + fallbackFileDir string + client *amqpv1.Client +} + +func (pstr *AMQPv1Poster) Close() { + pstr.Lock() + if pstr.client != nil { + pstr.client.Close() + } + pstr.client = nil + pstr.Unlock() +} + +func (pstr *AMQPv1Poster) Post(content []byte, fallbackFileName string) (err error) { + var s *amqpv1.Session + fib := utils.Fib() + + for i := 0; i < pstr.attempts; i++ { + if s, err = pstr.newPosterSession(); err == nil { + break + } + // reset client and try again + // used in case of closed conection because of idle time + if pstr.client != nil { + pstr.client.Close() // Make shure the connection is closed before reseting it + } + pstr.client = nil + time.Sleep(time.Duration(fib()) * time.Second) + } + if err != nil { + if fallbackFileName != utils.META_NONE { + utils.Logger.Warning(fmt.Sprintf(" creating new post channel, err: %s", err.Error())) + err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content) + } + return err + } + + ctx := context.Background() + for i := 0; i < pstr.attempts; i++ { + sender, err := s.NewSender( + amqpv1.LinkTargetAddress(pstr.queueID), + ) + if err != nil { + time.Sleep(time.Duration(fib()) * time.Second) + // if pstr.isRecoverableError(err) { + // s.Close(ctx) + // pstr.client.Close() + // pstr.client = nil + // stmp, err := pstr.newPosterSession() + // if err == nil { + // s = stmp + // } + // } + continue + } + // Send message + err = sender.Send(ctx, amqpv1.NewMessage(content)) + sender.Close(ctx) + if err == nil { + break + } + time.Sleep(time.Duration(fib()) * time.Second) + // if pstr.isRecoverableError(err) { + // s.Close(ctx) + // pstr.client.Close() + // pstr.client = nil + // stmp, err := pstr.newPosterSession() + // if err == nil { + // s = stmp + // } + // } + } + if err != nil && fallbackFileName != utils.META_NONE { + err = writeToFile(pstr.fallbackFileDir, fallbackFileName, content) + return err + } + if s != nil { + s.Close(ctx) + } + return +} + +func (pstr *AMQPv1Poster) newPosterSession() (s *amqpv1.Session, err error) { + pstr.Lock() + defer pstr.Unlock() + if pstr.client == nil { + var client *amqpv1.Client + client, err = amqpv1.Dial(pstr.dialURL) + if err != nil { + return nil, err + } + pstr.client = client + } + return pstr.client.NewSession() +} + +func isRecoverableCloseError(err error) bool { + return err == amqpv1.ErrConnClosed || + err == amqpv1.ErrLinkClosed || + err == amqpv1.ErrSessionClosed +} + +func (pstr *AMQPv1Poster) isRecoverableError(err error) bool { + switch err.(type) { + case *amqpv1.Error, *amqpv1.DetachError, net.Error: + if netErr, ok := err.(net.Error); ok { + if !netErr.Temporary() { + return false + } + } + default: + if !isRecoverableCloseError(err) { + return false + } + } + return true +} diff --git a/engine/pstr_http.go b/engine/pstr_http.go new file mode 100644 index 000000000..d1fc4f787 --- /dev/null +++ b/engine/pstr_http.go @@ -0,0 +1,126 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "bytes" + "crypto/tls" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "os" + "time" + + "github.com/cgrates/cgrates/guardian" + "github.com/cgrates/cgrates/utils" +) + +// Post without automatic failover +func HttpJsonPost(url string, skipTlsVerify bool, content []byte) ([]byte, error) { + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTlsVerify}, + DisableKeepAlives: true, + } + client := &http.Client{Transport: tr} + resp, err := client.Post(url, "application/json", bytes.NewBuffer(content)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + respBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + if resp.StatusCode > 299 { + return respBody, fmt.Errorf("Unexpected status code received: %d", resp.StatusCode) + } + return respBody, nil +} + +func NewHTTPPoster(skipTLSVerify bool, replyTimeout time.Duration) *HTTPPoster { + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTLSVerify}, + } + return &HTTPPoster{httpClient: &http.Client{Transport: tr, Timeout: replyTimeout}} +} + +type HTTPPoster struct { + httpClient *http.Client +} + +// Post with built-in failover +// Returns also reference towards client so we can close it's connections when done +func (poster *HTTPPoster) Post(addr string, contentType string, content interface{}, attempts int, fallbackFilePath string) (respBody []byte, err error) { + if !utils.IsSliceMember([]string{utils.CONTENT_JSON, utils.CONTENT_FORM, utils.CONTENT_TEXT}, contentType) { + return nil, fmt.Errorf("unsupported ContentType: %s", contentType) + } + var body []byte // Used to write in file and send over http + var urlVals url.Values // Used when posting form + if utils.IsSliceMember([]string{utils.CONTENT_JSON, utils.CONTENT_TEXT}, contentType) { + body = content.([]byte) + } else if contentType == utils.CONTENT_FORM { + urlVals = content.(url.Values) + body = []byte(urlVals.Encode()) + } + fib := utils.Fib() + bodyType := "application/x-www-form-urlencoded" + if contentType == utils.CONTENT_JSON { + bodyType = "application/json" + } + for i := 0; i < attempts; i++ { + var resp *http.Response + if utils.IsSliceMember([]string{utils.CONTENT_JSON, utils.CONTENT_TEXT}, contentType) { + resp, err = poster.httpClient.Post(addr, bodyType, bytes.NewBuffer(body)) + } else if contentType == utils.CONTENT_FORM { + resp, err = poster.httpClient.PostForm(addr, urlVals) + } + if err != nil { + utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error())) + time.Sleep(time.Duration(fib()) * time.Second) + continue + } + defer resp.Body.Close() + respBody, err = ioutil.ReadAll(resp.Body) + if err != nil { + utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error())) + time.Sleep(time.Duration(fib()) * time.Second) + continue + } + if resp.StatusCode > 299 { + utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, unexpected status code received: <%d>", addr, resp.StatusCode)) + time.Sleep(time.Duration(fib()) * time.Second) + continue + } + return respBody, nil + } + if fallbackFilePath != utils.META_NONE { + // If we got that far, post was not possible, write it on disk + _, err = guardian.Guardian.Guard(func() (interface{}, error) { + fileOut, err := os.Create(fallbackFilePath) + if err != nil { + return nil, err + } + _, err = fileOut.Write(body) + fileOut.Close() + return nil, err + }, time.Duration(2*time.Second), utils.FileLockPrefix+fallbackFilePath) + } + return +} diff --git a/engine/pstr_sqs.go b/engine/pstr_sqs.go new file mode 100644 index 000000000..5f682ee4f --- /dev/null +++ b/engine/pstr_sqs.go @@ -0,0 +1,194 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "fmt" + "net/url" + "strings" + "sync" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/cgrates/cgrates/utils" +) + +func NewSQSPoster(dialURL string, attempts int, fallbackFileDir string) (Poster, error) { + pstr := &SQSPoster{ + attempts: attempts, + fallbackFileDir: fallbackFileDir, + } + err := pstr.parseURL(dialURL) + if err != nil { + return nil, err + } + return pstr, nil +} + +type SQSPoster struct { + sync.Mutex + dialURL string + awsRegion string + awsID string + awsKey string + awsToken string + attempts int + fallbackFileDir string + queueURL *string + queueID string + // getQueueOnce sync.Once + session *session.Session +} + +func (pstr *SQSPoster) Close() {} + +func (pstr *SQSPoster) parseURL(dialURL string) (err error) { + u, err := url.Parse(dialURL) + if err != nil { + return err + } + qry := u.Query() + + pstr.dialURL = strings.Split(dialURL, "?")[0] + pstr.dialURL = strings.TrimSuffix(pstr.dialURL, "/") // used to remove / to point to correct endpoint + pstr.queueID = defaultQueueID + if vals, has := qry[queueID]; has && len(vals) != 0 { + pstr.queueID = vals[0] + } + if vals, has := qry[awsRegion]; has && len(vals) != 0 { + pstr.awsRegion = url.QueryEscape(vals[0]) + } else { + utils.Logger.Warning(" No region present for AWS.") + } + if vals, has := qry[awsID]; has && len(vals) != 0 { + pstr.awsID = url.QueryEscape(vals[0]) + } else { + utils.Logger.Warning(" No access key ID present for AWS.") + } + if vals, has := qry[awsSecret]; has && len(vals) != 0 { + pstr.awsKey = url.QueryEscape(vals[0]) + } else { + utils.Logger.Warning(" No secret access key present for AWS.") + } + if vals, has := qry[awsToken]; has && len(vals) != 0 { + pstr.awsToken = url.QueryEscape(vals[0]) + } else { + utils.Logger.Warning(" No session token present for AWS.") + } + pstr.getQueueURL() + return nil +} + +func (pstr *SQSPoster) getQueueURL() (err error) { + if pstr.queueURL != nil { + return nil + } + // pstr.getQueueOnce.Do(func() { + var svc *sqs.SQS + if svc, err = pstr.newPosterSession(); err != nil { + return + } + var result *sqs.GetQueueUrlOutput + if result, err = svc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String(pstr.queueID), + }); err == nil { + pstr.queueURL = new(string) + *(pstr.queueURL) = *result.QueueUrl + return + } + if aerr, ok := err.(awserr.Error); ok && aerr.Code() == sqs.ErrCodeQueueDoesNotExist { + // For CreateQueue + var createResult *sqs.CreateQueueOutput + if createResult, err = svc.CreateQueue(&sqs.CreateQueueInput{ + QueueName: aws.String(pstr.queueID), + }); err == nil { + pstr.queueURL = new(string) + *(pstr.queueURL) = *createResult.QueueUrl + return + } + } + utils.Logger.Warning(fmt.Sprintf(" can not get url for queue with ID=%s because err: %v", pstr.queueID, err)) + // }) + return err +} + +func (pstr *SQSPoster) Post(message []byte, fallbackFileName string) (err error) { + var svc *sqs.SQS + fib := utils.Fib() + + for i := 0; i < pstr.attempts; i++ { + if svc, err = pstr.newPosterSession(); err == nil { + break + } + time.Sleep(time.Duration(fib()) * time.Second) + } + if err != nil { + if fallbackFileName != utils.META_NONE { + utils.Logger.Warning(fmt.Sprintf(" creating new session, err: %s", err.Error())) + err = writeToFile(pstr.fallbackFileDir, fallbackFileName, message) + } + return err + } + + for i := 0; i < pstr.attempts; i++ { + if _, err = svc.SendMessage( + &sqs.SendMessageInput{ + MessageBody: aws.String(string(message)), + QueueUrl: pstr.queueURL, + }, + ); err == nil { + break + } + } + if err != nil && fallbackFileName != utils.META_NONE { + utils.Logger.Warning(fmt.Sprintf(" posting new message, err: %s", err.Error())) + err = writeToFile(pstr.fallbackFileDir, fallbackFileName, message) + } + return err +} + +func (pstr *SQSPoster) newPosterSession() (s *sqs.SQS, err error) { + pstr.Lock() + defer pstr.Unlock() + if pstr.session == nil { + var ses *session.Session + cfg := aws.Config{Endpoint: aws.String(pstr.dialURL)} + if len(pstr.awsRegion) != 0 { + cfg.Region = aws.String(pstr.awsRegion) + } + if len(pstr.awsID) != 0 && + len(pstr.awsKey) != 0 { + cfg.Credentials = credentials.NewStaticCredentials(pstr.awsID, pstr.awsKey, pstr.awsToken) + } + ses, err = session.NewSessionWithOptions( + session.Options{ + Config: cfg, + }, + ) + if err != nil { + return nil, err + } + pstr.session = ses + } + return sqs.New(pstr.session), nil +}