PartialRecordsCache with MergePartialCDR method

This commit is contained in:
DanB
2016-07-30 17:13:48 +02:00
parent 6b81e846c2
commit a36812f063

View File

@@ -19,6 +19,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package cdrc
import (
"encoding/csv"
"fmt"
"os"
"path"
"reflect"
"sort"
"time"
@@ -28,6 +32,110 @@ import (
"github.com/cgrates/cgrates/utils"
)
const (
PartialRecordsSuffix = ".partial"
)
func NewPartialRecordsCache(ttl time.Duration, cdrOutDir string, csvSep rune, roundDecimals int, timezone string, httpSkipTlsCheck bool) (*PartialRecordsCache, error) {
return &PartialRecordsCache{ttl: ttl, cdrOutDir: cdrOutDir, csvSep: csvSep, roundDecimals: roundDecimals, timezone: timezone, httpSkipTlsCheck: httpSkipTlsCheck,
partialRecords: make(map[string]*PartialCDRRecord), guard: engine.Guardian}, nil
}
type PartialRecordsCache struct {
ttl time.Duration
cdrOutDir string
csvSep rune
roundDecimals int
timezone string
httpSkipTlsCheck bool
partialRecords map[string]*PartialCDRRecord // [OriginID]*PartialRecord
dumpTimers map[string]*time.Timer // [OriginID]*time.Timer which can be canceled or reset
guard *engine.GuardianLock
}
// Dumps the cache into a .unpaired file in the outdir and cleans cache after
func (prc *PartialRecordsCache) dumpPartialRecords(originID string) {
_, err := prc.guard.Guard(func() (interface{}, error) {
if prc.partialRecords[originID].Len() != 0 { // Only write the file if there are records in the cache
dumpFilePath := path.Join(prc.cdrOutDir, fmt.Sprintf("%s.%s.%d", originID, PartialRecordsSuffix, time.Now().Unix()))
fileOut, err := os.Create(dumpFilePath)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<Cdrc> Failed creating %s, error: %s", dumpFilePath, err.Error()))
return nil, err
}
csvWriter := csv.NewWriter(fileOut)
csvWriter.Comma = prc.csvSep
for _, cdr := range prc.partialRecords[originID].cdrs {
expRec, err := cdr.AsExportRecord(prc.partialRecords[originID].cacheDumpFields, 0, prc.roundDecimals, prc.timezone, prc.httpSkipTlsCheck, 0, "", nil)
if err != nil {
return nil, err
}
if err := csvWriter.Write(expRec); err != nil {
utils.Logger.Err(fmt.Sprintf("<Cdrc> Failed writing partial CDR %v to file: %s, error: %s", cdr, dumpFilePath, err.Error()))
return nil, err
}
}
csvWriter.Flush()
}
delete(prc.partialRecords, originID)
return nil, nil
}, 0, originID)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<CDRC> Failed dumping CDR with originID: %s, error: %s", originID, err.Error()))
}
}
// Called to cache a partial record.
// If exists in cache, CDRs will be updated
// Locking should be handled at higher layer
func (prc *PartialRecordsCache) cachePartialCDR(pCDR *PartialCDRRecord) *PartialCDRRecord {
originID := pCDR.cdrs[0].OriginID
if tmr, hasIt := prc.dumpTimers[originID]; hasIt {
tmr.Reset(prc.ttl)
} else {
prc.dumpTimers[originID] = time.AfterFunc(prc.ttl, func() { prc.dumpPartialRecords(originID) }) // Schedule dumping of the partial CDR
}
if _, hasIt := prc.partialRecords[originID]; !hasIt {
prc.partialRecords[originID] = pCDR
} else { // Exists, update it's records
prc.partialRecords[originID].cdrs = append(prc.partialRecords[originID].cdrs, pCDR.cdrs...)
}
return prc.partialRecords[originID]
}
// Called to uncache partialCDR and remove automatic dumping of the cached records
func (prc *PartialRecordsCache) uncachePartialCDR(pCDR *PartialCDRRecord) {
originID := pCDR.cdrs[0].OriginID
if tmr, hasIt := prc.dumpTimers[originID]; hasIt {
tmr.Stop()
}
delete(prc.partialRecords, originID)
}
// Returns PartialCDR only if merge was possible
func (prc *PartialRecordsCache) MergePartialCDR(pCDR *PartialCDRRecord) (*engine.CDR, error) {
if pCDR.Len() == 0 || pCDR.cdrs[0].OriginID == "" { // Sanity check
return nil, nil
}
originID := pCDR.cdrs[0].OriginID
pCDRIf, err := prc.guard.Guard(func() (interface{}, error) {
cachedPartialCDR := prc.cachePartialCDR(pCDR)
var final bool
for _, cdr := range pCDR.cdrs {
if !cdr.Partial {
final = true
break
}
}
if !final {
return nil, nil
}
prc.uncachePartialCDR(cachedPartialCDR)
return cachedPartialCDR.MergeCDRs(), nil
}, 0, originID)
return pCDRIf.(*engine.CDR), err
}
// PartialCDRRecord is a record which can be updated later
// different from PartialFlatstoreRecordsCache which is incomplete (eg: need to calculate duration out of 2 records)
type PartialCDRRecord struct {
@@ -108,5 +216,3 @@ func (partCDR *PartialCDRRecord) MergeCDRs() *engine.CDR {
}
return retCdr
}
//func (partCDR *PartialCDRRecord) AsString() *engine.CDR {