mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-12 02:26:26 +05:00
NewFileCSVStorage() now returns an error besides the storage struct itself, which is logged and returned instead of calling log.Fatal() which was causing the engine to crash. Fixed compilation errors by creating the CSVStorage separately and passing it as an argument to the TpReader constructor. Fixes #3962
464 lines
15 KiB
Go
464 lines
15 KiB
Go
//go:build integration || flaky || call || performance
|
|
|
|
/*
|
|
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
|
Copyright (C) ITsysCOM GmbH
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>
|
|
*/
|
|
|
|
package engine
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"os/exec"
|
|
"path"
|
|
"path/filepath"
|
|
"slices"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/cgrates/birpc"
|
|
"github.com/cgrates/birpc/context"
|
|
"github.com/cgrates/birpc/jsonrpc"
|
|
|
|
"github.com/cgrates/cgrates/config"
|
|
"github.com/cgrates/cgrates/utils"
|
|
"github.com/cgrates/ltcache"
|
|
"github.com/creack/pty"
|
|
)
|
|
|
|
func InitDataDB(cfg *config.CGRConfig) error {
|
|
d, err := NewDataDBConn(cfg.DataDbCfg().Type,
|
|
cfg.DataDbCfg().Host, cfg.DataDbCfg().Port,
|
|
cfg.DataDbCfg().Name, cfg.DataDbCfg().User,
|
|
cfg.DataDbCfg().Password, cfg.GeneralCfg().DBDataEncoding,
|
|
cfg.DataDbCfg().Opts, cfg.DataDbCfg().Items)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
dm := NewDataManager(d, cfg.CacheCfg(), connMgr)
|
|
|
|
if err := dm.DataDB().Flush(""); err != nil {
|
|
return err
|
|
}
|
|
// Write version before starting
|
|
if err := OverwriteDBVersions(dm.dataDB); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func InitStorDB(cfg *config.CGRConfig) error {
|
|
storDB, err := NewStorDBConn(cfg.StorDbCfg().Type,
|
|
cfg.StorDbCfg().Host, cfg.StorDbCfg().Port,
|
|
cfg.StorDbCfg().Name, cfg.StorDbCfg().User,
|
|
cfg.StorDbCfg().Password, cfg.GeneralCfg().DBDataEncoding,
|
|
cfg.StorDbCfg().StringIndexedFields, cfg.StorDbCfg().PrefixIndexedFields,
|
|
cfg.StorDbCfg().Opts, cfg.StorDbCfg().Items)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
dbPath := strings.Trim(cfg.StorDbCfg().Type, "*")
|
|
if err := storDB.Flush(path.Join(cfg.DataFolderPath, "storage",
|
|
dbPath)); err != nil {
|
|
return err
|
|
}
|
|
|
|
if slices.Contains([]string{utils.MetaMongo, utils.MetaMySQL, utils.MetaPostgres},
|
|
cfg.StorDbCfg().Type) {
|
|
if err := SetDBVersions(storDB); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func InitConfigDB(cfg *config.CGRConfig) error {
|
|
d, err := NewDataDBConn(cfg.ConfigDBCfg().Type,
|
|
cfg.ConfigDBCfg().Host, cfg.ConfigDBCfg().Port,
|
|
cfg.ConfigDBCfg().Name, cfg.ConfigDBCfg().User,
|
|
cfg.ConfigDBCfg().Password, cfg.GeneralCfg().DBDataEncoding,
|
|
cfg.ConfigDBCfg().Opts, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return d.Flush("")
|
|
}
|
|
|
|
// Return reference towards the command started so we can stop it if necessary
|
|
func StartEngine(cfgPath string, waitEngine int) (*exec.Cmd, error) {
|
|
enginePath, err := exec.LookPath("cgr-engine")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
engine := exec.Command(enginePath, "-config_path", cfgPath)
|
|
if err := engine.Start(); err != nil {
|
|
return nil, err
|
|
}
|
|
cfg, err := config.NewCGRConfigFromPath(context.Background(), cfgPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
fib := utils.FibDuration(time.Millisecond, 0)
|
|
var connected bool
|
|
for i := 0; i < 200; i++ {
|
|
time.Sleep(fib())
|
|
if _, err := jsonrpc.Dial(utils.TCP, cfg.ListenCfg().RPCJSONListen); err != nil {
|
|
utils.Logger.Warning(fmt.Sprintf("Error <%s> when opening test connection to: <%s>",
|
|
err.Error(), cfg.ListenCfg().RPCJSONListen))
|
|
} else {
|
|
connected = true
|
|
break
|
|
}
|
|
}
|
|
if !connected {
|
|
return nil, fmt.Errorf("engine did not open port <%s>", cfg.ListenCfg().RPCJSONListen)
|
|
}
|
|
time.Sleep(time.Duration(waitEngine) * time.Millisecond) // wait for rater to register all subsystems
|
|
return engine, nil
|
|
}
|
|
|
|
// StartEngineWithContext return reference towards the command started so we can stop it if necessary
|
|
func StartEngineWithContext(ctx context.Context, cfgPath string, waitEngine int) (engine *exec.Cmd, err error) {
|
|
engine = exec.CommandContext(ctx, "cgr-engine", "-config_path", cfgPath)
|
|
if err = engine.Start(); err != nil {
|
|
return nil, err
|
|
}
|
|
var cfg *config.CGRConfig
|
|
if cfg, err = config.NewCGRConfigFromPath(context.Background(), cfgPath); err != nil {
|
|
return
|
|
}
|
|
fib := utils.FibDuration(time.Millisecond, 0)
|
|
for i := 0; i < 200; i++ {
|
|
time.Sleep(fib())
|
|
if _, err = jsonrpc.Dial(utils.TCP, cfg.ListenCfg().RPCJSONListen); err != nil {
|
|
continue
|
|
}
|
|
time.Sleep(time.Duration(waitEngine) * time.Millisecond) // wait for rater to register all subsystems
|
|
return
|
|
}
|
|
utils.Logger.Warning(fmt.Sprintf("Error <%s> when opening test connection to: <%s>",
|
|
err.Error(), cfg.ListenCfg().RPCJSONListen))
|
|
err = fmt.Errorf("engine did not open port <%s>", cfg.ListenCfg().RPCJSONListen)
|
|
return
|
|
}
|
|
|
|
func KillEngine(waitEngine int) error {
|
|
return KillProcName("cgr-engine", waitEngine)
|
|
}
|
|
|
|
func StopStartEngine(cfgPath string, waitEngine int) (*exec.Cmd, error) {
|
|
KillEngine(waitEngine)
|
|
return StartEngine(cfgPath, waitEngine)
|
|
}
|
|
|
|
func LoadTariffPlanFromFolder(tpPath, timezone string, dm *DataManager, disableReverse bool,
|
|
cacheConns, schedConns []string) error {
|
|
csvStorage, err := NewFileCSVStorage(utils.CSVSep, tpPath)
|
|
if err != nil {
|
|
return utils.NewErrServerError(err)
|
|
}
|
|
loader, err := NewTpReader(dm.dataDB, csvStorage, "",
|
|
timezone, cacheConns, schedConns, false)
|
|
if err != nil {
|
|
return utils.NewErrServerError(err)
|
|
}
|
|
if err := loader.LoadAll(); err != nil {
|
|
return utils.NewErrServerError(err)
|
|
}
|
|
if err := loader.WriteToDatabase(false, disableReverse); err != nil {
|
|
return utils.NewErrServerError(err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type PjsuaAccount struct {
|
|
ID, Username, Password, Realm, Registrar string
|
|
}
|
|
|
|
// Returns file reference where we can write to control pjsua in terminal
|
|
func StartPjsuaListener(acnts []*PjsuaAccount, localPort, waitDur time.Duration) (*os.File, error) {
|
|
cmdArgs := []string{fmt.Sprintf("--local-port=%d", localPort), "--null-audio", "--auto-answer=200", "--max-calls=32", "--app-log-level=0"}
|
|
for idx, acnt := range acnts {
|
|
if idx != 0 {
|
|
cmdArgs = append(cmdArgs, "--next-account")
|
|
}
|
|
cmdArgs = append(cmdArgs, "--id="+acnt.ID, "--registrar="+acnt.Registrar, "--username="+acnt.Username, "--password="+acnt.Password, "--realm="+acnt.Realm)
|
|
}
|
|
pjsuaPath, err := exec.LookPath("pjsua")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
pjsua := exec.Command(pjsuaPath, cmdArgs...)
|
|
fPty, err := pty.Start(pjsua)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
buf := new(bytes.Buffer)
|
|
io.Copy(os.Stdout, buf) // Free the content since otherwise pjsua will not start
|
|
time.Sleep(waitDur) // Give time to rater to fire up
|
|
return fPty, nil
|
|
}
|
|
|
|
func PjsuaCallURI(acnt *PjsuaAccount, dstURI, outboundURI string, callDur time.Duration, localPort int) error {
|
|
cmdArgs := []string{"--null-audio", "--app-log-level=0", fmt.Sprintf("--local-port=%d", localPort), fmt.Sprintf("--duration=%d", int(callDur.Seconds())),
|
|
"--outbound=" + outboundURI, "--id=" + acnt.ID, "--username=" + acnt.Username, "--password=" + acnt.Password, "--realm=" + acnt.Realm, dstURI}
|
|
|
|
pjsuaPath, err := exec.LookPath("pjsua")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
pjsua := exec.Command(pjsuaPath, cmdArgs...)
|
|
fPty, err := pty.Start(pjsua)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
buf := new(bytes.Buffer)
|
|
io.Copy(os.Stdout, buf)
|
|
go func() {
|
|
time.Sleep(callDur + 2*time.Second)
|
|
fPty.Write([]byte("q\n")) // Destroy the listener
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func KillProcName(procName string, waitMs int) (err error) {
|
|
if err = exec.Command("pkill", procName).Run(); err != nil {
|
|
return
|
|
}
|
|
time.Sleep(time.Duration(waitMs) * time.Millisecond)
|
|
return
|
|
}
|
|
|
|
func ForceKillProcName(procName string, waitMs int) error {
|
|
if err := exec.Command("pkill", "-9", procName).Run(); err != nil {
|
|
return err
|
|
}
|
|
time.Sleep(time.Duration(waitMs) * time.Millisecond)
|
|
return nil
|
|
}
|
|
|
|
func CallScript(scriptPath string, subcommand string, waitMs int) error {
|
|
if err := exec.Command(scriptPath, subcommand).Run(); err != nil {
|
|
return err
|
|
}
|
|
time.Sleep(time.Duration(waitMs) * time.Millisecond) // Give time to rater to fire up
|
|
return nil
|
|
}
|
|
|
|
func GetDefaultEmptyCacheStats() map[string]*ltcache.CacheStats {
|
|
return map[string]*ltcache.CacheStats{
|
|
utils.MetaDefault: {},
|
|
utils.CacheAttributeFilterIndexes: {},
|
|
utils.CacheAttributeProfiles: {},
|
|
utils.CacheChargerFilterIndexes: {},
|
|
utils.CacheChargerProfiles: {},
|
|
utils.CacheDispatcherFilterIndexes: {},
|
|
utils.CacheDispatcherProfiles: {},
|
|
utils.CacheDispatcherHosts: {},
|
|
utils.CacheDispatcherRoutes: {},
|
|
utils.CacheDispatcherLoads: {},
|
|
utils.CacheDispatchers: {},
|
|
utils.CacheEventResources: {},
|
|
utils.CacheFilters: {},
|
|
utils.CacheResourceFilterIndexes: {},
|
|
utils.CacheResourceProfiles: {},
|
|
utils.CacheResources: {},
|
|
utils.CacheRPCResponses: {},
|
|
utils.CacheStatFilterIndexes: {},
|
|
utils.CacheStatQueueProfiles: {},
|
|
utils.CacheStatQueues: {},
|
|
utils.CacheSTIR: {},
|
|
utils.CacheRankingProfiles: {},
|
|
utils.CacheRouteFilterIndexes: {},
|
|
utils.CacheRouteProfiles: {},
|
|
utils.CacheThresholdFilterIndexes: {},
|
|
utils.CacheThresholdProfiles: {},
|
|
utils.CacheThresholds: {},
|
|
utils.CacheRateProfiles: {},
|
|
utils.CacheRateProfilesFilterIndexes: {},
|
|
utils.CacheRateFilterIndexes: {},
|
|
utils.CacheDiameterMessages: {},
|
|
utils.CacheClosedSessions: {},
|
|
utils.CacheLoadIDs: {},
|
|
utils.CacheRPCConnections: {},
|
|
utils.CacheCDRIDs: {},
|
|
utils.CacheUCH: {},
|
|
utils.CacheEventCharges: {},
|
|
utils.CacheReverseFilterIndexes: {},
|
|
utils.MetaAPIBan: {},
|
|
utils.MetaSentryPeer: {},
|
|
utils.CacheCapsEvents: {},
|
|
utils.CacheActionProfiles: {},
|
|
utils.CacheActionProfilesFilterIndexes: {},
|
|
utils.CacheAccounts: {},
|
|
utils.CacheAccountsFilterIndexes: {},
|
|
utils.CacheReplicationHosts: {},
|
|
}
|
|
}
|
|
|
|
func NewRPCClient(cfg *config.ListenCfg, encoding string) (*birpc.Client, error) {
|
|
switch encoding {
|
|
case utils.MetaJSON:
|
|
return jsonrpc.Dial(utils.TCP, cfg.RPCJSONListen)
|
|
case utils.MetaGOB:
|
|
return birpc.Dial(utils.TCP, cfg.RPCGOBListen)
|
|
default:
|
|
return nil, errors.New("invalid encoding")
|
|
}
|
|
}
|
|
|
|
// TestEnvironment holds the setup parameters and configurations
|
|
// required for running integration tests.
|
|
type TestEnvironment struct {
|
|
Name string // usually the name of the test
|
|
ConfigPath string // file path to the main configuration file
|
|
ConfigJSON string // contains the configuration JSON content if ConfigPath is missing
|
|
TpPath string // specifies the path to the tariff plans
|
|
TpFiles map[string]string // maps CSV filenames to their content for tariff plan loading
|
|
LogBuffer io.Writer // captures the log output of the test environment
|
|
Encoding string // specifies the data encoding type (e.g., JSON, GOB)
|
|
}
|
|
|
|
// Setup initializes the testing environment using the provided configuration. It loads the configuration
|
|
// from a specified path or creates a new one if the path is not provided. The method starts the engine,
|
|
// establishes an RPC client connection, and loads CSV data if provided. It returns an RPC client and the
|
|
// configuration.
|
|
func (env TestEnvironment) Setup(t *testing.T, ctx *context.Context, engineDelay int) (*birpc.Client, *config.CGRConfig) {
|
|
t.Helper()
|
|
|
|
var cfg *config.CGRConfig
|
|
switch {
|
|
case env.ConfigPath != "":
|
|
var err error
|
|
cfg, err = config.NewCGRConfigFromPath(ctx, env.ConfigPath)
|
|
if err != nil {
|
|
t.Fatalf("failed to init config from path %s: %v", env.ConfigPath, err)
|
|
}
|
|
default:
|
|
cfg, env.ConfigPath = initCfg(t, ctx, env.ConfigJSON)
|
|
}
|
|
|
|
flushDBs(t, cfg, true, true)
|
|
startEngine(t, ctx, cfg, env.ConfigPath, engineDelay, env.LogBuffer)
|
|
|
|
client, err := NewRPCClient(cfg.ListenCfg(), env.Encoding)
|
|
if err != nil {
|
|
t.Fatalf("could not connect to cgr-engine: %v", err)
|
|
}
|
|
|
|
loadCSVs(t, env.TpPath, env.TpFiles)
|
|
|
|
return client, cfg
|
|
}
|
|
|
|
// initCfg creates a new CGRConfig from the provided configuration content string.
|
|
// It generates a temporary directory and file path, writes the content to the configuration
|
|
// file, and returns the created CGRConfig and the configuration directory path.
|
|
func initCfg(t *testing.T, ctx *context.Context, cfgContent string) (cfg *config.CGRConfig, cfgPath string) {
|
|
t.Helper()
|
|
if cfgContent == utils.EmptyString {
|
|
t.Fatal("ConfigJSON is required but empty")
|
|
}
|
|
cfgPath = t.TempDir()
|
|
filePath := filepath.Join(cfgPath, "cgrates.json")
|
|
if err := os.WriteFile(filePath, []byte(cfgContent), 0644); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
cfg, err := config.NewCGRConfigFromPath(ctx, cfgPath)
|
|
if err != nil {
|
|
t.Fatalf("failed to init config from path %s: %v", cfgPath, err)
|
|
}
|
|
return cfg, cfgPath
|
|
}
|
|
|
|
// loadCSVs loads tariff plan data from CSV files. The CSV files are created based on the csvFiles map, where
|
|
// the key represents the file name and the value the contains its contents. Assumes the data is loaded
|
|
// automatically (RunDelay != 0)
|
|
func loadCSVs(t *testing.T, tpPath string, csvFiles map[string]string) {
|
|
|
|
t.Helper()
|
|
if tpPath != "" {
|
|
for fileName, content := range csvFiles {
|
|
filePath := path.Join(tpPath, fileName)
|
|
if err := os.WriteFile(filePath, []byte(content), 0644); err != nil {
|
|
t.Fatalf("could not write to file %s: %v", filePath, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// flushDBs resets the databases specified in the configuration if the corresponding flags are true.
|
|
func flushDBs(t *testing.T, cfg *config.CGRConfig, flushDataDB, flushStorDB bool) {
|
|
t.Helper()
|
|
if flushDataDB {
|
|
if err := InitDataDB(cfg); err != nil {
|
|
t.Fatalf("failed to flush %s dataDB: %v", cfg.DataDbCfg().Type, err)
|
|
}
|
|
}
|
|
if flushStorDB {
|
|
if err := InitStorDB(cfg); err != nil {
|
|
t.Fatalf("failed to flush %s storDB: %v", cfg.StorDbCfg().Type, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// startEngine starts the CGR engine process with the provided configuration. It writes engine logs to the provided
|
|
// logBuffer (if any) and waits for the engine to be ready. If the passed context were to be cancelled, the engine
|
|
// would also shut down.
|
|
func startEngine(t *testing.T, ctx *context.Context, cfg *config.CGRConfig, cfgPath string, waitEngine int,
|
|
logBuffer io.Writer) {
|
|
t.Helper()
|
|
binPath, err := exec.LookPath("cgr-engine")
|
|
if err != nil {
|
|
t.Fatal("could not find cgr-engine executable")
|
|
}
|
|
engine := exec.CommandContext(
|
|
ctx,
|
|
binPath,
|
|
"-config_path", cfgPath,
|
|
"-logger", utils.MetaStdLog,
|
|
)
|
|
if logBuffer != nil {
|
|
engine.Stdout = logBuffer
|
|
engine.Stderr = logBuffer
|
|
}
|
|
if err := engine.Start(); err != nil {
|
|
t.Fatalf("cgr-engine command failed: %v", err)
|
|
}
|
|
t.Cleanup(func() {
|
|
if err := engine.Process.Kill(); err != nil {
|
|
t.Logf("failed to kill cgr-engine process (%d): %v", engine.Process.Pid, err)
|
|
}
|
|
})
|
|
fib := utils.FibDuration(time.Millisecond, 0)
|
|
for i := 0; i < 16; i++ {
|
|
time.Sleep(fib())
|
|
if _, err := jsonrpc.Dial(utils.TCP, cfg.ListenCfg().RPCJSONListen); err == nil {
|
|
break
|
|
}
|
|
}
|
|
if err != nil {
|
|
t.Fatalf("starting cgr-engine on port %s failed: %v", cfg.ListenCfg().RPCJSONListen, err)
|
|
}
|
|
time.Sleep(time.Duration(waitEngine) * time.Millisecond)
|
|
}
|