From ba7d40d749bcd106f495373a0ce0d557fbc3d829 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Mon, 27 Jul 2015 16:35:05 +0300 Subject: [PATCH] fix wrong merge --- cdrc/cdrc.go | 198 --------------------------------------------------- 1 file changed, 198 deletions(-) diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index 41daa0a4e..6aaa89c05 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -27,8 +27,6 @@ import ( "net/http" "os" "path" - "strconv" - "strings" "time" "github.com/cgrates/cgrates/config" @@ -285,199 +283,3 @@ func (self *Cdrc) processFile(filePath string) error { fn, newPath, procRowNr, time.Now().Sub(timeStart))) return nil } - -// Processes a single partial record for flatstore CDRs -func (self *Cdrc) processPartialRecord(record []string, fileName string) ([]string, error) { - if strings.HasPrefix(fileName, self.failedCallsPrefix) { // Use the first index since they should be the same in all configs - record = append(record, "0") // Append duration 0 for failed calls flatstore CDR and do not process it further - return record, nil - } - pr, err := NewPartialFlatstoreRecord(record) - if err != nil { - return nil, err - } - // Retrieve and complete the record from cache - var cachedFilename string - var cachedPartial *PartialFlatstoreRecord - cachedFNames := []string{fileName} // Higher probability to match as firstFileName - for fName := range self.partialRecords { - if fName != fileName { - cachedFNames = append(cachedFNames, fName) - } - } - for _, fName := range cachedFNames { // Need to lock them individually - self.guard.Guard(func() (interface{}, error) { - var hasPartial bool - if cachedPartial, hasPartial = self.partialRecords[fName][pr.AccId]; hasPartial { - cachedFilename = fName - } - return nil, nil - }, fName) - if cachedPartial != nil { - break - } - } - - if cachedPartial == nil { // Not cached, do it here and stop processing - self.guard.Guard(func() (interface{}, error) { - if fileMp, hasFile := self.partialRecords[fileName]; !hasFile { - self.partialRecords[fileName] = map[string]*PartialFlatstoreRecord{pr.AccId: pr} - if self.partialRecordCache != 0 { // Schedule expiry/dump of the just created entry in cache - go func() { - time.Sleep(self.partialRecordCache) - self.dumpUnpairedRecords(fileName) - }() - } - } else if _, hasAccId := fileMp[pr.AccId]; !hasAccId { - self.partialRecords[fileName][pr.AccId] = pr - } - return nil, nil - }, fileName) - return nil, nil - } - - pairedRecord, err := pairToRecord(cachedPartial, pr) - if err != nil { - return nil, err - } - self.guard.Guard(func() (interface{}, error) { - delete(self.partialRecords[cachedFilename], pr.AccId) // Remove the record out of cache - return nil, nil - }, fileName) - return pairedRecord, nil -} - -// Dumps the cache into a .unpaired file in the outdir and cleans cache after -func (self *Cdrc) dumpUnpairedRecords(fileName string) error { - _, err := self.guard.Guard(func() (interface{}, error) { - if len(self.partialRecords[fileName]) != 0 { // Only write the file if there are records in the cache - unpairedFilePath := path.Join(self.cdrOutDir, fileName+UNPAIRED_SUFFIX) - fileOut, err := os.Create(unpairedFilePath) - if err != nil { - engine.Logger.Err(fmt.Sprintf(" Failed creating %s, error: %s", unpairedFilePath, err.Error())) - return nil, err - } - csvWriter := csv.NewWriter(fileOut) - csvWriter.Comma = self.csvSep - for _, pr := range self.partialRecords[fileName] { - if err := csvWriter.Write(pr.Values); err != nil { - engine.Logger.Err(fmt.Sprintf(" Failed writing unpaired record %v to file: %s, error: %s", pr, unpairedFilePath, err.Error())) - return nil, err - } - } - csvWriter.Flush() - } - delete(self.partialRecords, fileName) - return nil, nil - }, fileName) - return err -} - -// Takes the record from a slice and turns it into StoredCdrs, posting them to the cdrServer -func (self *Cdrc) processRecord(record []string, srcRowNr int) error { - recordCdrs := make([]*engine.StoredCdr, 0) // More CDRs based on the number of filters and field templates - for idx := range self.cdrFields { // cdrFields coming from more templates will produce individual storCdr records - // Make sure filters are matching - filterBreak := false - for _, rsrFilter := range self.cdrFilters[idx] { - if rsrFilter == nil { // Nil filter does not need to match anything - continue - } - if cfgFieldIdx, _ := strconv.Atoi(rsrFilter.Id); len(record) <= cfgFieldIdx { - return fmt.Errorf("Ignoring record: %v - cannot compile filter %+v", record, rsrFilter) - } else if !rsrFilter.FilterPasses(record[cfgFieldIdx]) { - filterBreak = true - break - } - } - if filterBreak { // Stop importing cdrc fields profile due to non matching filter - continue - } - if storedCdr, err := self.recordToStoredCdr(record, idx); err != nil { - engine.Logger.Err(fmt.Sprintf(" Row %d - failed converting to StoredCdr, error: %s", srcRowNr, err.Error())) - continue - } else { - recordCdrs = append(recordCdrs, storedCdr) - } - } - for _, storedCdr := range recordCdrs { - if self.cdrsAddress == utils.INTERNAL { - if err := self.cdrServer.ProcessCdr(storedCdr); err != nil { - engine.Logger.Err(fmt.Sprintf(" Failed posting CDR, row: %d, error: %s", srcRowNr, err.Error())) - continue - } - } else { // CDRs listening on IP - if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cdr_http", self.cdrsAddress), storedCdr.AsHttpForm()); err != nil { - engine.Logger.Err(fmt.Sprintf(" Failed posting CDR, row: %d, error: %s", srcRowNr, err.Error())) - continue - } - } - } - return nil -} - -// Takes the record out of csv and turns it into storedCdr which can be processed by CDRS -func (self *Cdrc) recordToStoredCdr(record []string, cfgIdx int) (*engine.StoredCdr, error) { - storedCdr := &engine.StoredCdr{CdrHost: "0.0.0.0", CdrSource: self.cdrSourceIds[cfgIdx], ExtraFields: make(map[string]string), Cost: -1} - var err error - var lazyHttpFields []*config.CfgCdrField - for _, cdrFldCfg := range self.cdrFields[cfgIdx] { - if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.CdrFormat) { // Hardcode some values in case of flatstore - switch cdrFldCfg.CdrFieldId { - case utils.ACCID: - cdrFldCfg.Value = utils.ParseRSRFieldsMustCompile("3;1;2", utils.INFIELD_SEP) // in case of flatstore, accounting id is made up out of callid, from_tag and to_tag - case utils.USAGE: - cdrFldCfg.Value = utils.ParseRSRFieldsMustCompile(strconv.Itoa(len(record)-1), utils.INFIELD_SEP) // in case of flatstore, last element will be the duration computed by us - } - - } - var fieldVal string - if utils.IsSliceMember([]string{CSV, FS_CSV, utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.CdrFormat) { - if cdrFldCfg.Type == utils.CDRFIELD { - for _, cfgFieldRSR := range cdrFldCfg.Value { - if cfgFieldRSR.IsStatic() { - fieldVal += cfgFieldRSR.ParseValue("") - } else { // Dynamic value extracted using index - if cfgFieldIdx, _ := strconv.Atoi(cfgFieldRSR.Id); len(record) <= cfgFieldIdx { - return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s", record, cdrFldCfg.Tag) - } else { - fieldVal += cfgFieldRSR.ParseValue(record[cfgFieldIdx]) - } - } - } - } else if cdrFldCfg.Type == utils.HTTP_POST { - lazyHttpFields = append(lazyHttpFields, cdrFldCfg) // Will process later so we can send an estimation of storedCdr to http server - } else { - return nil, fmt.Errorf("Unsupported field type: %s", cdrFldCfg.Type) - } - } else { // Modify here when we add more supported cdr formats - return nil, fmt.Errorf("Unsupported CDR file format: %s", self.CdrFormat) - } - if err := populateStoredCdrField(storedCdr, cdrFldCfg.CdrFieldId, fieldVal); err != nil { - return nil, err - } - } - storedCdr.CgrId = utils.Sha1(storedCdr.AccId, storedCdr.SetupTime.String()) - if storedCdr.TOR == utils.DATA && self.duMultiplyFactors[cfgIdx] != 0 { - storedCdr.Usage = time.Duration(float64(storedCdr.Usage.Nanoseconds()) * self.duMultiplyFactors[cfgIdx]) - } - for _, httpFieldCfg := range lazyHttpFields { // Lazy process the http fields - var outValByte []byte - var fieldVal, httpAddr string - for _, rsrFld := range httpFieldCfg.Value { - httpAddr += rsrFld.ParseValue("") - } - if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, storedCdr); err != nil && httpFieldCfg.Mandatory { - return nil, err - } else { - fieldVal = string(outValByte) - if len(fieldVal) == 0 && httpFieldCfg.Mandatory { - return nil, fmt.Errorf("MandatoryIeMissing: Empty result for http_post field: %s", httpFieldCfg.Tag) - } - if err := populateStoredCdrField(storedCdr, httpFieldCfg.CdrFieldId, fieldVal); err != nil { - return nil, err - } - } - } - return storedCdr, nil -}