From 2f40fbacbf764b42800b88dec5679d7ea23942ed Mon Sep 17 00:00:00 2001
From: ionutboangiu
Date: Mon, 6 Oct 2025 10:48:57 +0300
Subject: [PATCH] add diameter connection status tracking

---
 agents/diam_conn_stats_test.go | 210 +++++++++++++++++++++++++++++++++
 agents/diamagent.go            | 123 +++++++++++++++++--
 agents/diamclient.go           |   6 +
 utils/consts.go                |   6 +
 4 files changed, 333 insertions(+), 12 deletions(-)
 create mode 100644 agents/diam_conn_stats_test.go

diff --git a/agents/diam_conn_stats_test.go b/agents/diam_conn_stats_test.go
new file mode 100644
index 000000000..e6b3f2668
--- /dev/null
+++ b/agents/diam_conn_stats_test.go
@@ -0,0 +1,210 @@
+//go:build integration
+
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>
+*/
+
+package agents
+
+import (
+	"fmt"
+	"io"
+	"testing"
+	"time"
+
+	"github.com/cgrates/birpc/context"
+	"github.com/cgrates/cgrates/engine"
+	"github.com/cgrates/cgrates/utils"
+)
+
+// TestDiamConnStats checks that the DiameterAgent reports peer connection
+// status changes to StatS. It starts an engine with StatS, ThresholdS, SessionS
+// and the DiameterAgent enabled (the agent's stats_conns set to *localhost),
+// creates one StatQueue per OriginHost/OriginRealm pair that sums the
+// ConnectionStatus event field, then opens and closes Diameter client
+// connections and asserts the resulting sums.
+// Only the *internal DB type is exercised; MySQL, Mongo and Postgres skip.
+func TestDiamConnStats(t *testing.T) {
+	switch *utils.DBType {
+	case utils.MetaInternal:
+	case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres:
+		t.SkipNow()
+	default:
+		t.Fatal("unsupported dbtype value")
+	}
+
+	ng := engine.TestEngine{
+		ConfigJSON: `{
+"general": {
+	"log_level": 7
+},
+"apiers": {
+	"enabled": true
+},
+// "prometheus_agent": {
+// 	"enabled": true,
+// 	"stats_conns": ["*internal"],
+// 	"stat_queue_ids": [
+// 		"SQ_CONN_1",
+// 		"SQ_CONN_2",
+// 		"SQ_CONN_3"
+// 	]
+// },
+"stats": {
+	"enabled": true,
+	"store_interval": "-1",
+	"string_indexed_fields": ["*req.OriginHost"]
+},
+"thresholds": {
+	"enabled": true,
+	"store_interval": "-1"
+},
+"sessions": {
+	"enabled": true
+},
+"diameter_agent": {
+	"enabled": true,
+	"stats_conns": ["*localhost"],
+	// "thresholds_conns": ["*localhost"]
+}
+}`,
+		DBCfg: engine.InternalDBCfg,
+		// LogBuffer: &bytes.Buffer{},
+		GracefulShutdown: true,
+	}
+	// defer fmt.Println(ng.LogBuffer)
+	client, cfg := ng.Run(t)
+
+	// setSQProfile creates a StatQueue profile that matches ConnectionStatusReport
+	// events coming from the given OriginHost and OriginRealm and keeps a *sum of
+	// their ConnectionStatus field. The test aborts if the API call fails.
+	setSQProfile := func(id, originHost, originRealm string, ttl time.Duration) {
+		t.Helper()
+		var reply string
+		if err := client.Call(context.Background(), utils.APIerSv1SetStatQueueProfile,
+			engine.StatQueueProfileWithAPIOpts{
+				StatQueueProfile: &engine.StatQueueProfile{
+					Tenant: "cgrates.org",
+					ID:     id,
+					FilterIDs: []string{
+						"*string:~*opts.*eventType:ConnectionStatusReport",
+						fmt.Sprintf("*string:~*req.OriginHost:%s", originHost),
+						fmt.Sprintf("*string:~*req.OriginRealm:%s", originRealm),
+					},
+					QueueLength: -1,
+					TTL:         ttl,
+					Metrics: []*engine.MetricWithFilters{
+						{
+							MetricID: "*sum#~*req.ConnectionStatus",
+						},
+					},
+					Stored:   true,
+					MinItems: 1,
+				},
+			}, &reply); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// Threshold profile helper, currently unused (assigned to the blank identifier).
+	// NOTE(review): presumably kept for when thresholds_conns is enabled in the
+	// config above — confirm, or remove it.
+	_ = func(id string) {
+		t.Helper()
+		var reply string
+		if err := client.Call(context.Background(), utils.APIerSv1SetThresholdProfile,
+			engine.ThresholdProfileWithAPIOpts{
+				ThresholdProfile: &engine.ThresholdProfile{
+					Tenant:    "cgrates.org",
+					ID:        id,
+					FilterIDs: []string{"*string:~*opts.*eventType:ConnectionStatusReport"},
+					MaxHits:   -1,
+					MinHits:   8,
+					MinSleep:  time.Second,
+				},
+			}, &reply); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// initDiamConn opens a Diameter client connection to the agent's listen
+	// address, identifying itself with the given origin host and realm, and
+	// returns it as an io.Closer. The test aborts if the client cannot be created.
+	initDiamConn := func(originHost, originRealm string) io.Closer {
+		t.Helper()
+		client, err := NewDiameterClient(cfg.DiameterAgentCfg().Listen,
+			originHost, originRealm, cfg.DiameterAgentCfg().VendorID,
+			cfg.DiameterAgentCfg().ProductName, utils.DiameterFirmwareRevision,
+			cfg.DiameterAgentCfg().DictionariesPath,
+			cfg.DiameterAgentCfg().ListenNet)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		// TODO: Remove after updating go-diameter dependency.
+		// NOTE(review): presumably gives the agent time to register the new peer
+		// before the caller queries StatS — confirm.
+		time.Sleep(10 * time.Millisecond)
+
+		return client
+	}
+
+	// checkConnStatusMetric fetches the float metrics of StatQueue sqID and reports
+	// a test error (without aborting) if the call fails, if the
+	// *sum#~*req.ConnectionStatus metric is missing, or if it differs from want.
+	checkConnStatusMetric := func(sqID string, want float64) {
+		t.Helper()
+		var metrics map[string]float64
+		err := client.Call(context.Background(), utils.StatSv1GetQueueFloatMetrics,
+			&utils.TenantIDWithAPIOpts{
+				TenantID: &utils.TenantID{
+					Tenant: "cgrates.org",
+					ID:     sqID,
+				},
+			}, &metrics)
+		if err != nil {
+			t.Error(err)
+		}
+		metricID := "*sum#~*req.ConnectionStatus"
+		got, ok := metrics[metricID]
+		if !ok {
+			t.Errorf("could not find metric %q", metricID)
+		}
+		if got != want {
+			t.Errorf("%q metric value = %.0f, want %.0f", metricID, got, want)
+		}
+	}
+
+	setSQProfile("SQ_CONN_1", "host1", "realm1", -1)
+	setSQProfile("SQ_CONN_2", "host2", "realm1", -1)
+	setSQProfile("SQ_CONN_3", "host3", "realm2", -1)
+
+	// No ConnectionStatusReport has been processed yet; every queue is expected to report -1.
+	// NOTE(review): -1 is presumably StatS' "metric not available" value (MinItems is 1) — confirm.
+	checkConnStatusMetric("SQ_CONN_1", -1)
+	checkConnStatusMetric("SQ_CONN_2", -1)
+	checkConnStatusMetric("SQ_CONN_3", -1)
+	// scrapePromURL(t)
+
+	// Each peer connects once and is reported with status 1 (up), so every sum is expected to be 1.
+	connHost1 := initDiamConn("host1", "realm1")
+	connHost2 := initDiamConn("host2", "realm1")
+	connHost3 := initDiamConn("host3", "realm2")
+	checkConnStatusMetric("SQ_CONN_1", 1)
+	checkConnStatusMetric("SQ_CONN_2", 1)
+	checkConnStatusMetric("SQ_CONN_3", 1)
+	// scrapePromURL(t)
+
+	// Closing each connection is reported with status -1 (down), bringing every sum back to 0.
+	connHost1.Close()
+	connHost2.Close()
+	connHost3.Close()
+
+	// Wait for the agent's periodic health check (500ms interval) to notice the closed connections.
+	time.Sleep(500 * time.Millisecond)
+
+	checkConnStatusMetric("SQ_CONN_1", 0)
+	checkConnStatusMetric("SQ_CONN_2", 0)
+	checkConnStatusMetric("SQ_CONN_3", 0)
+	// scrapePromURL(t)
+
+	// Reconnect host1 only: its sum goes back to 1 while host2 and host3 stay at 0.
+	connHost1 = initDiamConn("host1", "realm1")
+	checkConnStatusMetric("SQ_CONN_1", 1)
+	checkConnStatusMetric("SQ_CONN_2", 0)
+	checkConnStatusMetric("SQ_CONN_3", 0)
+	t.Cleanup(func() { connHost1.Close() })
+	// scrapePromURL(t)
+}
diff --git a/agents/diamagent.go b/agents/diamagent.go
index ee6a7189f..c6586aeb1 100644
--- a/agents/diamagent.go
+++ b/agents/diamagent.go
@@ -452,28 +452,127 @@ func (da *DiameterAgent) handleRAA(c diam.Conn, m *diam.Message) {
 	ch <- m
 }
 
-// handleConns is used to handle all conns that are connected to the agent
-// it register the connection so it can be used to send a DPR
+const (
+	// Connection status values for ConnectionStatus event field:
+	//   -1 : connection closed/down
+	//    0 : duplicate connection (no metric change)
+	//    1 : new connection established/up
+	// A sum over these values rises by one per "up" report and drops by one
+	// per "down" report; "duplicate" reports leave it unchanged.
+	diamConnStatusDown      = -1
+	diamConnStatusDuplicate = 0
+	diamConnStatusUp        = 1
+
+	// Health check interval for detecting closed connections.
+	// TODO: Make this configurable.
+	diamConnHealthCheckInterval = 500 * time.Millisecond
+)
+
+// sendConnStatusReport reports connection status changes to StatS and ThresholdS.
+//
+// It builds a CGREvent in the default tenant carrying the peer's OriginHost and
+// OriginRealm, the local and remote addresses, the status value and the
+// DiameterAgent source, tagged with the ConnectionStatusReport event type, and
+// sends it to the configured StatSConns and then ThresholdSConns. Call failures
+// are logged and otherwise ignored. Nothing is built or sent when neither
+// connection list is configured.
+func (da *DiameterAgent) sendConnStatusReport(metadata *smpeer.Metadata, status int, localAddr, remoteAddr net.Addr) {
+	agentCfg := da.cgrCfg.DiameterAgentCfg()
+	if len(agentCfg.StatSConns) == 0 && len(agentCfg.ThresholdSConns) == 0 {
+		return // nobody to notify; skip building the event
+	}
+
+	ev := &utils.CGREvent{
+		Tenant: da.cgrCfg.GeneralCfg().DefaultTenant,
+		ID:     utils.GenUUID(),
+		Time:   utils.TimePointer(time.Now()),
+		Event: map[string]any{
+			utils.ConnLocalAddr:  localAddr.String(),
+			utils.ConnRemoteAddr: remoteAddr.String(),
+			utils.OriginHost:     metadata.OriginHost,
+			utils.OriginRealm:    metadata.OriginRealm,
+			utils.ConnStatus:     status,
+			utils.Source:         utils.DiameterAgent,
+		},
+		APIOpts: map[string]any{
+			utils.MetaEventType: utils.EventConnectionStatusReport,
+		},
+	}
+
+	// Destinations in dispatch order: StatS first, then ThresholdS.
+	targets := []struct {
+		conns  []string
+		method string
+		subsys string
+	}{
+		{agentCfg.StatSConns, utils.StatSv1ProcessEvent, utils.StatS},
+		{agentCfg.ThresholdSConns, utils.ThresholdSv1ProcessEvent, utils.ThresholdS},
+	}
+	for _, tgt := range targets {
+		if len(tgt.conns) == 0 {
+			continue
+		}
+		var reply []string
+		if err := da.connMgr.Call(context.TODO(), tgt.conns, tgt.method,
+			ev, &reply); err != nil {
+			// Prefixed with the agent name like the other log lines of this agent.
+			utils.Logger.Err(fmt.Sprintf("<%s> failed to process %s event in %s: %v",
+				utils.DiameterAgent, utils.EventConnectionStatusReport, tgt.subsys, err))
+		}
+	}
+}
+
+// handleConns handles all connections to the agent and registers them for DPR support.
+//
+// Every accepted connection is stored in da.peers under the key
+// OriginHost + ConcatenatedKeySep + OriginRealm and a ConnectionStatusReport is
+// sent: "up" for a new key, "duplicate" when the key is already registered (the
+// stored connection is overwritten). A goroutine per connection then waits for
+// the close notification, or for a failed periodic zero-byte write, and on exit
+// removes the entry and reports "down" — but only if that connection is still
+// the one registered under the key, so a superseded connection can neither
+// unregister its replacement nor decrement the metrics a second time.
+// Connections without peer metadata are logged and skipped.
 func (da *DiameterAgent) handleConns(peers <-chan diam.Conn) {
 	for c := range peers {
-		meta, _ := smpeer.FromContext(c.Context())
+		meta, ok := smpeer.FromContext(c.Context())
+		if !ok {
+			utils.Logger.Warning(fmt.Sprintf(
+				"<%s> could not extract peer metadata from connection %s, skipping status tracking",
+				utils.DiameterAgent, c.RemoteAddr().String()))
+			continue
+		}
 		key := string(meta.OriginHost + utils.ConcatenatedKeySep + meta.OriginRealm)
 		da.peersLck.Lock()
-		da.peers[key] = c // store in peers table
+		diamConnStatus := diamConnStatusUp
+		if _, exists := da.peers[key]; exists {
+			// Connection already exists for this peer. Set status to 0 (duplicate)
+			// to prevent incrementing StatS metrics.
+			diamConnStatus = diamConnStatusDuplicate
+
+			utils.Logger.Warning(fmt.Sprintf(
+				"<%s> a connection from a peer with the same ID (%q) is already registered, overwriting...",
+				utils.DiameterAgent, key))
+		}
+		da.peers[key] = c
 		da.peersLck.Unlock()
-		go func(c diam.Conn, key string) {
-			// wait for disconnect notification
-			<-c.(diam.CloseNotifier).CloseNotify()
-			da.peersLck.Lock()
-			delete(da.peers, key) // remove from peers table
-			da.peersLck.Unlock()
-		}(c, key)
+		localAddr, remoteAddr := c.LocalAddr(), c.RemoteAddr()
+		// Reported before the monitor goroutine starts, so the "up"/"duplicate"
+		// event is always sent before this connection's "down" event.
+		da.sendConnStatusReport(meta, diamConnStatus, localAddr, remoteAddr)
+		// c is passed as an argument so the goroutine stays bound to this
+		// iteration's connection regardless of loop-variable semantics.
+		go func(c diam.Conn) {
+			// Use hybrid approach to detect connection closure. CloseNotify() may not
+			// fire if the serve() goroutine is blocked in Read(), so we also perform
+			// periodic write checks as a fallback.
+			// TODO: Remove fallback once go-diameter fixes CloseNotify race condition.
+			defer func() {
+				// Clean up only if this connection is still the registered one. If a
+				// newer connection from the same peer replaced it, deleting the entry
+				// would unregister the live connection and the extra "down" report
+				// would push the summed status below the real value.
+				// NOTE(review): compares interface values; assumes the dynamic type
+				// behind diam.Conn is comparable (e.g. a pointer) — confirm.
+				da.peersLck.Lock()
+				registered := da.peers[key] == c
+				if registered {
+					delete(da.peers, key)
+				}
+				da.peersLck.Unlock()
+				if registered {
+					da.sendConnStatusReport(meta, diamConnStatusDown, localAddr, remoteAddr)
+				}
+			}()
+
+			closeChan := c.(diam.CloseNotifier).CloseNotify()
+			ticker := time.NewTicker(diamConnHealthCheckInterval)
+			defer ticker.Stop()
+
+			for {
+				select {
+				case <-closeChan:
+					return
+				case <-ticker.C:
+					// Periodic health check: write 0 bytes to detect broken connections.
+					if _, err := c.Connection().Write([]byte{}); err != nil {
+						return
+					}
+				}
+			}
+		}(c)
 	}
 }
 
 // handleDPA is used to handle all DisconnectPeer Answers that are received
 func (da *DiameterAgent) handleDPA(c diam.Conn, m *diam.Message) {
-	meta, _ := smpeer.FromContext(c.Context())
+	meta, ok := smpeer.FromContext(c.Context())
+	if !ok {
+		utils.Logger.Warning(fmt.Sprintf(
+			"<%s> could not extract peer metadata from DPA connection %s",
+			utils.DiameterAgent, c.RemoteAddr().String()))
+		return
+	}
 	key := string(meta.OriginHost + utils.ConcatenatedKeySep + meta.OriginRealm)
 	da.dpaLck.Lock()
 
diff --git a/agents/diamclient.go b/agents/diamclient.go
index 0733b44d9..8742624ec 100644
--- a/agents/diamclient.go
+++ b/agents/diamclient.go
@@ -116,3 +116,9 @@ func (dc *DiameterClient) ReceivedMessage(rplyTimeout time.Duration) *diam.Messa
 		return nil
 	}
 }
+
+// Close disconnects the DiameterClient. Implements io.Closer.
+func (dc *DiameterClient) Close() error {
+	dc.conn.Close()
+	return nil
+}
diff --git a/utils/consts.go b/utils/consts.go
index 821478acd..5fa9a017f 100644
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -627,6 +627,12 @@ const (
 	StatUpdate             = "StatUpdate"
 	TrendUpdate            = "TrendUpdate"
 	EventPerformanceReport = "PerformanceReport"
+	EventConnectionStatusReport = "ConnectionStatusReport"
+
+	// Connection status event fields.
+	ConnLocalAddr  = "LocalAddr"
+	ConnRemoteAddr = "RemoteAddr"
+	ConnStatus     = "ConnectionStatus" // -1=down, 0=duplicate, 1=up
 
 	// ReplyState error constants
 	ErrReplyStateAuthorize = "ERR_AUTHORIZE"