mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 10:06:24 +05:00
CDRC - config sanity moved, partial and unpaired records cache simplified
This commit is contained in:
34
cdrc/cdrc.go
34
cdrc/cdrc.go
@@ -54,9 +54,9 @@ Parameters specific per config instance:
|
||||
* duMultiplyFactor, cdrSourceId, cdrFilter, cdrFields
|
||||
*/
|
||||
func NewCdrc(cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection,
|
||||
closeChan chan struct{}, dfltTimezone string, roundDecimals int, filterS *engine.FilterS) (*Cdrc, error) {
|
||||
closeChan chan struct{}, dfltTimezone string, roundDecimals int, filterS *engine.FilterS) (cdrc *Cdrc, err error) {
|
||||
cdrcCfg := cdrcCfgs[0]
|
||||
cdrc := &Cdrc{
|
||||
cdrc = &Cdrc{
|
||||
httpSkipTlsCheck: httpSkipTlsCheck,
|
||||
cdrcCfgs: cdrcCfgs,
|
||||
dfltCdrcCfg: cdrcCfg,
|
||||
@@ -65,26 +65,20 @@ func NewCdrc(cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool, cdrs rpcclient.R
|
||||
closeChan: closeChan,
|
||||
maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles),
|
||||
}
|
||||
var processFile struct{}
|
||||
for i := 0; i < cdrcCfg.MaxOpenFiles; i++ {
|
||||
cdrc.maxOpenFiles <- processFile // Empty initiate so we do not need to wait later when we pop
|
||||
}
|
||||
var err error
|
||||
if cdrc.unpairedRecordsCache, err = NewUnpairedRecordsCache(cdrcCfg.PartialRecordCache,
|
||||
cdrcCfg.CDROutPath, cdrcCfg.FieldSeparator); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if cdrc.partialRecordsCache, err = NewPartialRecordsCache(cdrcCfg.PartialRecordCache,
|
||||
cdrcCfg.PartialCacheExpiryAction, cdrcCfg.CDROutPath, cdrcCfg.FieldSeparator, roundDecimals,
|
||||
cdrc.timezone, cdrc.httpSkipTlsCheck, cdrc.cdrs, filterS); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Before processing, make sure in and out folders exist
|
||||
for _, dir := range []string{cdrcCfg.CDRInPath, cdrcCfg.CDROutPath} {
|
||||
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
|
||||
return nil, fmt.Errorf("Nonexistent folder: %s", dir)
|
||||
if utils.IsSliceMember(utils.MainCDRFields, cdrcCfg.CdrFormat) {
|
||||
var processFile struct{}
|
||||
for i := 0; i < cdrcCfg.MaxOpenFiles; i++ {
|
||||
cdrc.maxOpenFiles <- processFile // Empty initiate so we do not need to wait later when we pop
|
||||
}
|
||||
}
|
||||
// unpairedRecordsCache is used with flatStore CDRs
|
||||
cdrc.unpairedRecordsCache = NewUnpairedRecordsCache(cdrcCfg.PartialRecordCache,
|
||||
cdrcCfg.CDROutPath, cdrcCfg.FieldSeparator)
|
||||
cdrc.partialRecordsCache = NewPartialRecordsCache(cdrcCfg.PartialRecordCache,
|
||||
cdrcCfg.PartialCacheExpiryAction, cdrcCfg.CDROutPath, cdrcCfg.FieldSeparator, roundDecimals,
|
||||
cdrc.timezone, cdrc.httpSkipTlsCheck, cdrc.cdrs, filterS)
|
||||
// Before processing, make sure in and out folders exist
|
||||
|
||||
cdrc.filterS = filterS
|
||||
cdrc.httpClient = new(http.Client)
|
||||
return cdrc, nil
|
||||
|
||||
@@ -40,13 +40,13 @@ const (
|
||||
|
||||
func NewPartialRecordsCache(ttl time.Duration, expiryAction string, cdrOutDir string, csvSep rune,
|
||||
roundDecimals int, timezone string, httpSkipTlsCheck bool,
|
||||
cdrs rpcclient.RpcClientConnection, filterS *engine.FilterS) (*PartialRecordsCache, error) {
|
||||
cdrs rpcclient.RpcClientConnection, filterS *engine.FilterS) *PartialRecordsCache {
|
||||
return &PartialRecordsCache{ttl: ttl, expiryAction: expiryAction, cdrOutDir: cdrOutDir,
|
||||
csvSep: csvSep, roundDecimals: roundDecimals, timezone: timezone,
|
||||
httpSkipTlsCheck: httpSkipTlsCheck, cdrs: cdrs,
|
||||
partialRecords: make(map[string]*PartialCDRRecord),
|
||||
dumpTimers: make(map[string]*time.Timer),
|
||||
guard: guardian.Guardian, filterS: filterS}, nil
|
||||
guard: guardian.Guardian, filterS: filterS}
|
||||
}
|
||||
|
||||
type PartialRecordsCache struct {
|
||||
|
||||
@@ -31,9 +31,9 @@ import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func NewUnpairedRecordsCache(ttl time.Duration, cdrOutDir string, csvSep rune) (*UnpairedRecordsCache, error) {
|
||||
func NewUnpairedRecordsCache(ttl time.Duration, cdrOutDir string, csvSep rune) *UnpairedRecordsCache {
|
||||
return &UnpairedRecordsCache{ttl: ttl, cdrOutDir: cdrOutDir, csvSep: csvSep,
|
||||
partialRecords: make(map[string]map[string]*UnpairedRecord), guard: guardian.Guardian}, nil
|
||||
partialRecords: make(map[string]map[string]*UnpairedRecord), guard: guardian.Guardian}
|
||||
}
|
||||
|
||||
type UnpairedRecordsCache struct {
|
||||
|
||||
@@ -426,7 +426,7 @@ func (self *CGRConfig) checkConfigSanity() error {
|
||||
if len(cdrcInst.ContentFields) == 0 {
|
||||
return errors.New("CdrC enabled but no fields to be processed defined!")
|
||||
}
|
||||
if cdrcInst.CdrFormat == utils.CSV {
|
||||
if cdrcInst.CdrFormat == utils.MetaFileCSV {
|
||||
for _, cdrFld := range cdrcInst.ContentFields {
|
||||
for _, rsrFld := range cdrFld.Value {
|
||||
if rsrFld.attrName != "" {
|
||||
@@ -437,6 +437,13 @@ func (self *CGRConfig) checkConfigSanity() error {
|
||||
}
|
||||
}
|
||||
}
|
||||
if utils.IsSliceMember(utils.MainCDRFields, cdrcInst.CdrFormat) {
|
||||
for _, dir := range []string{cdrcInst.CDRInPath, cdrcInst.CDROutPath} {
|
||||
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
|
||||
return fmt.Errorf("<CDRC> nonexistent folder: %s", dir)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Loaders sanity checks
|
||||
|
||||
@@ -26,6 +26,8 @@ var (
|
||||
MainCDRFields = []string{CGRID, Source, OriginHost, OriginID, ToR, RequestType, Tenant, Category,
|
||||
Account, Subject, Destination, SetupTime, AnswerTime, Usage, COST, RATED, Partial, RunID,
|
||||
PreRated, CostSource, CostDetails, ExtraInfo, OrderID}
|
||||
CDRCFileFormats = []string{MetaFileCSV, MetaFScsv,
|
||||
MetaKamFlatstore, MetaOsipsFlatstore, MetaPartialCSV, MetaFileFWV, MetaFileXML}
|
||||
MainCDRFieldsMap StringMap
|
||||
|
||||
GitLastLog string // If set, it will be processed as part of versioning
|
||||
|
||||
Reference in New Issue
Block a user