Updated loaders

This commit is contained in:
Trial97
2021-11-02 15:00:20 +02:00
committed by Dan Christian Bogos
parent 7d0106e9c2
commit dd0f248450
3 changed files with 22 additions and 16 deletions

View File

@@ -403,7 +403,7 @@ func setToDB(ctx *context.Context, dm *engine.DataManager, lType, tmz, ldrID str
return
}
func newLoader(cfg *config.CGRConfig, ldrCfg *config.LoaderSCfg, dm *engine.DataManager,
func newLoader(cfg *config.CGRConfig, ldrCfg *config.LoaderSCfg, dm *engine.DataManager, dataCache map[string]*ltcache.Cache,
timezone string, filterS *engine.FilterS, connMgr *engine.ConnManager, cacheConns []string) *loader {
return &loader{
cfg: cfg,
@@ -413,7 +413,7 @@ func newLoader(cfg *config.CGRConfig, ldrCfg *config.LoaderSCfg, dm *engine.Data
filterS: filterS,
connMgr: connMgr,
cacheConns: cacheConns,
dataCache: ltcache.NewCache(-1, 0, false, nil),
dataCache: dataCache,
Locker: newLocker(ldrCfg.LockFilePath),
}
}
@@ -427,12 +427,12 @@ type loader struct {
connMgr *engine.ConnManager
cacheConns []string
dataCache *ltcache.Cache
dataCache map[string]*ltcache.Cache
Locker
}
func (l *loader) process(ctx *context.Context, tntID *utils.TenantID, lDataSet []utils.MapStorage, lType, action, caching string, dryRun, withIndex, partialRates bool) (err error) {
if lType == "" { // do not set in DB; ToDo: how to determine if is cache or not
func (l *loader) process(ctx *context.Context, tntID *utils.TenantID, lDataSet []utils.MapStorage, lType, action, caching string, dryRun, withIndex, partialRates, partial bool) (err error) {
if partial { // do not set in DB; ToDo: how to determine if is cache or not
return
}
switch action {
@@ -494,7 +494,7 @@ func (l *loader) process(ctx *context.Context, tntID *utils.TenantID, lDataSet [
return engine.CallCache(l.connMgr, ctx, l.cacheConns, caching, cacheArgs, cacheIDs, nil, false, l.ldrCfg.Tenant)
}
func (l *loader) processData(ctx *context.Context, csv CSVReader, tmpls []*config.FCTemplate, lType, action, caching string, dryRun, withIndex, partialRates bool) (err error) {
func (l *loader) processData(ctx *context.Context, csv CSVReader, tmpls []*config.FCTemplate, lType, action, caching string, dryRun, withIndex, partialRates, partial bool) (err error) {
var prevTntID *utils.TenantID
var lData []utils.MapStorage
for lineNr := 1; ; lineNr++ {
@@ -509,7 +509,7 @@ func (l *loader) processData(ctx *context.Context, csv CSVReader, tmpls []*confi
return
}
var data utils.MapStorage
if data, err = newRecord(ctx, config.NewSliceDP(record, nil), tmpls, l.ldrCfg.Tenant, l.filterS, l.cfg, l.dataCache); err != nil {
if data, err = newRecord(ctx, config.NewSliceDP(record, nil), tmpls, l.ldrCfg.Tenant, l.filterS, l.cfg, l.dataCache[lType]); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> <%s> file<%s> line: %d, error: %s",
utils.LoaderS, l.ldrCfg.ID, csv.Path(), lineNr, err))
@@ -518,7 +518,7 @@ func (l *loader) processData(ctx *context.Context, csv CSVReader, tmpls []*confi
tntID := TenantIDFromMap(data)
if !prevTntID.Equal(tntID) {
if prevTntID != nil {
if err = l.process(ctx, prevTntID, lData, lType, action, caching, dryRun, withIndex, partialRates); err != nil {
if err = l.process(ctx, prevTntID, lData, lType, action, caching, dryRun, withIndex, partialRates, partial); err != nil {
return
}
}
@@ -527,7 +527,7 @@ func (l *loader) processData(ctx *context.Context, csv CSVReader, tmpls []*confi
}
lData = append(lData, data)
}
return l.process(ctx, prevTntID, lData, lType, action, caching, dryRun, withIndex, partialRates)
return l.process(ctx, prevTntID, lData, lType, action, caching, dryRun, withIndex, partialRates, partial)
}
func (l *loader) processFile(ctx *context.Context, cfg *config.LoaderDataType, inPath, outPath, action, caching string, dryRun, withIndex bool) (err error) {
@@ -545,7 +545,7 @@ func (l *loader) processFile(ctx *context.Context, cfg *config.LoaderDataType, i
}
defer csv.Close()
if err = l.processData(ctx, csv, cfg.Fields, cfg.Type, action, caching,
dryRun, withIndex, cfg.Flags.GetBool(utils.MetaPartial)); err != nil || // encounterd error
dryRun, withIndex, cfg.Flags.GetBool(utils.PartialRatesOpt), cfg.Flags.GetBool(utils.PartialOpt)); err != nil || // encounterd error
outPath == utils.EmptyString || // or no moving
csvType != utils.MetaFileCSV { // or the type can not be moved(e.g. url)
return

View File

@@ -27,12 +27,17 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/ltcache"
)
func NewLoaderService(cfg *config.CGRConfig, dm *engine.DataManager,
timezone string, filterS *engine.FilterS,
connMgr *engine.ConnManager) (ldrS *LoaderService) {
ldrS = &LoaderService{cfg: cfg}
ldrS = &LoaderService{cfg: cfg, cache: make(map[string]*ltcache.Cache)}
for i := range cfg.LoaderCfg()[0].Data {
cfg := cfg.LoaderCfg()[0].Data[i]
ldrS.cache[cfg.Type] = ltcache.NewCache(-1, 0, false, nil)
}
ldrS.createLoaders(dm, timezone, filterS, connMgr)
return
}
@@ -40,8 +45,9 @@ func NewLoaderService(cfg *config.CGRConfig, dm *engine.DataManager,
// LoaderService is the Loader service handling independent Loaders
type LoaderService struct {
sync.RWMutex
cfg *config.CGRConfig
ldrs map[string]*loader
cfg *config.CGRConfig
cache map[string]*ltcache.Cache
ldrs map[string]*loader
}
// Enabled returns true if at least one loader is enabled
@@ -90,7 +96,7 @@ func (ldrS *LoaderService) createLoaders(dm *engine.DataManager,
ldrS.ldrs = make(map[string]*loader)
for _, ldrCfg := range ldrS.cfg.LoaderCfg() {
if ldrCfg.Enabled {
ldrS.ldrs[ldrCfg.ID] = newLoader(ldrS.cfg, ldrCfg, dm, timezone, filterS, connMgr, ldrCfg.CacheSConns)
ldrS.ldrs[ldrCfg.ID] = newLoader(ldrS.cfg, ldrCfg, dm, ldrS.cache, timezone, filterS, connMgr, ldrCfg.CacheSConns)
}
}
}

View File

@@ -723,7 +723,6 @@ const (
MetaWeekly = "*weekly"
RateS = "RateS"
Underline = "_"
MetaPartial = "*partial"
MetaBusy = "*busy"
MetaQueue = "*queue"
MetaMonthEnd = "*month_end"
@@ -2412,7 +2411,8 @@ const (
KafkaMaxWait = "kafkaMaxWait"
// partial
PartialOpt = "*partial"
PartialOpt = "*partial"
PartialRatesOpt = "*partial_rates"
PartialOrderFieldOpt = "partialOrderField"
PartialCacheActionOpt = "partialCacheAction"