From cbfeff169d8bbe1e0ddc72b83bf72a8fbdbb08b9 Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 29 Mar 2018 09:50:33 +0200 Subject: [PATCH] Loader - move files on completion, API LoaderSv1.ProcessFolder -> LoaderSv1.Load --- apier/v1/loaders.go | 5 +++ config/config_defaults.go | 6 +-- config/config_json_test.go | 6 +-- config/config_test.go | 6 +-- loaders/loader.go | 92 +++++++++++++++++++++++++++----------- loaders/loaders.go | 35 +++++++++++++++ 6 files changed, 114 insertions(+), 36 deletions(-) diff --git a/apier/v1/loaders.go b/apier/v1/loaders.go index d685f7890..1dda5e66f 100644 --- a/apier/v1/loaders.go +++ b/apier/v1/loaders.go @@ -37,3 +37,8 @@ func (ldrSv1 *LoaderSv1) Call(serviceMethod string, args interface{}, reply interface{}) error { return utils.APIerRPCCall(ldrSv1, serviceMethod, args, reply) } + +func (ldrSv1 *LoaderSv1) Load(args *loaders.ArgsProcessFolder, + rply *string) error { + return ldrSv1.ldrS.V1Load(args, rply) +} diff --git a/config/config_defaults.go b/config/config_defaults.go index dd5180067..bd084fb1a 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -458,13 +458,13 @@ const CGRATES_CFG_JSON = ` "enabled": false, // starts as service: . "dry_run": false, // do not send the CDRs to CDRS, just parse them "run_delay": 0, // sleep interval in seconds between consecutive runs, 0 to use automation via inotify - "lock_filename": ".cgr.lock", // Filename containing concurrency lock in case of delayed processing + "lock_filename": ".cgr.lck", // Filename containing concurrency lock in case of delayed processing "caches_conns": [ {"address": "*internal"}, // address where to reach the CacheS for data reload, empty for no reloads <""|*internal|x.y.z.y:1234> ], "field_separator": ",", // separator used in case of csv files - "tp_in_dir": "/var/spool/cgrates/tploader/in", // absolute path towards the directory where the CDRs are stored - "tp_out_dir": "/var/spool/cgrates/tploader/out", // absolute path towards the directory where processed CDRs will be moved + "tp_in_dir": "/var/spool/cgrates/loader/in", // absolute path towards the directory where the CDRs are stored + "tp_out_dir": "/var/spool/cgrates/loader/out", // absolute path towards the directory where processed CDRs will be moved "data":[ // data profiles to load { "type": "*attributes", // data source type diff --git a/config/config_json_test.go b/config/config_json_test.go index 7b4667acb..9ece2468e 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -815,13 +815,13 @@ func TestDfLoaderSJsonCfg(t *testing.T) { Enabled: utils.BoolPointer(false), Dry_run: utils.BoolPointer(false), Run_delay: utils.IntPointer(0), - Lock_filename: utils.StringPointer(".cgr.lock"), + Lock_filename: utils.StringPointer(".cgr.lck"), Caches_conns: &[]*HaPoolJsonCfg{&HaPoolJsonCfg{ Address: utils.StringPointer(utils.MetaInternal), }}, Field_separator: utils.StringPointer(","), - Tp_in_dir: utils.StringPointer("/var/spool/cgrates/tploader/in"), - Tp_out_dir: utils.StringPointer("/var/spool/cgrates/tploader/out"), + Tp_in_dir: utils.StringPointer("/var/spool/cgrates/loader/in"), + Tp_out_dir: utils.StringPointer("/var/spool/cgrates/loader/out"), Data: &[]*LoaderSJsonDataType{dataType}, }, } diff --git a/config/config_test.go b/config/config_test.go index 6e8f0fa8e..f872a8ad6 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -923,15 +923,15 @@ func TestLoaderDefaults(t *testing.T) { Enabled: false, DryRun: false, RunDelay: 0, - LockFileName: ".cgr.lock", + LockFileName: ".cgr.lck", CacheSConns: []*HaPoolConfig{ &HaPoolConfig{ Address: utils.MetaInternal, }, }, FieldSeparator: ",", - TpInDir: "/var/spool/cgrates/tploader/in", - TpOutDir: "/var/spool/cgrates/tploader/out", + TpInDir: "/var/spool/cgrates/loader/in", + TpOutDir: "/var/spool/cgrates/loader/out", Data: []*LoaderSDataType{ &LoaderSDataType{ Type: utils.MetaAttributes, diff --git a/loaders/loader.go b/loaders/loader.go index a8c209f19..3a013fcb0 100644 --- a/loaders/loader.go +++ b/loaders/loader.go @@ -22,6 +22,7 @@ import ( "encoding/csv" "fmt" "io" + "io/ioutil" "os" "path" "strings" @@ -42,6 +43,7 @@ func NewLoader(dm *engine.DataManager, cfg *config.LoaderSConfig, ldr = &Loader{ enabled: cfg.Enabled, dryRun: cfg.DryRun, + ldrID: cfg.Id, tpInDir: cfg.TpInDir, tpOutDir: cfg.TpOutDir, lockFilename: cfg.LockFileName, @@ -101,17 +103,13 @@ func (ldr *Loader) ProcessFolder() (err error) { } defer ldr.unlockFolder() for ldrType := range ldr.rdrs { - if err = ldr.openFiles(ldrType); err != nil { + if err = ldr.processFiles(ldrType); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s> cannot open files, err: %s", utils.LoaderS, ldr.ldrID, ldrType, err.Error())) continue } - if err = ldr.processContent(ldrType); err != nil { - utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s>, err: %s", - utils.LoaderS, ldr.ldrID, ldrType, err.Error())) - } } - return + return ldr.moveFiles() } // lockFolder will attempt to lock the folder by creating the lock file @@ -126,6 +124,17 @@ func (ldr *Loader) unlockFolder() (err error) { ldr.lockFilename)) } +func (ldr *Loader) isFolderLocked() (locked bool, err error) { + if _, err = os.Stat(path.Join(ldr.tpInDir, + ldr.lockFilename)); err == nil { + return true, nil + } + if os.IsNotExist(err) { + return false, nil + } + return +} + // unreferenceFile will cleanup an used file by closing and removing from referece map func (ldr *Loader) unreferenceFile(loaderType, fileName string) (err error) { openedCSVFile := ldr.rdrs[loaderType][fileName] @@ -133,39 +142,36 @@ func (ldr *Loader) unreferenceFile(loaderType, fileName string) (err error) { return openedCSVFile.rdr.Close() } -func (ldr *Loader) storeLoadedData(loaderType string, - lds map[string][]LoaderData) (err error) { - switch loaderType { - case utils.MetaAttributes: - for _, lDataSet := range lds { - attrModels := make(engine.TPAttributes, len(lDataSet)) - for i, ld := range lDataSet { - attrModels[i] = new(engine.TPAttribute) - if err = utils.UpdateStructWithIfaceMap(attrModels[i], ld); err != nil { - return - } - } - for _, tpApf := range attrModels.AsTPAttributes() { - if apf, err := engine.APItoAttributeProfile(tpApf, ldr.timezone); err != nil { - return err - } else if err := ldr.dm.SetAttributeProfile(apf, true); err != nil { - return err - } - } +func (ldr *Loader) moveFiles() (err error) { + filesInDir, _ := ioutil.ReadDir(ldr.tpInDir) + for _, file := range filesInDir { + fName := file.Name() + if fName == ldr.lockFilename { + continue + } + oldPath := path.Join(ldr.tpInDir, fName) + newPath := path.Join(ldr.tpOutDir, fName) + if err = os.Rename(oldPath, newPath); err != nil { + return } } return } -func (ldr *Loader) openFiles(loaderType string) (err error) { +func (ldr *Loader) processFiles(loaderType string) (err error) { for fName := range ldr.rdrs[loaderType] { var rdr *os.File if rdr, err = os.Open(path.Join(ldr.tpInDir, fName)); err != nil { return err } + csvReader := csv.NewReader(rdr) + csvReader.Comment = '#' ldr.rdrs[loaderType][fName] = &openedCSVFile{ - fileName: fName, rdr: rdr, csvRdr: csv.NewReader(rdr)} + fileName: fName, rdr: rdr, csvRdr: csvReader} defer ldr.unreferenceFile(loaderType, fName) + if err = ldr.processContent(loaderType); err != nil { + return + } } return } @@ -233,3 +239,35 @@ func (ldr *Loader) processContent(loaderType string) (err error) { delete(ldr.bufLoaderData, tntID) return } + +func (ldr *Loader) storeLoadedData(loaderType string, + lds map[string][]LoaderData) (err error) { + switch loaderType { + case utils.MetaAttributes: + for _, lDataSet := range lds { + attrModels := make(engine.TPAttributes, len(lDataSet)) + for i, ld := range lDataSet { + attrModels[i] = new(engine.TPAttribute) + if err = utils.UpdateStructWithIfaceMap(attrModels[i], ld); err != nil { + return + } + } + for _, tpApf := range attrModels.AsTPAttributes() { + apf, err := engine.APItoAttributeProfile(tpApf, ldr.timezone) + if err != nil { + return err + } + if ldr.dryRun { + utils.Logger.Info( + fmt.Sprintf("<%s-%s> DRY_RUN: AttributeProfile: %s", + utils.LoaderS, ldr.ldrID, utils.ToJSON(apf))) + continue + } + if err := ldr.dm.SetAttributeProfile(apf, true); err != nil { + return err + } + } + } + } + return +} diff --git a/loaders/loaders.go b/loaders/loaders.go index 8e3cd2400..568f36367 100644 --- a/loaders/loaders.go +++ b/loaders/loaders.go @@ -19,8 +19,12 @@ along with this program. If not, see package loaders import ( + "errors" + "fmt" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" ) func NewLoaderService(dm *engine.DataManager, ldrsCfg []*config.LoaderSConfig, @@ -64,3 +68,34 @@ func (ldrS *LoaderService) ListenAndServe(exitChan chan bool) (err error) { } return } + +type ArgsProcessFolder struct { + LoaderID string + ForceLock bool +} + +func (ldrS *LoaderService) V1Load(args *ArgsProcessFolder, + rply *string) (err error) { + if args.LoaderID == "" { + args.LoaderID = utils.META_DEFAULT + } + ldr, has := ldrS.ldrs[args.LoaderID] + if !has { + return fmt.Errorf("UNKNOWN_LOADER: %s", args.LoaderID) + } + if locked, err := ldr.isFolderLocked(); err != nil { + return utils.NewErrServerError(err) + } else if locked { + if args.ForceLock { + if err := ldr.unlockFolder(); err != nil { + return utils.NewErrServerError(err) + } + } + return errors.New("ANOTHER_LOADER_RUNNING") + } + if err := ldr.ProcessFolder(); err != nil { + return utils.NewErrServerError(err) + } + *rply = utils.OK + return +}