From bffce744f3d482949f54338d3bca2f592928c2b4 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 10 Jun 2019 14:52:05 +0200 Subject: [PATCH] Simplified CDRC csv constructor, removal of unused httpClient --- cdrc/cdrc.go | 49 +++++++++++++++++++++++------------------------- cdrc/csv.go | 19 ++++++++++++++----- cdrc/fwv.go | 4 +--- config/config.go | 7 ------- 4 files changed, 38 insertions(+), 41 deletions(-) diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index c8a118006..45d1450a1 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -24,7 +24,6 @@ import ( "fmt" "io" "io/ioutil" - "net/http" "os" "path" "time" @@ -51,7 +50,7 @@ One instance of CDRC will act on one folder. Common parameters within configs processed: * cdrS, cdrFormat, CDRInPath, CDROutPath, runDelay Parameters specific per config instance: - * duMultiplyFactor, cdrSourceId, cdrFilter, cdrFields + * cdrSourceId, cdrFilter, cdrFields */ func NewCdrc(cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection, closeChan chan struct{}, dfltTimezone string, roundDecimals int, filterS *engine.FilterS) (cdrc *Cdrc, err error) { @@ -64,6 +63,15 @@ func NewCdrc(cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool, cdrs rpcclient.R cdrs: cdrs, closeChan: closeChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles), + roundDecimals: roundDecimals, + } + // Before processing, make sure in and out folders exist + if utils.IsSliceMember(utils.MainCDRFields, cdrcCfg.CdrFormat) { + for _, dir := range []string{cdrcCfg.CDRInPath, cdrcCfg.CDROutPath} { + if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) { + return nil, fmt.Errorf(" nonexistent folder: %s", dir) + } + } } if utils.IsSliceMember(utils.MainCDRFields, cdrcCfg.CdrFormat) { var processFile struct{} @@ -71,31 +79,20 @@ func NewCdrc(cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool, cdrs rpcclient.R cdrc.maxOpenFiles <- processFile // Empty initiate so we do not need to wait later when we pop } } - // unpairedRecordsCache is used with flatStore CDRs - cdrc.unpairedRecordsCache = NewUnpairedRecordsCache(cdrcCfg.PartialRecordCache, - cdrcCfg.CDROutPath, cdrcCfg.FieldSeparator) - cdrc.partialRecordsCache = NewPartialRecordsCache(cdrcCfg.PartialRecordCache, - cdrcCfg.PartialCacheExpiryAction, cdrcCfg.CDROutPath, cdrcCfg.FieldSeparator, roundDecimals, - cdrc.timezone, cdrc.httpSkipTlsCheck, cdrc.cdrs, filterS) - // Before processing, make sure in and out folders exist - cdrc.filterS = filterS - cdrc.httpClient = new(http.Client) - return cdrc, nil + return } type Cdrc struct { - httpSkipTlsCheck bool - cdrcCfgs []*config.CdrcCfg // All cdrc config profiles attached to this CDRC (key will be profile instance name) - dfltCdrcCfg *config.CdrcCfg - timezone string - cdrs rpcclient.RpcClientConnection - httpClient *http.Client - closeChan chan struct{} // Used to signal config reloads when we need to span different CDRC-Client - maxOpenFiles chan struct{} // Maximum number of simultaneous files processed - unpairedRecordsCache *UnpairedRecordsCache // Shared between all files in the folder we process - partialRecordsCache *PartialRecordsCache - filterS *engine.FilterS + httpSkipTlsCheck bool + cdrcCfgs []*config.CdrcCfg // All cdrc config profiles attached to this CDRC (key will be profile instance name) + dfltCdrcCfg *config.CdrcCfg + timezone string + cdrs rpcclient.RpcClientConnection + closeChan chan struct{} // Used to signal config reloads when we need to span different CDRC-Client + maxOpenFiles chan struct{} // Maximum number of simultaneous files processed + filterS *engine.FilterS + roundDecimals int } // When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing @@ -184,11 +181,11 @@ func (self *Cdrc) processFile(filePath string) error { csvReader.Comma = self.dfltCdrcCfg.FieldSeparator csvReader.Comment = '#' recordsProcessor = NewCsvRecordsProcessor(csvReader, self.timezone, fn, self.dfltCdrcCfg, - self.cdrcCfgs, self.httpSkipTlsCheck, self.unpairedRecordsCache, self.partialRecordsCache, - self.dfltCdrcCfg.CacheDumpFields, self.filterS) + self.cdrcCfgs, self.httpSkipTlsCheck, + self.dfltCdrcCfg.CacheDumpFields, self.filterS, self.cdrs, self.roundDecimals) case utils.MetaFileFWV: recordsProcessor = NewFwvRecordsProcessor(file, self.dfltCdrcCfg, self.cdrcCfgs, - self.httpClient, self.httpSkipTlsCheck, self.timezone, self.filterS) + self.httpSkipTlsCheck, self.timezone, self.filterS) case utils.MetaFileXML: if recordsProcessor, err = NewXMLRecordsProcessor(file, self.dfltCdrcCfg.CDRRootPath, self.timezone, self.httpSkipTlsCheck, self.cdrcCfgs, self.filterS); err != nil { diff --git a/cdrc/csv.go b/cdrc/csv.go index 244b6eeb6..864a57210 100644 --- a/cdrc/csv.go +++ b/cdrc/csv.go @@ -29,15 +29,24 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) func NewCsvRecordsProcessor(csvReader *csv.Reader, timezone, fileName string, dfltCdrcCfg *config.CdrcCfg, cdrcCfgs []*config.CdrcCfg, - httpSkipTlsCheck bool, unpairedRecordsCache *UnpairedRecordsCache, partialRecordsCache *PartialRecordsCache, - cacheDumpFields []*config.FCTemplate, filterS *engine.FilterS) *CsvRecordsProcessor { - return &CsvRecordsProcessor{csvReader: csvReader, timezone: timezone, fileName: fileName, - dfltCdrcCfg: dfltCdrcCfg, cdrcCfgs: cdrcCfgs, httpSkipTlsCheck: httpSkipTlsCheck, unpairedRecordsCache: unpairedRecordsCache, - partialRecordsCache: partialRecordsCache, partialCacheDumpFields: cacheDumpFields, filterS: filterS} + httpSkipTlsCheck bool, cacheDumpFields []*config.FCTemplate, + filterS *engine.FilterS, cdrs rpcclient.RpcClientConnection, roundDecimals int) *CsvRecordsProcessor { + return &CsvRecordsProcessor{csvReader: csvReader, + timezone: timezone, fileName: fileName, + dfltCdrcCfg: dfltCdrcCfg, cdrcCfgs: cdrcCfgs, + httpSkipTlsCheck: httpSkipTlsCheck, + unpairedRecordsCache: NewUnpairedRecordsCache(dfltCdrcCfg.PartialRecordCache, + dfltCdrcCfg.CDROutPath, dfltCdrcCfg.FieldSeparator), + partialRecordsCache: NewPartialRecordsCache(dfltCdrcCfg.PartialRecordCache, + dfltCdrcCfg.PartialCacheExpiryAction, dfltCdrcCfg.CDROutPath, + dfltCdrcCfg.FieldSeparator, roundDecimals, + timezone, httpSkipTlsCheck, cdrs, filterS), + partialCacheDumpFields: cacheDumpFields, filterS: filterS} } diff --git a/cdrc/fwv.go b/cdrc/fwv.go index 93d90c56b..5bd8cf98f 100644 --- a/cdrc/fwv.go +++ b/cdrc/fwv.go @@ -24,7 +24,6 @@ import ( "fmt" "io" "net" - "net/http" "os" "strconv" "strings" @@ -35,7 +34,7 @@ import ( ) func NewFwvRecordsProcessor(file *os.File, dfltCfg *config.CdrcCfg, - cdrcCfgs []*config.CdrcCfg, httpClient *http.Client, + cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool, timezone string, filterS *engine.FilterS) *FwvRecordsProcessor { return &FwvRecordsProcessor{file: file, cdrcCfgs: cdrcCfgs, dfltCfg: dfltCfg, httpSkipTlsCheck: httpSkipTlsCheck, timezone: timezone, filterS: filterS} @@ -45,7 +44,6 @@ type FwvRecordsProcessor struct { file *os.File dfltCfg *config.CdrcCfg // General parameters cdrcCfgs []*config.CdrcCfg - httpClient *http.Client httpSkipTlsCheck bool timezone string lineLen int64 // Length of the line in the file diff --git a/config/config.go b/config/config.go index 940ff6b2d..f1a3a5ca7 100755 --- a/config/config.go +++ b/config/config.go @@ -437,13 +437,6 @@ func (self *CGRConfig) checkConfigSanity() error { } } } - if utils.IsSliceMember(utils.MainCDRFields, cdrcInst.CdrFormat) { - for _, dir := range []string{cdrcInst.CDRInPath, cdrcInst.CDROutPath} { - if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) { - return fmt.Errorf(" nonexistent folder: %s", dir) - } - } - } } } // Loaders sanity checks