diff --git a/engine/cdrs.go b/engine/cdrs.go index 6f7303794..07cc08fca 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -506,8 +506,12 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error { case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST: _, err = self.httpPoster.Post(rplCfg.Address, utils.PosterTransportContentTypes[rplCfg.Transport], body, rplCfg.Attempts, fallbackPath) case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap: - _, err = utils.AMQPPostersCache.GetAMQPPoster(rplCfg.Address, "cgrates_cdrs", rplCfg.Attempts, self.cgrCfg.FailedPostsDir).Post( - nil, utils.PosterTransportContentTypes[rplCfg.Transport], body.([]byte), rplCfg.FallbackFileName()) + var amqpPoster *utils.AMQPPoster + amqpPoster, err = utils.AMQPPostersCache.GetAMQPPoster(rplCfg.Address, rplCfg.Attempts, self.cgrCfg.FailedPostsDir) + if err == nil { // error will be checked bellow + _, err = amqpPoster.Post( + nil, utils.PosterTransportContentTypes[rplCfg.Transport], body.([]byte), rplCfg.FallbackFileName()) + } default: utils.Logger.Warning(fmt.Sprintf(" Unsupported replication transport: %s", rplCfg.Transport)) return diff --git a/utils/poster.go b/utils/poster.go index 3c6b1224f..caf7b2840 100644 --- a/utils/poster.go +++ b/utils/poster.go @@ -216,20 +216,34 @@ type AMQPCachedPosters struct { // GetAMQPPoster creates a new poster only if not already cached // uses dialURL as cache key -func (pc *AMQPCachedPosters) GetAMQPPoster(dialURL, posterQueueID string, attempts int, fallbackFileDir string) (amqpPoster *AMQPPoster) { +func (pc *AMQPCachedPosters) GetAMQPPoster(dialURL string, attempts int, fallbackFileDir string) (amqpPoster *AMQPPoster, err error) { pc.Lock() defer pc.Unlock() var hasIt bool if _, hasIt = pc.cache[dialURL]; !hasIt { - pc.cache[dialURL] = NewAMQPPoster(dialURL, posterQueueID, attempts, fallbackFileDir) + if pstr, err := NewAMQPPoster(dialURL, attempts, fallbackFileDir); err != nil { + return nil, err + } else { + pc.cache[dialURL] = pstr + } } - return pc.cache[dialURL] + return pc.cache[dialURL], nil } -// dialURL = fmt.Sprintf("amqp:/%s:%s@%s/", user, passwd, addr) -func NewAMQPPoster(dialURL, posterQueueID string, attempts int, fallbackFileDir string) *AMQPPoster { +// "amqp://guest:guest@localhost:5672/?queueID=cgr_cdrs" +func NewAMQPPoster(dialURL string, attempts int, fallbackFileDir string) (*AMQPPoster, error) { + u, err := url.Parse(dialURL) + if err != nil { + return nil, err + } + qry := u.Query() + posterQueueID := "cgrates_cdrs" + if vals, has := qry["queue_id"]; has && len(vals) != 0 { + posterQueueID = vals[0] + } + dialURL = strings.Split(dialURL, "?")[0] // Take query params out of dialURL return &AMQPPoster{dialURL: dialURL, posterQueueID: posterQueueID, - attempts: attempts, fallbackFileDir: fallbackFileDir} + attempts: attempts, fallbackFileDir: fallbackFileDir}, nil } type AMQPPoster struct {