mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
LoaderS NewLoader method
This commit is contained in:
@@ -463,7 +463,6 @@ const CGRATES_CFG_JSON = `
|
||||
{"address": "*internal"}, // address where to reach the CacheS for data reload, empty for no reloads <""|*internal|x.y.z.y:1234>
|
||||
],
|
||||
"field_separator": ",", // separator used in case of csv files
|
||||
"max_open_files": 1024, // maximum simultaneous files to process, 0 for unlimited
|
||||
"tp_in_dir": "/var/spool/cgrates/tploader/in", // absolute path towards the directory where the CDRs are stored
|
||||
"tp_out_dir": "/var/spool/cgrates/tploader/out", // absolute path towards the directory where processed CDRs will be moved
|
||||
"data":[ // data profiles to load
|
||||
|
||||
@@ -820,7 +820,6 @@ func TestDfLoaderSJsonCfg(t *testing.T) {
|
||||
Address: utils.StringPointer(utils.MetaInternal),
|
||||
}},
|
||||
Field_separator: utils.StringPointer(","),
|
||||
Max_open_files: utils.IntPointer(1024),
|
||||
Tp_in_dir: utils.StringPointer("/var/spool/cgrates/tploader/in"),
|
||||
Tp_out_dir: utils.StringPointer("/var/spool/cgrates/tploader/out"),
|
||||
Data: &[]*LoaderSJsonDataType{dataType},
|
||||
|
||||
@@ -930,7 +930,6 @@ func TestLoaderDefaults(t *testing.T) {
|
||||
},
|
||||
},
|
||||
FieldSeparator: ",",
|
||||
MaxOpenFiles: 1024,
|
||||
TpInDir: "/var/spool/cgrates/tploader/in",
|
||||
TpOutDir: "/var/spool/cgrates/tploader/out",
|
||||
Data: []*LoaderSDataType{
|
||||
|
||||
@@ -443,7 +443,6 @@ type LoaderSJsonCfg struct {
|
||||
Lock_filename *string
|
||||
Caches_conns *[]*HaPoolJsonCfg
|
||||
Field_separator *string
|
||||
Max_open_files *int
|
||||
Tp_in_dir *string
|
||||
Tp_out_dir *string
|
||||
Data *[]*LoaderSJsonDataType
|
||||
|
||||
@@ -38,7 +38,6 @@ type LoaderSConfig struct {
|
||||
LockFileName string
|
||||
CacheSConns []*HaPoolConfig
|
||||
FieldSeparator string
|
||||
MaxOpenFiles int
|
||||
TpInDir string
|
||||
TpOutDir string
|
||||
Data []*LoaderSDataType
|
||||
@@ -106,9 +105,6 @@ func (self *LoaderSConfig) loadFromJsonCfg(jsnCfg *LoaderSJsonCfg) error {
|
||||
if jsnCfg.Field_separator != nil {
|
||||
self.FieldSeparator = *jsnCfg.Field_separator
|
||||
}
|
||||
if jsnCfg.Max_open_files != nil {
|
||||
self.MaxOpenFiles = *jsnCfg.Max_open_files
|
||||
}
|
||||
if jsnCfg.Tp_in_dir != nil {
|
||||
self.TpInDir = *jsnCfg.Tp_in_dir
|
||||
}
|
||||
@@ -139,7 +135,6 @@ func (self *LoaderSConfig) Clone() *LoaderSConfig {
|
||||
clnLoader.CacheSConns[idx] = &clonedVal
|
||||
}
|
||||
clnLoader.FieldSeparator = self.FieldSeparator
|
||||
clnLoader.MaxOpenFiles = self.MaxOpenFiles
|
||||
clnLoader.TpInDir = self.TpInDir
|
||||
clnLoader.TpOutDir = self.TpOutDir
|
||||
clnLoader.Data = make([]*LoaderSDataType, len(self.Data))
|
||||
|
||||
@@ -24,6 +24,7 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -36,8 +37,42 @@ type openedCSVFile struct {
|
||||
csvRdr *csv.Reader
|
||||
}
|
||||
|
||||
func NewLoader(dm *engine.DataManager, cfg *config.LoaderSConfig,
|
||||
timezone string) (ldr *Loader, err error) {
|
||||
ldr = &Loader{
|
||||
enabled: cfg.Enabled,
|
||||
dryRun: cfg.DryRun,
|
||||
tpInDir: cfg.TpInDir,
|
||||
tpOutDir: cfg.TpOutDir,
|
||||
lockFilename: cfg.LockFileName,
|
||||
fieldSep: cfg.FieldSeparator,
|
||||
dataTpls: make(map[string][]*config.CfgCdrField),
|
||||
rdrs: make(map[string]map[string]*openedCSVFile),
|
||||
bufLoaderData: make(map[string][]LoaderData),
|
||||
dm: dm,
|
||||
timezone: timezone,
|
||||
}
|
||||
for _, ldrData := range cfg.Data {
|
||||
ldr.dataTpls[ldrData.Type] = ldrData.Fields
|
||||
ldr.rdrs[ldrData.Type] = make(map[string]*openedCSVFile)
|
||||
if ldrData.Filename != "" {
|
||||
ldr.rdrs[ldrData.Type][ldrData.Filename] = nil
|
||||
}
|
||||
for _, cfgFld := range ldrData.Fields { // add all possible files to be opened
|
||||
for _, cfgFldVal := range cfgFld.Value {
|
||||
if idx := strings.Index(cfgFldVal.Id, utils.InInFieldSep); idx != -1 {
|
||||
ldr.rdrs[ldrData.Type][cfgFldVal.Id[:idx]] = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Loader is one instance loading from a folder
|
||||
type Loader struct {
|
||||
enabled bool
|
||||
dryRun bool
|
||||
ldrID string
|
||||
tpInDir string
|
||||
tpOutDir string
|
||||
|
||||
@@ -18,7 +18,27 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
|
||||
package loaders
|
||||
|
||||
import (
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
)
|
||||
|
||||
func NewLoaderS(dm *engine.DataManager, cfg []*config.LoaderSConfig,
|
||||
timezone string) (ldrS *LoaderS, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// LoaderS is the Loader service handling independent Loaders
|
||||
type LoaderS struct {
|
||||
loaders map[string]*Loader
|
||||
}
|
||||
|
||||
// isEnabled returns true if at least one loader is enabled
|
||||
func (ldrS *LoaderS) isEnabled() bool {
|
||||
for _, ldr := range ldrS.loaders {
|
||||
if ldr.enabled {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user