//go:build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package general_tests
import (
"bufio"
"bytes"
"fmt"
"io"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"testing"
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
// TestERsLineNr checks that the *fileLineNumber variable populated by the
// file-based ERs readers (*file_csv, *file_fwv and *file_xml) matches the
// position of each record inside its source file.
//
// The test starts an engine with three *dryrun readers, drops one file holding
// six records ("test1" .. "test6") into the source directory of every reader
// and then scans the captured engine output: for each logged event the numeric
// suffix of "Field" must be equal to "LineNumber", and 18 events are expected
// in total (6 per reader).
func TestERsLineNr(t *testing.T) {
	switch *utils.DBType {
	case utils.MetaInternal:
	case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres:
		t.SkipNow()
	default:
		t.Fatal("unsupported dbtype value")
	}

	// One source directory per reader plus a processed directory shared by all.
	csvFd, fwvFd, xmlFd, procFd := t.TempDir(), t.TempDir(), t.TempDir(), t.TempDir()
	cfgJSON := fmt.Sprintf(`{
"general": {
"log_level": 7
},
"data_db": {
"db_type": "*internal"
},
"stor_db": {
"db_type": "*internal"
},
"ers": {
"enabled": true,
"sessions_conns": [],
"readers": [
{
"id": "file_csv_reader",
"run_delay": "-1",
"type": "*file_csv",
"source_path": "%s",
"flags": ["*dryrun"],
"processed_path": "%s",
"fields":[
{"tag": "FileName", "path": "*cgreq.FileName", "type": "*variable", "value": "~*vars.*fileName"},
{"tag": "LineNumber", "path": "*cgreq.LineNumber", "type": "*variable", "value": "~*vars.*fileLineNumber"},
{"tag": "Field", "path": "*cgreq.Field", "type": "*variable", "value": "~*req.0"}
]
},
{
"id": "file_fwv_reader",
"run_delay": "-1",
"type": "*file_fwv",
"source_path": "%s",
"flags": ["*dryrun"],
"processed_path": "%s",
"fields":[
{"tag": "FileName2", "path": "*cgreq.FileName", "type": "*variable", "value": "~*vars.*fileName"},
{"tag": "LineNumber", "path": "*cgreq.LineNumber", "type": "*variable", "value": "~*vars.*fileLineNumber"},
{"tag": "FileSeqNr", "path": "*cgreq.FileSeqNr", "type": "*variable", "value": "~*hdr.3-6", "padding":"*zeroleft"},
{"tag": "Field", "path": "*cgreq.Field", "type": "*variable", "value": "~*req.0-5", "padding":"*right"},
{"tag": "NrOfElements", "type": "*variable", "path":"*cgreq.NrOfElements", "value": "~*trl.3-4"},
]
},
{
"id": "file_xml_reader",
"run_delay": "-1",
"type": "*file_xml",
"source_path": "%s",
"flags": ["*dryrun"],
"processed_path": "%s",
"opts": {
"xmlRootPath": "root.field"
},
"fields":[
{"tag": "FileName", "path": "*cgreq.FileName", "type": "*variable", "value": "~*vars.*fileName"},
{"tag": "LineNumber", "path": "*cgreq.LineNumber", "type": "*variable", "value": "~*vars.*fileLineNumber"},
{"tag": "Field", "path": "*cgreq.Field", "type": "*variable", "value": "~*req.root.field"}
]
}
]
}
}`, csvFd, procFd, fwvFd, procFd, xmlFd, procFd)

	// The engine output is captured in buf and inspected once the files have
	// been handled.
	buf := &bytes.Buffer{}
	ng := engine.TestEngine{
		ConfigJSON: cfgJSON,
		LogBuffer:  buf,
	}
	_, _ = ng.Run(t) // neither return value is needed by this test

	// createFile writes data into a uniquely numbered file (file<N><ext>)
	// inside dir and aborts the test if the write fails.
	fileIdx := 0
	createFile := func(t *testing.T, dir, ext, data string) {
		fileIdx++
		filePath := filepath.Join(dir, fmt.Sprintf("file%d%s", fileIdx, ext))
		if err := os.WriteFile(filePath, []byte(data), 0644); err != nil {
			t.Fatalf("could not write to file %s: %v", filePath, err)
		}
	}

	// verifyLogLines scans reader line by line, considers only the lines that
	// carry a "Field":"test<N>" entry and checks that <N> equals the value of
	// "LineNumber" on the same line. It also checks the overall record count.
	verifyLogLines := func(t *testing.T, reader io.Reader) {
		const wantRecords = 18 // 3 readers x 6 records
		fieldRegex := regexp.MustCompile(`"Field":"test(\d+)"`)
		lineNumberRegex := regexp.MustCompile(`"LineNumber":"(\d+)"`)

		records := 0
		scanner := bufio.NewScanner(reader)
		for scanner.Scan() {
			line := scanner.Text()
			if !strings.Contains(line, `"Field":"test`) {
				continue // not an event produced from one of the fixtures
			}
			records++

			fieldMatch := fieldRegex.FindStringSubmatch(line)
			lineNumberMatch := lineNumberRegex.FindStringSubmatch(line)
			if len(fieldMatch) != 2 || len(lineNumberMatch) != 2 {
				t.Fatalf("invalid log line format: %s", line)
			}
			testNumber, err := strconv.Atoi(fieldMatch[1])
			if err != nil {
				t.Fatal(err)
			}
			lineNumber, err := strconv.Atoi(lineNumberMatch[1])
			if err != nil {
				t.Fatal(err)
			}
			if testNumber != lineNumber {
				t.Errorf("mismatch in line: %s, field number: %d, line number: %d", line, testNumber, lineNumber)
			}
		}
		if err := scanner.Err(); err != nil {
			t.Errorf("error reading input: %v", err)
		}
		if records != wantRecords {
			t.Errorf("expected ERs to process 18 records, but it processed %d records", records)
		}
	}

	// Create the files inside the source directories of the readers.
	createFile(t, csvFd, utils.CSVSuffix, "test1\ntest2\ntest3\ntest4\ntest5\ntest6")

	// Every line of the fixed-width file is padded to the width of the header
	// (6 characters). The lines are spelled out as separate strings so that the
	// trailing spaces stay visible and cannot be trimmed by an editor.
	createFile(t, fwvFd, utils.FWVSuffix, strings.Join([]string{
		"HDR002",
		"test1 ",
		"test2 ",
		"test3 ",
		"test4 ",
		"test5 ",
		"test6 ",
		"TRL6  ",
		"", // terminates the trailer line with a newline
	}, "\n"))

	// The XML reader is configured with xmlRootPath "root.field", so each
	// record is a <field> element nested directly under <root>.
	createFile(t, xmlFd, utils.XMLSuffix, `<root>
<field>test1</field>
<field>test2</field>
<field>test3</field>
<field>test4</field>
<field>test5</field>
<field>test6</field>
</root>
`)

	// Wait for the files to be processed. All readers share procFd as their
	// processed_path, so the appearance of the three files there is used as
	// the completion signal instead of a single fixed sleep.
	// NOTE(review): this assumes the readers move handled files into
	// processed_path; if they never show up the loop simply ends at the
	// deadline and the log verification below decides the outcome.
	const wantFiles = 3
	deadline := time.Now().Add(2 * time.Second)
	for time.Now().Before(deadline) {
		entries, err := os.ReadDir(procFd)
		if err != nil {
			t.Fatalf("could not list directory %s: %v", procFd, err)
		}
		if len(entries) >= wantFiles {
			break
		}
		time.Sleep(5 * time.Millisecond)
	}
	time.Sleep(5 * time.Millisecond) // let the last log lines reach buf

	// Check that the suffixes of the 'test' fields match the LineNumber field.
	verifyLogLines(t, strings.NewReader(buf.String()))
}