xmlcdrc with setDefaults(), modifying default order of imported and exported cdrs

This commit is contained in:
DanB
2014-05-23 21:44:33 +02:00
parent 5fe65f203c
commit 9bf5f7611b
16 changed files with 295 additions and 123 deletions

View File

@@ -29,6 +29,7 @@ import (
"path"
"strconv"
"time"
"unicode/utf8"
"github.com/cgrates/cgrates/cdrs"
"github.com/cgrates/cgrates/engine"
@@ -41,13 +42,17 @@ const (
FS_CSV = "freeswitch_csv"
)
func NewCdrc(cdrsAddress, cdrType, cdrInDir, cdrOutDir, cdrSourceId string, runDelay time.Duration, cdrFields map[string]*utils.RSRField, cdrServer *cdrs.CDRS) (*Cdrc, error) {
func NewCdrc(cdrsAddress, cdrType, cdrInDir, cdrOutDir, cdrSourceId string, runDelay time.Duration, csvSep string, cdrFields map[string]*utils.RSRField, cdrServer *cdrs.CDRS) (*Cdrc, error) {
if len(csvSep) != 1 {
return nil, fmt.Errorf("Unsupported csv separator: %s", csvSep)
}
csvSepRune, _ := utf8.DecodeRune([]byte(csvSep))
cdrc := &Cdrc{cdrsAddress: cdrsAddress, cdrType: cdrType, cdrInDir: cdrInDir, cdrOutDir: cdrOutDir,
cdrSourceId: cdrSourceId, runDelay: runDelay, cdrFields: cdrFields, cdrServer: cdrServer}
cdrSourceId: cdrSourceId, runDelay: runDelay, csvSep: csvSepRune, cdrFields: cdrFields, cdrServer: cdrServer}
// Before processing, make sure in and out folders exist
for _, dir := range []string{cdrc.cdrInDir, cdrc.cdrOutDir} {
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
return nil, fmt.Errorf("Inexistent folder: %s", dir)
return nil, fmt.Errorf("Nonexistent folder: %s", dir)
}
}
cdrc.httpClient = new(http.Client)
@@ -61,6 +66,7 @@ type Cdrc struct {
cdrOutDir,
cdrSourceId string
runDelay time.Duration
csvSep rune
cdrFields map[string]*utils.RSRField
cdrServer *cdrs.CDRS // Reference towards internal cdrServer if that is the case
httpClient *http.Client
@@ -182,6 +188,7 @@ func (self *Cdrc) processFile(filePath string) error {
return err
}
csvReader := csv.NewReader(bufio.NewReader(file))
csvReader.Comma = self.csvSep
for {
record, err := csvReader.Read()
if err != nil && err == io.EOF {

View File

@@ -70,6 +70,11 @@ accid22,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:0
#accid1,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1
accid23,prepaid,out,cgrates.org,call,1001,1001,+4986517174963,2013-02-03 19:54:00,62,supplier1,172.16.1.1`
var fileContent3 = `accid31;prepaid;out;cgrates.org;call;1001;1001;+4986517174963;2013-02-03 19:54:00;62;supplier1;172.16.1.1
accid32;prepaid;out;cgrates.org;call;1001;1001;+4986517174963;2013-02-03 19:54:00;62;supplier1;172.16.1.1
#accid1;prepaid;out;cgrates.org;call;1001;1001;+4986517174963;2013-02-03 19:54:00;62;supplier1;172.16.1.1
accid33;prepaid;out;cgrates.org;call;1001;1001;+4986517174963;2013-02-03 19:54:00;62;supplier1;172.16.1.1`
func startEngine() error {
enginePath, err := exec.LookPath("cgr-engine")
if err != nil {
@@ -126,6 +131,12 @@ func TestCreateCdrFiles(t *testing.T) {
if err := os.MkdirAll(cfg.CdrcCdrInDir, 0755); err != nil {
t.Fatal("Error creating folder: ", cfg.CdrcCdrInDir, err)
}
if err := os.RemoveAll(cfg.CdrcCdrOutDir); err != nil {
t.Fatal("Error removing folder: ", cfg.CdrcCdrInDir, err)
}
if err := os.MkdirAll(cfg.CdrcCdrOutDir, 0755); err != nil {
t.Fatal("Error creating folder: ", cfg.CdrcCdrOutDir, err)
}
if err := ioutil.WriteFile(path.Join(cfg.CdrcCdrInDir, "file1.csv"), []byte(fileContent1), 0644); err != nil {
t.Fatal(err.Error)
}
@@ -144,7 +155,44 @@ func TestProcessCdrDir(t *testing.T) {
if err := startEngine(); err != nil {
t.Fatal(err.Error())
}
cdrc, err := NewCdrc(cfg.CdrcCdrs, cfg.CdrcCdrType, cfg.CdrcCdrInDir, cfg.CdrcCdrOutDir, cfg.CdrcSourceId, cfg.CdrcRunDelay,
cdrc, err := NewCdrc(cfg.CdrcCdrs, cfg.CdrcCdrType, cfg.CdrcCdrInDir, cfg.CdrcCdrOutDir, cfg.CdrcSourceId, cfg.CdrcRunDelay, cfg.CdrcCsvSep,
cfg.CdrcCdrFields, nil)
if err != nil {
t.Fatal(err.Error())
}
if err := cdrc.processCdrDir(); err != nil {
t.Error(err)
}
stopEngine()
}
// Creates cdr files and starts the engine
func TestCreateCdr3File(t *testing.T) {
if !*testLocal {
return
}
if err := os.RemoveAll(cfg.CdrcCdrInDir); err != nil {
t.Fatal("Error removing folder: ", cfg.CdrcCdrInDir, err)
}
if err := os.MkdirAll(cfg.CdrcCdrInDir, 0755); err != nil {
t.Fatal("Error creating folder: ", cfg.CdrcCdrInDir, err)
}
if err := ioutil.WriteFile(path.Join(cfg.CdrcCdrInDir, "file3.csv"), []byte(fileContent3), 0644); err != nil {
t.Fatal(err.Error)
}
}
func TestProcessCdr3Dir(t *testing.T) {
if !*testLocal {
return
}
if cfg.CdrcCdrs == utils.INTERNAL { // For now we only test over network
return
}
if err := startEngine(); err != nil {
t.Fatal(err.Error())
}
cdrc, err := NewCdrc(cfg.CdrcCdrs, cfg.CdrcCdrType, cfg.CdrcCdrInDir, cfg.CdrcCdrOutDir, cfg.CdrcSourceId, cfg.CdrcRunDelay, ";",
cfg.CdrcCdrFields, nil)
if err != nil {
t.Fatal(err.Error())

View File

@@ -25,12 +25,14 @@ import (
"reflect"
"testing"
"time"
"unicode/utf8"
)
func TestRecordForkCdr(t *testing.T) {
cgrConfig, _ := config.NewDefaultCGRConfig()
cgrConfig.CdrcCdrFields["supplier"] = &utils.RSRField{Id: "11"}
cdrc := &Cdrc{cgrConfig.CdrcCdrs, cgrConfig.CdrcCdrType, cgrConfig.CdrcCdrInDir, cgrConfig.CdrcCdrOutDir, cgrConfig.CdrcSourceId, cgrConfig.CdrcRunDelay,
csvSepRune, _ := utf8.DecodeRune([]byte(cgrConfig.CdrcCsvSep))
cdrc := &Cdrc{cgrConfig.CdrcCdrs, cgrConfig.CdrcCdrType, cgrConfig.CdrcCdrInDir, cgrConfig.CdrcCdrOutDir, cgrConfig.CdrcSourceId, cgrConfig.CdrcRunDelay, csvSepRune,
cgrConfig.CdrcCdrFields, new(cdrs.CDRS), nil}
cdrRow := []string{"firstField", "secondField"}
_, err := cdrc.recordForkCdr(cdrRow)
@@ -64,15 +66,4 @@ func TestRecordForkCdr(t *testing.T) {
if !reflect.DeepEqual(expectedCdr, rtCdr) {
t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr)
}
/*
if cdrAsForm.Get(utils.CDRSOURCE) != cgrConfig.CdrcSourceId {
t.Error("Unexpected cdrsource received", cdrAsForm.Get(utils.CDRSOURCE))
}
if cdrAsForm.Get(utils.REQTYPE) != "prepaid" {
t.Error("Unexpected CDR value received", cdrAsForm.Get(utils.REQTYPE))
}
if cdrAsForm.Get("supplier") != "supplier1" {
t.Error("Unexpected CDR value received", cdrAsForm.Get("supplier"))
}
*/
}

View File

@@ -40,7 +40,7 @@ func TestCsvCdrWriter(t *testing.T) {
}
csvCdrWriter.WriteCdr(ratedCdr)
csvCdrWriter.Close()
expected := `dbafe9c8614c785a65aabd116dd3959c3c56f7f6,default,*voice,dsafdsaf,192.168.1.1,rated,*out,cgrates.org,call,1001,1001,1002,2013-11-07 08:42:25 +0000 UTC,2013-11-07 08:42:26 +0000 UTC,10000000000,1.0100,val_extra3,"",val_extra1`
expected := `dbafe9c8614c785a65aabd116dd3959c3c56f7f6,default,*voice,dsafdsaf,rated,*out,cgrates.org,call,1001,1001,1002,2013-11-07 08:42:25 +0000 UTC,2013-11-07 08:42:26 +0000 UTC,10000000000,1.0100,val_extra3,"",val_extra1`
result := strings.TrimSpace(writer.String())
if result != expected {
t.Errorf("Expected: \n%s received: \n%s.", expected, result)

View File

@@ -124,11 +124,11 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD
}
// Fires up a cdrc instance
func startCdrc(cdrsChan chan struct{}, cdrsAddress, cdrType, cdrInDir, cdrOutDir, cdrSourceId string, runDelay time.Duration, cdrFields map[string]*utils.RSRField) {
func startCdrc(cdrsChan chan struct{}, cdrsAddress, cdrType, cdrInDir, cdrOutDir, cdrSourceId string, runDelay time.Duration, csvSep string, cdrFields map[string]*utils.RSRField) {
if cdrsAddress == utils.INTERNAL {
<-cdrsChan // Wait for CDRServer to come up before start processing
}
cdrc, err := cdrc.NewCdrc(cdrsAddress, cdrType, cdrInDir, cdrOutDir, cdrSourceId, runDelay, cdrFields, cdrServer)
cdrc, err := cdrc.NewCdrc(cdrsAddress, cdrType, cdrInDir, cdrOutDir, cdrSourceId, runDelay, csvSep, cdrFields, cdrServer)
if err != nil {
engine.Logger.Crit(fmt.Sprintf("Cdrc config parsing error: %s", err.Error()))
exitChan <- true
@@ -456,7 +456,7 @@ func main() {
var cdrcEnabled bool
if cfg.CdrcEnabled { // Start default cdrc configured in csv here
cdrcEnabled = true
go startCdrc(cdrsChan, cfg.CdrcCdrs, cfg.CdrcCdrType, cfg.CdrcCdrInDir, cfg.CdrcCdrOutDir, cfg.CdrcSourceId, cfg.CdrcRunDelay, cfg.CdrcCdrFields)
go startCdrc(cdrsChan, cfg.CdrcCdrs, cfg.CdrcCdrType, cfg.CdrcCdrInDir, cfg.CdrcCdrOutDir, cfg.CdrcSourceId, cfg.CdrcRunDelay, cfg.CdrcCsvSep, cfg.CdrcCdrFields)
}
if cfg.XmlCfgDocument != nil {
for _, xmlCdrc := range cfg.XmlCfgDocument.GetCdrcCfgs("") {
@@ -464,8 +464,8 @@ func main() {
continue
}
cdrcEnabled = true
go startCdrc(cdrsChan, xmlCdrc.CdrsAddress, xmlCdrc.CdrType, xmlCdrc.CdrInDir,
xmlCdrc.CdrOutDir, xmlCdrc.CdrSourceId, time.Duration(xmlCdrc.RunDelay), xmlCdrc.CdrRSRFields())
go startCdrc(cdrsChan, xmlCdrc.CdrsAddress, xmlCdrc.CdrType, xmlCdrc.CdrInDir, xmlCdrc.CdrOutDir,
xmlCdrc.CdrSourceId, time.Duration(xmlCdrc.RunDelay), xmlCdrc.CsvSeparator, xmlCdrc.CdrRSRFields())
}
}
if cdrcEnabled {

View File

@@ -101,6 +101,7 @@ type CGRConfig struct {
CdrcCdrs string // Address where to reach CDR server
CdrcRunDelay time.Duration // Sleep interval between consecutive runs, 0 to use automation via inotify
CdrcCdrType string // CDR file format <csv>.
CdrcCsvSep string // Separator used in case of csv files. One character only supported.
CdrcCdrInDir string // Absolute path towards the directory where the CDRs are stored.
CdrcCdrOutDir string // Absolute path towards the directory where processed CDRs will be moved.
CdrcSourceId string // Tag identifying the source of the CDRs within CGRS database.
@@ -175,21 +176,23 @@ func (self *CGRConfig) setDefaults() error {
self.CdrcCdrs = utils.INTERNAL
self.CdrcRunDelay = time.Duration(0)
self.CdrcCdrType = utils.CSV
self.CdrcCsvSep = string(utils.CSV_SEP)
self.CdrcCdrInDir = "/var/log/cgrates/cdrc/in"
self.CdrcCdrOutDir = "/var/log/cgrates/cdrc/out"
self.CdrcSourceId = "freeswitch_csv"
self.CdrcCdrFields = map[string]*utils.RSRField{
utils.ACCID: &utils.RSRField{Id: "0"},
utils.REQTYPE: &utils.RSRField{Id: "1"},
utils.DIRECTION: &utils.RSRField{Id: "2"},
utils.TENANT: &utils.RSRField{Id: "3"},
utils.CATEGORY: &utils.RSRField{Id: "4"},
utils.ACCOUNT: &utils.RSRField{Id: "5"},
utils.SUBJECT: &utils.RSRField{Id: "6"},
utils.DESTINATION: &utils.RSRField{Id: "7"},
utils.SETUP_TIME: &utils.RSRField{Id: "8"},
utils.ANSWER_TIME: &utils.RSRField{Id: "9"},
utils.USAGE: &utils.RSRField{Id: "10"},
utils.TOR: &utils.RSRField{Id: "2"},
utils.ACCID: &utils.RSRField{Id: "3"},
utils.REQTYPE: &utils.RSRField{Id: "4"},
utils.DIRECTION: &utils.RSRField{Id: "5"},
utils.TENANT: &utils.RSRField{Id: "6"},
utils.CATEGORY: &utils.RSRField{Id: "7"},
utils.ACCOUNT: &utils.RSRField{Id: "8"},
utils.SUBJECT: &utils.RSRField{Id: "9"},
utils.DESTINATION: &utils.RSRField{Id: "10"},
utils.SETUP_TIME: &utils.RSRField{Id: "11"},
utils.ANSWER_TIME: &utils.RSRField{Id: "12"},
utils.USAGE: &utils.RSRField{Id: "13"},
}
self.MediatorEnabled = false
self.MediatorRater = "internal"
@@ -219,7 +222,6 @@ func (self *CGRConfig) setDefaults() error {
&utils.RSRField{Id: utils.MEDI_RUNID},
&utils.RSRField{Id: utils.TOR},
&utils.RSRField{Id: utils.ACCID},
&utils.RSRField{Id: utils.CDRHOST},
&utils.RSRField{Id: utils.REQTYPE},
&utils.RSRField{Id: utils.DIRECTION},
&utils.RSRField{Id: utils.TENANT},
@@ -244,10 +246,16 @@ func (self *CGRConfig) checkConfigSanity() error {
return errors.New("Need XmlTemplate for fixed_width cdr export")
}
}
if self.CdrcCdrType == utils.CSV {
for _, rsrFld := range self.CdrcCdrFields {
if _, errConv := strconv.Atoi(rsrFld.Id); errConv != nil {
return fmt.Errorf("CDR fields must be indices in case of .csv files, have instead: %s", rsrFld.Id)
if self.CdrcEnabled {
if len(self.CdrcCdrFields) == 0 {
return errors.New("CdrC enabled but no fields to be processed defined!")
}
if self.CdrcCdrType == utils.CSV {
for _, rsrFld := range self.CdrcCdrFields {
if _, errConv := strconv.Atoi(rsrFld.Id); errConv != nil {
fmt.Println("5")
return fmt.Errorf("CDR fields must be indices in case of .csv files, have instead: %s", rsrFld.Id)
}
}
}
}
@@ -461,6 +469,9 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
if hasOpt = c.HasOption("cdrc", "cdr_type"); hasOpt {
cfg.CdrcCdrType, _ = c.GetString("cdrc", "cdr_type")
}
if hasOpt = c.HasOption("cdrc", "csv_separator"); hasOpt {
cfg.CdrcCsvSep, _ = c.GetString("cdrc", "csv_separator")
}
if hasOpt = c.HasOption("cdrc", "cdr_in_dir"); hasOpt {
cfg.CdrcCdrInDir, _ = c.GetString("cdrc", "cdr_in_dir")
}
@@ -483,9 +494,13 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
answerTimeFld, _ := c.GetString("cdrc", "answer_time_field")
durFld, _ := c.GetString("cdrc", "usage_field")
extraFlds, _ := c.GetString("cdrc", "extra_fields")
if cfg.CdrcCdrFields, err = ParseCdrcCdrFields(accIdFld, reqtypeFld, directionFld, tenantFld, categoryFld, acntFld, subjectFld, destFld,
setupTimeFld, answerTimeFld, durFld, extraFlds); err != nil {
return nil, err
if len(accIdFld) != 0 || len(reqtypeFld) != 0 || len(directionFld) != 0 || len(tenantFld) != 0 || len(categoryFld) != 0 || len(acntFld) != 0 ||
len(subjectFld) != 0 || len(destFld) != 0 || len(setupTimeFld) != 0 || len(answerTimeFld) != 0 || len(durFld) != 0 || len(extraFlds) != 0 {
// We overwrite the defaults only if at least one of the fields were defined
if cfg.CdrcCdrFields, err = ParseCdrcCdrFields(accIdFld, reqtypeFld, directionFld, tenantFld, categoryFld, acntFld, subjectFld, destFld,
setupTimeFld, answerTimeFld, durFld, extraFlds); err != nil {
return nil, err
}
}
if hasOpt = c.HasOption("mediator", "enabled"); hasOpt {
cfg.MediatorEnabled, _ = c.GetBool("mediator", "enabled")

View File

@@ -90,21 +90,23 @@ func TestDefaults(t *testing.T) {
eCfg.CdrcCdrs = utils.INTERNAL
eCfg.CdrcRunDelay = time.Duration(0)
eCfg.CdrcCdrType = "csv"
eCfg.CdrcCsvSep = string(utils.CSV_SEP)
eCfg.CdrcCdrInDir = "/var/log/cgrates/cdrc/in"
eCfg.CdrcCdrOutDir = "/var/log/cgrates/cdrc/out"
eCfg.CdrcSourceId = "freeswitch_csv"
eCfg.CdrcCdrFields = map[string]*utils.RSRField{
utils.ACCID: &utils.RSRField{Id: "0"},
utils.REQTYPE: &utils.RSRField{Id: "1"},
utils.DIRECTION: &utils.RSRField{Id: "2"},
utils.TENANT: &utils.RSRField{Id: "3"},
utils.CATEGORY: &utils.RSRField{Id: "4"},
utils.ACCOUNT: &utils.RSRField{Id: "5"},
utils.SUBJECT: &utils.RSRField{Id: "6"},
utils.DESTINATION: &utils.RSRField{Id: "7"},
utils.SETUP_TIME: &utils.RSRField{Id: "8"},
utils.ANSWER_TIME: &utils.RSRField{Id: "9"},
utils.USAGE: &utils.RSRField{Id: "10"},
utils.TOR: &utils.RSRField{Id: "2"},
utils.ACCID: &utils.RSRField{Id: "3"},
utils.REQTYPE: &utils.RSRField{Id: "4"},
utils.DIRECTION: &utils.RSRField{Id: "5"},
utils.TENANT: &utils.RSRField{Id: "6"},
utils.CATEGORY: &utils.RSRField{Id: "7"},
utils.ACCOUNT: &utils.RSRField{Id: "8"},
utils.SUBJECT: &utils.RSRField{Id: "9"},
utils.DESTINATION: &utils.RSRField{Id: "10"},
utils.SETUP_TIME: &utils.RSRField{Id: "11"},
utils.ANSWER_TIME: &utils.RSRField{Id: "12"},
utils.USAGE: &utils.RSRField{Id: "13"},
}
eCfg.MediatorEnabled = false
eCfg.MediatorRater = "internal"
@@ -134,7 +136,6 @@ func TestDefaults(t *testing.T) {
&utils.RSRField{Id: utils.MEDI_RUNID},
&utils.RSRField{Id: utils.TOR},
&utils.RSRField{Id: utils.ACCID},
&utils.RSRField{Id: utils.CDRHOST},
&utils.RSRField{Id: utils.REQTYPE},
&utils.RSRField{Id: utils.DIRECTION},
&utils.RSRField{Id: utils.TENANT},
@@ -168,12 +169,15 @@ func TestSanityCheck(t *testing.T) {
if err := cfg.checkConfigSanity(); err == nil {
t.Error("Failed to detect fixed_width dependency on xml configuration")
}
cfg.CdrcEnabled = true
if err := cfg.checkConfigSanity(); err == nil {
t.Error("Failed to detect missing CDR fields definition")
}
cfg.CdrcCdrType = utils.CSV
cfg.CdrcCdrFields = map[string]*utils.RSRField{utils.ACCID: &utils.RSRField{Id: "test"}}
if err := cfg.checkConfigSanity(); err == nil {
t.Error("Failed to detect improper use of CDR field names")
}
cfg = &CGRConfig{}
cfg.CdrcCdrType = utils.CSV
cfg.CdrcCdrFields = map[string]*utils.RSRField{"extra1": &utils.RSRField{Id: "test"}}
if err := cfg.checkConfigSanity(); err == nil {
t.Error("Failed to detect improper use of CDR field names")
@@ -234,6 +238,7 @@ func TestConfigFromFile(t *testing.T) {
eCfg.CdrcCdrs = "test"
eCfg.CdrcRunDelay = time.Duration(99) * time.Second
eCfg.CdrcCdrType = "test"
eCfg.CdrcCsvSep = ";"
eCfg.CdrcCdrInDir = "test"
eCfg.CdrcCdrOutDir = "test"
eCfg.CdrcSourceId = "test"
@@ -340,3 +345,15 @@ export_template = cgrid,~accid:s/(\d)/$1,runid
t.Error("Failed to detect failed RSRParsing")
}
}
func TestCdrcCdrDefaultFields(t *testing.T) {
cdrcCfg := []byte(`[cdrc]
enabled = true
`)
cfgDefault, _ := NewDefaultCGRConfig()
if cfg, err := NewCGRConfigFromBytes(cdrcCfg); err != nil {
t.Error("Could not parse the config", err.Error())
} else if !reflect.DeepEqual(cfg.CdrcCdrFields, cfgDefault.CdrcCdrFields) {
t.Errorf("Unexpected value for CdrcCdrFields: %v", cfg.CdrcCdrFields)
}
}

View File

@@ -59,6 +59,7 @@ enabled = true # Enable CDR client functionality
cdrs = test # Address where to reach CDR server
run_delay = 99 # Period to sleep between two runs, 0 to use automation via inotify
cdr_type = test # CDR file format <csv>.
csv_separator =; # Csv separator, one character only and should be next to equal sign
cdr_in_dir = test # Absolute path towards the direccategoryy where the CDRs are kept (file scategoryed CDRs).
cdr_out_dir = test # Absolute path towards the direccategoryy where processed CDRs will be moved after processing.
cdr_source_id = test # Tag identifying the source of the CDRs within CGRS database.

View File

@@ -24,14 +24,44 @@ import (
)
type CgrXmlCdrcCfg struct {
Enabled bool `xml:"enabled"` // Enable/Disable the
CdrsAddress string `xml:"cdrs_address"` // The address where CDRs can be reached
CdrType string `xml:"cdr_type"` // The type of CDR to process <csv>
RunDelay int64 `xml:"run_delay"` // Delay between runs
CdrInDir string `xml:"cdr_in_dir"` // Folder to process CDRs from
CdrOutDir string `xml:"cdr_out_dir"` // Folder to move processed CDRs to
CdrSourceId string `xml:"cdr_source_id"` // Source identifier for the processed CDRs
CdrFields []*CdrcField `xml:"fields>field"`
Enabled bool `xml:"enabled"` // Enable/Disable the
CdrsAddress string `xml:"cdrs_address"` // The address where CDRs can be reached
CdrType string `xml:"cdr_type"` // The type of CDR to process <csv>
CsvSeparator string `xml:"csv_separator"` // The separator to use when reading csvs
RunDelay int64 `xml:"run_delay"` // Delay between runs
CdrInDir string `xml:"cdr_in_dir"` // Folder to process CDRs from
CdrOutDir string `xml:"cdr_out_dir"` // Folder to move processed CDRs to
CdrSourceId string `xml:"cdr_source_id"` // Source identifier for the processed CDRs
CdrFields []*CdrcField `xml:"fields>field"`
}
// Set the defaults
func (cdrcCfg *CgrXmlCdrcCfg) setDefaults() error {
dfCfg, _ := NewDefaultCGRConfig()
if len(cdrcCfg.CdrsAddress) == 0 {
cdrcCfg.CdrsAddress = dfCfg.CdrcCdrs
}
if len(cdrcCfg.CdrType) == 0 {
cdrcCfg.CdrType = dfCfg.CdrcCdrType
}
if len(cdrcCfg.CsvSeparator) == 0 {
cdrcCfg.CsvSeparator = dfCfg.CdrcCsvSep
}
if len(cdrcCfg.CdrInDir) == 0 {
cdrcCfg.CdrInDir = dfCfg.CdrcCdrInDir
}
if len(cdrcCfg.CdrOutDir) == 0 {
cdrcCfg.CdrOutDir = dfCfg.CdrcCdrOutDir
}
if len(cdrcCfg.CdrSourceId) == 0 {
cdrcCfg.CdrSourceId = dfCfg.CdrcSourceId
}
if len(cdrcCfg.CdrFields) == 0 {
for key, cfgRsrField := range dfCfg.CdrcCdrFields {
cdrcCfg.CdrFields = append(cdrcCfg.CdrFields, &CdrcField{Id: key, Filter: cfgRsrField.Id, rsrField: cfgRsrField})
}
}
return nil
}
func (cdrcCfg *CgrXmlCdrcCfg) CdrRSRFields() map[string]*utils.RSRField {

View File

@@ -43,6 +43,37 @@ func TestPopulateRSRFIeld(t *testing.T) {
}
}
func TestSetDefaults(t *testing.T) {
cfgXmlStr := `<?xml version="1.0" encoding="UTF-8"?>
<document type="cgrates/xml">
<configuration section="cdrc" type="csv" id="CDRC-CSVDF">
<enabled>true</enabled>
</configuration>
</document>`
var xmlCdrc *CgrXmlCdrcCfg
reader := strings.NewReader(cfgXmlStr)
if cfgDocCdrcDf, err := ParseCgrXmlConfig(reader); err != nil {
t.Error(err.Error())
} else if cfgDocCdrcDf == nil {
t.Fatal("Could not parse xml configuration document")
} else if len(cfgDocCdrcDf.cdrcs) != 1 {
t.Error("Did not load cdrc")
} else {
xmlCdrc = cfgDocCdrcDf.cdrcs["CDRC-CSVDF"]
}
dfCfg, _ := NewDefaultCGRConfig()
xmlCdrc.setDefaults()
if xmlCdrc.CdrsAddress != dfCfg.CdrcCdrs ||
xmlCdrc.CdrType != dfCfg.CdrcCdrType ||
xmlCdrc.CsvSeparator != dfCfg.CdrcCsvSep ||
xmlCdrc.CdrInDir != dfCfg.CdrcCdrInDir ||
xmlCdrc.CdrOutDir != dfCfg.CdrcCdrOutDir ||
xmlCdrc.CdrSourceId != dfCfg.CdrcSourceId ||
len(xmlCdrc.CdrFields) != len(dfCfg.CdrcCdrFields) {
t.Error("Failed loading default configuration")
}
}
func TestParseXmlCdrcConfig(t *testing.T) {
cfgXmlStr := `<?xml version="1.0" encoding="UTF-8"?>
<document type="cgrates/xml">
@@ -50,6 +81,7 @@ func TestParseXmlCdrcConfig(t *testing.T) {
<enabled>true</enabled>
<cdrs_address>internal</cdrs_address>
<cdr_type>csv</cdr_type>
<csv_separator>,</csv_separator>
<run_delay>0</run_delay>
<cdr_in_dir>/var/log/cgrates/cdrc/in</cdr_in_dir>
<cdr_out_dir>/var/log/cgrates/cdrc/out</cdr_out_dir>
@@ -88,7 +120,7 @@ func TestGetCdrcCfgs(t *testing.T) {
if cdrcfgs == nil {
t.Error("No config instance returned")
}
expectCdrc := &CgrXmlCdrcCfg{Enabled: true, CdrsAddress: "internal", CdrType: "csv",
expectCdrc := &CgrXmlCdrcCfg{Enabled: true, CdrsAddress: "internal", CdrType: "csv", CsvSeparator: ",",
RunDelay: 0, CdrInDir: "/var/log/cgrates/cdrc/in", CdrOutDir: "/var/log/cgrates/cdrc/out", CdrSourceId: "freeswitch_csv"}
cdrFlds := []*CdrcField{
&CdrcField{XMLName: xml.Name{Local: "field"}, Id: utils.ACCID, Filter: "0"},

View File

@@ -110,6 +110,7 @@ func (xmlCfg *CgrXmlCfgDocument) cacheCdrcCfgs() error {
return fmt.Errorf("Populating field %s, error: %s", fld.Id, err.Error())
}
}
cdrcCfg.setDefaults()
xmlCfg.cdrcs[cfgInst.Id] = cdrcCfg
}
return nil

View File

@@ -63,6 +63,7 @@
# cdrs = internal # Address where to reach CDR server. <internal|127.0.0.1:2080>
# run_delay = 0 # Sleep interval in seconds between consecutive runs, 0 to use automation via inotify
# cdr_type = csv # CDR file format <csv|freeswitch_csv>.
# csv_separator = , # Separator used in case of csv files. One character only supported and needs to be right after equal sign
# cdr_in_dir = /var/log/cgrates/cdrc/in # Absolute path towards the directory where the CDRs are stored.
# cdr_out_dir = /var/log/cgrates/cdrc/out # Absolute path towards the directory where processed CDRs will be moved.
# cdr_source_id = freeswitch_csv # Free form field, tag identifying the source of the CDRs within CGRS database.

View File

@@ -18,18 +18,13 @@ mediator = internal # Address where to reach the Mediator. Empty for disabli
export_dir = /tmp/cgrates/cdr/cdre/csv # Path where the exported CDRs will be placed
[cdrc]
enabled = true
cdrs = 127.0.0.1:2080
cdr_in_dir = /tmp/cgrates/cdr/cdrc/in # Absolute path towards the directory where the CDRs are stored.
cdr_out_dir =/tmp/cgrates/cdr/cdrc/out # Absolute path towards the directory where processed CDRs will be moved.
[mediator]
enabled = true # Starts Mediator service: <true|false>.
rater = 127.0.0.1:2012 # Address where to reach the Rater: <internal|x.y.z.y:1234>
[history_server]
enabled = true # Starts History service: <true|false>.
history_dir = /tmp/cgrates/history # Location on disk where to store history files.
[history_agent]
enabled = true # Starts History as a client: <true|false>.
rater = internal # Address where to reach the Rater: <internal|x.y.z.y:1234>

View File

@@ -5,34 +5,34 @@
<cdrs_address>internal</cdrs_address>
<cdr_type>csv</cdr_type>
<run_delay>0</run_delay>
<cdr_in_dir>/var/log/cgrates/cdrc/in</cdr_in_dir>
<cdr_out_dir>/var/log/cgrates/cdrc/out</cdr_out_dir>
<cdr_source_id>freeswitch_csv</cdr_source_id>
<cdr_in_dir>/tmp/cgrates/cdrc/csv1/in</cdr_in_dir>
<cdr_out_dir>/tmp/cgrates/cdrc/csv1/out</cdr_out_dir>
<cdr_source_id>csv1</cdr_source_id>
<fields>
<field id="accid" filter="0" />
<field id="reqtype" filter="1" />
<field id="direction" filter="2" />
<field id="tenant" filter="3" />
<field id="category" filter="4" />
<field id="account" filter="5" />
<field id="subject" filter="6" />
<field id="destination" filter="7" />
<field id="setup_time" filter="8" />
<field id="answer_time" filter="9" />
<field id="usage" filter="10" />
<field id="extr1" filter="11" />
<field id="extr2" filter="12" />
<field id="tor" filter="2" />
<field id="accid" filter="2" />
<field id="reqtype" filter="3" />
<field id="direction" filter="4" />
<field id="tenant" filter="5" />
<field id="category" filter="6" />
<field id="account" filter="7" />
<field id="subject" filter="8" />
<field id="destination" filter="9" />
<field id="setup_time" filter="10" />
<field id="answer_time" filter="11" />
<field id="usage" filter="12" />
</fields>
</configuration>
<configuration section="cdrc" type="csv" id="CDRC-CSV2">
<enabled>true</enabled>
<cdrs_address>internal</cdrs_address>
<cdr_type>csv</cdr_type>
<run_delay>5</run_delay>
<cdr_in_dir>/var/log/cgrates/cdrc/in</cdr_in_dir>
<cdr_out_dir>/var/log/cgrates/cdrc/out</cdr_out_dir>
<cdr_source_id>freeswitch_csv</cdr_source_id>
<run_delay>0</run_delay>
<cdr_in_dir>/tmp/cgrates/cdrc/csv1/in</cdr_in_dir>
<cdr_out_dir>/tmp/cgrates/cdrc/csv1/out</cdr_out_dir>
<cdr_source_id>csv2</cdr_source_id>
<fields>
<field id="tor" filter="7" />
<field id="accid" filter="0" />
<field id="reqtype" filter="^rated" />
<field id="direction" filter="^*out" />
@@ -46,53 +46,76 @@
<field id="usage" filter="9" />
</fields>
</configuration>
<configuration section="cdre" type="fixed_width" id="CDRE-FW1">
<configuration section="cdrc" type="csv" id="CDRC-CSV3">
<enabled>true</enabled>
<cdrs_address>internal</cdrs_address>
<cdr_type>csv</cdr_type>
<run_delay>0</run_delay>
<cdr_in_dir>/tmp/cgrates/cdrc/csv1/in</cdr_in_dir>
<cdr_out_dir>/tmp/cgrates/cdrc/csv1/out</cdr_out_dir>
<cdr_source_id>csv3</cdr_source_id>
<fields>
<field id="tor" filter="7" />
<field id="accid" filter="0" />
<field id="reqtype" filter="^rated" />
<field id="direction" filter="^*out" />
<field id="tenant" filter="^cgrates.org" />
<field id="category" filter="7" />
<field id="account" filter="3" />
<field id="subject" filter="3" />
<field id="destination" filter="5" />
<field id="setup_time" filter="1" />
<field id="answer_time" filter="1" />
<field id="usage" filter="9" />
</fields>
</configuration>
<configuration section="cdre" type="fwv" id="CDRE-FW1">
<header>
<fields>
<field name="TypeOfRecord" type="constant" value="10" width="2" />
<field name="ToR" type="constant" value="10" width="2" />
<field name="Filler1" type="filler" width="3" />
<field name="DistributorCode" type="constant" value="VOI" width="3" />
<field name="FileType" type="constant" value="SIP" width="3" />
<field name="FileSeqNr" type="metatag" value="export_id" padding="zeroleft" width="5" />
<field name="LastCdr" type="metatag" value="last_cdr_time" layout="020106150400" width="12" />
<field name="FileCreationfTime" type="metatag" value="time_now" layout="020106150400" width="12" />
<field name="Version" type="constant" value="01" width="2" />
<field name="LastCdr" type="metatag" value="last_cdr_atime" layout="020106150405" width="12" />
<field name="FileCreationfTime" type="metatag" value="time_now" layout="020106150405" width="12" />
<field name="FileVersion" type="constant" value="01" width="2" />
<field name="Filler2" type="filler" width="105" />
</fields>
</header>
<content>
<fields>
<field name="TypeOfRecord" type="constant" value="20" width="2" />
<field name="Account" type="cdrfield" value="cgrid" width="12" mandatory="true" />
<field name="Subject" type="cdrfield" value="subject" strip="left" padding="left" width="5" />
<field name="CLI" type="cdrfield" value="cli" strip="xright" width="15" />
<field name="Destination" type="cdrfield" value="destination" strip="xright" width="24" />
<field name="TOR" type="constant" value="02" width="2" />
<field name="SubtypeTOR" type="constant" value="11" width="4" />
<field name="SetupTime" type="cdrfield" value="start_time" layout="020106150400" width="12" />
<field name="Duration" type="cdrfield" value="duration" width="6" />
<field name="DataVolume" type="filler" width="6" />
<field name="TaxCode" type="constant" value="1" width="1" />
<field name="OperatorCode" type="cdrfield" value="operator" width="2" />
<field name="ProductId" type="cdrfield" value="productid" width="5" />
<field name="NetworkId" type="constant" value="3" width="1" />
<field name="CallId" type="cdrfield" value="accid" width="16" />
<field name="Filler" type="filler" width="8" />
<field name="Filler" type="filler" width="8" />
<field name="TerminationCode" type="cdrfield" value="~cost_details:s/&quot;MatchedDestId&quot;:&quot;.+_(\s\s\s\s\s)&quot;/$1/" width="5" />
<field name="ToR" type="constant" value="20" width="2" />
<field name="Subject" type="cdrfield" value="subject" width="12" padding="right" mandatory="true" />
<field name="ConnectionNumber" type="constant" value="00000" width="5" />
<field name="CallerId" type="cdrfield" value="~callerid:s/\+(\d+)/00$1/" strip="xright" width="15" padding="right" />
<field name="Destination" type="cdrfield" value="~destination:s/^\+311400(\d+)/$1/:s/^\+311412\d\d112/112/:s/^\+31(\d+)/0$1/:s/^\+(\d+)/00$1/" strip="xright" width="24" padding="right" mandatory="true" />
<field name="TypeOfService" type="constant" value="00" width="2" />
<field name="ServiceId" type="constant" value="11" width="4" padding="right" />
<field name="AnswerTime" type="cdrfield" value="answer_time" layout="020106150405" width="12" mandatory="true" />
<field name="Usage" type="cdrfield" value="usage" layout="seconds" width="6" padding="right" mandatory="true" />
<field name="DataCounter" type="filler" width="6" />
<field name="VatCode" type="constant" value="1" width="1" />
<field name="NetworkId" type="constant" value="S1" width="2" />
<field name="DestinationSubId" type="cdrfield" value="~cost_details:s/&quot;MatchedDestId&quot;:&quot;.+_(\w{5})&quot;/$1/:s/(\w{6})/$1/" width="5" />
<field name="NetworkSubtype" type="constant" value="3" width="1" padding="left" />
<field name="CgrId" type="cdrfield" value="cgrid" strip="xleft" width="16" paddingi="right" mandatory="true" />
<field name="FillerVolume1" type="filler" width="8" />
<field name="FillerVolume2" type="filler" width="8" />
<field name="DestinationSubId" type="cdrfield" value="~cost_details:s/&quot;MatchedDestId&quot;:&quot;.+_(\w{5})&quot;/$1/:s/(\w{6})/$1/" width="5" />
<field name="Cost" type="cdrfield" value="cost" padding="zeroleft" width="9" />
<field name="CalledMask" type="cdrfield" value="calledmask" width="1" />
<field name="MaskDestination" type="metatag" value="mask_destination" width="1" />
</fields>
</content>
<trailer>
<fields>
<field name="TypeOfRecord" type="constant" value="90" width="2" />
<field name="ToR" type="constant" value="90" width="2" />
<field name="Filler1" type="filler" width="3" />
<field name="DistributorCode" type="constant" value="VOI" width="3" />
<field name="FileType" type="constant" value="SIP" width="3" />
<field name="FileSeqNr" type="metatag" value="export_id" padding="zeroleft" width="5" />
<field name="NumberOfRecords" type="metatag" value="cdrs_number" padding="zeroleft" width="6" />
<field name="CdrsDuration" type="metatag" value="cdrs_duration" padding="zeroleft" width="8" />
<field name="FirstCdrTime" type="metatag" value="first_cdr_time" layout="020106150400" width="12" />
<field name="LastCdrTime" type="metatag" value="last_cdr_time" layout="020106150400" width="12" />
<field name="TotalRecords" type="metatag" value="cdrs_number" padding="zeroleft" width="6" />
<field name="TotalDuration" type="metatag" value="cdrs_duration" padding="zeroleft" width="8" />
<field name="FirstCdrTime" type="metatag" value="first_cdr_atime" layout="020106150405" width="12" />
<field name="LastCdrTime" type="metatag" value="last_cdr_atime" layout="020106150405" width="12" />
<field name="Filler1" type="filler" width="93" />
</fields>
</trailer>

View File

@@ -113,6 +113,7 @@ func ParseTimeDetectLayout(tmStr string) (time.Time, error) {
fsTimestamp := regexp.MustCompile(`^\d{16}$`)
unixTimestampRule := regexp.MustCompile(`^\d{10}$`)
oneLineTimestampRule := regexp.MustCompile(`^\d{14}$`)
twoSpaceTimestampRule := regexp.MustCompile(`^\d{2}\.\d{2}.\d{4}\s{2}\d{2}:\d{2}:\d{2}$`)
switch {
case rfc3339Rule.MatchString(tmStr):
return time.Parse(time.RFC3339, tmStr)
@@ -136,6 +137,8 @@ func ParseTimeDetectLayout(tmStr string) (time.Time, error) {
return nilTime, nil
case oneLineTimestampRule.MatchString(tmStr):
return time.Parse("20060102150405", tmStr)
case twoSpaceTimestampRule.MatchString(tmStr):
return time.Parse("02.01.2006 15:04:05", tmStr)
}
return nilTime, errors.New("Unsupported time format")
}

View File

@@ -211,6 +211,14 @@ func TestParseTimeDetectLayout(t *testing.T) {
} else if !olTm.Equal(expectedTime) {
t.Errorf("Unexpected time parsed: %v, expecting: %v", olTm, expectedTime)
}
twoSpaceTmStr := "08.04.2014 22:14:29"
tsTm, err := ParseTimeDetectLayout(twoSpaceTmStr)
expectedTime = time.Date(2014, 4, 8, 22, 14, 29, 0, time.UTC)
if err != nil {
t.Error(err)
} else if !tsTm.Equal(expectedTime) {
t.Errorf("Unexpected time parsed: %v, expecting: %v", tsTm, expectedTime)
}
}
func TestParseDateUnix(t *testing.T) {