diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index 951965900..564c537cb 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -103,7 +103,11 @@ func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrS } cdrc := &Cdrc{cdrsAddress: cdrcCfg.Cdrs, CdrFormat: cdrcCfg.CdrFormat, cdrInDir: cdrcCfg.CdrInDir, cdrOutDir: cdrcCfg.CdrOutDir, runDelay: cdrcCfg.RunDelay, csvSep: cdrcCfg.FieldSeparator, - httpSkipTlsCheck: httpSkipTlsCheck, cdrServer: cdrServer, exitChan: exitChan} + httpSkipTlsCheck: httpSkipTlsCheck, cdrServer: cdrServer, exitChan: exitChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles)} + var processFile struct{} + for i := 0; i < cdrcCfg.MaxOpenFiles; i++ { + cdrc.maxOpenFiles <- processFile // Empty initiate so we do not need to wait later when we pop + } cdrc.cdrSourceIds = make([]string, len(cdrcCfgs)) cdrc.duMultiplyFactors = make([]float64, len(cdrcCfgs)) cdrc.cdrFilters = make([]utils.RSRFields, len(cdrcCfgs)) @@ -141,6 +145,7 @@ type Cdrc struct { cdrServer *engine.CdrServer // Reference towards internal cdrServer if that is the case httpClient *http.Client exitChan chan struct{} + maxOpenFiles chan struct{} // Maximum number of simultaneous files processed } // When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing @@ -212,6 +217,8 @@ func (self *Cdrc) processCdrDir() error { // Processe file at filePath and posts the valid cdr rows out of it func (self *Cdrc) processFile(filePath string) error { + processFile := <-self.maxOpenFiles // Queue here for maxOpenFiles + defer func() { self.maxOpenFiles <- processFile }() _, fn := path.Split(filePath) engine.Logger.Info(fmt.Sprintf(" Parsing: %s", filePath)) file, err := os.Open(filePath) diff --git a/config/cdrcconfig.go b/config/cdrcconfig.go index 13eeea24b..4ef7a6d07 100644 --- a/config/cdrcconfig.go +++ b/config/cdrcconfig.go @@ -31,6 +31,7 @@ type CdrcConfig struct { FieldSeparator rune // The separator to use when reading csvs DataUsageMultiplyFactor float64 // Conversion factor for data usage RunDelay time.Duration // Delay between runs, 0 for inotify driven requests + MaxOpenFiles int // Maximum number of files opened simultaneously CdrInDir string // Folder to process CDRs from CdrOutDir string // Folder to move processed CDRs to CdrSourceId string // Source identifier for the processed CDRs @@ -62,6 +63,9 @@ func (self *CdrcConfig) loadFromJsonCfg(jsnCfg *CdrcJsonCfg) error { if jsnCfg.Run_delay != nil { self.RunDelay = time.Duration(*jsnCfg.Run_delay) * time.Second } + if jsnCfg.Max_open_files != nil { + self.MaxOpenFiles = *jsnCfg.Max_open_files + } if jsnCfg.Cdr_in_dir != nil { self.CdrInDir = *jsnCfg.Cdr_in_dir } @@ -93,6 +97,7 @@ func (self *CdrcConfig) Clone() *CdrcConfig { clnCdrc.FieldSeparator = self.FieldSeparator clnCdrc.DataUsageMultiplyFactor = self.DataUsageMultiplyFactor clnCdrc.RunDelay = self.RunDelay + clnCdrc.MaxOpenFiles = self.MaxOpenFiles clnCdrc.CdrInDir = self.CdrInDir clnCdrc.CdrOutDir = self.CdrOutDir clnCdrc.CdrSourceId = self.CdrSourceId diff --git a/config/config_defaults.go b/config/config_defaults.go index c510f4fdf..99dc4ed3f 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -155,11 +155,12 @@ const CGRATES_CFG_JSON = ` "cdr_format": "csv", // CDR file format "field_separator": ",", // separator used in case of csv files "run_delay": 0, // sleep interval in seconds between consecutive runs, 0 to use automation via inotify + "max_open_files": 1024, // maximum simultaneous files to process "data_usage_multiply_factor": 1024, // conversion factor for data usage "cdr_in_dir": "/var/log/cgrates/cdrc/in", // absolute path towards the directory where the CDRs are stored "cdr_out_dir": "/var/log/cgrates/cdrc/out", // absolute path towards the directory where processed CDRs will be moved "cdr_source_id": "freeswitch_csv", // free form field, tag identifying the source of the CDRs within CDRS database - "cdr_filter": "", // Filter CDR records to import + "cdr_filter": "", // filter CDR records to import "cdr_fields":[ // import template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value {"tag": "tor", "cdr_field_id": "tor", "type": "cdrfield", "value": "2", "mandatory": true}, {"tag": "accid", "cdr_field_id": "accid", "type": "cdrfield", "value": "3", "mandatory": true}, diff --git a/config/config_json_test.go b/config/config_json_test.go index 36fcd03a2..5e2e3c1ed 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -290,6 +290,7 @@ func TestDfCdrcJsonCfg(t *testing.T) { Cdr_format: utils.StringPointer("csv"), Field_separator: utils.StringPointer(","), Run_delay: utils.IntPointer(0), + Max_open_files: utils.IntPointer(1024), Data_usage_multiply_factor: utils.Float64Pointer(1024.0), Cdr_in_dir: utils.StringPointer("/var/log/cgrates/cdrc/in"), Cdr_out_dir: utils.StringPointer("/var/log/cgrates/cdrc/out"), diff --git a/config/configcdrc_test.go b/config/configcdrc_test.go index de0db50b0..1dda59c00 100644 --- a/config/configcdrc_test.go +++ b/config/configcdrc_test.go @@ -41,6 +41,7 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) { FieldSeparator: ',', DataUsageMultiplyFactor: 1024, RunDelay: 0, + MaxOpenFiles: 1024, CdrInDir: "/var/log/cgrates/cdrc/in", CdrOutDir: "/var/log/cgrates/cdrc/out", CdrSourceId: "freeswitch_csv", @@ -81,6 +82,7 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) { FieldSeparator: ',', DataUsageMultiplyFactor: 1024, RunDelay: 0, + MaxOpenFiles: 1024, CdrInDir: "/tmp/cgrates/cdrc1/in", CdrOutDir: "/tmp/cgrates/cdrc1/out", CdrSourceId: "csv1", @@ -121,6 +123,7 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) { FieldSeparator: ',', DataUsageMultiplyFactor: 0.000976563, RunDelay: 0, + MaxOpenFiles: 1024, CdrInDir: "/tmp/cgrates/cdrc2/in", CdrOutDir: "/tmp/cgrates/cdrc2/out", CdrSourceId: "csv2", @@ -141,6 +144,7 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) { FieldSeparator: ',', DataUsageMultiplyFactor: 1024, RunDelay: 0, + MaxOpenFiles: 1024, CdrInDir: "/tmp/cgrates/cdrc3/in", CdrOutDir: "/tmp/cgrates/cdrc3/out", CdrSourceId: "csv3", diff --git a/config/libconfig_json.go b/config/libconfig_json.go index f01c7dd77..8dfe5b257 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -134,6 +134,7 @@ type CdrcJsonCfg struct { Cdr_out_dir *string Cdr_source_id *string Cdr_filter *string + Max_open_files *int Cdr_fields *[]*CdrFieldJsonCfg }