From 9ca8bd852200f143f4f1cc6ef7e7f9469d9a57aa Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 10 Dec 2015 14:05:15 +0100 Subject: [PATCH] CDRS considers NotFound from mongo, improvements for diameter agent --- agents/dmtagent_it_test.go | 56 ++++++++++++++++++++++++++++++++++++++ agents/dmtclient.go | 13 +++++++-- agents/libdmt_test.go | 6 ++++ engine/cdrs.go | 5 ++-- 4 files changed, 75 insertions(+), 5 deletions(-) diff --git a/agents/dmtagent_it_test.go b/agents/dmtagent_it_test.go index 00d3da108..1f8121cbd 100644 --- a/agents/dmtagent_it_test.go +++ b/agents/dmtagent_it_test.go @@ -31,6 +31,9 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/sessionmanager" "github.com/cgrates/cgrates/utils" + "github.com/fiorix/go-diameter/diam" + "github.com/fiorix/go-diameter/diam/avp" + "github.com/fiorix/go-diameter/diam/datatype" ) var testIntegration = flag.Bool("integration", false, "Perform the tests in integration mode, not by default.") // This flag will be passed here via "go test -local" args @@ -321,6 +324,59 @@ func TestDmtAgentCdrs(t *testing.T) { } } +func TestDmtAgentHuaweiSim1(t *testing.T) { + if !*testIntegration { + return + } + m := diam.NewRequest(diam.CreditControl, 4, nil) + m.NewAVP("Session-Id", avp.Mbit, 0, datatype.UTF8String("simuhuawei;1449573472;00002")) + m.NewAVP("Origin-Host", avp.Mbit, 0, datatype.DiameterIdentity("simuhuawei")) + m.NewAVP("Origin-Realm", avp.Mbit, 0, datatype.DiameterIdentity("routing1.huawei.com")) + m.NewAVP("Destination-Host", avp.Mbit, 0, datatype.DiameterIdentity("CGR-DA")) + m.NewAVP("Destination-Realm", avp.Mbit, 0, datatype.DiameterIdentity("cgrates.org")) + m.NewAVP("Auth-Application-Id", avp.Mbit, 0, datatype.Unsigned32(4)) + m.NewAVP("Service-Context-Id", avp.Mbit, 0, datatype.UTF8String("voice@huawei.com")) + m.NewAVP("CC-Request-Type", avp.Mbit, 0, datatype.Enumerated(1)) + m.NewAVP("CC-Request-Number", avp.Mbit, 0, datatype.Enumerated(0)) + m.NewAVP("Event-Timestamp", avp.Mbit, 0, datatype.Time(time.Now())) + m.NewAVP("Subscription-Id", avp.Mbit, 0, &diam.GroupedAVP{ + AVP: []*diam.AVP{ + diam.NewAVP(450, avp.Mbit, 0, datatype.Enumerated(0)), // Subscription-Id-Type + diam.NewAVP(444, avp.Mbit, 0, datatype.UTF8String("33708000003")), // Subscription-Id-Data + }}) + m.NewAVP("Subscription-Id", avp.Mbit, 0, &diam.GroupedAVP{ + AVP: []*diam.AVP{ + diam.NewAVP(450, avp.Mbit, 0, datatype.Enumerated(1)), // Subscription-Id-Type + diam.NewAVP(444, avp.Mbit, 0, datatype.UTF8String("208708000003")), // Subscription-Id-Data + }}) + m.NewAVP("Service-Identifier", avp.Mbit, 0, datatype.Unsigned32(0)) + m.NewAVP("Requested-Service-Unit", avp.Mbit, 0, &diam.GroupedAVP{ + AVP: []*diam.AVP{ + diam.NewAVP(420, avp.Mbit, 0, datatype.Unsigned32(360))}}) // CC-Time + m.NewAVP(873, avp.Mbit, 10415, &diam.GroupedAVP{ + AVP: []*diam.AVP{ + diam.NewAVP(20300, avp.Mbit, 2011, &diam.GroupedAVP{ // IN-Information + AVP: []*diam.AVP{ + diam.NewAVP(831, avp.Mbit, 10415, datatype.UTF8String("33708000003")), // Calling-Party-Address + diam.NewAVP(832, avp.Mbit, 10415, datatype.UTF8String("780029555")), // Called-Party-Address + diam.NewAVP(20327, avp.Mbit, 2011, datatype.UTF8String("33780029555")), // Real-Called-Number + diam.NewAVP(20339, avp.Mbit, 2011, datatype.Unsigned32(0)), // Charge-Flow-Type + diam.NewAVP(20302, avp.Mbit, 2011, datatype.UTF8String("33609")), // Calling-Vlr-Number + diam.NewAVP(20303, avp.Mbit, 2011, datatype.UTF8String("208102000018370")), // Calling-CellID-Or-SAI + diam.NewAVP(20313, avp.Mbit, 2011, datatype.UTF8String("80:90:a3")), // Bearer-Capability + diam.NewAVP(20321, avp.Mbit, 2011, datatype.UTF8String("40:04:41:31:06:46:18")), // Call-Reference-Number + diam.NewAVP(20322, avp.Mbit, 2011, datatype.UTF8String("3333609")), // MSC-Address + diam.NewAVP(20324, avp.Mbit, 2011, datatype.Unsigned32(8)), // Time-Zone + diam.NewAVP(20385, avp.Mbit, 2011, datatype.UTF8String("6002")), // Called-Party-NP + diam.NewAVP(20386, avp.Mbit, 2011, datatype.UTF8String("20151208121752")), // SSP-Time + }, + }), + }}) + if err := dmtClient.SendMessage(m); err != nil { + t.Error(err) + } +} + func TestDmtAgentStopEngine(t *testing.T) { if !*testIntegration { return diff --git a/agents/dmtclient.go b/agents/dmtclient.go index 36f3320a1..95fbd550b 100644 --- a/agents/dmtclient.go +++ b/agents/dmtclient.go @@ -70,13 +70,20 @@ func NewDiameterClient(addr, originHost, originRealm string, vendorId int, produ type DiameterClient struct { conn diam.Conn handlers diam.Handler + received chan *diam.Message } -func (self *DiameterClient) SendMessage(m *diam.Message) error { - _, err := m.WriteTo(self.conn) +func (dc *DiameterClient) SendMessage(m *diam.Message) error { + _, err := m.WriteTo(dc.conn) return err } -func (self *DiameterClient) handleALL(c diam.Conn, m *diam.Message) { +func (dc *DiameterClient) handleALL(c diam.Conn, m *diam.Message) { utils.Logger.Warning(fmt.Sprintf(" Received unexpected message from %s:\n%s", c.RemoteAddr(), m)) + dc.received <- m +} + +// Returns the message out of received buffer +func (dc *DiameterClient) ReceivedMessage() *diam.Message { + return <-dc.received } diff --git a/agents/libdmt_test.go b/agents/libdmt_test.go index 69010b523..dd1b3c179 100644 --- a/agents/libdmt_test.go +++ b/agents/libdmt_test.go @@ -19,6 +19,7 @@ along with this program. If not, see package agents import ( + "fmt" "testing" "time" @@ -67,6 +68,11 @@ func TestUsageFromCCR(t *testing.T) { if usage := usageFromCCR(3, 1, 35, time.Duration(300)*time.Second); usage != time.Duration(35)*time.Second { t.Error(usage) } + if usage := usageFromCCR(1, 0, 360, time.Duration(360)*time.Second); usage != time.Duration(360)*time.Second { + t.Error(usage) + } else { + fmt.Printf("Usage: %v", usage) + } } func TestAvpValAsString(t *testing.T) { diff --git a/engine/cdrs.go b/engine/cdrs.go index 787c9ccb7..07056d539 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -28,6 +28,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" "github.com/jinzhu/gorm" + mgov2 "gopkg.in/mgo.v2" ) var cdrServer *CdrServer // Share the server so we can use it in http handlers @@ -109,7 +110,7 @@ func (self *CdrServer) LogCallCost(ccl *CallCostLog) error { if ccl.CheckDuplicate { _, err := self.guard.Guard(func() (interface{}, error) { cc, err := self.cdrDb.GetCallCostLog(ccl.CgrId, ccl.Source, ccl.RunId) - if err != nil && err != gorm.RecordNotFound { + if err != nil && err != gorm.RecordNotFound && err != mgov2.ErrNotFound { return nil, err } if cc != nil { @@ -387,7 +388,7 @@ func (self *CdrServer) rateCDR(storedCdr *StoredCdr) error { } time.Sleep(delay()) } - if err != nil && err == gorm.RecordNotFound { //calculate CDR as for pseudoprepaid + if err != nil && (err == gorm.RecordNotFound || err == mgov2.ErrNotFound) { //calculate CDR as for pseudoprepaid utils.Logger.Warning(fmt.Sprintf(" WARNING: Could not find CallCostLog for cgrid: %s, source: %s, runid: %s, will recalculate", storedCdr.CgrId, utils.SESSION_MANAGER_SOURCE, storedCdr.MediationRunId)) qryCC, err = self.getCostFromRater(storedCdr) }