PartialCDRRecordsCache inside CDRC

DanB
2016-07-31 13:43:53 +02:00
parent 2ca2e607d3
commit c7eaa9988e
5 changed files with 48 additions and 20 deletions

View File

@@ -55,7 +55,7 @@ Common parameters within configs processed:
Parameters specific per config instance:
* duMultiplyFactor, cdrSourceId, cdrFilter, cdrFields
*/
- func NewCdrc(cdrcCfgs []*config.CdrcConfig, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection, closeChan chan struct{}, dfltTimezone string) (*Cdrc, error) {
+ func NewCdrc(cdrcCfgs []*config.CdrcConfig, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection, closeChan chan struct{}, dfltTimezone string, roundDecimals int) (*Cdrc, error) {
var cdrcCfg *config.CdrcConfig
for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
break
@@ -71,6 +71,9 @@ func NewCdrc(cdrcCfgs []*config.CdrcConfig, httpSkipTlsCheck bool, cdrs rpcclien
if cdrc.unpairedRecordsCache, err = NewUnpairedRecordsCache(cdrcCfg.PartialRecordCache, cdrcCfg.CdrOutDir, cdrcCfg.FieldSeparator); err != nil {
return nil, err
}
+ if cdrc.partialRecordsCache, err = NewPartialRecordsCache(cdrcCfg.PartialRecordCache, cdrcCfg.CdrOutDir, cdrcCfg.FieldSeparator, roundDecimals, cdrc.timezone, cdrc.httpSkipTlsCheck); err != nil {
+ return nil, err
+ }
// Before processing, make sure in and out folders exist
for _, dir := range []string{cdrcCfg.CdrInDir, cdrcCfg.CdrOutDir} {
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
@@ -91,6 +94,7 @@ type Cdrc struct {
closeChan chan struct{} // Used to signal config reloads when we need to span different CDRC-Client
maxOpenFiles chan struct{} // Maximum number of simultaneous files processed
unpairedRecordsCache *UnpairedRecordsCache // Shared between all files in the folder we process
+ partialRecordsCache *PartialRecordsCache
}
// When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing
@@ -174,11 +178,11 @@ func (self *Cdrc) processFile(filePath string) error {
}
var recordsProcessor RecordsProcessor
switch self.dfltCdrcCfg.CdrFormat {
- case CSV, FS_CSV, utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE:
+ case CSV, FS_CSV, utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE, utils.PartialCSV:
csvReader := csv.NewReader(bufio.NewReader(file))
csvReader.Comma = self.dfltCdrcCfg.FieldSeparator
recordsProcessor = NewCsvRecordsProcessor(csvReader, self.timezone, fn, self.dfltCdrcCfg, self.cdrcCfgs,
- self.httpSkipTlsCheck, self.unpairedRecordsCache)
+ self.httpSkipTlsCheck, self.unpairedRecordsCache, self.partialRecordsCache, self.dfltCdrcCfg.CacheDumpFields)
case utils.FWV:
recordsProcessor = NewFwvRecordsProcessor(file, self.dfltCdrcCfg, self.cdrcCfgs, self.httpClient, self.httpSkipTlsCheck, self.timezone)
case utils.XML:
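All of the CSV-family formats, now including utils.PartialCSV, go through the same encoding/csv reader configured with the cdrc field separator. A minimal standalone sketch of that reader setup, with a hypothetical input path and a semicolon standing in for cdrcCfg.FieldSeparator:

package main

import (
    "bufio"
    "encoding/csv"
    "fmt"
    "log"
    "os"
)

func main() {
    // Hypothetical input file; the real cdrc walks files in CdrInDir.
    file, err := os.Open("/tmp/cdrs.csv")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()
    csvReader := csv.NewReader(bufio.NewReader(file))
    csvReader.Comma = ';' // stands in for cdrcCfg.FieldSeparator
    for {
        record, err := csvReader.Read() // one CDR line per record
        if err != nil {                 // io.EOF (or a parse error) ends the loop
            break
        }
        fmt.Println(record) // in cdrc each record is handed to processRecord
    }
}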

View File

@@ -33,22 +33,24 @@ import (
func NewCsvRecordsProcessor(csvReader *csv.Reader, timezone, fileName string,
dfltCdrcCfg *config.CdrcConfig, cdrcCfgs []*config.CdrcConfig,
- httpSkipTlsCheck bool, unpairedRecordsCache *UnpairedRecordsCache) *CsvRecordsProcessor {
+ httpSkipTlsCheck bool, unpairedRecordsCache *UnpairedRecordsCache, partialRecordsCache *PartialRecordsCache, cacheDumpFields []*config.CfgCdrField) *CsvRecordsProcessor {
return &CsvRecordsProcessor{csvReader: csvReader, timezone: timezone, fileName: fileName,
- dfltCdrcCfg: dfltCdrcCfg, cdrcCfgs: cdrcCfgs,
- httpSkipTlsCheck: httpSkipTlsCheck, unpairedRecordsCache: unpairedRecordsCache}
+ dfltCdrcCfg: dfltCdrcCfg, cdrcCfgs: cdrcCfgs, httpSkipTlsCheck: httpSkipTlsCheck, unpairedRecordsCache: unpairedRecordsCache,
+ partialRecordsCache: partialRecordsCache, partialCacheDumpFields: cacheDumpFields}
}
type CsvRecordsProcessor struct {
- csvReader *csv.Reader
- timezone string // Timezone for CDRs which are not clearly specifying it
- fileName string
- dfltCdrcCfg *config.CdrcConfig
- cdrcCfgs []*config.CdrcConfig
- processedRecordsNr int64 // Number of content records in file
- httpSkipTlsCheck bool
- unpairedRecordsCache *UnpairedRecordsCache // Shared by cdrc so we can cache for all files in a folder
+ csvReader *csv.Reader
+ timezone string // Timezone for CDRs which are not clearly specifying it
+ fileName string
+ dfltCdrcCfg *config.CdrcConfig
+ cdrcCfgs []*config.CdrcConfig
+ processedRecordsNr int64 // Number of content records in file
+ httpSkipTlsCheck bool
+ unpairedRecordsCache *UnpairedRecordsCache // Shared by cdrc so we can cache for all files in a folder
+ partialRecordsCache *PartialRecordsCache // Cache records which are of type "Partial"
+ partialCacheDumpFields []*config.CfgCdrField
}
func (self *CsvRecordsProcessor) ProcessedRecordsNr() int64 {
@@ -116,11 +118,17 @@ func (self *CsvRecordsProcessor) processRecord(record []string) ([]*engine.CDR,
if filterBreak { // Stop importing cdrc fields profile due to non matching filter
continue
}
- if storedCdr, err := self.recordToStoredCdr(record, cdrcCfg); err != nil {
+ storedCdr, err := self.recordToStoredCdr(record, cdrcCfg)
+ if err != nil {
return nil, fmt.Errorf("Failed converting to StoredCdr, error: %s", err.Error())
- } else {
- recordCdrs = append(recordCdrs, storedCdr)
+ } else if self.dfltCdrcCfg.CdrFormat == utils.PartialCSV {
+ if storedCdr, err = self.partialRecordsCache.MergePartialCDRRecord(NewPartialCDRRecord(storedCdr, self.partialCacheDumpFields)); err != nil {
+ return nil, fmt.Errorf("Failed merging PartialCDR, error: %s", err.Error())
+ } else if storedCdr == nil { // CDR was absorbed by cache since it was partial
+ continue
+ }
+ }
}
+ recordCdrs = append(recordCdrs, storedCdr)
if !cdrcCfg.ContinueOnSuccess {
break
}
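The contract the new branch relies on: MergePartialCDRRecord returns (nil, nil) when the record was only cached (partial, waiting for more pieces) and a non-nil CDR when a complete record can be appended. A self-contained sketch of that contract, using stub types instead of the real engine.CDR and PartialRecordsCache (only OriginID and Partial mirror real field names; the merge rule is invented purely for illustration):

package main

import "fmt"

// CDR is a stub of the engine CDR, for illustration only.
type CDR struct {
    OriginID string
    Partial  bool
    Usage    int
}

// partialCache is a toy cache keyed by OriginID: it absorbs partial CDRs
// and emits one merged CDR once a final (non-partial) record arrives.
type partialCache struct{ byOrigin map[string][]*CDR }

func (pc *partialCache) Merge(cdr *CDR) (*CDR, error) {
    if !cdr.Partial && len(pc.byOrigin[cdr.OriginID]) == 0 {
        return cdr, nil // nothing cached and not partial: pass straight through
    }
    pc.byOrigin[cdr.OriginID] = append(pc.byOrigin[cdr.OriginID], cdr)
    if cdr.Partial {
        return nil, nil // absorbed by the cache, caller skips it
    }
    merged := &CDR{OriginID: cdr.OriginID}
    for _, c := range pc.byOrigin[cdr.OriginID] {
        merged.Usage += c.Usage // invented merge rule, just for the demo
    }
    delete(pc.byOrigin, cdr.OriginID)
    return merged, nil
}

func main() {
    pc := &partialCache{byOrigin: make(map[string][]*CDR)}
    var out []*CDR
    for _, cdr := range []*CDR{
        {OriginID: "call1", Partial: true, Usage: 10},
        {OriginID: "call1", Partial: false, Usage: 5},
    } {
        merged, err := pc.Merge(cdr)
        if err != nil {
            fmt.Println("merge error:", err)
            return
        }
        if merged == nil { // same role as the diff's continue
            continue
        }
        out = append(out, merged)
    }
    fmt.Println(len(out), out[0].Usage) // prints: 1 15
}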

View File

@@ -113,12 +113,15 @@ func (prc *PartialRecordsCache) uncachePartialCDR(pCDR *PartialCDRRecord) {
}
// Returns PartialCDR only if merge was possible
- func (prc *PartialRecordsCache) MergePartialCDR(pCDR *PartialCDRRecord) (*engine.CDR, error) {
+ func (prc *PartialRecordsCache) MergePartialCDRRecord(pCDR *PartialCDRRecord) (*engine.CDR, error) {
if pCDR.Len() == 0 || pCDR.cdrs[0].OriginID == "" { // Sanity check
return nil, nil
}
originID := pCDR.cdrs[0].OriginID
pCDRIf, err := prc.guard.Guard(func() (interface{}, error) {
+ if _, hasIt := prc.partialRecords[originID]; !hasIt && pCDR.Len() == 1 && !pCDR.cdrs[0].Partial {
+ return pCDR, nil // Special case when not a partial CDR and not having cached CDRs on same OriginID
+ }
cachedPartialCDR := prc.cachePartialCDR(pCDR)
var final bool
for _, cdr := range pCDR.cdrs {
@@ -136,6 +139,10 @@ func (prc *PartialRecordsCache) MergePartialCDR(pCDR *PartialCDRRecord) (*engine
return pCDRIf.(*engine.CDR), err
}
+ func NewPartialCDRRecord(cdr *engine.CDR, cacheDumpFlds []*config.CfgCdrField) *PartialCDRRecord {
+ return &PartialCDRRecord{cdrs: []*engine.CDR{cdr}, cacheDumpFields: cacheDumpFlds}
+ }
// PartialCDRRecord is a record which can be updated later
// different from PartialFlatstoreRecordsCache which is incomplete (eg: need to calculate duration out of 2 records)
type PartialCDRRecord struct {
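The renamed MergePartialCDRRecord does its map work inside prc.guard.Guard, so the newly added fast path (uncached OriginID, single non-partial CDR) is evaluated under the same serialization that protects the cache. A rough sketch of that guarded check, using a plain sync.Mutex in place of the engine's guard helper (an assumption made only to keep the example self-contained):

package main

import (
    "fmt"
    "sync"
)

type rec struct {
    OriginID string
    Partial  bool
}

type guardedCache struct {
    mu     sync.Mutex // stands in for prc.guard
    byOrig map[string][]*rec
}

// merge mirrors the shape of the guarded closure in the diff: fast path for an
// uncached, non-partial record, otherwise cache it and only hand a record back
// once the final piece shows up.
func (gc *guardedCache) merge(r *rec) *rec {
    gc.mu.Lock()
    defer gc.mu.Unlock()
    if _, hasIt := gc.byOrig[r.OriginID]; !hasIt && !r.Partial {
        return r // special case: nothing cached on this OriginID and not partial
    }
    gc.byOrig[r.OriginID] = append(gc.byOrig[r.OriginID], r)
    if r.Partial {
        return nil // still waiting for the final record
    }
    delete(gc.byOrig, r.OriginID)
    return r // final record closes the group
}

func main() {
    gc := &guardedCache{byOrig: make(map[string][]*rec)}
    fmt.Println(gc.merge(&rec{OriginID: "a", Partial: true}))  // <nil>: cached
    fmt.Println(gc.merge(&rec{OriginID: "b", Partial: false})) // fast path, returned as-is
    fmt.Println(gc.merge(&rec{OriginID: "a", Partial: false})) // final piece for "a"
}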

View File

@@ -117,7 +117,7 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne
exitChan <- true
return
}
- cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrsConn, closeChan, cfg.DefaultTimezone)
+ cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrsConn, closeChan, cfg.DefaultTimezone, cfg.RoundingDecimals)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("Cdrc config parsing error: %s", err.Error()))
exitChan <- true

View File

@@ -66,6 +66,12 @@ func TestCdrsInitCdrDb(t *testing.T) {
if err := engine.InitStorDb(cdrsSlaveCfg); err != nil {
t.Fatal(err)
}
+ /*
+ if err := os.Mkdir(cdrsMasterCfg.HttpFailedDir, 0700); err != nil {
+ t.Error(err)
+ }
+ */
}
func TestCdrsStartMasterEngine(t *testing.T) {
@@ -149,7 +155,7 @@ func TestCdrsFileFailover(t *testing.T) {
if !*testIntegration {
return
}
- time.Sleep(time.Duration(*waitRater) * time.Millisecond)
+ time.Sleep(time.Duration(3**waitRater) * time.Millisecond)
failoverContent := []byte(`Account=1001&AnswerTime=2013-12-07T08%3A42%3A26Z&Category=call&Destination=1002&Direction=%2Aout&DisconnectCause=&OriginHost=192.168.1.1&OriginID=httpjsonrpc1&PDD=0&RequestType=%2Apseudoprepaid&SetupTime=2013-12-07T08%3A42%3A24Z&Source=UNKNOWN&Subject=1001&Supplier=&Tenant=cgrates.org&ToR=%2Avoice&Usage=10&field_extr1=val_extr1&fieldextr2=valextr2`)
var rplCfg *config.CdrReplicationCfg
for _, rplCfg = range cdrsMasterCfg.CDRSCdrReplication {
@@ -158,6 +164,9 @@ func TestCdrsFileFailover(t *testing.T) {
}
}
filesInDir, _ := ioutil.ReadDir(cdrsMasterCfg.HttpFailedDir)
+ if len(filesInDir) == 0 {
+ t.Fatalf("No files in directory: %s", cdrsMasterCfg.HttpFailedDir)
+ }
var fileName string
for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config
fileName = file.Name()
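The failover file the test looks for in HttpFailedDir holds the CDR as URL-encoded form data (the failoverContent literal above). Not part of this commit, but as a quick illustration of how such a payload decodes, a sketch using net/url with a trimmed-down version of that literal:

package main

import (
    "fmt"
    "log"
    "net/url"
)

func main() {
    // A shortened version of the failoverContent literal from the test.
    payload := "Account=1001&AnswerTime=2013-12-07T08%3A42%3A26Z&OriginID=httpjsonrpc1&RequestType=%2Apseudoprepaid&Tenant=cgrates.org&ToR=%2Avoice&Usage=10"
    vals, err := url.ParseQuery(payload)
    if err != nil {
        log.Fatal(err)
    }
    // %3A decodes to ':' and %2A to '*'.
    fmt.Println(vals.Get("OriginID"), vals.Get("ToR"), vals.Get("Usage")) // httpjsonrpc1 *voice 10
}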