Simplified CDRC csv constructor, removal of unused httpClient

This commit is contained in:
DanB
2019-06-10 14:52:05 +02:00
parent 7fbde1b358
commit bffce744f3
4 changed files with 38 additions and 41 deletions

View File

@@ -24,7 +24,6 @@ import (
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path"
"time"
@@ -51,7 +50,7 @@ One instance of CDRC will act on one folder.
Common parameters within configs processed:
* cdrS, cdrFormat, CDRInPath, CDROutPath, runDelay
Parameters specific per config instance:
* duMultiplyFactor, cdrSourceId, cdrFilter, cdrFields
* cdrSourceId, cdrFilter, cdrFields
*/
func NewCdrc(cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection,
closeChan chan struct{}, dfltTimezone string, roundDecimals int, filterS *engine.FilterS) (cdrc *Cdrc, err error) {
@@ -64,6 +63,15 @@ func NewCdrc(cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool, cdrs rpcclient.R
cdrs: cdrs,
closeChan: closeChan,
maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles),
roundDecimals: roundDecimals,
}
// Before processing, make sure in and out folders exist
if utils.IsSliceMember(utils.MainCDRFields, cdrcCfg.CdrFormat) {
for _, dir := range []string{cdrcCfg.CDRInPath, cdrcCfg.CDROutPath} {
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
return nil, fmt.Errorf("<CDRC> nonexistent folder: %s", dir)
}
}
}
if utils.IsSliceMember(utils.MainCDRFields, cdrcCfg.CdrFormat) {
var processFile struct{}
@@ -71,31 +79,20 @@ func NewCdrc(cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool, cdrs rpcclient.R
cdrc.maxOpenFiles <- processFile // Empty initiate so we do not need to wait later when we pop
}
}
// unpairedRecordsCache is used with flatStore CDRs
cdrc.unpairedRecordsCache = NewUnpairedRecordsCache(cdrcCfg.PartialRecordCache,
cdrcCfg.CDROutPath, cdrcCfg.FieldSeparator)
cdrc.partialRecordsCache = NewPartialRecordsCache(cdrcCfg.PartialRecordCache,
cdrcCfg.PartialCacheExpiryAction, cdrcCfg.CDROutPath, cdrcCfg.FieldSeparator, roundDecimals,
cdrc.timezone, cdrc.httpSkipTlsCheck, cdrc.cdrs, filterS)
// Before processing, make sure in and out folders exist
cdrc.filterS = filterS
cdrc.httpClient = new(http.Client)
return cdrc, nil
return
}
type Cdrc struct {
httpSkipTlsCheck bool
cdrcCfgs []*config.CdrcCfg // All cdrc config profiles attached to this CDRC (key will be profile instance name)
dfltCdrcCfg *config.CdrcCfg
timezone string
cdrs rpcclient.RpcClientConnection
httpClient *http.Client
closeChan chan struct{} // Used to signal config reloads when we need to span different CDRC-Client
maxOpenFiles chan struct{} // Maximum number of simultaneous files processed
unpairedRecordsCache *UnpairedRecordsCache // Shared between all files in the folder we process
partialRecordsCache *PartialRecordsCache
filterS *engine.FilterS
httpSkipTlsCheck bool
cdrcCfgs []*config.CdrcCfg // All cdrc config profiles attached to this CDRC (key will be profile instance name)
dfltCdrcCfg *config.CdrcCfg
timezone string
cdrs rpcclient.RpcClientConnection
closeChan chan struct{} // Used to signal config reloads when we need to span different CDRC-Client
maxOpenFiles chan struct{} // Maximum number of simultaneous files processed
filterS *engine.FilterS
roundDecimals int
}
// When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing
@@ -184,11 +181,11 @@ func (self *Cdrc) processFile(filePath string) error {
csvReader.Comma = self.dfltCdrcCfg.FieldSeparator
csvReader.Comment = '#'
recordsProcessor = NewCsvRecordsProcessor(csvReader, self.timezone, fn, self.dfltCdrcCfg,
self.cdrcCfgs, self.httpSkipTlsCheck, self.unpairedRecordsCache, self.partialRecordsCache,
self.dfltCdrcCfg.CacheDumpFields, self.filterS)
self.cdrcCfgs, self.httpSkipTlsCheck,
self.dfltCdrcCfg.CacheDumpFields, self.filterS, self.cdrs, self.roundDecimals)
case utils.MetaFileFWV:
recordsProcessor = NewFwvRecordsProcessor(file, self.dfltCdrcCfg, self.cdrcCfgs,
self.httpClient, self.httpSkipTlsCheck, self.timezone, self.filterS)
self.httpSkipTlsCheck, self.timezone, self.filterS)
case utils.MetaFileXML:
if recordsProcessor, err = NewXMLRecordsProcessor(file, self.dfltCdrcCfg.CDRRootPath,
self.timezone, self.httpSkipTlsCheck, self.cdrcCfgs, self.filterS); err != nil {

View File

@@ -29,15 +29,24 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
func NewCsvRecordsProcessor(csvReader *csv.Reader, timezone, fileName string,
dfltCdrcCfg *config.CdrcCfg, cdrcCfgs []*config.CdrcCfg,
httpSkipTlsCheck bool, unpairedRecordsCache *UnpairedRecordsCache, partialRecordsCache *PartialRecordsCache,
cacheDumpFields []*config.FCTemplate, filterS *engine.FilterS) *CsvRecordsProcessor {
return &CsvRecordsProcessor{csvReader: csvReader, timezone: timezone, fileName: fileName,
dfltCdrcCfg: dfltCdrcCfg, cdrcCfgs: cdrcCfgs, httpSkipTlsCheck: httpSkipTlsCheck, unpairedRecordsCache: unpairedRecordsCache,
partialRecordsCache: partialRecordsCache, partialCacheDumpFields: cacheDumpFields, filterS: filterS}
httpSkipTlsCheck bool, cacheDumpFields []*config.FCTemplate,
filterS *engine.FilterS, cdrs rpcclient.RpcClientConnection, roundDecimals int) *CsvRecordsProcessor {
return &CsvRecordsProcessor{csvReader: csvReader,
timezone: timezone, fileName: fileName,
dfltCdrcCfg: dfltCdrcCfg, cdrcCfgs: cdrcCfgs,
httpSkipTlsCheck: httpSkipTlsCheck,
unpairedRecordsCache: NewUnpairedRecordsCache(dfltCdrcCfg.PartialRecordCache,
dfltCdrcCfg.CDROutPath, dfltCdrcCfg.FieldSeparator),
partialRecordsCache: NewPartialRecordsCache(dfltCdrcCfg.PartialRecordCache,
dfltCdrcCfg.PartialCacheExpiryAction, dfltCdrcCfg.CDROutPath,
dfltCdrcCfg.FieldSeparator, roundDecimals,
timezone, httpSkipTlsCheck, cdrs, filterS),
partialCacheDumpFields: cacheDumpFields, filterS: filterS}
}

View File

@@ -24,7 +24,6 @@ import (
"fmt"
"io"
"net"
"net/http"
"os"
"strconv"
"strings"
@@ -35,7 +34,7 @@ import (
)
func NewFwvRecordsProcessor(file *os.File, dfltCfg *config.CdrcCfg,
cdrcCfgs []*config.CdrcCfg, httpClient *http.Client,
cdrcCfgs []*config.CdrcCfg,
httpSkipTlsCheck bool, timezone string, filterS *engine.FilterS) *FwvRecordsProcessor {
return &FwvRecordsProcessor{file: file, cdrcCfgs: cdrcCfgs, dfltCfg: dfltCfg,
httpSkipTlsCheck: httpSkipTlsCheck, timezone: timezone, filterS: filterS}
@@ -45,7 +44,6 @@ type FwvRecordsProcessor struct {
file *os.File
dfltCfg *config.CdrcCfg // General parameters
cdrcCfgs []*config.CdrcCfg
httpClient *http.Client
httpSkipTlsCheck bool
timezone string
lineLen int64 // Length of the line in the file

View File

@@ -437,13 +437,6 @@ func (self *CGRConfig) checkConfigSanity() error {
}
}
}
if utils.IsSliceMember(utils.MainCDRFields, cdrcInst.CdrFormat) {
for _, dir := range []string{cdrcInst.CDRInPath, cdrcInst.CDROutPath} {
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
return fmt.Errorf("<CDRC> nonexistent folder: %s", dir)
}
}
}
}
}
// Loaders sanity checks