Consider Diameter requests when limiting caps

- add the ability to pass a custom Error-Message AVP in negative Diameter
  answers
- build the negative answer only when an error actually occurs
- remove tests that covered behaviour with max concurrent requests set to 0
Author: ionutboangiu
Date: 2024-10-18 20:34:30 +03:00
Committed by: Dan Christian Bogos
Parent: e78654716d
Commit: 7df0494913
23 changed files with 341 additions and 552 deletions
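
The core of the change can be summarised in a small sketch. This is a minimal illustration only, assuming the engine.Caps API (NewCaps, IsLimited, Allocate, Deallocate) that the diffs below rely on; the handle helper and the main function are hypothetical and not part of the patch.

package main

import (
	"fmt"

	"github.com/cgrates/cgrates/engine"
	"github.com/cgrates/cgrates/utils"
)

// handle mimics one incoming Diameter request: once the caps limit is reached,
// the request is rejected immediately with Result-Code 3004 (DIAMETER_TOO_BUSY)
// instead of the generic 5012 (DIAMETER_UNABLE_TO_COMPLY) used for processing errors.
func handle(caps *engine.Caps) string {
	if caps.IsLimited() {
		if err := caps.Allocate(); err != nil {
			return "3004" // rejected as too busy
		}
		defer caps.Deallocate()
	}
	// ...normal request processing would run here...
	return "2001"
}

func main() {
	caps := engine.NewCaps(1, utils.MetaBusy) // allow a single concurrent request
	fmt.Println(handle(caps))                 // "2001": a slot was free
	caps.Allocate()                           // occupy the only slot
	fmt.Println(handle(caps))                 // "3004": limit reached
	caps.Deallocate()
}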

View File

@@ -0,0 +1,182 @@
//go:build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package agents
import (
"fmt"
"sync"
"testing"
"time"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/fiorix/go-diameter/v4/diam"
"github.com/fiorix/go-diameter/v4/diam/avp"
"github.com/fiorix/go-diameter/v4/diam/datatype"
"github.com/fiorix/go-diameter/v4/diam/dict"
)
func TestDiameterAgentCapsIT(t *testing.T) {
var dbCfg engine.DBCfg
switch *utils.DBType {
case utils.MetaInternal:
dbCfg = engine.DBCfg{
DataDB: &engine.DBParams{
Type: utils.StringPointer(utils.MetaInternal),
},
StorDB: &engine.DBParams{
Type: utils.StringPointer(utils.MetaInternal),
},
}
case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres:
t.SkipNow()
default:
t.Fatal("unsupported dbtype value")
}
jsonCfg := `{
"cores": {
"caps": 2,
"caps_strategy": "*busy",
"shutdown_timeout": "5ms"
},
"sessions":{
"enabled": true
},
"diameter_agent": {
"enabled": true,
"synced_conn_requests": true
}
}`
ng := engine.TestEngine{
ConfigJSON: jsonCfg,
DBCfg: dbCfg,
}
client, cfg := ng.Run(t)
time.Sleep(10 * time.Millisecond) // wait for DiameterAgent service to start
diamClient, err := NewDiameterClient(cfg.DiameterAgentCfg().Listen, "localhost",
cfg.DiameterAgentCfg().OriginRealm, cfg.DiameterAgentCfg().VendorID,
cfg.DiameterAgentCfg().ProductName, utils.DiameterFirmwareRevision,
cfg.DiameterAgentCfg().DictionariesPath, cfg.DiameterAgentCfg().ListenNet)
if err != nil {
t.Fatal(err)
}
reqIdx := 0
sendCCR := func(t *testing.T, replyTimeout time.Duration, wg *sync.WaitGroup, wantResultCode string) {
if wg != nil {
defer wg.Done()
}
reqIdx++
ccr := diam.NewRequest(diam.CreditControl, 4, nil)
ccr.NewAVP(avp.SessionID, avp.Mbit, 0, datatype.UTF8String(fmt.Sprintf("session%d", reqIdx)))
ccr.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity("CGR-DA"))
ccr.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity("cgrates.org"))
ccr.NewAVP(avp.AuthApplicationID, avp.Mbit, 0, datatype.Unsigned32(4))
ccr.NewAVP(avp.CCRequestType, avp.Mbit, 0, datatype.Enumerated(1))
ccr.NewAVP(avp.CCRequestNumber, avp.Mbit, 0, datatype.Unsigned32(1))
if err := diamClient.SendMessage(ccr); err != nil {
t.Errorf("failed to send diameter message: %v", err)
return
}
reply := diamClient.ReceivedMessage(replyTimeout)
if reply == nil {
t.Error("received empty reply")
return
}
avps, err := reply.FindAVPsWithPath([]any{"Result-Code"}, dict.UndefinedVendorID)
if err != nil {
t.Error(err)
return
}
if len(avps) == 0 {
t.Error("missing AVPs in reply")
return
}
resultCode, err := diamAVPAsString(avps[0])
if err != nil {
t.Error(err)
}
if resultCode != wantResultCode {
t.Errorf("Result-Code=%s, want %s", resultCode, wantResultCode)
}
}
// There is currently no traffic. Expecting Result-Code 5012 (DIAMETER_UNABLE_TO_COMPLY),
// because there are no request processors enabled.
diamReplyTimeout := 2 * time.Second
sendCCR(t, diamReplyTimeout, nil, "5012")
// Caps limit is 2, therefore expecting the same result as in the scenario above.
doneCh := simulateCapsTraffic(t, client, 1, *cfg.CoreSCfg())
time.Sleep(time.Millisecond) // ensure traffic requests have been sent
sendCCR(t, diamReplyTimeout, nil, "5012")
<-doneCh
// With caps limit reached, Result-Code 3004 (DIAMETER_TOO_BUSY) is expected.
doneCh = simulateCapsTraffic(t, client, 2, *cfg.CoreSCfg())
time.Sleep(time.Millisecond) // ensure traffic requests have been sent
sendCCR(t, diamReplyTimeout, nil, "3004")
<-doneCh
// TODO: Check caps functionality with async diameter requests.
}
func simulateCapsTraffic(t *testing.T, client *birpc.Client, amount int, coresCfg config.CoreSCfg) <-chan struct{} {
t.Helper()
var wg sync.WaitGroup
var reply string
for i := range amount {
wg.Add(1)
go func() {
t.Helper()
defer wg.Done()
if err := client.Call(context.Background(), utils.CoreSv1Sleep,
&utils.DurationArgs{
// Use the ShutdownTimeout CoreS setting
// instead of having to pass an extra
// variable to the function.
Duration: coresCfg.ShutdownTimeout,
}, &reply); err != nil {
if coresCfg.CapsStrategy == utils.MetaBusy && i >= coresCfg.Caps {
return // no need to handle errors for this scenario
}
t.Errorf("CoreSv1.Sleep unexpected error: %v", err)
}
}()
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
return done
}

View File

@@ -159,26 +159,6 @@ func TestDiamItBiRPC(t *testing.T) {
}
}
func TestDiamItMaxConn(t *testing.T) {
switch *utils.DBType {
case utils.MetaInternal:
diamConfigDIR = "diamagentmaxconn_internal"
case utils.MetaMySQL:
diamConfigDIR = "diamagentmaxconn_mysql"
case utils.MetaMongo:
diamConfigDIR = "diamagentmaxconn_mongo"
case utils.MetaPostgres:
t.SkipNow()
default:
t.Fatal("Unknown Database type")
}
for _, stest := range sTestsDiam[:7] {
t.Run(diamConfigDIR, stest)
}
t.Run(diamConfigDIR, testDiamItDryRunMaxConn)
t.Run(diamConfigDIR, testDiamItKillEngine)
}
func TestDiamItSessionDisconnect(t *testing.T) {
switch *utils.DBType {
case utils.MetaInternal:
@@ -533,75 +513,6 @@ func testDiamItDryRun(t *testing.T) {
}
}
func testDiamItDryRunMaxConn(t *testing.T) {
ccr := diam.NewRequest(diam.CreditControl, 4, nil)
ccr.NewAVP(avp.SessionID, avp.Mbit, 0, datatype.UTF8String("cgrates;1451911932;00082"))
ccr.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity("CGR-DA"))
ccr.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity("cgrates.org"))
ccr.NewAVP(avp.DestinationRealm, avp.Mbit, 0, datatype.DiameterIdentity("cgrates.org"))
ccr.NewAVP(avp.DestinationHost, avp.Mbit, 0, datatype.DiameterIdentity("CGR-DA"))
ccr.NewAVP(avp.UserName, avp.Mbit, 0, datatype.UTF8String("CGR-DA"))
ccr.NewAVP(avp.AuthApplicationID, avp.Mbit, 0, datatype.Unsigned32(4))
ccr.NewAVP(avp.ServiceContextID, avp.Mbit, 0, datatype.UTF8String("TestDiamItDryRun")) // Match specific DryRun profile
ccr.NewAVP(avp.CCRequestType, avp.Mbit, 0, datatype.Enumerated(1))
ccr.NewAVP(avp.CCRequestNumber, avp.Mbit, 0, datatype.Unsigned32(1))
ccr.NewAVP(avp.EventTimestamp, avp.Mbit, 0, datatype.Time(time.Date(2016, 1, 5, 11, 30, 10, 0, time.UTC)))
ccr.NewAVP(avp.TerminationCause, avp.Mbit, 0, datatype.Enumerated(1))
if _, err := ccr.NewAVP("Framed-IP-Address", avp.Mbit, 0, datatype.UTF8String("10.228.16.4")); err != nil {
t.Error(err)
}
for i := 0; i < *interations; i++ {
if err := diamClnt.SendMessage(ccr); err != nil {
t.Error(err)
}
msg := diamClnt.ReceivedMessage(rplyTimeout)
if msg == nil {
t.Fatal("No message returned")
}
// Result-Code
eVal := "5012"
if avps, err := msg.FindAVPsWithPath([]any{"Result-Code"}, dict.UndefinedVendorID); err != nil {
t.Error(err)
} else if len(avps) == 0 {
t.Error("Missing AVP")
} else if val, err := diamAVPAsString(avps[0]); err != nil {
t.Error(err)
} else if val != eVal {
t.Errorf("expecting: %s, received: <%s>", eVal, val)
}
eVal = "cgrates;1451911932;00082"
if avps, err := msg.FindAVPsWithPath([]any{"Session-Id"}, dict.UndefinedVendorID); err != nil {
t.Error(err)
} else if len(avps) == 0 {
t.Error("Missing AVP")
} else if val, err := diamAVPAsString(avps[0]); err != nil {
t.Error(err)
} else if val != eVal {
t.Errorf("expecting: %s, received: <%s>", eVal, val)
}
eVal = "CGR-DA"
if avps, err := msg.FindAVPsWithPath([]any{"Origin-Host"}, dict.UndefinedVendorID); err != nil {
t.Error(err)
} else if len(avps) == 0 {
t.Error("Missing AVP")
} else if val, err := diamAVPAsString(avps[0]); err != nil {
t.Error(err)
} else if val != eVal {
t.Errorf("expecting: %s, received: <%s>", eVal, val)
}
eVal = "cgrates.org"
if avps, err := msg.FindAVPsWithPath([]any{"Origin-Realm"}, dict.UndefinedVendorID); err != nil {
t.Error(err)
} else if len(avps) == 0 {
t.Error("Missing AVP")
} else if val, err := diamAVPAsString(avps[0]); err != nil {
t.Error(err)
} else if val != eVal {
t.Errorf("expecting: %s, received: <%s>", eVal, val)
}
}
}
func testDiamItCCRInit(t *testing.T) {
m := diam.NewRequest(diam.CreditControl, 4, nil)
m.NewAVP(avp.SessionID, avp.Mbit, 0, datatype.UTF8String("bb97be2b9f37c2be9614fff71c8b1d08b1acbff8"))

View File

@@ -48,11 +48,12 @@ const (
// NewDiameterAgent initializes a new DiameterAgent
func NewDiameterAgent(cgrCfg *config.CGRConfig, filterS *engine.FilterS,
connMgr *engine.ConnManager) (*DiameterAgent, error) {
connMgr *engine.ConnManager, caps *engine.Caps) (*DiameterAgent, error) {
da := &DiameterAgent{
cgrCfg: cgrCfg,
filterS: filterS,
connMgr: connMgr,
caps: caps,
raa: make(map[string]chan *diam.Message),
dpa: make(map[string]chan *diam.Message),
peers: make(map[string]diam.Conn),
@@ -89,18 +90,17 @@ func NewDiameterAgent(cgrCfg *config.CGRConfig, filterS *engine.FilterS,
// DiameterAgent describes the diameter server
type DiameterAgent struct {
cgrCfg *config.CGRConfig
filterS *engine.FilterS
connMgr *engine.ConnManager
aReqs int
aReqsLck sync.RWMutex
raa map[string]chan *diam.Message
raaLck sync.RWMutex
cgrCfg *config.CGRConfig
filterS *engine.FilterS
connMgr *engine.ConnManager
caps *engine.Caps
raaLck sync.RWMutex
raa map[string]chan *diam.Message
peersLck sync.Mutex
peers map[string]diam.Conn // peer index by OriginHost;OriginRealm
dpa map[string]chan *diam.Message
dpaLck sync.RWMutex
dpa map[string]chan *diam.Message
ctx *context.Context
}
@@ -170,7 +170,7 @@ func (da *DiameterAgent) handlers() diam.Handler {
dSM.HandleFunc(raa, da.handleRAA)
dSM.HandleFunc(dpa, da.handleDPA)
} else {
dSM.HandleFunc(all, da.handleMessageAsync)
dSM.HandleFunc(all, func(c diam.Conn, m *diam.Message) { go da.handleMessage(c, m) })
dSM.HandleFunc(raa, func(c diam.Conn, m *diam.Message) { go da.handleRAA(c, m) })
dSM.HandleFunc(dpa, func(c diam.Conn, m *diam.Message) { go da.handleDPA(c, m) })
}
@@ -183,18 +183,13 @@ func (da *DiameterAgent) handlers() diam.Handler {
return dSM
}
// handleMessageAsync will dispatch the message into it's own goroutine
func (da *DiameterAgent) handleMessageAsync(c diam.Conn, m *diam.Message) {
go da.handleMessage(c, m)
}
// handleALL is the handler of all messages coming in via Diameter
func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
dApp, err := m.Dictionary().App(m.Header.ApplicationID)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> decoding app: %d, err: %s",
utils.DiameterAgent, m.Header.ApplicationID, err.Error()))
writeOnConn(c, diamBareErr(m, diam.NoCommonApplication))
writeOnConn(c, diamErrMsg(m, diam.NoCommonApplication, ""))
return
}
dCmd, err := m.Dictionary().FindCommand(
@@ -203,7 +198,7 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> decoding app: %d, command %d, err: %s",
utils.DiameterAgent, m.Header.ApplicationID, m.Header.CommandCode, err.Error()))
writeOnConn(c, diamBareErr(m, diam.CommandUnsupported))
writeOnConn(c, diamErrMsg(m, diam.CommandUnsupported, ""))
return
}
diamDP := newDADataProvider(c, m)
@@ -219,20 +214,15 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
utils.RemoteHost: utils.NewLeafNode(c.RemoteAddr().String()),
},
}
// build the negative error answer
diamErr, err := diamErr(
m, diam.UnableToComply, reqVars,
da.cgrCfg.TemplatesCfg()[utils.MetaErr],
da.cgrCfg.GeneralCfg().DefaultTenant,
da.cgrCfg.GeneralCfg().DefaultTimezone,
da.filterS)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s building errDiam for message: %s",
utils.DiameterAgent, err.Error(), m))
writeOnConn(c, diamBareErr(m, diam.CommandUnsupported))
return
handleErr := newDiamErrHandler(m, reqVars, da.cgrCfg, da.filterS)
if da.caps.IsLimited() {
if err := da.caps.Allocate(); err != nil {
handleErr(c, diam.TooBusy)
return
}
defer da.caps.Deallocate()
}
// cache message for ASR
if da.cgrCfg.DiameterAgentCfg().ASRTemplate != "" ||
da.cgrCfg.DiameterAgentCfg().RARTemplate != "" {
@@ -241,35 +231,18 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
utils.Logger.Warning(
fmt.Sprintf("<%s> failed retrieving Session-Id err: %s, message: %s",
utils.DiameterAgent, err.Error(), m))
writeOnConn(c, diamErr)
handleErr(c, diam.UnableToComply)
return
}
// cache message data needed for building up the ASR
if errCh := engine.Cache.Set(utils.CacheDiameterMessages, sessID, &diamMsgData{c, m, reqVars},
nil, true, utils.NonTransactional); errCh != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> failed message: %s to set Cache: %s", utils.DiameterAgent, m, errCh.Error()))
writeOnConn(c, diamErr)
handleErr(c, diam.UnableToComply)
return
}
}
// handle MaxActiveReqs
if da.cgrCfg.DiameterAgentCfg().ConcurrentReqs != -1 {
da.aReqsLck.Lock()
if da.aReqs == da.cgrCfg.DiameterAgentCfg().ConcurrentReqs {
utils.Logger.Err(
fmt.Sprintf("<%s> denying request due to maximum active requests reached: %d, message: %s",
utils.DiameterAgent, da.cgrCfg.DiameterAgentCfg().ConcurrentReqs, m))
writeOnConn(c, diamErr)
da.aReqsLck.Unlock()
return
}
da.aReqs++
da.aReqsLck.Unlock()
defer func() { // schedule decrement when returning out of function
da.aReqsLck.Lock()
da.aReqs--
da.aReqsLck.Unlock()
}()
}
cgrRplyNM := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{}}
opts := utils.MapStorage{}
rply := utils.NewOrderedNavigableMap() // share it among different processors
@@ -303,14 +276,14 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing message: %s",
utils.DiameterAgent, err.Error(), m))
writeOnConn(c, diamErr)
handleErr(c, diam.UnableToComply)
return
}
if !processed {
utils.Logger.Warning(
fmt.Sprintf("<%s> no request processor enabled, ignoring message %s from %s",
utils.DiameterAgent, m, c.RemoteAddr()))
writeOnConn(c, diamErr)
handleErr(c, diam.UnableToComply)
return
}
a, err := diamAnswer(m, 0, false,
@@ -319,7 +292,7 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
utils.Logger.Warning(
fmt.Sprintf("<%s> err: %s, replying to message: %+v",
utils.DiameterAgent, err.Error(), m))
writeOnConn(c, diamErr)
handleErr(c, diam.UnableToComply)
return
}
writeOnConn(c, a)

View File

@@ -454,25 +454,35 @@ func diamAnswer(m *diam.Message, resCode uint32, errFlag bool,
return
}
// negDiamAnswer is used to return the negative answer we need previous to
func diamErr(m *diam.Message, resCode uint32,
reqVars *utils.DataNode,
tpl []*config.FCTemplate, tnt, tmz string,
filterS *engine.FilterS) (a *diam.Message, err error) {
aReq := NewAgentRequest(
newDADataProvider(nil, m), reqVars,
nil, nil, nil, nil,
tnt, tmz, filterS, nil)
if err = aReq.SetFields(tpl); err != nil {
return
func newDiamErrHandler(m *diam.Message, reqVars *utils.DataNode, cfg *config.CGRConfig,
filterS *engine.FilterS) func(c diam.Conn, resCode uint32) {
return func(c diam.Conn, resCode uint32) {
tnt := cfg.GeneralCfg().DefaultTenant
tmz := cfg.GeneralCfg().DefaultTimezone
aReq := NewAgentRequest(newDADataProvider(nil, m), reqVars,
nil, nil, nil, nil, tnt, tmz, filterS, nil)
if err := aReq.SetFields(cfg.TemplatesCfg()[utils.MetaErr]); err != nil {
writeOnConn(c, diamErrMsg(m, diam.UnableToComply,
fmt.Sprintf("failed to parse *err template: %v", err)))
return
}
diamAns, err := diamAnswer(m, resCode, true, aReq.Reply, tmz)
if err != nil {
writeOnConn(c, diamErrMsg(m, diam.UnableToComply,
fmt.Sprintf("failed to construct error response: %v", err)))
return
}
writeOnConn(c, diamAns)
}
return diamAnswer(m, resCode, true, aReq.Reply, tmz)
}
func diamBareErr(m *diam.Message, resCode uint32) (a *diam.Message) {
a = m.Answer(resCode)
a.Header.CommandFlags = diam.ErrorFlag
return
func diamErrMsg(m *diam.Message, resCode uint32, msg string) *diam.Message {
ans := m.Answer(resCode)
ans.Header.CommandFlags = diam.ErrorFlag
if msg != "" {
ans.NewAVP(avp.ErrorMessage, 0, 0, datatype.UTF8String(msg))
}
return ans
}
func disectDiamListen(addrs string) (ipAddrs []net.IP) {
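
On the client side, the custom message is carried in the standard Error-Message AVP of the negative answer. A hypothetical continuation of the integration test above (not part of this commit) could read it back as follows, reusing the diamClient and diamAVPAsString helpers and assuming the AVP can be addressed as "Error-Message" in the base dictionary:

reply := diamClient.ReceivedMessage(2 * time.Second)
if reply == nil {
	t.Fatal("received empty reply")
}
// Error-Message is optional: it is only present when a custom message was attached.
avps, err := reply.FindAVPsWithPath([]any{"Error-Message"}, dict.UndefinedVendorID)
if err != nil {
	t.Fatal(err)
}
if len(avps) != 0 {
	errMsg, err := diamAVPAsString(avps[0])
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("Error-Message: %s", errMsg)
}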

View File

@@ -1209,45 +1209,6 @@ func TestLibDiamHeaderLen(t *testing.T) {
}
}
func TestLibdiamDiamBareErr(t *testing.T) {
testMessage := &diam.Message{
Header: &diam.Header{
CommandFlags: 0,
},
}
resCode := uint32(200)
returnedMessage := diamBareErr(testMessage, resCode)
if returnedMessage == nil {
t.Error("Expected a non-nil message, got nil")
return
}
if returnedMessage.Header.CommandFlags != diam.ErrorFlag {
t.Errorf("Expected CommandFlags to be %d, got %d", diam.ErrorFlag, returnedMessage.Header.CommandFlags)
}
}
func TestLibdiamDiamErr(t *testing.T) {
tMessage := &diam.Message{
Header: &diam.Header{
CommandFlags: 0,
},
}
resCode := uint32(200)
reqVars := &utils.DataNode{}
tpl := []*config.FCTemplate{}
tnt := "cgrates.org"
tmz := "UTC"
filterS := &engine.FilterS{}
returnedMessage, err := diamErr(tMessage, resCode, reqVars, tpl, tnt, tmz, filterS)
if err != nil {
t.Errorf("Expected no error, but got: %v", err)
}
if returnedMessage == nil {
t.Error("Expected a non-nil diam.Message, got nil")
}
}
func TestLibDiamAVPAsString(t *testing.T) {
testCases := []struct {
name string

View File

@@ -620,10 +620,10 @@ func main() {
services.NewDNSAgent(cfg, filterSChan, shdChan, connManager, srvDep),
services.NewFreeswitchAgent(cfg, shdChan, connManager, srvDep),
services.NewKamailioAgent(cfg, shdChan, connManager, srvDep),
services.NewAsteriskAgent(cfg, shdChan, connManager, srvDep), // partial reload
services.NewRadiusAgent(cfg, filterSChan, shdChan, connManager, srvDep), // partial reload
services.NewDiameterAgent(cfg, filterSChan, shdChan, connManager, srvDep), // partial reload
services.NewHTTPAgent(cfg, filterSChan, server, connManager, srvDep), // no reload
services.NewAsteriskAgent(cfg, shdChan, connManager, srvDep), // partial reload
services.NewRadiusAgent(cfg, filterSChan, shdChan, connManager, srvDep), // partial reload
services.NewDiameterAgent(cfg, filterSChan, shdChan, connManager, caps, srvDep), // partial reload
services.NewHTTPAgent(cfg, filterSChan, server, connManager, srvDep), // no reload
ldrs, anz, dspS, dspH, dmService, storDBService,
services.NewEventExporterService(cfg, filterSChan,
connManager, server, internalEEsChan, anz, srvDep),

View File

@@ -729,7 +729,6 @@ const CGRATES_CFG_JSON = `
"origin_realm": "cgrates.org", // diameter Origin-Realm AVP used in replies
"vendor_id": 0, // diameter Vendor-Id AVP used in replies
"product_name": "CGRateS", // diameter Product-Name AVP used in replies
"concurrent_requests": -1, // limit the number of active requests processed by the server <-1|0-n>
"synced_conn_requests": false, // process one request at the time per connection
"asr_template": "", // enable AbortSession message being sent to client on DisconnectSession
"rar_template": "", // template used to build the Re-Auth-Request

View File

@@ -1020,7 +1020,6 @@ func TestDiameterAgentJsonCfg(t *testing.T) {
Origin_realm: utils.StringPointer("cgrates.org"),
Vendor_id: utils.IntPointer(0),
Product_name: utils.StringPointer("CGRateS"),
Concurrent_requests: utils.IntPointer(-1),
Synced_conn_requests: utils.BoolPointer(false),
Asr_template: utils.StringPointer(""),
Rar_template: utils.StringPointer(""),

File diff suppressed because one or more lines are too long

View File

@@ -34,7 +34,6 @@ type DiameterAgentCfg struct {
OriginRealm string
VendorID int
ProductName string
ConcurrentReqs int // limit the maximum number of requests processed
SyncedConnReqs bool
ASRTemplate string
RARTemplate string
@@ -81,9 +80,6 @@ func (da *DiameterAgentCfg) loadFromJSONCfg(jsnCfg *DiameterAgentJsonCfg, separa
if jsnCfg.Product_name != nil {
da.ProductName = *jsnCfg.Product_name
}
if jsnCfg.Concurrent_requests != nil {
da.ConcurrentReqs = *jsnCfg.Concurrent_requests
}
if jsnCfg.Synced_conn_requests != nil {
da.SyncedConnReqs = *jsnCfg.Synced_conn_requests
}
@@ -121,19 +117,18 @@ func (da *DiameterAgentCfg) loadFromJSONCfg(jsnCfg *DiameterAgentJsonCfg, separa
// AsMapInterface returns the config as a map[string]any
func (da *DiameterAgentCfg) AsMapInterface(separator string) (initialMP map[string]any) {
initialMP = map[string]any{
utils.EnabledCfg: da.Enabled,
utils.ListenNetCfg: da.ListenNet,
utils.ListenCfg: da.Listen,
utils.DictionariesPathCfg: da.DictionariesPath,
utils.OriginHostCfg: da.OriginHost,
utils.OriginRealmCfg: da.OriginRealm,
utils.VendorIDCfg: da.VendorID,
utils.ProductNameCfg: da.ProductName,
utils.ConcurrentRequestsCfg: da.ConcurrentReqs,
utils.SyncedConnReqsCfg: da.SyncedConnReqs,
utils.ASRTemplateCfg: da.ASRTemplate,
utils.RARTemplateCfg: da.RARTemplate,
utils.ForcedDisconnectCfg: da.ForcedDisconnect,
utils.EnabledCfg: da.Enabled,
utils.ListenNetCfg: da.ListenNet,
utils.ListenCfg: da.Listen,
utils.DictionariesPathCfg: da.DictionariesPath,
utils.OriginHostCfg: da.OriginHost,
utils.OriginRealmCfg: da.OriginRealm,
utils.VendorIDCfg: da.VendorID,
utils.ProductNameCfg: da.ProductName,
utils.SyncedConnReqsCfg: da.SyncedConnReqs,
utils.ASRTemplateCfg: da.ASRTemplate,
utils.RARTemplateCfg: da.RARTemplate,
utils.ForcedDisconnectCfg: da.ForcedDisconnect,
}
requestProcessors := make([]map[string]any, len(da.RequestProcessors))
@@ -168,7 +163,6 @@ func (da DiameterAgentCfg) Clone() (cln *DiameterAgentCfg) {
OriginRealm: da.OriginRealm,
VendorID: da.VendorID,
ProductName: da.ProductName,
ConcurrentReqs: da.ConcurrentReqs,
SyncedConnReqs: da.SyncedConnReqs,
ASRTemplate: da.ASRTemplate,
RARTemplate: da.RARTemplate,

View File

@@ -36,7 +36,6 @@ func TestDiameterAgentCfgloadFromJsonCfg(t *testing.T) {
Origin_realm: utils.StringPointer("cgrates.org"),
Vendor_id: utils.IntPointer(0),
Product_name: utils.StringPointer("randomName"),
Concurrent_requests: utils.IntPointer(10),
Synced_conn_requests: utils.BoolPointer(true),
Asr_template: utils.StringPointer("randomTemplate"),
Rar_template: utils.StringPointer("randomTemplate"),
@@ -58,7 +57,6 @@ func TestDiameterAgentCfgloadFromJsonCfg(t *testing.T) {
OriginRealm: "cgrates.org",
VendorID: 0,
ProductName: "randomName",
ConcurrentReqs: 10,
SyncedConnReqs: true,
ASRTemplate: "randomTemplate",
RARTemplate: "randomTemplate",
@@ -146,20 +144,19 @@ func TestDiameterAgentCfgAsMapInterface(t *testing.T) {
},
}`
eMap := map[string]any{
utils.ASRTemplateCfg: "",
utils.ConcurrentRequestsCfg: -1,
utils.DictionariesPathCfg: "/usr/share/cgrates/diameter/dict/",
utils.EnabledCfg: false,
utils.ForcedDisconnectCfg: "*none",
utils.ListenCfg: "127.0.0.1:3868",
utils.ListenNetCfg: "tcp",
utils.OriginHostCfg: "CGR-DA",
utils.OriginRealmCfg: "cgrates.org",
utils.ProductNameCfg: "CGRateS",
utils.RARTemplateCfg: "",
utils.SessionSConnsCfg: []string{rpcclient.BiRPCInternal, utils.MetaInternal, "*conn1"},
utils.SyncedConnReqsCfg: true,
utils.VendorIDCfg: 0,
utils.ASRTemplateCfg: "",
utils.DictionariesPathCfg: "/usr/share/cgrates/diameter/dict/",
utils.EnabledCfg: false,
utils.ForcedDisconnectCfg: "*none",
utils.ListenCfg: "127.0.0.1:3868",
utils.ListenNetCfg: "tcp",
utils.OriginHostCfg: "CGR-DA",
utils.OriginRealmCfg: "cgrates.org",
utils.ProductNameCfg: "CGRateS",
utils.RARTemplateCfg: "",
utils.SessionSConnsCfg: []string{rpcclient.BiRPCInternal, utils.MetaInternal, "*conn1"},
utils.SyncedConnReqsCfg: true,
utils.VendorIDCfg: 0,
utils.RequestProcessorsCfg: []map[string]any{
{
utils.IDCfg: utils.CGRateSLwr,
@@ -207,21 +204,20 @@ func TestDiameterAgentCfgAsMapInterface1(t *testing.T) {
},
}`
eMap := map[string]any{
utils.ASRTemplateCfg: "",
utils.ConcurrentRequestsCfg: -1,
utils.DictionariesPathCfg: "/usr/share/cgrates/diameter",
utils.EnabledCfg: true,
utils.ForcedDisconnectCfg: "*none",
utils.ListenCfg: "127.0.0.1:3868",
utils.ListenNetCfg: "tcp",
utils.OriginHostCfg: "CGR-DA",
utils.OriginRealmCfg: "cgrates.org",
utils.ProductNameCfg: "CGRateS",
utils.RARTemplateCfg: "",
utils.SessionSConnsCfg: []string{rpcclient.BiRPCInternal},
utils.SyncedConnReqsCfg: false,
utils.VendorIDCfg: 0,
utils.RequestProcessorsCfg: []map[string]any{},
utils.ASRTemplateCfg: "",
utils.DictionariesPathCfg: "/usr/share/cgrates/diameter",
utils.EnabledCfg: true,
utils.ForcedDisconnectCfg: "*none",
utils.ListenCfg: "127.0.0.1:3868",
utils.ListenNetCfg: "tcp",
utils.OriginHostCfg: "CGR-DA",
utils.OriginRealmCfg: "cgrates.org",
utils.ProductNameCfg: "CGRateS",
utils.RARTemplateCfg: "",
utils.SessionSConnsCfg: []string{rpcclient.BiRPCInternal},
utils.SyncedConnReqsCfg: false,
utils.VendorIDCfg: 0,
utils.RequestProcessorsCfg: []map[string]any{},
}
if cgrCfg, err := NewCGRConfigFromJSONStringWithDefaults(cfgJSONStr); err != nil {
t.Error(err)
@@ -241,7 +237,6 @@ func TestDiameterAgentCfgClone(t *testing.T) {
OriginRealm: "cgrates.org",
VendorID: 0,
ProductName: "randomName",
ConcurrentReqs: 10,
SyncedConnReqs: true,
ASRTemplate: "randomTemplate",
RARTemplate: "randomTemplate",

View File

@@ -535,7 +535,6 @@ type DiameterAgentJsonCfg struct {
Origin_realm *string
Vendor_id *int
Product_name *string
Concurrent_requests *int
Synced_conn_requests *bool
Asr_template *string
Rar_template *string

View File

@@ -1,77 +0,0 @@
{
// CGRateS Configuration file
//
// Used for cgradmin
// Starts rater, scheduler
"general": {
"log_level": 7,
},
"listen": {
"rpc_json": ":2012", // RPC JSON listening address
"rpc_gob": ":2013", // RPC GOB listening address
"http": ":2080", // HTTP listening address
},
"data_db": {
"db_type": "*internal",
},
"stor_db": {
"db_type": "*internal",
},
"rals": {
"enabled": true,
},
"schedulers": {
"enabled": true,
},
"cdrs": {
"enabled": true,
},
"attributes": {
"enabled": true,
},
"chargers": {
"enabled": true,
"attributes_conns": ["*internal"],
},
"sessions": {
"enabled": true,
"attributes_conns": ["*localhost"],
"chargers_conns": ["*localhost"],
"rals_conns": ["*localhost"],
"cdrs_conns": ["*localhost"],
},
"diameter_agent": {
"enabled": true,
"concurrent_requests": 0,
"sessions_conns": ["*localhost"],
"request_processors": [
{
"id": "maxconn",
"filters": ["*string:~*vars.*cmd:CCR", "*string:~*req.Service-Context-Id:TestDiamItDryRun"],
"flags": ["*dryrun","*continue"],
"request_fields":[],
"reply_fields":[],
},
],
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
},
}

View File

@@ -1,81 +0,0 @@
{
// CGRateS Configuration file
//
// Used for cgradmin
// Starts rater, scheduler
"general": {
"log_level": 7,
},
"listen": {
"rpc_json": ":2012", // RPC JSON listening address
"rpc_gob": ":2013", // RPC GOB listening address
"http": ":2080", // HTTP listening address
},
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_type": "mongo", // stor database type to use: <mysql|postgres>
"db_port": 27017, // the port to reach the stordb
"db_name": "datadb",
"db_password": "",
},
"stor_db": {
"db_type": "mongo", // stor database type to use: <mysql|postgres>
"db_port": 27017, // the port to reach the stordb
"db_name": "stordb",
"db_password": "",
},
"rals": {
"enabled": true,
},
"schedulers": {
"enabled": true,
},
"cdrs": {
"enabled": true,
},
"attributes": {
"enabled": true,
},
"chargers": {
"enabled": true,
"attributes_conns": ["*internal"],
},
"sessions": {
"enabled": true,
"attributes_conns": ["*localhost"],
"chargers_conns": ["*localhost"],
"rals_conns": ["*localhost"],
"cdrs_conns": ["*localhost"],
},
"diameter_agent": {
"enabled": true,
"concurrent_requests": 0,
"sessions_conns": ["*localhost"],
"request_processors": [
{
"id": "maxconn",
"filters": ["*string:~*vars.*cmd:CCR", "*string:~*req.Service-Context-Id:TestDiamItDryRun"],
"flags": ["*dryrun","*continue"],
"request_fields":[],
"reply_fields":[],
},
],
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
},
}

View File

@@ -1,77 +0,0 @@
{
// CGRateS Configuration file
//
// Used for cgradmin
// Starts rater, scheduler
"general": {
"log_level": 7,
},
"listen": {
"rpc_json": ":2012", // RPC JSON listening address
"rpc_gob": ":2013", // RPC GOB listening address
"http": ":2080", // HTTP listening address
},
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_type": "redis", // data_db type: <redis|mongo>
"db_port": 6379, // data_db port to reach the database
"db_name": "10", // data_db database name to connect to
},
"stor_db": {
"db_password": "CGRateS.org",
},
"rals": {
"enabled": true,
},
"schedulers": {
"enabled": true,
},
"cdrs": {
"enabled": true,
},
"attributes": {
"enabled": true,
},
"chargers": {
"enabled": true,
"attributes_conns": ["*internal"],
},
"sessions": {
"enabled": true,
"attributes_conns": ["*localhost"],
"chargers_conns": ["*localhost"],
"rals_conns": ["*localhost"],
"cdrs_conns": ["*localhost"],
},
"diameter_agent": {
"enabled": true,
"concurrent_requests": 0,
"sessions_conns": ["*localhost"],
"request_processors": [
{
"id": "maxconn",
"filters": ["*string:~*vars.*cmd:CCR", "*string:~*req.Service-Context-Id:TestDiamItDryRun"],
"flags": ["*dryrun","*continue"],
"request_fields":[],
"reply_fields":[],
},
],
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
},
}

View File

@@ -65,7 +65,6 @@
"origin_realm": "diameter.test",
"vendor_id": 0,
"product_name": "CGRateS",
"concurrent_requests": -1,
"synced_conn_requests": false,
"asr_template": "*asr",
"rar_template": "*rar",

View File

@@ -36,7 +36,6 @@ With explanations in the comments:
"origin_realm": "cgrates.org", // diameter Origin-Realm AVP used in replies
"vendor_id": 0, // diameter Vendor-Id AVP used in replies
"product_name": "CGRateS", // diameter Product-Name AVP used in replies
"concurrent_requests": -1, // limit the number of active requests processed by the server <-1|0-n>
"synced_conn_requests": false, // process one request at the time per connection
"asr_template": "*asr", // enable AbortSession message being sent to client
"request_processors": [ // decision logic for message processing
@@ -171,9 +170,6 @@ Most of the parameters are explained in :ref:`JSON configuration <configuration>
listen_net
The network the *DiameterAgent* will bind to. CGRateS supports both **tcp** and **sctp** specified in Diameter_ standard.
concurrent_requests
The maximum number of active requests processed at one time by the *DiameterAgent*. When this number is reached, new inbound requests will be rejected with *DiameterError* code until the concurrent number drops below it again. The default value of *-1* imposes no limits.
asr_template
The template (out of templates config section) used to build the AbortSession message. If not specified the ASR message is never sent out.
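
With the per-agent concurrent_requests option removed, request capping for the DiameterAgent is now driven by the global "cores" settings ("caps" and "caps_strategy"), as exercised by TestDiameterAgentCapsIT above. A minimal wiring sketch, assuming only constructors that already appear in this diff (config.CgrConfig() is used here as a hypothetical way to obtain the active configuration, and filterS/connMgr are assumed to be in scope):

cfg := config.CgrConfig()
caps := engine.NewCaps(cfg.CoreSCfg().Caps, cfg.CoreSCfg().CapsStrategy)
da, err := agents.NewDiameterAgent(cfg, filterS, connMgr, caps)
if err != nil {
	// handle agent initialization failure
}
_ = da // the agent is now subject to the shared caps limit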

View File

@@ -183,3 +183,13 @@ func TestCapsStatsGetAverageOnEvict(t *testing.T) {
}
Cache = tmp
}
func BenchmarkCaps(b *testing.B) {
caps := NewCaps(10, utils.MetaBusy)
for i := 0; i < b.N; i++ {
if caps.IsLimited() {
caps.Allocate()
caps.Deallocate()
}
}
}

View File

@@ -723,7 +723,6 @@ func testSectConfigSReloadDiameterAgent(t *testing.T) {
"origin_realm": "cgrates.org",
"vendor_id": 1,
"product_name": "CGRateS",
"concurrent_requests": -1,
"synced_conn_requests": false,
"asr_template": "asr_template",
"rar_template": "rar_template",
@@ -749,7 +748,7 @@ func testSectConfigSReloadDiameterAgent(t *testing.T) {
} else if reply != utils.OK {
t.Errorf("Expected OK received: %+v", reply)
}
cfgStr := "{\"diameter_agent\":{\"asr_template\":\"asr_template\",\"concurrent_requests\":-1,\"dictionaries_path\":\"/usr/share/cgrates/diameter/dict/\",\"enabled\":true,\"forced_disconnect\":\"*none\",\"listen\":\"127.0.0.1:3868\",\"listen_net\":\"tcp\",\"origin_host\":\"CGR-DA\",\"origin_realm\":\"cgrates.org\",\"product_name\":\"CGRateS\",\"rar_template\":\"rar_template\",\"request_processors\":[{\"filters\":[],\"flags\":[\"1\"],\"id\":\"cgrates\",\"reply_fields\":[{\"path\":\"randomPath\",\"tag\":\"randomPath\"}],\"request_fields\":[{\"path\":\"randomPath\",\"tag\":\"randomPath\"}],\"tenant\":\"1\",\"timezone\":\"\"}],\"sessions_conns\":[\"*birpc_internal\"],\"synced_conn_requests\":false,\"vendor_id\":1}}"
cfgStr := `{"diameter_agent":{"asr_template":"asr_template","dictionaries_path":"/usr/share/cgrates/diameter/dict/","enabled":true,"forced_disconnect":"*none","listen":"127.0.0.1:3868","listen_net":"tcp","origin_host":"CGR-DA","origin_realm":"cgrates.org","product_name":"CGRateS","rar_template":"rar_template","request_processors":[{"filters":[],"flags":["1"],"id":"cgrates","reply_fields":[{"path":"randomPath","tag":"randomPath"}],"request_fields":[{"path":"randomPath","tag":"randomPath"}],"tenant":"1","timezone":""}],"sessions_conns":["*birpc_internal"],"synced_conn_requests":false,"vendor_id":1}}`
var rpl string
if err := testSectRPC.Call(context.Background(), utils.ConfigSv1GetConfigAsJSON, &config.SectionWithAPIOpts{
Tenant: "cgrates.org",

View File

@@ -31,13 +31,14 @@ import (
// NewDiameterAgent returns the Diameter Agent
func NewDiameterAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
shdChan *utils.SyncedChan, connMgr *engine.ConnManager,
shdChan *utils.SyncedChan, connMgr *engine.ConnManager, caps *engine.Caps,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &DiameterAgent{
cfg: cfg,
filterSChan: filterSChan,
shdChan: shdChan,
connMgr: connMgr,
caps: caps,
srvDep: srvDep,
}
}
@@ -52,6 +53,7 @@ type DiameterAgent struct {
da *agents.DiameterAgent
connMgr *engine.ConnManager
caps *engine.Caps
lnet string
laddr string
@@ -70,12 +72,12 @@ func (da *DiameterAgent) Start() (err error) {
da.Lock()
defer da.Unlock()
return da.start(filterS)
return da.start(filterS, da.caps)
}
func (da *DiameterAgent) start(filterS *engine.FilterS) error {
func (da *DiameterAgent) start(filterS *engine.FilterS, caps *engine.Caps) error {
var err error
da.da, err = agents.NewDiameterAgent(da.cfg, filterS, da.connMgr)
da.da, err = agents.NewDiameterAgent(da.cfg, filterS, da.connMgr, caps)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed to initialize agent, error: %s",
utils.DiameterAgent, err))
@@ -106,7 +108,7 @@ func (da *DiameterAgent) Reload() (err error) {
close(da.stopChan)
filterS := <-da.filterSChan
da.filterSChan <- filterS
return da.start(filterS)
return da.start(filterS, da.caps)
}
// Shutdown stops the service

View File

@@ -59,7 +59,7 @@ func TestDiameterAgentReload1(t *testing.T) {
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
shdChan, nil, anz, srvDep)
diamSrv := NewDiameterAgent(cfg, filterSChan, shdChan, nil, srvDep)
diamSrv := NewDiameterAgent(cfg, filterSChan, shdChan, nil, nil, srvDep)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(diamSrv, sS,
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
@@ -124,7 +124,7 @@ func TestDiameterAgentReload2(t *testing.T) {
}
cacheSChan <- cacheSrv
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
srv := NewDiameterAgent(cfg, filterSChan, shdChan, nil, srvDep)
srv := NewDiameterAgent(cfg, filterSChan, shdChan, nil, nil, srvDep)
if srv.IsRunning() {
t.Errorf("Expected service to be down")
}
@@ -145,6 +145,7 @@ func TestDiameterAgentReload3(t *testing.T) {
filterSChan := make(chan *engine.FilterS, 1)
filterSChan <- nil
shdChan := utils.NewSyncedChan()
caps := engine.NewCaps(cfg.CoreSCfg().Caps, cfg.CoreSCfg().CapsStrategy)
chS := engine.NewCacheS(cfg, nil, nil)
cacheSChan := make(chan birpc.ClientConnector, 1)
cacheSrv, err := engine.NewService(chS)
@@ -153,13 +154,13 @@ func TestDiameterAgentReload3(t *testing.T) {
}
cacheSChan <- cacheSrv
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
srv := NewDiameterAgent(cfg, filterSChan, shdChan, nil, srvDep)
srv := NewDiameterAgent(cfg, filterSChan, shdChan, nil, caps, srvDep)
cfg.DiameterAgentCfg().ListenNet = "bad"
cfg.DiameterAgentCfg().DictionariesPath = ""
err = srv.(*DiameterAgent).start(nil)
if err != nil {
da := srv.(*DiameterAgent)
if err = da.start(nil, da.caps); err != nil {
t.Fatal(err)
}
cfg.DiameterAgentCfg().Enabled = false

View File

@@ -44,7 +44,7 @@ func TestNewDiameterAgent(t *testing.T) {
connMgr := &engine.ConnManager{}
srvDep := make(map[string]*sync.WaitGroup)
service := NewDiameterAgent(cfg, filterSChan, shdChan, connMgr, srvDep)
service := NewDiameterAgent(cfg, filterSChan, shdChan, connMgr, nil, srvDep)
da, ok := service.(*DiameterAgent)
if !ok {

View File

@@ -2372,22 +2372,21 @@ const (
AsteriskConnsCfg = "asterisk_conns"
// DiameterAgentCfg
ListenNetCfg = "listen_net"
NetworkCfg = "network"
ListenersCfg = "listeners"
ConcurrentRequestsCfg = "concurrent_requests"
ListenCfg = "listen"
DictionariesPathCfg = "dictionaries_path"
OriginHostCfg = "origin_host"
OriginRealmCfg = "origin_realm"
VendorIDCfg = "vendor_id"
ProductNameCfg = "product_name"
SyncedConnReqsCfg = "synced_conn_requests"
ASRTemplateCfg = "asr_template"
RARTemplateCfg = "rar_template"
ForcedDisconnectCfg = "forced_disconnect"
TemplatesCfg = "templates"
RequestProcessorsCfg = "request_processors"
ListenNetCfg = "listen_net"
NetworkCfg = "network"
ListenersCfg = "listeners"
ListenCfg = "listen"
DictionariesPathCfg = "dictionaries_path"
OriginHostCfg = "origin_host"
OriginRealmCfg = "origin_realm"
VendorIDCfg = "vendor_id"
ProductNameCfg = "product_name"
SyncedConnReqsCfg = "synced_conn_requests"
ASRTemplateCfg = "asr_template"
RARTemplateCfg = "rar_template"
ForcedDisconnectCfg = "forced_disconnect"
TemplatesCfg = "templates"
RequestProcessorsCfg = "request_processors"
JanusConnsCfg = "janus_conns"
// RequestProcessor
@@ -2427,12 +2426,13 @@ const (
PrecacheCfg = "precache"
// CdreCfg
ExportPathCfg = "export_path"
AttributeSContextCfg = "attributes_context"
SynchronousCfg = "synchronous"
AttemptsCfg = "attempts"
AttributeContextCfg = "attribute_context"
AttributeIDsCfg = "attribute_ids"
ExportPathCfg = "export_path"
AttributeSContextCfg = "attributes_context"
SynchronousCfg = "synchronous"
AttemptsCfg = "attempts"
AttributeContextCfg = "attribute_context"
AttributeIDsCfg = "attribute_ids"
ConcurrentRequestsCfg = "concurrent_requests"
//LoaderSCfg
DryRunCfg = "dry_run"