fix wrong merge
cdrc/cdrc.go (198 changed lines)
@@ -27,8 +27,6 @@ import (
	"net/http"
	"os"
	"path"
	"strconv"
	"strings"
	"time"

	"github.com/cgrates/cgrates/config"
@@ -285,199 +283,3 @@ func (self *Cdrc) processFile(filePath string) error {
		fn, newPath, procRowNr, time.Now().Sub(timeStart)))
	return nil
}

// Processes a single partial record for flatstore CDRs
func (self *Cdrc) processPartialRecord(record []string, fileName string) ([]string, error) {
	if strings.HasPrefix(fileName, self.failedCallsPrefix) { // Use the first index since they should be the same in all configs
		record = append(record, "0") // Append duration 0 for failed calls flatstore CDR and do not process it further
		return record, nil
	}
	pr, err := NewPartialFlatstoreRecord(record)
	if err != nil {
		return nil, err
	}
	// Retrieve and complete the record from cache
	var cachedFilename string
	var cachedPartial *PartialFlatstoreRecord
	cachedFNames := []string{fileName} // Higher probability to match as firstFileName
	for fName := range self.partialRecords {
		if fName != fileName {
			cachedFNames = append(cachedFNames, fName)
		}
	}
	for _, fName := range cachedFNames { // Need to lock them individually
		self.guard.Guard(func() (interface{}, error) {
			var hasPartial bool
			if cachedPartial, hasPartial = self.partialRecords[fName][pr.AccId]; hasPartial {
				cachedFilename = fName
			}
			return nil, nil
		}, fName)
		if cachedPartial != nil {
			break
		}
	}

	if cachedPartial == nil { // Not cached, do it here and stop processing
		self.guard.Guard(func() (interface{}, error) {
			if fileMp, hasFile := self.partialRecords[fileName]; !hasFile {
				self.partialRecords[fileName] = map[string]*PartialFlatstoreRecord{pr.AccId: pr}
				if self.partialRecordCache != 0 { // Schedule expiry/dump of the just created entry in cache
					go func() {
						time.Sleep(self.partialRecordCache)
						self.dumpUnpairedRecords(fileName)
					}()
				}
			} else if _, hasAccId := fileMp[pr.AccId]; !hasAccId {
				self.partialRecords[fileName][pr.AccId] = pr
			}
			return nil, nil
		}, fileName)
		return nil, nil
	}

	pairedRecord, err := pairToRecord(cachedPartial, pr)
	if err != nil {
		return nil, err
	}
	self.guard.Guard(func() (interface{}, error) {
		delete(self.partialRecords[cachedFilename], pr.AccId) // Remove the record out of cache
		return nil, nil
	}, fileName)
	return pairedRecord, nil
}

// Dumps the cache into a .unpaired file in the outdir and cleans cache after
func (self *Cdrc) dumpUnpairedRecords(fileName string) error {
	_, err := self.guard.Guard(func() (interface{}, error) {
		if len(self.partialRecords[fileName]) != 0 { // Only write the file if there are records in the cache
			unpairedFilePath := path.Join(self.cdrOutDir, fileName+UNPAIRED_SUFFIX)
			fileOut, err := os.Create(unpairedFilePath)
			if err != nil {
				engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed creating %s, error: %s", unpairedFilePath, err.Error()))
				return nil, err
			}
			csvWriter := csv.NewWriter(fileOut)
			csvWriter.Comma = self.csvSep
			for _, pr := range self.partialRecords[fileName] {
				if err := csvWriter.Write(pr.Values); err != nil {
					engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed writing unpaired record %v to file: %s, error: %s", pr, unpairedFilePath, err.Error()))
					return nil, err
				}
			}
			csvWriter.Flush()
		}
		delete(self.partialRecords, fileName)
		return nil, nil
	}, fileName)
	return err
}

// Takes the record from a slice and turns it into StoredCdrs, posting them to the cdrServer
func (self *Cdrc) processRecord(record []string, srcRowNr int) error {
	recordCdrs := make([]*engine.StoredCdr, 0) // More CDRs based on the number of filters and field templates
	for idx := range self.cdrFields { // cdrFields coming from more templates will produce individual storCdr records
		// Make sure filters are matching
		filterBreak := false
		for _, rsrFilter := range self.cdrFilters[idx] {
			if rsrFilter == nil { // Nil filter does not need to match anything
				continue
			}
			if cfgFieldIdx, _ := strconv.Atoi(rsrFilter.Id); len(record) <= cfgFieldIdx {
				return fmt.Errorf("Ignoring record: %v - cannot compile filter %+v", record, rsrFilter)
			} else if !rsrFilter.FilterPasses(record[cfgFieldIdx]) {
				filterBreak = true
				break
			}
		}
		if filterBreak { // Stop importing cdrc fields profile due to non matching filter
			continue
		}
		if storedCdr, err := self.recordToStoredCdr(record, idx); err != nil {
			engine.Logger.Err(fmt.Sprintf("<Cdrc> Row %d - failed converting to StoredCdr, error: %s", srcRowNr, err.Error()))
			continue
		} else {
			recordCdrs = append(recordCdrs, storedCdr)
		}
	}
	for _, storedCdr := range recordCdrs {
		if self.cdrsAddress == utils.INTERNAL {
			if err := self.cdrServer.ProcessCdr(storedCdr); err != nil {
				engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed posting CDR, row: %d, error: %s", srcRowNr, err.Error()))
				continue
			}
		} else { // CDRs listening on IP
			if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cdr_http", self.cdrsAddress), storedCdr.AsHttpForm()); err != nil {
				engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed posting CDR, row: %d, error: %s", srcRowNr, err.Error()))
				continue
			}
		}
	}
	return nil
}

// Takes the record out of csv and turns it into storedCdr which can be processed by CDRS
func (self *Cdrc) recordToStoredCdr(record []string, cfgIdx int) (*engine.StoredCdr, error) {
	storedCdr := &engine.StoredCdr{CdrHost: "0.0.0.0", CdrSource: self.cdrSourceIds[cfgIdx], ExtraFields: make(map[string]string), Cost: -1}
	var err error
	var lazyHttpFields []*config.CfgCdrField
	for _, cdrFldCfg := range self.cdrFields[cfgIdx] {
		if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.CdrFormat) { // Hardcode some values in case of flatstore
			switch cdrFldCfg.CdrFieldId {
			case utils.ACCID:
				cdrFldCfg.Value = utils.ParseRSRFieldsMustCompile("3;1;2", utils.INFIELD_SEP) // in case of flatstore, accounting id is made up out of callid, from_tag and to_tag
			case utils.USAGE:
				cdrFldCfg.Value = utils.ParseRSRFieldsMustCompile(strconv.Itoa(len(record)-1), utils.INFIELD_SEP) // in case of flatstore, last element will be the duration computed by us
			}

		}
		var fieldVal string
		if utils.IsSliceMember([]string{CSV, FS_CSV, utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.CdrFormat) {
			if cdrFldCfg.Type == utils.CDRFIELD {
				for _, cfgFieldRSR := range cdrFldCfg.Value {
					if cfgFieldRSR.IsStatic() {
						fieldVal += cfgFieldRSR.ParseValue("")
					} else { // Dynamic value extracted using index
						if cfgFieldIdx, _ := strconv.Atoi(cfgFieldRSR.Id); len(record) <= cfgFieldIdx {
							return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s", record, cdrFldCfg.Tag)
						} else {
							fieldVal += cfgFieldRSR.ParseValue(record[cfgFieldIdx])
						}
					}
				}
			} else if cdrFldCfg.Type == utils.HTTP_POST {
				lazyHttpFields = append(lazyHttpFields, cdrFldCfg) // Will process later so we can send an estimation of storedCdr to http server
			} else {
				return nil, fmt.Errorf("Unsupported field type: %s", cdrFldCfg.Type)
			}
		} else { // Modify here when we add more supported cdr formats
			return nil, fmt.Errorf("Unsupported CDR file format: %s", self.CdrFormat)
		}
		if err := populateStoredCdrField(storedCdr, cdrFldCfg.CdrFieldId, fieldVal); err != nil {
			return nil, err
		}
	}
	storedCdr.CgrId = utils.Sha1(storedCdr.AccId, storedCdr.SetupTime.String())
	if storedCdr.TOR == utils.DATA && self.duMultiplyFactors[cfgIdx] != 0 {
		storedCdr.Usage = time.Duration(float64(storedCdr.Usage.Nanoseconds()) * self.duMultiplyFactors[cfgIdx])
	}
	for _, httpFieldCfg := range lazyHttpFields { // Lazy process the http fields
		var outValByte []byte
		var fieldVal, httpAddr string
		for _, rsrFld := range httpFieldCfg.Value {
			httpAddr += rsrFld.ParseValue("")
		}
		if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, storedCdr); err != nil && httpFieldCfg.Mandatory {
			return nil, err
		} else {
			fieldVal = string(outValByte)
			if len(fieldVal) == 0 && httpFieldCfg.Mandatory {
				return nil, fmt.Errorf("MandatoryIeMissing: Empty result for http_post field: %s", httpFieldCfg.Tag)
			}
			if err := populateStoredCdrField(storedCdr, httpFieldCfg.CdrFieldId, fieldVal); err != nil {
				return nil, err
			}
		}
	}
	return storedCdr, nil
}
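
For readers skimming the removed code, the core idea of processPartialRecord is a pairing cache: the first leg of a flatstore CDR is held in memory keyed by AccId, the matching leg completes it, and entries that never find a pair are dumped after the partialRecordCache interval expires. Below is a minimal, self-contained sketch of just that idea; PairingCache, PartialRecord and every name in it are hypothetical stand-ins for illustration, not cgrates types or API.

	// Illustrative sketch only: models the pair-by-AccId caching idea from the
	// removed code with hypothetical types; not part of the cgrates codebase.
	package main

	import (
		"fmt"
		"sync"
		"time"
	)

	// PartialRecord is a hypothetical stand-in for one leg of a flatstore CDR.
	type PartialRecord struct {
		AccId  string   // pairing key (e.g. callid;from_tag;to_tag)
		Values []string // raw CSV fields of this leg
	}

	// PairingCache holds the first leg until its pair arrives or a TTL expires.
	type PairingCache struct {
		mu       sync.Mutex
		ttl      time.Duration
		pending  map[string]*PartialRecord
		onExpire func(*PartialRecord) // receives records that never paired (the ".unpaired" dump)
	}

	func NewPairingCache(ttl time.Duration, onExpire func(*PartialRecord)) *PairingCache {
		return &PairingCache{ttl: ttl, pending: make(map[string]*PartialRecord), onExpire: onExpire}
	}

	// Process caches the record if it is the first leg, or returns the merged legs if it completes a pair.
	func (pc *PairingCache) Process(pr *PartialRecord) ([]string, bool) {
		pc.mu.Lock()
		defer pc.mu.Unlock()
		if cached, has := pc.pending[pr.AccId]; has {
			delete(pc.pending, pr.AccId)
			return append(cached.Values, pr.Values...), true // paired: merge both legs
		}
		pc.pending[pr.AccId] = pr
		if pc.ttl != 0 { // schedule expiry of the just-cached entry
			go func(accId string) {
				time.Sleep(pc.ttl)
				pc.mu.Lock()
				defer pc.mu.Unlock()
				if orphan, still := pc.pending[accId]; still {
					delete(pc.pending, accId)
					pc.onExpire(orphan)
				}
			}(pr.AccId)
		}
		return nil, false
	}

	func main() {
		pc := NewPairingCache(100*time.Millisecond, func(pr *PartialRecord) {
			fmt.Println("unpaired:", pr.AccId) // stand-in for dumping to a .unpaired file
		})
		pc.Process(&PartialRecord{AccId: "call1;ft;tt", Values: []string{"INVITE", "1001"}})
		if merged, ok := pc.Process(&PartialRecord{AccId: "call1;ft;tt", Values: []string{"BYE", "62"}}); ok {
			fmt.Println("paired:", merged)
		}
		pc.Process(&PartialRecord{AccId: "call2;ft;tt", Values: []string{"INVITE", "1002"}})
		time.Sleep(200 * time.Millisecond) // let the expiry fire for call2
	}

The removed implementation differs mainly in that it partitions the pending map per source file, serializes access through self.guard keyed on the file name, and writes expired entries to an .unpaired CSV file instead of invoking a callback.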