diff --git a/agents/diam_it_test.go b/agents/diam_it_test.go index 4f541123a..eb3b14e6a 100644 --- a/agents/diam_it_test.go +++ b/agents/diam_it_test.go @@ -23,6 +23,7 @@ import ( "os/exec" "path" "strings" + "sync" "testing" "time" @@ -59,6 +60,9 @@ var ( testDiamItDryRun, testDiamItCCRInit, testDiamItCCRUpdate, + + testDiamItRAR, + testDiamItCCRTerminate, testDiamItCCRSMS, testDiamItKillEngine, @@ -680,6 +684,7 @@ func testDiamItCCRUpdate(t *testing.T) { }, }), }}) + m.NewAVP(avp.UserName, avp.Mbit, 0, datatype.UTF8String("User1")) // ============================================ // prevent nil pointer dereference // ============================================ @@ -1008,3 +1013,114 @@ func testDiamItKillEngine(t *testing.T) { t.Error(err) } } + +func testDiamItRAR(t *testing.T) { + if diamConfigDIR == "dispatchers/diamagent" { + t.SkipNow() + } + // ============================================ + // prevent nil pointer dereference + // ============================================ + if diamClnt == nil { + t.Fatal("Diameter client should not be nil") + } + if diamClnt.conn == nil { + t.Fatal("Diameter connection should not be nil") + } + // ============================================ + var wait sync.WaitGroup + wait.Add(1) + go func() { + var reply string + if err := apierRpc.Call(utils.SessionSv1SendRAR, nil, &reply); err != nil { + t.Error(err) + } + wait.Done() + }() + rar := diamClnt.ReceivedMessage(rplyTimeout) + if rar == nil { + t.Fatal("No message returned") + } + + raa := rar.Answer(2001) + raa.NewAVP(avp.SessionID, avp.Mbit, 0, datatype.UTF8String("bb97be2b9f37c2be9614fff71c8b1d08b1acbff8")) + + if err := diamClnt.SendMessage(raa); err != nil { + t.Error(err) + } + + wait.Wait() + + m := diam.NewRequest(diam.CreditControl, 4, nil) + m.NewAVP(avp.SessionID, avp.Mbit, 0, datatype.UTF8String("bb97be2b9f37c2be9614fff71c8b1d08b1acbff8")) + m.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity("192.168.1.1")) + m.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity("cgrates.org")) + m.NewAVP(avp.AuthApplicationID, avp.Mbit, 0, datatype.Unsigned32(4)) + m.NewAVP(avp.CCRequestType, avp.Mbit, 0, datatype.Enumerated(2)) + m.NewAVP(avp.CCRequestNumber, avp.Mbit, 0, datatype.Unsigned32(1)) + m.NewAVP(avp.DestinationHost, avp.Mbit, 0, datatype.DiameterIdentity("CGR-DA")) + m.NewAVP(avp.DestinationRealm, avp.Mbit, 0, datatype.DiameterIdentity("cgrates.org")) + m.NewAVP(avp.ServiceContextID, avp.Mbit, 0, datatype.UTF8String("voice@DiamItCCRInit")) + m.NewAVP(avp.EventTimestamp, avp.Mbit, 0, datatype.Time(time.Date(2018, 10, 4, 14, 57, 20, 0, time.UTC))) + m.NewAVP(avp.SubscriptionID, avp.Mbit, 0, &diam.GroupedAVP{ + AVP: []*diam.AVP{ + diam.NewAVP(450, avp.Mbit, 0, datatype.Enumerated(0)), // Subscription-Id-Type + diam.NewAVP(444, avp.Mbit, 0, datatype.UTF8String("1006")), // Subscription-Id-Data + }}) + m.NewAVP(avp.ServiceIdentifier, avp.Mbit, 0, datatype.Unsigned32(0)) + m.NewAVP(avp.RequestedServiceUnit, avp.Mbit, 0, &diam.GroupedAVP{ + AVP: []*diam.AVP{ + diam.NewAVP(420, avp.Mbit, 0, datatype.Unsigned32(301))}}) + m.NewAVP(avp.UsedServiceUnit, avp.Mbit, 0, &diam.GroupedAVP{ + AVP: []*diam.AVP{ + diam.NewAVP(420, avp.Mbit, 0, datatype.Unsigned32(301))}}) + m.NewAVP(873, avp.Mbit, 10415, &diam.GroupedAVP{ + AVP: []*diam.AVP{ + diam.NewAVP(20300, avp.Mbit, 2011, &diam.GroupedAVP{ // IN-Information + AVP: []*diam.AVP{ + diam.NewAVP(831, avp.Mbit, 10415, datatype.UTF8String("1006")), // Calling-Party-Address + diam.NewAVP(832, avp.Mbit, 10415, datatype.UTF8String("1002")), // Called-Party-Address + diam.NewAVP(20327, avp.Mbit, 2011, datatype.UTF8String("1002")), // Real-Called-Number + diam.NewAVP(20339, avp.Mbit, 2011, datatype.Unsigned32(0)), // Charge-Flow-Type + diam.NewAVP(20302, avp.Mbit, 2011, datatype.UTF8String("")), // Calling-Vlr-Number + diam.NewAVP(20303, avp.Mbit, 2011, datatype.UTF8String("")), // Calling-CellID-Or-SAI + diam.NewAVP(20313, avp.Mbit, 2011, datatype.OctetString("")), // Bearer-Capability + diam.NewAVP(20321, avp.Mbit, 2011, datatype.UTF8String("bb97be2b9f37c2be9614fff71c8b1d08b1acbff8")), // Call-Reference-Number + diam.NewAVP(20322, avp.Mbit, 2011, datatype.UTF8String("")), // MSC-Address + diam.NewAVP(20324, avp.Mbit, 2011, datatype.Unsigned32(0)), // Time-Zone + diam.NewAVP(20385, avp.Mbit, 2011, datatype.UTF8String("")), // Called-Party-NP + diam.NewAVP(20386, avp.Mbit, 2011, datatype.UTF8String("")), // SSP-Time + }, + }), + }}) + if err := diamClnt.SendMessage(m); err != nil { + t.Error(err) + } + msg := diamClnt.ReceivedMessage(rplyTimeout) + if msg == nil { + t.Fatal("No message returned") + } + // Result-Code + eVal := "2001" + if avps, err := msg.FindAVPsWithPath([]interface{}{"Result-Code"}, dict.UndefinedVendorID); err != nil { + t.Error(err) + } else if len(avps) == 0 { + t.Error("Missing AVP") + } else if val, err := diamAVPAsString(avps[0]); err != nil { + t.Error(err) + } else if val != eVal { + t.Errorf("expecting: %s, received: <%s>", eVal, val) + } + // Result-Code + eVal = "301" // 5 mins of session + if avps, err := msg.FindAVPsWithPath([]interface{}{"Granted-Service-Unit", "CC-Time"}, + dict.UndefinedVendorID); err != nil { + t.Error(err) + } else if len(avps) == 0 { + t.Error("Missing AVP") + } else if val, err := diamAVPAsString(avps[0]); err != nil { + t.Error(err) + } else if val != eVal { + t.Errorf("expecting: %s, received: <%s>", eVal, val) + } +} diff --git a/agents/diamagent.go b/agents/diamagent.go index 4a6ee3460..e797c05b4 100644 --- a/agents/diamagent.go +++ b/agents/diamagent.go @@ -23,19 +23,22 @@ import ( "net" "strings" "sync" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" "github.com/fiorix/go-diameter/diam" + "github.com/fiorix/go-diameter/diam/avp" "github.com/fiorix/go-diameter/diam/datatype" + "github.com/fiorix/go-diameter/diam/dict" "github.com/fiorix/go-diameter/diam/sm" ) func NewDiameterAgent(cgrCfg *config.CGRConfig, filterS *engine.FilterS, connMgr *engine.ConnManager) (*DiameterAgent, error) { - da := &DiameterAgent{cgrCfg: cgrCfg, filterS: filterS, connMgr: connMgr} + da := &DiameterAgent{cgrCfg: cgrCfg, filterS: filterS, connMgr: connMgr, raa: make(map[string]chan *diam.Message)} dictsPath := cgrCfg.DiameterAgentCfg().DictionariesPath if len(dictsPath) != 0 { if err := loadDictionaries(dictsPath, utils.DiameterAgent); err != nil { @@ -65,6 +68,8 @@ type DiameterAgent struct { connMgr *engine.ConnManager aReqs int aReqsLck sync.RWMutex + raa map[string]chan *diam.Message + raaLck sync.RWMutex } // ListenAndServe is called when DiameterAgent is started, usually from within cmd/cgr-engine @@ -108,8 +113,10 @@ func (da *DiameterAgent) handlers() diam.Handler { dSM := sm.New(settings) if da.cgrCfg.DiameterAgentCfg().SyncedConnReqs { dSM.HandleFunc("ALL", da.handleMessage) + dSM.HandleFunc("RA", da.handleRAA) } else { dSM.HandleFunc("ALL", da.handleMessageAsync) + dSM.HandleFunc("RAA", func(c diam.Conn, m *diam.Message) { go da.handleRAA(c, m) }) } go func() { @@ -508,7 +515,7 @@ func (da *DiameterAgent) V1SendRAR(originID string, reply *string) (err error) { utils.DiameterAgent, originID, err.Error())) return utils.ErrServerError } - m := diam.NewRequest(dmd.m.Header.CommandCode, + m := diam.NewRequest(diam.ReAuth, dmd.m.Header.ApplicationID, dmd.m.Dictionary()) if err = updateDiamMsgFromNavMap(m, aReq.diamreq, da.cgrCfg.GeneralCfg().DefaultTimezone); err != nil { @@ -517,9 +524,55 @@ func (da *DiameterAgent) V1SendRAR(originID string, reply *string) (err error) { utils.DiameterAgent, originID, err.Error())) return utils.ErrServerError } + raaCh := make(chan *diam.Message, 1) + da.raaLck.Lock() + da.raa[originID] = raaCh + da.raaLck.Unlock() + defer func() { + da.raaLck.Lock() + delete(da.raa, originID) + da.raaLck.Unlock() + }() if err = writeOnConn(dmd.c, m); err != nil { return utils.ErrServerError } + select { + case raa := <-raaCh: + var avps []*diam.AVP + if avps, err = raa.FindAVPsWithPath([]interface{}{"Result-Code"}, dict.UndefinedVendorID); err != nil { + return + } + if len(avps) == 0 { + return fmt.Errorf("Missing AVP") + } + var resCode string + if resCode, err = diamAVPAsString(avps[0]); err != nil { + return + } + if resCode != "2001" { + return fmt.Errorf("Wrong result code: <%s>", resCode) + } + case <-time.After(10 * time.Second): + return utils.ErrTimedOut + } *reply = utils.OK return } + +func (da *DiameterAgent) handleRAA(c diam.Conn, m *diam.Message) { + avp, err := m.FindAVP(avp.SessionID, dict.UndefinedVendorID) + if err != nil { + return + } + originID, err := diamAVPAsString(avp) + if err != nil { + return + } + da.raaLck.Lock() + ch, has := da.raa[originID] + da.raaLck.Unlock() + if !has { + return + } + ch <- m +} diff --git a/data/conf/samples/diamagent_internal/cgrates.json b/data/conf/samples/diamagent_internal/cgrates.json index 1dda0388f..e34e224c1 100644 --- a/data/conf/samples/diamagent_internal/cgrates.json +++ b/data/conf/samples/diamagent_internal/cgrates.json @@ -57,6 +57,7 @@ "diameter_agent": { "enabled": true, "asr_template": "*asr", + "rar_template": "*rar", }, "apiers": { diff --git a/data/conf/samples/diamagent_mongo/cgrates.json b/data/conf/samples/diamagent_mongo/cgrates.json index cdd8c017e..e90a28fe0 100644 --- a/data/conf/samples/diamagent_mongo/cgrates.json +++ b/data/conf/samples/diamagent_mongo/cgrates.json @@ -62,6 +62,7 @@ "diameter_agent": { "enabled": true, "asr_template": "*asr", + "rar_template": "*rar", }, "apiers": { diff --git a/data/conf/samples/diamagent_mysql/cgrates.json b/data/conf/samples/diamagent_mysql/cgrates.json index 1fb47b46b..2ac565475 100644 --- a/data/conf/samples/diamagent_mysql/cgrates.json +++ b/data/conf/samples/diamagent_mysql/cgrates.json @@ -58,6 +58,7 @@ "diameter_agent": { "enabled": true, "asr_template": "*asr", + "rar_template": "*rar", }, "apiers": { diff --git a/data/conf/samples/diamsctpagent_internal/cgrates.json b/data/conf/samples/diamsctpagent_internal/cgrates.json index faa3e0195..fde270907 100755 --- a/data/conf/samples/diamsctpagent_internal/cgrates.json +++ b/data/conf/samples/diamsctpagent_internal/cgrates.json @@ -59,7 +59,8 @@ "enabled": true, "listen_net":"sctp", "listen": "127.0.0.1:3869", // address where to listen for diameter requests - "sessions_conns": ["*localhost"], + "sessions_conns": ["*internal"], + "rar_template": "*rar", }, "apiers": { diff --git a/data/conf/samples/diamsctpagent_mongo/cgrates.json b/data/conf/samples/diamsctpagent_mongo/cgrates.json index 6b70040b7..da9cbcb59 100755 --- a/data/conf/samples/diamsctpagent_mongo/cgrates.json +++ b/data/conf/samples/diamsctpagent_mongo/cgrates.json @@ -63,7 +63,8 @@ "enabled": true, "listen_net":"sctp", "listen": "127.0.0.1:3869", // address where to listen for diameter requests - "sessions_conns": ["*localhost"], + "sessions_conns": ["*internal"], + "rar_template": "*rar", }, "apiers": { diff --git a/data/conf/samples/diamsctpagent_mysql/cgrates.json b/data/conf/samples/diamsctpagent_mysql/cgrates.json index 6c91de1a3..3a8050af5 100755 --- a/data/conf/samples/diamsctpagent_mysql/cgrates.json +++ b/data/conf/samples/diamsctpagent_mysql/cgrates.json @@ -59,7 +59,8 @@ "enabled": true, "listen_net":"sctp", "listen": "127.0.0.1:3869", // address where to listen for diameter requests - "sessions_conns": ["*localhost"], + "sessions_conns": ["*internal"], + "rar_template": "*rar", }, "apiers": { diff --git a/sessions/sessions.go b/sessions/sessions.go index b13c8f74f..8cec96bd9 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -3641,7 +3641,12 @@ func (sS *SessionS) BiRPCv1SendRAR(clnt rpcclient.ClientConnector, if len(aSs) == 0 { return utils.ErrNotFound } + cache := utils.NewStringSet(nil) for _, as := range aSs { + if cache.Has(as.CGRID) { + continue + } + cache.Add(as.CGRID) ss := sS.getSessions(as.CGRID, false) if len(ss) == 0 { continue