mirror of https://github.com/cgrates/cgrates.git
synced 2026-02-11 10:06:24 +05:00
add diameter connection status tracking
committed by Dan Christian Bogos
parent 8ce296cdaf
commit 2f40fbacbf
210 agents/diam_conn_stats_test.go Normal file
@@ -0,0 +1,210 @@
//go:build integration

/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/

package agents

import (
	"fmt"
	"io"
	"testing"
	"time"

	"github.com/cgrates/birpc/context"
	"github.com/cgrates/cgrates/engine"
	"github.com/cgrates/cgrates/utils"
)

func TestDiamConnStats(t *testing.T) {
	switch *utils.DBType {
	case utils.MetaInternal:
	case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres:
		t.SkipNow()
	default:
		t.Fatal("unsupported dbtype value")
	}

	ng := engine.TestEngine{
		ConfigJSON: `{
	"general": {
		"log_level": 7
	},
	"apiers": {
		"enabled": true
	},
	// "prometheus_agent": {
	// 	"enabled": true,
	// 	"stats_conns": ["*internal"],
	// 	"stat_queue_ids": [
	// 		"SQ_CONN_1",
	// 		"SQ_CONN_2",
	// 		"SQ_CONN_3"
	// 	]
	// },
	"stats": {
		"enabled": true,
		"store_interval": "-1",
		"string_indexed_fields": ["*req.OriginHost"]
	},
	"thresholds": {
		"enabled": true,
		"store_interval": "-1"
	},
	"sessions": {
		"enabled": true
	},
	"diameter_agent": {
		"enabled": true,
		"stats_conns": ["*localhost"],
		// "thresholds_conns": ["*localhost"]
	}
}`,
		DBCfg: engine.InternalDBCfg,
		// LogBuffer: &bytes.Buffer{},
		GracefulShutdown: true,
	}
	// defer fmt.Println(ng.LogBuffer)
	client, cfg := ng.Run(t)

	setSQProfile := func(id, originHost, originRealm string, ttl time.Duration) {
		t.Helper()
		var reply string
		if err := client.Call(context.Background(), utils.APIerSv1SetStatQueueProfile,
			engine.StatQueueProfileWithAPIOpts{
				StatQueueProfile: &engine.StatQueueProfile{
					Tenant: "cgrates.org",
					ID:     id,
					FilterIDs: []string{
						"*string:~*opts.*eventType:ConnectionStatusReport",
						fmt.Sprintf("*string:~*req.OriginHost:%s", originHost),
						fmt.Sprintf("*string:~*req.OriginRealm:%s", originRealm),
					},
					QueueLength: -1,
					TTL:         ttl,
					Metrics: []*engine.MetricWithFilters{
						{
							MetricID: "*sum#~*req.ConnectionStatus",
						},
					},
					Stored:   true,
					MinItems: 1,
				},
			}, &reply); err != nil {
			t.Fatal(err)
		}
	}

	_ = func(id string) {
		t.Helper()
		var reply string
		if err := client.Call(context.Background(), utils.APIerSv1SetThresholdProfile,
			engine.ThresholdProfileWithAPIOpts{
				ThresholdProfile: &engine.ThresholdProfile{
					Tenant:    "cgrates.org",
					ID:        id,
					FilterIDs: []string{"*string:~*opts.*eventType:ConnectionStatusReport"},
					MaxHits:   -1,
					MinHits:   8,
					MinSleep:  time.Second,
				},
			}, &reply); err != nil {
			t.Fatal(err)
		}
	}

	initDiamConn := func(originHost, originRealm string) io.Closer {
		t.Helper()
		client, err := NewDiameterClient(cfg.DiameterAgentCfg().Listen,
			originHost, originRealm, cfg.DiameterAgentCfg().VendorID,
			cfg.DiameterAgentCfg().ProductName, utils.DiameterFirmwareRevision,
			cfg.DiameterAgentCfg().DictionariesPath,
			cfg.DiameterAgentCfg().ListenNet)
		if err != nil {
			t.Fatal(err)
		}

		// TODO: Remove after updating go-diameter dependency.
		time.Sleep(10 * time.Millisecond)

		return client
	}

	checkConnStatusMetric := func(sqID string, want float64) {
		t.Helper()
		var metrics map[string]float64
		err := client.Call(context.Background(), utils.StatSv1GetQueueFloatMetrics,
			&utils.TenantIDWithAPIOpts{
				TenantID: &utils.TenantID{
					Tenant: "cgrates.org",
					ID:     sqID,
				},
			}, &metrics)
		if err != nil {
			t.Error(err)
		}
		metricID := "*sum#~*req.ConnectionStatus"
		got, ok := metrics[metricID]
		if !ok {
			t.Errorf("could not find metric %q", metricID)
		}
		if got != want {
			t.Errorf("%q metric value = %.0f, want %.0f", metricID, got, want)
		}
	}

	setSQProfile("SQ_CONN_1", "host1", "realm1", -1)
	setSQProfile("SQ_CONN_2", "host2", "realm1", -1)
	setSQProfile("SQ_CONN_3", "host3", "realm2", -1)

	// no connections have been established yet, expect -1
	checkConnStatusMetric("SQ_CONN_1", -1)
	checkConnStatusMetric("SQ_CONN_2", -1)
	checkConnStatusMetric("SQ_CONN_3", -1)
	// scrapePromURL(t)

	// connections have been established, expect 1
	connHost1 := initDiamConn("host1", "realm1")
	connHost2 := initDiamConn("host2", "realm1")
	connHost3 := initDiamConn("host3", "realm2")
	checkConnStatusMetric("SQ_CONN_1", 1)
	checkConnStatusMetric("SQ_CONN_2", 1)
	checkConnStatusMetric("SQ_CONN_3", 1)
	// scrapePromURL(t)

	// connections have been closed, expect 0
	connHost1.Close()
	connHost2.Close()
	connHost3.Close()

	// Ensure periodic health check happens.
	time.Sleep(500 * time.Millisecond)

	checkConnStatusMetric("SQ_CONN_1", 0)
	checkConnStatusMetric("SQ_CONN_2", 0)
	checkConnStatusMetric("SQ_CONN_3", 0)
	// scrapePromURL(t)

	// restart connection from host1
	connHost1 = initDiamConn("host1", "realm1")
	checkConnStatusMetric("SQ_CONN_1", 1)
	checkConnStatusMetric("SQ_CONN_2", 0)
	checkConnStatusMetric("SQ_CONN_3", 0)
	t.Cleanup(func() { connHost1.Close() })
	// scrapePromURL(t)
}
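Note on the expected values: StatS reports -1 for a queue whose metric has no events yet (its "not available" value); once reports flow, each connect contributes +1 and each detected close -1, so a full up/down cycle nets to 0. The 500ms sleep before the zero checks gives the agent's periodic health check (diamConnHealthCheckInterval, introduced below) time to fire. A standalone sketch of the sum arithmetic, with hypothetical helper names:

package main

import "fmt"

// connStatusSum mimics a *sum#~*req.ConnectionStatus metric: every
// ConnectionStatusReport event contributes its ConnectionStatus value
// (+1 up, 0 duplicate, -1 down) to a running sum.
func connStatusSum(events []int) float64 {
	var sum float64
	for _, status := range events {
		sum += float64(status)
	}
	return sum
}

func main() {
	fmt.Println(connStatusSum([]int{1}))        // 1: peer connected
	fmt.Println(connStatusSum([]int{1, -1}))    // 0: peer disconnected
	fmt.Println(connStatusSum([]int{1, -1, 1})) // 1: peer reconnected
}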
@@ -452,28 +452,127 @@ func (da *DiameterAgent) handleRAA(c diam.Conn, m *diam.Message) {
	ch <- m
}

const (
	// Connection status values for the ConnectionStatus event field:
	// -1 : connection closed/down
	//  0 : duplicate connection (no metric change)
	//  1 : new connection established/up
	diamConnStatusDown      = -1
	diamConnStatusDuplicate = 0
	diamConnStatusUp        = 1

	// Health check interval for detecting closed connections.
	// TODO: Make this configurable.
	diamConnHealthCheckInterval = 500 * time.Millisecond
)

// sendConnStatusReport reports connection status changes to StatS and ThresholdS.
func (da *DiameterAgent) sendConnStatusReport(metadata *smpeer.Metadata, status int, localAddr, remoteAddr net.Addr) {
	statsConns := da.cgrCfg.DiameterAgentCfg().StatSConns
	thConns := da.cgrCfg.DiameterAgentCfg().ThresholdSConns

	ev := &utils.CGREvent{
		Tenant: da.cgrCfg.GeneralCfg().DefaultTenant,
		ID:     utils.GenUUID(),
		Time:   utils.TimePointer(time.Now()),
		Event: map[string]any{
			utils.ConnLocalAddr:  localAddr.String(),
			utils.ConnRemoteAddr: remoteAddr.String(),
			utils.OriginHost:     metadata.OriginHost,
			utils.OriginRealm:    metadata.OriginRealm,
			utils.ConnStatus:     status,
			utils.Source:         utils.DiameterAgent,
		},
		APIOpts: map[string]any{
			utils.MetaEventType: utils.EventConnectionStatusReport,
		},
	}

	if len(statsConns) != 0 {
		var reply []string
		if err := da.connMgr.Call(context.TODO(), statsConns, utils.StatSv1ProcessEvent,
			ev, &reply); err != nil {
			utils.Logger.Err(fmt.Sprintf("failed to process %s event in %s: %v",
				utils.EventConnectionStatusReport, utils.StatS, err))
		}
	}
	if len(thConns) != 0 {
		var reply []string
		if err := da.connMgr.Call(context.TODO(), thConns, utils.ThresholdSv1ProcessEvent,
			ev, &reply); err != nil {
			utils.Logger.Err(fmt.Sprintf("failed to process %s event in %s: %v",
				utils.EventConnectionStatusReport, utils.ThresholdS, err))
		}
	}
}

// handleConns handles all connections to the agent and registers them for DPR support.
func (da *DiameterAgent) handleConns(peers <-chan diam.Conn) {
	for c := range peers {
		meta, ok := smpeer.FromContext(c.Context())
		if !ok {
			utils.Logger.Warning(fmt.Sprintf(
				"<%s> could not extract peer metadata from connection %s, skipping status tracking",
				utils.DiameterAgent, c.RemoteAddr().String()))
			continue
		}
		key := string(meta.OriginHost + utils.ConcatenatedKeySep + meta.OriginRealm)
		da.peersLck.Lock()
		diamConnStatus := diamConnStatusUp
		if _, exists := da.peers[key]; exists {
			// Connection already exists for this peer. Set status to 0 (duplicate)
			// to prevent incrementing StatS metrics.
			diamConnStatus = diamConnStatusDuplicate

			utils.Logger.Warning(fmt.Sprintf(
				"<%s> a connection from a peer with the same ID (%q) is already registered, overwriting...",
				utils.DiameterAgent, key))
		}
		da.peers[key] = c // store in peers table
		da.peersLck.Unlock()
		localAddr, remoteAddr := c.LocalAddr(), c.RemoteAddr()
		da.sendConnStatusReport(meta, diamConnStatus, localAddr, remoteAddr)
		go func() {
			// Use hybrid approach to detect connection closure. CloseNotify() may not
			// fire if the serve() goroutine is blocked in Read(), so we also perform
			// periodic write checks as a fallback.
			// TODO: Remove fallback once go-diameter fixes CloseNotify race condition.
			defer func() {
				da.peersLck.Lock()
				delete(da.peers, key) // remove from peers table
				da.peersLck.Unlock()
				da.sendConnStatusReport(meta, diamConnStatusDown, localAddr, remoteAddr)
			}()

			closeChan := c.(diam.CloseNotifier).CloseNotify()
			ticker := time.NewTicker(diamConnHealthCheckInterval)
			defer ticker.Stop()

			for {
				select {
				case <-closeChan:
					return
				case <-ticker.C:
					// Periodic health check: write 0 bytes to detect broken connections.
					_, err := c.Connection().Write([]byte{})
					if err != nil {
						return
					}
				}
			}
		}()
	}
}
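The watcher goroutine above funnels both exit paths, close notification and failed write probe, through a single deferred down-report, so the report fires exactly once per connection. A self-contained sketch of that pattern, with illustrative names that are not part of the agent:

package main

import (
	"fmt"
	"time"
)

// watch mirrors the shape of the connection watcher in handleConns
// (simplified, hypothetical): it exits on either an explicit close
// notification or a failed periodic probe, and the deferred down-report
// runs exactly once regardless of which path fires first.
func watch(closed <-chan struct{}, probe func() error, report func(status int)) {
	defer report(-1)
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-closed:
			return
		case <-ticker.C:
			if err := probe(); err != nil {
				return
			}
		}
	}
}

func main() {
	closed := make(chan struct{})
	go func() {
		time.Sleep(250 * time.Millisecond)
		close(closed) // simulate CloseNotify firing
	}()
	watch(closed,
		func() error { return nil }, // probe stays healthy
		func(status int) { fmt.Println("status report:", status) },
	)
	// Prints "status report: -1" exactly once.
}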

// handleDPA is used to handle all DisconnectPeer Answers that are received
func (da *DiameterAgent) handleDPA(c diam.Conn, m *diam.Message) {
	meta, ok := smpeer.FromContext(c.Context())
	if !ok {
		utils.Logger.Warning(fmt.Sprintf(
			"<%s> could not extract peer metadata from DPA connection %s",
			utils.DiameterAgent, c.RemoteAddr().String()))
		return
	}
	key := string(meta.OriginHost + utils.ConcatenatedKeySep + meta.OriginRealm)

	da.dpaLck.Lock()
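The CGREvent assembled in sendConnStatusReport above is what the test's StatQueue profiles filter on: the *eventType API option plus the per-peer OriginHost and OriginRealm request fields. A simplified, hypothetical approximation of that selection (the engine actually delegates matching to FilterS):

package main

import "fmt"

// sqMatches approximates how the test's FilterIDs select a StatQueue for a
// ConnectionStatusReport event: *string:~*opts.*eventType checks APIOpts,
// while ~*req.OriginHost and ~*req.OriginRealm check Event fields.
func sqMatches(event, opts map[string]any, host, realm string) bool {
	return opts["*eventType"] == "ConnectionStatusReport" &&
		event["OriginHost"] == host &&
		event["OriginRealm"] == realm
}

func main() {
	event := map[string]any{
		"OriginHost":       "host1",
		"OriginRealm":      "realm1",
		"ConnectionStatus": 1,
	}
	opts := map[string]any{"*eventType": "ConnectionStatusReport"}
	fmt.Println(sqMatches(event, opts, "host1", "realm1")) // true:  SQ_CONN_1 updates
	fmt.Println(sqMatches(event, opts, "host2", "realm1")) // false: SQ_CONN_2 unaffected
}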
@@ -116,3 +116,9 @@ func (dc *DiameterClient) ReceivedMessage(rplyTimeout time.Duration) *diam.Messa
		return nil
	}
}

// Close disconnects the DiameterClient. Implements io.Closer.
func (dc *DiameterClient) Close() error {
	dc.conn.Close()
	return nil
}