From b0aa272f75c0b8c4b85e2dd2790a0077f3f5dfec Mon Sep 17 00:00:00 2001 From: gezimbll Date: Tue, 10 Dec 2024 13:35:39 +0100 Subject: [PATCH] added it tests for ErSv1.RunReader API --- general_tests/ers_startdelay_it_test.go | 80 +++++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/general_tests/ers_startdelay_it_test.go b/general_tests/ers_startdelay_it_test.go index 877a22132..b4347e03d 100644 --- a/general_tests/ers_startdelay_it_test.go +++ b/general_tests/ers_startdelay_it_test.go @@ -22,11 +22,13 @@ import ( "fmt" "os" "path/filepath" + "strings" "testing" "time" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/ers" "github.com/cgrates/cgrates/utils" ) @@ -169,3 +171,81 @@ cgrates.org,call,1001,2014-01-14T00:00:00Z,RP_ANY,`, } }) } + +func TestErsRunReader(t *testing.T) { + switch *utils.DBType { + case utils.MetaInternal: + case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("unsupported dbtype value") + } + csvcontent := `csvfile1,1001,1303535,1727779754,1727779754,60` + csvFd, procFd := t.TempDir(), t.TempDir() + filePath := filepath.Join(csvFd, fmt.Sprintf("file1%s", utils.CSVSuffix)) + if err := os.WriteFile(filePath, []byte(csvcontent), 0644); err != nil { + t.Fatalf("could not write to file %s: %v", filePath, err) + } + content := fmt.Sprintf(`{ + "general": { + "log_level": 7 + }, + "data_db": { + "db_type": "*internal" + }, + "stor_db": { + "db_type": "*internal" + }, + "apiers":{ + "enabled":true, + }, + "ers": { + "enabled": true, + "sessions_conns": ["*localhost"], + "readers": [ + { + "id": "file_csv_reader", + "run_delay": "0", + "start_delay":"1s", + "type": "*file_csv", + "source_path": "%s", + "flags": ["*log","*none"], + "processed_path": "%s", + "fields":[ + {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice"}, + {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.0", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*constant", "value": "*rated", "mandatory": true}, + {"tag":"Category","path":"*cgreq.Category","type":"*constant","value":"call"}, + {"tag":"Subject","path":"*cgreq.Subject","type":"*variable","value":"~*req.1"}, + {"tag":"Destination","path":"*cgreq.Destination","type":"*variable","value":"~*req.2"}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.3"}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.4"}, + {"tag": "Usage", "path": "*cgreq.Usage", "filters": ["*notempty:~*req.5:"],"type": "*variable", "value": "~*req.5;s", "mandatory": true}, + ] + }, + ] + } + }`, csvFd, procFd) + var buf bytes.Buffer + ng := engine.TestEngine{ + ConfigJSON: content, + LogBuffer: &buf, + } + client, _ := ng.Run(t) + t.Run("TestRunReader", func(t *testing.T) { + runReader := ers.V1RunReaderParams{ + Tenant: "cgrates.org", + ReaderID: "file_csv_reader", + } + var reply string + if err := client.Call(context.Background(), utils.ErSv1RunReader, runReader, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Error("expected reply to be OK") + } + }) + time.Sleep(400 * time.Millisecond) + if has := strings.Contains(buf.String(), " LOG, reader: "); !has { + t.Error("expected reader to process files") + } +}