Update loaders to load filters first and remove them last

This commit is contained in:
ionutboangiu
2021-06-14 13:51:25 +03:00
committed by Dan Christian Bogos
parent a3e12e917b
commit 577face293
3 changed files with 45 additions and 23 deletions

View File

@@ -702,6 +702,17 @@ const CGRATES_CFG_JSON = `
"tp_in_dir": "/var/spool/cgrates/loader/in", // absolute path towards the directory where the TPs are stored
"tp_out_dir": "/var/spool/cgrates/loader/out", // absolute path towards the directory where processed TPs will be moved
"data":[ // data profiles to load
{
"type": "*filters", // data source type
"file_name": "Filters.csv", // file name in the tp_in_dir
"fields": [
{"tag": "Tenant", "path": "Tenant", "type": "*variable", "value": "~*req.0", "mandatory": true},
{"tag": "ID", "path": "ID", "type": "*variable", "value": "~*req.1", "mandatory": true},
{"tag": "Type", "path": "Type", "type": "*variable", "value": "~*req.2"},
{"tag": "Element", "path": "Element", "type": "*variable", "value": "~*req.3"},
{"tag": "Values", "path": "Values", "type": "*variable", "value": "~*req.4"},
],
},
{
"type": "*attributes", // data source type
"file_name": "Attributes.csv", // file name in the tp_in_dir
@@ -717,17 +728,6 @@ const CGRATES_CFG_JSON = `
{"tag": "Blocker", "path": "Blocker", "type": "*variable", "value": "~*req.8"},
],
},
{
"type": "*filters", // data source type
"file_name": "Filters.csv", // file name in the tp_in_dir
"fields": [
{"tag": "Tenant", "path": "Tenant", "type": "*variable", "value": "~*req.0", "mandatory": true},
{"tag": "ID", "path": "ID", "type": "*variable", "value": "~*req.1", "mandatory": true},
{"tag": "Type", "path": "Type", "type": "*variable", "value": "~*req.2"},
{"tag": "Element", "path": "Element", "type": "*variable", "value": "~*req.3"},
{"tag": "Values", "path": "Values", "type": "*variable", "value": "~*req.4"},
],
},
{
"type": "*resources", // data source type
"file_name": "Resources.csv", // file name in the tp_in_dir

View File

@@ -1996,9 +1996,10 @@ func (dm *DataManager) RemoveRateProfile(ctx *context.Context, tenant, id string
if dm == nil {
return utils.ErrNoDatabaseConn
}
oldRpp, err := dm.GetRateProfile(ctx, tenant, id, true, false, utils.NonTransactional)
var oldRpp *utils.RateProfile
oldRpp, err = dm.GetRateProfile(ctx, tenant, id, true, false, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
return err
return
}
if err = dm.DataDB().RemoveRateProfileDrv(ctx, tenant, id); err != nil {
return
@@ -2008,11 +2009,17 @@ func (dm *DataManager) RemoveRateProfile(ctx *context.Context, tenant, id string
}
if withIndex {
for key, rate := range oldRpp.Rates {
if err = removeIndexFiltersItem(ctx, dm, utils.CacheRateFilterIndexes, tenant, utils.ConcatenatedKey(key, oldRpp.ID), rate.FilterIDs); err != nil {
return
}
if err = removeItemFromFilterIndex(ctx, dm, utils.CacheRateFilterIndexes,
oldRpp.Tenant, oldRpp.ID, key, rate.FilterIDs); err != nil {
return
}
}
if err = removeIndexFiltersItem(ctx, dm, utils.CacheRateProfilesFilterIndexes, tenant, id, oldRpp.FilterIDs); err != nil {
return
}
if err = removeItemFromFilterIndex(ctx, dm, utils.CacheRateProfilesFilterIndexes,
tenant, utils.EmptyString, id, oldRpp.FilterIDs); err != nil {
return

View File

@@ -55,6 +55,7 @@ func NewLoader(dm *engine.DataManager, cfg *config.LoaderSCfg,
dataTpls: make(map[string][]*config.FCTemplate),
flagsTpls: make(map[string]utils.FlagsWithParams),
rdrs: make(map[string]map[string]*openedCSVFile),
rdrTypes: make([]string, len(cfg.Data)),
bufLoaderData: make(map[string][]LoaderData),
dm: dm,
timezone: timezone,
@@ -62,7 +63,8 @@ func NewLoader(dm *engine.DataManager, cfg *config.LoaderSCfg,
connMgr: connMgr,
cacheConns: cacheConns,
}
for _, ldrData := range cfg.Data {
for i, ldrData := range cfg.Data {
ldr.rdrTypes[i] = ldrData.Type
ldr.dataTpls[ldrData.Type] = ldrData.Fields
ldr.flagsTpls[ldrData.Type] = ldrData.Flags
ldr.rdrs[ldrData.Type] = make(map[string]*openedCSVFile)
@@ -98,8 +100,9 @@ type Loader struct {
dataTpls map[string][]*config.FCTemplate // map[loaderType]*config.FCTemplate
flagsTpls map[string]utils.FlagsWithParams //map[loaderType]utils.FlagsWithParams
rdrs map[string]map[string]*openedCSVFile // map[loaderType]map[fileName]*openedCSVFile for common incremental read
procRows int // keep here the last processed row in the file/-s
bufLoaderData map[string][]LoaderData // cache of data read, indexed on tenantID
rdrTypes []string
procRows int // keep here the last processed row in the file/-s
bufLoaderData map[string][]LoaderData // cache of data read, indexed on tenantID
dm *engine.DataManager
timezone string
filterS *engine.FilterS
@@ -118,14 +121,26 @@ func (ldr *Loader) ProcessFolder(ctx *context.Context, caching, loadOption strin
return
}
defer ldr.unlockFolder()
for ldrType := range ldr.rdrs {
if err = ldr.processFiles(ctx, ldrType, caching, loadOption); err != nil {
if stopOnError {
return
switch loadOption {
case utils.MetaStore:
for _, ldrType := range ldr.rdrTypes {
if err = ldr.processFiles(ctx, ldrType, caching, loadOption); err != nil {
if stopOnError {
return
}
utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s> cannot open files, err: %s",
utils.LoaderS, ldr.ldrID, ldrType, err.Error()))
}
}
case utils.MetaRemove:
for i := len(ldr.rdrTypes) - 1; i >= 0; i-- {
if err = ldr.processFiles(ctx, ldr.rdrTypes[i], caching, loadOption); err != nil {
if stopOnError {
return
}
utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s> cannot open files, err: %s",
utils.LoaderS, ldr.ldrID, ldr.rdrTypes[i], err.Error()))
}
utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s> cannot open files, err: %s",
utils.LoaderS, ldr.ldrID, ldrType, err.Error()))
continue
}
}
err = ldr.moveFiles()