diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index a04d72d3c..117cc0d62 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -90,47 +90,6 @@ func populateStoredCdrField(cdr *engine.StoredCdr, fieldId, fieldVal string) err return nil } -/* -One instance of CDRC will act on one folder. -Common parameters within configs processed: - * cdrS, cdrFormat, cdrInDir, cdrOutDir, runDelay -Parameters specific per config instance: - * duMultiplyFactor, cdrSourceId, cdrFilter, cdrFields -*/ -func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrServer *engine.CdrServer, exitChan chan struct{}) (*Cdrc, error) { - var cdrcCfg *config.CdrcConfig - for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one - break - } - cdrc := &Cdrc{cdrsAddress: cdrcCfg.Cdrs, CdrFormat: cdrcCfg.CdrFormat, cdrInDir: cdrcCfg.CdrInDir, cdrOutDir: cdrcCfg.CdrOutDir, - runDelay: cdrcCfg.RunDelay, csvSep: cdrcCfg.FieldSeparator, - httpSkipTlsCheck: httpSkipTlsCheck, cdrServer: cdrServer, exitChan: exitChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles)} - var processCsvFile struct{} - for i := 0; i < cdrcCfg.MaxOpenFiles; i++ { - cdrc.maxOpenFiles <- processCsvFile // Empty initiate so we do not need to wait later when we pop - } - cdrc.cdrSourceIds = make([]string, len(cdrcCfgs)) - cdrc.duMultiplyFactors = make([]float64, len(cdrcCfgs)) - cdrc.cdrFilters = make([]utils.RSRFields, len(cdrcCfgs)) - cdrc.cdrFields = make([][]*config.CfgCdrField, len(cdrcCfgs)) - idx := 0 - for _, cfg := range cdrcCfgs { - cdrc.cdrSourceIds[idx] = cfg.CdrSourceId - cdrc.duMultiplyFactors[idx] = cfg.DataUsageMultiplyFactor - cdrc.cdrFilters[idx] = cfg.CdrFilter - cdrc.cdrFields[idx] = cfg.CdrFields - idx += 1 - } - // Before processing, make sure in and out folders exist - for _, dir := range []string{cdrc.cdrInDir, cdrc.cdrOutDir} { - if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) { - return nil, fmt.Errorf("Nonexistent folder: %s", dir) - } - } - cdrc.httpClient = new(http.Client) - return cdrc, nil -} - func NewPartialFlatstoreRecord(record []string) (*PartialFlatstoreRecord, error) { if len(record) < 7 { return nil, errors.New("MISSING_IE") @@ -189,6 +148,48 @@ func pairToRecord(part1, part2 *PartialFlatstoreRecord) ([]string, error) { return record, nil } +/* +One instance of CDRC will act on one folder. +Common parameters within configs processed: + * cdrS, cdrFormat, cdrInDir, cdrOutDir, runDelay +Parameters specific per config instance: + * duMultiplyFactor, cdrSourceId, cdrFilter, cdrFields +*/ +func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrServer *engine.CdrServer, exitChan chan struct{}) (*Cdrc, error) { + var cdrcCfg *config.CdrcConfig + for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one + break + } + cdrc := &Cdrc{cdrsAddress: cdrcCfg.Cdrs, CdrFormat: cdrcCfg.CdrFormat, cdrInDir: cdrcCfg.CdrInDir, cdrOutDir: cdrcCfg.CdrOutDir, + runDelay: cdrcCfg.RunDelay, csvSep: cdrcCfg.FieldSeparator, + httpSkipTlsCheck: httpSkipTlsCheck, cdrServer: cdrServer, exitChan: exitChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles), + guard: engine.NewGuardianLock()} + var processCsvFile struct{} + for i := 0; i < cdrcCfg.MaxOpenFiles; i++ { + cdrc.maxOpenFiles <- processCsvFile // Empty initiate so we do not need to wait later when we pop + } + cdrc.cdrSourceIds = make([]string, len(cdrcCfgs)) + cdrc.duMultiplyFactors = make([]float64, len(cdrcCfgs)) + cdrc.cdrFilters = make([]utils.RSRFields, len(cdrcCfgs)) + cdrc.cdrFields = make([][]*config.CfgCdrField, len(cdrcCfgs)) + idx := 0 + for _, cfg := range cdrcCfgs { + cdrc.cdrSourceIds[idx] = cfg.CdrSourceId + cdrc.duMultiplyFactors[idx] = cfg.DataUsageMultiplyFactor + cdrc.cdrFilters[idx] = cfg.CdrFilter + cdrc.cdrFields[idx] = cfg.CdrFields + idx += 1 + } + // Before processing, make sure in and out folders exist + for _, dir := range []string{cdrc.cdrInDir, cdrc.cdrOutDir} { + if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) { + return nil, fmt.Errorf("Nonexistent folder: %s", dir) + } + } + cdrc.httpClient = new(http.Client) + return cdrc, nil +} + type Cdrc struct { cdrsAddress, CdrFormat, @@ -206,6 +207,7 @@ type Cdrc struct { exitChan chan struct{} maxOpenFiles chan struct{} // Maximum number of simultaneous files processed partialRecords map[string]map[string]*PartialFlatstoreRecord // [FileName"][AccId]*PartialRecord + guard *engine.GuardianLock } // When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing @@ -332,14 +334,19 @@ func (self *Cdrc) processPartialRecord(record []string, fileName string) ([]stri if err != nil { return nil, err } - // ToDo: cache locking - // ToDo: schedule dumping of the .unpaired files - if fileMp, hasFile := self.partialRecords[fileName]; !hasFile { - self.partialRecords[fileName] = map[string]*PartialFlatstoreRecord{pr.AccId: pr} - return nil, nil - } else if _, hasAccId := fileMp[pr.AccId]; !hasAccId { - self.partialRecords[fileName][pr.AccId] = pr - return nil, nil + doneCaching, _ := self.guard.Guard(func() (interface{}, error) { // Lock caching on fileName + // ToDo: schedule dumping of the .unpaired files + if fileMp, hasFile := self.partialRecords[fileName]; !hasFile { + self.partialRecords[fileName] = map[string]*PartialFlatstoreRecord{pr.AccId: pr} + return true, nil + } else if _, hasAccId := fileMp[pr.AccId]; !hasAccId { + self.partialRecords[fileName][pr.AccId] = pr + return true, nil + } + return nil, nil // No caching done + }, fileName) + if doneCaching != nil { + return nil, nil // Have cached the record, do not proceed with pairing } // The paired is already in cache, build up the final record return pairToRecord(self.partialRecords[fileName][pr.AccId], pr) diff --git a/cdrc/cdrc_test.go b/cdrc/cdrc_test.go index 1351ddba2..a53f442c4 100644 --- a/cdrc/cdrc_test.go +++ b/cdrc/cdrc_test.go @@ -398,7 +398,8 @@ BYE|3111f3c9|49ca4c42|a58ebaae40d08d6757d8424fb09c4c54@0:0:0:0:0:0:0:0|200|OK|14 &config.CfgCdrField{Tag: "Duration", Type: utils.CDRFIELD, CdrFieldId: utils.USAGE, Mandatory: true}, &config.CfgCdrField{Tag: "DialogId", Type: utils.CDRFIELD, CdrFieldId: "DialogIdentifier", Value: utils.ParseRSRFieldsMustCompile("11", utils.INFIELD_SEP)}, }} - cdrc := &Cdrc{CdrFormat: utils.OSIPS_FLATSTORE, cdrSourceIds: []string{"TEST_CDRC"}, cdrFields: cdrFields, partialRecords: make(map[string]map[string]*PartialFlatstoreRecord)} + cdrc := &Cdrc{CdrFormat: utils.OSIPS_FLATSTORE, cdrSourceIds: []string{"TEST_CDRC"}, cdrFields: cdrFields, partialRecords: make(map[string]map[string]*PartialFlatstoreRecord), + guard: engine.NewGuardianLock()} cdrsContent := bytes.NewReader([]byte(osipsCdrs)) csvReader := csv.NewReader(cdrsContent) csvReader.Comma = '|' diff --git a/engine/accountlock.go b/engine/guardian.go similarity index 93% rename from engine/accountlock.go rename to engine/guardian.go index ac1bb2675..b6e2f48ad 100644 --- a/engine/accountlock.go +++ b/engine/guardian.go @@ -25,6 +25,10 @@ import ( // global package variable var Guardian = &GuardianLock{queue: make(map[string]chan bool)} +func NewGuardianLock() *GuardianLock { + return &GuardianLock{queue: make(map[string]chan bool)} +} + type GuardianLock struct { queue map[string]chan bool mu sync.Mutex diff --git a/engine/accountlock_test.go b/engine/guardian_test.go similarity index 100% rename from engine/accountlock_test.go rename to engine/guardian_test.go