diff --git a/apier/v1/apier_local_test.go b/apier/v1/apier_local_test.go index 9921bc4ab..4af0164d8 100644 --- a/apier/v1/apier_local_test.go +++ b/apier/v1/apier_local_test.go @@ -1276,7 +1276,7 @@ func TestApierLoadTariffPlanFromFolder(t *testing.T) { } else if reply != "OK" { t.Error("Calling ApierV1.LoadTariffPlanFromFolder got reply: ", reply) } - time.Sleep(time.Duration(3**waitRater) * time.Millisecond) + time.Sleep(time.Duration(4**waitRater) * time.Millisecond) } func TestApierResetDataAfterLoadFromFolder(t *testing.T) { diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index d62857a27..a3dba78fd 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -68,7 +68,7 @@ func NewCdrc(cdrcCfgs []*config.CdrcConfig, httpSkipTlsCheck bool, cdrs rpcclien cdrc.maxOpenFiles <- processFile // Empty initiate so we do not need to wait later when we pop } var err error - if cdrc.partialRecordsCache, err = NewPartialRecordsCache(cdrcCfg.PartialRecordCache, cdrcCfg.CdrOutDir, cdrcCfg.FieldSeparator); err != nil { + if cdrc.partialFlatstoreRecordsCache, err = NewPartialFlatstoreRecordsCache(cdrcCfg.PartialRecordCache, cdrcCfg.CdrOutDir, cdrcCfg.FieldSeparator); err != nil { return nil, err } // Before processing, make sure in and out folders exist @@ -82,15 +82,15 @@ func NewCdrc(cdrcCfgs []*config.CdrcConfig, httpSkipTlsCheck bool, cdrs rpcclien } type Cdrc struct { - httpSkipTlsCheck bool - cdrcCfgs []*config.CdrcConfig // All cdrc config profiles attached to this CDRC (key will be profile instance name) - dfltCdrcCfg *config.CdrcConfig - timezone string - cdrs rpcclient.RpcClientConnection - httpClient *http.Client - closeChan chan struct{} // Used to signal config reloads when we need to span different CDRC-Client - maxOpenFiles chan struct{} // Maximum number of simultaneous files processed - partialRecordsCache *PartialRecordsCache // Shared between all files in the folder we process + httpSkipTlsCheck bool + cdrcCfgs []*config.CdrcConfig // All cdrc config profiles attached to this CDRC (key will be profile instance name) + dfltCdrcCfg *config.CdrcConfig + timezone string + cdrs rpcclient.RpcClientConnection + httpClient *http.Client + closeChan chan struct{} // Used to signal config reloads when we need to span different CDRC-Client + maxOpenFiles chan struct{} // Maximum number of simultaneous files processed + partialFlatstoreRecordsCache *PartialFlatstoreRecordsCache // Shared between all files in the folder we process } // When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing @@ -178,7 +178,7 @@ func (self *Cdrc) processFile(filePath string) error { csvReader := csv.NewReader(bufio.NewReader(file)) csvReader.Comma = self.dfltCdrcCfg.FieldSeparator recordsProcessor = NewCsvRecordsProcessor(csvReader, self.timezone, fn, self.dfltCdrcCfg, self.cdrcCfgs, - self.httpSkipTlsCheck, self.partialRecordsCache) + self.httpSkipTlsCheck, self.partialFlatstoreRecordsCache) case utils.FWV: recordsProcessor = NewFwvRecordsProcessor(file, self.dfltCdrcCfg, self.cdrcCfgs, self.httpClient, self.httpSkipTlsCheck, self.timezone) case utils.XML: diff --git a/cdrc/csv.go b/cdrc/csv.go index 6d12a4e52..a9a4f4906 100644 --- a/cdrc/csv.go +++ b/cdrc/csv.go @@ -22,8 +22,6 @@ import ( "encoding/csv" "encoding/json" "fmt" - "os" - "path" "strconv" "strings" "time" @@ -33,112 +31,24 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewPartialRecordsCache(ttl time.Duration, cdrOutDir string, csvSep rune) (*PartialRecordsCache, error) { - return &PartialRecordsCache{ttl: ttl, cdrOutDir: cdrOutDir, csvSep: csvSep, - partialRecords: make(map[string]map[string]*PartialFlatstoreRecord), guard: engine.Guardian}, nil -} - -type PartialRecordsCache struct { - ttl time.Duration - cdrOutDir string - csvSep rune - partialRecords map[string]map[string]*PartialFlatstoreRecord // [FileName"][OriginID]*PartialRecord - guard *engine.GuardianLock -} - -// Dumps the cache into a .unpaired file in the outdir and cleans cache after -func (self *PartialRecordsCache) dumpUnpairedRecords(fileName string) error { - _, err := self.guard.Guard(func() (interface{}, error) { - if len(self.partialRecords[fileName]) != 0 { // Only write the file if there are records in the cache - unpairedFilePath := path.Join(self.cdrOutDir, fileName+UNPAIRED_SUFFIX) - fileOut, err := os.Create(unpairedFilePath) - if err != nil { - utils.Logger.Err(fmt.Sprintf(" Failed creating %s, error: %s", unpairedFilePath, err.Error())) - return nil, err - } - csvWriter := csv.NewWriter(fileOut) - csvWriter.Comma = self.csvSep - for _, pr := range self.partialRecords[fileName] { - if err := csvWriter.Write(pr.Values); err != nil { - utils.Logger.Err(fmt.Sprintf(" Failed writing unpaired record %v to file: %s, error: %s", pr, unpairedFilePath, err.Error())) - return nil, err - } - } - csvWriter.Flush() - } - delete(self.partialRecords, fileName) - return nil, nil - }, 0, fileName) - return err -} - -// Search in cache and return the partial record with accountind id defined, prefFilename is searched at beginning because of better match probability -func (self *PartialRecordsCache) GetPartialRecord(OriginID, prefFileName string) (string, *PartialFlatstoreRecord) { - var cachedFilename string - var cachedPartial *PartialFlatstoreRecord - checkCachedFNames := []string{prefFileName} // Higher probability to match as firstFileName - for fName := range self.partialRecords { - if fName != prefFileName { - checkCachedFNames = append(checkCachedFNames, fName) - } - } - for _, fName := range checkCachedFNames { // Need to lock them individually - self.guard.Guard(func() (interface{}, error) { - var hasPartial bool - if cachedPartial, hasPartial = self.partialRecords[fName][OriginID]; hasPartial { - cachedFilename = fName - } - return nil, nil - }, 0, fName) - if cachedPartial != nil { - break - } - } - return cachedFilename, cachedPartial -} - -func (self *PartialRecordsCache) CachePartial(fileName string, pr *PartialFlatstoreRecord) { - self.guard.Guard(func() (interface{}, error) { - if fileMp, hasFile := self.partialRecords[fileName]; !hasFile { - self.partialRecords[fileName] = map[string]*PartialFlatstoreRecord{pr.OriginID: pr} - if self.ttl != 0 { // Schedule expiry/dump of the just created entry in cache - go func() { - time.Sleep(self.ttl) - self.dumpUnpairedRecords(fileName) - }() - } - } else if _, hasOriginID := fileMp[pr.OriginID]; !hasOriginID { - self.partialRecords[fileName][pr.OriginID] = pr - } - return nil, nil - }, 0, fileName) -} - -func (self *PartialRecordsCache) UncachePartial(fileName string, pr *PartialFlatstoreRecord) { - self.guard.Guard(func() (interface{}, error) { - delete(self.partialRecords[fileName], pr.OriginID) // Remove the record out of cache - return nil, nil - }, 0, fileName) -} - func NewCsvRecordsProcessor(csvReader *csv.Reader, timezone, fileName string, dfltCdrcCfg *config.CdrcConfig, cdrcCfgs []*config.CdrcConfig, - httpSkipTlsCheck bool, partialRecordsCache *PartialRecordsCache) *CsvRecordsProcessor { + httpSkipTlsCheck bool, PartialFlatstoreRecordsCache *PartialFlatstoreRecordsCache) *CsvRecordsProcessor { return &CsvRecordsProcessor{csvReader: csvReader, timezone: timezone, fileName: fileName, dfltCdrcCfg: dfltCdrcCfg, cdrcCfgs: cdrcCfgs, - httpSkipTlsCheck: httpSkipTlsCheck, partialRecordsCache: partialRecordsCache} + httpSkipTlsCheck: httpSkipTlsCheck, PartialFlatstoreRecordsCache: PartialFlatstoreRecordsCache} } type CsvRecordsProcessor struct { - csvReader *csv.Reader - timezone string // Timezone for CDRs which are not clearly specifying it - fileName string - dfltCdrcCfg *config.CdrcConfig - cdrcCfgs []*config.CdrcConfig - processedRecordsNr int64 // Number of content records in file - httpSkipTlsCheck bool - partialRecordsCache *PartialRecordsCache // Shared by cdrc so we can cache for all files in a folder + csvReader *csv.Reader + timezone string // Timezone for CDRs which are not clearly specifying it + fileName string + dfltCdrcCfg *config.CdrcConfig + cdrcCfgs []*config.CdrcConfig + processedRecordsNr int64 // Number of content records in file + httpSkipTlsCheck bool + PartialFlatstoreRecordsCache *PartialFlatstoreRecordsCache // Shared by cdrc so we can cache for all files in a folder } func (self *CsvRecordsProcessor) ProcessedRecordsNr() int64 { @@ -152,7 +62,7 @@ func (self *CsvRecordsProcessor) ProcessNextRecord() ([]*engine.CDR, error) { } self.processedRecordsNr += 1 if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.dfltCdrcCfg.CdrFormat) { - if record, err = self.processPartialRecord(record); err != nil { + if record, err = self.processFlatstoreRecord(record); err != nil { return nil, err } else if record == nil { return nil, nil // Due to partial, none returned @@ -163,7 +73,7 @@ func (self *CsvRecordsProcessor) ProcessNextRecord() ([]*engine.CDR, error) { } // Processes a single partial record for flatstore CDRs -func (self *CsvRecordsProcessor) processPartialRecord(record []string) ([]string, error) { +func (self *CsvRecordsProcessor) processFlatstoreRecord(record []string) ([]string, error) { if strings.HasPrefix(self.fileName, self.dfltCdrcCfg.FailedCallsPrefix) { // Use the first index since they should be the same in all configs record = append(record, "0") // Append duration 0 for failed calls flatstore CDR and do not process it further return record, nil @@ -173,16 +83,16 @@ func (self *CsvRecordsProcessor) processPartialRecord(record []string) ([]string return nil, err } // Retrieve and complete the record from cache - cachedFilename, cachedPartial := self.partialRecordsCache.GetPartialRecord(pr.OriginID, self.fileName) + cachedFilename, cachedPartial := self.PartialFlatstoreRecordsCache.GetPartialRecord(pr.OriginID, self.fileName) if cachedPartial == nil { // Not cached, do it here and stop processing - self.partialRecordsCache.CachePartial(self.fileName, pr) + self.PartialFlatstoreRecordsCache.CachePartial(self.fileName, pr) return nil, nil } pairedRecord, err := pairToRecord(cachedPartial, pr) if err != nil { return nil, err } - self.partialRecordsCache.UncachePartial(cachedFilename, pr) + self.PartialFlatstoreRecordsCache.UncachePartial(cachedFilename, pr) return pairedRecord, nil } diff --git a/cdrc/flatstore.go b/cdrc/flatstore.go new file mode 100644 index 000000000..8bdcc7a6f --- /dev/null +++ b/cdrc/flatstore.go @@ -0,0 +1,178 @@ +/* +Real-time Charging System for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package cdrc + +import ( + "encoding/csv" + "errors" + "fmt" + "os" + "path" + "strconv" + "time" + + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func NewPartialFlatstoreRecordsCache(ttl time.Duration, cdrOutDir string, csvSep rune) (*PartialFlatstoreRecordsCache, error) { + return &PartialFlatstoreRecordsCache{ttl: ttl, cdrOutDir: cdrOutDir, csvSep: csvSep, + partialRecords: make(map[string]map[string]*PartialFlatstoreRecord), guard: engine.Guardian}, nil +} + +type PartialFlatstoreRecordsCache struct { + ttl time.Duration + cdrOutDir string + csvSep rune + partialRecords map[string]map[string]*PartialFlatstoreRecord // [FileName"][OriginID]*PartialRecord + guard *engine.GuardianLock +} + +// Dumps the cache into a .unpaired file in the outdir and cleans cache after +func (self *PartialFlatstoreRecordsCache) dumpUnpairedRecords(fileName string) error { + _, err := self.guard.Guard(func() (interface{}, error) { + if len(self.partialRecords[fileName]) != 0 { // Only write the file if there are records in the cache + unpairedFilePath := path.Join(self.cdrOutDir, fileName+UNPAIRED_SUFFIX) + fileOut, err := os.Create(unpairedFilePath) + if err != nil { + utils.Logger.Err(fmt.Sprintf(" Failed creating %s, error: %s", unpairedFilePath, err.Error())) + return nil, err + } + csvWriter := csv.NewWriter(fileOut) + csvWriter.Comma = self.csvSep + for _, pr := range self.partialRecords[fileName] { + if err := csvWriter.Write(pr.Values); err != nil { + utils.Logger.Err(fmt.Sprintf(" Failed writing unpaired record %v to file: %s, error: %s", pr, unpairedFilePath, err.Error())) + return nil, err + } + } + csvWriter.Flush() + } + delete(self.partialRecords, fileName) + return nil, nil + }, 0, fileName) + return err +} + +// Search in cache and return the partial record with accountind id defined, prefFilename is searched at beginning because of better match probability +func (self *PartialFlatstoreRecordsCache) GetPartialRecord(OriginID, prefFileName string) (string, *PartialFlatstoreRecord) { + var cachedFilename string + var cachedPartial *PartialFlatstoreRecord + checkCachedFNames := []string{prefFileName} // Higher probability to match as firstFileName + for fName := range self.partialRecords { + if fName != prefFileName { + checkCachedFNames = append(checkCachedFNames, fName) + } + } + for _, fName := range checkCachedFNames { // Need to lock them individually + self.guard.Guard(func() (interface{}, error) { + var hasPartial bool + if cachedPartial, hasPartial = self.partialRecords[fName][OriginID]; hasPartial { + cachedFilename = fName + } + return nil, nil + }, 0, fName) + if cachedPartial != nil { + break + } + } + return cachedFilename, cachedPartial +} + +func (self *PartialFlatstoreRecordsCache) CachePartial(fileName string, pr *PartialFlatstoreRecord) { + self.guard.Guard(func() (interface{}, error) { + if fileMp, hasFile := self.partialRecords[fileName]; !hasFile { + self.partialRecords[fileName] = map[string]*PartialFlatstoreRecord{pr.OriginID: pr} + if self.ttl != 0 { // Schedule expiry/dump of the just created entry in cache + go func() { + time.Sleep(self.ttl) + self.dumpUnpairedRecords(fileName) + }() + } + } else if _, hasOriginID := fileMp[pr.OriginID]; !hasOriginID { + self.partialRecords[fileName][pr.OriginID] = pr + } + return nil, nil + }, 0, fileName) +} + +func (self *PartialFlatstoreRecordsCache) UncachePartial(fileName string, pr *PartialFlatstoreRecord) { + self.guard.Guard(func() (interface{}, error) { + delete(self.partialRecords[fileName], pr.OriginID) // Remove the record out of cache + return nil, nil + }, 0, fileName) +} + +func NewPartialFlatstoreRecord(record []string, timezone string) (*PartialFlatstoreRecord, error) { + if len(record) < 7 { + return nil, errors.New("MISSING_IE") + } + pr := &PartialFlatstoreRecord{Method: record[0], OriginID: record[3] + record[1] + record[2], Values: record} + var err error + if pr.Timestamp, err = utils.ParseTimeDetectLayout(record[6], timezone); err != nil { + return nil, err + } + return pr, nil +} + +// This is a partial record received from Flatstore, can be INVITE or BYE and it needs to be paired in order to produce duration +type PartialFlatstoreRecord struct { + Method string // INVITE or BYE + OriginID string // Copute here the OriginID + Timestamp time.Time // Timestamp of the event, as written by db_flastore module + Values []string // Can contain original values or updated via UpdateValues +} + +// Pairs INVITE and BYE into final record containing as last element the duration +func pairToRecord(part1, part2 *PartialFlatstoreRecord) ([]string, error) { + var invite, bye *PartialFlatstoreRecord + if part1.Method == "INVITE" { + invite = part1 + } else if part2.Method == "INVITE" { + invite = part2 + } else { + return nil, errors.New("MISSING_INVITE") + } + if part1.Method == "BYE" { + bye = part1 + } else if part2.Method == "BYE" { + bye = part2 + } else { + return nil, errors.New("MISSING_BYE") + } + if len(invite.Values) != len(bye.Values) { + return nil, errors.New("INCONSISTENT_VALUES_LENGTH") + } + record := invite.Values + for idx := range record { + switch idx { + case 0, 1, 2, 3, 6: // Leave these values as they are + case 4, 5: + record[idx] = bye.Values[idx] // Update record with status from bye + default: + if bye.Values[idx] != "" { // Any value higher than 6 is dynamically inserted, overwrite if non empty + record[idx] = bye.Values[idx] + } + + } + } + callDur := bye.Timestamp.Sub(invite.Timestamp) + record = append(record, strconv.FormatFloat(callDur.Seconds(), 'f', -1, 64)) + return record, nil +} diff --git a/cdrc/partial_cdr.go b/cdrc/partial_cdr.go index 970337e8d..7a96d82ec 100644 --- a/cdrc/partial_cdr.go +++ b/cdrc/partial_cdr.go @@ -19,10 +19,8 @@ along with this program. If not, see package cdrc import ( - "errors" "reflect" "sort" - "strconv" "time" "github.com/cgrates/cgrates/config" @@ -30,64 +28,6 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewPartialFlatstoreRecord(record []string, timezone string) (*PartialFlatstoreRecord, error) { - if len(record) < 7 { - return nil, errors.New("MISSING_IE") - } - pr := &PartialFlatstoreRecord{Method: record[0], OriginID: record[3] + record[1] + record[2], Values: record} - var err error - if pr.Timestamp, err = utils.ParseTimeDetectLayout(record[6], timezone); err != nil { - return nil, err - } - return pr, nil -} - -// This is a partial record received from Flatstore, can be INVITE or BYE and it needs to be paired in order to produce duration -type PartialFlatstoreRecord struct { - Method string // INVITE or BYE - OriginID string // Copute here the OriginID - Timestamp time.Time // Timestamp of the event, as written by db_flastore module - Values []string // Can contain original values or updated via UpdateValues -} - -// Pairs INVITE and BYE into final record containing as last element the duration -func pairToRecord(part1, part2 *PartialFlatstoreRecord) ([]string, error) { - var invite, bye *PartialFlatstoreRecord - if part1.Method == "INVITE" { - invite = part1 - } else if part2.Method == "INVITE" { - invite = part2 - } else { - return nil, errors.New("MISSING_INVITE") - } - if part1.Method == "BYE" { - bye = part1 - } else if part2.Method == "BYE" { - bye = part2 - } else { - return nil, errors.New("MISSING_BYE") - } - if len(invite.Values) != len(bye.Values) { - return nil, errors.New("INCONSISTENT_VALUES_LENGTH") - } - record := invite.Values - for idx := range record { - switch idx { - case 0, 1, 2, 3, 6: // Leave these values as they are - case 4, 5: - record[idx] = bye.Values[idx] // Update record with status from bye - default: - if bye.Values[idx] != "" { // Any value higher than 6 is dynamically inserted, overwrite if non empty - record[idx] = bye.Values[idx] - } - - } - } - callDur := bye.Timestamp.Sub(invite.Timestamp) - record = append(record, strconv.FormatFloat(callDur.Seconds(), 'f', -1, 64)) - return record, nil -} - type PartialCDRRecord struct { cdrs []*engine.CDR // Number of CDRs cacheDumpFields []*config.CfgCdrField // Fields template to use when dumping from cache on disk