From c9115da4b7c2d0c4cea343822d86ca316971ec3b Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 22 Oct 2025 18:59:17 +0300 Subject: [PATCH] use remoteAddr as peer key, remove duplicate status --- agents/diam_conn_stats_test.go | 1 + agents/diamagent.go | 79 ++++++++++++++++++---------------- agents/diamagent_test.go | 8 ++++ utils/apitpdata.go | 1 + utils/consts.go | 11 +++-- utils/dataconverter.go | 4 +- 6 files changed, 58 insertions(+), 46 deletions(-) diff --git a/agents/diam_conn_stats_test.go b/agents/diam_conn_stats_test.go index eaa463466..d6b6ee595 100644 --- a/agents/diam_conn_stats_test.go +++ b/agents/diam_conn_stats_test.go @@ -86,6 +86,7 @@ func TestDiamConnStats(t *testing.T) { // fmt.Println(ng.LogBuffer) // }) client, cfg := ng.Run(t) + time.Sleep(100 * time.Millisecond) // wait for diameter server to be up setSQProfile := func(id, originHost, originRealm string, ttl time.Duration) { t.Helper() diff --git a/agents/diamagent.go b/agents/diamagent.go index 41937e2ca..00ab97891 100644 --- a/agents/diamagent.go +++ b/agents/diamagent.go @@ -453,7 +453,7 @@ func (da *DiameterAgent) handleRAA(c diam.Conn, m *diam.Message) { } // sendConnStatusReport reports connection status changes to StatS and ThresholdS. -func (da *DiameterAgent) sendConnStatusReport(metadata *smpeer.Metadata, status string, localAddr, remoteAddr net.Addr) { +func (da *DiameterAgent) sendConnStatusReport(metadata *smpeer.Metadata, status, localAddr, remoteAddr string) { daCfg := da.cgrCfg.DiameterAgentCfg() if len(daCfg.StatSConns) == 0 && len(daCfg.ThresholdSConns) == 0 { return // nothing to do @@ -464,8 +464,8 @@ func (da *DiameterAgent) sendConnStatusReport(metadata *smpeer.Metadata, status ID: utils.GenUUID(), Time: utils.TimePointer(time.Now()), Event: map[string]any{ - utils.ConnLocalAddr: localAddr.String(), - utils.ConnRemoteAddr: remoteAddr.String(), + utils.ConnLocalAddr: localAddr, + utils.ConnRemoteAddr: remoteAddr, utils.OriginHost: metadata.OriginHost, utils.OriginRealm: metadata.OriginRealm, utils.ConnStatus: status, @@ -500,28 +500,18 @@ func (da *DiameterAgent) sendConnStatusReport(metadata *smpeer.Metadata, status // handleConns handles all connections to the agent and registers them for DPR support. func (da *DiameterAgent) handleConns(peers <-chan diam.Conn) { for c := range peers { + localAddr, remoteAddr := c.LocalAddr().String(), c.RemoteAddr().String() meta, ok := smpeer.FromContext(c.Context()) if !ok { utils.Logger.Warning(fmt.Sprintf( "<%s> could not extract peer metadata from connection %s, skipping status tracking", - utils.DiameterAgent, c.RemoteAddr().String())) + utils.DiameterAgent, remoteAddr)) continue } - key := string(meta.OriginHost + utils.ConcatenatedKeySep + meta.OriginRealm) da.peersLck.Lock() - connStatus := utils.ConnStatusUp - if _, exists := da.peers[key]; exists { - // Connection already exists for this peer. Set status to DUPLICATE - // to prevent incrementing StatS metrics. - connStatus = utils.ConnStatusDuplicate - - utils.Logger.Warning(fmt.Sprintf( - "<%s> a connection from a peer with the same ID (%q) is already registered, overwriting...", - utils.DiameterAgent, key)) - } - da.peers[key] = c + da.peers[remoteAddr] = c da.peersLck.Unlock() - localAddr, remoteAddr := c.LocalAddr(), c.RemoteAddr() + connStatus := utils.ConnStatusUp da.sendConnStatusReport(meta, connStatus, localAddr, remoteAddr) go func() { // Use hybrid approach to detect connection closure. CloseNotify() may not @@ -530,7 +520,7 @@ func (da *DiameterAgent) handleConns(peers <-chan diam.Conn) { // TODO: Remove fallback once go-diameter fixes CloseNotify race condition. defer func() { da.peersLck.Lock() - delete(da.peers, key) + delete(da.peers, remoteAddr) da.peersLck.Unlock() da.sendConnStatusReport(meta, utils.ConnStatusDown, localAddr, remoteAddr) }() @@ -564,17 +554,9 @@ func (da *DiameterAgent) handleConns(peers <-chan diam.Conn) { // handleDPA is used to handle all DisconnectPeer Answers that are received func (da *DiameterAgent) handleDPA(c diam.Conn, m *diam.Message) { - meta, ok := smpeer.FromContext(c.Context()) - if !ok { - utils.Logger.Warning(fmt.Sprintf( - "<%s> could not extract peer metadata from DPA connection %s", - utils.DiameterAgent, c.RemoteAddr().String())) - return - } - key := string(meta.OriginHost + utils.ConcatenatedKeySep + meta.OriginRealm) - + remoteAddr := c.RemoteAddr().String() da.dpaLck.Lock() - ch, has := da.dpa[key] + ch, has := da.dpa[remoteAddr] da.dpaLck.Unlock() if !has { return @@ -583,11 +565,12 @@ func (da *DiameterAgent) handleDPA(c diam.Conn, m *diam.Message) { c.Close() } -// V1DisconnectPeer sends a DPR meseage to diameter client +// V1DisconnectPeer sends a DPR message to diameter client. +// Looks up connection by RemoteAddr if provided, otherwise by OriginHost+OriginRealm. func (da *DiameterAgent) V1DisconnectPeer(ctx *context.Context, args *utils.DPRArgs, reply *string) (err error) { if args == nil { utils.Logger.Info( - fmt.Sprintf("<%s> cannot send DPR, missing arrguments", + fmt.Sprintf("<%s> cannot send DPR, missing arguments", utils.DiameterAgent)) return utils.ErrMandatoryIeMissing } @@ -595,13 +578,39 @@ func (da *DiameterAgent) V1DisconnectPeer(ctx *context.Context, args *utils.DPRA if args.DisconnectCause < 0 || args.DisconnectCause > 2 { return errors.New("WRONG_DISCONNECT_CAUSE") } + m := diam.NewRequest(diam.DisconnectPeer, diam.CHARGING_CONTROL_APP_ID, dict.Default) m.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity(args.OriginHost)) m.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity(args.OriginRealm)) m.NewAVP(avp.DisconnectCause, avp.Mbit, 0, datatype.Enumerated(args.DisconnectCause)) - key := args.OriginHost + utils.ConcatenatedKeySep + args.OriginRealm + var conn diam.Conn + var key string + + da.peersLck.Lock() + if args.RemoteAddr != "" { + // Direct lookup by RemoteAddr if provided + key = args.RemoteAddr + conn = da.peers[key] + } else { + // Fallback: scan for first connection matching OriginHost+OriginRealm + for rAddr, c := range da.peers { + meta, ok := smpeer.FromContext(c.Context()) + if ok && + string(meta.OriginHost) == args.OriginHost && + string(meta.OriginRealm) == args.OriginRealm { + key = rAddr + conn = c + break + } + } + } + da.peersLck.Unlock() + + if conn == nil { + return utils.ErrNotFound + } dpaCh := make(chan *diam.Message, 1) da.dpaLck.Lock() @@ -612,15 +621,11 @@ func (da *DiameterAgent) V1DisconnectPeer(ctx *context.Context, args *utils.DPRA delete(da.dpa, key) da.dpaLck.Unlock() }() - da.peersLck.Lock() - conn, has := da.peers[key] - da.peersLck.Unlock() - if !has { - return utils.ErrNotFound - } + if err = writeOnConn(conn, m); err != nil { return utils.ErrServerError } + select { case dpa := <-dpaCh: var avps []*diam.AVP diff --git a/agents/diamagent_test.go b/agents/diamagent_test.go index cadf875e9..2ef9ddb19 100644 --- a/agents/diamagent_test.go +++ b/agents/diamagent_test.go @@ -598,6 +598,14 @@ func TestV1DisconnectPeer(t *testing.T) { t.Errorf("Expected WRONG_DISCONNECT_CAUSE error, got: %v", err) } args.DisconnectCause = 1 + + // No RemoteAddr. Falls back to OriginHost+OriginRealm lookup. + err = agent.V1DisconnectPeer(nil, args, nil) + if err != utils.ErrNotFound { + t.Errorf("Expected ErrNotFound for no matching connection, got: %v", err) + } + + args.RemoteAddr = "192.168.1.1:12345" err = agent.V1DisconnectPeer(nil, args, nil) if err != utils.ErrNotFound { t.Errorf("Expected ErrNotFound, got: %v", err) diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 71b4cbf31..3e0fc7dfa 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -2593,6 +2593,7 @@ type ArgExportToFolder struct { type DPRArgs struct { OriginHost string OriginRealm string + RemoteAddr string DisconnectCause int } diff --git a/utils/consts.go b/utils/consts.go index a29fbb2da..7c2bcd6d9 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -630,12 +630,11 @@ const ( EventConnectionStatusReport = "ConnectionStatusReport" // Connection status event fields. - ConnLocalAddr = "LocalAddr" - ConnRemoteAddr = "RemoteAddr" - ConnStatus = "ConnectionStatus" // sum metric: UP=1, DOWN=-1, DUPLICATE=0 - ConnStatusUp = "UP" - ConnStatusDown = "DOWN" - ConnStatusDuplicate = "DUPLICATE" + ConnLocalAddr = "LocalAddr" + ConnRemoteAddr = "RemoteAddr" + ConnStatus = "ConnectionStatus" // sum metric: UP=1, DOWN=-1 + ConnStatusUp = "UP" + ConnStatusDown = "DOWN" // ReplyState error constants ErrReplyStateAuthorize = "ERR_AUTHORIZE" diff --git a/utils/dataconverter.go b/utils/dataconverter.go index 3ecadc23e..cf774bc93 100644 --- a/utils/dataconverter.go +++ b/utils/dataconverter.go @@ -845,7 +845,7 @@ func (ts TimeStringConverter) Convert(in any) (out any, err error) { } // ConnStatusConverter converts connection status strings to numeric values. -// Returns 1 for UP, -1 for DOWN, and 0 for DUPLICATE. +// Returns 1 for UP and -1 for DOWN. type ConnStatusConverter struct{} // Convert implements DataConverter interface @@ -856,8 +856,6 @@ func (c ConnStatusConverter) Convert(in any) (any, error) { return 1, nil case ConnStatusDown: return -1, nil - case ConnStatusDuplicate: - return 0, nil } return 0, fmt.Errorf("unsupported connection status: %q", status) }