Added DPR support for DiameterAgent. Closes #1255

This commit is contained in:
Trial97
2020-03-26 13:07:27 +02:00
committed by Dan Christian Bogos
parent c17503a815
commit c63a534456
12 changed files with 236 additions and 11 deletions

View File

@@ -369,3 +369,8 @@ func (sma *AsteriskAgent) V1GetActiveSessionIDs(ignParam string,
func (*AsteriskAgent) V1ReAuthorize(originID string, reply *string) (err error) {
return utils.ErrNotImplemented
}
// V1DisconnectPeer is used to implement the sessions.BiRPClient interface
func (*AsteriskAgent) V1DisconnectPeer(args *utils.DPRArgs, reply *string) (err error) {
return utils.ErrNotImplemented
}

View File

@@ -66,6 +66,7 @@ var (
testDiamItCCRTerminate,
testDiamItCCRSMS,
testDiamItCCRMMS,
testDiamItDRR,
testDiamItKillEngine,
}
)
@@ -1203,3 +1204,57 @@ func testDiamItRAR(t *testing.T) {
t.Errorf("expecting: %s, received: <%s>", eVal, val)
}
}
func testDiamItDRR(t *testing.T) {
if diamConfigDIR == "dispatchers/diamagent" {
t.SkipNow()
}
// ============================================
// prevent nil pointer dereference
// ============================================
if diamClnt == nil {
t.Fatal("Diameter client should not be nil")
}
if diamClnt.conn == nil {
t.Fatal("Diameter connection should not be nil")
}
// ============================================
var wait sync.WaitGroup
wait.Add(1)
go func() {
var reply string
if err := apierRpc.Call(utils.SessionSv1DisconnectPeer, &utils.DPRArgs{
OriginHost: "INTEGRATION_TESTS",
OriginRealm: "cgrates.org",
DisconnectCause: 1, // BUSY
}, &reply); err != nil {
t.Error(err)
}
wait.Done()
}()
drr := diamClnt.ReceivedMessage(rplyTimeout)
if drr == nil {
t.Fatal("No message returned")
}
dra := drr.Answer(2001)
// dra.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity("INTEGRATION_TESTS"))
// dra.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity("cgrates.org"))
if err := diamClnt.SendMessage(dra); err != nil {
t.Error(err)
}
wait.Wait()
eVal := "1"
if avps, err := drr.FindAVPsWithPath([]interface{}{avp.DisconnectCause}, dict.UndefinedVendorID); err != nil {
t.Error(err)
} else if len(avps) == 0 {
t.Error("Missing AVP")
} else if val, err := diamAVPAsString(avps[0]); err != nil {
t.Error(err)
} else if val != eVal {
t.Errorf("expecting: %s, received: <%s>", eVal, val)
}
}

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package agents
import (
"errors"
"fmt"
"net"
"strings"
@@ -34,16 +35,25 @@ import (
"github.com/fiorix/go-diameter/diam/datatype"
"github.com/fiorix/go-diameter/diam/dict"
"github.com/fiorix/go-diameter/diam/sm"
"github.com/fiorix/go-diameter/diam/sm/smpeer"
)
const (
all = "ALL"
raa = "RAA"
dpa = "DPA"
)
func NewDiameterAgent(cgrCfg *config.CGRConfig, filterS *engine.FilterS,
connMgr *engine.ConnManager) (*DiameterAgent, error) {
da := &DiameterAgent{cgrCfg: cgrCfg, filterS: filterS, connMgr: connMgr, raa: make(map[string]chan *diam.Message)}
da := &DiameterAgent{
cgrCfg: cgrCfg,
filterS: filterS,
connMgr: connMgr,
raa: make(map[string]chan *diam.Message),
dra: make(map[string]chan *diam.Message),
peers: make(map[string]diam.Conn),
}
dictsPath := cgrCfg.DiameterAgentCfg().DictionariesPath
if len(dictsPath) != 0 {
if err := loadDictionaries(dictsPath, utils.DiameterAgent); err != nil {
@@ -75,6 +85,11 @@ type DiameterAgent struct {
aReqsLck sync.RWMutex
raa map[string]chan *diam.Message
raaLck sync.RWMutex
peersLck sync.Mutex
peers map[string]diam.Conn // peer index by OriginHost;OriginRealm
dra map[string]chan *diam.Message
draLck sync.RWMutex
}
// ListenAndServe is called when DiameterAgent is started, usually from within cmd/cgr-engine
@@ -119,11 +134,13 @@ func (da *DiameterAgent) handlers() diam.Handler {
if da.cgrCfg.DiameterAgentCfg().SyncedConnReqs {
dSM.HandleFunc(all, da.handleMessage)
dSM.HandleFunc(raa, da.handleRAA)
dSM.HandleFunc(dpa, da.handleDRA)
} else {
dSM.HandleFunc(all, da.handleMessageAsync)
dSM.HandleFunc(raa, func(c diam.Conn, m *diam.Message) { go da.handleRAA(c, m) })
dSM.HandleFunc(dpa, func(c diam.Conn, m *diam.Message) { go da.handleDRA(c, m) })
}
go da.handleConns(dSM.HandshakeNotify())
go func() {
for err := range dSM.ErrorReports() {
utils.Logger.Err(fmt.Sprintf("<%s> sm error: %v", utils.DiameterAgent, err))
@@ -559,20 +576,19 @@ func (da *DiameterAgent) V1ReAuthorize(originID string, reply *string) (err erro
select {
case raa := <-raaCh:
var avps []*diam.AVP
if avps, err = raa.FindAVPsWithPath([]interface{}{"Result-Code"}, dict.UndefinedVendorID); err != nil {
if avps, err = raa.FindAVPsWithPath([]interface{}{avp.ResultCode}, dict.UndefinedVendorID); err != nil {
return
}
if len(avps) == 0 {
return fmt.Errorf("Missing AVP")
}
var resCode string
if resCode, err = diamAVPAsString(avps[0]); err != nil {
var data interface{}
if data, err = diamAVPAsIface(avps[0]); err != nil {
return
} else if data != uint32(diam.Success) {
return fmt.Errorf("Wrong result code: <%v>", data)
}
if resCode != "2001" {
return fmt.Errorf("Wrong result code: <%s>", resCode)
}
case <-time.After(10 * time.Second):
case <-time.After(time.Second):
return utils.ErrTimedOut
}
*reply = utils.OK
@@ -596,3 +612,94 @@ func (da *DiameterAgent) handleRAA(c diam.Conn, m *diam.Message) {
}
ch <- m
}
func (da *DiameterAgent) handleConns(peers <-chan diam.Conn) {
for c := range peers {
meta, _ := smpeer.FromContext(c.Context())
key := string(meta.OriginHost + utils.CONCATENATED_KEY_SEP + meta.OriginRealm)
da.peersLck.Lock()
da.peers[key] = c // store in peers table
da.peersLck.Unlock()
go func(c diam.Conn, key string) {
// wait for disconnect notification
<-c.(diam.CloseNotifier).CloseNotify()
da.peersLck.Lock()
delete(da.peers, key) // remove from peers table
da.peersLck.Unlock()
}(c, key)
}
}
func (da *DiameterAgent) handleDRA(c diam.Conn, m *diam.Message) {
meta, _ := smpeer.FromContext(c.Context())
key := string(meta.OriginHost + utils.CONCATENATED_KEY_SEP + meta.OriginRealm)
da.raaLck.Lock()
ch, has := da.dra[key]
da.raaLck.Unlock()
if !has {
return
}
ch <- m
c.Close()
}
// V1DisconnectPeer sends a DPR meseage to diameter client
func (da *DiameterAgent) V1DisconnectPeer(args *utils.DPRArgs, reply *string) (err error) {
if args == nil {
utils.Logger.Info(
fmt.Sprintf("<%s> cannot send DPR, missing arrguments",
utils.DiameterAgent))
return utils.ErrMandatoryIeMissing
}
if args.DisconnectCause < 0 || args.DisconnectCause > 2 {
return errors.New("WRONG_DISCONNECT_CAUSE")
}
m := diam.NewRequest(diam.DisconnectPeer,
diam.CHARGING_CONTROL_APP_ID, dict.Default)
m.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity(args.OriginHost))
m.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity(args.OriginRealm))
m.NewAVP(avp.DisconnectCause, avp.Mbit, 0, datatype.Enumerated(args.DisconnectCause))
key := args.OriginHost + utils.CONCATENATED_KEY_SEP + args.OriginRealm
draCh := make(chan *diam.Message, 1)
da.draLck.Lock()
da.dra[key] = draCh
da.draLck.Unlock()
defer func() {
da.draLck.Lock()
delete(da.dra, key)
da.draLck.Unlock()
}()
da.peersLck.Lock()
conn, has := da.peers[key]
da.peersLck.Unlock()
if !has {
return utils.ErrNotFound
}
if err = writeOnConn(conn, m); err != nil {
return utils.ErrServerError
}
select {
case dra := <-draCh:
var avps []*diam.AVP
if avps, err = dra.FindAVPsWithPath([]interface{}{avp.ResultCode}, dict.UndefinedVendorID); err != nil {
return
}
if len(avps) == 0 {
return fmt.Errorf("Missing AVP")
}
var data interface{}
if data, err = diamAVPAsIface(avps[0]); err != nil {
return
} else if data != uint32(diam.Success) {
return fmt.Errorf("Wrong result code: <%v>", data)
}
case <-time.After(10 * time.Second):
return utils.ErrTimedOut
}
*reply = utils.OK
return
}

View File

@@ -444,3 +444,8 @@ func (sm *FSsessions) Reload() {
func (*FSsessions) V1ReAuthorize(originID string, reply *string) (err error) {
return utils.ErrNotImplemented
}
// V1DisconnectPeer is used to implement the sessions.BiRPClient interface
func (*FSsessions) V1DisconnectPeer(args *utils.DPRArgs, reply *string) (err error) {
return utils.ErrNotImplemented
}

View File

@@ -428,3 +428,8 @@ func (ka *KamailioAgent) Reload() {
func (*KamailioAgent) V1ReAuthorize(originID string, reply *string) (err error) {
return utils.ErrNotImplemented
}
// V1DisconnectPeer is used to implement the sessions.BiRPClient interface
func (*KamailioAgent) V1DisconnectPeer(args *utils.DPRArgs, reply *string) (err error) {
return utils.ErrNotImplemented
}

View File

@@ -146,3 +146,8 @@ func (ssv1 *SessionSv1) Call(serviceMethod string,
func (ssv1 *SessionSv1) ReAuthorize(args *utils.SessionFilter, reply *string) error {
return ssv1.Ss.BiRPCv1ReAuthorize(nil, args, reply)
}
// DisconnectPeer sends the DPR for the OriginHost and OriginRealm
func (ssv1 *SessionSv1) DisconnectPeer(args *utils.DPRArgs, reply *string) error {
return ssv1.Ss.BiRPCv1DisconnectPeer(nil, args, reply)
}

View File

@@ -53,7 +53,8 @@ func (ssv1 *SessionSv1) Handlers() map[string]interface{} {
utils.SessionSv1ActivateSessions: ssv1.BiRPCv1ActivateSessions,
utils.SessionSv1DeactivateSessions: ssv1.BiRPCv1DeactivateSessions,
utils.SessionSv1ReAuthorize: ssv1.BiRPCV1ReAuthorize,
utils.SessionSv1ReAuthorize: ssv1.BiRPCV1ReAuthorize,
utils.SessionSv1DisconnectPeer: ssv1.BiRPCV1DisconnectPeer,
}
}
@@ -170,3 +171,9 @@ func (ssv1 *SessionSv1) BiRPCV1ReAuthorize(clnt *rpc2.Client,
args *utils.SessionFilter, reply *string) error {
return ssv1.Ss.BiRPCv1ReAuthorize(clnt, args, reply)
}
// BiRPCV1DisconnectPeer sends the DPR for the OriginHost and OriginRealm
func (ssv1 *SessionSv1) BiRPCV1DisconnectPeer(clnt *rpc2.Client,
args *utils.DPRArgs, reply *string) error {
return ssv1.Ss.BiRPCv1DisconnectPeer(clnt, args, reply)
}

View File

@@ -43,6 +43,7 @@ cgrates (0.11.0~dev) UNRELEASED; urgency=medium
* [AgentS] Uniformize flags (*auth -> *authorize)
* [SessionS] Move *cost as subflag in *rals for
SessionSv1.ProcessEvent
* [DiameterAgent] Added DPR support
-- Alexandru Tripon <alexandru.tripon@itsyscom.com> Wed, 19 Feb 2020 13:25:52 +0200

View File

@@ -45,6 +45,7 @@ type BiRPClient interface {
V1DisconnectSession(args utils.AttrDisconnectSession, reply *string) (err error)
V1GetActiveSessionIDs(ignParam string, sessionIDs *[]*SessionID) (err error)
V1ReAuthorize(originID string, reply *string) (err error)
V1DisconnectPeer(args *utils.DPRArgs, reply *string) (err error)
}
// getSessionTTL retrieves SessionTTL setting out of ev

View File

@@ -3629,7 +3629,7 @@ func (sS *SessionS) sendRar(s *Session) (err error) {
return
}
// BiRPCv1ReAuthorize sends a RAR for sessions matching sessions
// BiRPCv1ReAuthorize sends a RAR for the matching sessions
func (sS *SessionS) BiRPCv1ReAuthorize(clnt rpcclient.ClientConnector,
args *utils.SessionFilter, reply *string) (err error) {
if args == nil { //protection in case on nil
@@ -3663,3 +3663,29 @@ func (sS *SessionS) BiRPCv1ReAuthorize(clnt rpcclient.ClientConnector,
*reply = utils.OK
return
}
// BiRPCv1DisconnectPeer sends a DPR for the given OriginHost and OriginRealm
func (sS *SessionS) BiRPCv1DisconnectPeer(clnt rpcclient.ClientConnector,
args *utils.DPRArgs, reply *string) (err error) {
hasErrors := false
clients := make(map[string]*biJClient)
sS.biJMux.RLock()
for ID, clnt := range sS.biJIDs {
clients[ID] = clnt
}
sS.biJMux.RUnlock()
for ID, clnt := range clients {
if err = clnt.conn.Call(utils.SessionSv1DisconnectPeer, args, reply); err != nil && err != utils.ErrNotImplemented {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> failed sending DPR for connection with id: <%s>, err: <%s>",
utils.SessionS, ID, err))
hasErrors = true
}
}
if hasErrors {
return utils.ErrPartiallyExecuted
}
*reply = utils.OK
return nil
}

View File

@@ -1422,3 +1422,10 @@ type ArgExportToFolder struct {
Path string
Items []string
}
// DPRArgs are the arguments used by dispatcher to send a Disconnect-Peer-Request
type DPRArgs struct {
OriginHost string
OriginRealm string
DisconnectCause int
}

View File

@@ -1364,6 +1364,7 @@ const (
SessionSv1DeactivateSessions = "SessionSv1.DeactivateSessions"
SMGenericV1InitiateSession = "SMGenericV1.InitiateSession"
SessionSv1ReAuthorize = "SessionSv1.ReAuthorize"
SessionSv1DisconnectPeer = "SessionSv1.DisconnectPeer"
)
// Responder APIs