Merge branch 'master' into faststart

This commit is contained in:
Radu Ioan Fericean
2016-06-24 13:25:01 +03:00
3 changed files with 119 additions and 16 deletions

View File

@@ -21,6 +21,7 @@ package agents
import (
"flag"
//"net"
"fmt"
"net/rpc"
"net/rpc/jsonrpc"
"path"
@@ -41,12 +42,15 @@ import (
var testIntegration = flag.Bool("integration", false, "Perform the tests in integration mode, not by default.") // This flag will be passed here via "go test -local" args
var waitRater = flag.Int("wait_rater", 100, "Number of miliseconds to wait for rater to start and cache")
var dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
var interations = flag.Int("iterations", 1, "Number of iterations to do for dry run simulation")
var replyTimeout = flag.String("reply_timeout", "1s", "Maximum duration to wait for a reply")
var daCfgPath string
var daCfg *config.CGRConfig
var apierRpc *rpc.Client
var dmtClient *DiameterClient
var err error
var rplyTimeout time.Duration
func TestDmtAgentInitCfg(t *testing.T) {
if !*testIntegration {
@@ -61,6 +65,7 @@ func TestDmtAgentInitCfg(t *testing.T) {
}
daCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush()
config.SetCgrConfig(daCfg)
rplyTimeout, _ = utils.ParseDurationWithSecs(*replyTimeout)
}
// Remove data in both rating and accounting db
@@ -88,7 +93,7 @@ func TestDmtAgentStartEngine(t *testing.T) {
if !*testIntegration {
return
}
if _, err := engine.StopStartEngine(daCfgPath, *waitRater); err != nil {
if _, err := engine.StopStartEngine(daCfgPath, 4000); err != nil {
t.Fatal(err)
}
}
@@ -244,7 +249,7 @@ func TestDmtAgentSendCCRInit(t *testing.T) {
t.Error(err)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
msg := dmtClient.ReceivedMessage()
msg := dmtClient.ReceivedMessage(rplyTimeout)
if avps, err := msg.FindAVPsWithPath([]interface{}{"Granted-Service-Unit", "CC-Time"}, dict.UndefinedVendorID); err != nil {
t.Error(err)
} else if len(avps) == 0 {
@@ -288,7 +293,7 @@ func TestDmtAgentSendCCRUpdate(t *testing.T) {
t.Error(err)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
msg := dmtClient.ReceivedMessage()
msg := dmtClient.ReceivedMessage(rplyTimeout)
if avps, err := msg.FindAVPsWithPath([]interface{}{"Granted-Service-Unit", "CC-Time"}, dict.UndefinedVendorID); err != nil {
t.Error(err)
} else if len(avps) == 0 {
@@ -327,7 +332,7 @@ func TestDmtAgentSendCCRUpdate2(t *testing.T) {
t.Error(err)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
msg := dmtClient.ReceivedMessage()
msg := dmtClient.ReceivedMessage(rplyTimeout)
if avps, err := msg.FindAVPsWithPath([]interface{}{"Granted-Service-Unit", "CC-Time"}, dict.UndefinedVendorID); err != nil {
t.Error(err)
} else if len(avps) == 0 {
@@ -365,7 +370,7 @@ func TestDmtAgentSendCCRTerminate(t *testing.T) {
t.Error(err)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
msg := dmtClient.ReceivedMessage()
msg := dmtClient.ReceivedMessage(rplyTimeout)
if msg == nil {
t.Fatal("No answer to CCR terminate received")
}
@@ -442,7 +447,7 @@ func TestDmtAgentSendCCRSMS(t *testing.T) {
}
time.Sleep(time.Duration(100) * time.Millisecond)
dmtClient.ReceivedMessage() // Discard the received message so we can test next one
dmtClient.ReceivedMessage(rplyTimeout) // Discard the received message so we can test next one
/*
if msg == nil {
t.Fatal("No message returned")
@@ -534,7 +539,7 @@ func TestDmtAgentSendCCRSMSWrongAccount(t *testing.T) {
t.Error(err)
}
time.Sleep(time.Duration(100) * time.Millisecond)
msg := dmtClient.ReceivedMessage() // Discard the received message so we can test next one
msg := dmtClient.ReceivedMessage(rplyTimeout) // Discard the received message so we can test next one
if msg == nil {
t.Fatal("No message returned")
}
@@ -568,7 +573,7 @@ func TestDmtAgentSendCCRInitWrongAccount(t *testing.T) {
t.Error(err)
}
time.Sleep(time.Duration(100) * time.Millisecond)
msg := dmtClient.ReceivedMessage() // Discard the received message so we can test next one
msg := dmtClient.ReceivedMessage(rplyTimeout) // Discard the received message so we can test next one
if msg == nil {
t.Fatal("No message returned")
}
@@ -643,7 +648,7 @@ func TestDmtAgentSendCCRSimpaEvent(t *testing.T) {
t.Error(err)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
msg := dmtClient.ReceivedMessage() // Discard the received message so we can test next one
msg := dmtClient.ReceivedMessage(rplyTimeout) // Discard the received message so we can test next one
if msg == nil {
t.Fatal("No message returned")
}
@@ -731,7 +736,7 @@ func TestDmtAgentSendDataGrpInit(t *testing.T) {
t.Error(err)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
msg := dmtClient.ReceivedMessage()
msg := dmtClient.ReceivedMessage(rplyTimeout)
if msg == nil {
t.Fatal("No message returned")
}
@@ -825,7 +830,7 @@ func TestDmtAgentSendDataGrpUpdate(t *testing.T) {
t.Error(err)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
msg := dmtClient.ReceivedMessage()
msg := dmtClient.ReceivedMessage(rplyTimeout)
if msg == nil {
t.Fatal("No message returned")
}
@@ -905,7 +910,7 @@ func TestDmtAgentSendDataGrpTerminate(t *testing.T) {
t.Error(err)
}
time.Sleep(time.Duration(3000) * time.Millisecond)
msg := dmtClient.ReceivedMessage()
msg := dmtClient.ReceivedMessage(rplyTimeout)
if msg == nil {
t.Fatal("No message returned")
}
@@ -931,6 +936,7 @@ func TestDmtAgentSendDataGrpCDRs(t *testing.T) {
}
}
/*
func TestDmtAgentDryRun1(t *testing.T) {
if !*testIntegration {
return
@@ -951,7 +957,7 @@ func TestDmtAgentDryRun1(t *testing.T) {
t.Error(err)
}
time.Sleep(time.Duration(100) * time.Millisecond)
msg := dmtClient.ReceivedMessage()
msg := dmtClient.ReceivedMessage(rplyTimeout)
if msg == nil {
t.Fatal("No message returned")
}
@@ -963,6 +969,84 @@ func TestDmtAgentDryRun1(t *testing.T) {
t.Errorf("Expecting 300, received: %s", strResult)
}
}
*/
func TestDmtAgentDryRun1(t *testing.T) {
if !*testIntegration {
return
}
ccr := diam.NewRequest(diam.CreditControl, 4, nil)
ccr.NewAVP(avp.SessionID, avp.Mbit, 0, datatype.UTF8String("cgrates;1451911932;00082"))
ccr.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity("CGR-DA"))
ccr.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity("cgrates.org"))
ccr.NewAVP(avp.DestinationRealm, avp.Mbit, 0, datatype.DiameterIdentity("cgrates.org"))
ccr.NewAVP(avp.DestinationHost, avp.Mbit, 0, datatype.DiameterIdentity("CGR-DA"))
ccr.NewAVP(avp.UserName, avp.Mbit, 0, datatype.UTF8String("CGR-DA"))
ccr.NewAVP(avp.AuthApplicationID, avp.Mbit, 0, datatype.Unsigned32(4))
ccr.NewAVP(avp.ServiceContextID, avp.Mbit, 0, datatype.UTF8String("pubsub1")) // Match specific DryRun profile
ccr.NewAVP(avp.CCRequestType, avp.Mbit, 0, datatype.Enumerated(2))
ccr.NewAVP(avp.CCRequestNumber, avp.Mbit, 0, datatype.Unsigned32(1))
ccr.NewAVP(avp.EventTimestamp, avp.Mbit, 0, datatype.Time(time.Date(2016, 1, 5, 11, 30, 10, 0, time.UTC)))
ccr.NewAVP(avp.TerminationCause, avp.Mbit, 0, datatype.Enumerated(1))
ccr.NewAVP(443, avp.Mbit, 0, &diam.GroupedAVP{ // Subscription-Id
AVP: []*diam.AVP{
diam.NewAVP(450, avp.Mbit, 0, datatype.Enumerated(0)), // Subscription-Id-Type
diam.NewAVP(444, avp.Mbit, 0, datatype.UTF8String("1001")), // Subscription-Id-Data
}})
ccr.NewAVP(443, avp.Mbit, 0, &diam.GroupedAVP{ // Subscription-Id
AVP: []*diam.AVP{
diam.NewAVP(450, avp.Mbit, 0, datatype.Enumerated(1)), // Subscription-Id-Type
diam.NewAVP(444, avp.Mbit, 0, datatype.UTF8String("208123456789")), // Subscription-Id-Data
}})
ccr.NewAVP(439, avp.Mbit, 0, datatype.Unsigned32(0)) // Service-Identifier
ccr.NewAVP(437, avp.Mbit, 0, &diam.GroupedAVP{ // Requested-Service-Unit
AVP: []*diam.AVP{
diam.NewAVP(420, avp.Mbit, 0, datatype.Unsigned32(300)), // CC-Time
}})
ccr.NewAVP(873, avp.Mbit|avp.Vbit, 10415, &diam.GroupedAVP{ // Service-information
AVP: []*diam.AVP{
diam.NewAVP(20300, avp.Mbit, 2011, &diam.GroupedAVP{ // IN-Information
AVP: []*diam.AVP{
diam.NewAVP(20336, avp.Mbit, 2011, datatype.UTF8String("1001")), // CallingPartyAdress
diam.NewAVP(20337, avp.Mbit, 2011, datatype.UTF8String("1002")), // CalledPartyAdress
diam.NewAVP(20339, avp.Mbit, 2011, datatype.Unsigned32(0)), // ChargeFlowType
diam.NewAVP(20302, avp.Mbit, 2011, datatype.UTF8String("33609004940")), // CallingVlrNumber
diam.NewAVP(20303, avp.Mbit, 2011, datatype.UTF8String("208104941749984")), // CallingCellID
diam.NewAVP(20313, avp.Mbit, 2011, datatype.OctetString("0x8090a3")), // BearerCapability
diam.NewAVP(20321, avp.Mbit, 2011, datatype.OctetString("0x401c4132ed665")), // CallreferenceNumber
diam.NewAVP(20322, avp.Mbit, 2011, datatype.UTF8String("33609004940")), // MSCAddress
diam.NewAVP(20386, avp.Mbit, 2011, datatype.UTF8String("20160501010101")), // SSPTime
diam.NewAVP(20938, avp.Mbit, 2011, datatype.OctetString("0x00000001")), // HighLayerCharacteristics
diam.NewAVP(20324, avp.Mbit, 2011, datatype.Integer32(8)), // Time-Zone
},
}),
}})
if _, err := ccr.NewAVP("Framed-IP-Address", avp.Mbit, 0, datatype.UTF8String("10.228.16.4")); err != nil {
t.Error(err)
}
tStart := time.Now()
maxLoops := 100000
for i := 0; i < *interations; i++ {
if err := dmtClient.SendMessage(ccr); err != nil {
t.Error(err)
}
msg := dmtClient.ReceivedMessage(rplyTimeout)
if msg == nil {
t.Fatal("No message returned")
}
/*
if avps, err := msg.FindAVPsWithPath([]interface{}{"Result-Code"}, dict.UndefinedVendorID); err != nil {
t.Error(err)
} else if len(avps) == 0 {
t.Error("Result-Code")
} else if strResult := avpValAsString(avps[0]); strResult != "300" { // Result-Code set in the template
t.Errorf("Expecting 300, received: %s", strResult)
}
*/
}
totalDur := time.Now().Sub(tStart)
fmt.Printf("Total duration: %v resulting %f ops per second\n", totalDur, float64(maxLoops)/totalDur.Seconds())
}
/*
func TestDmtAgentLoadCER(t *testing.T) {
@@ -982,7 +1066,7 @@ func TestDmtAgentLoadCER(t *testing.T) {
t.Error(err)
}
time.Sleep(time.Duration(100) * time.Millisecond)
msg := dmtClient.ReceivedMessage()
msg := dmtClient.ReceivedMessage(rplyTimeout)
if msg == nil {
t.Fatal("No message returned")
}

View File

@@ -84,11 +84,11 @@ func (dc *DiameterClient) handleALL(c diam.Conn, m *diam.Message) {
}
// Returns the message out of received buffer
func (dc *DiameterClient) ReceivedMessage() *diam.Message {
func (dc *DiameterClient) ReceivedMessage(rplyTimeout time.Duration) *diam.Message {
select {
case rcv := <-dc.received:
return rcv
case <-time.After(time.Duration(1) * time.Second): // Timeout reading
case <-time.After(rplyTimeout): // Timeout reading
return nil
}
}

View File

@@ -10,6 +10,25 @@
"http": ":2080", // HTTP listening address
},
"tariffplan_db": { // database used to store active tariff plan configuration
"db_type": "mongo", // stor database type to use: <mysql|postgres>
"db_port": 27017, // the port to reach the stordb
"db_name": "tpdb",
},
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_type": "mongo", // stor database type to use: <mysql|postgres>
"db_port": 27017, // the port to reach the stordb
"db_name": "datadb",
},
"stor_db": {
"db_type": "mongo", // stor database type to use: <mysql|postgres>
"db_port": 27017, // the port to reach the stordb
"db_name": "stordb",
},
"rals": {
"enabled": true,
"cdrstats_conns": [