Support for queue_id in AMQPPoster

This commit is contained in:
DanB
2017-02-03 14:29:53 +01:00
parent d4493a3dc9
commit 7a6c68ded5
2 changed files with 26 additions and 8 deletions

View File

@@ -506,8 +506,12 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error {
case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST:
_, err = self.httpPoster.Post(rplCfg.Address, utils.PosterTransportContentTypes[rplCfg.Transport], body, rplCfg.Attempts, fallbackPath)
case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap:
_, err = utils.AMQPPostersCache.GetAMQPPoster(rplCfg.Address, "cgrates_cdrs", rplCfg.Attempts, self.cgrCfg.FailedPostsDir).Post(
nil, utils.PosterTransportContentTypes[rplCfg.Transport], body.([]byte), rplCfg.FallbackFileName())
var amqpPoster *utils.AMQPPoster
amqpPoster, err = utils.AMQPPostersCache.GetAMQPPoster(rplCfg.Address, rplCfg.Attempts, self.cgrCfg.FailedPostsDir)
if err == nil { // error will be checked bellow
_, err = amqpPoster.Post(
nil, utils.PosterTransportContentTypes[rplCfg.Transport], body.([]byte), rplCfg.FallbackFileName())
}
default:
utils.Logger.Warning(fmt.Sprintf("<CDRReplicator> Unsupported replication transport: %s", rplCfg.Transport))
return

View File

@@ -216,20 +216,34 @@ type AMQPCachedPosters struct {
// GetAMQPPoster creates a new poster only if not already cached
// uses dialURL as cache key
func (pc *AMQPCachedPosters) GetAMQPPoster(dialURL, posterQueueID string, attempts int, fallbackFileDir string) (amqpPoster *AMQPPoster) {
func (pc *AMQPCachedPosters) GetAMQPPoster(dialURL string, attempts int, fallbackFileDir string) (amqpPoster *AMQPPoster, err error) {
pc.Lock()
defer pc.Unlock()
var hasIt bool
if _, hasIt = pc.cache[dialURL]; !hasIt {
pc.cache[dialURL] = NewAMQPPoster(dialURL, posterQueueID, attempts, fallbackFileDir)
if pstr, err := NewAMQPPoster(dialURL, attempts, fallbackFileDir); err != nil {
return nil, err
} else {
pc.cache[dialURL] = pstr
}
}
return pc.cache[dialURL]
return pc.cache[dialURL], nil
}
// dialURL = fmt.Sprintf("amqp:/%s:%s@%s/", user, passwd, addr)
func NewAMQPPoster(dialURL, posterQueueID string, attempts int, fallbackFileDir string) *AMQPPoster {
// "amqp://guest:guest@localhost:5672/?queueID=cgr_cdrs"
func NewAMQPPoster(dialURL string, attempts int, fallbackFileDir string) (*AMQPPoster, error) {
u, err := url.Parse(dialURL)
if err != nil {
return nil, err
}
qry := u.Query()
posterQueueID := "cgrates_cdrs"
if vals, has := qry["queue_id"]; has && len(vals) != 0 {
posterQueueID = vals[0]
}
dialURL = strings.Split(dialURL, "?")[0] // Take query params out of dialURL
return &AMQPPoster{dialURL: dialURL, posterQueueID: posterQueueID,
attempts: attempts, fallbackFileDir: fallbackFileDir}
attempts: attempts, fallbackFileDir: fallbackFileDir}, nil
}
type AMQPPoster struct {