From a832ea9d7cf0334fa913ba8348a8f69b655c6aaf Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Thu, 26 Sep 2024 17:23:31 +0300 Subject: [PATCH] Add test for ERs *fileLineNumber variable --- ers/filecsv.go | 1 + ers/filefwv.go | 1 + ers/filexml.go | 1 + general_tests/ers_linenr_it_test.go | 189 ++++++++++++++++++++++++++++ utils/consts.go | 1 + 5 files changed, 193 insertions(+) create mode 100644 general_tests/ers_linenr_it_test.go diff --git a/ers/filecsv.go b/ers/filecsv.go index 1eb3462e5..9d60e6ec1 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -148,6 +148,7 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) { return } rowNr++ // increment the rowNr after checking if it's not the end of file + reqVars[utils.MetaFileLineNumber] = utils.NewNMData(rowNr) agReq := agents.NewAgentRequest( config.NewSliceDP(record), reqVars, nil, nil, rdr.Config().Tenant, diff --git a/ers/filefwv.go b/ers/filefwv.go index 847ad5913..695074ead 100644 --- a/ers/filefwv.go +++ b/ers/filefwv.go @@ -197,6 +197,7 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) { } rowNr++ // increment the rowNr after checking if it's not the end of file record := string(buf) + reqVars[utils.MetaFileLineNumber] = utils.NewNMData(rowNr) agReq := agents.NewAgentRequest( config.NewFWVProvider(record), reqVars, nil, nil, rdr.Config().Tenant, diff --git a/ers/filexml.go b/ers/filexml.go index db95d4d09..d175100bb 100644 --- a/ers/filexml.go +++ b/ers/filexml.go @@ -166,6 +166,7 @@ func (rdr *XMLFileER) processFile(fPath, fName string) error { reqVars := utils.NavigableMap2{utils.MetaFileName: utils.NewNMData(fName)} for _, xmlElmt := range xmlElmts { rowNr++ // increment the rowNr after checking if it's not the end of file + reqVars[utils.MetaFileLineNumber] = utils.NewNMData(rowNr) agReq := agents.NewAgentRequest( config.NewXmlProvider(xmlElmt, rdr.Config().XmlRootPath), reqVars, nil, nil, rdr.Config().Tenant, diff --git a/general_tests/ers_linenr_it_test.go b/general_tests/ers_linenr_it_test.go new file mode 100644 index 000000000..ca05edddb --- /dev/null +++ b/general_tests/ers_linenr_it_test.go @@ -0,0 +1,189 @@ +//go:build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package general_tests + +import ( + "bufio" + "bytes" + "fmt" + "io" + "os" + "path/filepath" + "regexp" + "strconv" + "strings" + "testing" + "time" + + "github.com/cgrates/cgrates/utils" +) + +func TestERsLineNr(t *testing.T) { + switch *utils.DBType { + case utils.MetaInternal: + case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("unsupported dbtype value") + } + csvFd, fwvFd, xmlFd, procFd := t.TempDir(), t.TempDir(), t.TempDir(), t.TempDir() + content := fmt.Sprintf(`{ +"general": { + "log_level": 7 +}, +"data_db": { + "db_type": "*internal" +}, +"stor_db": { + "db_type": "*internal" +}, +"ers": { + "enabled": true, + "sessions_conns": [], + "readers": [ + { + "id": "file_csv_reader", + "run_delay": "-1", + "type": "*file_csv", + "source_path": "%s", + "flags": ["*dryrun"], + "processed_path": "%s", + "fields":[ + {"tag": "FileName", "path": "*cgreq.FileName", "type": "*variable", "value": "~*vars.*fileName"}, + {"tag": "LineNumber", "path": "*cgreq.LineNumber", "type": "*variable", "value": "~*vars.*fileLineNumber"}, + {"tag": "Field", "path": "*cgreq.Field", "type": "*variable", "value": "~*req.0"} + ] + }, + { + "id": "file_fwv_reader", + "run_delay": "-1", + "type": "*file_fwv", + "source_path": "%s", + "flags": ["*dryrun"], + "processed_path": "%s", + "fields":[ + {"tag": "FileName2", "path": "*cgreq.FileName", "type": "*variable", "value": "~*vars.*fileName"}, + {"tag": "LineNumber", "path": "*cgreq.LineNumber", "type": "*variable", "value": "~*vars.*fileLineNumber"}, + {"tag": "FileSeqNr", "path": "*cgreq.FileSeqNr", "type": "*variable", "value": "~*hdr.3-6", "padding":"*zeroleft"}, + {"tag": "Field", "path": "*cgreq.Field", "type": "*variable", "value": "~*req.0-5", "padding":"*right"}, + {"tag": "NrOfElements", "type": "*variable", "path":"*cgreq.NrOfElements", "value": "~*trl.3-4"}, + ] + }, + { + "id": "file_xml_reader", + "run_delay": "-1", + "type": "*file_xml", + "source_path": "%s", + "flags": ["*dryrun"], + "processed_path": "%s", + "xml_root_path": "root.field", + "fields":[ + {"tag": "FileName", "path": "*cgreq.FileName", "type": "*variable", "value": "~*vars.*fileName"}, + {"tag": "LineNumber", "path": "*cgreq.LineNumber", "type": "*variable", "value": "~*vars.*fileLineNumber"}, + {"tag": "Field", "path": "*cgreq.Field", "type": "*variable", "value": "~*req.root.field"} + ] + } + ] +} +}`, csvFd, procFd, fwvFd, procFd, xmlFd, procFd) + + buf := &bytes.Buffer{} + testEnv := TestEnvironment{ + ConfigJSON: content, + LogBuffer: buf, + } + _, _ = testEnv.Setup(t, 0) + + fileIdx := 0 + createFile := func(t *testing.T, dir, ext, content string) { + fileIdx++ + filePath := filepath.Join(dir, fmt.Sprintf("file%d%s", fileIdx, ext)) + if err := os.WriteFile(filePath, []byte(content), 0644); err != nil { + t.Fatalf("could not write to file %s: %v", filePath, err) + } + } + + verifyLogLines := func(t *testing.T, reader io.Reader) { + fieldRegex := regexp.MustCompile(`"Field":"test(\d+)"`) + lineNumberRegex := regexp.MustCompile(`"LineNumber":"(\d+)"`) + records := 0 + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + line := scanner.Text() + if !strings.Contains(line, `"Field":"test`) { + continue + } + + records++ + + fieldMatch := fieldRegex.FindStringSubmatch(line) + lineNumberMatch := lineNumberRegex.FindStringSubmatch(line) + + if len(fieldMatch) != 2 || len(lineNumberMatch) != 2 { + t.Fatalf("invalid log line format: %s", line) + } + + testNumber, err := strconv.Atoi(fieldMatch[1]) + if err != nil { + t.Fatal(err) + } + lineNumber, err := strconv.Atoi(lineNumberMatch[1]) + if err != nil { + t.Fatal(err) + } + if testNumber != lineNumber { + t.Errorf("mismatch in line: %s, field number: %d, line number: %d", line, testNumber, lineNumber) + } + } + + if err := scanner.Err(); err != nil { + t.Errorf("error reading input: %v", err) + } + + if records != 18 { + t.Errorf("expected ERs to process 18 records, but it processed %d records", records) + } + } + + // Create the files inside the source directories of the readers. + createFile(t, csvFd, utils.CSVSuffix, "test1\ntest2\ntest3\ntest4\ntest5\ntest6") + createFile(t, fwvFd, utils.FWVSuffix, `HDR002 +test1 +test2 +test3 +test4 +test5 +test6 +TRL6 +`) + createFile(t, xmlFd, utils.XMLSuffix, ` + + test1 + test2 + test3 + test4 + test5 + test6 +`) + + time.Sleep(5 * time.Millisecond) // wait for the files to be processed + + // Check that the suffixes of the 'test' fields match the LineNumber field. + logData := strings.NewReader(buf.String()) + verifyLogLines(t, logData) +} diff --git a/utils/consts.go b/utils/consts.go index 4bf310f0d..ab6830e40 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -672,6 +672,7 @@ const ( MetaGroup = "*group" InternalRPCSet = "InternalRPCSet" MetaFileName = "*fileName" + MetaFileLineNumber = "*fileLineNumber" MetaBusy = "*busy" MetaQueue = "*queue" MetaRounding = "*rounding"