From 61344c1dbf84fac8004721bb0faa82a2be4e591b Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 12 Jul 2015 15:58:21 +0200 Subject: [PATCH] cdrc.dumpUnpairedRecords mecanism to auto-clean cache of the partial flatstore CDR files --- cdrc/cdrc.go | 64 +++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 48 insertions(+), 16 deletions(-) diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index 32a5c9fef..2147d9518 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -39,8 +39,9 @@ import ( ) const ( - CSV = "csv" - FS_CSV = "freeswitch_csv" + CSV = "csv" + FS_CSV = "freeswitch_csv" + UNPAIRED_SUFFIX = ".unpaired" ) // Populates the @@ -176,6 +177,7 @@ func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrS idx := 0 for _, cfg := range cdrcCfgs { if idx == 0 { // Steal the config from just one instance since it should be the same for all + cdrc.partialRecordCache = cfg.PartialRecordCache cdrc.failedCallsPrefix = cfg.FailedCallsPrefix } cdrc.cdrSourceIds[idx] = cfg.CdrSourceId @@ -199,20 +201,21 @@ type Cdrc struct { CdrFormat, cdrInDir, cdrOutDir string - failedCallsPrefix string // Configured failedCallsPrefix, used in case of flatstore CDRs - cdrSourceIds []string // Should be in sync with cdrFields on indexes - runDelay time.Duration - csvSep rune - duMultiplyFactors []float64 - cdrFilters []utils.RSRFields // Should be in sync with cdrFields on indexes - cdrFields [][]*config.CfgCdrField // Profiles directly connected with cdrFilters - httpSkipTlsCheck bool - cdrServer *engine.CdrServer // Reference towards internal cdrServer if that is the case - httpClient *http.Client - exitChan chan struct{} - maxOpenFiles chan struct{} // Maximum number of simultaneous files processed - partialRecords map[string]map[string]*PartialFlatstoreRecord // [FileName"][AccId]*PartialRecord - guard *engine.GuardianLock + failedCallsPrefix string // Configured failedCallsPrefix, used in case of flatstore CDRs + cdrSourceIds []string // Should be in sync with cdrFields on indexes + runDelay time.Duration + csvSep rune + duMultiplyFactors []float64 + cdrFilters []utils.RSRFields // Should be in sync with cdrFields on indexes + cdrFields [][]*config.CfgCdrField // Profiles directly connected with cdrFilters + httpSkipTlsCheck bool + cdrServer *engine.CdrServer // Reference towards internal cdrServer if that is the case + httpClient *http.Client + exitChan chan struct{} + maxOpenFiles chan struct{} // Maximum number of simultaneous files processed + partialRecords map[string]map[string]*PartialFlatstoreRecord // [FileName"][AccId]*PartialRecord + partialRecordCache time.Duration // Duration to cache partial records for + guard *engine.GuardianLock } // When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing @@ -347,6 +350,12 @@ func (self *Cdrc) processPartialRecord(record []string, fileName string) ([]stri // ToDo: schedule dumping of the .unpaired files if fileMp, hasFile := self.partialRecords[fileName]; !hasFile { self.partialRecords[fileName] = map[string]*PartialFlatstoreRecord{pr.AccId: pr} + if self.partialRecordCache != 0 { // Schedule expiry/dump of the just created entry in cache + go func() { + time.Sleep(self.partialRecordCache) + self.dumpUnpairedRecords(fileName) + }() + } return true, nil } else if _, hasAccId := fileMp[pr.AccId]; !hasAccId { self.partialRecords[fileName][pr.AccId] = pr @@ -361,6 +370,29 @@ func (self *Cdrc) processPartialRecord(record []string, fileName string) ([]stri return pairToRecord(self.partialRecords[fileName][pr.AccId], pr) } +// Dumps the cache into a .unpaired file in the outdir and cleans cache after +func (self *Cdrc) dumpUnpairedRecords(fileName string) error { + _, err := self.guard.Guard(func() (interface{}, error) { + unpairedFilePath := path.Join(self.cdrOutDir, fileName+UNPAIRED_SUFFIX) + fileOut, err := os.Create(unpairedFilePath) + if err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed creating %s, error: %s", unpairedFilePath, err.Error())) + return nil, err + } + csvWriter := csv.NewWriter(fileOut) + csvWriter.Comma = self.csvSep + for _, pr := range self.partialRecords[fileName] { + if err := csvWriter.Write(pr.Values); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed writing unpaired record %v to file: %s, error: %s", pr, unpairedFilePath, err.Error())) + return nil, err + } + } + delete(self.partialRecords, fileName) + return nil, nil + }, fileName) + return err +} + // Takes the record from a slice and turns it into StoredCdrs, posting them to the cdrServer func (self *Cdrc) processRecord(record []string, srcRowNr int) error { recordCdrs := make([]*engine.StoredCdr, 0) // More CDRs based on the number of filters and field templates