Mirror of https://github.com/cgrates/cgrates.git (synced 2026-02-11 18:16:24 +05:00)
Loader - move files on completion, API LoaderSv1.ProcessFolder -> LoaderSv1.Load
@@ -37,3 +37,8 @@ func (ldrSv1 *LoaderSv1) Call(serviceMethod string,
args interface{}, reply interface{}) error {
return utils.APIerRPCCall(ldrSv1, serviceMethod, args, reply)
}

func (ldrSv1 *LoaderSv1) Load(args *loaders.ArgsProcessFolder,
rply *string) error {
return ldrSv1.ldrS.V1Load(args, rply)
}

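For illustration, a minimal client-side sketch of calling the renamed method over the engine's JSON-RPC port. The address 127.0.0.1:2012 and the standalone main package are assumptions for the example; the request struct simply mirrors loaders.ArgsProcessFolder introduced further down in this commit, and the "OK" reply corresponds to the *rply = utils.OK set by V1Load.

package main

import (
    "fmt"
    "log"
    "net/rpc/jsonrpc"
)

// ArgsProcessFolder mirrors loaders.ArgsProcessFolder from this commit.
type ArgsProcessFolder struct {
    LoaderID  string
    ForceLock bool
}

func main() {
    // Assumed address of a cgr-engine JSON-RPC listener.
    client, err := jsonrpc.Dial("tcp", "127.0.0.1:2012")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()
    var reply string
    // An empty LoaderID is mapped to the default loader by V1Load.
    if err := client.Call("LoaderSv1.Load",
        &ArgsProcessFolder{LoaderID: "*default"}, &reply); err != nil {
        log.Fatal(err)
    }
    fmt.Println(reply) // prints OK on success
}
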
@@ -458,13 +458,13 @@ const CGRATES_CFG_JSON = `
"enabled": false, // starts as service: <true|false>.
"dry_run": false, // do not send the CDRs to CDRS, just parse them
"run_delay": 0, // sleep interval in seconds between consecutive runs, 0 to use automation via inotify
"lock_filename": ".cgr.lock", // Filename containing concurrency lock in case of delayed processing
"lock_filename": ".cgr.lck", // Filename containing concurrency lock in case of delayed processing
"caches_conns": [
{"address": "*internal"}, // address where to reach the CacheS for data reload, empty for no reloads <""|*internal|x.y.z.y:1234>
],
"field_separator": ",", // separator used in case of csv files
"tp_in_dir": "/var/spool/cgrates/tploader/in", // absolute path towards the directory where the CDRs are stored
"tp_out_dir": "/var/spool/cgrates/tploader/out", // absolute path towards the directory where processed CDRs will be moved
"tp_in_dir": "/var/spool/cgrates/loader/in", // absolute path towards the directory where the CDRs are stored
"tp_out_dir": "/var/spool/cgrates/loader/out", // absolute path towards the directory where processed CDRs will be moved
"data":[ // data profiles to load
{
"type": "*attributes", // data source type

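As a quick sketch of overriding these new defaults from a user configuration file: the individual keys are the ones shown in the hunk above, while the enclosing "loaders" section name and its list form are assumptions inferred from the []*config.LoaderSConfig slice that NewLoaderService receives later in this diff.

"loaders": [
    {
        "enabled": true,
        "dry_run": false,
        "lock_filename": ".cgr.lck",
        "tp_in_dir": "/var/spool/cgrates/loader/in",
        "tp_out_dir": "/var/spool/cgrates/loader/out"
    }
],
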
@@ -815,13 +815,13 @@ func TestDfLoaderSJsonCfg(t *testing.T) {
Enabled: utils.BoolPointer(false),
Dry_run: utils.BoolPointer(false),
Run_delay: utils.IntPointer(0),
Lock_filename: utils.StringPointer(".cgr.lock"),
Lock_filename: utils.StringPointer(".cgr.lck"),
Caches_conns: &[]*HaPoolJsonCfg{&HaPoolJsonCfg{
Address: utils.StringPointer(utils.MetaInternal),
}},
Field_separator: utils.StringPointer(","),
Tp_in_dir: utils.StringPointer("/var/spool/cgrates/tploader/in"),
Tp_out_dir: utils.StringPointer("/var/spool/cgrates/tploader/out"),
Tp_in_dir: utils.StringPointer("/var/spool/cgrates/loader/in"),
Tp_out_dir: utils.StringPointer("/var/spool/cgrates/loader/out"),
Data: &[]*LoaderSJsonDataType{dataType},
},
}

@@ -923,15 +923,15 @@ func TestLoaderDefaults(t *testing.T) {
Enabled: false,
DryRun: false,
RunDelay: 0,
LockFileName: ".cgr.lock",
LockFileName: ".cgr.lck",
CacheSConns: []*HaPoolConfig{
&HaPoolConfig{
Address: utils.MetaInternal,
},
},
FieldSeparator: ",",
TpInDir: "/var/spool/cgrates/tploader/in",
TpOutDir: "/var/spool/cgrates/tploader/out",
TpInDir: "/var/spool/cgrates/loader/in",
TpOutDir: "/var/spool/cgrates/loader/out",
Data: []*LoaderSDataType{
&LoaderSDataType{
Type: utils.MetaAttributes,

@@ -22,6 +22,7 @@ import (
"encoding/csv"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"strings"
@@ -42,6 +43,7 @@ func NewLoader(dm *engine.DataManager, cfg *config.LoaderSConfig,
ldr = &Loader{
enabled: cfg.Enabled,
dryRun: cfg.DryRun,
ldrID: cfg.Id,
tpInDir: cfg.TpInDir,
tpOutDir: cfg.TpOutDir,
lockFilename: cfg.LockFileName,
@@ -101,17 +103,13 @@ func (ldr *Loader) ProcessFolder() (err error) {
}
defer ldr.unlockFolder()
for ldrType := range ldr.rdrs {
if err = ldr.openFiles(ldrType); err != nil {
if err = ldr.processFiles(ldrType); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s> cannot open files, err: %s",
utils.LoaderS, ldr.ldrID, ldrType, err.Error()))
continue
}
if err = ldr.processContent(ldrType); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s>, err: %s",
utils.LoaderS, ldr.ldrID, ldrType, err.Error()))
}
}
return
return ldr.moveFiles()
}

// lockFolder will attempt to lock the folder by creating the lock file
@@ -126,6 +124,17 @@ func (ldr *Loader) unlockFolder() (err error) {
ldr.lockFilename))
}

func (ldr *Loader) isFolderLocked() (locked bool, err error) {
if _, err = os.Stat(path.Join(ldr.tpInDir,
ldr.lockFilename)); err == nil {
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return
}

// unreferenceFile will cleanup an used file by closing and removing from referece map
func (ldr *Loader) unreferenceFile(loaderType, fileName string) (err error) {
openedCSVFile := ldr.rdrs[loaderType][fileName]
@@ -133,39 +142,36 @@ func (ldr *Loader) unreferenceFile(loaderType, fileName string) (err error) {
return openedCSVFile.rdr.Close()
}

func (ldr *Loader) storeLoadedData(loaderType string,
lds map[string][]LoaderData) (err error) {
switch loaderType {
case utils.MetaAttributes:
for _, lDataSet := range lds {
attrModels := make(engine.TPAttributes, len(lDataSet))
for i, ld := range lDataSet {
attrModels[i] = new(engine.TPAttribute)
if err = utils.UpdateStructWithIfaceMap(attrModels[i], ld); err != nil {
return
}
}
for _, tpApf := range attrModels.AsTPAttributes() {
if apf, err := engine.APItoAttributeProfile(tpApf, ldr.timezone); err != nil {
return err
} else if err := ldr.dm.SetAttributeProfile(apf, true); err != nil {
return err
}
}
func (ldr *Loader) moveFiles() (err error) {
filesInDir, _ := ioutil.ReadDir(ldr.tpInDir)
for _, file := range filesInDir {
fName := file.Name()
if fName == ldr.lockFilename {
continue
}
oldPath := path.Join(ldr.tpInDir, fName)
newPath := path.Join(ldr.tpOutDir, fName)
if err = os.Rename(oldPath, newPath); err != nil {
return
}
}
return
}

func (ldr *Loader) openFiles(loaderType string) (err error) {
func (ldr *Loader) processFiles(loaderType string) (err error) {
for fName := range ldr.rdrs[loaderType] {
var rdr *os.File
if rdr, err = os.Open(path.Join(ldr.tpInDir, fName)); err != nil {
return err
}
csvReader := csv.NewReader(rdr)
csvReader.Comment = '#'
ldr.rdrs[loaderType][fName] = &openedCSVFile{
fileName: fName, rdr: rdr, csvRdr: csv.NewReader(rdr)}
fileName: fName, rdr: rdr, csvRdr: csvReader}
defer ldr.unreferenceFile(loaderType, fName)
if err = ldr.processContent(loaderType); err != nil {
return
}
}
return
}
@@ -233,3 +239,35 @@ func (ldr *Loader) processContent(loaderType string) (err error) {
delete(ldr.bufLoaderData, tntID)
return
}

func (ldr *Loader) storeLoadedData(loaderType string,
lds map[string][]LoaderData) (err error) {
switch loaderType {
case utils.MetaAttributes:
for _, lDataSet := range lds {
attrModels := make(engine.TPAttributes, len(lDataSet))
for i, ld := range lDataSet {
attrModels[i] = new(engine.TPAttribute)
if err = utils.UpdateStructWithIfaceMap(attrModels[i], ld); err != nil {
return
}
}
for _, tpApf := range attrModels.AsTPAttributes() {
apf, err := engine.APItoAttributeProfile(tpApf, ldr.timezone)
if err != nil {
return err
}
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: AttributeProfile: %s",
utils.LoaderS, ldr.ldrID, utils.ToJSON(apf)))
continue
}
if err := ldr.dm.SetAttributeProfile(apf, true); err != nil {
return err
}
}
}
}
return
}

@@ -19,8 +19,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package loaders

import (
"errors"
"fmt"

"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)

func NewLoaderService(dm *engine.DataManager, ldrsCfg []*config.LoaderSConfig,
@@ -64,3 +68,34 @@ func (ldrS *LoaderService) ListenAndServe(exitChan chan bool) (err error) {
}
return
}

type ArgsProcessFolder struct {
LoaderID string
ForceLock bool
}

func (ldrS *LoaderService) V1Load(args *ArgsProcessFolder,
rply *string) (err error) {
if args.LoaderID == "" {
args.LoaderID = utils.META_DEFAULT
}
ldr, has := ldrS.ldrs[args.LoaderID]
if !has {
return fmt.Errorf("UNKNOWN_LOADER: %s", args.LoaderID)
}
if locked, err := ldr.isFolderLocked(); err != nil {
return utils.NewErrServerError(err)
} else if locked {
if args.ForceLock {
if err := ldr.unlockFolder(); err != nil {
return utils.NewErrServerError(err)
}
}
return errors.New("ANOTHER_LOADER_RUNNING")
}
if err := ldr.ProcessFolder(); err != nil {
return utils.NewErrServerError(err)
}
*rply = utils.OK
return
}

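At the wire level, a call through Go's standard net/rpc JSON codec would be framed roughly as below (illustrative values; ForceLock makes V1Load remove an existing lock file before reporting the folder as busy, as implemented above):

{"method": "LoaderSv1.Load", "params": [{"LoaderID": "*default", "ForceLock": false}], "id": 1}

and a successful reply:

{"id": 1, "result": "OK", "error": null}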