mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Add NewGuardianConstructor, rename accountlock.go->guardian.go files, adding lock on caching the partial flatstore records
This commit is contained in:
105
cdrc/cdrc.go
105
cdrc/cdrc.go
@@ -90,47 +90,6 @@ func populateStoredCdrField(cdr *engine.StoredCdr, fieldId, fieldVal string) err
|
||||
return nil
|
||||
}
|
||||
|
||||
/*
|
||||
One instance of CDRC will act on one folder.
|
||||
Common parameters within configs processed:
|
||||
* cdrS, cdrFormat, cdrInDir, cdrOutDir, runDelay
|
||||
Parameters specific per config instance:
|
||||
* duMultiplyFactor, cdrSourceId, cdrFilter, cdrFields
|
||||
*/
|
||||
func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrServer *engine.CdrServer, exitChan chan struct{}) (*Cdrc, error) {
|
||||
var cdrcCfg *config.CdrcConfig
|
||||
for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
|
||||
break
|
||||
}
|
||||
cdrc := &Cdrc{cdrsAddress: cdrcCfg.Cdrs, CdrFormat: cdrcCfg.CdrFormat, cdrInDir: cdrcCfg.CdrInDir, cdrOutDir: cdrcCfg.CdrOutDir,
|
||||
runDelay: cdrcCfg.RunDelay, csvSep: cdrcCfg.FieldSeparator,
|
||||
httpSkipTlsCheck: httpSkipTlsCheck, cdrServer: cdrServer, exitChan: exitChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles)}
|
||||
var processCsvFile struct{}
|
||||
for i := 0; i < cdrcCfg.MaxOpenFiles; i++ {
|
||||
cdrc.maxOpenFiles <- processCsvFile // Empty initiate so we do not need to wait later when we pop
|
||||
}
|
||||
cdrc.cdrSourceIds = make([]string, len(cdrcCfgs))
|
||||
cdrc.duMultiplyFactors = make([]float64, len(cdrcCfgs))
|
||||
cdrc.cdrFilters = make([]utils.RSRFields, len(cdrcCfgs))
|
||||
cdrc.cdrFields = make([][]*config.CfgCdrField, len(cdrcCfgs))
|
||||
idx := 0
|
||||
for _, cfg := range cdrcCfgs {
|
||||
cdrc.cdrSourceIds[idx] = cfg.CdrSourceId
|
||||
cdrc.duMultiplyFactors[idx] = cfg.DataUsageMultiplyFactor
|
||||
cdrc.cdrFilters[idx] = cfg.CdrFilter
|
||||
cdrc.cdrFields[idx] = cfg.CdrFields
|
||||
idx += 1
|
||||
}
|
||||
// Before processing, make sure in and out folders exist
|
||||
for _, dir := range []string{cdrc.cdrInDir, cdrc.cdrOutDir} {
|
||||
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
|
||||
return nil, fmt.Errorf("Nonexistent folder: %s", dir)
|
||||
}
|
||||
}
|
||||
cdrc.httpClient = new(http.Client)
|
||||
return cdrc, nil
|
||||
}
|
||||
|
||||
func NewPartialFlatstoreRecord(record []string) (*PartialFlatstoreRecord, error) {
|
||||
if len(record) < 7 {
|
||||
return nil, errors.New("MISSING_IE")
|
||||
@@ -189,6 +148,48 @@ func pairToRecord(part1, part2 *PartialFlatstoreRecord) ([]string, error) {
|
||||
return record, nil
|
||||
}
|
||||
|
||||
/*
|
||||
One instance of CDRC will act on one folder.
|
||||
Common parameters within configs processed:
|
||||
* cdrS, cdrFormat, cdrInDir, cdrOutDir, runDelay
|
||||
Parameters specific per config instance:
|
||||
* duMultiplyFactor, cdrSourceId, cdrFilter, cdrFields
|
||||
*/
|
||||
func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrServer *engine.CdrServer, exitChan chan struct{}) (*Cdrc, error) {
|
||||
var cdrcCfg *config.CdrcConfig
|
||||
for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
|
||||
break
|
||||
}
|
||||
cdrc := &Cdrc{cdrsAddress: cdrcCfg.Cdrs, CdrFormat: cdrcCfg.CdrFormat, cdrInDir: cdrcCfg.CdrInDir, cdrOutDir: cdrcCfg.CdrOutDir,
|
||||
runDelay: cdrcCfg.RunDelay, csvSep: cdrcCfg.FieldSeparator,
|
||||
httpSkipTlsCheck: httpSkipTlsCheck, cdrServer: cdrServer, exitChan: exitChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles),
|
||||
guard: engine.NewGuardianLock()}
|
||||
var processCsvFile struct{}
|
||||
for i := 0; i < cdrcCfg.MaxOpenFiles; i++ {
|
||||
cdrc.maxOpenFiles <- processCsvFile // Empty initiate so we do not need to wait later when we pop
|
||||
}
|
||||
cdrc.cdrSourceIds = make([]string, len(cdrcCfgs))
|
||||
cdrc.duMultiplyFactors = make([]float64, len(cdrcCfgs))
|
||||
cdrc.cdrFilters = make([]utils.RSRFields, len(cdrcCfgs))
|
||||
cdrc.cdrFields = make([][]*config.CfgCdrField, len(cdrcCfgs))
|
||||
idx := 0
|
||||
for _, cfg := range cdrcCfgs {
|
||||
cdrc.cdrSourceIds[idx] = cfg.CdrSourceId
|
||||
cdrc.duMultiplyFactors[idx] = cfg.DataUsageMultiplyFactor
|
||||
cdrc.cdrFilters[idx] = cfg.CdrFilter
|
||||
cdrc.cdrFields[idx] = cfg.CdrFields
|
||||
idx += 1
|
||||
}
|
||||
// Before processing, make sure in and out folders exist
|
||||
for _, dir := range []string{cdrc.cdrInDir, cdrc.cdrOutDir} {
|
||||
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
|
||||
return nil, fmt.Errorf("Nonexistent folder: %s", dir)
|
||||
}
|
||||
}
|
||||
cdrc.httpClient = new(http.Client)
|
||||
return cdrc, nil
|
||||
}
|
||||
|
||||
type Cdrc struct {
|
||||
cdrsAddress,
|
||||
CdrFormat,
|
||||
@@ -206,6 +207,7 @@ type Cdrc struct {
|
||||
exitChan chan struct{}
|
||||
maxOpenFiles chan struct{} // Maximum number of simultaneous files processed
|
||||
partialRecords map[string]map[string]*PartialFlatstoreRecord // [FileName"][AccId]*PartialRecord
|
||||
guard *engine.GuardianLock
|
||||
}
|
||||
|
||||
// When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing
|
||||
@@ -332,14 +334,19 @@ func (self *Cdrc) processPartialRecord(record []string, fileName string) ([]stri
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// ToDo: cache locking
|
||||
// ToDo: schedule dumping of the .unpaired files
|
||||
if fileMp, hasFile := self.partialRecords[fileName]; !hasFile {
|
||||
self.partialRecords[fileName] = map[string]*PartialFlatstoreRecord{pr.AccId: pr}
|
||||
return nil, nil
|
||||
} else if _, hasAccId := fileMp[pr.AccId]; !hasAccId {
|
||||
self.partialRecords[fileName][pr.AccId] = pr
|
||||
return nil, nil
|
||||
doneCaching, _ := self.guard.Guard(func() (interface{}, error) { // Lock caching on fileName
|
||||
// ToDo: schedule dumping of the .unpaired files
|
||||
if fileMp, hasFile := self.partialRecords[fileName]; !hasFile {
|
||||
self.partialRecords[fileName] = map[string]*PartialFlatstoreRecord{pr.AccId: pr}
|
||||
return true, nil
|
||||
} else if _, hasAccId := fileMp[pr.AccId]; !hasAccId {
|
||||
self.partialRecords[fileName][pr.AccId] = pr
|
||||
return true, nil
|
||||
}
|
||||
return nil, nil // No caching done
|
||||
}, fileName)
|
||||
if doneCaching != nil {
|
||||
return nil, nil // Have cached the record, do not proceed with pairing
|
||||
}
|
||||
// The paired is already in cache, build up the final record
|
||||
return pairToRecord(self.partialRecords[fileName][pr.AccId], pr)
|
||||
|
||||
@@ -398,7 +398,8 @@ BYE|3111f3c9|49ca4c42|a58ebaae40d08d6757d8424fb09c4c54@0:0:0:0:0:0:0:0|200|OK|14
|
||||
&config.CfgCdrField{Tag: "Duration", Type: utils.CDRFIELD, CdrFieldId: utils.USAGE, Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "DialogId", Type: utils.CDRFIELD, CdrFieldId: "DialogIdentifier", Value: utils.ParseRSRFieldsMustCompile("11", utils.INFIELD_SEP)},
|
||||
}}
|
||||
cdrc := &Cdrc{CdrFormat: utils.OSIPS_FLATSTORE, cdrSourceIds: []string{"TEST_CDRC"}, cdrFields: cdrFields, partialRecords: make(map[string]map[string]*PartialFlatstoreRecord)}
|
||||
cdrc := &Cdrc{CdrFormat: utils.OSIPS_FLATSTORE, cdrSourceIds: []string{"TEST_CDRC"}, cdrFields: cdrFields, partialRecords: make(map[string]map[string]*PartialFlatstoreRecord),
|
||||
guard: engine.NewGuardianLock()}
|
||||
cdrsContent := bytes.NewReader([]byte(osipsCdrs))
|
||||
csvReader := csv.NewReader(cdrsContent)
|
||||
csvReader.Comma = '|'
|
||||
|
||||
@@ -25,6 +25,10 @@ import (
|
||||
// global package variable
|
||||
var Guardian = &GuardianLock{queue: make(map[string]chan bool)}
|
||||
|
||||
func NewGuardianLock() *GuardianLock {
|
||||
return &GuardianLock{queue: make(map[string]chan bool)}
|
||||
}
|
||||
|
||||
type GuardianLock struct {
|
||||
queue map[string]chan bool
|
||||
mu sync.Mutex
|
||||
Reference in New Issue
Block a user