From 204601a70fa3ab2568b986fb4d8ac13eb6b0a638 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Fri, 29 Nov 2024 13:25:33 +0200 Subject: [PATCH] Consider DNS requests when limiting caps --- agents/agents_caps_it_test.go | 74 +++++++++++++++++++++++++++-------- agents/dnsagent.go | 23 +++++++++-- cmd/cgr-engine/cgr-engine.go | 2 +- services/dnsagent.go | 8 ++-- services/dnsagent_it_test.go | 14 +++---- services/dnsagent_test.go | 4 +- 6 files changed, 91 insertions(+), 34 deletions(-) diff --git a/agents/agents_caps_it_test.go b/agents/agents_caps_it_test.go index 0a4271e4a..507a6c79c 100644 --- a/agents/agents_caps_it_test.go +++ b/agents/agents_caps_it_test.go @@ -36,20 +36,12 @@ import ( "github.com/fiorix/go-diameter/v4/diam/avp" "github.com/fiorix/go-diameter/v4/diam/datatype" "github.com/fiorix/go-diameter/v4/diam/dict" + "github.com/miekg/dns" ) func TestAgentCapsIT(t *testing.T) { - var dbCfg engine.DBCfg switch *utils.DBType { case utils.MetaInternal: - dbCfg = engine.DBCfg{ - DataDB: &engine.DBParams{ - Type: utils.StringPointer(utils.MetaInternal), - }, - StorDB: &engine.DBParams{ - Type: utils.StringPointer(utils.MetaInternal), - }, - } case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres: t.SkipNow() default: @@ -71,14 +63,23 @@ func TestAgentCapsIT(t *testing.T) { }, "radius_agent": { "enabled": true +}, +"dns_agent": { + "enabled": true, + "listeners":[ + { + "address": "127.0.0.1:2053", + "network": "udp" + } + ] } }` ng := engine.TestEngine{ ConfigJSON: jsonCfg, - DBCfg: dbCfg, + DBCfg: engine.InternalDBCfg, } - client, cfg := ng.Run(t) + conn, cfg := ng.Run(t) time.Sleep(10 * time.Millisecond) // wait for services to start var i int @@ -97,13 +98,13 @@ func TestAgentCapsIT(t *testing.T) { sendCCR(t, diamClient, &i, "5012") // Caps limit is 2, therefore expecting the same result as in the scenario above. - doneCh := simulateCapsTraffic(t, client, 1, *cfg.CoreSCfg()) + doneCh := simulateCapsTraffic(t, conn, 1, *cfg.CoreSCfg()) time.Sleep(time.Millisecond) // ensure traffic requests have been sent sendCCR(t, diamClient, &i, "5012") <-doneCh // With caps limit reached, Result-Code 3004 (DIAMETER_TOO_BUSY) is expected. - doneCh = simulateCapsTraffic(t, client, 2, *cfg.CoreSCfg()) + doneCh = simulateCapsTraffic(t, conn, 2, *cfg.CoreSCfg()) time.Sleep(time.Millisecond) // ensure traffic requests have been sent sendCCR(t, diamClient, &i, "3004") <-doneCh @@ -122,14 +123,14 @@ func TestAgentCapsIT(t *testing.T) { sendRadReq(t, radClient, radigo.AccessRequest, &i, radigo.AccessAccept) // Caps limit is 2, therefore expecting the same result as in // the scenario above. - doneCh := simulateCapsTraffic(t, client, 1, *cfg.CoreSCfg()) + doneCh := simulateCapsTraffic(t, conn, 1, *cfg.CoreSCfg()) time.Sleep(time.Millisecond) // ensure traffic requests have been sent sendRadReq(t, radClient, radigo.AccessRequest, &i, radigo.AccessAccept) <-doneCh // With caps limit reached, Reply with Code 3 (AccessReject) // and ReplyMessage with the caps error is expected. - doneCh = simulateCapsTraffic(t, client, 2, *cfg.CoreSCfg()) + doneCh = simulateCapsTraffic(t, conn, 2, *cfg.CoreSCfg()) time.Sleep(time.Millisecond) // ensure traffic requests have been sent sendRadReq(t, radClient, radigo.AccessRequest, &i, radigo.AccessReject) <-doneCh @@ -147,18 +148,44 @@ func TestAgentCapsIT(t *testing.T) { // Caps limit is 2, therefore expecting the same result as in // the scenario above. - doneCh := simulateCapsTraffic(t, client, 1, *cfg.CoreSCfg()) + doneCh := simulateCapsTraffic(t, conn, 1, *cfg.CoreSCfg()) time.Sleep(time.Millisecond) // ensure traffic requests have been sent sendRadReq(t, radClient, radigo.AccountingRequest, &i, 0) <-doneCh // With caps limit reached, Reply with Code 5 (AccountingResponse) // and ReplyMessage with the caps error is expected. - doneCh = simulateCapsTraffic(t, client, 2, *cfg.CoreSCfg()) + doneCh = simulateCapsTraffic(t, conn, 2, *cfg.CoreSCfg()) time.Sleep(time.Millisecond) // ensure traffic requests have been sent sendRadReq(t, radClient, radigo.AccountingRequest, &i, radigo.AccountingResponse) <-doneCh }) + + t.Run("DNSAgent", func(t *testing.T) { + client := new(dns.Client) + dc, err := client.Dial(cfg.DNSAgentCfg().Listeners[0].Address) + if err != nil { + t.Fatal(err) + } + + // There is currently no traffic. Expecting ServerFailure Rcode + // because there are no request processors enabled. + writeDNSMsg(t, dc, dns.RcodeServerFailure) + + // Caps limit is 2, therefore expecting the same result as in + // the scenario above. + doneCh := simulateCapsTraffic(t, conn, 1, *cfg.CoreSCfg()) + time.Sleep(time.Millisecond) // ensure traffic requests have been sent + writeDNSMsg(t, dc, dns.RcodeServerFailure) + <-doneCh + + // With caps limit reached, Refused Rcode is expected. + doneCh = simulateCapsTraffic(t, conn, 2, *cfg.CoreSCfg()) + time.Sleep(time.Millisecond) // ensure traffic requests have been sent + writeDNSMsg(t, dc, dns.RcodeRefused) + <-doneCh + + }) } func sendCCR(t *testing.T, client *DiameterClient, reqIdx *int, wantResultCode string) { @@ -244,6 +271,19 @@ func sendRadReq(t *testing.T, client *radigo.Client, reqType radigo.PacketCode, } } +func writeDNSMsg(t *testing.T, conn *dns.Conn, wantRcode int) { + m := new(dns.Msg) + m.SetQuestion("cgrates.org.", dns.TypeA) + if err := conn.WriteMsg(m); err != nil { + t.Error(err) + } + if rply, err := conn.ReadMsg(); err != nil { + t.Error(err) + } else if rply.Rcode != wantRcode { + t.Errorf("reply Msg Rcode=%d, want %d", rply.Rcode, wantRcode) + } +} + func simulateCapsTraffic(t *testing.T, client *birpc.Client, amount int, coresCfg config.CoreSCfg) <-chan struct{} { t.Helper() var wg sync.WaitGroup diff --git a/agents/dnsagent.go b/agents/dnsagent.go index 4f0b5830c..f883a9f52 100644 --- a/agents/dnsagent.go +++ b/agents/dnsagent.go @@ -33,8 +33,13 @@ import ( // NewDNSAgent is the constructor for DNSAgent func NewDNSAgent(cgrCfg *config.CGRConfig, fltrS *engine.FilterS, - connMgr *engine.ConnManager) (da *DNSAgent, err error) { - da = &DNSAgent{cgrCfg: cgrCfg, fltrS: fltrS, connMgr: connMgr} + connMgr *engine.ConnManager, caps *engine.Caps) (da *DNSAgent, err error) { + da = &DNSAgent{ + cgrCfg: cgrCfg, + fltrS: fltrS, + connMgr: connMgr, + caps: caps, + } err = da.initDNSServer() return } @@ -43,9 +48,10 @@ func NewDNSAgent(cgrCfg *config.CGRConfig, fltrS *engine.FilterS, type DNSAgent struct { sync.RWMutex cgrCfg *config.CGRConfig // loaded CGRateS configuration - fltrS *engine.FilterS // connection towards FilterS - servers []*dns.Server connMgr *engine.ConnManager + caps *engine.Caps + fltrS *engine.FilterS // connection towards FilterS + servers []*dns.Server } // initDNSServer instantiates the DNS server @@ -113,6 +119,15 @@ func (da *DNSAgent) Reload() (err error) { // handleMessage is the entry point of all DNS requests // requests are reaching here asynchronously func (da *DNSAgent) handleMessage(w dns.ResponseWriter, req *dns.Msg) { + if da.caps.IsLimited() { + if err := da.caps.Allocate(); err != nil { + rply := newDnsReply(req) + rply.Rcode = dns.RcodeRefused + dnsWriteMsg(w, rply) + return + } + defer da.caps.Deallocate() + } dnsDP := newDnsDP(req) rply := newDnsReply(req) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 860b7e249..54d0b762a 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -617,7 +617,7 @@ func main() { srvManager.AddServices(gvService, attrS, chrS, tS, stS, trS, rnS, reS, routeS, schS, rals, apiSv1, apiSv2, cdrS, smg, coreS, - services.NewDNSAgent(cfg, filterSChan, shdChan, connManager, srvDep), + services.NewDNSAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep), services.NewFreeswitchAgent(cfg, shdChan, connManager, srvDep), services.NewKamailioAgent(cfg, shdChan, connManager, srvDep), services.NewAsteriskAgent(cfg, shdChan, connManager, srvDep), // partial reload diff --git a/services/dnsagent.go b/services/dnsagent.go index 7406e79c5..c65511392 100644 --- a/services/dnsagent.go +++ b/services/dnsagent.go @@ -31,13 +31,14 @@ import ( // NewDNSAgent returns the DNS Agent func NewDNSAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - shdChan *utils.SyncedChan, connMgr *engine.ConnManager, + shdChan *utils.SyncedChan, connMgr *engine.ConnManager, caps *engine.Caps, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &DNSAgent{ cfg: cfg, filterSChan: filterSChan, shdChan: shdChan, connMgr: connMgr, + caps: caps, srvDep: srvDep, } } @@ -53,6 +54,7 @@ type DNSAgent struct { dns *agents.DNSAgent connMgr *engine.ConnManager + caps *engine.Caps srvDep map[string]*sync.WaitGroup } @@ -67,7 +69,7 @@ func (dns *DNSAgent) Start() (err error) { dns.Lock() defer dns.Unlock() - dns.dns, err = agents.NewDNSAgent(dns.cfg, filterS, dns.connMgr) + dns.dns, err = agents.NewDNSAgent(dns.cfg, filterS, dns.connMgr, dns.caps) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to initialize agent, error: <%s>", utils.DNSAgent, err.Error())) dns.dns = nil @@ -90,7 +92,7 @@ func (dns *DNSAgent) Reload() (err error) { close(dns.stopChan) } - dns.dns, err = agents.NewDNSAgent(dns.cfg, filterS, dns.connMgr) + dns.dns, err = agents.NewDNSAgent(dns.cfg, filterS, dns.connMgr, dns.caps) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) dns.dns = nil diff --git a/services/dnsagent_it_test.go b/services/dnsagent_it_test.go index 30e634c26..4a2c3e4c5 100644 --- a/services/dnsagent_it_test.go +++ b/services/dnsagent_it_test.go @@ -58,7 +58,7 @@ func TestDNSAgentStartReloadShut(t *testing.T) { filterSChan <- nil shdChan := utils.NewSyncedChan() srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, srvDep) + srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, nil, srvDep) shdWg := new(sync.WaitGroup) srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg, nil) engine.NewConnManager(cfg, nil) @@ -122,7 +122,7 @@ func TestDNSAgentReloadFirst(t *testing.T) { anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, anz, srvDep) - srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, srvDep) + srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, nil, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db) @@ -193,8 +193,8 @@ func TestDNSAgentReload2(t *testing.T) { filterSChan <- nil shdChan := utils.NewSyncedChan() srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, srvDep) - agentSrv, err := agents.NewDNSAgent(cfg, nil, nil) + srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, nil, srvDep) + agentSrv, err := agents.NewDNSAgent(cfg, nil, nil, nil) if err != nil { t.Fatal(err) } @@ -221,7 +221,7 @@ func TestDNSAgentReload4(t *testing.T) { filterSChan <- nil shdChan := utils.NewSyncedChan() srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, srvDep) + srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, nil, srvDep) runtime.Gosched() dnsSrv := srv.(*DNSAgent) @@ -246,7 +246,7 @@ func TestDNSAgentReload5(t *testing.T) { filterSChan <- nil shdChan := utils.NewSyncedChan() srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, srvDep) + srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, nil, srvDep) err := srv.Start() if err != nil { t.Fatalf("\nExpected <%+v>, \nReceived <%+v>", nil, err) @@ -274,7 +274,7 @@ func TestDNSAgentReload6(t *testing.T) { shdChan := utils.NewSyncedChan() srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} cfg.DNSAgentCfg().Listeners[0].Address = "127.0.0.1:0" - srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, srvDep) + srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, nil, srvDep) time.Sleep(10 * time.Millisecond) err := srv.Start() diff --git a/services/dnsagent_test.go b/services/dnsagent_test.go index 85584105c..bb767fe19 100644 --- a/services/dnsagent_test.go +++ b/services/dnsagent_test.go @@ -44,11 +44,11 @@ func TestDNSAgentCoverage(t *testing.T) { cacheSChan := make(chan birpc.ClientConnector, 1) cacheSChan <- cacheSrv srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, srvDep) + srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, nil, srvDep) if srv.IsRunning() { t.Errorf("Expected service to be down") } - dns, _ := agents.NewDNSAgent(cfg, &engine.FilterS{}, nil) + dns, _ := agents.NewDNSAgent(cfg, &engine.FilterS{}, nil, nil) srv2 := DNSAgent{ cfg: cfg, filterSChan: filterSChan,