From dd0f2484507342082c7d2bd9331b6ac7a5565fbb Mon Sep 17 00:00:00 2001 From: Trial97 Date: Tue, 2 Nov 2021 15:00:20 +0200 Subject: [PATCH] Updated loaders --- loaders/loader.go | 20 ++++++++++---------- loaders/loaders.go | 14 ++++++++++---- utils/consts.go | 4 ++-- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/loaders/loader.go b/loaders/loader.go index 21279c0d1..b6a13d811 100644 --- a/loaders/loader.go +++ b/loaders/loader.go @@ -403,7 +403,7 @@ func setToDB(ctx *context.Context, dm *engine.DataManager, lType, tmz, ldrID str return } -func newLoader(cfg *config.CGRConfig, ldrCfg *config.LoaderSCfg, dm *engine.DataManager, +func newLoader(cfg *config.CGRConfig, ldrCfg *config.LoaderSCfg, dm *engine.DataManager, dataCache map[string]*ltcache.Cache, timezone string, filterS *engine.FilterS, connMgr *engine.ConnManager, cacheConns []string) *loader { return &loader{ cfg: cfg, @@ -413,7 +413,7 @@ func newLoader(cfg *config.CGRConfig, ldrCfg *config.LoaderSCfg, dm *engine.Data filterS: filterS, connMgr: connMgr, cacheConns: cacheConns, - dataCache: ltcache.NewCache(-1, 0, false, nil), + dataCache: dataCache, Locker: newLocker(ldrCfg.LockFilePath), } } @@ -427,12 +427,12 @@ type loader struct { connMgr *engine.ConnManager cacheConns []string - dataCache *ltcache.Cache + dataCache map[string]*ltcache.Cache Locker } -func (l *loader) process(ctx *context.Context, tntID *utils.TenantID, lDataSet []utils.MapStorage, lType, action, caching string, dryRun, withIndex, partialRates bool) (err error) { - if lType == "" { // do not set in DB; ToDo: how to determine if is cache or not +func (l *loader) process(ctx *context.Context, tntID *utils.TenantID, lDataSet []utils.MapStorage, lType, action, caching string, dryRun, withIndex, partialRates, partial bool) (err error) { + if partial { // do not set in DB; ToDo: how to determine if is cache or not return } switch action { @@ -494,7 +494,7 @@ func (l *loader) process(ctx *context.Context, tntID *utils.TenantID, lDataSet [ return engine.CallCache(l.connMgr, ctx, l.cacheConns, caching, cacheArgs, cacheIDs, nil, false, l.ldrCfg.Tenant) } -func (l *loader) processData(ctx *context.Context, csv CSVReader, tmpls []*config.FCTemplate, lType, action, caching string, dryRun, withIndex, partialRates bool) (err error) { +func (l *loader) processData(ctx *context.Context, csv CSVReader, tmpls []*config.FCTemplate, lType, action, caching string, dryRun, withIndex, partialRates, partial bool) (err error) { var prevTntID *utils.TenantID var lData []utils.MapStorage for lineNr := 1; ; lineNr++ { @@ -509,7 +509,7 @@ func (l *loader) processData(ctx *context.Context, csv CSVReader, tmpls []*confi return } var data utils.MapStorage - if data, err = newRecord(ctx, config.NewSliceDP(record, nil), tmpls, l.ldrCfg.Tenant, l.filterS, l.cfg, l.dataCache); err != nil { + if data, err = newRecord(ctx, config.NewSliceDP(record, nil), tmpls, l.ldrCfg.Tenant, l.filterS, l.cfg, l.dataCache[lType]); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> <%s> file<%s> line: %d, error: %s", utils.LoaderS, l.ldrCfg.ID, csv.Path(), lineNr, err)) @@ -518,7 +518,7 @@ func (l *loader) processData(ctx *context.Context, csv CSVReader, tmpls []*confi tntID := TenantIDFromMap(data) if !prevTntID.Equal(tntID) { if prevTntID != nil { - if err = l.process(ctx, prevTntID, lData, lType, action, caching, dryRun, withIndex, partialRates); err != nil { + if err = l.process(ctx, prevTntID, lData, lType, action, caching, dryRun, withIndex, partialRates, partial); err != nil { return } } @@ -527,7 +527,7 @@ func (l *loader) processData(ctx *context.Context, csv CSVReader, tmpls []*confi } lData = append(lData, data) } - return l.process(ctx, prevTntID, lData, lType, action, caching, dryRun, withIndex, partialRates) + return l.process(ctx, prevTntID, lData, lType, action, caching, dryRun, withIndex, partialRates, partial) } func (l *loader) processFile(ctx *context.Context, cfg *config.LoaderDataType, inPath, outPath, action, caching string, dryRun, withIndex bool) (err error) { @@ -545,7 +545,7 @@ func (l *loader) processFile(ctx *context.Context, cfg *config.LoaderDataType, i } defer csv.Close() if err = l.processData(ctx, csv, cfg.Fields, cfg.Type, action, caching, - dryRun, withIndex, cfg.Flags.GetBool(utils.MetaPartial)); err != nil || // encounterd error + dryRun, withIndex, cfg.Flags.GetBool(utils.PartialRatesOpt), cfg.Flags.GetBool(utils.PartialOpt)); err != nil || // encounterd error outPath == utils.EmptyString || // or no moving csvType != utils.MetaFileCSV { // or the type can not be moved(e.g. url) return diff --git a/loaders/loaders.go b/loaders/loaders.go index 5aa9d388a..dc747c2f4 100644 --- a/loaders/loaders.go +++ b/loaders/loaders.go @@ -27,12 +27,17 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/ltcache" ) func NewLoaderService(cfg *config.CGRConfig, dm *engine.DataManager, timezone string, filterS *engine.FilterS, connMgr *engine.ConnManager) (ldrS *LoaderService) { - ldrS = &LoaderService{cfg: cfg} + ldrS = &LoaderService{cfg: cfg, cache: make(map[string]*ltcache.Cache)} + for i := range cfg.LoaderCfg()[0].Data { + cfg := cfg.LoaderCfg()[0].Data[i] + ldrS.cache[cfg.Type] = ltcache.NewCache(-1, 0, false, nil) + } ldrS.createLoaders(dm, timezone, filterS, connMgr) return } @@ -40,8 +45,9 @@ func NewLoaderService(cfg *config.CGRConfig, dm *engine.DataManager, // LoaderService is the Loader service handling independent Loaders type LoaderService struct { sync.RWMutex - cfg *config.CGRConfig - ldrs map[string]*loader + cfg *config.CGRConfig + cache map[string]*ltcache.Cache + ldrs map[string]*loader } // Enabled returns true if at least one loader is enabled @@ -90,7 +96,7 @@ func (ldrS *LoaderService) createLoaders(dm *engine.DataManager, ldrS.ldrs = make(map[string]*loader) for _, ldrCfg := range ldrS.cfg.LoaderCfg() { if ldrCfg.Enabled { - ldrS.ldrs[ldrCfg.ID] = newLoader(ldrS.cfg, ldrCfg, dm, timezone, filterS, connMgr, ldrCfg.CacheSConns) + ldrS.ldrs[ldrCfg.ID] = newLoader(ldrS.cfg, ldrCfg, dm, ldrS.cache, timezone, filterS, connMgr, ldrCfg.CacheSConns) } } } diff --git a/utils/consts.go b/utils/consts.go index 01cd7132a..64874ccf3 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -723,7 +723,6 @@ const ( MetaWeekly = "*weekly" RateS = "RateS" Underline = "_" - MetaPartial = "*partial" MetaBusy = "*busy" MetaQueue = "*queue" MetaMonthEnd = "*month_end" @@ -2412,7 +2411,8 @@ const ( KafkaMaxWait = "kafkaMaxWait" // partial - PartialOpt = "*partial" + PartialOpt = "*partial" + PartialRatesOpt = "*partial_rates" PartialOrderFieldOpt = "partialOrderField" PartialCacheActionOpt = "partialCacheAction"