cdrc.dumpUnpairedRecords mecanism to auto-clean cache of the partial flatstore CDR files

This commit is contained in:
DanB
2015-07-12 15:58:21 +02:00
parent 7cd65d3fe9
commit 61344c1dbf

View File

@@ -39,8 +39,9 @@ import (
)
const (
CSV = "csv"
FS_CSV = "freeswitch_csv"
CSV = "csv"
FS_CSV = "freeswitch_csv"
UNPAIRED_SUFFIX = ".unpaired"
)
// Populates the
@@ -176,6 +177,7 @@ func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrS
idx := 0
for _, cfg := range cdrcCfgs {
if idx == 0 { // Steal the config from just one instance since it should be the same for all
cdrc.partialRecordCache = cfg.PartialRecordCache
cdrc.failedCallsPrefix = cfg.FailedCallsPrefix
}
cdrc.cdrSourceIds[idx] = cfg.CdrSourceId
@@ -199,20 +201,21 @@ type Cdrc struct {
CdrFormat,
cdrInDir,
cdrOutDir string
failedCallsPrefix string // Configured failedCallsPrefix, used in case of flatstore CDRs
cdrSourceIds []string // Should be in sync with cdrFields on indexes
runDelay time.Duration
csvSep rune
duMultiplyFactors []float64
cdrFilters []utils.RSRFields // Should be in sync with cdrFields on indexes
cdrFields [][]*config.CfgCdrField // Profiles directly connected with cdrFilters
httpSkipTlsCheck bool
cdrServer *engine.CdrServer // Reference towards internal cdrServer if that is the case
httpClient *http.Client
exitChan chan struct{}
maxOpenFiles chan struct{} // Maximum number of simultaneous files processed
partialRecords map[string]map[string]*PartialFlatstoreRecord // [FileName"][AccId]*PartialRecord
guard *engine.GuardianLock
failedCallsPrefix string // Configured failedCallsPrefix, used in case of flatstore CDRs
cdrSourceIds []string // Should be in sync with cdrFields on indexes
runDelay time.Duration
csvSep rune
duMultiplyFactors []float64
cdrFilters []utils.RSRFields // Should be in sync with cdrFields on indexes
cdrFields [][]*config.CfgCdrField // Profiles directly connected with cdrFilters
httpSkipTlsCheck bool
cdrServer *engine.CdrServer // Reference towards internal cdrServer if that is the case
httpClient *http.Client
exitChan chan struct{}
maxOpenFiles chan struct{} // Maximum number of simultaneous files processed
partialRecords map[string]map[string]*PartialFlatstoreRecord // [FileName"][AccId]*PartialRecord
partialRecordCache time.Duration // Duration to cache partial records for
guard *engine.GuardianLock
}
// When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing
@@ -347,6 +350,12 @@ func (self *Cdrc) processPartialRecord(record []string, fileName string) ([]stri
// ToDo: schedule dumping of the .unpaired files
if fileMp, hasFile := self.partialRecords[fileName]; !hasFile {
self.partialRecords[fileName] = map[string]*PartialFlatstoreRecord{pr.AccId: pr}
if self.partialRecordCache != 0 { // Schedule expiry/dump of the just created entry in cache
go func() {
time.Sleep(self.partialRecordCache)
self.dumpUnpairedRecords(fileName)
}()
}
return true, nil
} else if _, hasAccId := fileMp[pr.AccId]; !hasAccId {
self.partialRecords[fileName][pr.AccId] = pr
@@ -361,6 +370,29 @@ func (self *Cdrc) processPartialRecord(record []string, fileName string) ([]stri
return pairToRecord(self.partialRecords[fileName][pr.AccId], pr)
}
// Dumps the cache into a .unpaired file in the outdir and cleans cache after
func (self *Cdrc) dumpUnpairedRecords(fileName string) error {
_, err := self.guard.Guard(func() (interface{}, error) {
unpairedFilePath := path.Join(self.cdrOutDir, fileName+UNPAIRED_SUFFIX)
fileOut, err := os.Create(unpairedFilePath)
if err != nil {
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed creating %s, error: %s", unpairedFilePath, err.Error()))
return nil, err
}
csvWriter := csv.NewWriter(fileOut)
csvWriter.Comma = self.csvSep
for _, pr := range self.partialRecords[fileName] {
if err := csvWriter.Write(pr.Values); err != nil {
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed writing unpaired record %v to file: %s, error: %s", pr, unpairedFilePath, err.Error()))
return nil, err
}
}
delete(self.partialRecords, fileName)
return nil, nil
}, fileName)
return err
}
// Takes the record from a slice and turns it into StoredCdrs, posting them to the cdrServer
func (self *Cdrc) processRecord(record []string, srcRowNr int) error {
recordCdrs := make([]*engine.StoredCdr, 0) // More CDRs based on the number of filters and field templates