mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
506 lines
14 KiB
Go
506 lines
14 KiB
Go
/*
|
|
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
|
Copyright (C) ITsysCOM GmbH
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>
|
|
*/
|
|
|
|
package ers
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/cgrates/birpc/context"
|
|
"github.com/cgrates/cgrates/agents"
|
|
"github.com/cgrates/cgrates/config"
|
|
"github.com/cgrates/cgrates/engine"
|
|
"github.com/cgrates/cgrates/utils"
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
)
|
|
|
|
// AMQPER implements EventReader interface for AMQP messaging.
|
|
type AMQPER struct {
|
|
cgrCfg *config.CGRConfig
|
|
cfgIdx int // index of current config instance within ERsCfg.Readers
|
|
fltrS *engine.FilterS
|
|
|
|
eventChan chan *erEvent // channel to dispatch the events created
|
|
partialEvChan chan *erEvent // channel to dispatch the partial events created
|
|
errChan chan error
|
|
|
|
client *amqpClient // AMQP client for managing connections and subscriptions.
|
|
}
|
|
|
|
type amqpClient struct {
|
|
exchange string
|
|
exchangeType string
|
|
routingKey string
|
|
queueID string
|
|
consumer string
|
|
|
|
connection *amqp.Connection
|
|
channel *amqp.Channel
|
|
available bool // indicates if the AMQP channel has been established and is ready for use.
|
|
done chan struct{} // done signals the shutdown of the AMQP connection.
|
|
mu sync.RWMutex
|
|
notifyConnClose chan *amqp.Error
|
|
notifyChanClose chan *amqp.Error
|
|
|
|
// prefetchCount defines the maximum number of messages that the server will
|
|
// deliver to the consumer without waiting for acknowledgements. It's used to
|
|
// control the QoS settings for the AMQP channel.
|
|
prefetchCount int
|
|
}
|
|
|
|
// NewAMQPER returns a new AMQP EventReader with the provided configurations.
|
|
func NewAMQPER(cfg *config.CGRConfig, cfgIdx int, eventChan, partialEvChan chan *erEvent, errChan chan error,
|
|
fltrS *engine.FilterS, exitChan chan struct{}) (EventReader, error) {
|
|
rdr := &AMQPER{
|
|
cgrCfg: cfg,
|
|
cfgIdx: cfgIdx,
|
|
fltrS: fltrS,
|
|
eventChan: eventChan,
|
|
partialEvChan: partialEvChan,
|
|
errChan: errChan,
|
|
}
|
|
rdr.createClient(rdr.Config().Opts.AMQP, exitChan)
|
|
return rdr, nil
|
|
}
|
|
|
|
// Config returns the curent configuration
|
|
func (rdr *AMQPER) Config() *config.EventReaderCfg {
|
|
return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx]
|
|
}
|
|
|
|
// createClient initializes the AMQP client with the necessary configurations.
|
|
func (rdr *AMQPER) createClient(opts *config.AMQPROpts, exitChan chan struct{}) {
|
|
rdrCfg := rdr.Config()
|
|
rdr.client = &amqpClient{
|
|
prefetchCount: rdrCfg.ConcurrentReqs,
|
|
done: exitChan,
|
|
}
|
|
if opts != nil {
|
|
if opts.QueueID != nil {
|
|
rdr.client.queueID = *opts.QueueID
|
|
}
|
|
if opts.ConsumerTag != nil {
|
|
rdr.client.consumer = *opts.ConsumerTag
|
|
}
|
|
if opts.RoutingKey != nil {
|
|
rdr.client.routingKey = *opts.RoutingKey
|
|
}
|
|
if opts.Exchange != nil {
|
|
rdr.client.exchange = *opts.Exchange
|
|
}
|
|
if opts.ExchangeType != nil {
|
|
rdr.client.exchangeType = *opts.ExchangeType
|
|
}
|
|
}
|
|
go rdr.client.handleReconnect(rdrCfg.SourcePath, rdrCfg.ID,
|
|
rdrCfg.Reconnects, rdrCfg.MaxReconnectInterval)
|
|
}
|
|
|
|
func (rdr *AMQPER) processMessage(msg []byte) error {
|
|
var decodedMessage map[string]any
|
|
err := json.Unmarshal(msg, &decodedMessage)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaReaderID: utils.NewLeafNode(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].ID)}}
|
|
|
|
agReq := agents.NewAgentRequest(
|
|
utils.MapStorage(decodedMessage), reqVars,
|
|
nil, nil, nil, rdr.Config().Tenant,
|
|
rdr.cgrCfg.GeneralCfg().DefaultTenant,
|
|
utils.FirstNonEmpty(rdr.Config().Timezone,
|
|
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
|
|
rdr.fltrS, nil) // create an AgentRequest
|
|
pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
|
|
agReq)
|
|
if err != nil || !pass {
|
|
return err
|
|
}
|
|
if err = agReq.SetFields(rdr.Config().Fields); err != nil {
|
|
return err
|
|
}
|
|
cgrEv := utils.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep, agReq.Opts)
|
|
rdrEv := rdr.eventChan
|
|
if _, isPartial := cgrEv.APIOpts[utils.PartialOpt]; isPartial {
|
|
rdrEv = rdr.partialEvChan
|
|
}
|
|
rdrEv <- &erEvent{
|
|
cgrEvent: cgrEv,
|
|
rdrCfg: rdr.Config(),
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Serve starts the goroutine needed to monitor and process delieveries coming from the AMQP queue.
|
|
func (rdr *AMQPER) Serve() error {
|
|
rdrCfg := rdr.Config()
|
|
if rdrCfg.RunDelay == time.Duration(0) {
|
|
return nil
|
|
}
|
|
|
|
// Wait for the client to be ready
|
|
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
fib := utils.FibDuration(time.Millisecond, 0)
|
|
for !rdr.client.isAvailable() {
|
|
select {
|
|
case <-ctx.Done():
|
|
return fmt.Errorf("client not ready to start consuming, error: %w", ctx.Err())
|
|
default:
|
|
time.Sleep(fib())
|
|
}
|
|
}
|
|
|
|
// Start consuming messages from the queue.
|
|
deliveries, err := rdr.client.Consume()
|
|
if err != nil {
|
|
return fmt.Errorf("could not start consuming: %w", err)
|
|
}
|
|
|
|
// Log the setup complete message here
|
|
utils.Logger.Info(fmt.Sprintf(
|
|
"<%s> Reader '%s' - AMQP channel setup complete",
|
|
utils.ERs, rdr.Config().ID))
|
|
|
|
// This channel will receive a notification when a channel closed event
|
|
// happens. This must be different from Client.notifyChanClose because the
|
|
// library sends only one notification and Client.notifyChanClose already has
|
|
// a receiver in handleReconnect().
|
|
chClosedCh := make(chan *amqp.Error, 1) // Buffered to avoid deadlocks.
|
|
rdr.client.channel.NotifyClose(chClosedCh)
|
|
|
|
go rdr.monitorAndProcess(deliveries, chClosedCh)
|
|
|
|
return nil
|
|
}
|
|
|
|
// monitorAndProcess manages the message processing loop for AMQP events.
|
|
// It handles reconnection logic in case the AMQP channel closes unexpectedly.
|
|
func (rdr *AMQPER) monitorAndProcess(deliveries <-chan amqp.Delivery, chClosedCh chan *amqp.Error) {
|
|
if rdr.Config().StartDelay > 0 {
|
|
select {
|
|
case <-time.After(rdr.Config().StartDelay):
|
|
case <-rdr.client.done:
|
|
rdr.close()
|
|
return
|
|
}
|
|
}
|
|
// Initialize a Fibonacci backoff strategy to progressively wait longer
|
|
// between reconnection attempts, avoiding unnecessary load.
|
|
fib := utils.FibDuration(time.Second, rdr.Config().MaxReconnectInterval)
|
|
|
|
for {
|
|
select {
|
|
case <-rdr.client.done:
|
|
rdr.close()
|
|
return
|
|
case amqErr := <-chClosedCh:
|
|
|
|
// This case handles the event of closed channel (e.g. abnormal shutdown). The if
|
|
// condition is there to make sure it is logged only the first time.
|
|
if amqErr != nil {
|
|
utils.Logger.Warning(fmt.Sprintf(
|
|
"<%s> Reader '%s', AMQP Channel closed due to error: %v",
|
|
utils.ERs, rdr.Config().ID, amqErr))
|
|
}
|
|
|
|
// Attempt to re-establish the delivery channel to continue receiving messages.
|
|
var err error
|
|
deliveries, err = rdr.client.Consume()
|
|
if err != nil {
|
|
|
|
// If the AMQP channel is not ready, it will continue the loop. Next
|
|
// iteration will enter this case because chClosedCh is closed by the
|
|
// library.
|
|
utils.Logger.Warning(fmt.Sprintf(
|
|
"<%s> Reader %s, failed to init deliveries channel, will retry. Error: %v",
|
|
utils.ERs, rdr.Config().ID, err))
|
|
|
|
// Wait for either the backoff duration or a done signal, making sure
|
|
// to gracefully shutdown when the client is done.
|
|
select {
|
|
case <-rdr.client.done:
|
|
rdr.close()
|
|
return
|
|
case <-time.After(fib()):
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Successfully reconnected; reset the Fibonacci backoff counter.
|
|
fib = utils.FibDuration(time.Second, rdr.Config().MaxReconnectInterval)
|
|
|
|
// Re-set channel to receive notifications.
|
|
// The library closes this channel after abnormal shutdown.
|
|
chClosedCh = make(chan *amqp.Error, 1)
|
|
rdr.client.channel.NotifyClose(chClosedCh)
|
|
|
|
case delivery, ok := <-deliveries:
|
|
if !ok {
|
|
|
|
// If the deliveries channel is closed, reset it to nil to stop processing
|
|
// until it is reinitialized.
|
|
deliveries = nil
|
|
continue
|
|
}
|
|
go rdr.handleDelivery(delivery)
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleDelivery processes a single message delivery.
|
|
func (rdr *AMQPER) handleDelivery(dlv amqp.Delivery) {
|
|
err := rdr.processMessage(dlv.Body)
|
|
if err != nil {
|
|
utils.Logger.Warning(fmt.Sprintf(
|
|
"<%s> Reader %s, processing message %s error: %v",
|
|
utils.ERs, rdr.Config().ID, dlv.MessageId, err))
|
|
|
|
err = dlv.Reject(false)
|
|
if err != nil {
|
|
utils.Logger.Warning(fmt.Sprintf(
|
|
"<%s> Reader %s, error negatively acknowledging message %s: %v",
|
|
utils.ERs, rdr.Config().ID, dlv.MessageId, err))
|
|
}
|
|
return
|
|
}
|
|
|
|
err = dlv.Ack(false)
|
|
if err != nil {
|
|
utils.Logger.Warning(fmt.Sprintf(
|
|
"<%s> Reader %s, error acknowledging message %s: %v",
|
|
utils.ERs, rdr.Config().ID, dlv.MessageId, err))
|
|
}
|
|
}
|
|
|
|
func (rdr *AMQPER) close() error {
|
|
utils.Logger.Info(fmt.Sprintf(
|
|
"<%s> Reader %s, stop monitoring amqp queue <%s>", utils.ERs, rdr.Config().ID, rdr.Config().SourcePath))
|
|
return rdr.client.Close()
|
|
}
|
|
|
|
// Close will cleanly shut down the channel and connection.
|
|
func (client *amqpClient) Close() error {
|
|
if !client.isAvailable() {
|
|
return errors.New("already closed: not connected to the server")
|
|
}
|
|
|
|
var err error
|
|
if client.channel != nil {
|
|
if err = client.channel.Close(); err != nil && !errors.Is(err, amqp.ErrClosed) {
|
|
return fmt.Errorf("failed to close AMQP client channel: %w", err)
|
|
}
|
|
}
|
|
if client.connection != nil {
|
|
if err = client.connection.Close(); err != nil && !errors.Is(err, amqp.ErrClosed) {
|
|
return fmt.Errorf("failed to close AMQP client connection: %w", err)
|
|
}
|
|
}
|
|
client.mu.Lock()
|
|
client.available = false
|
|
client.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// handleReconnect will wait for a connection error on
|
|
// notifyConnClose, and then continuously attempt to reconnect.
|
|
func (client *amqpClient) handleReconnect(addr, readerID string, maxRetries int,
|
|
maxReconnectInterval time.Duration) {
|
|
|
|
fib := utils.FibDuration(time.Second, maxReconnectInterval)
|
|
retryCount := 0
|
|
|
|
for retryCount < maxRetries || maxRetries == -1 { // if maxRetries is -1, retry indefinitely
|
|
client.mu.Lock()
|
|
client.available = false
|
|
client.mu.Unlock()
|
|
|
|
// Establish an AMQP connection.
|
|
conn, err := amqp.Dial(addr)
|
|
if err != nil {
|
|
reconnectDelay := fib()
|
|
retryCount++
|
|
utils.Logger.Warning(fmt.Sprintf(
|
|
"<%s> Reader %s, failed to connect to AMQP server '%s', will retry. Error: %v",
|
|
utils.ERs, readerID, addr, err))
|
|
|
|
select {
|
|
case <-client.done:
|
|
return
|
|
case <-time.After(reconnectDelay):
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Take a new connection to the queue, and update
|
|
// the close listener to reflect this.
|
|
client.connection = conn
|
|
client.notifyConnClose = make(chan *amqp.Error, 1)
|
|
client.connection.NotifyClose(client.notifyConnClose)
|
|
|
|
// Reset the fibonacci sequence and retry count after a successful connection.
|
|
fib = utils.FibDuration(time.Second, maxReconnectInterval)
|
|
retryCount = 0
|
|
|
|
if done := client.handleReInit(conn, readerID, maxRetries, maxReconnectInterval); done {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleReconnect will wait for a channel error
|
|
// and then continuously attempt to re-initialize both channels
|
|
func (client *amqpClient) handleReInit(conn *amqp.Connection, readerID string, maxRetries int, maxReInitInterval time.Duration) bool {
|
|
fib := utils.FibDuration(time.Second, maxReInitInterval)
|
|
retryCount := 0
|
|
for retryCount < maxRetries || maxRetries == -1 { // if maxRetries is -1, retry indefinitely
|
|
client.mu.Lock()
|
|
client.available = false
|
|
client.mu.Unlock()
|
|
err := client.init(conn)
|
|
if err != nil {
|
|
reInitDelay := fib()
|
|
retryCount++
|
|
utils.Logger.Warning(fmt.Sprintf(
|
|
"<%s> Reader %s, channel init failed, will retry. Error: %v",
|
|
utils.ERs, readerID, err))
|
|
|
|
select {
|
|
case <-client.done:
|
|
return true
|
|
case <-client.notifyConnClose:
|
|
utils.Logger.Warning(fmt.Sprintf(
|
|
"<%s> Reader %s, connection closed, will attempt reconnect. Error: %v",
|
|
utils.ERs, readerID, err))
|
|
return false
|
|
case <-time.After(reInitDelay):
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Reset the fibonacci sequence and retry count after a successful init.
|
|
fib = utils.FibDuration(time.Second, maxReInitInterval)
|
|
retryCount = 0
|
|
|
|
select {
|
|
case <-client.done:
|
|
return true
|
|
case err := <-client.notifyConnClose:
|
|
utils.Logger.Warning(fmt.Sprintf(
|
|
"<%s> Reader %s, connection closed, will atempt to reconnect. Error: %v",
|
|
utils.ERs, readerID, err))
|
|
return false
|
|
case err := <-client.notifyChanClose:
|
|
utils.Logger.Warning(fmt.Sprintf(
|
|
"<%s> Reader %s, channel closed, will attempt re-init. Error: %v",
|
|
utils.ERs, readerID, err))
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// init will initialize channel & declare queue
|
|
func (client *amqpClient) init(conn *amqp.Connection) error {
|
|
ch, err := conn.Channel()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to open a server channel: %w", err)
|
|
}
|
|
|
|
err = ch.ExchangeDeclare(
|
|
client.exchange, // name
|
|
client.exchangeType, // type
|
|
true, // durable
|
|
false, // auto-deleted
|
|
false, // internal
|
|
false, // no-wait
|
|
nil, // arguments
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to declare exchange: %w", err)
|
|
}
|
|
|
|
_, err = ch.QueueDeclare(
|
|
client.queueID,
|
|
true,
|
|
false,
|
|
false,
|
|
false,
|
|
nil,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to declare queue: %w", err)
|
|
}
|
|
|
|
err = ch.QueueBind(
|
|
client.queueID, // queue name
|
|
client.routingKey, // routing key
|
|
client.exchange, // exchange
|
|
false,
|
|
nil,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to bind the queue to the exchange: %w", err)
|
|
}
|
|
|
|
// Take a new channel to the queue, and update
|
|
// the channel listener to reflect this.
|
|
client.channel = ch
|
|
client.notifyChanClose = make(chan *amqp.Error, 1)
|
|
client.channel.NotifyClose(client.notifyChanClose)
|
|
|
|
client.mu.Lock()
|
|
client.available = true
|
|
client.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// Consume will continuously put queue items on the channel.
|
|
func (client *amqpClient) Consume() (<-chan amqp.Delivery, error) {
|
|
if !client.isAvailable() {
|
|
return nil, utils.ErrDisconnected
|
|
}
|
|
if err := client.channel.Qos(
|
|
client.prefetchCount,
|
|
0,
|
|
false,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return client.channel.Consume(
|
|
client.queueID,
|
|
client.consumer,
|
|
false,
|
|
false,
|
|
false,
|
|
false,
|
|
nil,
|
|
)
|
|
}
|
|
|
|
func (client *amqpClient) isAvailable() bool {
|
|
client.mu.RLock()
|
|
defer client.mu.RUnlock()
|
|
return client.available
|
|
}
|