diff --git a/agents/dmtagent_it_test.go b/agents/dmtagent_it_test.go index 0087e4838..7d0dc6f70 100644 --- a/agents/dmtagent_it_test.go +++ b/agents/dmtagent_it_test.go @@ -21,6 +21,7 @@ package agents import ( "flag" //"net" + "fmt" "net/rpc" "net/rpc/jsonrpc" "path" @@ -41,12 +42,15 @@ import ( var testIntegration = flag.Bool("integration", false, "Perform the tests in integration mode, not by default.") // This flag will be passed here via "go test -local" args var waitRater = flag.Int("wait_rater", 100, "Number of miliseconds to wait for rater to start and cache") var dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here") +var interations = flag.Int("iterations", 1, "Number of iterations to do for dry run simulation") +var replyTimeout = flag.String("reply_timeout", "1s", "Maximum duration to wait for a reply") var daCfgPath string var daCfg *config.CGRConfig var apierRpc *rpc.Client var dmtClient *DiameterClient var err error +var rplyTimeout time.Duration func TestDmtAgentInitCfg(t *testing.T) { if !*testIntegration { @@ -61,6 +65,7 @@ func TestDmtAgentInitCfg(t *testing.T) { } daCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush() config.SetCgrConfig(daCfg) + rplyTimeout, _ = utils.ParseDurationWithSecs(*replyTimeout) } // Remove data in both rating and accounting db @@ -88,7 +93,7 @@ func TestDmtAgentStartEngine(t *testing.T) { if !*testIntegration { return } - if _, err := engine.StopStartEngine(daCfgPath, *waitRater); err != nil { + if _, err := engine.StopStartEngine(daCfgPath, 4000); err != nil { t.Fatal(err) } } @@ -244,7 +249,7 @@ func TestDmtAgentSendCCRInit(t *testing.T) { t.Error(err) } time.Sleep(time.Duration(*waitRater) * time.Millisecond) - msg := dmtClient.ReceivedMessage() + msg := dmtClient.ReceivedMessage(rplyTimeout) if avps, err := msg.FindAVPsWithPath([]interface{}{"Granted-Service-Unit", "CC-Time"}, dict.UndefinedVendorID); err != nil { t.Error(err) } else if len(avps) == 0 { @@ -288,7 +293,7 @@ func TestDmtAgentSendCCRUpdate(t *testing.T) { t.Error(err) } time.Sleep(time.Duration(*waitRater) * time.Millisecond) - msg := dmtClient.ReceivedMessage() + msg := dmtClient.ReceivedMessage(rplyTimeout) if avps, err := msg.FindAVPsWithPath([]interface{}{"Granted-Service-Unit", "CC-Time"}, dict.UndefinedVendorID); err != nil { t.Error(err) } else if len(avps) == 0 { @@ -327,7 +332,7 @@ func TestDmtAgentSendCCRUpdate2(t *testing.T) { t.Error(err) } time.Sleep(time.Duration(*waitRater) * time.Millisecond) - msg := dmtClient.ReceivedMessage() + msg := dmtClient.ReceivedMessage(rplyTimeout) if avps, err := msg.FindAVPsWithPath([]interface{}{"Granted-Service-Unit", "CC-Time"}, dict.UndefinedVendorID); err != nil { t.Error(err) } else if len(avps) == 0 { @@ -365,7 +370,7 @@ func TestDmtAgentSendCCRTerminate(t *testing.T) { t.Error(err) } time.Sleep(time.Duration(*waitRater) * time.Millisecond) - msg := dmtClient.ReceivedMessage() + msg := dmtClient.ReceivedMessage(rplyTimeout) if msg == nil { t.Fatal("No answer to CCR terminate received") } @@ -442,7 +447,7 @@ func TestDmtAgentSendCCRSMS(t *testing.T) { } time.Sleep(time.Duration(100) * time.Millisecond) - dmtClient.ReceivedMessage() // Discard the received message so we can test next one + dmtClient.ReceivedMessage(rplyTimeout) // Discard the received message so we can test next one /* if msg == nil { t.Fatal("No message returned") @@ -534,7 +539,7 @@ func TestDmtAgentSendCCRSMSWrongAccount(t *testing.T) { t.Error(err) } time.Sleep(time.Duration(100) * time.Millisecond) - msg := dmtClient.ReceivedMessage() // Discard the received message so we can test next one + msg := dmtClient.ReceivedMessage(rplyTimeout) // Discard the received message so we can test next one if msg == nil { t.Fatal("No message returned") } @@ -568,7 +573,7 @@ func TestDmtAgentSendCCRInitWrongAccount(t *testing.T) { t.Error(err) } time.Sleep(time.Duration(100) * time.Millisecond) - msg := dmtClient.ReceivedMessage() // Discard the received message so we can test next one + msg := dmtClient.ReceivedMessage(rplyTimeout) // Discard the received message so we can test next one if msg == nil { t.Fatal("No message returned") } @@ -643,7 +648,7 @@ func TestDmtAgentSendCCRSimpaEvent(t *testing.T) { t.Error(err) } time.Sleep(time.Duration(*waitRater) * time.Millisecond) - msg := dmtClient.ReceivedMessage() // Discard the received message so we can test next one + msg := dmtClient.ReceivedMessage(rplyTimeout) // Discard the received message so we can test next one if msg == nil { t.Fatal("No message returned") } @@ -731,7 +736,7 @@ func TestDmtAgentSendDataGrpInit(t *testing.T) { t.Error(err) } time.Sleep(time.Duration(*waitRater) * time.Millisecond) - msg := dmtClient.ReceivedMessage() + msg := dmtClient.ReceivedMessage(rplyTimeout) if msg == nil { t.Fatal("No message returned") } @@ -825,7 +830,7 @@ func TestDmtAgentSendDataGrpUpdate(t *testing.T) { t.Error(err) } time.Sleep(time.Duration(*waitRater) * time.Millisecond) - msg := dmtClient.ReceivedMessage() + msg := dmtClient.ReceivedMessage(rplyTimeout) if msg == nil { t.Fatal("No message returned") } @@ -905,7 +910,7 @@ func TestDmtAgentSendDataGrpTerminate(t *testing.T) { t.Error(err) } time.Sleep(time.Duration(3000) * time.Millisecond) - msg := dmtClient.ReceivedMessage() + msg := dmtClient.ReceivedMessage(rplyTimeout) if msg == nil { t.Fatal("No message returned") } @@ -931,6 +936,7 @@ func TestDmtAgentSendDataGrpCDRs(t *testing.T) { } } +/* func TestDmtAgentDryRun1(t *testing.T) { if !*testIntegration { return @@ -951,7 +957,7 @@ func TestDmtAgentDryRun1(t *testing.T) { t.Error(err) } time.Sleep(time.Duration(100) * time.Millisecond) - msg := dmtClient.ReceivedMessage() + msg := dmtClient.ReceivedMessage(rplyTimeout) if msg == nil { t.Fatal("No message returned") } @@ -963,6 +969,84 @@ func TestDmtAgentDryRun1(t *testing.T) { t.Errorf("Expecting 300, received: %s", strResult) } } +*/ + +func TestDmtAgentDryRun1(t *testing.T) { + if !*testIntegration { + return + } + ccr := diam.NewRequest(diam.CreditControl, 4, nil) + ccr.NewAVP(avp.SessionID, avp.Mbit, 0, datatype.UTF8String("cgrates;1451911932;00082")) + ccr.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity("CGR-DA")) + ccr.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity("cgrates.org")) + ccr.NewAVP(avp.DestinationRealm, avp.Mbit, 0, datatype.DiameterIdentity("cgrates.org")) + ccr.NewAVP(avp.DestinationHost, avp.Mbit, 0, datatype.DiameterIdentity("CGR-DA")) + ccr.NewAVP(avp.UserName, avp.Mbit, 0, datatype.UTF8String("CGR-DA")) + ccr.NewAVP(avp.AuthApplicationID, avp.Mbit, 0, datatype.Unsigned32(4)) + ccr.NewAVP(avp.ServiceContextID, avp.Mbit, 0, datatype.UTF8String("pubsub1")) // Match specific DryRun profile + ccr.NewAVP(avp.CCRequestType, avp.Mbit, 0, datatype.Enumerated(2)) + ccr.NewAVP(avp.CCRequestNumber, avp.Mbit, 0, datatype.Unsigned32(1)) + ccr.NewAVP(avp.EventTimestamp, avp.Mbit, 0, datatype.Time(time.Date(2016, 1, 5, 11, 30, 10, 0, time.UTC))) + ccr.NewAVP(avp.TerminationCause, avp.Mbit, 0, datatype.Enumerated(1)) + ccr.NewAVP(443, avp.Mbit, 0, &diam.GroupedAVP{ // Subscription-Id + AVP: []*diam.AVP{ + diam.NewAVP(450, avp.Mbit, 0, datatype.Enumerated(0)), // Subscription-Id-Type + diam.NewAVP(444, avp.Mbit, 0, datatype.UTF8String("1001")), // Subscription-Id-Data + }}) + ccr.NewAVP(443, avp.Mbit, 0, &diam.GroupedAVP{ // Subscription-Id + AVP: []*diam.AVP{ + diam.NewAVP(450, avp.Mbit, 0, datatype.Enumerated(1)), // Subscription-Id-Type + diam.NewAVP(444, avp.Mbit, 0, datatype.UTF8String("208123456789")), // Subscription-Id-Data + }}) + ccr.NewAVP(439, avp.Mbit, 0, datatype.Unsigned32(0)) // Service-Identifier + ccr.NewAVP(437, avp.Mbit, 0, &diam.GroupedAVP{ // Requested-Service-Unit + AVP: []*diam.AVP{ + diam.NewAVP(420, avp.Mbit, 0, datatype.Unsigned32(300)), // CC-Time + }}) + ccr.NewAVP(873, avp.Mbit|avp.Vbit, 10415, &diam.GroupedAVP{ // Service-information + AVP: []*diam.AVP{ + diam.NewAVP(20300, avp.Mbit, 2011, &diam.GroupedAVP{ // IN-Information + AVP: []*diam.AVP{ + diam.NewAVP(20336, avp.Mbit, 2011, datatype.UTF8String("1001")), // CallingPartyAdress + diam.NewAVP(20337, avp.Mbit, 2011, datatype.UTF8String("1002")), // CalledPartyAdress + diam.NewAVP(20339, avp.Mbit, 2011, datatype.Unsigned32(0)), // ChargeFlowType + diam.NewAVP(20302, avp.Mbit, 2011, datatype.UTF8String("33609004940")), // CallingVlrNumber + diam.NewAVP(20303, avp.Mbit, 2011, datatype.UTF8String("208104941749984")), // CallingCellID + diam.NewAVP(20313, avp.Mbit, 2011, datatype.OctetString("0x8090a3")), // BearerCapability + diam.NewAVP(20321, avp.Mbit, 2011, datatype.OctetString("0x401c4132ed665")), // CallreferenceNumber + diam.NewAVP(20322, avp.Mbit, 2011, datatype.UTF8String("33609004940")), // MSCAddress + diam.NewAVP(20386, avp.Mbit, 2011, datatype.UTF8String("20160501010101")), // SSPTime + diam.NewAVP(20938, avp.Mbit, 2011, datatype.OctetString("0x00000001")), // HighLayerCharacteristics + diam.NewAVP(20324, avp.Mbit, 2011, datatype.Integer32(8)), // Time-Zone + }, + }), + }}) + if _, err := ccr.NewAVP("Framed-IP-Address", avp.Mbit, 0, datatype.UTF8String("10.228.16.4")); err != nil { + t.Error(err) + } + tStart := time.Now() + maxLoops := 100000 + for i := 0; i < *interations; i++ { + if err := dmtClient.SendMessage(ccr); err != nil { + t.Error(err) + } + msg := dmtClient.ReceivedMessage(rplyTimeout) + if msg == nil { + t.Fatal("No message returned") + } + /* + if avps, err := msg.FindAVPsWithPath([]interface{}{"Result-Code"}, dict.UndefinedVendorID); err != nil { + t.Error(err) + } else if len(avps) == 0 { + t.Error("Result-Code") + } else if strResult := avpValAsString(avps[0]); strResult != "300" { // Result-Code set in the template + t.Errorf("Expecting 300, received: %s", strResult) + } + */ + } + totalDur := time.Now().Sub(tStart) + fmt.Printf("Total duration: %v resulting %f ops per second\n", totalDur, float64(maxLoops)/totalDur.Seconds()) +} /* func TestDmtAgentLoadCER(t *testing.T) { @@ -982,7 +1066,7 @@ func TestDmtAgentLoadCER(t *testing.T) { t.Error(err) } time.Sleep(time.Duration(100) * time.Millisecond) - msg := dmtClient.ReceivedMessage() + msg := dmtClient.ReceivedMessage(rplyTimeout) if msg == nil { t.Fatal("No message returned") } diff --git a/agents/dmtclient.go b/agents/dmtclient.go index fd02fd5fe..9e3b3dba0 100644 --- a/agents/dmtclient.go +++ b/agents/dmtclient.go @@ -84,11 +84,11 @@ func (dc *DiameterClient) handleALL(c diam.Conn, m *diam.Message) { } // Returns the message out of received buffer -func (dc *DiameterClient) ReceivedMessage() *diam.Message { +func (dc *DiameterClient) ReceivedMessage(rplyTimeout time.Duration) *diam.Message { select { case rcv := <-dc.received: return rcv - case <-time.After(time.Duration(1) * time.Second): // Timeout reading + case <-time.After(rplyTimeout): // Timeout reading return nil } } diff --git a/data/conf/samples/dmtagent/cgrates.json b/data/conf/samples/dmtagent/cgrates.json index 9599f11d2..2dd0eb1ec 100644 --- a/data/conf/samples/dmtagent/cgrates.json +++ b/data/conf/samples/dmtagent/cgrates.json @@ -10,6 +10,25 @@ "http": ":2080", // HTTP listening address }, +"tariffplan_db": { // database used to store active tariff plan configuration + "db_type": "mongo", // stor database type to use: + "db_port": 27017, // the port to reach the stordb + "db_name": "tpdb", +}, + + +"data_db": { // database used to store runtime data (eg: accounts, cdr stats) + "db_type": "mongo", // stor database type to use: + "db_port": 27017, // the port to reach the stordb + "db_name": "datadb", +}, + +"stor_db": { + "db_type": "mongo", // stor database type to use: + "db_port": 27017, // the port to reach the stordb + "db_name": "stordb", +}, + "rals": { "enabled": true, "cdrstats_conns": [