diff --git a/cdrc/partial_cdr.go b/cdrc/partial_cdr.go index 2848919df..9f865e6d8 100644 --- a/cdrc/partial_cdr.go +++ b/cdrc/partial_cdr.go @@ -19,6 +19,10 @@ along with this program. If not, see package cdrc import ( + "encoding/csv" + "fmt" + "os" + "path" "reflect" "sort" "time" @@ -28,6 +32,110 @@ import ( "github.com/cgrates/cgrates/utils" ) +const ( + PartialRecordsSuffix = ".partial" +) + +func NewPartialRecordsCache(ttl time.Duration, cdrOutDir string, csvSep rune, roundDecimals int, timezone string, httpSkipTlsCheck bool) (*PartialRecordsCache, error) { + return &PartialRecordsCache{ttl: ttl, cdrOutDir: cdrOutDir, csvSep: csvSep, roundDecimals: roundDecimals, timezone: timezone, httpSkipTlsCheck: httpSkipTlsCheck, + partialRecords: make(map[string]*PartialCDRRecord), guard: engine.Guardian}, nil +} + +type PartialRecordsCache struct { + ttl time.Duration + cdrOutDir string + csvSep rune + roundDecimals int + timezone string + httpSkipTlsCheck bool + partialRecords map[string]*PartialCDRRecord // [OriginID]*PartialRecord + dumpTimers map[string]*time.Timer // [OriginID]*time.Timer which can be canceled or reset + guard *engine.GuardianLock +} + +// Dumps the cache into a .unpaired file in the outdir and cleans cache after +func (prc *PartialRecordsCache) dumpPartialRecords(originID string) { + _, err := prc.guard.Guard(func() (interface{}, error) { + if prc.partialRecords[originID].Len() != 0 { // Only write the file if there are records in the cache + dumpFilePath := path.Join(prc.cdrOutDir, fmt.Sprintf("%s.%s.%d", originID, PartialRecordsSuffix, time.Now().Unix())) + fileOut, err := os.Create(dumpFilePath) + if err != nil { + utils.Logger.Err(fmt.Sprintf(" Failed creating %s, error: %s", dumpFilePath, err.Error())) + return nil, err + } + csvWriter := csv.NewWriter(fileOut) + csvWriter.Comma = prc.csvSep + for _, cdr := range prc.partialRecords[originID].cdrs { + expRec, err := cdr.AsExportRecord(prc.partialRecords[originID].cacheDumpFields, 0, prc.roundDecimals, prc.timezone, prc.httpSkipTlsCheck, 0, "", nil) + if err != nil { + return nil, err + } + if err := csvWriter.Write(expRec); err != nil { + utils.Logger.Err(fmt.Sprintf(" Failed writing partial CDR %v to file: %s, error: %s", cdr, dumpFilePath, err.Error())) + return nil, err + } + } + csvWriter.Flush() + } + delete(prc.partialRecords, originID) + return nil, nil + }, 0, originID) + if err != nil { + utils.Logger.Err(fmt.Sprintf(" Failed dumping CDR with originID: %s, error: %s", originID, err.Error())) + } +} + +// Called to cache a partial record. +// If exists in cache, CDRs will be updated +// Locking should be handled at higher layer +func (prc *PartialRecordsCache) cachePartialCDR(pCDR *PartialCDRRecord) *PartialCDRRecord { + originID := pCDR.cdrs[0].OriginID + if tmr, hasIt := prc.dumpTimers[originID]; hasIt { + tmr.Reset(prc.ttl) + } else { + prc.dumpTimers[originID] = time.AfterFunc(prc.ttl, func() { prc.dumpPartialRecords(originID) }) // Schedule dumping of the partial CDR + } + if _, hasIt := prc.partialRecords[originID]; !hasIt { + prc.partialRecords[originID] = pCDR + } else { // Exists, update it's records + prc.partialRecords[originID].cdrs = append(prc.partialRecords[originID].cdrs, pCDR.cdrs...) + } + return prc.partialRecords[originID] +} + +// Called to uncache partialCDR and remove automatic dumping of the cached records +func (prc *PartialRecordsCache) uncachePartialCDR(pCDR *PartialCDRRecord) { + originID := pCDR.cdrs[0].OriginID + if tmr, hasIt := prc.dumpTimers[originID]; hasIt { + tmr.Stop() + } + delete(prc.partialRecords, originID) +} + +// Returns PartialCDR only if merge was possible +func (prc *PartialRecordsCache) MergePartialCDR(pCDR *PartialCDRRecord) (*engine.CDR, error) { + if pCDR.Len() == 0 || pCDR.cdrs[0].OriginID == "" { // Sanity check + return nil, nil + } + originID := pCDR.cdrs[0].OriginID + pCDRIf, err := prc.guard.Guard(func() (interface{}, error) { + cachedPartialCDR := prc.cachePartialCDR(pCDR) + var final bool + for _, cdr := range pCDR.cdrs { + if !cdr.Partial { + final = true + break + } + } + if !final { + return nil, nil + } + prc.uncachePartialCDR(cachedPartialCDR) + return cachedPartialCDR.MergeCDRs(), nil + }, 0, originID) + return pCDRIf.(*engine.CDR), err +} + // PartialCDRRecord is a record which can be updated later // different from PartialFlatstoreRecordsCache which is incomplete (eg: need to calculate duration out of 2 records) type PartialCDRRecord struct { @@ -108,5 +216,3 @@ func (partCDR *PartialCDRRecord) MergeCDRs() *engine.CDR { } return retCdr } - -//func (partCDR *PartialCDRRecord) AsString() *engine.CDR {