diff --git a/agents/astagent.go b/agents/astagent.go index 460af0de6..b5b3cd498 100644 --- a/agents/astagent.go +++ b/agents/astagent.go @@ -369,3 +369,8 @@ func (sma *AsteriskAgent) V1GetActiveSessionIDs(ignParam string, func (*AsteriskAgent) V1ReAuthorize(originID string, reply *string) (err error) { return utils.ErrNotImplemented } + +// V1DisconnectPeer is used to implement the sessions.BiRPClient interface +func (*AsteriskAgent) V1DisconnectPeer(args *utils.DPRArgs, reply *string) (err error) { + return utils.ErrNotImplemented +} diff --git a/agents/diam_it_test.go b/agents/diam_it_test.go index c0a166459..4967e0dcd 100644 --- a/agents/diam_it_test.go +++ b/agents/diam_it_test.go @@ -66,6 +66,7 @@ var ( testDiamItCCRTerminate, testDiamItCCRSMS, testDiamItCCRMMS, + testDiamItDRR, testDiamItKillEngine, } ) @@ -1203,3 +1204,57 @@ func testDiamItRAR(t *testing.T) { t.Errorf("expecting: %s, received: <%s>", eVal, val) } } + +func testDiamItDRR(t *testing.T) { + if diamConfigDIR == "dispatchers/diamagent" { + t.SkipNow() + } + // ============================================ + // prevent nil pointer dereference + // ============================================ + if diamClnt == nil { + t.Fatal("Diameter client should not be nil") + } + if diamClnt.conn == nil { + t.Fatal("Diameter connection should not be nil") + } + // ============================================ + var wait sync.WaitGroup + wait.Add(1) + go func() { + var reply string + if err := apierRpc.Call(utils.SessionSv1DisconnectPeer, &utils.DPRArgs{ + OriginHost: "INTEGRATION_TESTS", + OriginRealm: "cgrates.org", + DisconnectCause: 1, // BUSY + }, &reply); err != nil { + t.Error(err) + } + wait.Done() + }() + drr := diamClnt.ReceivedMessage(rplyTimeout) + if drr == nil { + t.Fatal("No message returned") + } + + dra := drr.Answer(2001) + // dra.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity("INTEGRATION_TESTS")) + // dra.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity("cgrates.org")) + + if err := diamClnt.SendMessage(dra); err != nil { + t.Error(err) + } + + wait.Wait() + + eVal := "1" + if avps, err := drr.FindAVPsWithPath([]interface{}{avp.DisconnectCause}, dict.UndefinedVendorID); err != nil { + t.Error(err) + } else if len(avps) == 0 { + t.Error("Missing AVP") + } else if val, err := diamAVPAsString(avps[0]); err != nil { + t.Error(err) + } else if val != eVal { + t.Errorf("expecting: %s, received: <%s>", eVal, val) + } +} diff --git a/agents/diamagent.go b/agents/diamagent.go index 3b369384e..b7f1b165b 100644 --- a/agents/diamagent.go +++ b/agents/diamagent.go @@ -19,6 +19,7 @@ along with this program. If not, see package agents import ( + "errors" "fmt" "net" "strings" @@ -34,16 +35,25 @@ import ( "github.com/fiorix/go-diameter/diam/datatype" "github.com/fiorix/go-diameter/diam/dict" "github.com/fiorix/go-diameter/diam/sm" + "github.com/fiorix/go-diameter/diam/sm/smpeer" ) const ( all = "ALL" raa = "RAA" + dpa = "DPA" ) func NewDiameterAgent(cgrCfg *config.CGRConfig, filterS *engine.FilterS, connMgr *engine.ConnManager) (*DiameterAgent, error) { - da := &DiameterAgent{cgrCfg: cgrCfg, filterS: filterS, connMgr: connMgr, raa: make(map[string]chan *diam.Message)} + da := &DiameterAgent{ + cgrCfg: cgrCfg, + filterS: filterS, + connMgr: connMgr, + raa: make(map[string]chan *diam.Message), + dra: make(map[string]chan *diam.Message), + peers: make(map[string]diam.Conn), + } dictsPath := cgrCfg.DiameterAgentCfg().DictionariesPath if len(dictsPath) != 0 { if err := loadDictionaries(dictsPath, utils.DiameterAgent); err != nil { @@ -75,6 +85,11 @@ type DiameterAgent struct { aReqsLck sync.RWMutex raa map[string]chan *diam.Message raaLck sync.RWMutex + + peersLck sync.Mutex + peers map[string]diam.Conn // peer index by OriginHost;OriginRealm + dra map[string]chan *diam.Message + draLck sync.RWMutex } // ListenAndServe is called when DiameterAgent is started, usually from within cmd/cgr-engine @@ -119,11 +134,13 @@ func (da *DiameterAgent) handlers() diam.Handler { if da.cgrCfg.DiameterAgentCfg().SyncedConnReqs { dSM.HandleFunc(all, da.handleMessage) dSM.HandleFunc(raa, da.handleRAA) + dSM.HandleFunc(dpa, da.handleDRA) } else { dSM.HandleFunc(all, da.handleMessageAsync) dSM.HandleFunc(raa, func(c diam.Conn, m *diam.Message) { go da.handleRAA(c, m) }) + dSM.HandleFunc(dpa, func(c diam.Conn, m *diam.Message) { go da.handleDRA(c, m) }) } - + go da.handleConns(dSM.HandshakeNotify()) go func() { for err := range dSM.ErrorReports() { utils.Logger.Err(fmt.Sprintf("<%s> sm error: %v", utils.DiameterAgent, err)) @@ -559,20 +576,19 @@ func (da *DiameterAgent) V1ReAuthorize(originID string, reply *string) (err erro select { case raa := <-raaCh: var avps []*diam.AVP - if avps, err = raa.FindAVPsWithPath([]interface{}{"Result-Code"}, dict.UndefinedVendorID); err != nil { + if avps, err = raa.FindAVPsWithPath([]interface{}{avp.ResultCode}, dict.UndefinedVendorID); err != nil { return } if len(avps) == 0 { return fmt.Errorf("Missing AVP") } - var resCode string - if resCode, err = diamAVPAsString(avps[0]); err != nil { + var data interface{} + if data, err = diamAVPAsIface(avps[0]); err != nil { return + } else if data != uint32(diam.Success) { + return fmt.Errorf("Wrong result code: <%v>", data) } - if resCode != "2001" { - return fmt.Errorf("Wrong result code: <%s>", resCode) - } - case <-time.After(10 * time.Second): + case <-time.After(time.Second): return utils.ErrTimedOut } *reply = utils.OK @@ -596,3 +612,94 @@ func (da *DiameterAgent) handleRAA(c diam.Conn, m *diam.Message) { } ch <- m } + +func (da *DiameterAgent) handleConns(peers <-chan diam.Conn) { + for c := range peers { + meta, _ := smpeer.FromContext(c.Context()) + key := string(meta.OriginHost + utils.CONCATENATED_KEY_SEP + meta.OriginRealm) + da.peersLck.Lock() + da.peers[key] = c // store in peers table + da.peersLck.Unlock() + go func(c diam.Conn, key string) { + // wait for disconnect notification + <-c.(diam.CloseNotifier).CloseNotify() + da.peersLck.Lock() + delete(da.peers, key) // remove from peers table + da.peersLck.Unlock() + }(c, key) + } +} + +func (da *DiameterAgent) handleDRA(c diam.Conn, m *diam.Message) { + meta, _ := smpeer.FromContext(c.Context()) + key := string(meta.OriginHost + utils.CONCATENATED_KEY_SEP + meta.OriginRealm) + + da.raaLck.Lock() + ch, has := da.dra[key] + da.raaLck.Unlock() + if !has { + return + } + ch <- m + c.Close() +} + +// V1DisconnectPeer sends a DPR meseage to diameter client +func (da *DiameterAgent) V1DisconnectPeer(args *utils.DPRArgs, reply *string) (err error) { + if args == nil { + utils.Logger.Info( + fmt.Sprintf("<%s> cannot send DPR, missing arrguments", + utils.DiameterAgent)) + return utils.ErrMandatoryIeMissing + } + + if args.DisconnectCause < 0 || args.DisconnectCause > 2 { + return errors.New("WRONG_DISCONNECT_CAUSE") + } + m := diam.NewRequest(diam.DisconnectPeer, + diam.CHARGING_CONTROL_APP_ID, dict.Default) + m.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity(args.OriginHost)) + m.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity(args.OriginRealm)) + m.NewAVP(avp.DisconnectCause, avp.Mbit, 0, datatype.Enumerated(args.DisconnectCause)) + + key := args.OriginHost + utils.CONCATENATED_KEY_SEP + args.OriginRealm + + draCh := make(chan *diam.Message, 1) + da.draLck.Lock() + da.dra[key] = draCh + da.draLck.Unlock() + defer func() { + da.draLck.Lock() + delete(da.dra, key) + da.draLck.Unlock() + }() + da.peersLck.Lock() + conn, has := da.peers[key] + da.peersLck.Unlock() + if !has { + return utils.ErrNotFound + } + if err = writeOnConn(conn, m); err != nil { + return utils.ErrServerError + } + select { + case dra := <-draCh: + var avps []*diam.AVP + if avps, err = dra.FindAVPsWithPath([]interface{}{avp.ResultCode}, dict.UndefinedVendorID); err != nil { + return + } + if len(avps) == 0 { + return fmt.Errorf("Missing AVP") + } + var data interface{} + if data, err = diamAVPAsIface(avps[0]); err != nil { + return + } else if data != uint32(diam.Success) { + return fmt.Errorf("Wrong result code: <%v>", data) + } + case <-time.After(10 * time.Second): + return utils.ErrTimedOut + } + *reply = utils.OK + return +} diff --git a/agents/fsagent.go b/agents/fsagent.go index 00832f9ac..81bb179cc 100644 --- a/agents/fsagent.go +++ b/agents/fsagent.go @@ -444,3 +444,8 @@ func (sm *FSsessions) Reload() { func (*FSsessions) V1ReAuthorize(originID string, reply *string) (err error) { return utils.ErrNotImplemented } + +// V1DisconnectPeer is used to implement the sessions.BiRPClient interface +func (*FSsessions) V1DisconnectPeer(args *utils.DPRArgs, reply *string) (err error) { + return utils.ErrNotImplemented +} diff --git a/agents/kamagent.go b/agents/kamagent.go index 533fe2f07..7c39bc315 100644 --- a/agents/kamagent.go +++ b/agents/kamagent.go @@ -428,3 +428,8 @@ func (ka *KamailioAgent) Reload() { func (*KamailioAgent) V1ReAuthorize(originID string, reply *string) (err error) { return utils.ErrNotImplemented } + +// V1DisconnectPeer is used to implement the sessions.BiRPClient interface +func (*KamailioAgent) V1DisconnectPeer(args *utils.DPRArgs, reply *string) (err error) { + return utils.ErrNotImplemented +} diff --git a/apier/v1/sessions.go b/apier/v1/sessions.go index 586cdf647..63c18de2b 100644 --- a/apier/v1/sessions.go +++ b/apier/v1/sessions.go @@ -146,3 +146,8 @@ func (ssv1 *SessionSv1) Call(serviceMethod string, func (ssv1 *SessionSv1) ReAuthorize(args *utils.SessionFilter, reply *string) error { return ssv1.Ss.BiRPCv1ReAuthorize(nil, args, reply) } + +// DisconnectPeer sends the DPR for the OriginHost and OriginRealm +func (ssv1 *SessionSv1) DisconnectPeer(args *utils.DPRArgs, reply *string) error { + return ssv1.Ss.BiRPCv1DisconnectPeer(nil, args, reply) +} diff --git a/apier/v1/sessionsbirpc.go b/apier/v1/sessionsbirpc.go index 7d4fd5dba..893505185 100644 --- a/apier/v1/sessionsbirpc.go +++ b/apier/v1/sessionsbirpc.go @@ -53,7 +53,8 @@ func (ssv1 *SessionSv1) Handlers() map[string]interface{} { utils.SessionSv1ActivateSessions: ssv1.BiRPCv1ActivateSessions, utils.SessionSv1DeactivateSessions: ssv1.BiRPCv1DeactivateSessions, - utils.SessionSv1ReAuthorize: ssv1.BiRPCV1ReAuthorize, + utils.SessionSv1ReAuthorize: ssv1.BiRPCV1ReAuthorize, + utils.SessionSv1DisconnectPeer: ssv1.BiRPCV1DisconnectPeer, } } @@ -170,3 +171,9 @@ func (ssv1 *SessionSv1) BiRPCV1ReAuthorize(clnt *rpc2.Client, args *utils.SessionFilter, reply *string) error { return ssv1.Ss.BiRPCv1ReAuthorize(clnt, args, reply) } + +// BiRPCV1DisconnectPeer sends the DPR for the OriginHost and OriginRealm +func (ssv1 *SessionSv1) BiRPCV1DisconnectPeer(clnt *rpc2.Client, + args *utils.DPRArgs, reply *string) error { + return ssv1.Ss.BiRPCv1DisconnectPeer(clnt, args, reply) +} diff --git a/packages/debian/changelog b/packages/debian/changelog index 293db97bd..4724a5f9b 100644 --- a/packages/debian/changelog +++ b/packages/debian/changelog @@ -43,6 +43,7 @@ cgrates (0.11.0~dev) UNRELEASED; urgency=medium * [AgentS] Uniformize flags (*auth -> *authorize) * [SessionS] Move *cost as subflag in *rals for SessionSv1.ProcessEvent + * [DiameterAgent] Added DPR support -- Alexandru Tripon Wed, 19 Feb 2020 13:25:52 +0200 diff --git a/sessions/libsessions.go b/sessions/libsessions.go index 9ea4d7d16..5ea28a9de 100644 --- a/sessions/libsessions.go +++ b/sessions/libsessions.go @@ -45,6 +45,7 @@ type BiRPClient interface { V1DisconnectSession(args utils.AttrDisconnectSession, reply *string) (err error) V1GetActiveSessionIDs(ignParam string, sessionIDs *[]*SessionID) (err error) V1ReAuthorize(originID string, reply *string) (err error) + V1DisconnectPeer(args *utils.DPRArgs, reply *string) (err error) } // getSessionTTL retrieves SessionTTL setting out of ev diff --git a/sessions/sessions.go b/sessions/sessions.go index 181a98c00..0dea2e9f4 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -3629,7 +3629,7 @@ func (sS *SessionS) sendRar(s *Session) (err error) { return } -// BiRPCv1ReAuthorize sends a RAR for sessions matching sessions +// BiRPCv1ReAuthorize sends a RAR for the matching sessions func (sS *SessionS) BiRPCv1ReAuthorize(clnt rpcclient.ClientConnector, args *utils.SessionFilter, reply *string) (err error) { if args == nil { //protection in case on nil @@ -3663,3 +3663,29 @@ func (sS *SessionS) BiRPCv1ReAuthorize(clnt rpcclient.ClientConnector, *reply = utils.OK return } + +// BiRPCv1DisconnectPeer sends a DPR for the given OriginHost and OriginRealm +func (sS *SessionS) BiRPCv1DisconnectPeer(clnt rpcclient.ClientConnector, + args *utils.DPRArgs, reply *string) (err error) { + hasErrors := false + clients := make(map[string]*biJClient) + sS.biJMux.RLock() + for ID, clnt := range sS.biJIDs { + clients[ID] = clnt + } + sS.biJMux.RUnlock() + for ID, clnt := range clients { + if err = clnt.conn.Call(utils.SessionSv1DisconnectPeer, args, reply); err != nil && err != utils.ErrNotImplemented { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> failed sending DPR for connection with id: <%s>, err: <%s>", + utils.SessionS, ID, err)) + hasErrors = true + } + } + if hasErrors { + return utils.ErrPartiallyExecuted + } + *reply = utils.OK + return nil +} diff --git a/utils/apitpdata.go b/utils/apitpdata.go index f4c44aa40..c3a4304cb 100755 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1422,3 +1422,10 @@ type ArgExportToFolder struct { Path string Items []string } + +// DPRArgs are the arguments used by dispatcher to send a Disconnect-Peer-Request +type DPRArgs struct { + OriginHost string + OriginRealm string + DisconnectCause int +} diff --git a/utils/consts.go b/utils/consts.go index 9c914129f..6bdd827e5 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -1364,6 +1364,7 @@ const ( SessionSv1DeactivateSessions = "SessionSv1.DeactivateSessions" SMGenericV1InitiateSession = "SMGenericV1.InitiateSession" SessionSv1ReAuthorize = "SessionSv1.ReAuthorize" + SessionSv1DisconnectPeer = "SessionSv1.DisconnectPeer" ) // Responder APIs