//go:build integration
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see
*/
package ers
import (
"fmt"
"os"
"path"
"reflect"
"testing"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
)
var (
xmlCfgPath string
xmlCfgDIR string
xmlCfg *config.CGRConfig
xmlRPC *birpc.Client
xmlTests = []func(t *testing.T){
testCreateDirs,
testXMLITInitConfig,
testXMLITInitCdrDb,
testXMLITResetDataDb,
testXMLITStartEngine,
testXMLITRpcConn,
testXMLITLoadTPFromFolder,
testXMLITHandleCdr1File,
testXmlITAnalyseCDRs,
testXMLITEmptyRootPathCase,
testXMLITEmptyRootPathCaseCheckCDRs,
testCleanupFiles,
testXMLITKillEngine,
}
)
func TestXMLReadFile(t *testing.T) {
switch *utils.DBType {
case utils.MetaInternal:
xmlCfgDIR = "ers_internal"
case utils.MetaMySQL:
xmlCfgDIR = "ers_mysql"
case utils.MetaMongo:
xmlCfgDIR = "ers_mongo"
case utils.MetaPostgres:
xmlCfgDIR = "ers_postgres"
default:
t.Fatal("Unknown Database type")
}
for _, test := range xmlTests {
t.Run(xmlCfgDIR, test)
}
}
func testXMLITInitConfig(t *testing.T) {
var err error
xmlCfgPath = path.Join(*utils.DataDir, "conf", "samples", xmlCfgDIR)
if xmlCfg, err = config.NewCGRConfigFromPath(xmlCfgPath); err != nil {
t.Fatal("Got config error: ", err.Error())
}
}
// InitDb so we can rely on count
func testXMLITInitCdrDb(t *testing.T) {
if err := engine.InitStorDb(xmlCfg); err != nil {
t.Fatal(err)
}
}
// Remove data in both rating and accounting db
func testXMLITResetDataDb(t *testing.T) {
if err := engine.InitDataDB(xmlCfg); err != nil {
t.Fatal(err)
}
}
func testXMLITStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(xmlCfgPath, *utils.WaitRater); err != nil {
t.Fatal(err)
}
}
// Connect rpc client to rater
func testXMLITRpcConn(t *testing.T) {
var err error
xmlRPC, err = newRPCClient(xmlCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
}
}
func testXMLITLoadTPFromFolder(t *testing.T) {
attrs := &utils.AttrLoadTpFromFolder{
FolderPath: path.Join(*utils.DataDir, "tariffplans", "testit")}
var loadInst utils.LoadInstance
if err := xmlRPC.Call(context.Background(), utils.APIerSv2LoadTariffPlanFromFolder,
attrs, &loadInst); err != nil {
t.Error(err)
}
time.Sleep(100 * time.Millisecond)
}
var cdrXmlBroadsoft = `
0002183384
CGRateSaabb
20160419210000.104
1+020000
Start
0002183385
CGRateSaabb
20160419210005.247
1+020000
MBC
Normal
1001
2001
Network
1001
Public
+4915117174963
20160419210005.247
1+020000
25160047719:0
Yes
20160419210006.813
20160419210020.296
016
y
local
1001@cgrates.org
Yes
Yes
CGR_GROUP
CGR_GROUP/CGR_GROUP_TRUNK30
Normal
1001@cgrates.org
Primary Device
31.882
gw04.cgrates.org
74122796919420162305@172.16.1.2
PCMA/8000
172.16.1.4
BW2300052501904161738474465@172.16.1.10
31.882
OmniPCX Enterprise R11.0.1 k1.520.22.b
0002183386
CGRateSaabb
20160419210006.909
1+020000
MBC
Normal
1002
2001
Network
+4986517174964
Public
1001
20160419210006.909
1+020000
27280048121:0
Yes
20160419210007.037
20160419210030.322
016
y
local
314028947650@cgrates.org
Yes
Yes
CGR_GROUP
CGR_GROUP/CGR_GROUP_TRUNK65
Normal
31403456100@cgrates.org
Primary Device
26.244
gw01.cgrates.org
108352493719420162306@172.31.250.150
PCMA/8000
172.16.1.4
2345300069121904161716512907@172.16.1.10
26.244
Altitude vBox
0002183486
CGRateSaabb
20160419211500.104
1+020000
End
`
// The default scenario, out of ers defined in .cfg file
func testXMLITHandleCdr1File(t *testing.T) {
fileName := "file1.xml"
tmpFilePath := path.Join("/tmp", fileName)
if err := os.WriteFile(tmpFilePath, []byte(cdrXmlBroadsoft), 0644); err != nil {
t.Fatal(err.Error())
}
if err := os.Rename(tmpFilePath, path.Join("/tmp/xmlErs/in", fileName)); err != nil {
t.Fatal("Error moving file to processing directory: ", err)
}
time.Sleep(200 * time.Millisecond)
}
func testXmlITAnalyseCDRs(t *testing.T) {
var reply []*engine.ExternalCDR
if err := xmlRPC.Call(context.Background(), utils.APIerSv2GetCDRs, &utils.RPCCDRsFilter{}, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(reply) != 6 {
t.Error("Unexpected number of CDRs returned: ", len(reply))
}
for _, cdr := range reply {
if cdr.ExtraFields["ReaderID"] != "XmlDryRun" {
t.Errorf("Expected <%v>, received <%v>", "XmlDryRun", cdr.ExtraFields["ReaderID"])
}
}
if err := xmlRPC.Call(context.Background(), utils.APIerSv2GetCDRs, &utils.RPCCDRsFilter{DestinationPrefixes: []string{"+4915117174963"}}, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(reply) != 3 {
t.Error("Unexpected number of CDRs returned: ", len(reply))
}
}
var xmlPartialCDR = `
1
1001
1002
cgrates.org
20231011125833.158
2
*postpaid
*voice
EmptyRootCDR
3
ignored
20231011125800
20231011125801
`
func testXMLITEmptyRootPathCase(t *testing.T) {
fileName := "partial-cdr.xml"
tmpFilePath := path.Join("/tmp", fileName)
if err := os.WriteFile(tmpFilePath, []byte(xmlPartialCDR), 0644); err != nil {
t.Fatal(err.Error())
}
if err := os.Rename(tmpFilePath, path.Join("/tmp/xmlErs2/in", fileName)); err != nil {
t.Fatal("Error moving file to processing directory: ", err)
}
time.Sleep(100 * time.Millisecond)
}
func testXMLITEmptyRootPathCaseCheckCDRs(t *testing.T) {
var reply []*engine.ExternalCDR
if err := xmlRPC.Call(
context.Background(),
utils.APIerSv2GetCDRs,
utils.RPCCDRsFilter{OriginIDs: []string{"EmptyRootCDR_2"}},
&reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(reply) != 3 {
t.Error("Unexpected number of CDRs returned:", len(reply))
}
}
func testXMLITKillEngine(t *testing.T) {
if err := engine.KillEngine(*utils.WaitRater); err != nil {
t.Error(err)
}
}
func TestNewXMLFileER(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.ERsCfg().Readers[0].SourcePath = "/tmp/xmlErs/out/"
cfg.ERsCfg().Readers[0].ConcurrentReqs = 1
fltrs := &engine.FilterS{}
expEr := &XMLFileER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
sourceDir: "/tmp/xmlErs/out",
rdrEvents: nil,
rdrError: nil,
rdrExit: nil,
conReqs: make(chan struct{}, 1),
}
eR, err := NewXMLFileER(cfg, 0, nil, nil, nil, fltrs, nil)
expEr.conReqs = nil
eR.(*XMLFileER).conReqs = nil
if err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eR.(*XMLFileER), expEr) {
t.Errorf("Expected %v but received %v", expEr.conReqs, eR.(*XMLFileER).conReqs)
}
}
func TestFileXMLProcessEvent(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.ERsCfg().Readers[0].ProcessedPath = ""
fltrs := &engine.FilterS{}
filePath := "/tmp/TestFileXMLProcessEvent/"
if err := os.MkdirAll(filePath, 0777); err != nil {
t.Error(err)
}
file, err := os.Create(path.Join(filePath, "file1.xml"))
if err != nil {
t.Error(err)
}
xmlData := `
25160047719:0
`
file.Write([]byte(xmlData))
file.Close()
eR := &XMLFileER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
sourceDir: filePath,
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
//or set the default Fields of cfg.ERsCfg().Readers[0].Fields
eR.Config().Fields = []*config.FCTemplate{
{
Tag: "OriginID",
Type: utils.MetaConstant,
Path: "*cgreq.OriginID",
Value: config.NewRSRParsersMustCompile("25160047719:0", utils.InfieldSep),
Mandatory: true,
},
}
eR.Config().Fields[0].ComputePath()
fileName := "file1.xml"
if err := eR.processFile(fileName); err != nil {
t.Error(err)
}
expEvent := &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]any{
"OriginID": "25160047719:0",
},
APIOpts: make(map[string]any),
}
select {
case data := <-eR.rdrEvents:
expEvent.ID = data.cgrEvent.ID
expEvent.Time = data.cgrEvent.Time
if !reflect.DeepEqual(data.cgrEvent, expEvent) {
t.Errorf("Expected %v but received %v", utils.ToJSON(expEvent), utils.ToJSON(data.cgrEvent))
}
case <-time.After(50 * time.Millisecond):
t.Error("Time limit exceeded")
}
if err := os.RemoveAll(filePath); err != nil {
t.Error(err)
}
}
func TestFileXMLProcessEventError1(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
fltrs := &engine.FilterS{}
filePath := "/tmp/TestFileXMLProcessEvent/"
fname := "file1.xml"
eR := &XMLFileER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
sourceDir: filePath,
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
errExpect := "open /tmp/TestFileXMLProcessEvent/file1.xml: no such file or directory"
if err := eR.processFile(fname); err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
}
func TestFileXMLProcessEVentError2(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.ERsCfg().Readers[0].Fields = []*config.FCTemplate{}
data, err := engine.NewInternalDB(nil, nil, true, nil, cfg.DataDbCfg().Items)
if err != nil {
t.Error(err)
}
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
fltrs := engine.NewFilterS(cfg, nil, dm)
filePath := "/tmp/TestFileXMLProcessEvent/"
fname := "file1.xml"
if err := os.MkdirAll(filePath, 0777); err != nil {
t.Error(err)
}
file, err := os.Create(path.Join(filePath, "file1.xml"))
if err != nil {
t.Error(err)
}
xmlData := `
25160047719:0
`
file.Write([]byte(xmlData))
file.Close()
eR := &XMLFileER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
sourceDir: filePath,
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
eR.Config().Tenant = config.RSRParsers{
{
Rules: "test",
},
}
//
eR.Config().Filters = []string{"Filter1"}
errExpect := "NOT_FOUND:Filter1"
if err := eR.processFile(fname); err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
//
eR.Config().Filters = []string{"*exists:~*req..Account:"}
errExpect = "rename /tmp/TestFileXMLProcessEvent/file1.xml /var/spool/cgrates/ers/out/file1.xml: no such file or directory"
if err := eR.processFile(fname); err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
if err := os.RemoveAll(filePath); err != nil {
t.Error(err)
}
}
func TestFileXMLProcessEventParseError(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.ERsCfg().Readers[0].ProcessedPath = ""
fltrs := &engine.FilterS{}
filePath := "/tmp/TestFileXMLProcessEvent/"
if err := os.MkdirAll(filePath, 0777); err != nil {
t.Error(err)
}
file, err := os.Create(path.Join(filePath, "file1.xml"))
if err != nil {
t.Error(err)
}
file.Write([]byte(`
test/XMLField>`))
file.Close()
eR := &XMLFileER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
sourceDir: filePath,
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
conReqs: make(chan struct{}, 1),
}
fileName := "file1.xml"
errExpect := "XML syntax error on line 2: unexpected EOF"
if err := eR.processFile(fileName); err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
if err := os.RemoveAll(filePath); err != nil {
t.Error(err)
}
}
func TestFileXML(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
fltrs := &engine.FilterS{}
eR := &XMLFileER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
sourceDir: "/tmp/xmlErs/out",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
}
err := os.MkdirAll(eR.sourceDir, 0777)
if err != nil {
t.Error(err)
}
eR.Config().Fields = []*config.FCTemplate{
{
Tag: "OriginID",
Type: utils.MetaConstant,
Path: "*cgreq.OriginID",
Value: config.NewRSRParsersMustCompile("25160047719:0", utils.InfieldSep),
Mandatory: true,
},
}
eR.Config().Fields[0].ComputePath()
for i := 1; i < 4; i++ {
if _, err := os.Create(path.Join(eR.sourceDir, fmt.Sprintf("file%d.xml", i))); err != nil {
t.Error(err)
}
}
eR.Config().RunDelay = time.Duration(-1)
if err := eR.Serve(); err != nil {
t.Error(err)
}
eR.Config().RunDelay = 1 * time.Millisecond
if err := eR.Serve(); err != nil {
t.Error(err)
}
os.Create(path.Join(eR.sourceDir, "file1.txt"))
eR.Config().RunDelay = 1 * time.Millisecond
if err := eR.Serve(); err != nil {
t.Error(err)
}
}
func TestFileXMLError(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
fltrs := &engine.FilterS{}
eR := &XMLFileER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
sourceDir: "/tmp/xmlErsError/out",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
}
err := os.MkdirAll(eR.sourceDir, 0777)
if err != nil {
t.Error(err)
}
eR.Config().Fields = []*config.FCTemplate{
{
Tag: "OriginID",
Type: utils.MetaConstant,
Path: "*cgreq.OriginID",
Value: config.NewRSRParsersMustCompile("25160047719:0", utils.InfieldSep),
Mandatory: true,
},
}
eR.Config().Fields[0].ComputePath()
for i := 1; i < 4; i++ {
if _, err := os.Create(path.Join(eR.sourceDir, fmt.Sprintf("file%d.xml", i))); err != nil {
t.Error(err)
}
}
os.Create(path.Join(eR.sourceDir, "file1.txt"))
eR.Config().RunDelay = 1 * time.Millisecond
if err := eR.Serve(); err != nil {
t.Error(err)
}
}
func TestFileXMLExit(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
fltrs := &engine.FilterS{}
eR := &XMLFileER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: fltrs,
sourceDir: "/tmp/xmlErs/out",
rdrEvents: make(chan *erEvent, 1),
rdrError: make(chan error, 1),
rdrExit: make(chan struct{}),
}
eR.Config().RunDelay = 1 * time.Millisecond
if err := eR.Serve(); err != nil {
t.Error(err)
}
eR.rdrExit <- struct{}{}
}