diff --git a/ers/filecsv.go b/ers/filecsv.go
index 1eb3462e5..9d60e6ec1 100644
--- a/ers/filecsv.go
+++ b/ers/filecsv.go
@@ -148,6 +148,7 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) {
return
}
rowNr++ // increment the rowNr after checking if it's not the end of file
+ reqVars[utils.MetaFileLineNumber] = utils.NewNMData(rowNr)
agReq := agents.NewAgentRequest(
config.NewSliceDP(record), reqVars,
nil, nil, rdr.Config().Tenant,
diff --git a/ers/filefwv.go b/ers/filefwv.go
index 847ad5913..695074ead 100644
--- a/ers/filefwv.go
+++ b/ers/filefwv.go
@@ -197,6 +197,7 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) {
}
rowNr++ // increment the rowNr after checking if it's not the end of file
record := string(buf)
+ reqVars[utils.MetaFileLineNumber] = utils.NewNMData(rowNr)
agReq := agents.NewAgentRequest(
config.NewFWVProvider(record), reqVars,
nil, nil, rdr.Config().Tenant,
diff --git a/ers/filexml.go b/ers/filexml.go
index db95d4d09..d175100bb 100644
--- a/ers/filexml.go
+++ b/ers/filexml.go
@@ -166,6 +166,7 @@ func (rdr *XMLFileER) processFile(fPath, fName string) error {
reqVars := utils.NavigableMap2{utils.MetaFileName: utils.NewNMData(fName)}
for _, xmlElmt := range xmlElmts {
rowNr++ // increment the rowNr after checking if it's not the end of file
+ reqVars[utils.MetaFileLineNumber] = utils.NewNMData(rowNr)
agReq := agents.NewAgentRequest(
config.NewXmlProvider(xmlElmt, rdr.Config().XmlRootPath), reqVars,
nil, nil, rdr.Config().Tenant,
diff --git a/general_tests/ers_linenr_it_test.go b/general_tests/ers_linenr_it_test.go
new file mode 100644
index 000000000..ca05edddb
--- /dev/null
+++ b/general_tests/ers_linenr_it_test.go
@@ -0,0 +1,189 @@
+//go:build integration
+
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package general_tests
+
+import (
+ "bufio"
+ "bytes"
+ "fmt"
+ "io"
+ "os"
+ "path/filepath"
+ "regexp"
+ "strconv"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/cgrates/cgrates/utils"
+)
+
+func TestERsLineNr(t *testing.T) {
+ switch *utils.DBType {
+ case utils.MetaInternal:
+ case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres:
+ t.SkipNow()
+ default:
+ t.Fatal("unsupported dbtype value")
+ }
+ csvFd, fwvFd, xmlFd, procFd := t.TempDir(), t.TempDir(), t.TempDir(), t.TempDir()
+ content := fmt.Sprintf(`{
+"general": {
+ "log_level": 7
+},
+"data_db": {
+ "db_type": "*internal"
+},
+"stor_db": {
+ "db_type": "*internal"
+},
+"ers": {
+ "enabled": true,
+ "sessions_conns": [],
+ "readers": [
+ {
+ "id": "file_csv_reader",
+ "run_delay": "-1",
+ "type": "*file_csv",
+ "source_path": "%s",
+ "flags": ["*dryrun"],
+ "processed_path": "%s",
+ "fields":[
+ {"tag": "FileName", "path": "*cgreq.FileName", "type": "*variable", "value": "~*vars.*fileName"},
+ {"tag": "LineNumber", "path": "*cgreq.LineNumber", "type": "*variable", "value": "~*vars.*fileLineNumber"},
+ {"tag": "Field", "path": "*cgreq.Field", "type": "*variable", "value": "~*req.0"}
+ ]
+ },
+ {
+ "id": "file_fwv_reader",
+ "run_delay": "-1",
+ "type": "*file_fwv",
+ "source_path": "%s",
+ "flags": ["*dryrun"],
+ "processed_path": "%s",
+ "fields":[
+ {"tag": "FileName2", "path": "*cgreq.FileName", "type": "*variable", "value": "~*vars.*fileName"},
+ {"tag": "LineNumber", "path": "*cgreq.LineNumber", "type": "*variable", "value": "~*vars.*fileLineNumber"},
+ {"tag": "FileSeqNr", "path": "*cgreq.FileSeqNr", "type": "*variable", "value": "~*hdr.3-6", "padding":"*zeroleft"},
+ {"tag": "Field", "path": "*cgreq.Field", "type": "*variable", "value": "~*req.0-5", "padding":"*right"},
+ {"tag": "NrOfElements", "type": "*variable", "path":"*cgreq.NrOfElements", "value": "~*trl.3-4"},
+ ]
+ },
+ {
+ "id": "file_xml_reader",
+ "run_delay": "-1",
+ "type": "*file_xml",
+ "source_path": "%s",
+ "flags": ["*dryrun"],
+ "processed_path": "%s",
+ "xml_root_path": "root.field",
+ "fields":[
+ {"tag": "FileName", "path": "*cgreq.FileName", "type": "*variable", "value": "~*vars.*fileName"},
+ {"tag": "LineNumber", "path": "*cgreq.LineNumber", "type": "*variable", "value": "~*vars.*fileLineNumber"},
+ {"tag": "Field", "path": "*cgreq.Field", "type": "*variable", "value": "~*req.root.field"}
+ ]
+ }
+ ]
+}
+}`, csvFd, procFd, fwvFd, procFd, xmlFd, procFd)
+
+ buf := &bytes.Buffer{}
+ testEnv := TestEnvironment{
+ ConfigJSON: content,
+ LogBuffer: buf,
+ }
+ _, _ = testEnv.Setup(t, 0)
+
+ fileIdx := 0
+ createFile := func(t *testing.T, dir, ext, content string) {
+ fileIdx++
+ filePath := filepath.Join(dir, fmt.Sprintf("file%d%s", fileIdx, ext))
+ if err := os.WriteFile(filePath, []byte(content), 0644); err != nil {
+ t.Fatalf("could not write to file %s: %v", filePath, err)
+ }
+ }
+
+ verifyLogLines := func(t *testing.T, reader io.Reader) {
+ fieldRegex := regexp.MustCompile(`"Field":"test(\d+)"`)
+ lineNumberRegex := regexp.MustCompile(`"LineNumber":"(\d+)"`)
+ records := 0
+ scanner := bufio.NewScanner(reader)
+ for scanner.Scan() {
+ line := scanner.Text()
+ if !strings.Contains(line, `"Field":"test`) {
+ continue
+ }
+
+ records++
+
+ fieldMatch := fieldRegex.FindStringSubmatch(line)
+ lineNumberMatch := lineNumberRegex.FindStringSubmatch(line)
+
+ if len(fieldMatch) != 2 || len(lineNumberMatch) != 2 {
+ t.Fatalf("invalid log line format: %s", line)
+ }
+
+ testNumber, err := strconv.Atoi(fieldMatch[1])
+ if err != nil {
+ t.Fatal(err)
+ }
+ lineNumber, err := strconv.Atoi(lineNumberMatch[1])
+ if err != nil {
+ t.Fatal(err)
+ }
+ if testNumber != lineNumber {
+ t.Errorf("mismatch in line: %s, field number: %d, line number: %d", line, testNumber, lineNumber)
+ }
+ }
+
+ if err := scanner.Err(); err != nil {
+ t.Errorf("error reading input: %v", err)
+ }
+
+ if records != 18 {
+ t.Errorf("expected ERs to process 18 records, but it processed %d records", records)
+ }
+ }
+
+ // Create the files inside the source directories of the readers.
+ createFile(t, csvFd, utils.CSVSuffix, "test1\ntest2\ntest3\ntest4\ntest5\ntest6")
+ createFile(t, fwvFd, utils.FWVSuffix, `HDR002
+test1
+test2
+test3
+test4
+test5
+test6
+TRL6
+`)
+ createFile(t, xmlFd, utils.XMLSuffix, `
+
+ test1
+ test2
+ test3
+ test4
+ test5
+ test6
+`)
+
+ time.Sleep(5 * time.Millisecond) // wait for the files to be processed
+
+ // Check that the suffixes of the 'test' fields match the LineNumber field.
+ logData := strings.NewReader(buf.String())
+ verifyLogLines(t, logData)
+}
diff --git a/utils/consts.go b/utils/consts.go
index 4bf310f0d..ab6830e40 100644
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -672,6 +672,7 @@ const (
MetaGroup = "*group"
InternalRPCSet = "InternalRPCSet"
MetaFileName = "*fileName"
+ MetaFileLineNumber = "*fileLineNumber"
MetaBusy = "*busy"
MetaQueue = "*queue"
MetaRounding = "*rounding"