From 7fbde1b3587491a9f1b0315df152468de96aabeb Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 10 Jun 2019 13:40:16 +0200 Subject: [PATCH] CDRC - config sanity moved, partial and unpaired records cache simplified --- cdrc/cdrc.go | 34 ++++++++++++++-------------------- cdrc/partial_cdr.go | 4 ++-- cdrc/unpairedrecords.go | 4 ++-- config/config.go | 9 ++++++++- utils/consts.go | 2 ++ 5 files changed, 28 insertions(+), 25 deletions(-) diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index 4b17433c1..c8a118006 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -54,9 +54,9 @@ Parameters specific per config instance: * duMultiplyFactor, cdrSourceId, cdrFilter, cdrFields */ func NewCdrc(cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection, - closeChan chan struct{}, dfltTimezone string, roundDecimals int, filterS *engine.FilterS) (*Cdrc, error) { + closeChan chan struct{}, dfltTimezone string, roundDecimals int, filterS *engine.FilterS) (cdrc *Cdrc, err error) { cdrcCfg := cdrcCfgs[0] - cdrc := &Cdrc{ + cdrc = &Cdrc{ httpSkipTlsCheck: httpSkipTlsCheck, cdrcCfgs: cdrcCfgs, dfltCdrcCfg: cdrcCfg, @@ -65,26 +65,20 @@ func NewCdrc(cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool, cdrs rpcclient.R closeChan: closeChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles), } - var processFile struct{} - for i := 0; i < cdrcCfg.MaxOpenFiles; i++ { - cdrc.maxOpenFiles <- processFile // Empty initiate so we do not need to wait later when we pop - } - var err error - if cdrc.unpairedRecordsCache, err = NewUnpairedRecordsCache(cdrcCfg.PartialRecordCache, - cdrcCfg.CDROutPath, cdrcCfg.FieldSeparator); err != nil { - return nil, err - } - if cdrc.partialRecordsCache, err = NewPartialRecordsCache(cdrcCfg.PartialRecordCache, - cdrcCfg.PartialCacheExpiryAction, cdrcCfg.CDROutPath, cdrcCfg.FieldSeparator, roundDecimals, - cdrc.timezone, cdrc.httpSkipTlsCheck, cdrc.cdrs, filterS); err != nil { - return nil, err - } - // Before processing, make sure in and out folders exist - for _, dir := range []string{cdrcCfg.CDRInPath, cdrcCfg.CDROutPath} { - if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) { - return nil, fmt.Errorf("Nonexistent folder: %s", dir) + if utils.IsSliceMember(utils.MainCDRFields, cdrcCfg.CdrFormat) { + var processFile struct{} + for i := 0; i < cdrcCfg.MaxOpenFiles; i++ { + cdrc.maxOpenFiles <- processFile // Empty initiate so we do not need to wait later when we pop } } + // unpairedRecordsCache is used with flatStore CDRs + cdrc.unpairedRecordsCache = NewUnpairedRecordsCache(cdrcCfg.PartialRecordCache, + cdrcCfg.CDROutPath, cdrcCfg.FieldSeparator) + cdrc.partialRecordsCache = NewPartialRecordsCache(cdrcCfg.PartialRecordCache, + cdrcCfg.PartialCacheExpiryAction, cdrcCfg.CDROutPath, cdrcCfg.FieldSeparator, roundDecimals, + cdrc.timezone, cdrc.httpSkipTlsCheck, cdrc.cdrs, filterS) + // Before processing, make sure in and out folders exist + cdrc.filterS = filterS cdrc.httpClient = new(http.Client) return cdrc, nil diff --git a/cdrc/partial_cdr.go b/cdrc/partial_cdr.go index 7221159a1..2ab3b3f4f 100644 --- a/cdrc/partial_cdr.go +++ b/cdrc/partial_cdr.go @@ -40,13 +40,13 @@ const ( func NewPartialRecordsCache(ttl time.Duration, expiryAction string, cdrOutDir string, csvSep rune, roundDecimals int, timezone string, httpSkipTlsCheck bool, - cdrs rpcclient.RpcClientConnection, filterS *engine.FilterS) (*PartialRecordsCache, error) { + cdrs rpcclient.RpcClientConnection, filterS *engine.FilterS) *PartialRecordsCache { return &PartialRecordsCache{ttl: ttl, expiryAction: expiryAction, cdrOutDir: cdrOutDir, csvSep: csvSep, roundDecimals: roundDecimals, timezone: timezone, httpSkipTlsCheck: httpSkipTlsCheck, cdrs: cdrs, partialRecords: make(map[string]*PartialCDRRecord), dumpTimers: make(map[string]*time.Timer), - guard: guardian.Guardian, filterS: filterS}, nil + guard: guardian.Guardian, filterS: filterS} } type PartialRecordsCache struct { diff --git a/cdrc/unpairedrecords.go b/cdrc/unpairedrecords.go index 4ed8a62bb..d964c3662 100644 --- a/cdrc/unpairedrecords.go +++ b/cdrc/unpairedrecords.go @@ -31,9 +31,9 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewUnpairedRecordsCache(ttl time.Duration, cdrOutDir string, csvSep rune) (*UnpairedRecordsCache, error) { +func NewUnpairedRecordsCache(ttl time.Duration, cdrOutDir string, csvSep rune) *UnpairedRecordsCache { return &UnpairedRecordsCache{ttl: ttl, cdrOutDir: cdrOutDir, csvSep: csvSep, - partialRecords: make(map[string]map[string]*UnpairedRecord), guard: guardian.Guardian}, nil + partialRecords: make(map[string]map[string]*UnpairedRecord), guard: guardian.Guardian} } type UnpairedRecordsCache struct { diff --git a/config/config.go b/config/config.go index 3b6b8d0b4..940ff6b2d 100755 --- a/config/config.go +++ b/config/config.go @@ -426,7 +426,7 @@ func (self *CGRConfig) checkConfigSanity() error { if len(cdrcInst.ContentFields) == 0 { return errors.New("CdrC enabled but no fields to be processed defined!") } - if cdrcInst.CdrFormat == utils.CSV { + if cdrcInst.CdrFormat == utils.MetaFileCSV { for _, cdrFld := range cdrcInst.ContentFields { for _, rsrFld := range cdrFld.Value { if rsrFld.attrName != "" { @@ -437,6 +437,13 @@ func (self *CGRConfig) checkConfigSanity() error { } } } + if utils.IsSliceMember(utils.MainCDRFields, cdrcInst.CdrFormat) { + for _, dir := range []string{cdrcInst.CDRInPath, cdrcInst.CDROutPath} { + if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) { + return fmt.Errorf(" nonexistent folder: %s", dir) + } + } + } } } // Loaders sanity checks diff --git a/utils/consts.go b/utils/consts.go index 82d7ecdc1..a67532664 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -26,6 +26,8 @@ var ( MainCDRFields = []string{CGRID, Source, OriginHost, OriginID, ToR, RequestType, Tenant, Category, Account, Subject, Destination, SetupTime, AnswerTime, Usage, COST, RATED, Partial, RunID, PreRated, CostSource, CostDetails, ExtraInfo, OrderID} + CDRCFileFormats = []string{MetaFileCSV, MetaFScsv, + MetaKamFlatstore, MetaOsipsFlatstore, MetaPartialCSV, MetaFileFWV, MetaFileXML} MainCDRFieldsMap StringMap GitLastLog string // If set, it will be processed as part of versioning