mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
CDRC refactoring to support plugable file processors
This commit is contained in:
@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package v1
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -54,9 +53,6 @@ func (self *ApierV1) GetMaxUsage(usageRecord engine.UsageRecord, maxUsage *float
|
||||
if usageRecord.SetupTime == "" {
|
||||
usageRecord.SetupTime = utils.META_NOW
|
||||
}
|
||||
if usageRecord.Usage == "" {
|
||||
usageRecord.Usage = strconv.FormatFloat(self.Config.MaxCallDuration.Seconds(), 'f', -1, 64)
|
||||
}
|
||||
storedCdr, err := usageRecord.AsStoredCdr()
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
|
||||
360
cdrc/cdrc.go
360
cdrc/cdrc.go
@@ -21,15 +21,12 @@ package cdrc
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/csv"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
@@ -92,62 +89,9 @@ func populateStoredCdrField(cdr *engine.StoredCdr, fieldId, fieldVal string) err
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewPartialFlatstoreRecord(record []string) (*PartialFlatstoreRecord, error) {
|
||||
if len(record) < 7 {
|
||||
return nil, errors.New("MISSING_IE")
|
||||
}
|
||||
pr := &PartialFlatstoreRecord{Method: record[0], AccId: record[3] + record[1] + record[2], Values: record}
|
||||
var err error
|
||||
if pr.Timestamp, err = utils.ParseTimeDetectLayout(record[6]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return pr, nil
|
||||
}
|
||||
|
||||
// This is a partial record received from Flatstore, can be INVITE or BYE and it needs to be paired in order to produce duration
|
||||
type PartialFlatstoreRecord struct {
|
||||
Method string // INVITE or BYE
|
||||
AccId string // Copute here the AccId
|
||||
Timestamp time.Time // Timestamp of the event, as written by db_flastore module
|
||||
Values []string // Can contain original values or updated via UpdateValues
|
||||
}
|
||||
|
||||
// Pairs INVITE and BYE into final record containing as last element the duration
|
||||
func pairToRecord(part1, part2 *PartialFlatstoreRecord) ([]string, error) {
|
||||
var invite, bye *PartialFlatstoreRecord
|
||||
if part1.Method == "INVITE" {
|
||||
invite = part1
|
||||
} else if part2.Method == "INVITE" {
|
||||
invite = part2
|
||||
} else {
|
||||
return nil, errors.New("MISSING_INVITE")
|
||||
}
|
||||
if part1.Method == "BYE" {
|
||||
bye = part1
|
||||
} else if part2.Method == "BYE" {
|
||||
bye = part2
|
||||
} else {
|
||||
return nil, errors.New("MISSING_BYE")
|
||||
}
|
||||
if len(invite.Values) != len(bye.Values) {
|
||||
return nil, errors.New("INCONSISTENT_VALUES_LENGTH")
|
||||
}
|
||||
record := invite.Values
|
||||
for idx := range record {
|
||||
switch idx {
|
||||
case 0, 1, 2, 3, 6: // Leave these values as they are
|
||||
case 4, 5:
|
||||
record[idx] = bye.Values[idx] // Update record with status from bye
|
||||
default:
|
||||
if bye.Values[idx] != "" { // Any value higher than 6 is dynamically inserted, overwrite if non empty
|
||||
record[idx] = bye.Values[idx]
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
callDur := bye.Timestamp.Sub(invite.Timestamp)
|
||||
record = append(record, strconv.FormatFloat(callDur.Seconds(), 'f', -1, 64))
|
||||
return record, nil
|
||||
// Understands and processes a specific format of cdr (eg: .csv or .fwv)
|
||||
type RecordsProcessor interface {
|
||||
ProcessNextRecord() ([]*engine.StoredCdr, error) // Process a single record in the CDR file, return a slice of CDRs since based on configuration we can have more templates
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -157,28 +101,31 @@ Common parameters within configs processed:
|
||||
Parameters specific per config instance:
|
||||
* duMultiplyFactor, cdrSourceId, cdrFilter, cdrFields
|
||||
*/
|
||||
func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrServer *engine.CdrServer, exitChan chan struct{}) (*Cdrc, error) {
|
||||
func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrs engine.Connector, exitChan chan struct{}) (*Cdrc, error) {
|
||||
var cdrcCfg *config.CdrcConfig
|
||||
for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
|
||||
break
|
||||
}
|
||||
cdrc := &Cdrc{cdrsAddress: cdrcCfg.Cdrs, CdrFormat: cdrcCfg.CdrFormat, cdrInDir: cdrcCfg.CdrInDir, cdrOutDir: cdrcCfg.CdrOutDir,
|
||||
cdrc := &Cdrc{cdrFormat: cdrcCfg.CdrFormat, cdrInDir: cdrcCfg.CdrInDir, cdrOutDir: cdrcCfg.CdrOutDir,
|
||||
runDelay: cdrcCfg.RunDelay, csvSep: cdrcCfg.FieldSeparator,
|
||||
httpSkipTlsCheck: httpSkipTlsCheck, cdrServer: cdrServer, exitChan: exitChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles),
|
||||
partialRecords: make(map[string]map[string]*PartialFlatstoreRecord), guard: engine.NewGuardianLock()}
|
||||
var processCsvFile struct{}
|
||||
httpSkipTlsCheck: httpSkipTlsCheck, cdrs: cdrs, exitChan: exitChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles),
|
||||
}
|
||||
var processFile struct{}
|
||||
for i := 0; i < cdrcCfg.MaxOpenFiles; i++ {
|
||||
cdrc.maxOpenFiles <- processCsvFile // Empty initiate so we do not need to wait later when we pop
|
||||
cdrc.maxOpenFiles <- processFile // Empty initiate so we do not need to wait later when we pop
|
||||
}
|
||||
cdrc.cdrSourceIds = make([]string, len(cdrcCfgs))
|
||||
cdrc.duMultiplyFactors = make([]float64, len(cdrcCfgs))
|
||||
cdrc.cdrFilters = make([]utils.RSRFields, len(cdrcCfgs))
|
||||
cdrc.cdrFields = make([][]*config.CfgCdrField, len(cdrcCfgs))
|
||||
idx := 0
|
||||
var err error
|
||||
for _, cfg := range cdrcCfgs {
|
||||
if idx == 0 { // Steal the config from just one instance since it should be the same for all
|
||||
cdrc.partialRecordCache = cfg.PartialRecordCache
|
||||
cdrc.failedCallsPrefix = cfg.FailedCallsPrefix
|
||||
if cdrc.partialRecordsCache, err = NewPartialRecordsCache(cdrcCfg.PartialRecordCache, cdrcCfg.CdrOutDir, cdrcCfg.FieldSeparator); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
cdrc.cdrSourceIds[idx] = cfg.CdrSourceId
|
||||
cdrc.duMultiplyFactors[idx] = cfg.DataUsageMultiplyFactor
|
||||
@@ -197,25 +144,22 @@ func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrS
|
||||
}
|
||||
|
||||
type Cdrc struct {
|
||||
cdrsAddress,
|
||||
CdrFormat,
|
||||
cdrFormat,
|
||||
cdrInDir,
|
||||
cdrOutDir string
|
||||
failedCallsPrefix string // Configured failedCallsPrefix, used in case of flatstore CDRs
|
||||
cdrSourceIds []string // Should be in sync with cdrFields on indexes
|
||||
runDelay time.Duration
|
||||
csvSep rune
|
||||
duMultiplyFactors []float64
|
||||
cdrFilters []utils.RSRFields // Should be in sync with cdrFields on indexes
|
||||
cdrFields [][]*config.CfgCdrField // Profiles directly connected with cdrFilters
|
||||
httpSkipTlsCheck bool
|
||||
cdrServer *engine.CdrServer // Reference towards internal cdrServer if that is the case
|
||||
httpClient *http.Client
|
||||
exitChan chan struct{}
|
||||
maxOpenFiles chan struct{} // Maximum number of simultaneous files processed
|
||||
partialRecords map[string]map[string]*PartialFlatstoreRecord // [FileName"][AccId]*PartialRecord
|
||||
partialRecordCache time.Duration // Duration to cache partial records for
|
||||
guard *engine.GuardianLock
|
||||
failedCallsPrefix string // Configured failedCallsPrefix, used in case of flatstore CDRs
|
||||
cdrSourceIds []string // Should be in sync with cdrFields on indexes
|
||||
runDelay time.Duration
|
||||
csvSep rune
|
||||
duMultiplyFactors []float64
|
||||
cdrFilters []utils.RSRFields // Should be in sync with cdrFields on indexes
|
||||
cdrFields [][]*config.CfgCdrField // Profiles directly connected with cdrFilters
|
||||
httpSkipTlsCheck bool
|
||||
cdrs engine.Connector
|
||||
httpClient *http.Client
|
||||
exitChan chan struct{}
|
||||
maxOpenFiles chan struct{} // Maximum number of simultaneous files processed
|
||||
partialRecordsCache *PartialRecordsCache // Shared between all files in the folder we process
|
||||
}
|
||||
|
||||
// When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing
|
||||
@@ -256,9 +200,9 @@ func (self *Cdrc) trackCDRFiles() (err error) {
|
||||
engine.Logger.Info(fmt.Sprintf("<Cdrc> Shutting down CDRC on path %s.", self.cdrInDir))
|
||||
return nil
|
||||
case ev := <-watcher.Events:
|
||||
if ev.Op&fsnotify.Create == fsnotify.Create && (self.CdrFormat != FS_CSV || path.Ext(ev.Name) != ".csv") {
|
||||
if ev.Op&fsnotify.Create == fsnotify.Create && (self.cdrFormat != FS_CSV || path.Ext(ev.Name) != ".csv") {
|
||||
go func() { //Enable async processing here
|
||||
if err = self.processCsvFile(ev.Name); err != nil {
|
||||
if err = self.processFile(ev.Name); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", ev.Name, err.Error()))
|
||||
}
|
||||
}()
|
||||
@@ -274,9 +218,9 @@ func (self *Cdrc) processCdrDir() error {
|
||||
engine.Logger.Info(fmt.Sprintf("<Cdrc> Parsing folder %s for CDR files.", self.cdrInDir))
|
||||
filesInDir, _ := ioutil.ReadDir(self.cdrInDir)
|
||||
for _, file := range filesInDir {
|
||||
if self.CdrFormat != FS_CSV || path.Ext(file.Name()) != ".csv" {
|
||||
if self.cdrFormat != FS_CSV || path.Ext(file.Name()) != ".csv" {
|
||||
go func() { //Enable async processing here
|
||||
if err := self.processCsvFile(path.Join(self.cdrInDir, file.Name())); err != nil {
|
||||
if err := self.processFile(path.Join(self.cdrInDir, file.Name())); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", file, err.Error()))
|
||||
}
|
||||
}()
|
||||
@@ -286,10 +230,10 @@ func (self *Cdrc) processCdrDir() error {
|
||||
}
|
||||
|
||||
// Processe file at filePath and posts the valid cdr rows out of it
|
||||
func (self *Cdrc) processCsvFile(filePath string) error {
|
||||
func (self *Cdrc) processFile(filePath string) error {
|
||||
if cap(self.maxOpenFiles) != 0 { // 0 goes for no limit
|
||||
processCsvFile := <-self.maxOpenFiles // Queue here for maxOpenFiles
|
||||
defer func() { self.maxOpenFiles <- processCsvFile }()
|
||||
processFile := <-self.maxOpenFiles // Queue here for maxOpenFiles
|
||||
defer func() { self.maxOpenFiles <- processFile }()
|
||||
}
|
||||
_, fn := path.Split(filePath)
|
||||
engine.Logger.Info(fmt.Sprintf("<Cdrc> Parsing: %s", filePath))
|
||||
@@ -299,33 +243,33 @@ func (self *Cdrc) processCsvFile(filePath string) error {
|
||||
engine.Logger.Crit(err.Error())
|
||||
return err
|
||||
}
|
||||
csvReader := csv.NewReader(bufio.NewReader(file))
|
||||
csvReader.Comma = self.csvSep
|
||||
var recordsProcessor RecordsProcessor
|
||||
if utils.IsSliceMember([]string{CSV, FS_CSV, utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.cdrFormat) {
|
||||
csvReader := csv.NewReader(bufio.NewReader(file))
|
||||
csvReader.Comma = self.csvSep
|
||||
recordsProcessor = NewCsvRecordsProcessor(csvReader, self.cdrFormat, fn, self.failedCallsPrefix,
|
||||
self.cdrSourceIds, self.duMultiplyFactors, self.cdrFilters, self.cdrFields, self.httpSkipTlsCheck, self.partialRecordsCache)
|
||||
}
|
||||
procRowNr := 0
|
||||
timeStart := time.Now()
|
||||
for {
|
||||
record, err := csvReader.Read()
|
||||
if err != nil && err == io.EOF {
|
||||
break // End of file
|
||||
}
|
||||
procRowNr += 1 // Only increase if not end of file
|
||||
cdrs, err := recordsProcessor.ProcessNextRecord()
|
||||
if err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Row %d - csv error: %s", procRowNr, err.Error()))
|
||||
continue // Other csv related errors, ignore
|
||||
}
|
||||
if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.CdrFormat) { // partial records for flatstore CDRs
|
||||
if record, err = self.processPartialRecord(record, fn); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed processing partial record, row: %d, error: %s", procRowNr, err.Error()))
|
||||
continue
|
||||
} else if record == nil {
|
||||
continue
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
// Record was overwriten with complete information out of cache
|
||||
}
|
||||
if err := self.processRecord(record, procRowNr); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed processing CDR, row: %d, error: %s", procRowNr, err.Error()))
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Row %d, error: %s", procRowNr, err.Error()))
|
||||
continue
|
||||
}
|
||||
procRowNr += 1
|
||||
for _, storedCdr := range cdrs { // Send CDRs to CDRS
|
||||
var reply string
|
||||
if err := self.cdrs.ProcessCdr(storedCdr, &reply); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed sending CDR, %+v, error: %s", storedCdr, err.Error()))
|
||||
} else if reply != "OK" {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Received unexpected reply for CDR, %+v, reply: %s", storedCdr, reply))
|
||||
}
|
||||
}
|
||||
}
|
||||
// Finished with file, move it to processed folder
|
||||
newPath := path.Join(self.cdrOutDir, fn)
|
||||
@@ -337,199 +281,3 @@ func (self *Cdrc) processCsvFile(filePath string) error {
|
||||
fn, newPath, procRowNr, time.Now().Sub(timeStart)))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Processes a single partial record for flatstore CDRs
|
||||
func (self *Cdrc) processPartialRecord(record []string, fileName string) ([]string, error) {
|
||||
if strings.HasPrefix(fileName, self.failedCallsPrefix) { // Use the first index since they should be the same in all configs
|
||||
record = append(record, "0") // Append duration 0 for failed calls flatstore CDR and do not process it further
|
||||
return record, nil
|
||||
}
|
||||
pr, err := NewPartialFlatstoreRecord(record)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Retrieve and complete the record from cache
|
||||
var cachedFilename string
|
||||
var cachedPartial *PartialFlatstoreRecord
|
||||
cachedFNames := []string{fileName} // Higher probability to match as firstFileName
|
||||
for fName := range self.partialRecords {
|
||||
if fName != fileName {
|
||||
cachedFNames = append(cachedFNames, fName)
|
||||
}
|
||||
}
|
||||
for _, fName := range cachedFNames { // Need to lock them individually
|
||||
self.guard.Guard(func() (interface{}, error) {
|
||||
var hasPartial bool
|
||||
if cachedPartial, hasPartial = self.partialRecords[fName][pr.AccId]; hasPartial {
|
||||
cachedFilename = fName
|
||||
}
|
||||
return nil, nil
|
||||
}, fName)
|
||||
if cachedPartial != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if cachedPartial == nil { // Not cached, do it here and stop processing
|
||||
self.guard.Guard(func() (interface{}, error) {
|
||||
if fileMp, hasFile := self.partialRecords[fileName]; !hasFile {
|
||||
self.partialRecords[fileName] = map[string]*PartialFlatstoreRecord{pr.AccId: pr}
|
||||
if self.partialRecordCache != 0 { // Schedule expiry/dump of the just created entry in cache
|
||||
go func() {
|
||||
time.Sleep(self.partialRecordCache)
|
||||
self.dumpUnpairedRecords(fileName)
|
||||
}()
|
||||
}
|
||||
} else if _, hasAccId := fileMp[pr.AccId]; !hasAccId {
|
||||
self.partialRecords[fileName][pr.AccId] = pr
|
||||
}
|
||||
return nil, nil
|
||||
}, fileName)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
pairedRecord, err := pairToRecord(cachedPartial, pr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
self.guard.Guard(func() (interface{}, error) {
|
||||
delete(self.partialRecords[cachedFilename], pr.AccId) // Remove the record out of cache
|
||||
return nil, nil
|
||||
}, fileName)
|
||||
return pairedRecord, nil
|
||||
}
|
||||
|
||||
// Dumps the cache into a .unpaired file in the outdir and cleans cache after
|
||||
func (self *Cdrc) dumpUnpairedRecords(fileName string) error {
|
||||
_, err := self.guard.Guard(func() (interface{}, error) {
|
||||
if len(self.partialRecords[fileName]) != 0 { // Only write the file if there are records in the cache
|
||||
unpairedFilePath := path.Join(self.cdrOutDir, fileName+UNPAIRED_SUFFIX)
|
||||
fileOut, err := os.Create(unpairedFilePath)
|
||||
if err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed creating %s, error: %s", unpairedFilePath, err.Error()))
|
||||
return nil, err
|
||||
}
|
||||
csvWriter := csv.NewWriter(fileOut)
|
||||
csvWriter.Comma = self.csvSep
|
||||
for _, pr := range self.partialRecords[fileName] {
|
||||
if err := csvWriter.Write(pr.Values); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed writing unpaired record %v to file: %s, error: %s", pr, unpairedFilePath, err.Error()))
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
csvWriter.Flush()
|
||||
}
|
||||
delete(self.partialRecords, fileName)
|
||||
return nil, nil
|
||||
}, fileName)
|
||||
return err
|
||||
}
|
||||
|
||||
// Takes the record from a slice and turns it into StoredCdrs, posting them to the cdrServer
|
||||
func (self *Cdrc) processRecord(record []string, srcRowNr int) error {
|
||||
recordCdrs := make([]*engine.StoredCdr, 0) // More CDRs based on the number of filters and field templates
|
||||
for idx := range self.cdrFields { // cdrFields coming from more templates will produce individual storCdr records
|
||||
// Make sure filters are matching
|
||||
filterBreak := false
|
||||
for _, rsrFilter := range self.cdrFilters[idx] {
|
||||
if rsrFilter == nil { // Nil filter does not need to match anything
|
||||
continue
|
||||
}
|
||||
if cfgFieldIdx, _ := strconv.Atoi(rsrFilter.Id); len(record) <= cfgFieldIdx {
|
||||
return fmt.Errorf("Ignoring record: %v - cannot compile filter %+v", record, rsrFilter)
|
||||
} else if !rsrFilter.FilterPasses(record[cfgFieldIdx]) {
|
||||
filterBreak = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if filterBreak { // Stop importing cdrc fields profile due to non matching filter
|
||||
continue
|
||||
}
|
||||
if storedCdr, err := self.recordToStoredCdr(record, idx); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Row %d - failed converting to StoredCdr, error: %s", srcRowNr, err.Error()))
|
||||
continue
|
||||
} else {
|
||||
recordCdrs = append(recordCdrs, storedCdr)
|
||||
}
|
||||
}
|
||||
for _, storedCdr := range recordCdrs {
|
||||
if self.cdrsAddress == utils.INTERNAL {
|
||||
if err := self.cdrServer.ProcessCdr(storedCdr); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed posting CDR, row: %d, error: %s", srcRowNr, err.Error()))
|
||||
continue
|
||||
}
|
||||
} else { // CDRs listening on IP
|
||||
if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cdr_http", self.cdrsAddress), storedCdr.AsHttpForm()); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed posting CDR, row: %d, error: %s", srcRowNr, err.Error()))
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Takes the record out of csv and turns it into storedCdr which can be processed by CDRS
|
||||
func (self *Cdrc) recordToStoredCdr(record []string, cfgIdx int) (*engine.StoredCdr, error) {
|
||||
storedCdr := &engine.StoredCdr{CdrHost: "0.0.0.0", CdrSource: self.cdrSourceIds[cfgIdx], ExtraFields: make(map[string]string), Cost: -1}
|
||||
var err error
|
||||
var lazyHttpFields []*config.CfgCdrField
|
||||
for _, cdrFldCfg := range self.cdrFields[cfgIdx] {
|
||||
if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.CdrFormat) { // Hardcode some values in case of flatstore
|
||||
switch cdrFldCfg.CdrFieldId {
|
||||
case utils.ACCID:
|
||||
cdrFldCfg.Value = utils.ParseRSRFieldsMustCompile("3;1;2", utils.INFIELD_SEP) // in case of flatstore, accounting id is made up out of callid, from_tag and to_tag
|
||||
case utils.USAGE:
|
||||
cdrFldCfg.Value = utils.ParseRSRFieldsMustCompile(strconv.Itoa(len(record)-1), utils.INFIELD_SEP) // in case of flatstore, last element will be the duration computed by us
|
||||
}
|
||||
|
||||
}
|
||||
var fieldVal string
|
||||
if utils.IsSliceMember([]string{CSV, FS_CSV, utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.CdrFormat) {
|
||||
if cdrFldCfg.Type == utils.CDRFIELD {
|
||||
for _, cfgFieldRSR := range cdrFldCfg.Value {
|
||||
if cfgFieldRSR.IsStatic() {
|
||||
fieldVal += cfgFieldRSR.ParseValue("")
|
||||
} else { // Dynamic value extracted using index
|
||||
if cfgFieldIdx, _ := strconv.Atoi(cfgFieldRSR.Id); len(record) <= cfgFieldIdx {
|
||||
return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s", record, cdrFldCfg.Tag)
|
||||
} else {
|
||||
fieldVal += cfgFieldRSR.ParseValue(record[cfgFieldIdx])
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if cdrFldCfg.Type == utils.HTTP_POST {
|
||||
lazyHttpFields = append(lazyHttpFields, cdrFldCfg) // Will process later so we can send an estimation of storedCdr to http server
|
||||
} else {
|
||||
return nil, fmt.Errorf("Unsupported field type: %s", cdrFldCfg.Type)
|
||||
}
|
||||
} else { // Modify here when we add more supported cdr formats
|
||||
return nil, fmt.Errorf("Unsupported CDR file format: %s", self.CdrFormat)
|
||||
}
|
||||
if err := populateStoredCdrField(storedCdr, cdrFldCfg.CdrFieldId, fieldVal); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
storedCdr.CgrId = utils.Sha1(storedCdr.AccId, storedCdr.SetupTime.String())
|
||||
if storedCdr.TOR == utils.DATA && self.duMultiplyFactors[cfgIdx] != 0 {
|
||||
storedCdr.Usage = time.Duration(float64(storedCdr.Usage.Nanoseconds()) * self.duMultiplyFactors[cfgIdx])
|
||||
}
|
||||
for _, httpFieldCfg := range lazyHttpFields { // Lazy process the http fields
|
||||
var outValByte []byte
|
||||
var fieldVal, httpAddr string
|
||||
for _, rsrFld := range httpFieldCfg.Value {
|
||||
httpAddr += rsrFld.ParseValue("")
|
||||
}
|
||||
if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, storedCdr); err != nil && httpFieldCfg.Mandatory {
|
||||
return nil, err
|
||||
} else {
|
||||
fieldVal = string(outValByte)
|
||||
if len(fieldVal) == 0 && httpFieldCfg.Mandatory {
|
||||
return nil, fmt.Errorf("MandatoryIeMissing: Empty result for http_post field: %s", httpFieldCfg.Tag)
|
||||
}
|
||||
if err := populateStoredCdrField(storedCdr, httpFieldCfg.CdrFieldId, fieldVal); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
return storedCdr, nil
|
||||
}
|
||||
|
||||
@@ -18,113 +18,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
|
||||
package cdrc
|
||||
|
||||
import (
|
||||
//"bytes"
|
||||
//"encoding/csv"
|
||||
//"io"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func TestRecordForkCdr(t *testing.T) {
|
||||
cgrConfig, _ := config.NewDefaultCGRConfig()
|
||||
cdrcConfig := cgrConfig.CdrcProfiles["/var/log/cgrates/cdrc/in"][utils.META_DEFAULT]
|
||||
cdrcConfig.CdrFields = append(cdrcConfig.CdrFields, &config.CfgCdrField{Tag: "SupplierTest", Type: utils.CDRFIELD, CdrFieldId: "supplier", Value: []*utils.RSRField{&utils.RSRField{Id: "14"}}})
|
||||
cdrcConfig.CdrFields = append(cdrcConfig.CdrFields, &config.CfgCdrField{Tag: "DisconnectCauseTest", Type: utils.CDRFIELD, CdrFieldId: utils.DISCONNECT_CAUSE,
|
||||
Value: []*utils.RSRField{&utils.RSRField{Id: "16"}}})
|
||||
cdrc := &Cdrc{CdrFormat: CSV, cdrSourceIds: []string{"TEST_CDRC"}, cdrFields: [][]*config.CfgCdrField{cdrcConfig.CdrFields}}
|
||||
cdrRow := []string{"firstField", "secondField"}
|
||||
_, err := cdrc.recordToStoredCdr(cdrRow, 0)
|
||||
if err == nil {
|
||||
t.Error("Failed to corectly detect missing fields from record")
|
||||
}
|
||||
cdrRow = []string{"ignored", "ignored", utils.VOICE, "acc1", utils.META_PREPAID, "*out", "cgrates.org", "call", "1001", "1001", "+4986517174963",
|
||||
"2013-02-03 19:50:00", "2013-02-03 19:54:00", "62", "supplier1", "172.16.1.1", "NORMAL_DISCONNECT"}
|
||||
rtCdr, err := cdrc.recordToStoredCdr(cdrRow, 0)
|
||||
if err != nil {
|
||||
t.Error("Failed to parse CDR in rated cdr", err)
|
||||
}
|
||||
expectedCdr := &engine.StoredCdr{
|
||||
CgrId: utils.Sha1(cdrRow[3], time.Date(2013, 2, 3, 19, 50, 0, 0, time.UTC).String()),
|
||||
TOR: cdrRow[2],
|
||||
AccId: cdrRow[3],
|
||||
CdrHost: "0.0.0.0", // Got it over internal interface
|
||||
CdrSource: "TEST_CDRC",
|
||||
ReqType: cdrRow[4],
|
||||
Direction: cdrRow[5],
|
||||
Tenant: cdrRow[6],
|
||||
Category: cdrRow[7],
|
||||
Account: cdrRow[8],
|
||||
Subject: cdrRow[9],
|
||||
Destination: cdrRow[10],
|
||||
SetupTime: time.Date(2013, 2, 3, 19, 50, 0, 0, time.UTC),
|
||||
AnswerTime: time.Date(2013, 2, 3, 19, 54, 0, 0, time.UTC),
|
||||
Usage: time.Duration(62) * time.Second,
|
||||
Supplier: "supplier1",
|
||||
DisconnectCause: "NORMAL_DISCONNECT",
|
||||
ExtraFields: map[string]string{},
|
||||
Cost: -1,
|
||||
}
|
||||
if !reflect.DeepEqual(expectedCdr, rtCdr) {
|
||||
t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDataMultiplyFactor(t *testing.T) {
|
||||
cdrFields := []*config.CfgCdrField{&config.CfgCdrField{Tag: "TORField", Type: utils.CDRFIELD, CdrFieldId: "tor", Value: []*utils.RSRField{&utils.RSRField{Id: "0"}}},
|
||||
&config.CfgCdrField{Tag: "UsageField", Type: utils.CDRFIELD, CdrFieldId: "usage", Value: []*utils.RSRField{&utils.RSRField{Id: "1"}}}}
|
||||
cdrc := &Cdrc{CdrFormat: CSV, cdrSourceIds: []string{"TEST_CDRC"}, duMultiplyFactors: []float64{0}, cdrFields: [][]*config.CfgCdrField{cdrFields}}
|
||||
cdrRow := []string{"*data", "1"}
|
||||
rtCdr, err := cdrc.recordToStoredCdr(cdrRow, 0)
|
||||
if err != nil {
|
||||
t.Error("Failed to parse CDR in rated cdr", err)
|
||||
}
|
||||
var sTime time.Time
|
||||
expectedCdr := &engine.StoredCdr{
|
||||
CgrId: utils.Sha1("", sTime.String()),
|
||||
TOR: cdrRow[0],
|
||||
CdrHost: "0.0.0.0",
|
||||
CdrSource: "TEST_CDRC",
|
||||
Usage: time.Duration(1) * time.Second,
|
||||
ExtraFields: map[string]string{},
|
||||
Cost: -1,
|
||||
}
|
||||
if !reflect.DeepEqual(expectedCdr, rtCdr) {
|
||||
t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr)
|
||||
}
|
||||
cdrc.duMultiplyFactors = []float64{1024}
|
||||
expectedCdr = &engine.StoredCdr{
|
||||
CgrId: utils.Sha1("", sTime.String()),
|
||||
TOR: cdrRow[0],
|
||||
CdrHost: "0.0.0.0",
|
||||
CdrSource: "TEST_CDRC",
|
||||
Usage: time.Duration(1024) * time.Second,
|
||||
ExtraFields: map[string]string{},
|
||||
Cost: -1,
|
||||
}
|
||||
if rtCdr, _ := cdrc.recordToStoredCdr(cdrRow, 0); !reflect.DeepEqual(expectedCdr, rtCdr) {
|
||||
t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr)
|
||||
}
|
||||
cdrRow = []string{"*voice", "1"}
|
||||
expectedCdr = &engine.StoredCdr{
|
||||
CgrId: utils.Sha1("", sTime.String()),
|
||||
TOR: cdrRow[0],
|
||||
CdrHost: "0.0.0.0",
|
||||
CdrSource: "TEST_CDRC",
|
||||
Usage: time.Duration(1) * time.Second,
|
||||
ExtraFields: map[string]string{},
|
||||
Cost: -1,
|
||||
}
|
||||
if rtCdr, _ := cdrc.recordToStoredCdr(cdrRow, 0); !reflect.DeepEqual(expectedCdr, rtCdr) {
|
||||
t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
func TestNewPartialFlatstoreRecord(t *testing.T) {
|
||||
ePr := &PartialFlatstoreRecord{Method: "INVITE", AccId: "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:02daec40c548625ac", Timestamp: time.Date(2015, 7, 9, 15, 6, 48, 0, time.UTC),
|
||||
@@ -140,34 +33,6 @@ func TestNewPartialFlatstoreRecord(t *testing.T) {
|
||||
}
|
||||
*/
|
||||
|
||||
func TestPairToRecord(t *testing.T) {
|
||||
eRecord := []string{"INVITE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454408", "*prepaid", "1001", "1002", "", "3401:2069362475", "2"}
|
||||
invPr := &PartialFlatstoreRecord{Method: "INVITE", Timestamp: time.Date(2015, 7, 9, 15, 6, 48, 0, time.UTC),
|
||||
Values: []string{"INVITE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454408", "*prepaid", "1001", "1002", "", "3401:2069362475"}}
|
||||
byePr := &PartialFlatstoreRecord{Method: "BYE", Timestamp: time.Date(2015, 7, 9, 15, 6, 50, 0, time.UTC),
|
||||
Values: []string{"BYE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454410", "", "", "", "", "3401:2069362475"}}
|
||||
if rec, err := pairToRecord(invPr, byePr); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(eRecord, rec) {
|
||||
t.Errorf("Expected: %+v, received: %+v", eRecord, rec)
|
||||
}
|
||||
if rec, err := pairToRecord(byePr, invPr); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(eRecord, rec) {
|
||||
t.Errorf("Expected: %+v, received: %+v", eRecord, rec)
|
||||
}
|
||||
if _, err := pairToRecord(byePr, byePr); err == nil || err.Error() != "MISSING_INVITE" {
|
||||
t.Error(err)
|
||||
}
|
||||
if _, err := pairToRecord(invPr, invPr); err == nil || err.Error() != "MISSING_BYE" {
|
||||
t.Error(err)
|
||||
}
|
||||
byePr.Values = []string{"BYE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454410", "", "", "", "3401:2069362475"} // Took one value out
|
||||
if _, err := pairToRecord(invPr, byePr); err == nil || err.Error() != "INCONSISTENT_VALUES_LENGTH" {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
func TestOsipsFlatstoreCdrs(t *testing.T) {
|
||||
flatstoreCdrs := `
|
||||
|
||||
336
cdrc/csv.go
Normal file
336
cdrc/csv.go
Normal file
@@ -0,0 +1,336 @@
|
||||
/*
|
||||
Real-time Charging System for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package cdrc
|
||||
|
||||
import (
|
||||
"encoding/csv"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func NewPartialFlatstoreRecord(record []string) (*PartialFlatstoreRecord, error) {
|
||||
if len(record) < 7 {
|
||||
return nil, errors.New("MISSING_IE")
|
||||
}
|
||||
pr := &PartialFlatstoreRecord{Method: record[0], AccId: record[3] + record[1] + record[2], Values: record}
|
||||
var err error
|
||||
if pr.Timestamp, err = utils.ParseTimeDetectLayout(record[6]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return pr, nil
|
||||
}
|
||||
|
||||
// This is a partial record received from Flatstore, can be INVITE or BYE and it needs to be paired in order to produce duration
|
||||
type PartialFlatstoreRecord struct {
|
||||
Method string // INVITE or BYE
|
||||
AccId string // Copute here the AccId
|
||||
Timestamp time.Time // Timestamp of the event, as written by db_flastore module
|
||||
Values []string // Can contain original values or updated via UpdateValues
|
||||
}
|
||||
|
||||
// Pairs INVITE and BYE into final record containing as last element the duration
|
||||
func pairToRecord(part1, part2 *PartialFlatstoreRecord) ([]string, error) {
|
||||
var invite, bye *PartialFlatstoreRecord
|
||||
if part1.Method == "INVITE" {
|
||||
invite = part1
|
||||
} else if part2.Method == "INVITE" {
|
||||
invite = part2
|
||||
} else {
|
||||
return nil, errors.New("MISSING_INVITE")
|
||||
}
|
||||
if part1.Method == "BYE" {
|
||||
bye = part1
|
||||
} else if part2.Method == "BYE" {
|
||||
bye = part2
|
||||
} else {
|
||||
return nil, errors.New("MISSING_BYE")
|
||||
}
|
||||
if len(invite.Values) != len(bye.Values) {
|
||||
return nil, errors.New("INCONSISTENT_VALUES_LENGTH")
|
||||
}
|
||||
record := invite.Values
|
||||
for idx := range record {
|
||||
switch idx {
|
||||
case 0, 1, 2, 3, 6: // Leave these values as they are
|
||||
case 4, 5:
|
||||
record[idx] = bye.Values[idx] // Update record with status from bye
|
||||
default:
|
||||
if bye.Values[idx] != "" { // Any value higher than 6 is dynamically inserted, overwrite if non empty
|
||||
record[idx] = bye.Values[idx]
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
callDur := bye.Timestamp.Sub(invite.Timestamp)
|
||||
record = append(record, strconv.FormatFloat(callDur.Seconds(), 'f', -1, 64))
|
||||
return record, nil
|
||||
}
|
||||
|
||||
func NewPartialRecordsCache(ttl time.Duration, cdrOutDir string, csvSep rune) (*PartialRecordsCache, error) {
|
||||
return &PartialRecordsCache{ttl: ttl, cdrOutDir: cdrOutDir, csvSep: csvSep,
|
||||
partialRecords: make(map[string]map[string]*PartialFlatstoreRecord), guard: engine.NewGuardianLock()}, nil
|
||||
}
|
||||
|
||||
// PartialRecordsCache holds unpaired flatstore records until their pair
// arrives or their ttl expires; shared by cdrc so it can cache for all files
// in a folder.
type PartialRecordsCache struct {
	ttl            time.Duration                                 // After this duration unpaired records of a file are dumped to disk (0 disables the dump)
	cdrOutDir      string                                        // Directory where .unpaired dump files are created
	csvSep         rune                                          // Field separator used when writing dump files
	partialRecords map[string]map[string]*PartialFlatstoreRecord // [FileName][AccId]*PartialRecord
	guard          *engine.GuardianLock                          // Serializes access to partialRecords, locked per file name
}
|
||||
|
||||
// Dumps the cache into a .unpaired file in the outdir and cleans cache after
|
||||
func (self *PartialRecordsCache) dumpUnpairedRecords(fileName string) error {
|
||||
_, err := self.guard.Guard(func() (interface{}, error) {
|
||||
if len(self.partialRecords[fileName]) != 0 { // Only write the file if there are records in the cache
|
||||
unpairedFilePath := path.Join(self.cdrOutDir, fileName+UNPAIRED_SUFFIX)
|
||||
fileOut, err := os.Create(unpairedFilePath)
|
||||
if err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed creating %s, error: %s", unpairedFilePath, err.Error()))
|
||||
return nil, err
|
||||
}
|
||||
csvWriter := csv.NewWriter(fileOut)
|
||||
csvWriter.Comma = self.csvSep
|
||||
for _, pr := range self.partialRecords[fileName] {
|
||||
if err := csvWriter.Write(pr.Values); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed writing unpaired record %v to file: %s, error: %s", pr, unpairedFilePath, err.Error()))
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
csvWriter.Flush()
|
||||
}
|
||||
delete(self.partialRecords, fileName)
|
||||
return nil, nil
|
||||
}, fileName)
|
||||
return err
|
||||
}
|
||||
|
||||
// GetPartialRecord searches the cache for the partial record with the given
// accounting id and returns the file name it was cached under together with
// the record itself (empty string and nil when not found). prefFileName is
// searched first because of better match probability; each file's map is
// locked individually while being inspected.
func (self *PartialRecordsCache) GetPartialRecord(accId, prefFileName string) (string, *PartialFlatstoreRecord) {
	var cachedFilename string
	var cachedPartial *PartialFlatstoreRecord
	checkCachedFNames := []string{prefFileName} // Higher probability to match as firstFileName
	// NOTE(review): this range over partialRecords happens outside the guard;
	// presumably acceptable here — confirm against GuardianLock semantics.
	for fName := range self.partialRecords {
		if fName != prefFileName {
			checkCachedFNames = append(checkCachedFNames, fName)
		}
	}
	for _, fName := range checkCachedFNames { // Need to lock them individually
		self.guard.Guard(func() (interface{}, error) {
			var hasPartial bool
			// Closure captures cachedPartial/cachedFilename from the outer scope
			if cachedPartial, hasPartial = self.partialRecords[fName][accId]; hasPartial {
				cachedFilename = fName
			}
			return nil, nil
		}, fName)
		if cachedPartial != nil { // Found in this file, stop searching
			break
		}
	}
	return cachedFilename, cachedPartial
}
|
||||
|
||||
// CachePartial stores pr in the cache under fileName, keyed by its AccId.
// The first record cached for a file schedules (when ttl != 0) a delayed dump
// of whatever remains unpaired for that file. An already-cached AccId is not
// overwritten.
func (self *PartialRecordsCache) CachePartial(fileName string, pr *PartialFlatstoreRecord) {
	self.guard.Guard(func() (interface{}, error) {
		if fileMp, hasFile := self.partialRecords[fileName]; !hasFile {
			self.partialRecords[fileName] = map[string]*PartialFlatstoreRecord{pr.AccId: pr}
			if self.ttl != 0 { // Schedule expiry/dump of the just created entry in cache
				go func() {
					time.Sleep(self.ttl)
					self.dumpUnpairedRecords(fileName)
				}()
			}
		} else if _, hasAccId := fileMp[pr.AccId]; !hasAccId {
			self.partialRecords[fileName][pr.AccId] = pr
		}
		return nil, nil
	}, fileName)
}
|
||||
|
||||
func (self *PartialRecordsCache) UncachePartial(fileName string, pr *PartialFlatstoreRecord) {
|
||||
self.guard.Guard(func() (interface{}, error) {
|
||||
delete(self.partialRecords[fileName], pr.AccId) // Remove the record out of cache
|
||||
return nil, nil
|
||||
}, fileName)
|
||||
}
|
||||
|
||||
func NewCsvRecordsProcessor(csvReader *csv.Reader, cdrFormat, fileName, failedCallsPrefix string,
|
||||
cdrSourceIds []string, duMultiplyFactors []float64, cdrFilters []utils.RSRFields, cdrFields [][]*config.CfgCdrField,
|
||||
httpSkipTlsCheck bool, partialRecordsCache *PartialRecordsCache) *CsvRecordsProcessor {
|
||||
return &CsvRecordsProcessor{csvReader: csvReader, cdrFormat: cdrFormat, fileName: fileName,
|
||||
failedCallsPrefix: failedCallsPrefix, cdrSourceIds: cdrSourceIds,
|
||||
duMultiplyFactors: duMultiplyFactors, cdrFilters: cdrFilters, cdrFields: cdrFields,
|
||||
httpSkipTlsCheck: httpSkipTlsCheck, partialRecordsCache: partialRecordsCache}
|
||||
|
||||
}
|
||||
|
||||
// CsvRecordsProcessor turns CSV (or flatstore) records into StoredCdrs, one
// record at a time via ProcessNextRecord.
type CsvRecordsProcessor struct {
	csvReader           *csv.Reader
	cdrFormat           string // CSV, FS_CSV, KAM_FLATSTORE or OSIPS_FLATSTORE
	fileName            string
	failedCallsPrefix   string                  // File name prefix marking failed-calls flatstore files
	cdrSourceIds        []string                // Should be in sync with cdrFields on indexes
	duMultiplyFactors   []float64               // Per-template data-usage multiply factor, 0 disables
	cdrFilters          []utils.RSRFields       // Should be in sync with cdrFields on indexes
	cdrFields           [][]*config.CfgCdrField // Profiles directly connected with cdrFilters
	httpSkipTlsCheck    bool
	partialRecordsCache *PartialRecordsCache // Shared by cdrc so we can cache for all files in a folder
}
|
||||
|
||||
func (self *CsvRecordsProcessor) ProcessNextRecord() ([]*engine.StoredCdr, error) {
|
||||
record, err := self.csvReader.Read()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if record, err = self.processPartialRecord(record); err != nil {
|
||||
return nil, err
|
||||
} else if record == nil {
|
||||
return nil, nil // Due to partial, none returned
|
||||
}
|
||||
// Record was overwriten with complete information out of cache
|
||||
return self.processRecord(record)
|
||||
}
|
||||
|
||||
// processPartialRecord processes a single partial record for flatstore CDRs.
// Failed-calls files get a zero duration appended and pass straight through;
// otherwise the record is paired against the cache: first sighting is cached
// and nil is returned, second sighting produces the complete paired record.
func (self *CsvRecordsProcessor) processPartialRecord(record []string) ([]string, error) {
	if strings.HasPrefix(self.fileName, self.failedCallsPrefix) { // Use the first index since they should be the same in all configs
		record = append(record, "0") // Append duration 0 for failed calls flatstore CDR and do not process it further
		return record, nil
	}
	pr, err := NewPartialFlatstoreRecord(record)
	if err != nil {
		return nil, err
	}
	// Retrieve and complete the record from cache
	cachedFilename, cachedPartial := self.partialRecordsCache.GetPartialRecord(pr.AccId, self.fileName)
	if cachedPartial == nil { // Not cached, do it here and stop processing
		self.partialRecordsCache.CachePartial(self.fileName, pr)
		return nil, nil
	}
	pairedRecord, err := pairToRecord(cachedPartial, pr)
	if err != nil {
		return nil, err
	}
	self.partialRecordsCache.UncachePartial(cachedFilename, pr)
	return pairedRecord, nil
}
|
||||
|
||||
// Takes the record from a slice and turns it into StoredCdrs, posting them to the cdrServer
|
||||
func (self *CsvRecordsProcessor) processRecord(record []string) ([]*engine.StoredCdr, error) {
|
||||
recordCdrs := make([]*engine.StoredCdr, 0) // More CDRs based on the number of filters and field templates
|
||||
for idx := range self.cdrFields { // cdrFields coming from more templates will produce individual storCdr records
|
||||
// Make sure filters are matching
|
||||
filterBreak := false
|
||||
for _, rsrFilter := range self.cdrFilters[idx] {
|
||||
if rsrFilter == nil { // Nil filter does not need to match anything
|
||||
continue
|
||||
}
|
||||
if cfgFieldIdx, _ := strconv.Atoi(rsrFilter.Id); len(record) <= cfgFieldIdx {
|
||||
return nil, fmt.Errorf("Ignoring record: %v - cannot compile filter %+v", record, rsrFilter)
|
||||
} else if !rsrFilter.FilterPasses(record[cfgFieldIdx]) {
|
||||
filterBreak = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if filterBreak { // Stop importing cdrc fields profile due to non matching filter
|
||||
continue
|
||||
}
|
||||
if storedCdr, err := self.recordToStoredCdr(record, idx); err != nil {
|
||||
return nil, fmt.Errorf("Failed converting to StoredCdr, error: %s", err.Error())
|
||||
} else {
|
||||
recordCdrs = append(recordCdrs, storedCdr)
|
||||
}
|
||||
}
|
||||
return recordCdrs, nil
|
||||
}
|
||||
|
||||
// recordToStoredCdr takes the record out of csv and turns it into a StoredCdr
// which can be processed by CDRS, using the field template at cfgIdx.
// CDRFIELD values are extracted by record index (or taken as static values);
// HTTP_POST fields are resolved lazily at the end so an estimation of the
// StoredCdr can be sent to the http server.
func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cfgIdx int) (*engine.StoredCdr, error) {
	storedCdr := &engine.StoredCdr{CdrHost: "0.0.0.0", CdrSource: self.cdrSourceIds[cfgIdx], ExtraFields: make(map[string]string), Cost: -1}
	var err error
	var lazyHttpFields []*config.CfgCdrField
	for _, cdrFldCfg := range self.cdrFields[cfgIdx] {
		if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.cdrFormat) { // Hardcode some values in case of flatstore
			// NOTE(review): this mutates the shared config objects in place on
			// every record — confirm this is intended/safe for concurrent use.
			switch cdrFldCfg.CdrFieldId {
			case utils.ACCID:
				cdrFldCfg.Value = utils.ParseRSRFieldsMustCompile("3;1;2", utils.INFIELD_SEP) // in case of flatstore, accounting id is made up out of callid, from_tag and to_tag
			case utils.USAGE:
				cdrFldCfg.Value = utils.ParseRSRFieldsMustCompile(strconv.Itoa(len(record)-1), utils.INFIELD_SEP) // in case of flatstore, last element will be the duration computed by us
			}

		}
		var fieldVal string
		if utils.IsSliceMember([]string{CSV, FS_CSV, utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.cdrFormat) {
			if cdrFldCfg.Type == utils.CDRFIELD {
				// Concatenate all RSR components: static ones verbatim,
				// dynamic ones looked up by record index.
				for _, cfgFieldRSR := range cdrFldCfg.Value {
					if cfgFieldRSR.IsStatic() {
						fieldVal += cfgFieldRSR.ParseValue("")
					} else { // Dynamic value extracted using index
						if cfgFieldIdx, _ := strconv.Atoi(cfgFieldRSR.Id); len(record) <= cfgFieldIdx {
							return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s", record, cdrFldCfg.Tag)
						} else {
							fieldVal += cfgFieldRSR.ParseValue(record[cfgFieldIdx])
						}
					}
				}
			} else if cdrFldCfg.Type == utils.HTTP_POST {
				lazyHttpFields = append(lazyHttpFields, cdrFldCfg) // Will process later so we can send an estimation of storedCdr to http server
			} else {
				return nil, fmt.Errorf("Unsupported field type: %s", cdrFldCfg.Type)
			}
		} else { // Modify here when we add more supported cdr formats
			return nil, fmt.Errorf("Unsupported CDR file format: %s", self.cdrFormat)
		}
		if err := populateStoredCdrField(storedCdr, cdrFldCfg.CdrFieldId, fieldVal); err != nil {
			return nil, err
		}
	}
	// CgrId is derived from accounting id plus setup time
	storedCdr.CgrId = utils.Sha1(storedCdr.AccId, storedCdr.SetupTime.String())
	// Scale data usage by the configured multiply factor (0 means disabled)
	if storedCdr.TOR == utils.DATA && self.duMultiplyFactors[cfgIdx] != 0 {
		storedCdr.Usage = time.Duration(float64(storedCdr.Usage.Nanoseconds()) * self.duMultiplyFactors[cfgIdx])
	}
	for _, httpFieldCfg := range lazyHttpFields { // Lazy process the http fields
		var outValByte []byte
		var fieldVal, httpAddr string
		for _, rsrFld := range httpFieldCfg.Value {
			httpAddr += rsrFld.ParseValue("")
		}
		// A failed post or empty reply only aborts processing when the field
		// is marked mandatory; otherwise the (possibly empty) value is used.
		if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, storedCdr); err != nil && httpFieldCfg.Mandatory {
			return nil, err
		} else {
			fieldVal = string(outValByte)
			if len(fieldVal) == 0 && httpFieldCfg.Mandatory {
				return nil, fmt.Errorf("MandatoryIeMissing: Empty result for http_post field: %s", httpFieldCfg.Tag)
			}
			if err := populateStoredCdrField(storedCdr, httpFieldCfg.CdrFieldId, fieldVal); err != nil {
				return nil, err
			}
		}
	}
	return storedCdr, nil
}
|
||||
151
cdrc/csv_test.go
Normal file
151
cdrc/csv_test.go
Normal file
@@ -0,0 +1,151 @@
|
||||
/*
|
||||
Real-time Charging System for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package cdrc
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// TestCsvRecordForkCdr checks recordToStoredCdr: a too-short record must
// error, and a complete row must map field-by-field into the expected
// StoredCdr, including the extra supplier/disconnect-cause template fields.
func TestCsvRecordForkCdr(t *testing.T) {
	cgrConfig, _ := config.NewDefaultCGRConfig()
	cdrcConfig := cgrConfig.CdrcProfiles["/var/log/cgrates/cdrc/in"][utils.META_DEFAULT]
	// Extend the default template with two extra fields read from indexes 14 and 16
	cdrcConfig.CdrFields = append(cdrcConfig.CdrFields, &config.CfgCdrField{Tag: "SupplierTest", Type: utils.CDRFIELD, CdrFieldId: "supplier", Value: []*utils.RSRField{&utils.RSRField{Id: "14"}}})
	cdrcConfig.CdrFields = append(cdrcConfig.CdrFields, &config.CfgCdrField{Tag: "DisconnectCauseTest", Type: utils.CDRFIELD, CdrFieldId: utils.DISCONNECT_CAUSE,
		Value: []*utils.RSRField{&utils.RSRField{Id: "16"}}})
	csvProcessor := &CsvRecordsProcessor{cdrFormat: CSV, cdrSourceIds: []string{"TEST_CDRC"}, cdrFields: [][]*config.CfgCdrField{cdrcConfig.CdrFields}}
	cdrRow := []string{"firstField", "secondField"} // Too few fields for the template
	_, err := csvProcessor.recordToStoredCdr(cdrRow, 0)
	if err == nil {
		t.Error("Failed to corectly detect missing fields from record")
	}
	cdrRow = []string{"ignored", "ignored", utils.VOICE, "acc1", utils.META_PREPAID, "*out", "cgrates.org", "call", "1001", "1001", "+4986517174963",
		"2013-02-03 19:50:00", "2013-02-03 19:54:00", "62", "supplier1", "172.16.1.1", "NORMAL_DISCONNECT"}
	rtCdr, err := csvProcessor.recordToStoredCdr(cdrRow, 0)
	if err != nil {
		t.Error("Failed to parse CDR in rated cdr", err)
	}
	expectedCdr := &engine.StoredCdr{
		CgrId:           utils.Sha1(cdrRow[3], time.Date(2013, 2, 3, 19, 50, 0, 0, time.UTC).String()),
		TOR:             cdrRow[2],
		AccId:           cdrRow[3],
		CdrHost:         "0.0.0.0", // Got it over internal interface
		CdrSource:       "TEST_CDRC",
		ReqType:         cdrRow[4],
		Direction:       cdrRow[5],
		Tenant:          cdrRow[6],
		Category:        cdrRow[7],
		Account:         cdrRow[8],
		Subject:         cdrRow[9],
		Destination:     cdrRow[10],
		SetupTime:       time.Date(2013, 2, 3, 19, 50, 0, 0, time.UTC),
		AnswerTime:      time.Date(2013, 2, 3, 19, 54, 0, 0, time.UTC),
		Usage:           time.Duration(62) * time.Second,
		Supplier:        "supplier1",
		DisconnectCause: "NORMAL_DISCONNECT",
		ExtraFields:     map[string]string{},
		Cost:            -1,
	}
	if !reflect.DeepEqual(expectedCdr, rtCdr) {
		t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr)
	}
}
|
||||
|
||||
// TestCsvDataMultiplyFactor checks that duMultiplyFactors scales Usage for
// *data CDRs only: factor 0 leaves usage untouched, factor 1024 multiplies
// it, and *voice CDRs are never scaled.
func TestCsvDataMultiplyFactor(t *testing.T) {
	cdrFields := []*config.CfgCdrField{&config.CfgCdrField{Tag: "TORField", Type: utils.CDRFIELD, CdrFieldId: "tor", Value: []*utils.RSRField{&utils.RSRField{Id: "0"}}},
		&config.CfgCdrField{Tag: "UsageField", Type: utils.CDRFIELD, CdrFieldId: "usage", Value: []*utils.RSRField{&utils.RSRField{Id: "1"}}}}
	csvProcessor := &CsvRecordsProcessor{cdrFormat: CSV, cdrSourceIds: []string{"TEST_CDRC"}, duMultiplyFactors: []float64{0}, cdrFields: [][]*config.CfgCdrField{cdrFields}}
	cdrRow := []string{"*data", "1"}
	rtCdr, err := csvProcessor.recordToStoredCdr(cdrRow, 0)
	if err != nil {
		t.Error("Failed to parse CDR in rated cdr", err)
	}
	var sTime time.Time // Zero value setup time, feeds the expected CgrId hash
	expectedCdr := &engine.StoredCdr{
		CgrId:       utils.Sha1("", sTime.String()),
		TOR:         cdrRow[0],
		CdrHost:     "0.0.0.0",
		CdrSource:   "TEST_CDRC",
		Usage:       time.Duration(1) * time.Second,
		ExtraFields: map[string]string{},
		Cost:        -1,
	}
	if !reflect.DeepEqual(expectedCdr, rtCdr) {
		t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr)
	}
	// With factor 1024 the 1s data usage becomes 1024s
	csvProcessor.duMultiplyFactors = []float64{1024}
	expectedCdr = &engine.StoredCdr{
		CgrId:       utils.Sha1("", sTime.String()),
		TOR:         cdrRow[0],
		CdrHost:     "0.0.0.0",
		CdrSource:   "TEST_CDRC",
		Usage:       time.Duration(1024) * time.Second,
		ExtraFields: map[string]string{},
		Cost:        -1,
	}
	if rtCdr, _ := csvProcessor.recordToStoredCdr(cdrRow, 0); !reflect.DeepEqual(expectedCdr, rtCdr) {
		t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr)
	}
	// *voice usage must be left unscaled even with a non-zero factor
	cdrRow = []string{"*voice", "1"}
	expectedCdr = &engine.StoredCdr{
		CgrId:       utils.Sha1("", sTime.String()),
		TOR:         cdrRow[0],
		CdrHost:     "0.0.0.0",
		CdrSource:   "TEST_CDRC",
		Usage:       time.Duration(1) * time.Second,
		ExtraFields: map[string]string{},
		Cost:        -1,
	}
	if rtCdr, _ := csvProcessor.recordToStoredCdr(cdrRow, 0); !reflect.DeepEqual(expectedCdr, rtCdr) {
		t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr)
	}
}
|
||||
|
||||
// TestCsvPairToRecord checks pairToRecord: INVITE+BYE pair (in both argument
// orders) yields the merged record with the 2s duration appended, mismatched
// method pairs and inconsistent lengths produce the dedicated errors.
func TestCsvPairToRecord(t *testing.T) {
	eRecord := []string{"INVITE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454408", "*prepaid", "1001", "1002", "", "3401:2069362475", "2"}
	invPr := &PartialFlatstoreRecord{Method: "INVITE", Timestamp: time.Date(2015, 7, 9, 15, 6, 48, 0, time.UTC),
		Values: []string{"INVITE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454408", "*prepaid", "1001", "1002", "", "3401:2069362475"}}
	byePr := &PartialFlatstoreRecord{Method: "BYE", Timestamp: time.Date(2015, 7, 9, 15, 6, 50, 0, time.UTC),
		Values: []string{"BYE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454410", "", "", "", "", "3401:2069362475"}}
	if rec, err := pairToRecord(invPr, byePr); err != nil {
		t.Error(err)
	} else if !reflect.DeepEqual(eRecord, rec) {
		t.Errorf("Expected: %+v, received: %+v", eRecord, rec)
	}
	// Argument order must not matter
	if rec, err := pairToRecord(byePr, invPr); err != nil {
		t.Error(err)
	} else if !reflect.DeepEqual(eRecord, rec) {
		t.Errorf("Expected: %+v, received: %+v", eRecord, rec)
	}
	if _, err := pairToRecord(byePr, byePr); err == nil || err.Error() != "MISSING_INVITE" {
		t.Error(err)
	}
	if _, err := pairToRecord(invPr, invPr); err == nil || err.Error() != "MISSING_BYE" {
		t.Error(err)
	}
	byePr.Values = []string{"BYE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454410", "", "", "", "3401:2069362475"} // Took one value out
	if _, err := pairToRecord(invPr, byePr); err == nil || err.Error() != "INCONSISTENT_VALUES_LENGTH" {
		t.Error(err)
	}
}
|
||||
@@ -1,6 +1,6 @@
|
||||
/*
|
||||
Rating system designed to be used in VoIP Carriers World
|
||||
Copyright (C) 2012-2015 ITsysCOM
|
||||
Real-time Charging System for Telecom & ISP environments
|
||||
Copyright (C) 2012-2015 ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
|
||||
@@ -88,15 +88,25 @@ func cacheData(ratingDb engine.RatingStorage, accountDb engine.AccountingStorage
|
||||
}
|
||||
|
||||
// Fires up a cdrc instance
|
||||
func startCdrc(cdrsChan chan struct{}, cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, closeChan chan struct{}) {
|
||||
func startCdrc(responder *engine.Responder, cdrsChan chan struct{}, cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, closeChan chan struct{}) {
|
||||
var cdrsConn engine.Connector
|
||||
var cdrcCfg *config.CdrcConfig
|
||||
for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
|
||||
break
|
||||
}
|
||||
if cdrcCfg.Cdrs == utils.INTERNAL {
|
||||
<-cdrsChan // Wait for CDRServer to come up before start processing
|
||||
cdrsConn = responder
|
||||
} else {
|
||||
conn, err := rpcclient.NewRpcClient("tcp", cdrcCfg.Cdrs, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
|
||||
if err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("<CDRC> Could not connect to CDRS via RPC: %v", err))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
cdrsConn = &engine.RPCClientConnector{Client: conn}
|
||||
}
|
||||
cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrServer, closeChan)
|
||||
cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrsConn, closeChan)
|
||||
if err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("Cdrc config parsing error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
@@ -619,7 +629,7 @@ func main() {
|
||||
} else if !cdrcEnabled {
|
||||
cdrcEnabled = true // Mark that at least one cdrc service is active
|
||||
}
|
||||
go startCdrc(cdrsChan, cdrcCfgs, cfg.HttpSkipTlsVerify, cfg.ConfigReloads[utils.CDRC])
|
||||
go startCdrc(responder, cdrsChan, cdrcCfgs, cfg.HttpSkipTlsVerify, cfg.ConfigReloads[utils.CDRC])
|
||||
}
|
||||
if cdrcEnabled {
|
||||
engine.Logger.Info("Starting CGRateS CDR client.")
|
||||
|
||||
Reference in New Issue
Block a user