Finished implementing all the exporters

This commit is contained in:
Trial97
2021-08-17 17:15:50 +03:00
committed by Dan Christian Bogos
parent a74aacd7fe
commit cd0964dfea
69 changed files with 1365 additions and 2859 deletions

View File

@@ -69,46 +69,50 @@ func (a *Action) Clone() (cln *Action) {
type actionTypeFunc func(*Account, *Action, Actions, interface{}) error
func getActionFunc(typ string) (actionTypeFunc, bool) {
actionFuncMap := map[string]actionTypeFunc{
utils.MetaLog: logAction,
utils.MetaResetTriggers: resetTriggersAction,
utils.CDRLog: cdrLogAction,
utils.MetaSetRecurrent: setRecurrentAction,
utils.MetaUnsetRecurrent: unsetRecurrentAction,
utils.MetaAllowNegative: allowNegativeAction,
utils.MetaDenyNegative: denyNegativeAction,
utils.MetaResetAccount: resetAccountAction,
utils.MetaTopUpReset: topupResetAction,
utils.MetaTopUp: topupAction,
utils.MetaDebitReset: debitResetAction,
utils.MetaDebit: debitAction,
utils.MetaResetCounters: resetCountersAction,
utils.MetaEnableAccount: enableAccountAction,
utils.MetaDisableAccount: disableAccountAction,
utils.MetaHTTPPost: callURL,
utils.HttpPostAsync: callURLAsync,
utils.MetaMailAsync: mailAsync,
utils.MetaSetDDestinations: setddestinations,
utils.MetaRemoveAccount: removeAccountAction,
utils.MetaRemoveBalance: removeBalanceAction,
utils.MetaSetBalance: setBalanceAction,
utils.MetaTransferMonetaryDefault: transferMonetaryDefaultAction,
utils.MetaCgrRpc: cgrRPCAction,
utils.TopUpZeroNegative: topupZeroNegativeAction,
utils.SetExpiry: setExpiryAction,
utils.MetaPublishAccount: publishAccount,
utils.MetaRemoveSessionCosts: removeSessionCosts,
utils.MetaRemoveExpired: removeExpired,
utils.MetaPostEvent: postEvent,
utils.MetaCDRAccount: resetAccountCDR,
utils.MetaExport: export,
utils.MetaResetThreshold: resetThreshold,
utils.MetaResetStatQueue: resetStatQueue,
utils.MetaRemoteSetAccount: remoteSetAccount,
}
f, exists := actionFuncMap[typ]
return f, exists
var actionFuncMap = make(map[string]actionTypeFunc)
func init() {
actionFuncMap[utils.MetaLog] = logAction
actionFuncMap[utils.MetaResetTriggers] = resetTriggersAction
actionFuncMap[utils.CDRLog] = cdrLogAction
actionFuncMap[utils.MetaSetRecurrent] = setRecurrentAction
actionFuncMap[utils.MetaUnsetRecurrent] = unsetRecurrentAction
actionFuncMap[utils.MetaAllowNegative] = allowNegativeAction
actionFuncMap[utils.MetaDenyNegative] = denyNegativeAction
actionFuncMap[utils.MetaResetAccount] = resetAccountAction
actionFuncMap[utils.MetaTopUpReset] = topupResetAction
actionFuncMap[utils.MetaTopUp] = topupAction
actionFuncMap[utils.MetaDebitReset] = debitResetAction
actionFuncMap[utils.MetaDebit] = debitAction
actionFuncMap[utils.MetaResetCounters] = resetCountersAction
actionFuncMap[utils.MetaEnableAccount] = enableAccountAction
actionFuncMap[utils.MetaDisableAccount] = disableAccountAction
actionFuncMap[utils.MetaMailAsync] = mailAsync
actionFuncMap[utils.MetaSetDDestinations] = setddestinations
actionFuncMap[utils.MetaRemoveAccount] = removeAccountAction
actionFuncMap[utils.MetaRemoveBalance] = removeBalanceAction
actionFuncMap[utils.MetaSetBalance] = setBalanceAction
actionFuncMap[utils.MetaTransferMonetaryDefault] = transferMonetaryDefaultAction
actionFuncMap[utils.MetaCgrRpc] = cgrRPCAction
actionFuncMap[utils.TopUpZeroNegative] = topupZeroNegativeAction
actionFuncMap[utils.SetExpiry] = setExpiryAction
actionFuncMap[utils.MetaPublishAccount] = publishAccount
actionFuncMap[utils.MetaRemoveSessionCosts] = removeSessionCosts
actionFuncMap[utils.MetaRemoveExpired] = removeExpired
actionFuncMap[utils.MetaCDRAccount] = resetAccountCDR
actionFuncMap[utils.MetaExport] = export
actionFuncMap[utils.MetaResetThreshold] = resetThreshold
actionFuncMap[utils.MetaResetStatQueue] = resetStatQueue
actionFuncMap[utils.MetaRemoteSetAccount] = remoteSetAccount
}
func getActionFunc(typ string) (f actionTypeFunc, exists bool) {
f, exists = actionFuncMap[typ]
return
}
func RegisterActionFunc(action string, f actionTypeFunc) {
actionFuncMap[action] = f
}
func logAction(ub *Account, a *Action, acs Actions, extraData interface{}) (err error) {
@@ -371,54 +375,6 @@ func genericReset(ub *Account) error {
return nil
}
func getOneData(ub *Account, extraData interface{}) ([]byte, error) {
switch {
case ub != nil:
return json.Marshal(ub)
case extraData != nil:
return json.Marshal(extraData)
}
return nil, nil
}
func callURL(ub *Account, a *Action, acs Actions, extraData interface{}) error {
body, err := getOneData(ub, extraData)
if err != nil {
return err
}
pstr, err := NewHTTPPoster(config.CgrConfig().GeneralCfg().ReplyTimeout, a.ExtraParameters,
utils.ContentJSON, config.CgrConfig().GeneralCfg().PosterAttempts)
if err != nil {
return err
}
err = pstr.PostValues(body, make(http.Header))
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.MetaNone {
AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HierarchySep+a.ActionType, body, make(map[string]interface{}))
err = nil
}
return err
}
// Does not block for posts, no error reports
func callURLAsync(ub *Account, a *Action, acs Actions, extraData interface{}) error {
body, err := getOneData(ub, extraData)
if err != nil {
return err
}
pstr, err := NewHTTPPoster(config.CgrConfig().GeneralCfg().ReplyTimeout, a.ExtraParameters,
utils.ContentJSON, config.CgrConfig().GeneralCfg().PosterAttempts)
if err != nil {
return err
}
go func() {
err := pstr.PostValues(body, make(http.Header))
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.MetaNone {
AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HierarchySep+a.ActionType, body, make(map[string]interface{}))
}
}()
return nil
}
// Mails the balance hitting the threshold towards predefined list of addresses
func mailAsync(ub *Account, a *Action, acs Actions, extraData interface{}) error {
cgrCfg := config.CgrConfig()
@@ -941,24 +897,6 @@ func removeExpired(acc *Account, action *Action, _ Actions, extraData interface{
return nil
}
func postEvent(ub *Account, a *Action, acs Actions, extraData interface{}) error {
body, err := json.Marshal(extraData)
if err != nil {
return err
}
pstr, err := NewHTTPPoster(config.CgrConfig().GeneralCfg().ReplyTimeout, a.ExtraParameters,
utils.ContentJSON, config.CgrConfig().GeneralCfg().PosterAttempts)
if err != nil {
return err
}
err = pstr.PostValues(body, make(http.Header))
if err != nil && config.CgrConfig().GeneralCfg().FailedPostsDir != utils.MetaNone {
AddFailedPost(a.ExtraParameters, utils.MetaHTTPjson, utils.ActionsPoster+utils.HierarchySep+a.ActionType, body, make(map[string]interface{}))
err = nil
}
return err
}
// resetAccountCDR resets the account out of values from CDR
func resetAccountCDR(ub *Account, action *Action, acts Actions, _ interface{}) error {
if ub == nil {

View File

@@ -80,8 +80,6 @@ func init() {
gob.Register(new(StatAverage))
gob.Register(new(StatDistinct))
gob.Register(new(HTTPPosterRequest))
gob.Register([]interface{}{})
gob.Register([]map[string]interface{}{})
gob.Register(map[string]interface{}{})

View File

@@ -1,211 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"bytes"
"encoding/gob"
"fmt"
"os"
"path"
"sync"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/ltcache"
)
var failedPostCache *ltcache.Cache
func init() {
failedPostCache = ltcache.NewCache(-1, 5*time.Second, false, writeFailedPosts) // configurable general
}
// SetFailedPostCacheTTL recreates the failed cache
func SetFailedPostCacheTTL(ttl time.Duration) {
failedPostCache = ltcache.NewCache(-1, ttl, false, writeFailedPosts)
}
func writeFailedPosts(itmID string, value interface{}) {
expEv, canConvert := value.(*ExportEvents)
if !canConvert {
return
}
filePath := expEv.FilePath()
if err := expEv.WriteToFile(filePath); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to write file <%s> because <%s>",
utils.CDRs, filePath, err))
}
}
func AddFailedPost(failedPostsDir, expPath, format, module string, ev interface{}, opts map[string]interface{}) {
key := utils.ConcatenatedKey(failedPostsDir, expPath, format, module)
// also in case of amqp,amqpv1,s3,sqs and kafka also separe them after queue id
if qID := utils.FirstNonEmpty(
utils.IfaceAsString(opts[utils.AMQPQueueID]),
utils.IfaceAsString(opts[utils.S3Bucket]),
utils.IfaceAsString(opts[utils.SQSQueueID]),
utils.IfaceAsString(opts[utils.KafkaTopic])); len(qID) != 0 {
key = utils.ConcatenatedKey(key, qID)
}
var failedPost *ExportEvents
if x, ok := failedPostCache.Get(key); ok {
if x != nil {
failedPost = x.(*ExportEvents)
}
}
if failedPost == nil {
failedPost = &ExportEvents{
Path: expPath,
Format: format,
Opts: opts,
module: module,
failedPostsDir: failedPostsDir,
}
}
failedPost.AddEvent(ev)
failedPostCache.Set(key, failedPost, nil)
}
// NewExportEventsFromFile returns ExportEvents from the file
// used only on replay failed post
func NewExportEventsFromFile(filePath string) (expEv *ExportEvents, err error) {
var fileContent []byte
if err = guardian.Guardian.Guard(func() error {
if fileContent, err = os.ReadFile(filePath); err != nil {
return err
}
return os.Remove(filePath)
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+filePath); err != nil {
return
}
dec := gob.NewDecoder(bytes.NewBuffer(fileContent))
// unmarshall it
expEv = new(ExportEvents)
err = dec.Decode(&expEv)
return
}
// ExportEvents used to save the failed post to file
type ExportEvents struct {
lk sync.RWMutex
Path string
Opts map[string]interface{}
Format string
Events []interface{}
failedPostsDir string
module string
}
// FilePath returns the file path it should use for saving the failed events
func (expEv *ExportEvents) FilePath() string {
return path.Join(expEv.failedPostsDir, expEv.module+utils.PipeSep+utils.UUIDSha1Prefix()+utils.GOBSuffix)
}
// SetModule sets the module for this event
func (expEv *ExportEvents) SetModule(mod string) {
expEv.module = mod
}
// WriteToFile writes the events to file
func (expEv *ExportEvents) WriteToFile(filePath string) (err error) {
return guardian.Guardian.Guard(func() error {
fileOut, err := os.Create(filePath)
if err != nil {
return err
}
encd := gob.NewEncoder(fileOut)
err = encd.Encode(expEv)
fileOut.Close()
return err
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+filePath)
}
// AddEvent adds one event
func (expEv *ExportEvents) AddEvent(ev interface{}) {
expEv.lk.Lock()
expEv.Events = append(expEv.Events, ev)
expEv.lk.Unlock()
}
// ReplayFailedPosts tryies to post cdrs again
func (expEv *ExportEvents) ReplayFailedPosts(attempts int) (failedEvents *ExportEvents, err error) {
failedEvents = &ExportEvents{
Path: expEv.Path,
Opts: expEv.Opts,
Format: expEv.Format,
}
var pstr Poster
keyFunc := func() string { return utils.EmptyString }
switch expEv.Format {
case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.MetaHTTPPost:
var pstr *HTTPPoster
pstr, err = NewHTTPPoster(config.CgrConfig().GeneralCfg().ReplyTimeout, expEv.Path,
utils.PosterTransportContentTypes[expEv.Format],
config.CgrConfig().GeneralCfg().PosterAttempts)
if err != nil {
return expEv, err
}
for _, ev := range expEv.Events {
req := ev.(*HTTPPosterRequest)
err = pstr.PostValues(req.Body, req.Header)
if err != nil {
failedEvents.AddEvent(req)
}
}
if len(failedEvents.Events) > 0 {
err = utils.ErrPartiallyExecuted
} else {
failedEvents = nil
}
return
case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap:
pstr = NewAMQPPoster(expEv.Path, attempts, expEv.Opts)
case utils.MetaAMQPV1jsonMap:
pstr = NewAMQPv1Poster(expEv.Path, attempts, expEv.Opts)
case utils.MetaSQSjsonMap:
pstr = NewSQSPoster(expEv.Path, attempts, expEv.Opts)
case utils.MetaKafkajsonMap:
pstr = NewKafkaPoster(expEv.Path, attempts, expEv.Opts)
keyFunc = utils.UUIDSha1Prefix
case utils.MetaS3jsonMap:
pstr = NewS3Poster(expEv.Path, attempts, expEv.Opts)
keyFunc = utils.UUIDSha1Prefix
case utils.MetaNatsjsonMap:
if pstr, err = NewNatsPoster(expEv.Path, attempts, expEv.Opts,
config.CgrConfig().GeneralCfg().NodeID,
config.CgrConfig().GeneralCfg().ConnectTimeout); err != nil {
return expEv, err
}
}
for _, ev := range expEv.Events {
if err = pstr.Post(ev.([]byte), keyFunc()); err != nil {
failedEvents.AddEvent(ev)
}
}
pstr.Close()
if len(failedEvents.Events) > 0 {
err = utils.ErrPartiallyExecuted
} else {
failedEvents = nil
}
return
}

View File

@@ -1,155 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/utils"
)
func TestSetFldPostCacheTTL(t *testing.T) {
var1 := failedPostCache
SetFailedPostCacheTTL(50 * time.Millisecond)
var2 := failedPostCache
if reflect.DeepEqual(var1, var2) {
t.Error("Expecting to be different")
}
}
func TestAddFldPost(t *testing.T) {
SetFailedPostCacheTTL(5 * time.Second)
AddFailedPost("", "path1", "format1", "module1", "1", make(map[string]interface{}))
x, ok := failedPostCache.Get(utils.ConcatenatedKey("path1", "format1", "module1"))
if !ok {
t.Error("Error reading from cache")
}
if x == nil {
t.Error("Received an empty element")
}
failedPost, canCast := x.(*ExportEvents)
if !canCast {
t.Error("Error when casting")
}
eOut := &ExportEvents{
Path: "path1",
Format: "format1",
module: "module1",
Events: []interface{}{"1"},
Opts: make(map[string]interface{}),
}
if !reflect.DeepEqual(eOut, failedPost) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost))
}
AddFailedPost("", "path1", "format1", "module1", "2", make(map[string]interface{}))
AddFailedPost("", "path2", "format2", "module2", "3", map[string]interface{}{utils.SQSQueueID: "qID"})
x, ok = failedPostCache.Get(utils.ConcatenatedKey("path1", "format1", "module1"))
if !ok {
t.Error("Error reading from cache")
}
if x == nil {
t.Error("Received an empty element")
}
failedPost, canCast = x.(*ExportEvents)
if !canCast {
t.Error("Error when casting")
}
eOut = &ExportEvents{
Path: "path1",
Format: "format1",
module: "module1",
Events: []interface{}{"1", "2"},
Opts: make(map[string]interface{}),
}
if !reflect.DeepEqual(eOut, failedPost) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost))
}
x, ok = failedPostCache.Get(utils.ConcatenatedKey("path2", "format2", "module2", "qID"))
if !ok {
t.Error("Error reading from cache")
}
if x == nil {
t.Error("Received an empty element")
}
failedPost, canCast = x.(*ExportEvents)
if !canCast {
t.Error("Error when casting")
}
eOut = &ExportEvents{
Path: "path2",
Format: "format2",
module: "module2",
Events: []interface{}{"3"},
Opts: map[string]interface{}{utils.SQSQueueID: "qID"},
}
if !reflect.DeepEqual(eOut, failedPost) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost))
}
}
func TestFilePath(t *testing.T) {
exportEvent := &ExportEvents{}
rcv := exportEvent.FilePath()
if rcv[0] != '|' {
t.Errorf("Expecting: '|', received: %+v", rcv[0])
} else if rcv[8:] != ".gob" {
t.Errorf("Expecting: '.gob', received: %+v", rcv[8:])
}
exportEvent = &ExportEvents{
module: "module",
}
rcv = exportEvent.FilePath()
if rcv[:7] != "module|" {
t.Errorf("Expecting: 'module|', received: %+v", rcv[:7])
} else if rcv[14:] != ".gob" {
t.Errorf("Expecting: '.gob', received: %+v", rcv[14:])
}
}
func TestSetModule(t *testing.T) {
exportEvent := &ExportEvents{}
eOut := &ExportEvents{
module: "module",
}
exportEvent.SetModule("module")
if !reflect.DeepEqual(eOut, exportEvent) {
t.Errorf("Expecting: %+v, received: %+v", eOut, exportEvent)
}
}
func TestAddEvent(t *testing.T) {
exportEvent := &ExportEvents{}
eOut := &ExportEvents{Events: []interface{}{"event1"}}
exportEvent.AddEvent("event1")
if !reflect.DeepEqual(eOut, exportEvent) {
t.Errorf("Expecting: %+v, received: %+v", eOut, exportEvent)
}
exportEvent = &ExportEvents{}
eOut = &ExportEvents{Events: []interface{}{"event1", "event2", "event3"}}
exportEvent.AddEvent("event1")
exportEvent.AddEvent("event2")
exportEvent.AddEvent("event3")
if !reflect.DeepEqual(eOut, exportEvent) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(exportEvent))
}
}

View File

@@ -528,7 +528,7 @@ func GetFltrIdxHealth(dm *DataManager, fltrCache, fltrIdxCache, objCache *ltcach
return
}
missingFltrs := utils.StringSet{} // for checking multiple filters that are missing(to not append the same ID in case)
for _, id := range ids { // get all the objects from DB
for _, id := range ids { // get all the objects from DB
id = strings.TrimPrefix(id, objPrfx)
tntID := utils.NewTenantID(id)
var obj *objFIH

View File

@@ -1,24 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
type Poster interface {
Post(body []byte, key string) error
Close()
}

View File

@@ -1,70 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"reflect"
"testing"
"github.com/cgrates/cgrates/utils"
)
func TestAMQPPosterParseURL(t *testing.T) {
amqp := &AMQPPoster{
dialURL: "amqp://guest:guest@localhost:5672/?heartbeat=5",
}
expected := &AMQPPoster{
dialURL: "amqp://guest:guest@localhost:5672/?heartbeat=5",
queueID: "q1",
exchange: "E1",
exchangeType: "fanout",
routingKey: "CGRCDR",
}
opts := map[string]interface{}{
utils.AMQPQueueID: "q1",
utils.AMQPExchange: "E1",
utils.AMQPRoutingKey: "CGRCDR",
utils.AMQPExchangeType: "fanout",
}
amqp.parseOpts(opts)
if !reflect.DeepEqual(expected, amqp) {
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(expected), utils.ToJSON(amqp))
}
}
func TestKafkaParseURL(t *testing.T) {
u := "127.0.0.1:9092"
exp := &KafkaPoster{
dialURL: "127.0.0.1:9092",
topic: "cdr_billing",
attempts: 10,
}
if kfk := NewKafkaPoster(u, 10, map[string]interface{}{utils.KafkaTopic: "cdr_billing"}); !reflect.DeepEqual(exp, kfk) {
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(exp), utils.ToJSON(kfk))
}
u = "localhost:9092"
exp = &KafkaPoster{
dialURL: "localhost:9092",
topic: "cdr_billing",
attempts: 10,
}
if kfk := NewKafkaPoster(u, 10, map[string]interface{}{utils.KafkaTopic: "cdr_billing"}); !reflect.DeepEqual(exp, kfk) {
t.Errorf("Expected: %s ,received: %s", utils.ToJSON(exp), utils.ToJSON(kfk))
}
}

View File

@@ -1,147 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"context"
"fmt"
"sync"
"time"
amqpv1 "github.com/Azure/go-amqp"
"github.com/cgrates/cgrates/utils"
)
// NewAMQPv1Poster creates a poster for amqpv1
func NewAMQPv1Poster(dialURL string, attempts int, opts map[string]interface{}) Poster {
pstr := &AMQPv1Poster{
dialURL: dialURL,
queueID: "/" + utils.DefaultQueueID,
attempts: attempts,
}
if vals, has := opts[utils.AMQPQueueID]; has {
pstr.queueID = "/" + utils.IfaceAsString(vals)
}
return pstr
}
// AMQPv1Poster a poster for amqpv1
type AMQPv1Poster struct {
sync.Mutex
dialURL string
queueID string // identifier of the CDR queue where we publish
attempts int
client *amqpv1.Client
}
// Close closes the connections
func (pstr *AMQPv1Poster) Close() {
pstr.Lock()
if pstr.client != nil {
pstr.client.Close()
}
pstr.client = nil
pstr.Unlock()
}
// Post is the method being called when we need to post anything in the queue
func (pstr *AMQPv1Poster) Post(content []byte, _ string) (err error) {
var s *amqpv1.Session
fib := utils.Fib()
for i := 0; i < pstr.attempts; i++ {
if s, err = pstr.newPosterSession(); err == nil {
break
}
// reset client and try again
// used in case of closed connection because of idle time
if pstr.client != nil {
pstr.client.Close() // Make shure the connection is closed before reseting it
}
pstr.client = nil
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
}
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<AMQPv1Poster> creating new post channel, err: %s", err.Error()))
return err
}
ctx := context.Background()
for i := 0; i < pstr.attempts; i++ {
sender, err := s.NewSender(
amqpv1.LinkTargetAddress(pstr.queueID),
)
if err != nil {
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
// if pstr.isRecoverableError(err) {
// s.Close(ctx)
// pstr.client.Close()
// pstr.client = nil
// stmp, err := pstr.newPosterSession()
// if err == nil {
// s = stmp
// }
// }
continue
}
// Send message
err = sender.Send(ctx, amqpv1.NewMessage(content))
sender.Close(ctx)
if err == nil {
break
}
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
// if pstr.isRecoverableError(err) {
// s.Close(ctx)
// pstr.client.Close()
// pstr.client = nil
// stmp, err := pstr.newPosterSession()
// if err == nil {
// s = stmp
// }
// }
}
if err != nil {
return
}
if s != nil {
s.Close(ctx)
}
return
}
func (pstr *AMQPv1Poster) newPosterSession() (s *amqpv1.Session, err error) {
pstr.Lock()
defer pstr.Unlock()
if pstr.client == nil {
var client *amqpv1.Client
client, err = amqpv1.Dial(pstr.dialURL)
if err != nil {
return nil, err
}
pstr.client = client
}
return pstr.client.NewSession()
}

View File

@@ -1,123 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"bytes"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
"github.com/cgrates/cgrates/utils"
)
type HTTPPosterRequest struct {
Header http.Header
Body interface{}
}
// NewHTTPPoster return a new HTTP poster
func NewHTTPPoster(replyTimeout time.Duration, addr, contentType string,
attempts int) (httposter *HTTPPoster, err error) {
if !utils.SliceHasMember([]string{utils.ContentForm, utils.ContentJSON, utils.ContentText}, contentType) {
return nil, fmt.Errorf("unsupported ContentType: %s", contentType)
}
return &HTTPPoster{
httpClient: &http.Client{Transport: httpPstrTransport, Timeout: replyTimeout},
addr: addr,
contentType: contentType,
attempts: attempts,
}, nil
}
// HTTPPoster used to post cdrs
type HTTPPoster struct {
httpClient *http.Client
addr string
contentType string
attempts int
}
// PostValues will post the event
func (pstr *HTTPPoster) PostValues(content interface{}, hdr http.Header) (err error) {
_, err = pstr.GetResponse(content, hdr)
return
}
// GetResponse will post the event and return the response
func (pstr *HTTPPoster) GetResponse(content interface{}, hdr http.Header) (respBody []byte, err error) {
fib := utils.Fib()
for i := 0; i < pstr.attempts; i++ {
var req *http.Request
if req, err = pstr.getRequest(content, hdr); err != nil {
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error creating request: <%s>", pstr.addr, err.Error()))
return
}
if respBody, err = pstr.do(req); err != nil {
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
continue
}
return
}
return
}
func (pstr *HTTPPoster) do(req *http.Request) (respBody []byte, err error) {
var resp *http.Response
if resp, err = pstr.httpClient.Do(req); err != nil {
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error: <%s>", pstr.addr, err.Error()))
return
}
respBody, err = io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error: <%s>", pstr.addr, err.Error()))
return
}
if resp.StatusCode > 299 {
err = fmt.Errorf("unexpected status code received: <%d>", resp.StatusCode)
utils.Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, unexpected status code received: <%d>", pstr.addr, resp.StatusCode))
return
}
return
}
func (pstr *HTTPPoster) getRequest(content interface{}, hdr http.Header) (req *http.Request, err error) {
var body io.Reader
if pstr.contentType == utils.ContentForm {
body = strings.NewReader(content.(url.Values).Encode())
} else {
body = bytes.NewBuffer(content.([]byte))
}
contentType := "application/x-www-form-urlencoded"
if pstr.contentType == utils.ContentJSON {
contentType = "application/json"
}
hdr.Set("Content-Type", contentType)
if req, err = http.NewRequest(http.MethodPost, pstr.addr, body); err != nil {
return
}
req.Header = hdr
return
}

View File

@@ -1,86 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"context"
"sync"
"github.com/cgrates/cgrates/utils"
kafka "github.com/segmentio/kafka-go"
)
// NewKafkaPoster creates a kafka poster
func NewKafkaPoster(dialURL string, attempts int, opts map[string]interface{}) *KafkaPoster {
kfkPstr := &KafkaPoster{
dialURL: dialURL,
attempts: attempts,
topic: utils.DefaultQueueID,
}
if vals, has := opts[utils.KafkaTopic]; has {
kfkPstr.topic = utils.IfaceAsString(vals)
}
return kfkPstr
}
// KafkaPoster is a kafka poster
type KafkaPoster struct {
dialURL string
topic string // identifier of the CDR queue where we publish
attempts int
sync.Mutex // protect writer
writer *kafka.Writer
}
// Post is the method being called when we need to post anything in the queue
// the optional chn will permits channel caching
func (pstr *KafkaPoster) Post(content []byte, key string) (err error) {
pstr.newPostWriter()
pstr.Lock()
if err = pstr.writer.WriteMessages(context.Background(), kafka.Message{
Key: []byte(key),
Value: content,
}); err == nil {
pstr.Unlock()
return
}
pstr.Unlock()
return
}
// Close closes the kafka writer
func (pstr *KafkaPoster) Close() {
pstr.Lock()
if pstr.writer != nil {
pstr.writer.Close()
}
pstr.writer = nil
pstr.Unlock()
}
func (pstr *KafkaPoster) newPostWriter() {
pstr.Lock()
if pstr.writer == nil {
pstr.writer = kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{pstr.dialURL},
MaxAttempts: pstr.attempts,
Topic: pstr.topic,
})
}
pstr.Unlock()
}

View File

@@ -1,152 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"bytes"
"fmt"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/cgrates/cgrates/utils"
)
// NewS3Poster creates a s3 poster
func NewS3Poster(dialURL string, attempts int, opts map[string]interface{}) Poster {
pstr := &S3Poster{
dialURL: dialURL,
attempts: attempts,
}
pstr.parseOpts(opts)
return pstr
}
// S3Poster is a s3 poster
type S3Poster struct {
sync.Mutex
dialURL string
awsRegion string
awsID string
awsKey string
awsToken string
attempts int
bucket string
folderPath string
session *session.Session
}
// Close for Poster interface
func (pstr *S3Poster) Close() {}
func (pstr *S3Poster) parseOpts(opts map[string]interface{}) {
pstr.bucket = utils.DefaultQueueID
if val, has := opts[utils.S3Bucket]; has {
pstr.bucket = utils.IfaceAsString(val)
}
if val, has := opts[utils.S3FolderPath]; has {
pstr.folderPath = utils.IfaceAsString(val)
}
if val, has := opts[utils.AWSRegion]; has {
pstr.awsRegion = utils.IfaceAsString(val)
}
if val, has := opts[utils.AWSKey]; has {
pstr.awsID = utils.IfaceAsString(val)
}
if val, has := opts[utils.AWSSecret]; has {
pstr.awsKey = utils.IfaceAsString(val)
}
if val, has := opts[utils.AWSToken]; has {
pstr.awsToken = utils.IfaceAsString(val)
}
}
// Post is the method being called when we need to post anything in the queue
func (pstr *S3Poster) Post(message []byte, key string) (err error) {
var svc *s3manager.Uploader
fib := utils.Fib()
for i := 0; i < pstr.attempts; i++ {
if svc, err = pstr.newPosterSession(); err == nil {
break
}
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
}
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<S3Poster> creating new session, err: %s", err.Error()))
return
}
for i := 0; i < pstr.attempts; i++ {
if _, err = svc.Upload(&s3manager.UploadInput{
Bucket: aws.String(pstr.bucket),
// Can also use the `filepath` standard library package to modify the
// filename as need for an S3 object key. Such as turning absolute path
// to a relative path.
Key: aws.String(fmt.Sprintf("%s/%s.json", pstr.folderPath, key)),
// The file to be uploaded. io.ReadSeeker is preferred as the Uploader
// will be able to optimize memory when uploading large content. io.Reader
// is supported, but will require buffering of the reader's bytes for
// each part.
Body: bytes.NewReader(message),
}); err == nil {
break
}
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
}
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<S3Poster> posting new message, err: %s", err.Error()))
}
return
}
func (pstr *S3Poster) newPosterSession() (s *s3manager.Uploader, err error) {
pstr.Lock()
defer pstr.Unlock()
if pstr.session == nil {
var ses *session.Session
cfg := aws.Config{Endpoint: aws.String(pstr.dialURL)}
if len(pstr.awsRegion) != 0 {
cfg.Region = aws.String(pstr.awsRegion)
}
if len(pstr.awsID) != 0 &&
len(pstr.awsKey) != 0 {
cfg.Credentials = credentials.NewStaticCredentials(pstr.awsID, pstr.awsKey, pstr.awsToken)
}
ses, err = session.NewSessionWithOptions(
session.Options{
Config: cfg,
},
)
if err != nil {
return nil, err
}
pstr.session = ses
}
return s3manager.NewUploader(pstr.session), nil
}

View File

@@ -1,175 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"fmt"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/cgrates/cgrates/utils"
)
// NewSQSPoster creates a poster for sqs
func NewSQSPoster(dialURL string, attempts int, opts map[string]interface{}) Poster {
pstr := &SQSPoster{
attempts: attempts,
}
pstr.parseOpts(opts)
return pstr
}
// SQSPoster is a poster for sqs
type SQSPoster struct {
sync.Mutex
dialURL string
awsRegion string
awsID string
awsKey string
awsToken string
attempts int
queueURL *string
queueID string
// getQueueOnce sync.Once
session *session.Session
}
// Close for Poster interface
func (pstr *SQSPoster) Close() {}
func (pstr *SQSPoster) parseOpts(opts map[string]interface{}) {
pstr.queueID = utils.DefaultQueueID
if val, has := opts[utils.SQSQueueID]; has {
pstr.queueID = utils.IfaceAsString(val)
}
if val, has := opts[utils.AWSRegion]; has {
pstr.awsRegion = utils.IfaceAsString(val)
}
if val, has := opts[utils.AWSKey]; has {
pstr.awsID = utils.IfaceAsString(val)
}
if val, has := opts[utils.AWSSecret]; has {
pstr.awsKey = utils.IfaceAsString(val)
}
if val, has := opts[utils.AWSToken]; has {
pstr.awsToken = utils.IfaceAsString(val)
}
pstr.getQueueURL()
}
func (pstr *SQSPoster) getQueueURL() (err error) {
if pstr.queueURL != nil {
return nil
}
// pstr.getQueueOnce.Do(func() {
var svc *sqs.SQS
if svc, err = pstr.newPosterSession(); err != nil {
return
}
var result *sqs.GetQueueUrlOutput
if result, err = svc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String(pstr.queueID),
}); err == nil {
pstr.queueURL = new(string)
*(pstr.queueURL) = *result.QueueUrl
return
}
if aerr, ok := err.(awserr.Error); ok && aerr.Code() == sqs.ErrCodeQueueDoesNotExist {
// For CreateQueue
var createResult *sqs.CreateQueueOutput
if createResult, err = svc.CreateQueue(&sqs.CreateQueueInput{
QueueName: aws.String(pstr.queueID),
}); err == nil {
pstr.queueURL = new(string)
*(pstr.queueURL) = *createResult.QueueUrl
return
}
}
utils.Logger.Warning(fmt.Sprintf("<SQSPoster> can not get url for queue with ID=%s because err: %v", pstr.queueID, err))
// })
return err
}
// Post is the method being called when we need to post anything in the queue
func (pstr *SQSPoster) Post(message []byte, _ string) (err error) {
var svc *sqs.SQS
fib := utils.Fib()
for i := 0; i < pstr.attempts; i++ {
if svc, err = pstr.newPosterSession(); err == nil {
break
}
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
}
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<SQSPoster> creating new session, err: %s", err.Error()))
return
}
for i := 0; i < pstr.attempts; i++ {
if _, err = svc.SendMessage(
&sqs.SendMessageInput{
MessageBody: aws.String(string(message)),
QueueUrl: pstr.queueURL,
},
); err == nil {
break
}
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
}
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<SQSPoster> posting new message, err: %s", err.Error()))
}
return
}
func (pstr *SQSPoster) newPosterSession() (s *sqs.SQS, err error) {
pstr.Lock()
defer pstr.Unlock()
if pstr.session == nil {
var ses *session.Session
cfg := aws.Config{Endpoint: aws.String(pstr.dialURL)}
if len(pstr.awsRegion) != 0 {
cfg.Region = aws.String(pstr.awsRegion)
}
if len(pstr.awsID) != 0 &&
len(pstr.awsKey) != 0 {
cfg.Credentials = credentials.NewStaticCredentials(pstr.awsID, pstr.awsKey, pstr.awsToken)
}
ses, err = session.NewSessionWithOptions(
session.Options{
Config: cfg,
},
)
if err != nil {
return nil, err
}
pstr.session = ses
}
return sqs.New(pstr.session), nil
}

View File

@@ -1,112 +0,0 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"os"
"path/filepath"
"reflect"
"testing"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
func TestWriteFldPosts(t *testing.T) {
// can't convert
var notanExportEvent string
writeFailedPosts("somestring", notanExportEvent)
// can convert & write
dir := "/tmp/engine/libcdre_test/"
exportEvent := &ExportEvents{
module: "module",
}
if err := os.RemoveAll(dir); err != nil {
t.Fatal("Error removing folder: ", dir, err)
}
if err := os.MkdirAll(dir, 0755); err != nil {
t.Fatal("Error creating folder: ", dir, err)
}
config.CgrConfig().GeneralCfg().FailedPostsDir = dir
writeFailedPosts("itmID", exportEvent)
if filename, err := filepath.Glob(filepath.Join(dir, "module|*.gob")); err != nil {
t.Error(err)
} else if len(filename) == 0 {
t.Error("Expecting one file")
} else if len(filename) > 1 {
t.Error("Expecting only one file")
}
}
func TestWriteToFile(t *testing.T) {
filePath := "/tmp/engine/libcdre_test/writeToFile.txt"
exportEvent := &ExportEvents{}
//call WriteToFile function
if err := exportEvent.WriteToFile(filePath); err != nil {
t.Error(err)
}
// check if the file exists / throw error if the file doesn't exist
if _, err := os.Stat(filePath); os.IsNotExist(err) {
t.Fatalf("File doesn't exists")
}
//check if the file was written correctly
rcv, err := NewExportEventsFromFile(filePath)
if err != nil {
t.Errorf("Error deconding the file content: %+v", err)
}
if !reflect.DeepEqual(rcv, exportEvent) {
t.Errorf("Expecting: %+v,\nReceived: %+v", utils.ToJSON(exportEvent), utils.ToJSON(rcv))
}
//populate the exportEvent struct
exportEvent = &ExportEvents{
Events: []interface{}{"something1", "something2"},
Path: "path",
Format: "test",
}
filePath = "/tmp/engine/libcdre_test/writeToFile2.txt"
if err := exportEvent.WriteToFile(filePath); err != nil {
t.Error(err)
}
// check if the file exists / throw error if the file doesn't exist
if _, err := os.Stat(filePath); os.IsNotExist(err) {
t.Fatalf("File doesn't exists")
}
//check if the file was written correctly
rcv, err = NewExportEventsFromFile(filePath)
if err != nil {
t.Errorf("Error deconding the file content: %+v", err)
}
if !reflect.DeepEqual(rcv, exportEvent) {
t.Errorf("Expected: %+v,\nReceived: %+v", utils.ToJSON(exportEvent), utils.ToJSON(rcv))
}
//wrong path *reading
exportEvent = &ExportEvents{}
filePath = "/tmp/engine/libcdre_test/wrongpath.txt"
if _, err = NewExportEventsFromFile(filePath); err == nil || err.Error() != "open /tmp/engine/libcdre_test/wrongpath.txt: no such file or directory" {
t.Errorf("Expecting: 'open /tmp/engine/libcdre_test/wrongpath.txt: no such file or directory',\nReceived: '%+v'", err)
}
//wrong path *writing
filePath = utils.EmptyString
if err := exportEvent.WriteToFile(filePath); err == nil || err.Error() != "open : no such file or directory" {
t.Error(err)
}
}

View File

@@ -1081,8 +1081,8 @@ func TestHealthIndexDispatchers(t *testing.T) {
// we will set this dispatcherProfile but without indexing
dspPrf := &DispatcherProfile{
Tenant: "cgrates.org",
ID: "Dsp1",
Tenant: "cgrates.org",
ID: "Dsp1",
Subsystems: []string{utils.MetaAny, utils.MetaSessionS},
FilterIDs: []string{
"*string:~*opts.*apikey:dps1234|dsp9876",
@@ -1094,7 +1094,7 @@ func TestHealthIndexDispatchers(t *testing.T) {
Weight: 20,
Hosts: DispatcherHostProfiles{
{
ID: "ALL",
ID: "ALL",
},
},
}
@@ -1105,11 +1105,11 @@ func TestHealthIndexDispatchers(t *testing.T) {
args := &IndexHealthArgsWith3Ch{}
exp := &FilterIHReply{
MissingIndexes: map[string][]string{
"cgrates.org:*any:*string:*opts.*apikey:dps1234": {"Dsp1"},
"cgrates.org:*any:*string:*opts.*apikey:dsp9876": {"Dsp1"},
"cgrates.org:*any:*string:*req.AnswerTime:2013-11-07T08:42:26Z": {"Dsp1"},
"cgrates.org:*sessions:*string:*opts.*apikey:dps1234": {"Dsp1"},
"cgrates.org:*sessions:*string:*opts.*apikey:dsp9876": {"Dsp1"},
"cgrates.org:*any:*string:*opts.*apikey:dps1234": {"Dsp1"},
"cgrates.org:*any:*string:*opts.*apikey:dsp9876": {"Dsp1"},
"cgrates.org:*any:*string:*req.AnswerTime:2013-11-07T08:42:26Z": {"Dsp1"},
"cgrates.org:*sessions:*string:*opts.*apikey:dps1234": {"Dsp1"},
"cgrates.org:*sessions:*string:*opts.*apikey:dsp9876": {"Dsp1"},
"cgrates.org:*sessions:*string:*req.AnswerTime:2013-11-07T08:42:26Z": {"Dsp1"},
},
BrokenIndexes: map[string][]string{},
@@ -1117,17 +1117,17 @@ func TestHealthIndexDispatchers(t *testing.T) {
MissingObjects: []string{},
}
/*
if rply, err := GetFltrIdxHealth(dm,
ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil),
ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil),
ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil),
utils.CacheDispatcherFilterIndexes); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(exp, rply) {
t.Errorf("Expected %+v, received %+v", utils.ToJSON(exp), utils.ToJSON(rply))
}
if rply, err := GetFltrIdxHealth(dm,
ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil),
ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil),
ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil),
utils.CacheDispatcherFilterIndexes); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(exp, rply) {
t.Errorf("Expected %+v, received %+v", utils.ToJSON(exp), utils.ToJSON(rply))
}
*/
*/
// we will set manually some indexes that points to an nil object or index is valid but the obj is missing
indexes := map[string]utils.StringSet{
@@ -1140,7 +1140,7 @@ func TestHealthIndexDispatchers(t *testing.T) {
"Dsp2": {},
},
"*string:*req.ExtraField:Usage": { // index is valid but the obj does not exist
"InexistingDispatcher": {},
"InexistingDispatcher": {},
"InexistingDispatcher2": {},
},
}
@@ -1152,15 +1152,15 @@ func TestHealthIndexDispatchers(t *testing.T) {
//get the newIdxHealth for dispatchersProfile
exp = &FilterIHReply{
MissingIndexes: map[string][]string{
"cgrates.org:*any:*string:*opts.*apikey:dps1234": {"Dsp1"},
"cgrates.org:*any:*string:*opts.*apikey:dsp9876": {"Dsp1"},
"cgrates.org:*any:*string:*req.AnswerTime:2013-11-07T08:42:26Z": {"Dsp1"},
"cgrates.org:*sessions:*string:*opts.*apikey:dps1234": {"Dsp1"},
"cgrates.org:*sessions:*string:*opts.*apikey:dsp9876": {"Dsp1"},
"cgrates.org:*any:*string:*opts.*apikey:dps1234": {"Dsp1"},
"cgrates.org:*any:*string:*opts.*apikey:dsp9876": {"Dsp1"},
"cgrates.org:*any:*string:*req.AnswerTime:2013-11-07T08:42:26Z": {"Dsp1"},
"cgrates.org:*sessions:*string:*opts.*apikey:dps1234": {"Dsp1"},
"cgrates.org:*sessions:*string:*opts.*apikey:dsp9876": {"Dsp1"},
"cgrates.org:*sessions:*string:*req.AnswerTime:2013-11-07T08:42:26Z": {"Dsp1"},
},
BrokenIndexes: map[string][]string{
"cgrates.org:*suffix:*opts.Destination:+100": {"Dsp1"},
BrokenIndexes: map[string][]string{
"cgrates.org:*suffix:*opts.Destination:+100": {"Dsp1"},
"cgrates.org:*string:*req.RequestType:*rated": {"Dsp1"},
},
MissingFilters: map[string][]string{},
@@ -1171,26 +1171,26 @@ func TestHealthIndexDispatchers(t *testing.T) {
},
}
/*
if rply, err := GetFltrIdxHealth(dm,
ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil),
ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil),
ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil),
utils.CacheDispatcherFilterIndexes); err != nil {
t.Error(err)
} else {
sort.Strings(rply.MissingObjects)
sort.Strings(exp.MissingObjects)
if !reflect.DeepEqual(exp, rply) {
t.Errorf("Expected %+v, received %+v", utils.ToJSON(exp), utils.ToJSON(rply))
if rply, err := GetFltrIdxHealth(dm,
ltcache.NewCache(args.FilterCacheLimit, args.FilterCacheTTL, args.FilterCacheStaticTTL, nil),
ltcache.NewCache(args.IndexCacheLimit, args.IndexCacheTTL, args.IndexCacheStaticTTL, nil),
ltcache.NewCache(args.ObjectCacheLimit, args.ObjectCacheTTL, args.ObjectCacheStaticTTL, nil),
utils.CacheDispatcherFilterIndexes); err != nil {
t.Error(err)
} else {
sort.Strings(rply.MissingObjects)
sort.Strings(exp.MissingObjects)
if !reflect.DeepEqual(exp, rply) {
t.Errorf("Expected %+v, received %+v", utils.ToJSON(exp), utils.ToJSON(rply))
}
}
}
*/
*/
//we will use an inexisting Filter(not inline) for the same DispatcherProfile
dspPrf = &DispatcherProfile{
Tenant: "cgrates.org",
ID: "Dsp1",
Tenant: "cgrates.org",
ID: "Dsp1",
Subsystems: []string{utils.MetaAny, utils.MetaSessionS},
FilterIDs: []string{
"*string:~*opts.*apikey:dps1234|dsp9876",
@@ -1203,7 +1203,7 @@ func TestHealthIndexDispatchers(t *testing.T) {
Weight: 20,
Hosts: DispatcherHostProfiles{
{
ID: "ALL",
ID: "ALL",
},
},
}
@@ -1214,15 +1214,15 @@ func TestHealthIndexDispatchers(t *testing.T) {
//get the newIdxHealth for dispatchersProfile
exp = &FilterIHReply{
MissingIndexes: map[string][]string{
"cgrates.org:*any:*string:*opts.*apikey:dps1234": {"Dsp1"},
"cgrates.org:*any:*string:*opts.*apikey:dsp9876": {"Dsp1"},
"cgrates.org:*any:*string:*req.AnswerTime:2013-11-07T08:42:26Z": {"Dsp1"},
"cgrates.org:*sessions:*string:*opts.*apikey:dps1234": {"Dsp1"},
"cgrates.org:*sessions:*string:*opts.*apikey:dsp9876": {"Dsp1"},
"cgrates.org:*any:*string:*opts.*apikey:dps1234": {"Dsp1"},
"cgrates.org:*any:*string:*opts.*apikey:dsp9876": {"Dsp1"},
"cgrates.org:*any:*string:*req.AnswerTime:2013-11-07T08:42:26Z": {"Dsp1"},
"cgrates.org:*sessions:*string:*opts.*apikey:dps1234": {"Dsp1"},
"cgrates.org:*sessions:*string:*opts.*apikey:dsp9876": {"Dsp1"},
"cgrates.org:*sessions:*string:*req.AnswerTime:2013-11-07T08:42:26Z": {"Dsp1"},
},
BrokenIndexes: map[string][]string{
"cgrates.org:*suffix:*opts.Destination:+100": {"Dsp1"},
BrokenIndexes: map[string][]string{
"cgrates.org:*suffix:*opts.Destination:+100": {"Dsp1"},
"cgrates.org:*string:*req.RequestType:*rated": {"Dsp1"},
},
MissingFilters: map[string][]string{
@@ -1278,8 +1278,8 @@ func TestIndexHealthMultipleProfiles(t *testing.T) {
"*string:~*req.Account:1234",
"FLTR_1_NOT_EXIST2",
},
RunID: "*default",
Weight: 10,
RunID: "*default",
Weight: 10,
}
chPrf3 := &ChargerProfile{
Tenant: "cgrates.org",
@@ -1309,14 +1309,14 @@ func TestIndexHealthMultipleProfiles(t *testing.T) {
exp := &FilterIHReply{
MissingIndexes: map[string][]string{
"cgrates.org:*string:*opts.*eventType:ChargerAccountUpdate": {"Raw", "Default"},
"cgrates.org:*string:*req.Account:1234": {"Raw", "Default", "Call_Attr1"},
"cgrates.org:*prefix:*req.Destination:+2234": {"Default"},
"cgrates.org:*suffix:*req.Usage:10":{"Default"},
"cgrates.org:*string:*req.Account:1234": {"Raw", "Default", "Call_Attr1"},
"cgrates.org:*prefix:*req.Destination:+2234": {"Default"},
"cgrates.org:*suffix:*req.Usage:10": {"Default"},
},
BrokenIndexes: map[string][]string{},
BrokenIndexes: map[string][]string{},
MissingFilters: map[string][]string{
"cgrates.org:FLTR_1_NOT_EXIST2": {"Default", "Call_Attr1"},
"cgrates.org:FLTR_1_NOT_EXIST": {"Call_Attr1"},
"cgrates.org:FLTR_1_NOT_EXIST": {"Call_Attr1"},
},
MissingObjects: []string{},
}
@@ -1369,12 +1369,12 @@ func TestIndexHealthReverseChecking(t *testing.T) {
// reverse indexes for charger
exp := map[string]*ReverseFilterIHReply{
utils.CacheChargerFilterIndexes: {
MissingFilters: map[string][]string{
"cgrates.org:FLTR_1": {"Raw"},
utils.CacheChargerFilterIndexes: {
MissingFilters: map[string][]string{
"cgrates.org:FLTR_1": {"Raw"},
"cgrates.org:FLTR_2": {"Raw"},
},
BrokenReverseIndexes: map[string][]string{},
BrokenReverseIndexes: map[string][]string{},
MissingReverseIndexes: map[string][]string{},
},
}
@@ -1390,8 +1390,8 @@ func TestIndexHealthReverseChecking(t *testing.T) {
// set reverse indexes for Raw that is already set and 2 that does not exist
indexes := map[string]utils.StringSet{
utils.CacheChargerFilterIndexes: {
"Raw": {},
"Default": {},
"Raw": {},
"Default": {},
"Call_Attr1": {},
},
}
@@ -1400,7 +1400,6 @@ func TestIndexHealthReverseChecking(t *testing.T) {
t.Error(err)
}
// reverse indexes for charger with the changes
exp = map[string]*ReverseFilterIHReply{
utils.CacheChargerFilterIndexes: {
@@ -1408,9 +1407,9 @@ func TestIndexHealthReverseChecking(t *testing.T) {
"cgrates.org:FLTR_1": {"Raw"},
"cgrates.org:FLTR_2": {"Raw"},
},
BrokenReverseIndexes: map[string][]string{},
BrokenReverseIndexes: map[string][]string{},
MissingReverseIndexes: map[string][]string{},
MissingObjects: []string{"cgrates.org:Default","cgrates.org:Call_Attr1" },
MissingObjects: []string{"cgrates.org:Default", "cgrates.org:Call_Attr1"},
},
}
if rply, err := GetRevFltrIdxHealth(dm,
@@ -1429,8 +1428,8 @@ func TestIndexHealthReverseChecking(t *testing.T) {
// reverse for a filter present in PROFILE but does not exist for the same indexes
indexes = map[string]utils.StringSet{
utils.CacheChargerFilterIndexes: {
"Raw": {},
"Default": {},
"Raw": {},
"Default": {},
"Call_Attr1": {},
},
}
@@ -1450,7 +1449,7 @@ func TestIndexHealthReverseChecking(t *testing.T) {
"cgrates.org:Raw": {"FLTR_NOT_IN_PROFILE"},
},
MissingReverseIndexes: map[string][]string{},
MissingObjects: []string{"cgrates.org:Default","cgrates.org:Call_Attr1"},
MissingObjects: []string{"cgrates.org:Default", "cgrates.org:Call_Attr1"},
},
}
if rply, err := GetRevFltrIdxHealth(dm,
@@ -1540,7 +1539,7 @@ func TestIndexHealthMissingReverseIndexes(t *testing.T) {
exp := map[string]*ReverseFilterIHReply{
utils.CacheChargerFilterIndexes: {
MissingFilters: map[string][]string{},
MissingFilters: map[string][]string{},
BrokenReverseIndexes: map[string][]string{},
MissingReverseIndexes: map[string][]string{
"cgrates.org:Raw": {"FLTR_1", "FLTR_3"},
@@ -1558,18 +1557,18 @@ func TestIndexHealthMissingReverseIndexes(t *testing.T) {
}
}
/*
if err := dm.SetFilter(filter1, true); err != nil {
t.Error(err)
}
if err := dm.SetFilter(filter2, true); err != nil {
t.Error(err)
}
if err := dm.SetFilter(filter3, true); err != nil {
t.Error(err)
}
if err := dm.SetFilter(filter1, true); err != nil {
t.Error(err)
}
if err := dm.SetFilter(filter2, true); err != nil {
t.Error(err)
}
if err := dm.SetFilter(filter3, true); err != nil {
t.Error(err)
}
*/
//Cache.Clear(nil)
*/
//Cache.Clear(nil)
// we will set this multiple chargers but without indexing(same and different indexes)
chPrf1 = &ChargerProfile{
Tenant: "cgrates.org",
@@ -1590,7 +1589,7 @@ func TestIndexHealthMissingReverseIndexes(t *testing.T) {
}
exp = map[string]*ReverseFilterIHReply{
utils.CacheChargerFilterIndexes: {
MissingFilters: map[string][]string{},
MissingFilters: map[string][]string{},
BrokenReverseIndexes: map[string][]string{},
MissingReverseIndexes: map[string][]string{
"cgrates.org:Raw": {"FLTR_1", "FLTR_3"}, // check for FLTR_2
@@ -1610,5 +1609,3 @@ func TestIndexHealthMissingReverseIndexes(t *testing.T) {
}
}
}

View File

@@ -1,334 +0,0 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"context"
"encoding/json"
"flag"
"net/http"
"path/filepath"
"reflect"
"testing"
"time"
amqpv1 "github.com/Azure/go-amqp"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
var (
/*
README
run test for poster with following commands:
- sqs
go test -tags=integration -run=TestSQSPoster -sqs
- s3
go test -tags=integration -run=TestS3Poster -s3
- amqpv1
go test -tags=integration -run=TestAMQPv1Poster -amqpv1
also configure the credentials from test function
*/
itTestSQS = flag.Bool("sqs", false, "Run the test for SQSPoster")
itTestS3 = flag.Bool("s3", false, "Run the test for S3Poster")
itTestAMQPv1 = flag.Bool("amqpv1", false, "Run the test for AMQPv1Poster")
)
type TestContent struct {
Var1 string
Var2 string
}
func TestHttpJsonPoster(t *testing.T) {
SetFailedPostCacheTTL(time.Millisecond)
config.CgrConfig().GeneralCfg().FailedPostsDir = "/tmp"
content := &TestContent{Var1: "Val1", Var2: "Val2"}
jsn, _ := json.Marshal(content)
pstr, err := NewHTTPPoster(2*time.Second, "http://localhost:8080/invalid", utils.ContentJSON, 3)
if err != nil {
t.Error(err)
}
if err = pstr.PostValues(jsn, make(http.Header)); err == nil {
t.Error("Expected error")
}
AddFailedPost("http://localhost:8080/invalid", utils.ContentJSON, "test1", jsn, make(map[string]interface{}))
time.Sleep(5 * time.Millisecond)
fs, err := filepath.Glob("/tmp/test1*")
if err != nil {
t.Fatal(err)
} else if len(fs) == 0 {
t.Fatal("Expected at least one file")
}
ev, err := NewExportEventsFromFile(fs[0])
if err != nil {
t.Fatal(err)
} else if len(ev.Events) == 0 {
t.Fatal("Expected at least one event")
}
if !reflect.DeepEqual(jsn, ev.Events[0]) {
t.Errorf("Expecting: %q, received: %q", string(jsn), ev.Events[0])
}
}
func TestHttpBytesPoster(t *testing.T) {
SetFailedPostCacheTTL(time.Millisecond)
config.CgrConfig().GeneralCfg().FailedPostsDir = "/tmp"
content := []byte(`Test
Test2
`)
pstr, err := NewHTTPPoster(2*time.Second, "http://localhost:8080/invalid", utils.ContentText, 3)
if err != nil {
t.Error(err)
}
if err = pstr.PostValues(content, make(http.Header)); err == nil {
t.Error("Expected error")
}
AddFailedPost("http://localhost:8080/invalid", utils.ContentJSON, "test2", content, make(map[string]interface{}))
time.Sleep(5 * time.Millisecond)
fs, err := filepath.Glob("/tmp/test2*")
if err != nil {
t.Fatal(err)
} else if len(fs) == 0 {
t.Fatal("Expected at least one file")
}
ev, err := NewExportEventsFromFile(fs[0])
if err != nil {
t.Fatal(err)
} else if len(ev.Events) == 0 {
t.Fatal("Expected at least one event")
}
if !reflect.DeepEqual(content, ev.Events[0]) {
t.Errorf("Expecting: %q, received: %q", string(content), ev.Events[0])
}
}
func TestSQSPoster(t *testing.T) {
if !*itTestSQS {
return
}
cfg1 := config.NewDefaultCGRConfig()
utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg1.GeneralCfg().NodeID)
utils.Logger.SetLogLevel(7)
//#####################################
// update this variables
endpoint := "https://sqs.us-east-2.amazonaws.com"
region := "us-east-2"
awsKey := "replace-this-with-your-secret-key"
awsSecret := "replace-this-with-your-secret"
qname := "cgrates-cdrs"
opts := map[string]interface{}{
utils.AWSRegion: region,
utils.AWSKey: awsKey,
utils.AWSSecret: awsSecret,
utils.SQSQueueID: qname,
}
//#####################################
body := "testString"
pstr := NewSQSPoster(endpoint, 5, opts)
if err := pstr.Post([]byte(body), ""); err != nil {
t.Fatal(err)
}
var sess *session.Session
cfg := aws.Config{Endpoint: aws.String(endpoint)}
cfg.Region = aws.String(region)
cfg.Credentials = credentials.NewStaticCredentials(awsKey, awsSecret, "")
sess, err = session.NewSessionWithOptions(
session.Options{
Config: cfg,
},
)
// Create a SQS service client.
svc := sqs.New(sess)
resultURL, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String(qname),
})
if err != nil {
if aerr, ok := err.(awserr.Error); ok && aerr.Code() == sqs.ErrCodeQueueDoesNotExist {
t.Fatalf("Unable to find queue %q.", qname)
}
t.Fatalf("Unable to queue %q, %v.", qname, err)
}
result, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: resultURL.QueueUrl,
MaxNumberOfMessages: aws.Int64(1),
VisibilityTimeout: aws.Int64(30), // 20 seconds
WaitTimeSeconds: aws.Int64(0),
})
if err != nil {
t.Error(err)
return
}
if len(result.Messages) != 1 {
t.Fatalf("Expected 1 message received: %d", len(result.Messages))
}
if result.Messages[0].Body == nil {
t.Fatal("No Msg Body")
}
if *result.Messages[0].Body != body {
t.Errorf("Expected: %q, received: %q", body, *result.Messages[0].Body)
}
}
func TestS3Poster(t *testing.T) {
if !*itTestS3 {
return
}
cfg1 := config.NewDefaultCGRConfig()
utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg1.GeneralCfg().NodeID)
utils.Logger.SetLogLevel(7)
//#####################################
// update this variables
endpoint := "http://s3.us-east-2.amazonaws.com"
region := "us-east-2"
awsKey := "replace-this-with-your-secret-key"
awsSecret := "replace-this-with-your-secret"
qname := "cgrates-cdrs"
opts := map[string]interface{}{
utils.AWSRegion: region,
utils.AWSKey: awsKey,
utils.AWSSecret: awsSecret,
utils.S3Bucket: qname,
}
//#####################################
body := "testString"
key := "key1234"
pstr := NewS3Poster(endpoint, 5, opts)
if err := pstr.Post([]byte(body), key); err != nil {
t.Fatal(err)
}
key += ".json"
var sess *session.Session
cfg := aws.Config{Endpoint: aws.String(endpoint)}
cfg.Region = aws.String(region)
cfg.Credentials = credentials.NewStaticCredentials(awsKey, awsSecret, "")
sess, err = session.NewSessionWithOptions(
session.Options{
Config: cfg,
},
)
s31 := s3.New(sess)
s31.DeleteObject(&s3.DeleteObjectInput{})
file := aws.NewWriteAtBuffer([]byte{})
// Create a SQS service client.
svc := s3manager.NewDownloader(sess)
if _, err = svc.Download(file,
&s3.GetObjectInput{
Bucket: aws.String(qname),
Key: aws.String(key),
}); err != nil {
t.Fatalf("Unable to download item %v", err)
}
if rply := string(file.Bytes()); rply != body {
t.Errorf("Expected: %q, received: %q", body, rply)
}
}
func TestAMQPv1Poster(t *testing.T) {
if !*itTestAMQPv1 {
return
}
cfg1 := config.NewDefaultCGRConfig()
utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg1.GeneralCfg().NodeID)
utils.Logger.SetLogLevel(7)
//#####################################
// update this variables
endpoint := "amqps://RootManageSharedAccessKey:UlfIJ%2But11L0ZzA%2Fgpje8biFJeQihpWibJsUhaOi1DU%3D@cdrscgrates.servicebus.windows.net"
qname := "cgrates-cdrs"
opts := map[string]interface{}{
utils.AMQPQueueID: qname,
}
//#####################################
body := "testString"
pstr := NewAMQPv1Poster(endpoint, 5, opts)
if err := pstr.Post([]byte(body), ""); err != nil {
t.Fatal(err)
}
// Create client
client, err := amqpv1.Dial(endpoint)
if err != nil {
t.Fatal("Dialing AMQP server:", err)
}
defer client.Close()
// Open a session
session, err := client.NewSession()
if err != nil {
t.Fatal("Creating AMQP session:", err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// Create a receiver
receiver, err := session.NewReceiver(
amqpv1.LinkSourceAddress("/" + qname),
)
if err != nil {
t.Fatal("Creating receiver link:", err)
}
defer func() {
ctx, cancel := context.WithTimeout(ctx, time.Second)
receiver.Close(ctx)
cancel()
}()
// Receive next message
msg, err := receiver.Receive(ctx)
cancel()
if err != nil {
t.Fatal("Reading message from AMQP:", err)
}
// Accept message
msg.Accept(ctx)
if rply := string(msg.GetData()); rply != body {
t.Errorf("Expected: %q, received: %q", body, rply)
}
}