added multiple listeners for diameter agent

This commit is contained in:
gezimbll
2025-12-02 14:17:03 +01:00
committed by Dan Christian Bogos
parent 897d6f0da1
commit e859be8806
17 changed files with 382 additions and 118 deletions

View File

@@ -85,10 +85,10 @@ func TestAgentCapsIT(t *testing.T) {
var i int
t.Run("DiameterAgent", func(t *testing.T) {
diamClient, err := NewDiameterClient(cfg.DiameterAgentCfg().Listen, "localhost",
diamClient, err := NewDiameterClient(cfg.DiameterAgentCfg().Listeners[0].Address, "localhost",
cfg.DiameterAgentCfg().OriginRealm, cfg.DiameterAgentCfg().VendorID,
cfg.DiameterAgentCfg().ProductName, utils.DiameterFirmwareRevision,
cfg.DiameterAgentCfg().DictionariesPath, cfg.DiameterAgentCfg().ListenNet)
cfg.DiameterAgentCfg().DictionariesPath, cfg.DiameterAgentCfg().Listeners[0].Network)
if err != nil {
t.Fatal(err)
}

View File

@@ -139,11 +139,11 @@ func TestDiamConnStats(t *testing.T) {
initDiamConn := func(originHost, originRealm string) io.Closer {
t.Helper()
client, err := NewDiameterClient(cfg.DiameterAgentCfg().Listen,
client, err := NewDiameterClient(cfg.DiameterAgentCfg().Listeners[0].Address,
originHost, originRealm, cfg.DiameterAgentCfg().VendorID,
cfg.DiameterAgentCfg().ProductName, utils.DiameterFirmwareRevision,
cfg.DiameterAgentCfg().DictionariesPath,
cfg.DiameterAgentCfg().ListenNet)
cfg.DiameterAgentCfg().Listeners[0].Network)
if err != nil {
t.Fatal(err)
}

View File

@@ -97,11 +97,11 @@ func testDiamEmptyCEItStartEngine(t *testing.T) {
}
func testDiamEmptyCEItConnectDiameterClient(t *testing.T) {
diamClntND, err = NewDiameterClient(daCfgND.DiameterAgentCfg().Listen,
diamClntND, err = NewDiameterClient(daCfgND.DiameterAgentCfg().Listeners[0].Address,
"INTEGRATION_TESTS",
daCfgND.DiameterAgentCfg().OriginRealm, daCfgND.DiameterAgentCfg().VendorID,
daCfgND.DiameterAgentCfg().ProductName, utils.DiameterFirmwareRevision,
daCfgND.DiameterAgentCfg().DictionariesPath, daCfgND.DiameterAgentCfg().ListenNet)
daCfgND.DiameterAgentCfg().DictionariesPath, daCfgND.DiameterAgentCfg().Listeners[0].Network)
if err.Error() != "missing application" {
t.Fatal(err)
}

View File

@@ -278,10 +278,10 @@ func testDiamItStartEngine(t *testing.T) {
}
func testDiamItConnectDiameterClient(t *testing.T) {
diamClnt, err = NewDiameterClient(daCfg.DiameterAgentCfg().Listen, "INTEGRATION_TESTS",
diamClnt, err = NewDiameterClient(daCfg.DiameterAgentCfg().Listeners[0].Address, "INTEGRATION_TESTS",
daCfg.DiameterAgentCfg().OriginRealm, daCfg.DiameterAgentCfg().VendorID,
daCfg.DiameterAgentCfg().ProductName, utils.DiameterFirmwareRevision,
daCfg.DiameterAgentCfg().DictionariesPath, daCfg.DiameterAgentCfg().ListenNet)
daCfg.DiameterAgentCfg().DictionariesPath, daCfg.DiameterAgentCfg().Listeners[0].Network)
if err != nil {
t.Fatal(err)
}

View File

@@ -328,10 +328,10 @@ cgrates.org,DEFAULT,*string:~*req.Account:1001,,*default,*none,10`,
}
time.Sleep(500 * time.Millisecond)
diamClient, err := NewDiameterClient(cfg.DiameterAgentCfg().Listen, "localhost",
diamClient, err := NewDiameterClient(cfg.DiameterAgentCfg().Listeners[0].Address, "localhost",
cfg.DiameterAgentCfg().OriginRealm, cfg.DiameterAgentCfg().VendorID,
cfg.DiameterAgentCfg().ProductName, utils.DiameterFirmwareRevision,
cfg.DiameterAgentCfg().DictionariesPath, cfg.DiameterAgentCfg().ListenNet)
cfg.DiameterAgentCfg().DictionariesPath, cfg.DiameterAgentCfg().Listeners[0].Network)
if err != nil {
t.Fatal(err)
}

View File

@@ -46,6 +46,10 @@ const (
dpa = "DPA"
)
var (
diamDictOnce sync.Once
)
// NewDiameterAgent initializes a new DiameterAgent
func NewDiameterAgent(cgrCfg *config.CGRConfig, filterS *engine.FilterS,
connMgr *engine.ConnManager, caps *engine.Caps) (*DiameterAgent, error) {
@@ -67,7 +71,10 @@ func NewDiameterAgent(cgrCfg *config.CGRConfig, filterS *engine.FilterS,
da.ctx = context.WithClient(context.TODO(), srv)
dictsPath := cgrCfg.DiameterAgentCfg().DictionariesPath
if len(dictsPath) != 0 {
if err := loadDictionaries(dictsPath, utils.DiameterAgent); err != nil {
diamDictOnce.Do(func() {
err = loadDictionaries(dictsPath, utils.DiameterAgent)
})
if err != nil {
return nil, err
}
}
@@ -107,33 +114,68 @@ type DiameterAgent struct {
// ListenAndServe is called when DiameterAgent is started, usually from within cmd/cgr-engine
func (da *DiameterAgent) ListenAndServe(stopChan <-chan struct{}) (err error) {
utils.Logger.Info(fmt.Sprintf("<%s> Start listening on <%s>", utils.DiameterAgent, da.cgrCfg.DiameterAgentCfg().Listen))
srv := &diam.Server{
Network: da.cgrCfg.DiameterAgentCfg().ListenNet,
Addr: da.cgrCfg.DiameterAgentCfg().Listen,
Handler: da.handlers(),
Dict: nil,
dSM := da.handlers()
errChan := make(chan error, len(da.cgrCfg.DiameterAgentCfg().Listeners))
var activeListeners []net.Listener
for _, lstnrCfg := range da.cgrCfg.DiameterAgentCfg().Listeners {
utils.Logger.Info(fmt.Sprintf("<%s> Start listening on <%s>", utils.DiameterAgent, lstnrCfg.Address))
srv := &diam.Server{
Network: lstnrCfg.Network,
Addr: lstnrCfg.Address,
Handler: dSM,
Dict: nil,
}
lsn, err := diam.MultistreamListen(
utils.FirstNonEmpty(srv.Network, utils.TCP),
utils.FirstNonEmpty(srv.Addr, ":3868"),
)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed to bind listener %s: %v", utils.DiameterAgent, lstnrCfg.Address, err))
for _, l := range activeListeners {
l.Close()
}
return err
}
activeListeners = append(activeListeners, lsn)
go func(s *diam.Server, l net.Listener) {
errChan <- s.Serve(l)
}(srv, lsn)
}
// used to control the server state
var lsn net.Listener
if lsn, err = diam.MultistreamListen(utils.FirstNonEmpty(srv.Network, utils.TCP),
utils.FirstNonEmpty(srv.Addr, ":3868")); err != nil {
return
}
errChan := make(chan error)
go da.handleConns(dSM.HandshakeNotify())
go func() {
errChan <- srv.Serve(lsn)
errCh := dSM.ErrorReports()
for {
select {
case err, ok := <-errCh:
if !ok {
return
}
utils.Logger.Err(fmt.Sprintf("<%s> sm error: %v", utils.DiameterAgent, err))
case <-stopChan:
return
}
}
}()
select {
case err = <-errChan:
return
utils.Logger.Err(fmt.Sprintf("<%s> listener error: %v", utils.DiameterAgent, err))
case <-stopChan:
return lsn.Close()
utils.Logger.Info(fmt.Sprintf("<%s> received stop signal", utils.DiameterAgent))
}
for _, lsn := range activeListeners {
lsn.Close()
}
return err
}
// Creates the message handlers
func (da *DiameterAgent) handlers() diam.Handler {
func (da *DiameterAgent) handlers() *sm.StateMachine {
settings := &sm.Settings{
SupportedApps: da.cgrCfg.DiameterAgentCfg().CeApplications,
OriginHost: datatype.DiameterIdentity(da.cgrCfg.DiameterAgentCfg().OriginHost),
@@ -142,7 +184,21 @@ func (da *DiameterAgent) handlers() diam.Handler {
ProductName: datatype.UTF8String(da.cgrCfg.DiameterAgentCfg().ProductName),
FirmwareRevision: datatype.Unsigned32(utils.DiameterFirmwareRevision),
}
hosts := disectDiamListen(da.cgrCfg.DiameterAgentCfg().Listen)
var hosts []net.IP
for _, l := range da.cgrCfg.DiameterAgentCfg().Listeners {
host, _, err := net.SplitHostPort(l.Address)
if err != nil {
host = l.Address
}
if host == "" {
continue
}
if ip := net.ParseIP(host); ip != nil {
hosts = append(hosts, ip)
}
}
if len(hosts) == 0 {
interfaces, err := net.Interfaces()
if err != nil {
@@ -175,12 +231,6 @@ func (da *DiameterAgent) handlers() diam.Handler {
dSM.HandleFunc(raa, func(c diam.Conn, m *diam.Message) { go da.handleRAA(c, m) })
dSM.HandleFunc(dpa, func(c diam.Conn, m *diam.Message) { go da.handleDPA(c, m) })
}
go da.handleConns(dSM.HandshakeNotify())
go func() {
for err := range dSM.ErrorReports() {
utils.Logger.Err(fmt.Sprintf("<%s> sm error: %v", utils.DiameterAgent, err))
}
}()
return dSM
}

View File

@@ -70,10 +70,10 @@ cgrates.org,DEFAULT,,,*default,*none,0`,
client, cfg := ng.Run(b)
time.Sleep(10 * time.Millisecond) // wait for DiameterAgent service to start
diamClient, err := NewDiameterClient(cfg.DiameterAgentCfg().Listen, "localhost",
diamClient, err := NewDiameterClient(cfg.DiameterAgentCfg().Listeners[0].Address, "localhost",
cfg.DiameterAgentCfg().OriginRealm, cfg.DiameterAgentCfg().VendorID,
cfg.DiameterAgentCfg().ProductName, utils.DiameterFirmwareRevision,
cfg.DiameterAgentCfg().DictionariesPath, cfg.DiameterAgentCfg().ListenNet)
cfg.DiameterAgentCfg().DictionariesPath, cfg.DiameterAgentCfg().Listeners[0].Network)
if err != nil {
b.Fatal(err)
}
@@ -210,10 +210,10 @@ cgrates.org,Raw,,,*raw,*constant:*req.RequestType:*none,0`,
client, cfg := ng.Run(b)
time.Sleep(50 * time.Millisecond) // wait for DiameterAgent service to start
diamClient, err := NewDiameterClient(cfg.DiameterAgentCfg().Listen, "localhost",
diamClient, err := NewDiameterClient(cfg.DiameterAgentCfg().Listeners[0].Address, "localhost",
cfg.DiameterAgentCfg().OriginRealm, cfg.DiameterAgentCfg().VendorID,
cfg.DiameterAgentCfg().ProductName, utils.DiameterFirmwareRevision,
cfg.DiameterAgentCfg().DictionariesPath, cfg.DiameterAgentCfg().ListenNet)
cfg.DiameterAgentCfg().DictionariesPath, cfg.DiameterAgentCfg().Listeners[0].Network)
if err != nil {
b.Fatal(err)
}