mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Partial implementation of db_flatstore CDRs from *ser
This commit is contained in:
192
cdrc/cdrc.go
192
cdrc/cdrc.go
@@ -21,6 +21,7 @@ package cdrc
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/csv"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
@@ -104,9 +105,9 @@ func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrS
|
||||
cdrc := &Cdrc{cdrsAddress: cdrcCfg.Cdrs, CdrFormat: cdrcCfg.CdrFormat, cdrInDir: cdrcCfg.CdrInDir, cdrOutDir: cdrcCfg.CdrOutDir,
|
||||
runDelay: cdrcCfg.RunDelay, csvSep: cdrcCfg.FieldSeparator,
|
||||
httpSkipTlsCheck: httpSkipTlsCheck, cdrServer: cdrServer, exitChan: exitChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles)}
|
||||
var processFile struct{}
|
||||
var processCsvFile struct{}
|
||||
for i := 0; i < cdrcCfg.MaxOpenFiles; i++ {
|
||||
cdrc.maxOpenFiles <- processFile // Empty initiate so we do not need to wait later when we pop
|
||||
cdrc.maxOpenFiles <- processCsvFile // Empty initiate so we do not need to wait later when we pop
|
||||
}
|
||||
cdrc.cdrSourceIds = make([]string, len(cdrcCfgs))
|
||||
cdrc.duMultiplyFactors = make([]float64, len(cdrcCfgs))
|
||||
@@ -130,6 +131,64 @@ func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrS
|
||||
return cdrc, nil
|
||||
}
|
||||
|
||||
func NewPartialFlatstoreRecord(record []string) (*PartialFlatstoreRecord, error) {
|
||||
if len(record) < 7 {
|
||||
return nil, errors.New("MISSING_IE")
|
||||
}
|
||||
pr := &PartialFlatstoreRecord{Method: record[0], AccId: record[3] + record[1] + record[2], Values: record}
|
||||
var err error
|
||||
if pr.Timestamp, err = utils.ParseTimeDetectLayout(record[6]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return pr, nil
|
||||
}
|
||||
|
||||
// This is a partial record received from Flatstore, can be INVITE or BYE and it needs to be paired in order to produce duration
|
||||
type PartialFlatstoreRecord struct {
|
||||
Method string // INVITE or BYE
|
||||
AccId string // Copute here the AccId
|
||||
Timestamp time.Time // Timestamp of the event, as written by db_flastore module
|
||||
Values []string // Can contain original values or updated via UpdateValues
|
||||
}
|
||||
|
||||
// Pairs INVITE and BYE into final record containing as last element the duration
|
||||
func pairToRecord(part1, part2 *PartialFlatstoreRecord) ([]string, error) {
|
||||
var invite, bye *PartialFlatstoreRecord
|
||||
if part1.Method == "INVITE" {
|
||||
invite = part1
|
||||
} else if part2.Method == "INVITE" {
|
||||
invite = part2
|
||||
} else {
|
||||
return nil, errors.New("MISSING_INVITE")
|
||||
}
|
||||
if part1.Method == "BYE" {
|
||||
bye = part1
|
||||
} else if part2.Method == "BYE" {
|
||||
bye = part2
|
||||
} else {
|
||||
return nil, errors.New("MISSING_BYE")
|
||||
}
|
||||
if len(invite.Values) != len(bye.Values) {
|
||||
return nil, errors.New("INCONSISTENT_VALUES_LENGTH")
|
||||
}
|
||||
record := invite.Values
|
||||
for idx := range record {
|
||||
switch idx {
|
||||
case 0, 1, 2, 3, 6: // Leave these values as they are
|
||||
case 4, 5:
|
||||
record[idx] = bye.Values[idx] // Update record with status from bye
|
||||
default:
|
||||
if bye.Values[idx] != "" { // Any value higher than 6 is dynamically inserted, overwrite if non empty
|
||||
record[idx] = bye.Values[idx]
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
callDur := bye.Timestamp.Sub(invite.Timestamp)
|
||||
record = append(record, strconv.FormatFloat(callDur.Seconds(), 'f', -1, 64))
|
||||
return record, nil
|
||||
}
|
||||
|
||||
type Cdrc struct {
|
||||
cdrsAddress,
|
||||
CdrFormat,
|
||||
@@ -145,7 +204,8 @@ type Cdrc struct {
|
||||
cdrServer *engine.CdrServer // Reference towards internal cdrServer if that is the case
|
||||
httpClient *http.Client
|
||||
exitChan chan struct{}
|
||||
maxOpenFiles chan struct{} // Maximum number of simultaneous files processed
|
||||
maxOpenFiles chan struct{} // Maximum number of simultaneous files processed
|
||||
partialRecords map[string]map[string]*PartialFlatstoreRecord // [FileName"][AccId]*PartialRecord
|
||||
}
|
||||
|
||||
// When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing
|
||||
@@ -188,7 +248,7 @@ func (self *Cdrc) trackCDRFiles() (err error) {
|
||||
case ev := <-watcher.Events:
|
||||
if ev.Op&fsnotify.Create == fsnotify.Create && (self.CdrFormat != FS_CSV || path.Ext(ev.Name) != ".csv") {
|
||||
go func() { //Enable async processing here
|
||||
if err = self.processFile(ev.Name); err != nil {
|
||||
if err = self.processCsvFile(ev.Name); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", ev.Name, err.Error()))
|
||||
}
|
||||
}()
|
||||
@@ -206,7 +266,7 @@ func (self *Cdrc) processCdrDir() error {
|
||||
for _, file := range filesInDir {
|
||||
if self.CdrFormat != FS_CSV || path.Ext(file.Name()) != ".csv" {
|
||||
go func() { //Enable async processing here
|
||||
if err := self.processFile(path.Join(self.cdrInDir, file.Name())); err != nil {
|
||||
if err := self.processCsvFile(path.Join(self.cdrInDir, file.Name())); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", file, err.Error()))
|
||||
}
|
||||
}()
|
||||
@@ -216,9 +276,9 @@ func (self *Cdrc) processCdrDir() error {
|
||||
}
|
||||
|
||||
// Processe file at filePath and posts the valid cdr rows out of it
|
||||
func (self *Cdrc) processFile(filePath string) error {
|
||||
processFile := <-self.maxOpenFiles // Queue here for maxOpenFiles
|
||||
defer func() { self.maxOpenFiles <- processFile }()
|
||||
func (self *Cdrc) processCsvFile(filePath string) error {
|
||||
processCsvFile := <-self.maxOpenFiles // Queue here for maxOpenFiles
|
||||
defer func() { self.maxOpenFiles <- processCsvFile }()
|
||||
_, fn := path.Split(filePath)
|
||||
engine.Logger.Info(fmt.Sprintf("<Cdrc> Parsing: %s", filePath))
|
||||
file, err := os.Open(filePath)
|
||||
@@ -241,43 +301,18 @@ func (self *Cdrc) processFile(filePath string) error {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Row %d - csv error: %s", procRowNr, err.Error()))
|
||||
continue // Other csv related errors, ignore
|
||||
}
|
||||
recordCdrs := make([]*engine.StoredCdr, 0) // More CDRs based on the number of filters and field templates
|
||||
for idx := range self.cdrFields {
|
||||
// Make sure filters are matching
|
||||
filterBreak := false
|
||||
for _, rsrFilter := range self.cdrFilters[idx] {
|
||||
if rsrFilter == nil { // Nil filter does not need to match anything
|
||||
continue
|
||||
}
|
||||
if cfgFieldIdx, _ := strconv.Atoi(rsrFilter.Id); len(record) <= cfgFieldIdx {
|
||||
return fmt.Errorf("Ignoring record: %v - cannot compile filter %+v", record, rsrFilter)
|
||||
} else if !rsrFilter.FilterPasses(record[cfgFieldIdx]) {
|
||||
filterBreak = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if filterBreak { // Stop importing cdrc fields profile due to non matching filter
|
||||
if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.CdrFormat) { // partial records for flatstore CDRs
|
||||
if record, err = self.processPartialRecord(record, fn); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed processing partial record, row: %d, error: %s", procRowNr, err.Error()))
|
||||
continue
|
||||
} else if record == nil {
|
||||
continue
|
||||
}
|
||||
if storedCdr, err := self.recordToStoredCdr(record, idx); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Row %d - failed converting to StoredCdr, error: %s", procRowNr, err.Error()))
|
||||
continue
|
||||
} else {
|
||||
recordCdrs = append(recordCdrs, storedCdr)
|
||||
}
|
||||
// Record was overwriten with complete information out of cache
|
||||
}
|
||||
for _, storedCdr := range recordCdrs {
|
||||
if self.cdrsAddress == utils.INTERNAL {
|
||||
if err := self.cdrServer.ProcessCdr(storedCdr); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed posting CDR, row: %d, error: %s", procRowNr, err.Error()))
|
||||
continue
|
||||
}
|
||||
} else { // CDRs listening on IP
|
||||
if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cdr_post", self.cdrsAddress), storedCdr.AsHttpForm()); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed posting CDR, row: %d, error: %s", procRowNr, err.Error()))
|
||||
continue
|
||||
}
|
||||
}
|
||||
if err := self.processRecord(record, procRowNr); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed processing CDR, row: %d, error: %s", procRowNr, err.Error()))
|
||||
continue
|
||||
}
|
||||
}
|
||||
// Finished with file, move it to processed folder
|
||||
@@ -291,14 +326,85 @@ func (self *Cdrc) processFile(filePath string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Processes a single partial record for flatstore CDRs, in case of failed calls it will emulate the BYE by copying the INVITE
|
||||
func (self *Cdrc) processPartialRecord(record []string, fileName string) ([]string, error) {
|
||||
pr, err := NewPartialFlatstoreRecord(record)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// ToDo: cache locking
|
||||
// ToDo: schedule dumping of the .unpaired files
|
||||
if fileMp, hasFile := self.partialRecords[fileName]; !hasFile {
|
||||
self.partialRecords[fileName] = map[string]*PartialFlatstoreRecord{pr.AccId: pr}
|
||||
return nil, nil
|
||||
} else if _, hasAccId := fileMp[pr.AccId]; !hasAccId {
|
||||
self.partialRecords[fileName][pr.AccId] = pr
|
||||
return nil, nil
|
||||
}
|
||||
// The paired is already in cache, build up the final record
|
||||
return pairToRecord(self.partialRecords[fileName][pr.AccId], pr)
|
||||
}
|
||||
|
||||
// Takes the record from a slice and turns it into StoredCdrs, posting them to the cdrServer
|
||||
func (self *Cdrc) processRecord(record []string, srcRowNr int) error {
|
||||
recordCdrs := make([]*engine.StoredCdr, 0) // More CDRs based on the number of filters and field templates
|
||||
for idx := range self.cdrFields { // cdrFields coming from more templates will produce individual storCdr records
|
||||
// Make sure filters are matching
|
||||
filterBreak := false
|
||||
for _, rsrFilter := range self.cdrFilters[idx] {
|
||||
if rsrFilter == nil { // Nil filter does not need to match anything
|
||||
continue
|
||||
}
|
||||
if cfgFieldIdx, _ := strconv.Atoi(rsrFilter.Id); len(record) <= cfgFieldIdx {
|
||||
return fmt.Errorf("Ignoring record: %v - cannot compile filter %+v", record, rsrFilter)
|
||||
} else if !rsrFilter.FilterPasses(record[cfgFieldIdx]) {
|
||||
filterBreak = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if filterBreak { // Stop importing cdrc fields profile due to non matching filter
|
||||
continue
|
||||
}
|
||||
if storedCdr, err := self.recordToStoredCdr(record, idx); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Row %d - failed converting to StoredCdr, error: %s", srcRowNr, err.Error()))
|
||||
continue
|
||||
} else {
|
||||
recordCdrs = append(recordCdrs, storedCdr)
|
||||
}
|
||||
}
|
||||
for _, storedCdr := range recordCdrs {
|
||||
if self.cdrsAddress == utils.INTERNAL {
|
||||
if err := self.cdrServer.ProcessCdr(storedCdr); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed posting CDR, row: %d, error: %s", srcRowNr, err.Error()))
|
||||
continue
|
||||
}
|
||||
} else { // CDRs listening on IP
|
||||
if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cdr_post", self.cdrsAddress), storedCdr.AsHttpForm()); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed posting CDR, row: %d, error: %s", srcRowNr, err.Error()))
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Takes the record out of csv and turns it into storedCdr which can be processed by CDRS
|
||||
func (self *Cdrc) recordToStoredCdr(record []string, cfgIdx int) (*engine.StoredCdr, error) {
|
||||
storedCdr := &engine.StoredCdr{CdrHost: "0.0.0.0", CdrSource: self.cdrSourceIds[cfgIdx], ExtraFields: make(map[string]string), Cost: -1}
|
||||
var err error
|
||||
var lazyHttpFields []*config.CfgCdrField
|
||||
for _, cdrFldCfg := range self.cdrFields[cfgIdx] {
|
||||
if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.CdrFormat) { // Hardcode some values in case of flatstore
|
||||
switch cdrFldCfg.CdrFieldId {
|
||||
case utils.ACCID:
|
||||
cdrFldCfg.Value = utils.ParseRSRFieldsMustCompile("3;1;2", utils.INFIELD_SEP) // in case of flatstore, last element will be the duration computed by us
|
||||
case utils.USAGE:
|
||||
cdrFldCfg.Value = utils.ParseRSRFieldsMustCompile(strconv.Itoa(len(record)-1), utils.INFIELD_SEP) // in case of flatstore, last element will be the duration computed by us
|
||||
}
|
||||
|
||||
}
|
||||
var fieldVal string
|
||||
if utils.IsSliceMember([]string{CSV, FS_CSV}, self.CdrFormat) {
|
||||
if utils.IsSliceMember([]string{CSV, FS_CSV, utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.CdrFormat) {
|
||||
if cdrFldCfg.Type == utils.CDRFIELD {
|
||||
for _, cfgFieldRSR := range cdrFldCfg.Value {
|
||||
if cfgFieldRSR.IsStatic() {
|
||||
|
||||
@@ -19,6 +19,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package cdrc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/csv"
|
||||
"io"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -238,3 +241,197 @@ func TestDnTdmCdrs(t *testing.T) {
|
||||
|
||||
}
|
||||
*/
|
||||
|
||||
func TestNewPartialFlatstoreRecord(t *testing.T) {
|
||||
ePr := &PartialFlatstoreRecord{Method: "INVITE", AccId: "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:02daec40c548625ac", Timestamp: time.Date(2015, 7, 9, 17, 6, 48, 0, time.Local),
|
||||
Values: []string{"INVITE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454408", "*prepaid", "1001", "1002", "", "3401:2069362475"}}
|
||||
if pr, err := NewPartialFlatstoreRecord(ePr.Values); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(ePr, pr) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", ePr, pr)
|
||||
}
|
||||
if _, err := NewPartialFlatstoreRecord([]string{"INVITE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK"}); err == nil || err.Error() != "MISSING_IE" {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPairToRecord(t *testing.T) {
|
||||
eRecord := []string{"INVITE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454408", "*prepaid", "1001", "1002", "", "3401:2069362475", "2"}
|
||||
invPr := &PartialFlatstoreRecord{Method: "INVITE", Timestamp: time.Date(2015, 7, 9, 17, 6, 48, 0, time.Local),
|
||||
Values: []string{"INVITE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454408", "*prepaid", "1001", "1002", "", "3401:2069362475"}}
|
||||
byePr := &PartialFlatstoreRecord{Method: "BYE", Timestamp: time.Date(2015, 7, 9, 17, 6, 50, 0, time.Local),
|
||||
Values: []string{"BYE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454410", "", "", "", "", "3401:2069362475"}}
|
||||
if rec, err := pairToRecord(invPr, byePr); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(eRecord, rec) {
|
||||
t.Errorf("Expected: %+v, received: %+v", eRecord, rec)
|
||||
}
|
||||
if rec, err := pairToRecord(byePr, invPr); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(eRecord, rec) {
|
||||
t.Errorf("Expected: %+v, received: %+v", eRecord, rec)
|
||||
}
|
||||
if _, err := pairToRecord(byePr, byePr); err == nil || err.Error() != "MISSING_INVITE" {
|
||||
t.Error(err)
|
||||
}
|
||||
if _, err := pairToRecord(invPr, invPr); err == nil || err.Error() != "MISSING_BYE" {
|
||||
t.Error(err)
|
||||
}
|
||||
byePr.Values = []string{"BYE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454410", "", "", "", "3401:2069362475"} // Took one value out
|
||||
if _, err := pairToRecord(invPr, byePr); err == nil || err.Error() != "INCONSISTENT_VALUES_LENGTH" {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOsipsFlatstoreCdrs(t *testing.T) {
|
||||
osipsCdrs := `
|
||||
INVITE|2daec40c|548625ac|dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0|200|OK|1436454408|*prepaid|1001|1002||3401:2069362475
|
||||
BYE|2daec40c|548625ac|dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0|200|OK|1436454410|||||3401:2069362475
|
||||
INVITE|f9d3d5c3|c863a6e3|214d8f52b566e33a9349b184e72a4cca@0:0:0:0:0:0:0:0|200|OK|1436454647|*postpaid|1002|1001||1877:893549741
|
||||
BYE|f9d3d5c3|c863a6e3|214d8f52b566e33a9349b184e72a4cca@0:0:0:0:0:0:0:0|200|OK|1436454651|||||1877:893549741
|
||||
INVITE|36e39a5|42d996f9|3a63321dd3b325eec688dc2aefb6ac2d@0:0:0:0:0:0:0:0|200|OK|1436454657|*prepaid|1001|1002||2407:1884881533
|
||||
BYE|36e39a5|42d996f9|3a63321dd3b325eec688dc2aefb6ac2d@0:0:0:0:0:0:0:0|200|OK|1436454661|||||2407:1884881533
|
||||
INVITE|3111f3c9|49ca4c42|a58ebaae40d08d6757d8424fb09c4c54@0:0:0:0:0:0:0:0|200|OK|1436454690|*prepaid|1001|1002||3099:1909036290
|
||||
BYE|3111f3c9|49ca4c42|a58ebaae40d08d6757d8424fb09c4c54@0:0:0:0:0:0:0:0|200|OK|1436454692|||||3099:1909036290
|
||||
`
|
||||
/*
|
||||
eCdrs := []*engine.StoredCdr{
|
||||
&engine.StoredCdr{
|
||||
CgrId: utils.Sha1("dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:02daec40c548625ac", time.Date(2014, 7, 2, 15, 24, 40, 0, time.UTC).String()),
|
||||
TOR: utils.VOICE,
|
||||
AccId: "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:02daec40c548625ac",
|
||||
CdrHost: "0.0.0.0",
|
||||
CdrSource: "TEST_CDRC",
|
||||
ReqType: utils.META_PREPAID,
|
||||
Direction: "*out",
|
||||
Tenant: "sip.test.deanconnect.nl",
|
||||
Category: "call",
|
||||
Account: "1001",
|
||||
Subject: "1001",
|
||||
Destination: "1002",
|
||||
SetupTime: time.Date(2014, 7, 2, 15, 24, 40, 0, time.UTC),
|
||||
AnswerTime: time.Date(2014, 7, 2, 15, 24, 40, 0, time.UTC),
|
||||
Usage: time.Duration(25) * time.Second,
|
||||
Cost: -1,
|
||||
},
|
||||
&engine.StoredCdr{
|
||||
CgrId: utils.Sha1("214d8f52b566e33a9349b184e72a4cca@0:0:0:0:0:0:0:0f9d3d5c3c863a6e3", time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC).String()),
|
||||
TOR: utils.VOICE,
|
||||
AccId: "214d8f52b566e33a9349b184e72a4cca@0:0:0:0:0:0:0:0f9d3d5c3c863a6e3",
|
||||
CdrHost: "0.0.0.0",
|
||||
CdrSource: "TEST_CDRC",
|
||||
ReqType: utils.META_PREPAID,
|
||||
Direction: "*out",
|
||||
Tenant: "sip.test.deanconnect.nl",
|
||||
Category: "call",
|
||||
Account: "1002",
|
||||
Subject: "1002",
|
||||
Destination: "1001",
|
||||
SetupTime: time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC),
|
||||
AnswerTime: time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC),
|
||||
Usage: time.Duration(8) * time.Second,
|
||||
Cost: -1,
|
||||
},
|
||||
&engine.StoredCdr{
|
||||
CgrId: utils.Sha1("3a63321dd3b325eec688dc2aefb6ac2d@0:0:0:0:0:0:0:036e39a542d996f9", time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC).String()),
|
||||
TOR: utils.VOICE,
|
||||
AccId: "3a63321dd3b325eec688dc2aefb6ac2d@0:0:0:0:0:0:0:036e39a542d996f9",
|
||||
CdrHost: "0.0.0.0",
|
||||
CdrSource: "TEST_CDRC",
|
||||
ReqType: utils.META_PREPAID,
|
||||
Direction: "*out",
|
||||
Tenant: "sip.test.deanconnect.nl",
|
||||
Category: "call",
|
||||
Account: "1001",
|
||||
Subject: "1001",
|
||||
Destination: "1002",
|
||||
SetupTime: time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC),
|
||||
AnswerTime: time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC),
|
||||
Usage: time.Duration(8) * time.Second,
|
||||
Cost: -1,
|
||||
},
|
||||
&engine.StoredCdr{
|
||||
CgrId: utils.Sha1("a58ebaae40d08d6757d8424fb09c4c54@0:0:0:0:0:0:0:03111f3c949ca4c42", time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC).String()),
|
||||
TOR: utils.VOICE,
|
||||
AccId: "a58ebaae40d08d6757d8424fb09c4c54@0:0:0:0:0:0:0:03111f3c949ca4c42",
|
||||
CdrHost: "0.0.0.0",
|
||||
CdrSource: "TEST_CDRC",
|
||||
ReqType: utils.META_PREPAID,
|
||||
Direction: "*out",
|
||||
Tenant: "sip.test.deanconnect.nl",
|
||||
Category: "call",
|
||||
Account: "1001",
|
||||
Subject: "1001",
|
||||
Destination: "1002",
|
||||
SetupTime: time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC),
|
||||
AnswerTime: time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC),
|
||||
Usage: time.Duration(8) * time.Second,
|
||||
Cost: -1,
|
||||
},
|
||||
}
|
||||
*/
|
||||
//cgrConfig, _ := config.NewDefaultCGRConfig()
|
||||
cdrFields := [][]*config.CfgCdrField{[]*config.CfgCdrField{
|
||||
&config.CfgCdrField{Tag: "Tor", Type: utils.CDRFIELD, CdrFieldId: utils.TOR, Value: utils.ParseRSRFieldsMustCompile("^*voice", utils.INFIELD_SEP), Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "AccId", Type: utils.CDRFIELD, CdrFieldId: utils.ACCID, Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "ReqType", Type: utils.CDRFIELD, CdrFieldId: utils.REQTYPE, Value: utils.ParseRSRFieldsMustCompile("7", utils.INFIELD_SEP), Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "Direction", Type: utils.CDRFIELD, CdrFieldId: utils.DIRECTION, Value: utils.ParseRSRFieldsMustCompile("^*out", utils.INFIELD_SEP), Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "Direction", Type: utils.CDRFIELD, CdrFieldId: utils.DIRECTION, Value: utils.ParseRSRFieldsMustCompile("^*out", utils.INFIELD_SEP), Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "Tenant", Type: utils.CDRFIELD, CdrFieldId: utils.TENANT, Value: utils.ParseRSRFieldsMustCompile("^cgrates.org", utils.INFIELD_SEP), Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "Category", Type: utils.CDRFIELD, CdrFieldId: utils.CATEGORY, Value: utils.ParseRSRFieldsMustCompile("^call", utils.INFIELD_SEP), Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "Account", Type: utils.CDRFIELD, CdrFieldId: utils.ACCOUNT, Value: utils.ParseRSRFieldsMustCompile("8", utils.INFIELD_SEP), Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "Subject", Type: utils.CDRFIELD, CdrFieldId: utils.SUBJECT, Value: utils.ParseRSRFieldsMustCompile("8", utils.INFIELD_SEP), Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "Destination", Type: utils.CDRFIELD, CdrFieldId: utils.DESTINATION, Value: utils.ParseRSRFieldsMustCompile("9", utils.INFIELD_SEP), Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "SetupTime", Type: utils.CDRFIELD, CdrFieldId: utils.SETUP_TIME, Value: utils.ParseRSRFieldsMustCompile("6", utils.INFIELD_SEP), Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "AnswerTime", Type: utils.CDRFIELD, CdrFieldId: utils.ANSWER_TIME, Value: utils.ParseRSRFieldsMustCompile("6", utils.INFIELD_SEP), Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "Duration", Type: utils.CDRFIELD, CdrFieldId: utils.ANSWER_TIME, Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "DialogId", Type: utils.CDRFIELD, CdrFieldId: "DialogIdentifier", Value: utils.ParseRSRFieldsMustCompile("11", utils.INFIELD_SEP)},
|
||||
}}
|
||||
cdrc := &Cdrc{CdrFormat: utils.OSIPS_FLATSTORE, cdrSourceIds: []string{"TEST_CDRC"}, cdrFields: cdrFields, partialRecords: make(map[string]map[string]*PartialFlatstoreRecord)}
|
||||
cdrsContent := bytes.NewReader([]byte(osipsCdrs))
|
||||
csvReader := csv.NewReader(cdrsContent)
|
||||
csvReader.Comma = '|'
|
||||
cdrs := make([]*engine.StoredCdr, 0)
|
||||
recNrs := 0
|
||||
for {
|
||||
recNrs++
|
||||
cdrCsv, err := csvReader.Read()
|
||||
if err != nil && err == io.EOF {
|
||||
break // End of file
|
||||
} else if err != nil {
|
||||
t.Error("Unexpected error:", err)
|
||||
}
|
||||
record, err := cdrc.processPartialRecord(cdrCsv, "dummyfilename")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if record == nil {
|
||||
continue // Partial record
|
||||
}
|
||||
if storedCdr, err := cdrc.recordToStoredCdr(record, 0); err != nil {
|
||||
t.Error(err)
|
||||
} else if storedCdr != nil {
|
||||
cdrs = append(cdrs, storedCdr)
|
||||
}
|
||||
}
|
||||
/*
|
||||
if !reflect.DeepEqual(eCdrs, cdrs) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", eCdrs, cdrs)
|
||||
}
|
||||
*/
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
func TestOsipsFlatstoreMissedCdrs(t *testing.T) {
|
||||
osipsCdrs := `
|
||||
INVITE|ef6c6256|da501581|0bfdd176d1b93e7df3de5c6f4873ee04@0:0:0:0:0:0:0:0|487|Request Terminated|1436454643|*prepaid|1001|1002||1224:339382783
|
||||
INVITE|7905e511||81880da80a94bda81b425b09009e055c@0:0:0:0:0:0:0:0|404|Not Found|1436454668|*prepaid|1001|1002||1980:1216490844
|
||||
INVITE|25f7def5||9984b9ec535a7d317b542744d48d0ed6@0:0:0:0:0:0:0:0|404|Not Found|1436454669|*prepaid|1001|1002||14:1595110662
|
||||
INVITE|ae0a7f6c||02f7fa9334db7aa4130bbf7627370621@0:0:0:0:0:0:0:0|404|Not Found|1436454670|*prepaid|1001|1002||176:1975670970
|
||||
INVITE|32b97104||3154c2a80294f538991a88d86f4e1085@0:0:0:0:0:0:0:0|404|Not Found|1436454678|*prepaid|1001|1002||2607:1024337552
|
||||
INVITE|324cb497|d4af7023|8deaadf2ae9a17809a391f05af31afb0@0:0:0:0:0:0:0:0|486|Busy here|1436454687|*postpaid|1002|1002||474:130115066
|
||||
INVITE|167ac4db|c53c85e5|4b3885cb78dde44dc7936abd2fa281e1@0:0:0:0:0:0:0:0|487|Request Terminated|1436454695|*postpaid|1002|1002||1922:549002535
|
||||
`
|
||||
}
|
||||
*/
|
||||
|
||||
@@ -27,15 +27,17 @@ import (
|
||||
type CdrcConfig struct {
|
||||
Enabled bool // Enable/Disable the profile
|
||||
Cdrs string // The address where CDRs can be reached
|
||||
CdrFormat string // The type of CDR file to process <csv>
|
||||
CdrFormat string // The type of CDR file to process <csv|opensips_flatstore>
|
||||
FieldSeparator rune // The separator to use when reading csvs
|
||||
DataUsageMultiplyFactor float64 // Conversion factor for data usage
|
||||
RunDelay time.Duration // Delay between runs, 0 for inotify driven requests
|
||||
MaxOpenFiles int // Maximum number of files opened simultaneously
|
||||
CdrInDir string // Folder to process CDRs from
|
||||
CdrOutDir string // Folder to move processed CDRs to
|
||||
FailedCallsPrefix string // Used in case of flatstore CDRs to avoid searching for BYE records
|
||||
CdrSourceId string // Source identifier for the processed CDRs
|
||||
CdrFilter utils.RSRFields // Filter CDR records to import
|
||||
PartialRecordCache time.Duration // Duration to cache partial records when not pairing
|
||||
CdrFields []*CfgCdrField // List of fields to be processed
|
||||
}
|
||||
|
||||
@@ -72,6 +74,9 @@ func (self *CdrcConfig) loadFromJsonCfg(jsnCfg *CdrcJsonCfg) error {
|
||||
if jsnCfg.Cdr_out_dir != nil {
|
||||
self.CdrOutDir = *jsnCfg.Cdr_out_dir
|
||||
}
|
||||
if jsnCfg.Failed_calls_prefix != nil {
|
||||
self.FailedCallsPrefix = *jsnCfg.Failed_calls_prefix
|
||||
}
|
||||
if jsnCfg.Cdr_source_id != nil {
|
||||
self.CdrSourceId = *jsnCfg.Cdr_source_id
|
||||
}
|
||||
|
||||
@@ -152,15 +152,17 @@ const CGRATES_CFG_JSON = `
|
||||
"*default": {
|
||||
"enabled": false, // enable CDR client functionality
|
||||
"cdrs": "internal", // address where to reach CDR server. <internal|x.y.z.y:1234>
|
||||
"cdr_format": "csv", // CDR file format <csv|freeswitch_csv|fwv>
|
||||
"cdr_format": "csv", // CDR file format <csv|freeswitch_csv|fwv|opensips_flatstore>
|
||||
"field_separator": ",", // separator used in case of csv files
|
||||
"run_delay": 0, // sleep interval in seconds between consecutive runs, 0 to use automation via inotify
|
||||
"max_open_files": 1024, // maximum simultaneous files to process
|
||||
"data_usage_multiply_factor": 1024, // conversion factor for data usage
|
||||
"cdr_in_dir": "/var/log/cgrates/cdrc/in", // absolute path towards the directory where the CDRs are stored
|
||||
"cdr_out_dir": "/var/log/cgrates/cdrc/out", // absolute path towards the directory where processed CDRs will be moved
|
||||
"failed_calls_prefix": "missed_calls", // used in case of flatstore CDRs to avoid searching for BYE records
|
||||
"cdr_source_id": "freeswitch_csv", // free form field, tag identifying the source of the CDRs within CDRS database
|
||||
"cdr_filter": "", // filter CDR records to import
|
||||
"partial_record_cache": "10s", // duration to cache partial records when not pairing
|
||||
"cdr_fields":[ // import template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "tor", "cdr_field_id": "tor", "type": "cdrfield", "value": "2", "mandatory": true},
|
||||
{"tag": "accid", "cdr_field_id": "accid", "type": "cdrfield", "value": "3", "mandatory": true},
|
||||
|
||||
@@ -294,6 +294,7 @@ func TestDfCdrcJsonCfg(t *testing.T) {
|
||||
Data_usage_multiply_factor: utils.Float64Pointer(1024.0),
|
||||
Cdr_in_dir: utils.StringPointer("/var/log/cgrates/cdrc/in"),
|
||||
Cdr_out_dir: utils.StringPointer("/var/log/cgrates/cdrc/out"),
|
||||
Failed_calls_prefix: utils.StringPointer("missed_calls"),
|
||||
Cdr_source_id: utils.StringPointer("freeswitch_csv"),
|
||||
Cdr_filter: utils.StringPointer(""),
|
||||
Cdr_fields: &cdrFields,
|
||||
|
||||
@@ -44,6 +44,7 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
|
||||
MaxOpenFiles: 1024,
|
||||
CdrInDir: "/var/log/cgrates/cdrc/in",
|
||||
CdrOutDir: "/var/log/cgrates/cdrc/out",
|
||||
FailedCallsPrefix: "missed_calls",
|
||||
CdrSourceId: "freeswitch_csv",
|
||||
CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP),
|
||||
CdrFields: []*CfgCdrField{
|
||||
|
||||
@@ -132,9 +132,11 @@ type CdrcJsonCfg struct {
|
||||
Data_usage_multiply_factor *float64
|
||||
Cdr_in_dir *string
|
||||
Cdr_out_dir *string
|
||||
Failed_calls_prefix *string
|
||||
Cdr_source_id *string
|
||||
Cdr_filter *string
|
||||
Max_open_files *int
|
||||
PartialRecordCache *string
|
||||
Cdr_fields *[]*CdrFieldJsonCfg
|
||||
}
|
||||
|
||||
|
||||
@@ -37,8 +37,7 @@ modparam("tm", "onreply_avp_mode", 1)
|
||||
|
||||
#### Record Route Module
|
||||
loadmodule "rr.so"
|
||||
/* do not append from tag to the RR (no need for this script) */
|
||||
modparam("rr", "append_fromtag", 0)
|
||||
|
||||
|
||||
#### MAX ForWarD module
|
||||
loadmodule "maxfwd.so"
|
||||
@@ -83,17 +82,31 @@ modparam("dialog", "dlg_match_mode", 1)
|
||||
modparam("dialog", "default_timeout", 21600) # 6 hours timeout
|
||||
modparam("dialog", "db_mode", 0)
|
||||
|
||||
#### DbFlatstore module
|
||||
loadmodule "db_flatstore.so"
|
||||
modparam("db_flatstore", "single_file", 1)
|
||||
|
||||
|
||||
#### ACCounting module
|
||||
loadmodule "acc.so"
|
||||
modparam("acc", "detect_direction", 1)
|
||||
#modparam("acc", "cdr_flag", "CDR")
|
||||
modparam("acc", "evi_flag", "CDR")
|
||||
modparam("acc", "evi_missed_flag", "CDR")
|
||||
#modparam("acc", "evi_flag", "CDR")
|
||||
#modparam("acc", "evi_missed_flag", "CDR")
|
||||
modparam("acc", "evi_extra", "cgr_reqtype=$avp(cgr_reqtype);
|
||||
cgr_account=$avp(cgr_account);
|
||||
cgr_destination=$avp(cgr_destination);
|
||||
cgr_supplier=$avp(cgr_supplier);
|
||||
dialog_id=$DLG_did")
|
||||
modparam("acc", "db_url", "flatstore:/tmp")
|
||||
modparam("acc", "db_flag", "CDR")
|
||||
modparam("acc", "db_missed_flag", "CDR")
|
||||
modparam("acc", "db_table_missed_calls", "cgr_missed")
|
||||
modparam("acc", "db_extra", "cgr_reqtype=$avp(cgr_reqtype);
|
||||
cgr_account=$avp(cgr_account);
|
||||
cgr_destination=$avp(cgr_destination);
|
||||
cgr_supplier=$avp(cgr_supplier);
|
||||
dialog_id=$DLG_did")
|
||||
|
||||
#### CfgUtils module
|
||||
loadmodule "cfgutils.so"
|
||||
@@ -205,9 +218,6 @@ route{
|
||||
# during the dialog.
|
||||
record_route();
|
||||
}
|
||||
|
||||
|
||||
|
||||
# route it out to whatever destination was set by loose_route()
|
||||
# in $du (destination URI).
|
||||
route(relay);
|
||||
@@ -300,7 +310,6 @@ route{
|
||||
}
|
||||
|
||||
if (!uri==myself) {
|
||||
append_hf("P-hint: outbound\r\n");
|
||||
route(relay);
|
||||
}
|
||||
|
||||
@@ -349,6 +358,9 @@ route[location] {
|
||||
t_reply("404", "Not Found");
|
||||
exit;
|
||||
}
|
||||
append_branch();
|
||||
append_branch();
|
||||
setflag(CDR);
|
||||
}
|
||||
|
||||
failure_route[missed_call] {
|
||||
|
||||
@@ -43,7 +43,7 @@ func TestTutOsipsCallsInitCfg(t *testing.T) {
|
||||
}
|
||||
// Init config first
|
||||
var err error
|
||||
tutOsipsCallsCfg, err = config.NewCGRConfigFromFolder(path.Join(*dataDir, "tutorials", "kamevapi", "cgrates", "etc", "cgrates"))
|
||||
tutOsipsCallsCfg, err = config.NewCGRConfigFromFolder(path.Join(*dataDir, "tutorials", "osips_async", "cgrates", "etc", "cgrates"))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
@@ -71,6 +71,7 @@ func TestTutOsipsCallsResetStorDb(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
// start Kam server
|
||||
func TestTutOsipsCallsStartOsips(t *testing.T) {
|
||||
if !*testCalls {
|
||||
@@ -81,6 +82,7 @@ func TestTutOsipsCallsStartOsips(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
// Start CGR Engine
|
||||
func TestTutOsipsCallsStartEngine(t *testing.T) {
|
||||
@@ -446,9 +448,11 @@ func TestTutOsipsCallsStopCgrEngine(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
func TestTutOsipsCallsStopOpensips(t *testing.T) {
|
||||
if !*testCalls {
|
||||
return
|
||||
}
|
||||
engine.KillProcName("opensips", 100)
|
||||
}
|
||||
*/
|
||||
|
||||
@@ -208,6 +208,8 @@ const (
|
||||
CGR_DISCONNECT_CAUSE = "cgr_disconnectcause"
|
||||
CGR_COMPUTELCR = "cgr_computelcr"
|
||||
CGR_SUPPLIERS = "cgr_suppliers"
|
||||
KAM_FLATSTORE = "kamailio_flatstore"
|
||||
OSIPS_FLATSTORE = "opensips_flatstore"
|
||||
)
|
||||
|
||||
var (
|
||||
|
||||
Reference in New Issue
Block a user