diff --git a/data/conf/samples/cdrsonexpmaster_mongo/cdrsreplicationmaster.json b/data/conf/samples/cdrsonexpmaster_mongo/cdrsreplicationmaster.json index bc9b0f726..969238a1f 100644 --- a/data/conf/samples/cdrsonexpmaster_mongo/cdrsreplicationmaster.json +++ b/data/conf/samples/cdrsonexpmaster_mongo/cdrsreplicationmaster.json @@ -32,7 +32,7 @@ "store_cdrs": false, // store cdrs in storDb "chargers_conns":["*internal"], "rals_conns": ["*internal"], - "online_cdr_exports": ["http_localhost", "amqp_localhost", "http_test_file", "amqp_test_file","aws_test_file","sqs_test_file","kafka_localhost","s3_test_file", "eventcost_filter"], + "online_cdr_exports": ["http_localhost", "amqp_localhost", "http_test_file", "kafka_localhost", "eventcost_filter"], "ees_conns": ["*localhost"] }, @@ -44,6 +44,7 @@ "enabled": true, }, + "ees": { "enabled": true, "exporters": [ @@ -52,7 +53,9 @@ "type": "*http_post", "export_path": "http://127.0.0.1:12080/cdr_http", "tenant": "cgrates.org", + "synchronous": true, "attempts": 1, + "filters":["*string:~*opts.ExporterID:http_localhost"], "fields":[ {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, @@ -104,54 +107,12 @@ "type": "*http_post", "export_path": "http://127.0.0.1:12080/invalid", "tenant": "cgrates.org", + "synchronous": true, "attempts": 1, "fields":[ {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, ], }, - { - "id": "aws_test_file", - "type": "*amqpv1_json_map", - "export_path": "amqps://guest:guest@localhost:25672/", - "opts": { - "queueID": "cgrates_cdrs", - }, - "tenant": "cgrates.org", - "attempts": 1, - "fields":[ - {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"} - ], - }, - { - "id": "sqs_test_file", - "type": "*sqs_json_map", - // export_path for sqs: "endpoint" - "export_path": "http://sqs.eu-west-2.amazonaws.com/", - "opts": { - "awsRegion": "eu-west-2", - "awsKey": "testkey", - "awsSecret": "testsecret", - "queueID": "cgrates-cdrs", - }, - "tenant": "cgrates.org", - "attempts": 1, - "fields":[ - {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"} - ], - }, - { - "id": "amqp_test_file", - "type": "*amqp_json_map", - "export_path": "amqp://guest:guest@localhost:25672/", - "opts": { - "queueID": "cgrates_cdrs", - }, - "tenant": "cgrates.org", - "attempts": 1, - "fields":[ - {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"} - ], - }, { "id": "kafka_localhost", "type": "*kafka_json_map", @@ -160,23 +121,7 @@ "topic": "cgrates_cdrs", }, "tenant": "cgrates.org", - "attempts": 1, - "fields":[ - {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"} - ], - }, - { - "id": "s3_test_file", - "type": "*s3_json_map", - // export_path for s3: "endpoint" - "export_path": "http://s3.us-east-2.amazonaws.com/", - "opts": { - "awsRegion": "eu-west-2", - "awsKey": "testkey", - "awsSecret": "testsecret", - "queueID": "cgrates-cdrs", - }, - "tenant": "cgrates.org", + "synchronous": true, "attempts": 1, "fields":[ {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"} @@ -191,6 +136,7 @@ }, "tenant": "cgrates.org", "filters":["*string:~*ec.Cost:100"], + "synchronous": true, "attempts": 1, "fields":[ {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"} diff --git a/data/conf/samples/cdrsonexpmaster_mysql/cdrsreplicationmaster.json b/data/conf/samples/cdrsonexpmaster_mysql/cdrsreplicationmaster.json index 5d750e471..2ded3f3ba 100644 --- a/data/conf/samples/cdrsonexpmaster_mysql/cdrsreplicationmaster.json +++ b/data/conf/samples/cdrsonexpmaster_mysql/cdrsreplicationmaster.json @@ -30,7 +30,7 @@ "store_cdrs": false, // store cdrs in storDb "chargers_conns":["*internal"], "rals_conns": ["*internal"], - "online_cdr_exports": ["http_localhost", "amqp_localhost", "http_test_file", "amqp_test_file","aws_test_file","sqs_test_file","kafka_localhost","s3_test_file", "eventcost_filter"], + "online_cdr_exports": ["http_localhost", "amqp_localhost", "http_test_file", "kafka_localhost", "eventcost_filter"], "ees_conns": ["*localhost"] }, @@ -51,7 +51,9 @@ "type": "*http_post", "export_path": "http://127.0.0.1:12080/cdr_http", "tenant": "cgrates.org", + "synchronous": true, "attempts": 1, + "filters":["*string:~*opts.ExporterID:http_localhost"], "fields":[ {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, @@ -103,54 +105,12 @@ "type": "*http_post", "export_path": "http://127.0.0.1:12080/invalid", "tenant": "cgrates.org", + "synchronous": true, "attempts": 1, "fields":[ {"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, ], }, - { - "id": "aws_test_file", - "type": "*amqpv1_json_map", - "export_path": "amqps://guest:guest@localhost:25672/", - "opts": { - "queueID": "cgrates_cdrs", - }, - "tenant": "cgrates.org", - "attempts": 1, - "fields":[ - {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"} - ], - }, - { - "id": "sqs_test_file", - "type": "*sqs_json_map", - // export_path for sqs: "endpoint" - "export_path": "http://sqs.eu-west-2.amazonaws.com/", - "opts": { - "awsRegion": "eu-west-2", - "awsKey": "testkey", - "awsSecret": "testsecret", - "queueID": "cgrates-cdrs", - }, - "tenant": "cgrates.org", - "attempts": 1, - "fields":[ - {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"} - ], - }, - { - "id": "amqp_test_file", - "type": "*amqp_json_map", - "export_path": "amqp://guest:guest@localhost:25672/", - "opts": { - "queueID": "cgrates_cdrs", - }, - "tenant": "cgrates.org", - "attempts": 1, - "fields":[ - {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"} - ], - }, { "id": "kafka_localhost", "type": "*kafka_json_map", @@ -159,23 +119,7 @@ "topic": "cgrates_cdrs", }, "tenant": "cgrates.org", - "attempts": 1, - "fields":[ - {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"} - ], - }, - { - "id": "s3_test_file", - "type": "*s3_json_map", - // export_path for s3: "endpoint" - "export_path": "http://s3.us-east-2.amazonaws.com/", - "opts": { - "awsRegion": "eu-west-2", - "awsKey": "testkey", - "awsSecret": "testsecret", - "queueID": "cgrates-cdrs", - }, - "tenant": "cgrates.org", + "synchronous": true, "attempts": 1, "fields":[ {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"} @@ -190,6 +134,7 @@ }, "tenant": "cgrates.org", "filters":["*string:~*ec.Cost:100"], + "synchronous": true, "attempts": 1, "fields":[ {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"} diff --git a/engine/pstr_http.go b/engine/pstr_http.go index 0e21c0601..b99a8d471 100644 --- a/engine/pstr_http.go +++ b/engine/pstr_http.go @@ -115,6 +115,7 @@ func (pstr *HTTPPoster) do(req *http.Request) (respBody []byte, err error) { return } if resp.StatusCode > 299 { + err = fmt.Errorf("unexpected status code received: <%d>", resp.StatusCode) utils.Logger.Warning(fmt.Sprintf(" Posting to : <%s>, unexpected status code received: <%d>", pstr.addr, resp.StatusCode)) return } diff --git a/general_tests/cdrs_onlexp_it_test.go b/general_tests/cdrs_onlexp_it_test.go index 3dee85904..d8a176a37 100644 --- a/general_tests/cdrs_onlexp_it_test.go +++ b/general_tests/cdrs_onlexp_it_test.go @@ -25,6 +25,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "net/http" "net/url" "os" "os/exec" @@ -48,7 +49,7 @@ var ( cdrsMasterRpc *rpcclient.RPCClient httpCGRID = utils.UUIDSha1Prefix() amqpCGRID = utils.UUIDSha1Prefix() - failoverContent = []interface{}{[]byte(fmt.Sprintf(`{"CGRID":"%s"}`, httpCGRID)), []byte(fmt.Sprintf(`{"CGRID":"%s"}`, amqpCGRID))} + failoverContent = [][]byte{[]byte(fmt.Sprintf(`{"CGRID":"%s"}`, httpCGRID)), []byte(fmt.Sprintf(`{"CGRID":"%s"}`, amqpCGRID))} sTestsCDRsOnExp = []func(t *testing.T){ testCDRsOnExpInitConfig, @@ -69,7 +70,7 @@ var ( func TestCDRsOnExp(t *testing.T) { switch *dbType { - case utils.MetaInternal: + case utils.MetaInternal, utils.MetaPostgres: t.SkipNow() case utils.MetaMySQL: cdrsMasterCfgDIR = "cdrsonexpmaster_mysql" @@ -77,8 +78,6 @@ func TestCDRsOnExp(t *testing.T) { case utils.MetaMongo: cdrsMasterCfgDIR = "cdrsonexpmaster_mongo" cdrsSlaveCfgDIR = "cdrsonexpslave_mongo" - case utils.MetaPostgres: - t.SkipNow() default: t.Fatal("Unknown Database type") } @@ -160,13 +159,33 @@ func testCDRsOnExpAMQPQueuesCreation(t *testing.T) { if err = ch.QueueBind(q1.Name, "cgr_cdrs", "exchangename", false, nil); err != nil { t.Fatal(err) } + if err = ch.Close(); err != nil { + t.Error(err) + } + if err = conn.Close(); err != nil { + t.Error(err) + } + v, err := kafka.Dial("tcp", "localhost:9092") + if err != nil { + t.Fatal(err) + } + if err := v.CreateTopics(kafka.TopicConfig{ + Topic: "cgrates_cdrs", + NumPartitions: 1, + ReplicationFactor: 1, + }); err != nil { + t.Fatal(err) + } + if err = v.Close(); err != nil { + t.Fatal(err) + } } // Connect rpc client to rater func testCDRsOnExpInitMasterRPC(t *testing.T) { var err error cdrsMasterRpc, err = rpcclient.NewRPCClient(utils.TCP, cdrsMasterCfg.ListenCfg().RPCJSONListen, false, "", "", "", 1, 1, - time.Second, 2*time.Second, rpcclient.JSONrpc, nil, false) + time.Second, 5*time.Second, rpcclient.JSONrpc, nil, false) if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } @@ -221,7 +240,7 @@ func testCDRsOnExpDisableOnlineExport(t *testing.T) { var reply string if err := cdrsMasterRpc.Call(utils.CDRsV1ProcessEvent, &engine.ArgV1ProcessEvent{ - Flags: []string{"*export:false"}, + Flags: []string{"*export:false", "*chargers:false"}, CGREventWithOpts: utils.CGREventWithOpts{ CGREvent: testCdr.AsCGREvent(), }, @@ -249,7 +268,7 @@ func testCDRsOnExpHttpCdrReplication(t *testing.T) { OriginID: "httpjsonrpc1", OriginHost: "192.168.1.1", Source: "UNKNOWN", - RequestType: utils.META_PSEUDOPREPAID, + RequestType: utils.META_NONE, Tenant: "cgrates.org", Category: "call", Account: "1001", @@ -269,8 +288,12 @@ func testCDRsOnExpHttpCdrReplication(t *testing.T) { var reply string // we expect that the cdr export to fail and go into the failed post directory if err := cdrsMasterRpc.Call(utils.CDRsV1ProcessEvent, - &engine.ArgV1ProcessEvent{CGREventWithOpts: utils.CGREventWithOpts{ - CGREvent: testCdr1.AsCGREvent()}}, &reply); err == nil || err.Error() != utils.ErrPartiallyExecuted.Error() { + &engine.ArgV1ProcessEvent{ + CGREventWithOpts: utils.CGREventWithOpts{ + CGREvent: testCdr1.AsCGREvent(), + Opts: map[string]interface{}{"ExporterID": "http_localhost"}, + }, + }, &reply); err == nil || err.Error() != utils.ErrPartiallyExecuted.Error() { t.Error("Unexpected error: ", err) } time.Sleep(time.Duration(*waitRater) * time.Millisecond) @@ -280,7 +303,6 @@ func testCDRsOnExpHttpCdrReplication(t *testing.T) { t.Fatal("Could not connect to rater: ", err.Error()) } // ToDo: Fix cdr_http to be compatible with rest of processCdr methods - time.Sleep(200 * time.Millisecond) var rcvedCdrs []*engine.ExternalCDR if err := cdrsSlaveRpc.Call(utils.APIerSv2GetCDRs, &utils.RPCCDRsFilter{CGRIDs: []string{testCdr1.CGRID}, RunIDs: []string{utils.MetaDefault}}, &rcvedCdrs); err != nil { @@ -290,10 +312,10 @@ func testCDRsOnExpHttpCdrReplication(t *testing.T) { } else { rcvSetupTime, _ := utils.ParseTimeDetectLayout(rcvedCdrs[0].SetupTime, "") rcvAnswerTime, _ := utils.ParseTimeDetectLayout(rcvedCdrs[0].AnswerTime, "") - //rcvUsage, _ := utils.ParseDurationWithSecs(rcvedCdrs[0].Usage) if rcvedCdrs[0].CGRID != testCdr1.CGRID || + rcvedCdrs[0].RunID != testCdr1.RunID || rcvedCdrs[0].ToR != testCdr1.ToR || - rcvedCdrs[0].OriginHost != testCdr1.OriginHost || + rcvedCdrs[0].OriginID != testCdr1.OriginID || rcvedCdrs[0].RequestType != testCdr1.RequestType || rcvedCdrs[0].Tenant != testCdr1.Tenant || rcvedCdrs[0].Category != testCdr1.Category || @@ -302,10 +324,8 @@ func testCDRsOnExpHttpCdrReplication(t *testing.T) { rcvedCdrs[0].Destination != testCdr1.Destination || !rcvSetupTime.Equal(testCdr1.SetupTime) || !rcvAnswerTime.Equal(testCdr1.AnswerTime) || - //rcvUsage != 10 || - rcvedCdrs[0].RunID != testCdr1.RunID { - //rcvedCdrs[0].Cost != testCdr1.Cost || - //!reflect.DeepEqual(rcvedCdrs[0].ExtraFields, testCdr1.ExtraFields) { + rcvedCdrs[0].Usage != testCdr1.Usage.String() || + rcvedCdrs[0].Cost != testCdr1.Cost { t.Errorf("Expected: %+v, received: %+v", utils.ToJSON(testCdr1), utils.ToJSON(rcvedCdrs[0])) } } @@ -357,7 +377,6 @@ func testCDRsOnExpAMQPReplication(t *testing.T) { t.Error("No message received from RabbitMQ") } conn.Close() - time.Sleep(500 * time.Millisecond) // restart RabbitMQ server so we can test reconnects if err := exec.Command("service", "rabbitmq-server", "restart").Run(); err != nil { t.Error(err) @@ -463,25 +482,22 @@ func checkContent(ev *engine.ExportEvents, content []interface{}) error { for i, con := range ev.Events { recv[i] = utils.IfaceAsString(con) } - return fmt.Errorf("Expecting: one of %q, received: %q", utils.ToJSON(exp), utils.ToJSON(recv)) + return fmt.Errorf("Expecting: one of %s, received: %s", utils.ToJSON(exp), utils.ToJSON(recv)) } return nil } func testCDRsOnExpFileFailover(t *testing.T) { - time.Sleep(5 * time.Second) v1 := url.Values{} v2 := url.Values{} v1.Set("OriginID", "httpjsonrpc1") v2.Set("OriginID", "amqpreconnect") - httpContent := []interface{}{v1, v2} + httpContent := []interface{}{&engine.HTTPPosterRequest{Body: v1, Header: http.Header{"Content-Type": []string{"application/x-www-form-urlencoded"}}}, + &engine.HTTPPosterRequest{Body: v2, Header: http.Header{"Content-Type": []string{"application/x-www-form-urlencoded"}}}} filesInDir, _ := ioutil.ReadDir(cdrsMasterCfg.GeneralCfg().FailedPostsDir) if len(filesInDir) == 0 { t.Fatalf("No files in directory: %s", cdrsMasterCfg.GeneralCfg().FailedPostsDir) } - expectedFormats := utils.NewStringSet([]string{utils.MetaHTTPPost, utils.MetaAMQPjsonMap, - utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap, utils.MetaS3jsonMap}) - rcvFormats := utils.NewStringSet([]string{}) for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config fileName := file.Name() filePath := path.Join(cdrsMasterCfg.GeneralCfg().FailedPostsDir, fileName) @@ -494,18 +510,13 @@ func testCDRsOnExpFileFailover(t *testing.T) { t.Error("Expected at least one event") continue } - rcvFormats.Add(ev.Format) - content := failoverContent - if ev.Format == utils.MetaHTTPPost { - content = httpContent + if ev.Format != utils.MetaHTTPPost { + t.Errorf("Expected %s to be only failed exporter,received <%s>", utils.MetaHTTPPost, ev.Format) } - if err := checkContent(ev, content); err != nil { + if err := checkContent(ev, httpContent); err != nil { t.Errorf("For file <%s> and event <%s> received %s", filePath, utils.ToJSON(ev), err) } } - if !reflect.DeepEqual(expectedFormats, rcvFormats) { - t.Errorf("Missing format expecting: %s received: %s", utils.ToJSON(expectedFormats), utils.ToJSON(rcvFormats)) - } } func testCDRsOnExpKafkaPosterFileFailover(t *testing.T) { @@ -522,37 +533,13 @@ func testCDRsOnExpKafkaPosterFileFailover(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) if m, err := reader.ReadMessage(ctx); err != nil { t.Fatal(err) - } else if !reflect.DeepEqual(failoverContent[0].([]byte), m.Value) && !reflect.DeepEqual(failoverContent[1].([]byte), m.Value) { // Checking just the prefix should do since some content is dynamic - t.Errorf("Expecting: %v or %v, received: %v", utils.IfaceAsString(failoverContent[0]), utils.IfaceAsString(failoverContent[1]), string(m.Value)) + } else if !reflect.DeepEqual(failoverContent[0], m.Value) && !reflect.DeepEqual(failoverContent[1], m.Value) { // Checking just the prefix should do since some content is dynamic + t.Errorf("Expecting: %v or %v, received: %v", string(failoverContent[0]), string(failoverContent[1]), string(m.Value)) } cancel() } } -/* -// Performance test, check `lsof -a -p 8427 | wc -l` - -func testCdrsHttpCdrReplication2(t *testing.T) { - cdrs := make([]*engine.CDR, 0) - for i := 0; i < 10000; i++ { - cdr := &engine.CDR{OriginID: fmt.Sprintf("httpjsonrpc_%d", i), - ToR: utils.VOICE, OriginHost: "192.168.1.1", Source: "UNKNOWN", RequestType: utils.META_PSEUDOPREPAID, - Tenant: "cgrates.org", Category: "call", Account: "1001", Subject: "1001", Destination: "1002", - SetupTime: time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC), AnswerTime: time.Date(2013, 12, 7, 8, 42, 26, 0, time.UTC), - Usage: 10 * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}} - cdrs = append(cdrs, cdr) - } - var reply string - for _, cdr := range cdrs { - if err := cdrsMasterRpc.Call(utils.CDRsV1ProcessCDR, cdr, &reply); err != nil { - t.Error("Unexpected error: ", err.Error()) - } else if reply != utils.OK { - t.Error("Unexpected reply received: ", reply) - } - } -} -*/ - func testCDRsOnExpStopEngine(t *testing.T) { if err := engine.KillEngine(100); err != nil { t.Error(err) diff --git a/general_tests/data_it_test.go b/general_tests/data_it_test.go index dda42304b..215dc05fd 100644 --- a/general_tests/data_it_test.go +++ b/general_tests/data_it_test.go @@ -318,7 +318,7 @@ func testV1DataDataDebitUsage1G0(t *testing.T) { &engine.UsageRecordWithOpts{UsageRecord: usageRecord}, &reply); err != nil { t.Error(err) } - if time.Now().Sub(tStart) > 50*time.Millisecond { + if time.Now().Sub(tStart) > 100*time.Millisecond { t.Errorf("Take's too long for GetDataCost : %+v", time.Now().Sub(tStart)) }