mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Fixed configurable lock file and integration tests in loaders
This commit is contained in:
committed by
Dan Christian Bogos
parent
93e79b3b88
commit
fb48016c51
@@ -21,6 +21,7 @@ package config
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -128,6 +129,12 @@ func (cfg *CGRConfig) checkConfigSanity() error {
|
||||
return fmt.Errorf("<%s> nonexistent folder: %s", utils.LoaderS, ldrSCfg.TpOutDir)
|
||||
}
|
||||
}
|
||||
if ldrSCfg.LockFilePath != utils.EmptyString { // tpOutDir support empty string for no moving files after process
|
||||
pathL := ldrSCfg.GetLockFilePath()
|
||||
if _, err := os.Stat(path.Dir(pathL)); err != nil && os.IsNotExist(err) {
|
||||
return fmt.Errorf("<%s> nonexistent folder: %s", utils.LoaderS, pathL)
|
||||
}
|
||||
}
|
||||
for _, data := range ldrSCfg.Data {
|
||||
if !posibleLoaderTypes.Has(data.Type) {
|
||||
return fmt.Errorf("<%s> unsupported data type %s", utils.LoaderS, data.Type)
|
||||
|
||||
@@ -19,6 +19,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package config
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -134,9 +137,6 @@ func (l *LoaderSCfg) loadFromJSONCfg(jsnCfg *LoaderJsonCfg, msgTemplates map[str
|
||||
return
|
||||
}
|
||||
}
|
||||
if jsnCfg.Lockfile_path != nil {
|
||||
l.LockFilePath = *jsnCfg.Lockfile_path
|
||||
}
|
||||
if jsnCfg.Caches_conns != nil {
|
||||
l.CacheSConns = make([]string, len(*jsnCfg.Caches_conns))
|
||||
for idx, connID := range *jsnCfg.Caches_conns {
|
||||
@@ -157,6 +157,9 @@ func (l *LoaderSCfg) loadFromJSONCfg(jsnCfg *LoaderJsonCfg, msgTemplates map[str
|
||||
if jsnCfg.Tp_out_dir != nil {
|
||||
l.TpOutDir = *jsnCfg.Tp_out_dir
|
||||
}
|
||||
if jsnCfg.Lockfile_path != nil {
|
||||
l.LockFilePath = *jsnCfg.Lockfile_path
|
||||
}
|
||||
if jsnCfg.Data != nil {
|
||||
data := make([]*LoaderDataType, len(*jsnCfg.Data))
|
||||
for idx, jsnLoCfg := range *jsnCfg.Data {
|
||||
@@ -171,6 +174,18 @@ func (l *LoaderSCfg) loadFromJSONCfg(jsnCfg *LoaderJsonCfg, msgTemplates map[str
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l LoaderSCfg) GetLockFilePath() (pathL string) {
|
||||
pathL = l.LockFilePath
|
||||
if !filepath.IsAbs(pathL) {
|
||||
pathL = path.Join(l.TpInDir, pathL)
|
||||
}
|
||||
|
||||
if file, err := os.Stat(pathL); err == nil && file.IsDir() {
|
||||
pathL = path.Join(pathL, l.ID+".lck")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Clone itself into a new LoaderDataType
|
||||
func (lData LoaderDataType) Clone() (cln *LoaderDataType) {
|
||||
cln = &LoaderDataType{
|
||||
|
||||
@@ -18,6 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package config
|
||||
|
||||
import (
|
||||
"path"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -438,3 +439,76 @@ func TestLoaderSCfgsClone(t *testing.T) {
|
||||
t.Errorf("Expected clone to not modify the cloned")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLockFolderRelativePath(t *testing.T) {
|
||||
ldr := &LoaderSCfg{
|
||||
TpInDir: "/var/spool/cgrates/loader/in/",
|
||||
TpOutDir: "/var/spool/cgrates/loader/out/",
|
||||
LockFilePath: utils.ResourcesCsv,
|
||||
}
|
||||
|
||||
jsonCfg := &LoaderJsonCfg{
|
||||
ID: utils.StringPointer("loaderid"),
|
||||
Enabled: utils.BoolPointer(true),
|
||||
Tenant: utils.StringPointer("cgrates.org"),
|
||||
Dry_run: utils.BoolPointer(false),
|
||||
Lockfile_path: utils.StringPointer(utils.ResourcesCsv),
|
||||
Field_separator: utils.StringPointer(utils.InfieldSep),
|
||||
Tp_in_dir: utils.StringPointer("/var/spool/cgrates/loader/in/"),
|
||||
Tp_out_dir: utils.StringPointer("/var/spool/cgrates/loader/out/"),
|
||||
}
|
||||
expPath := path.Join(ldr.LockFilePath)
|
||||
if err = ldr.loadFromJSONCfg(jsonCfg, map[string][]*FCTemplate{}, utils.InfieldSep); err != nil {
|
||||
t.Error(err)
|
||||
} else if ldr.LockFilePath != expPath {
|
||||
t.Errorf("Expected %v \n but received \n %v", expPath, ldr.LockFilePath)
|
||||
}
|
||||
}
|
||||
func TestLockFolderNonRelativePath(t *testing.T) {
|
||||
ldr := &LoaderSCfg{
|
||||
TpInDir: "/var/spool/cgrates/loader/in/",
|
||||
TpOutDir: "/var/spool/cgrates/loader/out/",
|
||||
LockFilePath: utils.ResourcesCsv,
|
||||
}
|
||||
|
||||
jsonCfg := &LoaderJsonCfg{
|
||||
ID: utils.StringPointer("loaderid"),
|
||||
Enabled: utils.BoolPointer(true),
|
||||
Tenant: utils.StringPointer("cgrates.org"),
|
||||
Dry_run: utils.BoolPointer(false),
|
||||
Lockfile_path: utils.StringPointer(path.Join("/tmp/", utils.ResourcesCsv)),
|
||||
Field_separator: utils.StringPointer(utils.InfieldSep),
|
||||
Tp_in_dir: utils.StringPointer("/var/spool/cgrates/loader/in/"),
|
||||
Tp_out_dir: utils.StringPointer("/var/spool/cgrates/loader/out/"),
|
||||
}
|
||||
expPath := path.Join("/tmp/", utils.ResourcesCsv)
|
||||
if err = ldr.loadFromJSONCfg(jsonCfg, map[string][]*FCTemplate{}, utils.InfieldSep); err != nil {
|
||||
t.Error(err)
|
||||
} else if ldr.LockFilePath != expPath {
|
||||
t.Errorf("Expected %v \n but received \n %v", expPath, ldr.LockFilePath)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLockFolderIsDir(t *testing.T) {
|
||||
ldr := &LoaderSCfg{
|
||||
LockFilePath: "test",
|
||||
}
|
||||
|
||||
jsonCfg := &LoaderJsonCfg{
|
||||
ID: utils.StringPointer("loaderid"),
|
||||
Enabled: utils.BoolPointer(true),
|
||||
Tenant: utils.StringPointer("cgrates.org"),
|
||||
Dry_run: utils.BoolPointer(false),
|
||||
Lockfile_path: utils.StringPointer("/tmp"),
|
||||
Field_separator: utils.StringPointer(utils.InfieldSep),
|
||||
Tp_in_dir: utils.StringPointer("/var/spool/cgrates/loader/in/"),
|
||||
Tp_out_dir: utils.StringPointer("/var/spool/cgrates/loader/out/"),
|
||||
}
|
||||
expPath := path.Join("/tmp")
|
||||
|
||||
if err = ldr.loadFromJSONCfg(jsonCfg, map[string][]*FCTemplate{}, utils.InfieldSep); err != nil {
|
||||
t.Error(err)
|
||||
} else if ldr.LockFilePath != expPath {
|
||||
t.Errorf("Expected %v \n but received \n %v", expPath, ldr.LockFilePath)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -48,7 +48,7 @@ func NewLoader(dm *engine.DataManager, cfg *config.LoaderSCfg,
|
||||
ldrID: cfg.ID,
|
||||
tpInDir: cfg.TpInDir,
|
||||
tpOutDir: cfg.TpOutDir,
|
||||
lockFilepath: cfg.LockFilePath,
|
||||
lockFilepath: cfg.GetLockFilePath(),
|
||||
fieldSep: cfg.FieldSeparator,
|
||||
runDelay: cfg.RunDelay,
|
||||
dataTpls: make(map[string][]*config.FCTemplate),
|
||||
@@ -131,8 +131,9 @@ func (ldr *Loader) ProcessFolder(caching, loadOption string, stopOnError bool) (
|
||||
|
||||
// lockFolder will attempt to lock the folder by creating the lock file
|
||||
func (ldr *Loader) lockFolder() (err error) {
|
||||
if _, err = os.Stat(ldr.lockFilepath); err != nil && os.IsNotExist(err) {
|
||||
return fmt.Errorf("file: %v not found", ldr.lockFilepath)
|
||||
// If the path is an empty string, we should not be locking
|
||||
if ldr.lockFilepath == utils.EmptyString {
|
||||
return
|
||||
}
|
||||
_, err = os.OpenFile(ldr.lockFilepath,
|
||||
os.O_RDONLY|os.O_CREATE, 0644)
|
||||
@@ -140,6 +141,10 @@ func (ldr *Loader) lockFolder() (err error) {
|
||||
}
|
||||
|
||||
func (ldr *Loader) unlockFolder() (err error) {
|
||||
// If the path is an empty string, we should not be locking
|
||||
if ldr.lockFilepath == utils.EmptyString {
|
||||
return
|
||||
}
|
||||
if _, err = os.Stat(ldr.lockFilepath); err == nil {
|
||||
return os.Remove(ldr.lockFilepath)
|
||||
}
|
||||
@@ -147,6 +152,10 @@ func (ldr *Loader) unlockFolder() (err error) {
|
||||
}
|
||||
|
||||
func (ldr *Loader) isFolderLocked() (locked bool, err error) {
|
||||
// If the path is an empty string, we should not be locking
|
||||
if ldr.lockFilepath == utils.EmptyString {
|
||||
return
|
||||
}
|
||||
if _, err = os.Stat(ldr.lockFilepath); err == nil {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package loaders
|
||||
|
||||
import (
|
||||
"encoding/csv"
|
||||
"io"
|
||||
"net/rpc"
|
||||
"os"
|
||||
@@ -433,7 +434,7 @@ cgrates.org,NewRes1
|
||||
fieldSep: utils.FieldsSep,
|
||||
tpInDir: flPath,
|
||||
tpOutDir: "/tmp",
|
||||
lockFilepath: utils.ResourcesCsv,
|
||||
lockFilepath: "/tmp/testProcessFile/.lck",
|
||||
bufLoaderData: make(map[string][]LoaderData),
|
||||
timezone: "UTC",
|
||||
}
|
||||
@@ -457,17 +458,17 @@ cgrates.org,NewRes1
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
resCsv := `
|
||||
#Tenant[0],ID[1]
|
||||
cgrates.org,NewRes1
|
||||
`
|
||||
rdr := io.NopCloser(strings.NewReader(resCsv))
|
||||
// resCsv := `
|
||||
// #Tenant[0],ID[1]
|
||||
// cgrates.org,NewRes1
|
||||
// `
|
||||
// rdr := io.NopCloser(strings.NewReader(resCsv))
|
||||
|
||||
ldr.rdrs = map[string]map[string]*openedCSVFile{
|
||||
utils.MetaResources: {
|
||||
utils.ResourcesCsv: &openedCSVFile{
|
||||
fileName: utils.ResourcesCsv,
|
||||
rdr: rdr,
|
||||
rdr: io.NopCloser(nil),
|
||||
csvRdr: csv.NewReader(nil),
|
||||
},
|
||||
},
|
||||
}
|
||||
@@ -495,6 +496,16 @@ cgrates.org,NewRes1
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
file, err = os.Create(path.Join(flPath, utils.ResourcesCsv))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
file.Write([]byte(`
|
||||
#Tenant[0],ID[1]
|
||||
cgrates.org,NewRes1
|
||||
`))
|
||||
file.Close()
|
||||
|
||||
//cannot move file when tpOutDir is empty
|
||||
ldr.tpOutDir = utils.EmptyString
|
||||
if err := ldr.processFile("unusedValue", utils.ResourcesCsv); err != nil {
|
||||
@@ -503,7 +514,7 @@ cgrates.org,NewRes1
|
||||
|
||||
if err := os.Remove(path.Join("/tmp", utils.ResourcesCsv)); err != nil {
|
||||
t.Error(err)
|
||||
} else if err := os.Remove(flPath); err != nil {
|
||||
} else if err := os.RemoveAll(flPath); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
@@ -579,21 +590,30 @@ func testProcessFileLockFolder(t *testing.T) {
|
||||
}
|
||||
|
||||
ldr := &Loader{
|
||||
ldrID: "testProcessFileLockFolder",
|
||||
tpInDir: flPath,
|
||||
tpOutDir: "/tmp",
|
||||
ldrID: "testProcessFileLockFolder",
|
||||
tpInDir: flPath,
|
||||
tpOutDir: "/tmp",
|
||||
lockFilepath: "/tmp/test/.cgr.lck",
|
||||
fieldSep: utils.InfieldSep,
|
||||
}
|
||||
|
||||
resCsv := `
|
||||
#Tenant[0],ID[1]
|
||||
cgrates.org,NewRes1
|
||||
`
|
||||
rdr := io.NopCloser(strings.NewReader(resCsv))
|
||||
|
||||
ldr.rdrs = map[string]map[string]*openedCSVFile{
|
||||
utils.MetaResources: {
|
||||
utils.ResourcesCsv: &openedCSVFile{
|
||||
fileName: utils.ResourcesCsv,
|
||||
rdr: rdr,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
//unable to lock the folder, because lockFileName is missing
|
||||
expected := "open /tmp/testProcessFileLockFolder: is a directory"
|
||||
expected := "open /tmp/test/.cgr.lck: no such file or directory"
|
||||
if err := ldr.processFile("unusedValue", utils.ResourcesCsv); err == nil || err.Error() != expected {
|
||||
t.Errorf("Expected %+v, received %+v", expected, err)
|
||||
}
|
||||
@@ -674,27 +694,37 @@ func testProcessFileRenameError(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
resCsv := `
|
||||
#Tenant[0],ID[1]
|
||||
cgrates.org,NewRes1
|
||||
`
|
||||
rdr := io.NopCloser(strings.NewReader(resCsv))
|
||||
// resCsv := `
|
||||
// #Tenant[0],ID[1]
|
||||
// cgrates.org,NewRes1
|
||||
// `
|
||||
// rdr := io.NopCloser(strings.NewReader(resCsv))
|
||||
|
||||
ldr.rdrs = map[string]map[string]*openedCSVFile{
|
||||
utils.MetaResources: {
|
||||
utils.ResourcesCsv: &openedCSVFile{
|
||||
fileName: utils.ResourcesCsv,
|
||||
rdr: rdr,
|
||||
rdr: io.NopCloser(nil),
|
||||
csvRdr: csv.NewReader(nil),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
file, err := os.Create(path.Join(flPath1, utils.ResourcesCsv))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
file.Write([]byte(`
|
||||
#Tenant[0],ID[1]
|
||||
cgrates.org,NewRes1
|
||||
`))
|
||||
file.Close()
|
||||
|
||||
expected := "rename /tmp/testProcessFileLockFolder/Resources.csv INEXISTING_FILE/Resources.csv: no such file or directory"
|
||||
if err := ldr.processFile("unusedValue", utils.ResourcesCsv); err == nil || err.Error() != expected {
|
||||
t.Errorf("Expected %+v, received %+v", expected, err)
|
||||
}
|
||||
|
||||
if err := os.Remove(flPath1); err != nil {
|
||||
if err := os.RemoveAll(flPath1); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
@@ -768,7 +798,7 @@ func testNewLockFolderNotFound(t *testing.T) {
|
||||
timezone: "UTC",
|
||||
}
|
||||
|
||||
errExpect := "file: /tmp/testNewLockFolder/Resources.csv not found"
|
||||
errExpect := "open /tmp/testNewLockFolder/Resources.csv: no such file or directory"
|
||||
if err := ldr.lockFolder(); err == nil || err.Error() != errExpect {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user