From 577face2939e449d7f8509ba7c07870f411377fc Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Mon, 14 Jun 2021 13:51:25 +0300 Subject: [PATCH] Update loaders to load filters first and remove them last --- config/config_defaults.go | 22 +++++++++++----------- engine/datamanager.go | 11 +++++++++-- loaders/loader.go | 35 +++++++++++++++++++++++++---------- 3 files changed, 45 insertions(+), 23 deletions(-) diff --git a/config/config_defaults.go b/config/config_defaults.go index 352c5b23c..e105e4cb3 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -702,6 +702,17 @@ const CGRATES_CFG_JSON = ` "tp_in_dir": "/var/spool/cgrates/loader/in", // absolute path towards the directory where the TPs are stored "tp_out_dir": "/var/spool/cgrates/loader/out", // absolute path towards the directory where processed TPs will be moved "data":[ // data profiles to load + { + "type": "*filters", // data source type + "file_name": "Filters.csv", // file name in the tp_in_dir + "fields": [ + {"tag": "Tenant", "path": "Tenant", "type": "*variable", "value": "~*req.0", "mandatory": true}, + {"tag": "ID", "path": "ID", "type": "*variable", "value": "~*req.1", "mandatory": true}, + {"tag": "Type", "path": "Type", "type": "*variable", "value": "~*req.2"}, + {"tag": "Element", "path": "Element", "type": "*variable", "value": "~*req.3"}, + {"tag": "Values", "path": "Values", "type": "*variable", "value": "~*req.4"}, + ], + }, { "type": "*attributes", // data source type "file_name": "Attributes.csv", // file name in the tp_in_dir @@ -717,17 +728,6 @@ const CGRATES_CFG_JSON = ` {"tag": "Blocker", "path": "Blocker", "type": "*variable", "value": "~*req.8"}, ], }, - { - "type": "*filters", // data source type - "file_name": "Filters.csv", // file name in the tp_in_dir - "fields": [ - {"tag": "Tenant", "path": "Tenant", "type": "*variable", "value": "~*req.0", "mandatory": true}, - {"tag": "ID", "path": "ID", "type": "*variable", "value": "~*req.1", "mandatory": true}, - {"tag": "Type", "path": "Type", "type": "*variable", "value": "~*req.2"}, - {"tag": "Element", "path": "Element", "type": "*variable", "value": "~*req.3"}, - {"tag": "Values", "path": "Values", "type": "*variable", "value": "~*req.4"}, - ], - }, { "type": "*resources", // data source type "file_name": "Resources.csv", // file name in the tp_in_dir diff --git a/engine/datamanager.go b/engine/datamanager.go index f6ca4c2ee..c5970e6f7 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -1996,9 +1996,10 @@ func (dm *DataManager) RemoveRateProfile(ctx *context.Context, tenant, id string if dm == nil { return utils.ErrNoDatabaseConn } - oldRpp, err := dm.GetRateProfile(ctx, tenant, id, true, false, utils.NonTransactional) + var oldRpp *utils.RateProfile + oldRpp, err = dm.GetRateProfile(ctx, tenant, id, true, false, utils.NonTransactional) if err != nil && err != utils.ErrNotFound { - return err + return } if err = dm.DataDB().RemoveRateProfileDrv(ctx, tenant, id); err != nil { return @@ -2008,11 +2009,17 @@ func (dm *DataManager) RemoveRateProfile(ctx *context.Context, tenant, id string } if withIndex { for key, rate := range oldRpp.Rates { + if err = removeIndexFiltersItem(ctx, dm, utils.CacheRateFilterIndexes, tenant, utils.ConcatenatedKey(key, oldRpp.ID), rate.FilterIDs); err != nil { + return + } if err = removeItemFromFilterIndex(ctx, dm, utils.CacheRateFilterIndexes, oldRpp.Tenant, oldRpp.ID, key, rate.FilterIDs); err != nil { return } } + if err = removeIndexFiltersItem(ctx, dm, utils.CacheRateProfilesFilterIndexes, tenant, id, oldRpp.FilterIDs); err != nil { + return + } if err = removeItemFromFilterIndex(ctx, dm, utils.CacheRateProfilesFilterIndexes, tenant, utils.EmptyString, id, oldRpp.FilterIDs); err != nil { return diff --git a/loaders/loader.go b/loaders/loader.go index 5e7b42353..0cee1b7bd 100644 --- a/loaders/loader.go +++ b/loaders/loader.go @@ -55,6 +55,7 @@ func NewLoader(dm *engine.DataManager, cfg *config.LoaderSCfg, dataTpls: make(map[string][]*config.FCTemplate), flagsTpls: make(map[string]utils.FlagsWithParams), rdrs: make(map[string]map[string]*openedCSVFile), + rdrTypes: make([]string, len(cfg.Data)), bufLoaderData: make(map[string][]LoaderData), dm: dm, timezone: timezone, @@ -62,7 +63,8 @@ func NewLoader(dm *engine.DataManager, cfg *config.LoaderSCfg, connMgr: connMgr, cacheConns: cacheConns, } - for _, ldrData := range cfg.Data { + for i, ldrData := range cfg.Data { + ldr.rdrTypes[i] = ldrData.Type ldr.dataTpls[ldrData.Type] = ldrData.Fields ldr.flagsTpls[ldrData.Type] = ldrData.Flags ldr.rdrs[ldrData.Type] = make(map[string]*openedCSVFile) @@ -98,8 +100,9 @@ type Loader struct { dataTpls map[string][]*config.FCTemplate // map[loaderType]*config.FCTemplate flagsTpls map[string]utils.FlagsWithParams //map[loaderType]utils.FlagsWithParams rdrs map[string]map[string]*openedCSVFile // map[loaderType]map[fileName]*openedCSVFile for common incremental read - procRows int // keep here the last processed row in the file/-s - bufLoaderData map[string][]LoaderData // cache of data read, indexed on tenantID + rdrTypes []string + procRows int // keep here the last processed row in the file/-s + bufLoaderData map[string][]LoaderData // cache of data read, indexed on tenantID dm *engine.DataManager timezone string filterS *engine.FilterS @@ -118,14 +121,26 @@ func (ldr *Loader) ProcessFolder(ctx *context.Context, caching, loadOption strin return } defer ldr.unlockFolder() - for ldrType := range ldr.rdrs { - if err = ldr.processFiles(ctx, ldrType, caching, loadOption); err != nil { - if stopOnError { - return + switch loadOption { + case utils.MetaStore: + for _, ldrType := range ldr.rdrTypes { + if err = ldr.processFiles(ctx, ldrType, caching, loadOption); err != nil { + if stopOnError { + return + } + utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s> cannot open files, err: %s", + utils.LoaderS, ldr.ldrID, ldrType, err.Error())) + } + } + case utils.MetaRemove: + for i := len(ldr.rdrTypes) - 1; i >= 0; i-- { + if err = ldr.processFiles(ctx, ldr.rdrTypes[i], caching, loadOption); err != nil { + if stopOnError { + return + } + utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s> cannot open files, err: %s", + utils.LoaderS, ldr.ldrID, ldr.rdrTypes[i], err.Error())) } - utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s> cannot open files, err: %s", - utils.LoaderS, ldr.ldrID, ldrType, err.Error())) - continue } } err = ldr.moveFiles()