add diameter connection status tracking

This commit is contained in:
ionutboangiu
2025-11-12 18:11:15 +02:00
committed by Dan Christian Bogos
parent 910f49fa6c
commit fbe5b51046
4 changed files with 342 additions and 21 deletions

View File

@@ -0,0 +1,211 @@
//go:build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package agents
import (
"fmt"
"io"
"testing"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func TestDiamConnStats(t *testing.T) {
switch *utils.DBType {
case utils.MetaInternal:
case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres:
t.SkipNow()
default:
t.Fatal("unsupported dbtype value")
}
ng := engine.TestEngine{
ConfigJSON: `{
"general": {
"log_level": 7
},
"admins": {
"enabled": true
},
// "prometheus_agent": {
// "enabled": true,
// "stats_conns": ["*internal"],
// "stat_queue_ids": [
// "SQ_CONN_1",
// "SQ_CONN_2",
// "SQ_CONN_3"
// ]
// },
"stats": {
"enabled": true,
"store_interval": "-1",
"string_indexed_fields": ["*req.OriginHost"]
},
"thresholds": {
"enabled": true,
"store_interval": "-1"
},
"sessions": {
"enabled": true
},
"diameter_agent": {
"enabled": true,
"stats_conns": ["*localhost"],
// "thresholds_conns": ["*localhost"]
}
}`,
DBCfg: engine.InternalDBCfg,
Encoding: *utils.Encoding,
// LogBuffer: &bytes.Buffer{},
GracefulShutdown: true,
}
// defer fmt.Println(ng.LogBuffer)
client, cfg := ng.Run(t)
setSQProfile := func(id, originHost, originRealm string, ttl time.Duration) {
t.Helper()
var reply string
if err := client.Call(context.Background(), utils.AdminSv1SetStatQueueProfile,
engine.StatQueueProfileWithAPIOpts{
StatQueueProfile: &engine.StatQueueProfile{
Tenant: "cgrates.org",
ID: id,
FilterIDs: []string{
"*string:~*opts.*eventType:ConnectionStatusReport",
fmt.Sprintf("*string:~*req.OriginHost:%s", originHost),
fmt.Sprintf("*string:~*req.OriginRealm:%s", originRealm),
},
QueueLength: -1,
TTL: ttl,
Metrics: []*engine.MetricWithFilters{
{
MetricID: "*sum#~*req.ConnectionStatus",
},
},
Stored: true,
MinItems: 1,
},
}, &reply); err != nil {
t.Fatal(err)
}
}
_ = func(id string) {
t.Helper()
var reply string
if err := client.Call(context.Background(), utils.AdminSv1SetThresholdProfile,
engine.ThresholdProfileWithAPIOpts{
ThresholdProfile: &engine.ThresholdProfile{
Tenant: "cgrates.org",
ID: id,
FilterIDs: []string{"*string:~*opts.*eventType:ConnectionStatusReport"},
MaxHits: -1,
MinHits: 8,
MinSleep: time.Second,
},
}, &reply); err != nil {
t.Fatal(err)
}
}
initDiamConn := func(originHost, originRealm string) io.Closer {
t.Helper()
client, err := NewDiameterClient(cfg.DiameterAgentCfg().Listen,
originHost, originRealm, cfg.DiameterAgentCfg().VendorID,
cfg.DiameterAgentCfg().ProductName, utils.DiameterFirmwareRevision,
cfg.DiameterAgentCfg().DictionariesPath,
cfg.DiameterAgentCfg().ListenNet)
if err != nil {
t.Fatal(err)
}
// TODO: Remove after updating go-diameter dependency.
time.Sleep(10 * time.Millisecond)
return client
}
checkConnStatusMetric := func(sqID string, want float64) {
t.Helper()
var metrics map[string]float64
err := client.Call(context.Background(), utils.StatSv1GetQueueFloatMetrics,
&utils.TenantIDWithAPIOpts{
TenantID: &utils.TenantID{
Tenant: "cgrates.org",
ID: sqID,
},
}, &metrics)
if err != nil {
t.Error(err)
}
metricID := "*sum#~*req.ConnectionStatus"
got, ok := metrics[metricID]
if !ok {
t.Errorf("could not find metric %q", metricID)
}
if got != want {
t.Errorf("%q metric value = %.0f, want %.0f", metricID, got, want)
}
}
setSQProfile("SQ_CONN_1", "host1", "realm1", -1)
setSQProfile("SQ_CONN_2", "host2", "realm1", -1)
setSQProfile("SQ_CONN_3", "host3", "realm2", -1)
// no connections have been established yet, expect -1
checkConnStatusMetric("SQ_CONN_1", -1)
checkConnStatusMetric("SQ_CONN_2", -1)
checkConnStatusMetric("SQ_CONN_3", -1)
// scrapePromURL(t)
// connections have been established, expect 1
connHost1 := initDiamConn("host1", "realm1")
connHost2 := initDiamConn("host2", "realm1")
connHost3 := initDiamConn("host3", "realm2")
checkConnStatusMetric("SQ_CONN_1", 1)
checkConnStatusMetric("SQ_CONN_2", 1)
checkConnStatusMetric("SQ_CONN_3", 1)
// scrapePromURL(t)
// connections have been closed, expect 0
connHost1.Close()
connHost2.Close()
connHost3.Close()
// Ensure periodic health check happens.
time.Sleep(500 * time.Millisecond)
checkConnStatusMetric("SQ_CONN_1", 0)
checkConnStatusMetric("SQ_CONN_2", 0)
checkConnStatusMetric("SQ_CONN_3", 0)
// scrapePromURL(t)
// restart connection from host1
connHost1 = initDiamConn("host1", "realm1")
checkConnStatusMetric("SQ_CONN_1", 1)
checkConnStatusMetric("SQ_CONN_2", 0)
checkConnStatusMetric("SQ_CONN_3", 0)
t.Cleanup(func() { connHost1.Close() })
// scrapePromURL(t)
}

View File

@@ -449,28 +449,126 @@ func (da *DiameterAgent) handleRAA(c diam.Conn, m *diam.Message) {
ch <- m
}
// handleConns is used to handle all conns that are connected to the agent
// it register the connection so it can be used to send a DPR
const (
// Connection status values for ConnectionStatus event field:
// -1 : connection closed/down
// 0 : duplicate connection (no metric change)
// 1 : new connection established/up
diamConnStatusDown = -1
diamConnStatusDuplicate = 0
diamConnStatusUp = 1
// Health check interval for detecting closed connections.
// TODO: Make this configurable.
diamConnHealthCheckInterval = 500 * time.Millisecond
)
// sendConnStatusReport reports connection status changes to StatS and ThresholdS.
func (da *DiameterAgent) sendConnStatusReport(metadata *smpeer.Metadata, status int, localAddr, remoteAddr net.Addr) {
statsConns := da.cgrCfg.DiameterAgentCfg().StatSConns
thConns := da.cgrCfg.DiameterAgentCfg().ThresholdSConns
ev := &utils.CGREvent{
Tenant: da.cgrCfg.GeneralCfg().DefaultTenant,
ID: utils.GenUUID(),
Event: map[string]any{
utils.ConnLocalAddr: localAddr.String(),
utils.ConnRemoteAddr: remoteAddr.String(),
utils.OriginHost: metadata.OriginHost,
utils.OriginRealm: metadata.OriginRealm,
utils.ConnStatus: status,
utils.Source: utils.DiameterAgent,
},
APIOpts: map[string]any{
utils.MetaEventType: utils.EventConnectionStatusReport,
},
}
if len(statsConns) != 0 {
var reply []string
if err := da.connMgr.Call(context.TODO(), statsConns, utils.StatSv1ProcessEvent,
ev, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("failed to process %s event in %s: %v",
utils.EventConnectionStatusReport, utils.StatS, err))
}
}
if len(thConns) != 0 {
var reply []string
if err := da.connMgr.Call(context.TODO(), thConns, utils.ThresholdSv1ProcessEvent,
ev, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("failed to process %s event in %s: %v",
utils.EventConnectionStatusReport, utils.ThresholdS, err))
}
}
}
// handleConns handles all connections to the agent and registers them for DPR support.
func (da *DiameterAgent) handleConns(peers <-chan diam.Conn) {
for c := range peers {
meta, _ := smpeer.FromContext(c.Context())
meta, ok := smpeer.FromContext(c.Context())
if !ok {
utils.Logger.Warning(fmt.Sprintf(
"<%s> could not extract peer metadata from connection %s, skipping status tracking",
utils.DiameterAgent, c.RemoteAddr().String()))
continue
}
key := string(meta.OriginHost + utils.ConcatenatedKeySep + meta.OriginRealm)
da.peersLck.Lock()
da.peers[key] = c // store in peers table
diamConnStatus := diamConnStatusUp
if _, exists := da.peers[key]; exists {
// Connection already exists for this peer. Set status to 0 (duplicate)
// to prevent incrementing StatS metrics.
diamConnStatus = diamConnStatusDuplicate
utils.Logger.Warning(fmt.Sprintf(
"<%s> a connection from a peer with the same ID (%q) is already registered, overwriting...",
utils.DiameterAgent, key))
}
da.peers[key] = c
da.peersLck.Unlock()
go func(c diam.Conn, key string) {
// wait for disconnect notification
<-c.(diam.CloseNotifier).CloseNotify()
da.peersLck.Lock()
delete(da.peers, key) // remove from peers table
da.peersLck.Unlock()
}(c, key)
localAddr, remoteAddr := c.LocalAddr(), c.RemoteAddr()
da.sendConnStatusReport(meta, diamConnStatus, localAddr, remoteAddr)
go func() {
// Use hybrid approach to detect connection closure. CloseNotify() may not
// fire if the serve() goroutine is blocked in Read(), so we also perform
// periodic write checks as a fallback.
// TODO: Remove fallback once go-diameter fixes CloseNotify race condition.
defer func() {
da.peersLck.Lock()
delete(da.peers, key)
da.peersLck.Unlock()
da.sendConnStatusReport(meta, diamConnStatusDown, localAddr, remoteAddr)
}()
closeChan := c.(diam.CloseNotifier).CloseNotify()
ticker := time.NewTicker(diamConnHealthCheckInterval)
defer ticker.Stop()
for {
select {
case <-closeChan:
return
case <-ticker.C:
// Periodic health check: write 0 bytes to detect broken connections.
_, err := c.Connection().Write([]byte{})
if err != nil {
return
}
}
}
}()
}
}
// handleDPA is used to handle all DisconnectPeer Answers that are received
func (da *DiameterAgent) handleDPA(c diam.Conn, m *diam.Message) {
meta, _ := smpeer.FromContext(c.Context())
meta, ok := smpeer.FromContext(c.Context())
if !ok {
utils.Logger.Warning(fmt.Sprintf(
"<%s> could not extract peer metadata from DPA connection %s",
utils.DiameterAgent, c.RemoteAddr().String()))
return
}
key := string(meta.OriginHost + utils.ConcatenatedKeySep + meta.OriginRealm)
da.dpaLck.Lock()

View File

@@ -116,3 +116,9 @@ func (dc *DiameterClient) ReceivedMessage(rplyTimeout time.Duration) *diam.Messa
return nil
}
}
// Close disconnects the DiameterClient. Implements io.Closer.
func (dc *DiameterClient) Close() error {
dc.conn.Close()
return nil
}

View File

@@ -548,15 +548,21 @@ const (
EventReaderID = "EventReaderID"
// Event types
EventType = "EventType"
MetaEventType = "*eventType"
CDRKey = "CDR"
ThresholdHit = "ThresholdHit"
RankingUpdate = "RankingUpdate"
ResourceUpdate = "ResourceUpdate"
StatUpdate = "StatUpdate"
TrendUpdate = "TrendUpdate"
EventPerformanceReport = "PerformanceReport"
EventType = "EventType"
MetaEventType = "*eventType"
CDRKey = "CDR"
ThresholdHit = "ThresholdHit"
RankingUpdate = "RankingUpdate"
ResourceUpdate = "ResourceUpdate"
StatUpdate = "StatUpdate"
TrendUpdate = "TrendUpdate"
EventPerformanceReport = "PerformanceReport"
EventConnectionStatusReport = "ConnectionStatusReport"
// Connection status event fields.
ConnLocalAddr = "LocalAddr"
ConnRemoteAddr = "RemoteAddr"
ConnStatus = "ConnectionStatus" // -1=down, 0=duplicate, 1=up
// ReplyState error constants
ErrReplyStateAuthorize = "ERR_AUTHORIZE"