//go:build integration // +build integration /* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments Copyright (C) ITsysCOM GmbH This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see */ package ers import ( "fmt" "os" "path" "reflect" "testing" "time" "github.com/cgrates/birpc/context" "github.com/cgrates/birpc" "github.com/cgrates/cgrates/utils" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" ) var ( xmlCfgPath string xmlCfgDIR string xmlCfg *config.CGRConfig xmlRPC *birpc.Client xmlTests = []func(t *testing.T){ testCreateDirs, testXMLITInitConfig, testXMLITInitCdrDb, testXMLITResetDataDb, testXMLITStartEngine, testXMLITRpcConn, testXMLITLoadTPFromFolder, testXMLITHandleCdr1File, testXmlITAnalyseCDRs, testXMLITEmptyRootPathCase, testXMLITEmptyRootPathCaseCheckCDRs, testCleanupFiles, testXMLITKillEngine, } ) func TestXMLReadFile(t *testing.T) { switch *utils.DBType { case utils.MetaInternal: xmlCfgDIR = "ers_internal" case utils.MetaMySQL: xmlCfgDIR = "ers_mysql" case utils.MetaMongo: xmlCfgDIR = "ers_mongo" case utils.MetaPostgres: xmlCfgDIR = "ers_postgres" default: t.Fatal("Unknown Database type") } for _, test := range xmlTests { t.Run(xmlCfgDIR, test) } } func testXMLITInitConfig(t *testing.T) { var err error xmlCfgPath = path.Join(*utils.DataDir, "conf", "samples", xmlCfgDIR) if xmlCfg, err = config.NewCGRConfigFromPath(xmlCfgPath); err != nil { t.Fatal("Got config error: ", err.Error()) } } // InitDb so we can rely on count func testXMLITInitCdrDb(t *testing.T) { if err := engine.InitStorDb(xmlCfg); err != nil { t.Fatal(err) } } // Remove data in both rating and accounting db func testXMLITResetDataDb(t *testing.T) { if err := engine.InitDataDB(xmlCfg); err != nil { t.Fatal(err) } } func testXMLITStartEngine(t *testing.T) { if _, err := engine.StopStartEngine(xmlCfgPath, *utils.WaitRater); err != nil { t.Fatal(err) } } // Connect rpc client to rater func testXMLITRpcConn(t *testing.T) { var err error xmlRPC, err = newRPCClient(xmlCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } } func testXMLITLoadTPFromFolder(t *testing.T) { attrs := &utils.AttrLoadTpFromFolder{ FolderPath: path.Join(*utils.DataDir, "tariffplans", "testit")} var loadInst utils.LoadInstance if err := xmlRPC.Call(context.Background(), utils.APIerSv2LoadTariffPlanFromFolder, attrs, &loadInst); err != nil { t.Error(err) } time.Sleep(100 * time.Millisecond) } var cdrXmlBroadsoft = ` 0002183384 CGRateSaabb 20160419210000.104 1+020000 Start 0002183385 CGRateSaabb 20160419210005.247 1+020000 MBC Normal 1001 2001 Network 1001 Public +4915117174963 20160419210005.247 1+020000 25160047719:0 Yes 20160419210006.813 20160419210020.296 016 y local 1001@cgrates.org Yes Yes CGR_GROUP CGR_GROUP/CGR_GROUP_TRUNK30 Normal 1001@cgrates.org Primary Device 31.882 gw04.cgrates.org 74122796919420162305@172.16.1.2 PCMA/8000 172.16.1.4 BW2300052501904161738474465@172.16.1.10 31.882 OmniPCX Enterprise R11.0.1 k1.520.22.b 0002183386 CGRateSaabb 20160419210006.909 1+020000 MBC Normal 1002 2001 Network +4986517174964 Public 1001 20160419210006.909 1+020000 27280048121:0 Yes 20160419210007.037 20160419210030.322 016 y local 314028947650@cgrates.org Yes Yes CGR_GROUP CGR_GROUP/CGR_GROUP_TRUNK65 Normal 31403456100@cgrates.org Primary Device 26.244 gw01.cgrates.org 108352493719420162306@172.31.250.150 PCMA/8000 172.16.1.4 2345300069121904161716512907@172.16.1.10 26.244 Altitude vBox 0002183486 CGRateSaabb 20160419211500.104 1+020000 End ` // The default scenario, out of ers defined in .cfg file func testXMLITHandleCdr1File(t *testing.T) { fileName := "file1.xml" tmpFilePath := path.Join("/tmp", fileName) if err := os.WriteFile(tmpFilePath, []byte(cdrXmlBroadsoft), 0644); err != nil { t.Fatal(err.Error()) } if err := os.Rename(tmpFilePath, path.Join("/tmp/xmlErs/in", fileName)); err != nil { t.Fatal("Error moving file to processing directory: ", err) } time.Sleep(200 * time.Millisecond) } func testXmlITAnalyseCDRs(t *testing.T) { var reply []*engine.ExternalCDR if err := xmlRPC.Call(context.Background(), utils.APIerSv2GetCDRs, &utils.RPCCDRsFilter{}, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) } else if len(reply) != 6 { t.Error("Unexpected number of CDRs returned: ", len(reply)) } for _, cdr := range reply { if cdr.ExtraFields["ReaderID"] != "XmlDryRun" { t.Errorf("Expected <%v>, received <%v>", "XmlDryRun", cdr.ExtraFields["ReaderID"]) } } if err := xmlRPC.Call(context.Background(), utils.APIerSv2GetCDRs, &utils.RPCCDRsFilter{DestinationPrefixes: []string{"+4915117174963"}}, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) } else if len(reply) != 3 { t.Error("Unexpected number of CDRs returned: ", len(reply)) } } var xmlPartialCDR = ` 1 1001 1002 cgrates.org 20231011125833.158 2 *postpaid *voice EmptyRootCDR 3 ignored 20231011125800 20231011125801 ` func testXMLITEmptyRootPathCase(t *testing.T) { fileName := "partial-cdr.xml" tmpFilePath := path.Join("/tmp", fileName) if err := os.WriteFile(tmpFilePath, []byte(xmlPartialCDR), 0644); err != nil { t.Fatal(err.Error()) } if err := os.Rename(tmpFilePath, path.Join("/tmp/xmlErs2/in", fileName)); err != nil { t.Fatal("Error moving file to processing directory: ", err) } time.Sleep(100 * time.Millisecond) } func testXMLITEmptyRootPathCaseCheckCDRs(t *testing.T) { var reply []*engine.ExternalCDR if err := xmlRPC.Call( context.Background(), utils.APIerSv2GetCDRs, utils.RPCCDRsFilter{OriginIDs: []string{"EmptyRootCDR_2"}}, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) } else if len(reply) != 3 { t.Error("Unexpected number of CDRs returned:", len(reply)) } } func testXMLITKillEngine(t *testing.T) { if err := engine.KillEngine(*utils.WaitRater); err != nil { t.Error(err) } } func TestNewXMLFileER(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.ERsCfg().Readers[0].SourcePath = "/tmp/xmlErs/out/" cfg.ERsCfg().Readers[0].ConcurrentReqs = 1 fltrs := &engine.FilterS{} expEr := &XMLFileER{ cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, sourceDir: "/tmp/xmlErs/out", rdrEvents: nil, rdrError: nil, rdrExit: nil, conReqs: make(chan struct{}, 1), } eR, err := NewXMLFileER(cfg, 0, nil, nil, nil, fltrs, nil) expEr.conReqs = nil eR.(*XMLFileER).conReqs = nil if err != nil { t.Error(err) } else if !reflect.DeepEqual(eR.(*XMLFileER), expEr) { t.Errorf("Expected %v but received %v", expEr.conReqs, eR.(*XMLFileER).conReqs) } } func TestFileXMLProcessEvent(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.ERsCfg().Readers[0].ProcessedPath = "" fltrs := &engine.FilterS{} filePath := "/tmp/TestFileXMLProcessEvent/" if err := os.MkdirAll(filePath, 0777); err != nil { t.Error(err) } file, err := os.Create(path.Join(filePath, "file1.xml")) if err != nil { t.Error(err) } xmlData := ` 25160047719:0 ` file.Write([]byte(xmlData)) file.Close() eR := &XMLFileER{ cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, sourceDir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } //or set the default Fields of cfg.ERsCfg().Readers[0].Fields eR.Config().Fields = []*config.FCTemplate{ { Tag: "OriginID", Type: utils.MetaConstant, Path: "*cgreq.OriginID", Value: config.NewRSRParsersMustCompile("25160047719:0", utils.InfieldSep), Mandatory: true, }, } eR.Config().Fields[0].ComputePath() fileName := "file1.xml" if err := eR.processFile(fileName); err != nil { t.Error(err) } expEvent := &utils.CGREvent{ Tenant: "cgrates.org", Event: map[string]any{ "OriginID": "25160047719:0", }, APIOpts: make(map[string]any), } select { case data := <-eR.rdrEvents: expEvent.ID = data.cgrEvent.ID expEvent.Time = data.cgrEvent.Time if !reflect.DeepEqual(data.cgrEvent, expEvent) { t.Errorf("Expected %v but received %v", utils.ToJSON(expEvent), utils.ToJSON(data.cgrEvent)) } case <-time.After(50 * time.Millisecond): t.Error("Time limit exceeded") } if err := os.RemoveAll(filePath); err != nil { t.Error(err) } } func TestFileXMLProcessEventError1(t *testing.T) { cfg := config.NewDefaultCGRConfig() fltrs := &engine.FilterS{} filePath := "/tmp/TestFileXMLProcessEvent/" fname := "file1.xml" eR := &XMLFileER{ cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, sourceDir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } errExpect := "open /tmp/TestFileXMLProcessEvent/file1.xml: no such file or directory" if err := eR.processFile(fname); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } } func TestFileXMLProcessEVentError2(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.ERsCfg().Readers[0].Fields = []*config.FCTemplate{} data, err := engine.NewInternalDB(nil, nil, true, nil, cfg.DataDbCfg().Items) if err != nil { t.Error(err) } dm := engine.NewDataManager(data, cfg.CacheCfg(), nil) fltrs := engine.NewFilterS(cfg, nil, dm) filePath := "/tmp/TestFileXMLProcessEvent/" fname := "file1.xml" if err := os.MkdirAll(filePath, 0777); err != nil { t.Error(err) } file, err := os.Create(path.Join(filePath, "file1.xml")) if err != nil { t.Error(err) } xmlData := ` 25160047719:0 ` file.Write([]byte(xmlData)) file.Close() eR := &XMLFileER{ cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, sourceDir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } eR.Config().Tenant = config.RSRParsers{ { Rules: "test", }, } // eR.Config().Filters = []string{"Filter1"} errExpect := "NOT_FOUND:Filter1" if err := eR.processFile(fname); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } // eR.Config().Filters = []string{"*exists:~*req..Account:"} errExpect = "rename /tmp/TestFileXMLProcessEvent/file1.xml /var/spool/cgrates/ers/out/file1.xml: no such file or directory" if err := eR.processFile(fname); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } if err := os.RemoveAll(filePath); err != nil { t.Error(err) } } func TestFileXMLProcessEventParseError(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.ERsCfg().Readers[0].ProcessedPath = "" fltrs := &engine.FilterS{} filePath := "/tmp/TestFileXMLProcessEvent/" if err := os.MkdirAll(filePath, 0777); err != nil { t.Error(err) } file, err := os.Create(path.Join(filePath, "file1.xml")) if err != nil { t.Error(err) } file.Write([]byte(` test/XMLField>`)) file.Close() eR := &XMLFileER{ cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, sourceDir: filePath, rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } fileName := "file1.xml" errExpect := "XML syntax error on line 2: unexpected EOF" if err := eR.processFile(fileName); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } if err := os.RemoveAll(filePath); err != nil { t.Error(err) } } func TestFileXML(t *testing.T) { cfg := config.NewDefaultCGRConfig() fltrs := &engine.FilterS{} eR := &XMLFileER{ cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, sourceDir: "/tmp/xmlErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), } err := os.MkdirAll(eR.sourceDir, 0777) if err != nil { t.Error(err) } eR.Config().Fields = []*config.FCTemplate{ { Tag: "OriginID", Type: utils.MetaConstant, Path: "*cgreq.OriginID", Value: config.NewRSRParsersMustCompile("25160047719:0", utils.InfieldSep), Mandatory: true, }, } eR.Config().Fields[0].ComputePath() for i := 1; i < 4; i++ { if _, err := os.Create(path.Join(eR.sourceDir, fmt.Sprintf("file%d.xml", i))); err != nil { t.Error(err) } } eR.Config().RunDelay = time.Duration(-1) if err := eR.Serve(); err != nil { t.Error(err) } eR.Config().RunDelay = 1 * time.Millisecond if err := eR.Serve(); err != nil { t.Error(err) } os.Create(path.Join(eR.sourceDir, "file1.txt")) eR.Config().RunDelay = 1 * time.Millisecond if err := eR.Serve(); err != nil { t.Error(err) } } func TestFileXMLError(t *testing.T) { cfg := config.NewDefaultCGRConfig() fltrs := &engine.FilterS{} eR := &XMLFileER{ cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, sourceDir: "/tmp/xmlErsError/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), } err := os.MkdirAll(eR.sourceDir, 0777) if err != nil { t.Error(err) } eR.Config().Fields = []*config.FCTemplate{ { Tag: "OriginID", Type: utils.MetaConstant, Path: "*cgreq.OriginID", Value: config.NewRSRParsersMustCompile("25160047719:0", utils.InfieldSep), Mandatory: true, }, } eR.Config().Fields[0].ComputePath() for i := 1; i < 4; i++ { if _, err := os.Create(path.Join(eR.sourceDir, fmt.Sprintf("file%d.xml", i))); err != nil { t.Error(err) } } os.Create(path.Join(eR.sourceDir, "file1.txt")) eR.Config().RunDelay = 1 * time.Millisecond if err := eR.Serve(); err != nil { t.Error(err) } } func TestFileXMLExit(t *testing.T) { cfg := config.NewDefaultCGRConfig() fltrs := &engine.FilterS{} eR := &XMLFileER{ cgrCfg: cfg, cfgIdx: 0, fltrS: fltrs, sourceDir: "/tmp/xmlErs/out", rdrEvents: make(chan *erEvent, 1), rdrError: make(chan error, 1), rdrExit: make(chan struct{}), } eR.Config().RunDelay = 1 * time.Millisecond if err := eR.Serve(); err != nil { t.Error(err) } eR.rdrExit <- struct{}{} }