diff --git a/agents/agent_caps_it_test.go b/agents/agent_caps_it_test.go new file mode 100644 index 000000000..6e4d8a3fa --- /dev/null +++ b/agents/agent_caps_it_test.go @@ -0,0 +1,183 @@ +//go:build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package agents + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "github.com/fiorix/go-diameter/v4/diam" + "github.com/fiorix/go-diameter/v4/diam/avp" + "github.com/fiorix/go-diameter/v4/diam/datatype" + "github.com/fiorix/go-diameter/v4/diam/dict" +) + +func TestDiameterAgentCapsIT(t *testing.T) { + var dbCfg engine.DBCfg + switch *utils.DBType { + case utils.MetaInternal: + dbCfg = engine.DBCfg{ + DataDB: &engine.DBParams{ + Type: utils.StringPointer(utils.MetaInternal), + }, + StorDB: &engine.DBParams{ + Type: utils.StringPointer(utils.MetaInternal), + }, + } + case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("unsupported dbtype value") + } + + jsonCfg := `{ +"cores": { + "caps": 2, + "caps_strategy": "*busy", + "shutdown_timeout": "5ms" +}, +"sessions":{ + "enabled": true +}, +"diameter_agent": { + "enabled": true, + "synced_conn_requests": true +} +}` + + ng := engine.TestEngine{ + ConfigJSON: jsonCfg, + DBCfg: dbCfg, + Encoding: *utils.Encoding, + } + client, cfg := ng.Run(t) + + time.Sleep(10 * time.Millisecond) // wait for DiameterAgent service to start + diamClient, err := NewDiameterClient(cfg.DiameterAgentCfg().Listen, "localhost", + cfg.DiameterAgentCfg().OriginRealm, cfg.DiameterAgentCfg().VendorID, + cfg.DiameterAgentCfg().ProductName, utils.DiameterFirmwareRevision, + cfg.DiameterAgentCfg().DictionariesPath, cfg.DiameterAgentCfg().ListenNet) + if err != nil { + t.Fatal(err) + } + + reqIdx := 0 + sendCCR := func(t *testing.T, replyTimeout time.Duration, wg *sync.WaitGroup, wantResultCode string) { + if wg != nil { + defer wg.Done() + } + reqIdx++ + ccr := diam.NewRequest(diam.CreditControl, 4, nil) + ccr.NewAVP(avp.SessionID, avp.Mbit, 0, datatype.UTF8String(fmt.Sprintf("session%d", reqIdx))) + ccr.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity("CGR-DA")) + ccr.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity("cgrates.org")) + ccr.NewAVP(avp.AuthApplicationID, avp.Mbit, 0, datatype.Unsigned32(4)) + ccr.NewAVP(avp.CCRequestType, avp.Mbit, 0, datatype.Enumerated(1)) + ccr.NewAVP(avp.CCRequestNumber, avp.Mbit, 0, datatype.Unsigned32(1)) + + if err := diamClient.SendMessage(ccr); err != nil { + t.Errorf("failed to send diameter message: %v", err) + return + } + + reply := diamClient.ReceivedMessage(replyTimeout) + if reply == nil { + t.Error("received empty reply") + return + } + + avps, err := reply.FindAVPsWithPath([]any{"Result-Code"}, dict.UndefinedVendorID) + if err != nil { + t.Error(err) + return + } + if len(avps) == 0 { + t.Error("missing AVPs in reply") + return + } + + resultCode, err := diamAVPAsString(avps[0]) + if err != nil { + t.Error(err) + } + if resultCode != wantResultCode { + t.Errorf("Result-Code=%s, want %s", resultCode, wantResultCode) + } + } + + // There is currently no traffic. Expecting Result-Code 5012 (DIAMETER_UNABLE_TO_COMPLY), + // because there are no request processors enabled. + diamReplyTimeout := 2 * time.Second + sendCCR(t, diamReplyTimeout, nil, "5012") + + // Caps limit is 2, therefore expecting the same result as in the scenario above. + doneCh := simulateCapsTraffic(t, client, 1, *cfg.CoreSCfg()) + time.Sleep(time.Millisecond) // ensure traffic requests have been sent + sendCCR(t, diamReplyTimeout, nil, "5012") + <-doneCh + + // With caps limit reached, Result-Code 3004 (DIAMETER_TOO_BUSY) is expected. + doneCh = simulateCapsTraffic(t, client, 2, *cfg.CoreSCfg()) + time.Sleep(time.Millisecond) // ensure traffic requests have been sent + sendCCR(t, diamReplyTimeout, nil, "3004") + <-doneCh + + // TODO: Check caps functionality with async diameter requests. +} + +func simulateCapsTraffic(t *testing.T, client *birpc.Client, amount int, coresCfg config.CoreSCfg) <-chan struct{} { + t.Helper() + var wg sync.WaitGroup + var reply string + for i := range amount { + wg.Add(1) + go func() { + t.Helper() + defer wg.Done() + if err := client.Call(context.Background(), utils.CoreSv1Sleep, + &utils.DurationArgs{ + // Use the ShutdownTimeout CoreS setting + // instead of having to pass an extra + // variable to the function. + Duration: coresCfg.ShutdownTimeout, + }, &reply); err != nil { + if coresCfg.CapsStrategy == utils.MetaBusy && i >= coresCfg.Caps { + return // no need to handle errors for this scenario + } + t.Errorf("CoreSv1.Sleep unexpected error: %v", err) + } + }() + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + return done +} diff --git a/agents/diamagent.go b/agents/diamagent.go index 5c09cfa0a..0e0363c01 100644 --- a/agents/diamagent.go +++ b/agents/diamagent.go @@ -48,11 +48,12 @@ const ( // NewDiameterAgent initializes a new DiameterAgent func NewDiameterAgent(cgrCfg *config.CGRConfig, filterS *engine.FilterS, - connMgr *engine.ConnManager) (*DiameterAgent, error) { + connMgr *engine.ConnManager, caps *engine.Caps) (*DiameterAgent, error) { da := &DiameterAgent{ cgrCfg: cgrCfg, filterS: filterS, connMgr: connMgr, + caps: caps, raa: make(map[string]chan *diam.Message), dpa: make(map[string]chan *diam.Message), peers: make(map[string]diam.Conn), @@ -87,6 +88,7 @@ type DiameterAgent struct { cgrCfg *config.CGRConfig filterS *engine.FilterS connMgr *engine.ConnManager + caps *engine.Caps raaLck sync.RWMutex raa map[string]chan *diam.Message @@ -163,7 +165,7 @@ func (da *DiameterAgent) handlers() diam.Handler { dSM.HandleFunc(raa, da.handleRAA) dSM.HandleFunc(dpa, da.handleDPA) } else { - dSM.HandleFunc(all, da.handleMessageAsync) + dSM.HandleFunc(all, func(c diam.Conn, m *diam.Message) { go da.handleMessage(c, m) }) dSM.HandleFunc(raa, func(c diam.Conn, m *diam.Message) { go da.handleRAA(c, m) }) dSM.HandleFunc(dpa, func(c diam.Conn, m *diam.Message) { go da.handleDPA(c, m) }) } @@ -176,18 +178,13 @@ func (da *DiameterAgent) handlers() diam.Handler { return dSM } -// handleMessageAsync will dispatch the message into it's own goroutine -func (da *DiameterAgent) handleMessageAsync(c diam.Conn, m *diam.Message) { - go da.handleMessage(c, m) -} - // handleALL is the handler of all messages coming in via Diameter func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) { dApp, err := m.Dictionary().App(m.Header.ApplicationID) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> decoding app: %d, err: %s", utils.DiameterAgent, m.Header.ApplicationID, err.Error())) - writeOnConn(c, diamBareErr(m, diam.NoCommonApplication)) + writeOnConn(c, diamErrMsg(m, diam.NoCommonApplication, err.Error())) return } dCmd, err := m.Dictionary().FindCommand( @@ -196,7 +193,7 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) { if err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> decoding app: %d, command %d, err: %s", utils.DiameterAgent, m.Header.ApplicationID, m.Header.CommandCode, err.Error())) - writeOnConn(c, diamBareErr(m, diam.CommandUnsupported)) + writeOnConn(c, diamErrMsg(m, diam.CommandUnsupported, err.Error())) return } diamDP := newDADataProvider(c, m) @@ -212,20 +209,14 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) { utils.RemoteHost: utils.NewLeafNode(c.RemoteAddr().String()), }, } - // build the negative error answer - diamErr, err := diamErr( - m, diam.UnableToComply, reqVars, - da.cgrCfg.TemplatesCfg()[utils.MetaErr], - da.cgrCfg.GeneralCfg().DefaultTenant, - da.cgrCfg.GeneralCfg().DefaultTimezone, - da.filterS) - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s building errDiam for message: %s", - utils.DiameterAgent, err.Error(), m)) - writeOnConn(c, diamBareErr(m, diam.CommandUnsupported)) - return + if da.caps.IsLimited() { + if err := da.caps.Allocate(); err != nil { + diamErr(c, m, diam.TooBusy, reqVars, da.cgrCfg, da.filterS) + return + } + defer da.caps.Deallocate() } + // cache message for ASR if da.cgrCfg.DiameterAgentCfg().ASRTemplate != "" || da.cgrCfg.DiameterAgentCfg().RARTemplate != "" { @@ -234,16 +225,18 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) { utils.Logger.Warning( fmt.Sprintf("<%s> failed retrieving Session-Id err: %s, message: %s", utils.DiameterAgent, err.Error(), m)) - writeOnConn(c, diamErr) + diamErr(c, m, diam.UnableToComply, reqVars, da.cgrCfg, da.filterS) + return } // cache message data needed for building up the ASR if errCh := engine.Cache.Set(context.TODO(), utils.CacheDiameterMessages, sessID, &diamMsgData{c, m, reqVars}, nil, true, utils.NonTransactional); errCh != nil { utils.Logger.Warning(fmt.Sprintf("<%s> failed message: %s to set Cache: %s", utils.DiameterAgent, m, errCh.Error())) - writeOnConn(c, diamErr) + diamErr(c, m, diam.UnableToComply, reqVars, da.cgrCfg, da.filterS) return } } + cgrRplyNM := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{}} opts := utils.MapStorage{} rply := utils.NewOrderedNavigableMap() // share it among different processors @@ -274,14 +267,14 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) { utils.Logger.Warning( fmt.Sprintf("<%s> error: %s processing message: %s", utils.DiameterAgent, err.Error(), m)) - writeOnConn(c, diamErr) + diamErr(c, m, diam.UnableToComply, reqVars, da.cgrCfg, da.filterS) return } if !processed { utils.Logger.Warning( fmt.Sprintf("<%s> no request processor enabled, ignoring message %s from %s", utils.DiameterAgent, m, c.RemoteAddr())) - writeOnConn(c, diamErr) + diamErr(c, m, diam.UnableToComply, reqVars, da.cgrCfg, da.filterS) return } a, err := diamAnswer(m, 0, false, @@ -290,7 +283,7 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) { utils.Logger.Warning( fmt.Sprintf("<%s> err: %s, replying to message: %+v", utils.DiameterAgent, err.Error(), m)) - writeOnConn(c, diamErr) + diamErr(c, m, diam.UnableToComply, reqVars, da.cgrCfg, da.filterS) return } writeOnConn(c, a) diff --git a/agents/libdiam.go b/agents/libdiam.go index dfb449108..cd0aac1e3 100644 --- a/agents/libdiam.go +++ b/agents/libdiam.go @@ -454,25 +454,42 @@ func diamAnswer(m *diam.Message, resCode uint32, errFlag bool, return } -// negDiamAnswer is used to return the negative answer we need previous to -func diamErr(m *diam.Message, resCode uint32, - reqVars *utils.DataNode, - tpl []*config.FCTemplate, tnt, tmz string, - filterS *engine.FilterS) (a *diam.Message, err error) { - aReq := NewAgentRequest( - newDADataProvider(nil, m), reqVars, - nil, nil, nil, nil, - tnt, tmz, filterS, nil) - if err = aReq.SetFields(tpl); err != nil { +// diamErr handles Diameter error scenarios by attempting to build a customized error answer +// based on the *err template. +func diamErr(c diam.Conn, m *diam.Message, resCode uint32, reqVars *utils.DataNode, cfg *config.CGRConfig, + filterS *engine.FilterS) { + tnt := cfg.GeneralCfg().DefaultTenant + tmz := cfg.GeneralCfg().DefaultTimezone + aReq := NewAgentRequest(newDADataProvider(nil, m), reqVars, + nil, nil, nil, nil, tnt, tmz, filterS, nil) + if err := aReq.SetFields(cfg.TemplatesCfg()[utils.MetaErr]); err != nil { + utils.Logger.Warning(fmt.Sprintf( + "<%s> message: %s - failed to parse *err template: %v", + utils.DiameterAgent, m, err)) + writeOnConn(c, diamErrMsg(m, diam.UnableToComply, + fmt.Sprintf("failed to parse *err template: %v", err))) return } - return diamAnswer(m, resCode, true, aReq.Reply, tmz) + diamAns, err := diamAnswer(m, resCode, true, aReq.Reply, tmz) + if err != nil { + utils.Logger.Warning(fmt.Sprintf( + "<%s> message: %s - failed to build error answer: %v", + utils.DiameterAgent, m, err)) + writeOnConn(c, diamErrMsg(m, diam.UnableToComply, + fmt.Sprintf("failed to build error answer: %v", err))) + return + } + writeOnConn(c, diamAns) } -func diamBareErr(m *diam.Message, resCode uint32) (a *diam.Message) { - a = m.Answer(resCode) - a.Header.CommandFlags = diam.ErrorFlag - return +// diamErrMsg creates a Diameter error answer with the given result code and optional error message. +func diamErrMsg(m *diam.Message, resCode uint32, msg string) *diam.Message { + ans := m.Answer(resCode) + ans.Header.CommandFlags = diam.ErrorFlag + if msg != "" { + ans.NewAVP(avp.ErrorMessage, 0, 0, datatype.UTF8String(msg)) + } + return ans } func disectDiamListen(addrs string) (ipAddrs []net.IP) { diff --git a/services/cgr-engine.go b/services/cgr-engine.go index 37609504c..2b1c16549 100644 --- a/services/cgr-engine.go +++ b/services/cgr-engine.go @@ -245,10 +245,10 @@ func (cgr *CGREngine) InitServices(setVersions bool, cpuPrfFl *os.File) { NewFreeswitchAgent(cgr.cfg, cgr.cM, cgr.srvDep), NewKamailioAgent(cgr.cfg, cgr.cM, cgr.srvDep), NewJanusAgent(cgr.cfg, cgr.iFilterSCh, cgr.server, cgr.cM, cgr.srvDep), - NewAsteriskAgent(cgr.cfg, cgr.cM, cgr.srvDep), // partial reload - NewRadiusAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), // partial reload - NewDiameterAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), // partial reload - NewHTTPAgent(cgr.cfg, cgr.iFilterSCh, cgr.server, cgr.cM, cgr.srvDep), // no reload + NewAsteriskAgent(cgr.cfg, cgr.cM, cgr.srvDep), // partial reload + NewRadiusAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), // partial reload + NewDiameterAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.caps, cgr.srvDep), // partial reload + NewHTTPAgent(cgr.cfg, cgr.iFilterSCh, cgr.server, cgr.cM, cgr.srvDep), // no reload NewSIPAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), NewEventExporterService(cgr.cfg, cgr.iFilterSCh, diff --git a/services/diameteragent.go b/services/diameteragent.go index 462437d17..f0cc849b7 100644 --- a/services/diameteragent.go +++ b/services/diameteragent.go @@ -32,12 +32,13 @@ import ( // NewDiameterAgent returns the Diameter Agent func NewDiameterAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - connMgr *engine.ConnManager, + connMgr *engine.ConnManager, caps *engine.Caps, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &DiameterAgent{ cfg: cfg, filterSChan: filterSChan, connMgr: connMgr, + caps: caps, srvDep: srvDep, } } @@ -51,6 +52,7 @@ type DiameterAgent struct { da *agents.DiameterAgent connMgr *engine.ConnManager + caps *engine.Caps lnet string laddr string @@ -59,38 +61,39 @@ type DiameterAgent struct { } // Start should handle the sercive start -func (da *DiameterAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (err error) { +func (da *DiameterAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) error { if da.IsRunning() { return utils.ErrServiceAlreadyRunning } - var filterS *engine.FilterS - if filterS, err = waitForFilterS(ctx, da.filterSChan); err != nil { - return + filterS, err := waitForFilterS(ctx, da.filterSChan) + if err != nil { + return err } da.Lock() defer da.Unlock() - return da.start(filterS, shtDwn) + return da.start(filterS, shtDwn, da.caps) } -func (da *DiameterAgent) start(filterS *engine.FilterS, shtDwn context.CancelFunc) (err error) { - da.da, err = agents.NewDiameterAgent(da.cfg, filterS, da.connMgr) +func (da *DiameterAgent) start(filterS *engine.FilterS, shtDwn context.CancelFunc, caps *engine.Caps) error { + var err error + da.da, err = agents.NewDiameterAgent(da.cfg, filterS, da.connMgr, caps) if err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", + utils.Logger.Err(fmt.Sprintf("<%s> failed to initialize agent: %v", utils.DiameterAgent, err)) - return + return err } da.lnet = da.cfg.DiameterAgentCfg().ListenNet da.laddr = da.cfg.DiameterAgentCfg().Listen da.stopChan = make(chan struct{}) go func(d *agents.DiameterAgent) { - if err = d.ListenAndServe(da.stopChan); err != nil { + if err := d.ListenAndServe(da.stopChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.DiameterAgent, err)) shtDwn() } }(da.da) - return + return nil } // Reload handles the change of config @@ -106,7 +109,7 @@ func (da *DiameterAgent) Reload(ctx *context.Context, shtDwn context.CancelFunc) if filterS, err = waitForFilterS(ctx, da.filterSChan); err != nil { return } - return da.start(filterS, shtDwn) + return da.start(filterS, shtDwn, da.caps) } // Shutdown stops the service @@ -122,7 +125,7 @@ func (da *DiameterAgent) Shutdown() (err error) { func (da *DiameterAgent) IsRunning() bool { da.RLock() defer da.RUnlock() - return da != nil && da.da != nil + return da.da != nil } // ServiceName returns the service name diff --git a/services/diameteragent_it_test.go b/services/diameteragent_it_test.go index 45af9e9ac..e39dbea45 100644 --- a/services/diameteragent_it_test.go +++ b/services/diameteragent_it_test.go @@ -49,7 +49,7 @@ func TestDiameterAgentReload1(t *testing.T) { anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep) - srv := NewDiameterAgent(cfg, filterSChan, nil, srvDep) + srv := NewDiameterAgent(cfg, filterSChan, nil, nil, srvDep) engine.NewConnManager(cfg) srvMngr.AddServices(srv, sS, NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db) @@ -102,7 +102,7 @@ func TestDiameterAgentReload2(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewDiameterAgent(cfg, filterSChan, nil, srvDep) + srv := NewDiameterAgent(cfg, filterSChan, nil, nil, srvDep) if srv.IsRunning() { t.Errorf("Expected service to be down") } @@ -119,12 +119,12 @@ func TestDiameterAgentReload3(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewDiameterAgent(cfg, filterSChan, nil, srvDep) + srv := NewDiameterAgent(cfg, filterSChan, nil, nil, srvDep) cfg.DiameterAgentCfg().ListenNet = "bad" cfg.DiameterAgentCfg().DictionariesPath = "" - err := srv.(*DiameterAgent).start(nil, func() {}) + err := srv.(*DiameterAgent).start(nil, func() {}, nil) if err != nil { t.Fatal(err) }