use remoteAddr as peer key, remove duplicate status

This commit is contained in:
ionutboangiu
2025-11-12 18:11:39 +02:00
committed by Dan Christian Bogos
parent 0c2b9a403a
commit 627e864bc1
5 changed files with 50 additions and 46 deletions

View File

@@ -87,6 +87,7 @@ func TestDiamConnStats(t *testing.T) {
// fmt.Println(ng.LogBuffer)
// })
client, cfg := ng.Run(t)
time.Sleep(100 * time.Millisecond) // wait for diameter server to be up
setSQProfile := func(id, originHost, originRealm string, ttl time.Duration) {
t.Helper()

View File

@@ -450,7 +450,7 @@ func (da *DiameterAgent) handleRAA(c diam.Conn, m *diam.Message) {
}
// sendConnStatusReport reports connection status changes to StatS and ThresholdS.
func (da *DiameterAgent) sendConnStatusReport(metadata *smpeer.Metadata, status string, localAddr, remoteAddr net.Addr) {
func (da *DiameterAgent) sendConnStatusReport(metadata *smpeer.Metadata, status, localAddr, remoteAddr string) {
daCfg := da.cgrCfg.DiameterAgentCfg()
if len(daCfg.StatSConns) == 0 && len(daCfg.ThresholdSConns) == 0 {
return // nothing to do
@@ -460,8 +460,8 @@ func (da *DiameterAgent) sendConnStatusReport(metadata *smpeer.Metadata, status
Tenant: da.cgrCfg.GeneralCfg().DefaultTenant,
ID: utils.GenUUID(),
Event: map[string]any{
utils.ConnLocalAddr: localAddr.String(),
utils.ConnRemoteAddr: remoteAddr.String(),
utils.ConnLocalAddr: localAddr,
utils.ConnRemoteAddr: remoteAddr,
utils.OriginHost: metadata.OriginHost,
utils.OriginRealm: metadata.OriginRealm,
utils.ConnStatus: status,
@@ -496,28 +496,18 @@ func (da *DiameterAgent) sendConnStatusReport(metadata *smpeer.Metadata, status
// handleConns handles all connections to the agent and registers them for DPR support.
func (da *DiameterAgent) handleConns(peers <-chan diam.Conn) {
for c := range peers {
localAddr, remoteAddr := c.LocalAddr().String(), c.RemoteAddr().String()
meta, ok := smpeer.FromContext(c.Context())
if !ok {
utils.Logger.Warning(fmt.Sprintf(
"<%s> could not extract peer metadata from connection %s, skipping status tracking",
utils.DiameterAgent, c.RemoteAddr().String()))
utils.DiameterAgent, remoteAddr))
continue
}
key := string(meta.OriginHost + utils.ConcatenatedKeySep + meta.OriginRealm)
da.peersLck.Lock()
connStatus := utils.ConnStatusUp
if _, exists := da.peers[key]; exists {
// Connection already exists for this peer. Set status to DUPLICATE
// to prevent incrementing StatS metrics.
connStatus = utils.ConnStatusDuplicate
utils.Logger.Warning(fmt.Sprintf(
"<%s> a connection from a peer with the same ID (%q) is already registered, overwriting...",
utils.DiameterAgent, key))
}
da.peers[key] = c
da.peers[remoteAddr] = c
da.peersLck.Unlock()
localAddr, remoteAddr := c.LocalAddr(), c.RemoteAddr()
connStatus := utils.ConnStatusUp
da.sendConnStatusReport(meta, connStatus, localAddr, remoteAddr)
go func() {
// Use hybrid approach to detect connection closure. CloseNotify() may not
@@ -526,7 +516,7 @@ func (da *DiameterAgent) handleConns(peers <-chan diam.Conn) {
// TODO: Remove fallback once go-diameter fixes CloseNotify race condition.
defer func() {
da.peersLck.Lock()
delete(da.peers, key)
delete(da.peers, remoteAddr)
da.peersLck.Unlock()
da.sendConnStatusReport(meta, utils.ConnStatusDown, localAddr, remoteAddr)
}()
@@ -560,17 +550,9 @@ func (da *DiameterAgent) handleConns(peers <-chan diam.Conn) {
// handleDPA is used to handle all DisconnectPeer Answers that are received
func (da *DiameterAgent) handleDPA(c diam.Conn, m *diam.Message) {
meta, ok := smpeer.FromContext(c.Context())
if !ok {
utils.Logger.Warning(fmt.Sprintf(
"<%s> could not extract peer metadata from DPA connection %s",
utils.DiameterAgent, c.RemoteAddr().String()))
return
}
key := string(meta.OriginHost + utils.ConcatenatedKeySep + meta.OriginRealm)
remoteAddr := c.RemoteAddr().String()
da.dpaLck.Lock()
ch, has := da.dpa[key]
ch, has := da.dpa[remoteAddr]
da.dpaLck.Unlock()
if !has {
return
@@ -579,11 +561,12 @@ func (da *DiameterAgent) handleDPA(c diam.Conn, m *diam.Message) {
c.Close()
}
// V1DisconnectPeer sends a DPR meseage to diameter client
// V1DisconnectPeer sends a DPR message to diameter client.
// Looks up connection by RemoteAddr if provided, otherwise by OriginHost+OriginRealm.
func (da *DiameterAgent) V1DisconnectPeer(ctx *context.Context, args *utils.DPRArgs, reply *string) (err error) {
if args == nil {
utils.Logger.Info(
fmt.Sprintf("<%s> cannot send DPR, missing arrguments",
fmt.Sprintf("<%s> cannot send DPR, missing arguments",
utils.DiameterAgent))
return utils.ErrMandatoryIeMissing
}
@@ -591,13 +574,39 @@ func (da *DiameterAgent) V1DisconnectPeer(ctx *context.Context, args *utils.DPRA
if args.DisconnectCause < 0 || args.DisconnectCause > 2 {
return errors.New("WRONG_DISCONNECT_CAUSE")
}
m := diam.NewRequest(diam.DisconnectPeer,
diam.CHARGING_CONTROL_APP_ID, dict.Default)
m.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity(args.OriginHost))
m.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity(args.OriginRealm))
m.NewAVP(avp.DisconnectCause, avp.Mbit, 0, datatype.Enumerated(args.DisconnectCause))
key := args.OriginHost + utils.ConcatenatedKeySep + args.OriginRealm
var conn diam.Conn
var key string
da.peersLck.Lock()
if args.RemoteAddr != "" {
// Direct lookup by RemoteAddr if provided
key = args.RemoteAddr
conn = da.peers[key]
} else {
// Fallback: scan for first connection matching OriginHost+OriginRealm
for rAddr, c := range da.peers {
meta, ok := smpeer.FromContext(c.Context())
if ok &&
string(meta.OriginHost) == args.OriginHost &&
string(meta.OriginRealm) == args.OriginRealm {
key = rAddr
conn = c
break
}
}
}
da.peersLck.Unlock()
if conn == nil {
return utils.ErrNotFound
}
dpaCh := make(chan *diam.Message, 1)
da.dpaLck.Lock()
@@ -608,15 +617,11 @@ func (da *DiameterAgent) V1DisconnectPeer(ctx *context.Context, args *utils.DPRA
delete(da.dpa, key)
da.dpaLck.Unlock()
}()
da.peersLck.Lock()
conn, has := da.peers[key]
da.peersLck.Unlock()
if !has {
return utils.ErrNotFound
}
if err = writeOnConn(conn, m); err != nil {
return utils.ErrServerError
}
select {
case dpa := <-dpaCh:
var avps []*diam.AVP

View File

@@ -605,6 +605,7 @@ type ArgExportToFolder struct {
type DPRArgs struct {
OriginHost string
OriginRealm string
RemoteAddr string
DisconnectCause int
}

View File

@@ -560,12 +560,11 @@ const (
EventConnectionStatusReport = "ConnectionStatusReport"
// Connection status event fields.
ConnLocalAddr = "LocalAddr"
ConnRemoteAddr = "RemoteAddr"
ConnStatus = "ConnectionStatus" // sum metric: UP=1, DOWN=-1, DUPLICATE=0
ConnStatusUp = "UP"
ConnStatusDown = "DOWN"
ConnStatusDuplicate = "DUPLICATE"
ConnLocalAddr = "LocalAddr"
ConnRemoteAddr = "RemoteAddr"
ConnStatus = "ConnectionStatus" // sum metric: UP=1, DOWN=-1
ConnStatusUp = "UP"
ConnStatusDown = "DOWN"
// ReplyState error constants
ErrReplyStateAuthorize = "ERR_AUTHORIZE"

View File

@@ -842,7 +842,7 @@ func (GigawordsConverter) Convert(in any) (any, error) {
}
// ConnStatusConverter converts connection status strings to numeric values.
// Returns 1 for UP, -1 for DOWN, and 0 for DUPLICATE.
// Returns 1 for UP and -1 for DOWN.
type ConnStatusConverter struct{}
// Convert implements DataConverter interface
@@ -853,8 +853,6 @@ func (c ConnStatusConverter) Convert(in any) (any, error) {
return 1, nil
case ConnStatusDown:
return -1, nil
case ConnStatusDuplicate:
return 0, nil
}
return 0, fmt.Errorf("unsupported connection status: %q", status)
}