Add MaxOpenFiles configuration option for CDRC, fixes #111

DanB
2015-07-08 13:02:16 +02:00
parent 840c46ef1a
commit 8fb98ef1d2
6 changed files with 21 additions and 2 deletions

View File

@@ -103,7 +103,11 @@ func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrS
}
cdrc := &Cdrc{cdrsAddress: cdrcCfg.Cdrs, CdrFormat: cdrcCfg.CdrFormat, cdrInDir: cdrcCfg.CdrInDir, cdrOutDir: cdrcCfg.CdrOutDir,
runDelay: cdrcCfg.RunDelay, csvSep: cdrcCfg.FieldSeparator,
httpSkipTlsCheck: httpSkipTlsCheck, cdrServer: cdrServer, exitChan: exitChan}
httpSkipTlsCheck: httpSkipTlsCheck, cdrServer: cdrServer, exitChan: exitChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles)}
var processFile struct{}
for i := 0; i < cdrcCfg.MaxOpenFiles; i++ {
cdrc.maxOpenFiles <- processFile // Pre-fill the channel so later receives do not block when acquiring a slot
}
cdrc.cdrSourceIds = make([]string, len(cdrcCfgs))
cdrc.duMultiplyFactors = make([]float64, len(cdrcCfgs))
cdrc.cdrFilters = make([]utils.RSRFields, len(cdrcCfgs))
@@ -141,6 +145,7 @@ type Cdrc struct {
cdrServer *engine.CdrServer // Reference towards internal cdrServer if that is the case
httpClient *http.Client
exitChan chan struct{}
maxOpenFiles chan struct{} // Maximum number of simultaneous files processed
}
// When called, fires up folder monitoring, either automated via inotify or manual by sleeping between processing runs
@@ -212,6 +217,8 @@ func (self *Cdrc) processCdrDir() error {
// Processes the file at filePath and posts the valid CDR rows out of it
func (self *Cdrc) processFile(filePath string) error {
processFile := <-self.maxOpenFiles // Acquire a processing slot; blocks once maxOpenFiles files are in flight
defer func() { self.maxOpenFiles <- processFile }()
_, fn := path.Split(filePath)
engine.Logger.Info(fmt.Sprintf("<Cdrc> Parsing: %s", filePath))
file, err := os.Open(filePath)
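
The pre-filled buffered channel above works as a counting semaphore: processFile takes a token before opening a file and returns it in a defer, so at most MaxOpenFiles files are processed at once. A minimal, self-contained sketch of the same pattern; the names (sem, process, the fake file names) are illustrative and not taken from the codebase:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const maxOpenFiles = 3 // cap on simultaneously processed files

	// Buffered channel used as a counting semaphore, pre-filled with
	// tokens so the first maxOpenFiles receives do not block.
	sem := make(chan struct{}, maxOpenFiles)
	for i := 0; i < maxOpenFiles; i++ {
		sem <- struct{}{}
	}

	process := func(path string) {
		token := <-sem                  // acquire a slot, blocks when all are taken
		defer func() { sem <- token }() // release the slot when done
		fmt.Println("processing", path)
		time.Sleep(50 * time.Millisecond) // stand-in for real file work
	}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			process(fmt.Sprintf("cdr_%d.csv", i))
		}(i)
	}
	wg.Wait()
}

An equivalent implementation could instead send into an initially empty channel on acquire; pre-filling keeps the acquire side a plain receive, matching the committed code.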

View File

@@ -31,6 +31,7 @@ type CdrcConfig struct {
FieldSeparator rune // The separator to use when reading csvs
DataUsageMultiplyFactor float64 // Conversion factor for data usage
RunDelay time.Duration // Delay between runs, 0 for inotify driven requests
MaxOpenFiles int // Maximum number of files opened simultaneously
CdrInDir string // Folder to process CDRs from
CdrOutDir string // Folder to move processed CDRs to
CdrSourceId string // Source identifier for the processed CDRs
@@ -62,6 +63,9 @@ func (self *CdrcConfig) loadFromJsonCfg(jsnCfg *CdrcJsonCfg) error {
if jsnCfg.Run_delay != nil {
self.RunDelay = time.Duration(*jsnCfg.Run_delay) * time.Second
}
if jsnCfg.Max_open_files != nil {
self.MaxOpenFiles = *jsnCfg.Max_open_files
}
if jsnCfg.Cdr_in_dir != nil {
self.CdrInDir = *jsnCfg.Cdr_in_dir
}
@@ -93,6 +97,7 @@ func (self *CdrcConfig) Clone() *CdrcConfig {
clnCdrc.FieldSeparator = self.FieldSeparator
clnCdrc.DataUsageMultiplyFactor = self.DataUsageMultiplyFactor
clnCdrc.RunDelay = self.RunDelay
clnCdrc.MaxOpenFiles = self.MaxOpenFiles
clnCdrc.CdrInDir = self.CdrInDir
clnCdrc.CdrOutDir = self.CdrOutDir
clnCdrc.CdrSourceId = self.CdrSourceId
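
Because loadFromJsonCfg only assigns when the JSON pointer is non-nil, MaxOpenFiles keeps whatever default it was given when the key is missing from the user configuration. A hedged sketch of that override behavior; the trimmed-down struct and function names below are illustrative, and only the default of 1024 comes from the shipped configuration:

package main

import "fmt"

// Illustrative subset of the CDRC configuration.
type cdrcCfg struct {
	MaxOpenFiles int
}

// Illustrative subset of the parsed JSON configuration: a nil pointer
// means the key was not present, so the default should be kept.
type cdrcJsonCfg struct {
	Max_open_files *int
}

func (c *cdrcCfg) loadFromJson(j *cdrcJsonCfg) {
	if j.Max_open_files != nil {
		c.MaxOpenFiles = *j.Max_open_files
	}
}

func main() {
	cfg := cdrcCfg{MaxOpenFiles: 1024} // default taken from the shipped config

	cfg.loadFromJson(&cdrcJsonCfg{}) // key absent: default kept
	fmt.Println(cfg.MaxOpenFiles)    // 1024

	override := 64
	cfg.loadFromJson(&cdrcJsonCfg{Max_open_files: &override}) // key present: overridden
	fmt.Println(cfg.MaxOpenFiles)                             // 64
}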

View File

@@ -155,11 +155,12 @@ const CGRATES_CFG_JSON = `
"cdr_format": "csv", // CDR file format <csv|freeswitch_csv|fwv>
"field_separator": ",", // separator used in case of csv files
"run_delay": 0, // sleep interval in seconds between consecutive runs, 0 to use automation via inotify
"max_open_files": 1024, // maximum simultaneous files to process
"data_usage_multiply_factor": 1024, // conversion factor for data usage
"cdr_in_dir": "/var/log/cgrates/cdrc/in", // absolute path towards the directory where the CDRs are stored
"cdr_out_dir": "/var/log/cgrates/cdrc/out", // absolute path towards the directory where processed CDRs will be moved
"cdr_source_id": "freeswitch_csv", // free form field, tag identifying the source of the CDRs within CDRS database
"cdr_filter": "", // Filter CDR records to import
"cdr_filter": "", // filter CDR records to import
"cdr_fields":[ // import template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
{"tag": "tor", "cdr_field_id": "tor", "type": "cdrfield", "value": "2", "mandatory": true},
{"tag": "accid", "cdr_field_id": "accid", "type": "cdrfield", "value": "3", "mandatory": true},

View File

@@ -290,6 +290,7 @@ func TestDfCdrcJsonCfg(t *testing.T) {
Cdr_format: utils.StringPointer("csv"),
Field_separator: utils.StringPointer(","),
Run_delay: utils.IntPointer(0),
Max_open_files: utils.IntPointer(1024),
Data_usage_multiply_factor: utils.Float64Pointer(1024.0),
Cdr_in_dir: utils.StringPointer("/var/log/cgrates/cdrc/in"),
Cdr_out_dir: utils.StringPointer("/var/log/cgrates/cdrc/out"),

View File

@@ -41,6 +41,7 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
FieldSeparator: ',',
DataUsageMultiplyFactor: 1024,
RunDelay: 0,
MaxOpenFiles: 1024,
CdrInDir: "/var/log/cgrates/cdrc/in",
CdrOutDir: "/var/log/cgrates/cdrc/out",
CdrSourceId: "freeswitch_csv",
@@ -81,6 +82,7 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
FieldSeparator: ',',
DataUsageMultiplyFactor: 1024,
RunDelay: 0,
MaxOpenFiles: 1024,
CdrInDir: "/tmp/cgrates/cdrc1/in",
CdrOutDir: "/tmp/cgrates/cdrc1/out",
CdrSourceId: "csv1",
@@ -121,6 +123,7 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
FieldSeparator: ',',
DataUsageMultiplyFactor: 0.000976563,
RunDelay: 0,
MaxOpenFiles: 1024,
CdrInDir: "/tmp/cgrates/cdrc2/in",
CdrOutDir: "/tmp/cgrates/cdrc2/out",
CdrSourceId: "csv2",
@@ -141,6 +144,7 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
FieldSeparator: ',',
DataUsageMultiplyFactor: 1024,
RunDelay: 0,
MaxOpenFiles: 1024,
CdrInDir: "/tmp/cgrates/cdrc3/in",
CdrOutDir: "/tmp/cgrates/cdrc3/out",
CdrSourceId: "csv3",

View File

@@ -134,6 +134,7 @@ type CdrcJsonCfg struct {
Cdr_out_dir *string
Cdr_source_id *string
Cdr_filter *string
Max_open_files *int
Cdr_fields *[]*CdrFieldJsonCfg
}
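
The new Max_open_files field is a *int for the same reason as the other fields in CdrcJsonCfg: a pointer field stays nil when the key is absent, so "not configured" can be told apart from an explicit 0. A small standalone sketch of that behavior, assuming encoding/json-style decoding, which matches the lowercase JSON keys to these field names case-insensitively without struct tags:

package main

import (
	"encoding/json"
	"fmt"
)

// Minimal stand-in for CdrcJsonCfg: the pointer stays nil when the key
// is missing from the input JSON.
type cdrcJsonCfg struct {
	Max_open_files *int
}

func main() {
	var withKey, withoutKey cdrcJsonCfg

	// encoding/json matches "max_open_files" to Max_open_files
	// case-insensitively, so no struct tag is needed for this sketch.
	if err := json.Unmarshal([]byte(`{"max_open_files": 512}`), &withKey); err != nil {
		panic(err)
	}
	if err := json.Unmarshal([]byte(`{}`), &withoutKey); err != nil {
		panic(err)
	}

	fmt.Println(*withKey.Max_open_files)   // 512
	fmt.Println(withoutKey.Max_open_files) // <nil>: caller keeps its default
}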