diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index dea018578..e83bf5ad0 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -55,7 +55,7 @@ Common parameters within configs processed: Parameters specific per config instance: * duMultiplyFactor, cdrSourceId, cdrFilter, cdrFields */ -func NewCdrc(cdrcCfgs []*config.CdrcConfig, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection, closeChan chan struct{}, dfltTimezone string) (*Cdrc, error) { +func NewCdrc(cdrcCfgs []*config.CdrcConfig, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection, closeChan chan struct{}, dfltTimezone string, roundDecimals int) (*Cdrc, error) { var cdrcCfg *config.CdrcConfig for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one break @@ -71,6 +71,9 @@ func NewCdrc(cdrcCfgs []*config.CdrcConfig, httpSkipTlsCheck bool, cdrs rpcclien if cdrc.unpairedRecordsCache, err = NewUnpairedRecordsCache(cdrcCfg.PartialRecordCache, cdrcCfg.CdrOutDir, cdrcCfg.FieldSeparator); err != nil { return nil, err } + if cdrc.partialRecordsCache, err = NewPartialRecordsCache(cdrcCfg.PartialRecordCache, cdrcCfg.CdrOutDir, cdrcCfg.FieldSeparator, roundDecimals, cdrc.timezone, cdrc.httpSkipTlsCheck); err != nil { + return nil, err + } // Before processing, make sure in and out folders exist for _, dir := range []string{cdrcCfg.CdrInDir, cdrcCfg.CdrOutDir} { if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) { @@ -91,6 +94,7 @@ type Cdrc struct { closeChan chan struct{} // Used to signal config reloads when we need to span different CDRC-Client maxOpenFiles chan struct{} // Maximum number of simultaneous files processed unpairedRecordsCache *UnpairedRecordsCache // Shared between all files in the folder we process + partialRecordsCache *PartialRecordsCache } // When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing @@ -174,11 +178,11 @@ func (self *Cdrc) processFile(filePath string) error { } var recordsProcessor RecordsProcessor switch self.dfltCdrcCfg.CdrFormat { - case CSV, FS_CSV, utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE: + case CSV, FS_CSV, utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE, utils.PartialCSV: csvReader := csv.NewReader(bufio.NewReader(file)) csvReader.Comma = self.dfltCdrcCfg.FieldSeparator recordsProcessor = NewCsvRecordsProcessor(csvReader, self.timezone, fn, self.dfltCdrcCfg, self.cdrcCfgs, - self.httpSkipTlsCheck, self.unpairedRecordsCache) + self.httpSkipTlsCheck, self.unpairedRecordsCache, self.partialRecordsCache, self.dfltCdrcCfg.CacheDumpFields) case utils.FWV: recordsProcessor = NewFwvRecordsProcessor(file, self.dfltCdrcCfg, self.cdrcCfgs, self.httpClient, self.httpSkipTlsCheck, self.timezone) case utils.XML: diff --git a/cdrc/csv.go b/cdrc/csv.go index 2efa833b7..f3ab090a9 100644 --- a/cdrc/csv.go +++ b/cdrc/csv.go @@ -33,22 +33,24 @@ import ( func NewCsvRecordsProcessor(csvReader *csv.Reader, timezone, fileName string, dfltCdrcCfg *config.CdrcConfig, cdrcCfgs []*config.CdrcConfig, - httpSkipTlsCheck bool, unpairedRecordsCache *UnpairedRecordsCache) *CsvRecordsProcessor { + httpSkipTlsCheck bool, unpairedRecordsCache *UnpairedRecordsCache, partialRecordsCache *PartialRecordsCache, cacheDumpFields []*config.CfgCdrField) *CsvRecordsProcessor { return &CsvRecordsProcessor{csvReader: csvReader, timezone: timezone, fileName: fileName, - dfltCdrcCfg: dfltCdrcCfg, cdrcCfgs: cdrcCfgs, - httpSkipTlsCheck: httpSkipTlsCheck, unpairedRecordsCache: unpairedRecordsCache} + dfltCdrcCfg: dfltCdrcCfg, cdrcCfgs: cdrcCfgs, httpSkipTlsCheck: httpSkipTlsCheck, unpairedRecordsCache: unpairedRecordsCache, + partialRecordsCache: partialRecordsCache, partialCacheDumpFields: cacheDumpFields} } type CsvRecordsProcessor struct { - csvReader *csv.Reader - timezone string // Timezone for CDRs which are not clearly specifying it - fileName string - dfltCdrcCfg *config.CdrcConfig - cdrcCfgs []*config.CdrcConfig - processedRecordsNr int64 // Number of content records in file - httpSkipTlsCheck bool - unpairedRecordsCache *UnpairedRecordsCache // Shared by cdrc so we can cache for all files in a folder + csvReader *csv.Reader + timezone string // Timezone for CDRs which are not clearly specifying it + fileName string + dfltCdrcCfg *config.CdrcConfig + cdrcCfgs []*config.CdrcConfig + processedRecordsNr int64 // Number of content records in file + httpSkipTlsCheck bool + unpairedRecordsCache *UnpairedRecordsCache // Shared by cdrc so we can cache for all files in a folder + partialRecordsCache *PartialRecordsCache // Cache records which are of type "Partial" + partialCacheDumpFields []*config.CfgCdrField } func (self *CsvRecordsProcessor) ProcessedRecordsNr() int64 { @@ -116,11 +118,17 @@ func (self *CsvRecordsProcessor) processRecord(record []string) ([]*engine.CDR, if filterBreak { // Stop importing cdrc fields profile due to non matching filter continue } - if storedCdr, err := self.recordToStoredCdr(record, cdrcCfg); err != nil { + storedCdr, err := self.recordToStoredCdr(record, cdrcCfg) + if err != nil { return nil, fmt.Errorf("Failed converting to StoredCdr, error: %s", err.Error()) - } else { - recordCdrs = append(recordCdrs, storedCdr) + } else if self.dfltCdrcCfg.CdrFormat == utils.PartialCSV { + if storedCdr, err = self.partialRecordsCache.MergePartialCDRRecord(NewPartialCDRRecord(storedCdr, self.partialCacheDumpFields)); err != nil { + return nil, fmt.Errorf("Failed merging PartialCDR, error: %s", err.Error()) + } else if storedCdr == nil { // CDR was absorbed by cache since it was partial + continue + } } + recordCdrs = append(recordCdrs, storedCdr) if !cdrcCfg.ContinueOnSuccess { break } diff --git a/cdrc/partial_cdr.go b/cdrc/partial_cdr.go index 9f865e6d8..cf465d0e1 100644 --- a/cdrc/partial_cdr.go +++ b/cdrc/partial_cdr.go @@ -113,12 +113,15 @@ func (prc *PartialRecordsCache) uncachePartialCDR(pCDR *PartialCDRRecord) { } // Returns PartialCDR only if merge was possible -func (prc *PartialRecordsCache) MergePartialCDR(pCDR *PartialCDRRecord) (*engine.CDR, error) { +func (prc *PartialRecordsCache) MergePartialCDRRecord(pCDR *PartialCDRRecord) (*engine.CDR, error) { if pCDR.Len() == 0 || pCDR.cdrs[0].OriginID == "" { // Sanity check return nil, nil } originID := pCDR.cdrs[0].OriginID pCDRIf, err := prc.guard.Guard(func() (interface{}, error) { + if _, hasIt := prc.partialRecords[originID]; !hasIt && pCDR.Len() == 1 && !pCDR.cdrs[0].Partial { + return pCDR, nil // Special case when not a partial CDR and not having cached CDRs on same OriginID + } cachedPartialCDR := prc.cachePartialCDR(pCDR) var final bool for _, cdr := range pCDR.cdrs { @@ -136,6 +139,10 @@ func (prc *PartialRecordsCache) MergePartialCDR(pCDR *PartialCDRRecord) (*engine return pCDRIf.(*engine.CDR), err } +func NewPartialCDRRecord(cdr *engine.CDR, cacheDumpFlds []*config.CfgCdrField) *PartialCDRRecord { + return &PartialCDRRecord{cdrs: []*engine.CDR{cdr}, cacheDumpFields: cacheDumpFlds} +} + // PartialCDRRecord is a record which can be updated later // different from PartialFlatstoreRecordsCache which is incomplete (eg: need to calculate duration out of 2 records) type PartialCDRRecord struct { diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 26da8300c..cfaadb9f6 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -117,7 +117,7 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne exitChan <- true return } - cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrsConn, closeChan, cfg.DefaultTimezone) + cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrsConn, closeChan, cfg.DefaultTimezone, cfg.RoundingDecimals) if err != nil { utils.Logger.Crit(fmt.Sprintf("Cdrc config parsing error: %s", err.Error())) exitChan <- true diff --git a/general_tests/cdrs_replication_it_test.go b/general_tests/cdrs_replication_it_test.go index 1a496cd0b..511bb6216 100644 --- a/general_tests/cdrs_replication_it_test.go +++ b/general_tests/cdrs_replication_it_test.go @@ -66,6 +66,12 @@ func TestCdrsInitCdrDb(t *testing.T) { if err := engine.InitStorDb(cdrsSlaveCfg); err != nil { t.Fatal(err) } + /* + if err := os.Mkdir(cdrsMasterCfg.HttpFailedDir, 0700); err != nil { + t.Error(err) + } + */ + } func TestCdrsStartMasterEngine(t *testing.T) { @@ -149,7 +155,7 @@ func TestCdrsFileFailover(t *testing.T) { if !*testIntegration { return } - time.Sleep(time.Duration(*waitRater) * time.Millisecond) + time.Sleep(time.Duration(3**waitRater) * time.Millisecond) failoverContent := []byte(`Account=1001&AnswerTime=2013-12-07T08%3A42%3A26Z&Category=call&Destination=1002&Direction=%2Aout&DisconnectCause=&OriginHost=192.168.1.1&OriginID=httpjsonrpc1&PDD=0&RequestType=%2Apseudoprepaid&SetupTime=2013-12-07T08%3A42%3A24Z&Source=UNKNOWN&Subject=1001&Supplier=&Tenant=cgrates.org&ToR=%2Avoice&Usage=10&field_extr1=val_extr1&fieldextr2=valextr2`) var rplCfg *config.CdrReplicationCfg for _, rplCfg = range cdrsMasterCfg.CDRSCdrReplication { @@ -158,6 +164,9 @@ func TestCdrsFileFailover(t *testing.T) { } } filesInDir, _ := ioutil.ReadDir(cdrsMasterCfg.HttpFailedDir) + if len(filesInDir) == 0 { + t.Fatalf("No files in directory: %s", cdrsMasterCfg.HttpFailedDir) + } var fileName string for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config fileName = file.Name()