mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
RSRField with filterValue support, CDRC implementing instances based on processed folder, import filters
This commit is contained in:
@@ -79,8 +79,8 @@ func TestCreateDirs(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
for _, pathDir := range []string{cfg.CdreProfiles[utils.META_DEFAULT].ExportDir, cfg.CdrcProfiles[utils.META_DEFAULT].CdrInDir, cfg.CdrcProfiles[utils.META_DEFAULT].CdrOutDir,
|
||||
cfg.HistoryDir} {
|
||||
for _, pathDir := range []string{cfg.CdreProfiles[utils.META_DEFAULT].ExportDir, "/var/log/cgrates/cdrc/in", "/var/log/cgrates/cdrc/out", cfg.HistoryDir} {
|
||||
|
||||
if err := os.RemoveAll(pathDir); err != nil {
|
||||
t.Fatal("Error removing folder: ", pathDir, err)
|
||||
}
|
||||
|
||||
66
cdrc/cdrc.go
66
cdrc/cdrc.go
@@ -1,6 +1,6 @@
|
||||
/*
|
||||
Real-time Charging System for Telecom & ISP environments
|
||||
Copyright (C) 2012-2014 ITsysCOM GmbH
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
@@ -81,10 +81,22 @@ func populateStoredCdrField(cdr *utils.StoredCdr, fieldId, fieldVal string) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewCdrc(cdrcCfg *config.CdrcConfig, httpSkipTlsCheck bool, cdrServer *engine.CDRS, exitChan chan struct{}) (*Cdrc, error) {
|
||||
func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrServer *engine.CDRS, exitChan chan struct{}) (*Cdrc, error) {
|
||||
var cdrcCfg *config.CdrcConfig
|
||||
for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
|
||||
break
|
||||
}
|
||||
cdrc := &Cdrc{cdrsAddress: cdrcCfg.CdrsAddress, CdrFormat: cdrcCfg.CdrFormat, cdrInDir: cdrcCfg.CdrInDir, cdrOutDir: cdrcCfg.CdrOutDir,
|
||||
cdrSourceId: cdrcCfg.CdrSourceId, runDelay: cdrcCfg.RunDelay, csvSep: cdrcCfg.FieldSeparator, duMultiplyFactor: cdrcCfg.DataUsageMultiplyFactor,
|
||||
cdrFields: cdrcCfg.CdrFields, httpSkipTlsCheck: httpSkipTlsCheck, cdrServer: cdrServer, exitChan: exitChan}
|
||||
httpSkipTlsCheck: httpSkipTlsCheck, cdrServer: cdrServer, exitChan: exitChan}
|
||||
cdrc.cdrFilters = make([]utils.RSRFields, len(cdrcCfgs))
|
||||
cdrc.cdrFields = make([][]*config.CfgCdrField, len(cdrcCfgs))
|
||||
idx := 0
|
||||
for _, cfg := range cdrcCfgs {
|
||||
cdrc.cdrFilters[idx] = cfg.CdrFilter
|
||||
cdrc.cdrFields[idx] = cfg.CdrFields
|
||||
idx += 1
|
||||
}
|
||||
// Before processing, make sure in and out folders exist
|
||||
for _, dir := range []string{cdrc.cdrInDir, cdrc.cdrOutDir} {
|
||||
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
|
||||
@@ -104,7 +116,8 @@ type Cdrc struct {
|
||||
runDelay time.Duration
|
||||
csvSep rune
|
||||
duMultiplyFactor float64
|
||||
cdrFields []*config.CfgCdrField
|
||||
cdrFilters []utils.RSRFields // Should be in sync with cdrFields on indexes
|
||||
cdrFields [][]*config.CfgCdrField // Profiles directly connected with cdrFilters
|
||||
httpSkipTlsCheck bool
|
||||
cdrServer *engine.CDRS // Reference towards internal cdrServer if that is the case
|
||||
httpClient *http.Client
|
||||
@@ -202,20 +215,39 @@ func (self *Cdrc) processFile(filePath string) error {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Row %d - csv error: %s", procRowNr, err.Error()))
|
||||
continue // Other csv related errors, ignore
|
||||
}
|
||||
storedCdr, err := self.recordToStoredCdr(record)
|
||||
if err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Row %d - failed converting to StoredCdr, error: %s", procRowNr, err.Error()))
|
||||
continue
|
||||
}
|
||||
if self.cdrsAddress == utils.INTERNAL {
|
||||
if err := self.cdrServer.ProcessCdr(storedCdr); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed posting CDR, row: %d, error: %s", procRowNr, err.Error()))
|
||||
recordCdrs := make([]*utils.StoredCdr, 0) // More CDRs based on the number of filters and field templates
|
||||
for idx, cdrFieldsInst := range self.cdrFields {
|
||||
// Make sure filters are matching
|
||||
filterBreak := false
|
||||
for _, rsrFilter := range self.cdrFilters[idx] {
|
||||
if cfgFieldIdx, _ := strconv.Atoi(rsrFilter.Id); len(record) <= cfgFieldIdx {
|
||||
return fmt.Errorf("Ignoring record: %v - cannot compile filter %+v", record, rsrFilter)
|
||||
} else if !rsrFilter.FilterPasses(record[cfgFieldIdx]) {
|
||||
filterBreak = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if filterBreak { // Stop importing cdrc fields profile due to non matching filter
|
||||
continue
|
||||
}
|
||||
} else { // CDRs listening on IP
|
||||
if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cgr", self.cdrsAddress), storedCdr.AsHttpForm()); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed posting CDR, row: %d, error: %s", procRowNr, err.Error()))
|
||||
if storedCdr, err := self.recordToStoredCdr(record, cdrFieldsInst); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Row %d - failed converting to StoredCdr, error: %s", procRowNr, err.Error()))
|
||||
continue
|
||||
} else {
|
||||
recordCdrs = append(recordCdrs, storedCdr)
|
||||
}
|
||||
}
|
||||
for _, storedCdr := range recordCdrs {
|
||||
if self.cdrsAddress == utils.INTERNAL {
|
||||
if err := self.cdrServer.ProcessCdr(storedCdr); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed posting CDR, row: %d, error: %s", procRowNr, err.Error()))
|
||||
continue
|
||||
}
|
||||
} else { // CDRs listening on IP
|
||||
if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cgr", self.cdrsAddress), storedCdr.AsHttpForm()); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed posting CDR, row: %d, error: %s", procRowNr, err.Error()))
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -231,11 +263,11 @@ func (self *Cdrc) processFile(filePath string) error {
|
||||
}
|
||||
|
||||
// Takes the record out of csv and turns it into http form which can be posted
|
||||
func (self *Cdrc) recordToStoredCdr(record []string) (*utils.StoredCdr, error) {
|
||||
func (self *Cdrc) recordToStoredCdr(record []string, cdrFields []*config.CfgCdrField) (*utils.StoredCdr, error) {
|
||||
storedCdr := &utils.StoredCdr{CdrHost: "0.0.0.0", CdrSource: self.cdrSourceId, ExtraFields: make(map[string]string), Cost: -1}
|
||||
var err error
|
||||
var lazyHttpFields []*config.CfgCdrField
|
||||
for _, cdrFldCfg := range self.cdrFields {
|
||||
for _, cdrFldCfg := range cdrFields {
|
||||
var fieldVal string
|
||||
if utils.IsSliceMember([]string{CSV, FS_CSV}, self.CdrFormat) {
|
||||
if cdrFldCfg.Type == utils.CDRFIELD {
|
||||
|
||||
@@ -49,6 +49,7 @@ README:
|
||||
|
||||
var cfgPath string
|
||||
var cfg *config.CGRConfig
|
||||
var cdrcCfgs map[string]*config.CdrcConfig
|
||||
var cdrcCfg *config.CdrcConfig
|
||||
|
||||
var testLocal = flag.Bool("local", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args
|
||||
@@ -99,7 +100,7 @@ func TestLoadConfigt(*testing.T) {
|
||||
cfgPath = path.Join(*dataDir, "conf", "samples", "apier")
|
||||
cfg, _ = config.NewCGRConfigFromFolder(cfgPath)
|
||||
if len(cfg.CdrcProfiles) > 0 {
|
||||
cdrcCfg = cfg.CdrcProfiles[utils.META_DEFAULT]
|
||||
cdrcCfgs = cfg.CdrcProfiles["/var/log/cgrates/cdrc/in"]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -134,9 +135,12 @@ func TestCreateCdrFiles(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
if cdrcCfg == nil {
|
||||
if cdrcCfgs == nil {
|
||||
t.Fatal("Empty default cdrc configuration")
|
||||
}
|
||||
for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
|
||||
break
|
||||
}
|
||||
if err := os.RemoveAll(cdrcCfg.CdrInDir); err != nil {
|
||||
t.Fatal("Error removing folder: ", cdrcCfg.CdrInDir, err)
|
||||
}
|
||||
@@ -162,13 +166,17 @@ func TestProcessCdrDir(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
var cdrcCfg *config.CdrcConfig
|
||||
for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
|
||||
break
|
||||
}
|
||||
if cdrcCfg.CdrsAddress == utils.INTERNAL { // For now we only test over network
|
||||
cdrcCfg.CdrsAddress = "127.0.0.1:2013"
|
||||
}
|
||||
if err := startEngine(); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
cdrc, err := NewCdrc(cdrcCfg, true, nil, make(chan struct{}))
|
||||
cdrc, err := NewCdrc(cdrcCfgs, true, nil, make(chan struct{}))
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
@@ -204,7 +212,7 @@ func TestProcessCdr3Dir(t *testing.T) {
|
||||
if err := startEngine(); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
cdrc, err := NewCdrc(cdrcCfg, true, nil, make(chan struct{}))
|
||||
cdrc, err := NewCdrc(cdrcCfgs, true, nil, make(chan struct{}))
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
@@ -29,17 +29,17 @@ import (
|
||||
|
||||
func TestRecordForkCdr(t *testing.T) {
|
||||
cgrConfig, _ := config.NewDefaultCGRConfig()
|
||||
cdrcConfig := cgrConfig.CdrcProfiles[utils.META_DEFAULT]
|
||||
cdrcConfig := cgrConfig.CdrcProfiles["/var/log/cgrates/cdrc/in"][utils.META_DEFAULT]
|
||||
cdrcConfig.CdrFields = append(cdrcConfig.CdrFields, &config.CfgCdrField{Tag: "SupplierTest", Type: utils.CDRFIELD, CdrFieldId: "supplier", Value: []*utils.RSRField{&utils.RSRField{Id: "14"}}})
|
||||
cdrc := &Cdrc{CdrFormat: CSV, cdrSourceId: "TEST_CDRC", cdrFields: cdrcConfig.CdrFields}
|
||||
cdrc := &Cdrc{CdrFormat: CSV, cdrSourceId: "TEST_CDRC", cdrFields: [][]*config.CfgCdrField{cdrcConfig.CdrFields}}
|
||||
cdrRow := []string{"firstField", "secondField"}
|
||||
_, err := cdrc.recordToStoredCdr(cdrRow)
|
||||
_, err := cdrc.recordToStoredCdr(cdrRow, cdrc.cdrFields[0])
|
||||
if err == nil {
|
||||
t.Error("Failed to corectly detect missing fields from record")
|
||||
}
|
||||
cdrRow = []string{"ignored", "ignored", utils.VOICE, "acc1", "prepaid", "*out", "cgrates.org", "call", "1001", "1001", "+4986517174963",
|
||||
"2013-02-03 19:50:00", "2013-02-03 19:54:00", "62", "supplier1", "172.16.1.1"}
|
||||
rtCdr, err := cdrc.recordToStoredCdr(cdrRow)
|
||||
rtCdr, err := cdrc.recordToStoredCdr(cdrRow, cdrc.cdrFields[0])
|
||||
if err != nil {
|
||||
t.Error("Failed to parse CDR in rated cdr", err)
|
||||
}
|
||||
@@ -70,9 +70,9 @@ func TestRecordForkCdr(t *testing.T) {
|
||||
func TestDataMultiplyFactor(t *testing.T) {
|
||||
cdrFields := []*config.CfgCdrField{&config.CfgCdrField{Tag: "TORField", Type: utils.CDRFIELD, CdrFieldId: "tor", Value: []*utils.RSRField{&utils.RSRField{Id: "0"}}},
|
||||
&config.CfgCdrField{Tag: "UsageField", Type: utils.CDRFIELD, CdrFieldId: "usage", Value: []*utils.RSRField{&utils.RSRField{Id: "1"}}}}
|
||||
cdrc := &Cdrc{CdrFormat: CSV, cdrSourceId: "TEST_CDRC", cdrFields: cdrFields}
|
||||
cdrc := &Cdrc{CdrFormat: CSV, cdrSourceId: "TEST_CDRC", cdrFields: [][]*config.CfgCdrField{cdrFields}}
|
||||
cdrRow := []string{"*data", "1"}
|
||||
rtCdr, err := cdrc.recordToStoredCdr(cdrRow)
|
||||
rtCdr, err := cdrc.recordToStoredCdr(cdrRow, cdrc.cdrFields[0])
|
||||
if err != nil {
|
||||
t.Error("Failed to parse CDR in rated cdr", err)
|
||||
}
|
||||
@@ -99,7 +99,7 @@ func TestDataMultiplyFactor(t *testing.T) {
|
||||
ExtraFields: map[string]string{},
|
||||
Cost: -1,
|
||||
}
|
||||
if rtCdr, _ := cdrc.recordToStoredCdr(cdrRow); !reflect.DeepEqual(expectedCdr, rtCdr) {
|
||||
if rtCdr, _ := cdrc.recordToStoredCdr(cdrRow, cdrc.cdrFields[0]); !reflect.DeepEqual(expectedCdr, rtCdr) {
|
||||
t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr)
|
||||
}
|
||||
cdrRow = []string{"*voice", "1"}
|
||||
@@ -112,7 +112,7 @@ func TestDataMultiplyFactor(t *testing.T) {
|
||||
ExtraFields: map[string]string{},
|
||||
Cost: -1,
|
||||
}
|
||||
if rtCdr, _ := cdrc.recordToStoredCdr(cdrRow); !reflect.DeepEqual(expectedCdr, rtCdr) {
|
||||
if rtCdr, _ := cdrc.recordToStoredCdr(cdrRow, cdrc.cdrFields[0]); !reflect.DeepEqual(expectedCdr, rtCdr) {
|
||||
t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -121,7 +121,7 @@ func (cdre *CdrExporter) getCdrCostDetails(cgrId, runId string) (string, error)
|
||||
|
||||
func (cdre *CdrExporter) getCombimedCdrFieldVal(processedCdr *utils.StoredCdr, cfgCdrFld *config.CfgCdrField) (string, error) {
|
||||
var combinedVal string // Will result as combination of the field values, filters must match
|
||||
for _, filterRule := range cfgCdrFld.Filter {
|
||||
for _, filterRule := range cfgCdrFld.FieldFilter {
|
||||
fltrPass, ftrPassValue := processedCdr.PassesFieldFilter(filterRule)
|
||||
if !fltrPass {
|
||||
return "", nil
|
||||
@@ -152,7 +152,7 @@ func (cdre *CdrExporter) getDateTimeFieldVal(cdr *utils.StoredCdr, cfgCdrFld *co
|
||||
if len(cfgCdrFld.Value) == 0 {
|
||||
return "", nil
|
||||
}
|
||||
for _, fltrRl := range cfgCdrFld.Filter {
|
||||
for _, fltrRl := range cfgCdrFld.FieldFilter {
|
||||
if fltrPass, _ := cdr.PassesFieldFilter(fltrRl); !fltrPass {
|
||||
return "", fmt.Errorf("Field: %s not matching filter rule %v", fltrRl.Id, fltrRl)
|
||||
}
|
||||
@@ -170,7 +170,7 @@ func (cdre *CdrExporter) getDateTimeFieldVal(cdr *utils.StoredCdr, cfgCdrFld *co
|
||||
|
||||
// Extracts the value specified by cfgHdr out of cdr
|
||||
func (cdre *CdrExporter) cdrFieldValue(cdr *utils.StoredCdr, cfgCdrFld *config.CfgCdrField) (string, error) {
|
||||
for _, fltrRl := range cfgCdrFld.Filter {
|
||||
for _, fltrRl := range cfgCdrFld.FieldFilter {
|
||||
if fltrPass, _ := cdr.PassesFieldFilter(fltrRl); !fltrPass {
|
||||
return "", fmt.Errorf("Field: %s not matching filter rule %v", fltrRl.Id, fltrRl)
|
||||
}
|
||||
|
||||
@@ -58,7 +58,7 @@ func TestCdreGetCombimedCdrFieldVal(t *testing.T) {
|
||||
}
|
||||
fltrRule, _ := utils.ParseRSRFields("~mediation_runid:s/default/RUN_RTL/", utils.INFIELD_SEP)
|
||||
val, _ := utils.ParseRSRFields("cost", utils.INFIELD_SEP)
|
||||
cfgCdrFld := &config.CfgCdrField{Tag: "cost", Type: "cdrfield", CdrFieldId: "cost", Value: val, Filter: fltrRule}
|
||||
cfgCdrFld := &config.CfgCdrField{Tag: "cost", Type: "cdrfield", CdrFieldId: "cost", Value: val, FieldFilter: fltrRule}
|
||||
if costVal, err := cdre.getCombimedCdrFieldVal(cdrs[3], cfgCdrFld); err != nil {
|
||||
t.Error(err)
|
||||
} else if costVal != "1.01" {
|
||||
@@ -66,7 +66,7 @@ func TestCdreGetCombimedCdrFieldVal(t *testing.T) {
|
||||
}
|
||||
fltrRule, _ = utils.ParseRSRFields("~mediation_runid:s/default/RETAIL1/", utils.INFIELD_SEP)
|
||||
val, _ = utils.ParseRSRFields("account", utils.INFIELD_SEP)
|
||||
cfgCdrFld = &config.CfgCdrField{Tag: "account", Type: "cdrfield", CdrFieldId: "account", Value: val, Filter: fltrRule}
|
||||
cfgCdrFld = &config.CfgCdrField{Tag: "account", Type: "cdrfield", CdrFieldId: "account", Value: val, FieldFilter: fltrRule}
|
||||
if acntVal, err := cdre.getCombimedCdrFieldVal(cdrs[3], cfgCdrFld); err != nil {
|
||||
t.Error(err)
|
||||
} else if acntVal != "1000" {
|
||||
@@ -91,7 +91,7 @@ func TestGetDateTimeFieldVal(t *testing.T) {
|
||||
}
|
||||
// Test filter
|
||||
fltr, _ := utils.ParseRSRFields("~tenant:s/(.+)/itsyscom.com/", utils.INFIELD_SEP)
|
||||
cfgCdrFld = &config.CfgCdrField{Tag: "stop_time", Type: "cdrfield", CdrFieldId: "stop_time", Value: val, Filter: fltr, Layout: layout}
|
||||
cfgCdrFld = &config.CfgCdrField{Tag: "stop_time", Type: "cdrfield", CdrFieldId: "stop_time", Value: val, FieldFilter: fltr, Layout: layout}
|
||||
if _, err := cdreTst.getDateTimeFieldVal(cdrTst, cfgCdrFld); err == nil {
|
||||
t.Error(err)
|
||||
}
|
||||
@@ -117,7 +117,7 @@ func TestCdreCdrFieldValue(t *testing.T) {
|
||||
t.Errorf("Expecting: %s, received: %s", cdr.Destination, val)
|
||||
}
|
||||
fltr, _ := utils.ParseRSRFields("~tenant:s/(.+)/itsyscom.com/", utils.INFIELD_SEP)
|
||||
cfgCdrFld = &config.CfgCdrField{Tag: "destination", Type: "cdrfield", CdrFieldId: "destination", Value: val, Filter: fltr}
|
||||
cfgCdrFld = &config.CfgCdrField{Tag: "destination", Type: "cdrfield", CdrFieldId: "destination", Value: val, FieldFilter: fltr}
|
||||
if _, err := cdre.cdrFieldValue(cdr, cfgCdrFld); err == nil {
|
||||
t.Error("Failed to use filter")
|
||||
}
|
||||
|
||||
@@ -125,11 +125,15 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD
|
||||
}
|
||||
|
||||
// Fires up a cdrc instance
|
||||
func startCdrc(cdrsChan chan struct{}, cdrcCfg *config.CdrcConfig, httpSkipTlsCheck bool, cdrServer *engine.CDRS, closeChan chan struct{}) {
|
||||
func startCdrc(cdrsChan chan struct{}, cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrServer *engine.CDRS, closeChan chan struct{}) {
|
||||
var cdrcCfg *config.CdrcConfig
|
||||
for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
|
||||
break
|
||||
}
|
||||
if cdrcCfg.CdrsAddress == utils.INTERNAL {
|
||||
<-cdrsChan // Wait for CDRServer to come up before start processing
|
||||
}
|
||||
cdrc, err := cdrc.NewCdrc(cdrcCfg, httpSkipTlsCheck, cdrServer, closeChan)
|
||||
cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrServer, closeChan)
|
||||
if err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("Cdrc config parsing error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
@@ -502,15 +506,19 @@ func main() {
|
||||
go shutdownSessionmanagerSingnalHandler()
|
||||
}
|
||||
var cdrcEnabled bool
|
||||
for _, cdrcConfig := range cfg.CdrcProfiles {
|
||||
if cdrcConfig.Enabled == false {
|
||||
for _, cdrcCfgs := range cfg.CdrcProfiles {
|
||||
var cdrcCfg *config.CdrcConfig
|
||||
for _, cdrcCfg = range cdrcCfgs { // Take a random config out since they should be the same
|
||||
break
|
||||
}
|
||||
if cdrcCfg.Enabled == false {
|
||||
continue // Ignore not enabled
|
||||
} else if !cdrcEnabled {
|
||||
cdrcEnabled = true // Mark that at least one cdrc service is active
|
||||
}
|
||||
go startCdrc(cdrsChan, cdrcConfig, cfg.HttpSkipTlsVerify, cdrServer, cfg.ConfigReloads[utils.CDRC])
|
||||
go startCdrc(cdrsChan, cdrcCfgs, cfg.HttpSkipTlsVerify, cdrServer, cfg.ConfigReloads[utils.CDRC])
|
||||
}
|
||||
if cdrcEnabled {
|
||||
if len(cfg.CdrcProfiles) != 0 {
|
||||
engine.Logger.Info("Starting CGRateS CDR client.")
|
||||
}
|
||||
|
||||
|
||||
@@ -19,20 +19,22 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package config
|
||||
|
||||
import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"time"
|
||||
)
|
||||
|
||||
type CdrcConfig struct {
|
||||
Enabled bool // Enable/Disable the profile
|
||||
CdrsAddress string // The address where CDRs can be reached
|
||||
CdrFormat string // The type of CDR file to process <csv>
|
||||
FieldSeparator rune // The separator to use when reading csvs
|
||||
DataUsageMultiplyFactor float64 // Conversion factor for data usage
|
||||
RunDelay time.Duration // Delay between runs, 0 for inotify driven requests
|
||||
CdrInDir string // Folder to process CDRs from
|
||||
CdrOutDir string // Folder to move processed CDRs to
|
||||
CdrSourceId string // Source identifier for the processed CDRs
|
||||
CdrFields []*CfgCdrField // List of fields to be processed
|
||||
Enabled bool // Enable/Disable the profile
|
||||
CdrsAddress string // The address where CDRs can be reached
|
||||
CdrFormat string // The type of CDR file to process <csv>
|
||||
FieldSeparator rune // The separator to use when reading csvs
|
||||
DataUsageMultiplyFactor float64 // Conversion factor for data usage
|
||||
RunDelay time.Duration // Delay between runs, 0 for inotify driven requests
|
||||
CdrInDir string // Folder to process CDRs from
|
||||
CdrOutDir string // Folder to move processed CDRs to
|
||||
CdrSourceId string // Source identifier for the processed CDRs
|
||||
CdrFilter utils.RSRFields // Filter CDR records to import
|
||||
CdrFields []*CfgCdrField // List of fields to be processed
|
||||
}
|
||||
|
||||
func (self *CdrcConfig) loadFromJsonCfg(jsnCfg *CdrcJsonCfg) error {
|
||||
@@ -68,6 +70,14 @@ func (self *CdrcConfig) loadFromJsonCfg(jsnCfg *CdrcJsonCfg) error {
|
||||
if jsnCfg.Cdr_out_dir != nil {
|
||||
self.CdrOutDir = *jsnCfg.Cdr_out_dir
|
||||
}
|
||||
if jsnCfg.Cdr_source_id != nil {
|
||||
self.CdrSourceId = *jsnCfg.Cdr_source_id
|
||||
}
|
||||
if jsnCfg.Cdr_filter != nil {
|
||||
if self.CdrFilter, err = utils.ParseRSRFields(*jsnCfg.Cdr_filter, utils.INFIELD_SEP); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if jsnCfg.Cdr_fields != nil {
|
||||
if self.CdrFields, err = CfgCdrFieldsFromCdrFieldsJsonCfg(*jsnCfg.Cdr_fields); err != nil {
|
||||
return err
|
||||
|
||||
@@ -39,8 +39,8 @@ func NewCfgCdrFieldFromCdrFieldJsonCfg(jsnCfgFld *CdrFieldJsonCfg) (*CfgCdrField
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if jsnCfgFld.Filter != nil {
|
||||
if cfgFld.Filter, err = utils.ParseRSRFields(*jsnCfgFld.Filter, utils.INFIELD_SEP); err != nil {
|
||||
if jsnCfgFld.Field_filter != nil {
|
||||
if cfgFld.FieldFilter, err = utils.ParseRSRFields(*jsnCfgFld.Field_filter, utils.INFIELD_SEP); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
@@ -63,16 +63,16 @@ func NewCfgCdrFieldFromCdrFieldJsonCfg(jsnCfgFld *CdrFieldJsonCfg) (*CfgCdrField
|
||||
}
|
||||
|
||||
type CfgCdrField struct {
|
||||
Tag string // Identifier for the administrator
|
||||
Type string // Type of field
|
||||
CdrFieldId string // StoredCdr field name
|
||||
Value utils.RSRFields
|
||||
Filter utils.RSRFields
|
||||
Width int
|
||||
Strip string
|
||||
Padding string
|
||||
Layout string
|
||||
Mandatory bool
|
||||
Tag string // Identifier for the administrator
|
||||
Type string // Type of field
|
||||
CdrFieldId string // StoredCdr field name
|
||||
Value utils.RSRFields
|
||||
FieldFilter utils.RSRFields
|
||||
Width int
|
||||
Strip string
|
||||
Padding string
|
||||
Layout string
|
||||
Mandatory bool
|
||||
}
|
||||
|
||||
func CfgCdrFieldsFromCdrFieldsJsonCfg(jsnCfgFldss []*CdrFieldJsonCfg) ([]*CfgCdrField, error) {
|
||||
|
||||
@@ -74,7 +74,7 @@ func NewDefaultCGRConfig() (*CGRConfig, error) {
|
||||
return nil, err
|
||||
}
|
||||
cfg.dfltCdreProfile = cfg.CdreProfiles[utils.META_DEFAULT].Clone() // So default will stay unique, will have nil pointer in case of no defaults loaded which is an extra check
|
||||
cfg.dfltCdrcProfile = cfg.CdrcProfiles[utils.META_DEFAULT].Clone()
|
||||
cfg.dfltCdrcProfile = cfg.CdrcProfiles["/var/log/cgrates/cdrc/in"][utils.META_DEFAULT].Clone()
|
||||
dfltFsConnConfig = cfg.SmFsConfig.Connections[0] // We leave it crashing here on purpose if no Connection defaults defined
|
||||
dfltKamConnConfig = cfg.SmKamConfig.Connections[0]
|
||||
dfltOsipsConnConfig = cfg.SmOsipsConfig.Connections[0]
|
||||
@@ -193,10 +193,10 @@ type CGRConfig struct {
|
||||
CDRStatsEnabled bool // Enable CDR Stats service
|
||||
CDRStatConfig *CdrStatsConfig // Active cdr stats configuration instances, platform level
|
||||
CdreProfiles map[string]*CdreConfig
|
||||
CdrcProfiles map[string]*CdrcConfig // Number of CDRC instances running imports
|
||||
SmFsConfig *SmFsConfig // SM-FreeSWITCH configuration
|
||||
SmKamConfig *SmKamConfig // SM-Kamailio Configuration
|
||||
SmOsipsConfig *SmOsipsConfig // SM-OpenSIPS Configuration
|
||||
CdrcProfiles map[string]map[string]*CdrcConfig // Number of CDRC instances running imports, format map[dirPath]map[instanceName]{Configs}
|
||||
SmFsConfig *SmFsConfig // SM-FreeSWITCH configuration
|
||||
SmKamConfig *SmKamConfig // SM-Kamailio Configuration
|
||||
SmOsipsConfig *SmOsipsConfig // SM-OpenSIPS Configuration
|
||||
SMEnabled bool
|
||||
SMSwitchType string
|
||||
SMRater string // address where to access rater. Can be internal, direct rater address or the address of a balancer
|
||||
@@ -243,7 +243,7 @@ type CGRConfig struct {
|
||||
|
||||
func (self *CGRConfig) checkConfigSanity() error {
|
||||
// CDRC sanity checks
|
||||
for _, cdrcInst := range self.CdrcProfiles {
|
||||
for _, cdrcInst := range self.CdrcProfiles["/var/log/cgrates/cdrc/in"] {
|
||||
if cdrcInst.Enabled == true {
|
||||
if len(cdrcInst.CdrFields) == 0 {
|
||||
return errors.New("CdrC enabled but no fields to be processed defined!")
|
||||
@@ -530,16 +530,20 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
|
||||
}
|
||||
if jsnCdrcCfg != nil {
|
||||
if self.CdrcProfiles == nil {
|
||||
self.CdrcProfiles = make(map[string]*CdrcConfig)
|
||||
self.CdrcProfiles = make(map[string]map[string]*CdrcConfig)
|
||||
}
|
||||
for profileName, jsnCrc1Cfg := range jsnCdrcCfg {
|
||||
if _, hasDir := self.CdrcProfiles[*jsnCrc1Cfg.Cdr_in_dir]; !hasDir {
|
||||
self.CdrcProfiles[*jsnCrc1Cfg.Cdr_in_dir] = make(map[string]*CdrcConfig)
|
||||
}
|
||||
if _, hasProfile := self.CdrcProfiles[profileName]; !hasProfile {
|
||||
self.CdrcProfiles[profileName] = new(CdrcConfig)
|
||||
if profileName != utils.META_DEFAULT {
|
||||
self.CdrcProfiles[profileName] = self.dfltCdrcProfile.Clone() // Clone default so we do not inherit pointers
|
||||
if profileName == utils.META_DEFAULT {
|
||||
self.CdrcProfiles[*jsnCrc1Cfg.Cdr_in_dir][profileName] = new(CdrcConfig)
|
||||
} else {
|
||||
self.CdrcProfiles[*jsnCrc1Cfg.Cdr_in_dir][profileName] = self.dfltCdrcProfile.Clone() // Clone default so we do not inherit pointers
|
||||
}
|
||||
}
|
||||
if err = self.CdrcProfiles[profileName].loadFromJsonCfg(jsnCrc1Cfg); err != nil {
|
||||
if err = self.CdrcProfiles[*jsnCrc1Cfg.Cdr_in_dir][profileName].loadFromJsonCfg(jsnCrc1Cfg); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -182,6 +182,7 @@ const CGRATES_CFG_JSON = `
|
||||
"cdr_in_dir": "/var/log/cgrates/cdrc/in", // absolute path towards the directory where the CDRs are stored
|
||||
"cdr_out_dir": "/var/log/cgrates/cdrc/out", // absolute path towards the directory where processed CDRs will be moved
|
||||
"cdr_source_id": "freeswitch_csv", // free form field, tag identifying the source of the CDRs within CDRS database
|
||||
"cdr_filter": "", // Filter CDR records to import
|
||||
"cdr_fields":[ // import template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "tor", "cdr_field_id": "tor", "type": "cdrfield", "value": "2", "mandatory": true},
|
||||
{"tag": "accid", "cdr_field_id": "accid", "type": "cdrfield", "value": "3", "mandatory": true},
|
||||
|
||||
@@ -323,6 +323,7 @@ func TestDfCdrcJsonCfg(t *testing.T) {
|
||||
Cdr_in_dir: utils.StringPointer("/var/log/cgrates/cdrc/in"),
|
||||
Cdr_out_dir: utils.StringPointer("/var/log/cgrates/cdrc/out"),
|
||||
Cdr_source_id: utils.StringPointer("freeswitch_csv"),
|
||||
Cdr_filter: utils.StringPointer(""),
|
||||
Cdr_fields: &cdrFields,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -117,7 +117,7 @@ type CdrFieldJsonCfg struct {
|
||||
Strip *string
|
||||
Padding *string
|
||||
Layout *string
|
||||
Filter *string
|
||||
Field_filter *string
|
||||
Mandatory *bool
|
||||
}
|
||||
|
||||
@@ -149,6 +149,7 @@ type CdrcJsonCfg struct {
|
||||
Cdr_in_dir *string
|
||||
Cdr_out_dir *string
|
||||
Cdr_source_id *string
|
||||
Cdr_filter *string
|
||||
Cdr_fields *[]*CdrFieldJsonCfg
|
||||
}
|
||||
|
||||
|
||||
@@ -103,13 +103,15 @@ func TestCreateCdrDirs(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
for _, cdrcInst := range cfg.CdrcProfiles {
|
||||
for _, dir := range []string{cdrcInst.CdrInDir, cdrcInst.CdrOutDir} {
|
||||
if err := os.RemoveAll(dir); err != nil {
|
||||
t.Fatal("Error removing folder: ", dir, err)
|
||||
}
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
t.Fatal("Error creating folder: ", dir, err)
|
||||
for _, cdrcProfiles := range cfg.CdrcProfiles {
|
||||
for _, cdrcInst := range cdrcProfiles {
|
||||
for _, dir := range []string{cdrcInst.CdrInDir, cdrcInst.CdrOutDir} {
|
||||
if err := os.RemoveAll(dir); err != nil {
|
||||
t.Fatal("Error removing folder: ", dir, err)
|
||||
}
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
t.Fatal("Error creating folder: ", dir, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -157,7 +159,7 @@ dbafe9c8614c785a65aabd116dd3959c3c56f7f7,default,*voice,dsafdsag,rated,*out,cgra
|
||||
if err := ioutil.WriteFile(tmpFilePath, []byte(fileContent1), 0644); err != nil {
|
||||
t.Fatal(err.Error)
|
||||
}
|
||||
if err := os.Rename(tmpFilePath, path.Join(cfg.CdrcProfiles["CDRC-CSV1"].CdrInDir, fileName)); err != nil {
|
||||
if err := os.Rename(tmpFilePath, path.Join("/tmp/cgrates/cdrc1/in", fileName)); err != nil {
|
||||
t.Fatal("Error moving file to processing directory: ", err)
|
||||
}
|
||||
}
|
||||
@@ -175,7 +177,7 @@ func TestHandleCdr2File(t *testing.T) {
|
||||
if err := ioutil.WriteFile(tmpFilePath, []byte(fileContent), 0644); err != nil {
|
||||
t.Fatal(err.Error)
|
||||
}
|
||||
if err := os.Rename(tmpFilePath, path.Join(cfg.CdrcProfiles["CDRC-CSV2"].CdrInDir, fileName)); err != nil {
|
||||
if err := os.Rename(tmpFilePath, path.Join("/tmp/cgrates/cdrc2/in", fileName)); err != nil {
|
||||
t.Fatal("Error moving file to processing directory: ", err)
|
||||
}
|
||||
}
|
||||
@@ -192,7 +194,7 @@ func TestHandleCdr3File(t *testing.T) {
|
||||
if err := ioutil.WriteFile(tmpFilePath, []byte(fileContent), 0644); err != nil {
|
||||
t.Fatal(err.Error)
|
||||
}
|
||||
if err := os.Rename(tmpFilePath, path.Join(cfg.CdrcProfiles["CDRC-CSV3"].CdrInDir, fileName)); err != nil {
|
||||
if err := os.Rename(tmpFilePath, path.Join("/tmp/cgrates/cdrc3/in", fileName)); err != nil {
|
||||
t.Fatal("Error moving file to processing directory: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,6 +78,8 @@ const (
|
||||
FIELDS_SEP = ","
|
||||
STATIC_HDRVAL_SEP = "::"
|
||||
REGEXP_PREFIX = "~"
|
||||
FILTER_VAL_START = "("
|
||||
FILTER_VAL_END = ")"
|
||||
JSON = "json"
|
||||
GOB = "gob"
|
||||
MSGPACK = "msgpack"
|
||||
|
||||
@@ -21,17 +21,24 @@ package utils
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func NewRSRField(fldStr string) (*RSRField, error) {
|
||||
if fldStrUnquoted, err := strconv.Unquote(fldStr); err == nil {
|
||||
fldStr = fldStrUnquoted
|
||||
}
|
||||
if len(fldStr) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
var filterVal string
|
||||
if strings.HasSuffix(fldStr, FILTER_VAL_END) { // Has filter, populate the var
|
||||
fltrStart := strings.LastIndex(fldStr, FILTER_VAL_START)
|
||||
if fltrStart < 1 {
|
||||
return nil, fmt.Errorf("Invalid FilterStartValue in string: %s", fldStr)
|
||||
}
|
||||
filterVal = fldStr[fltrStart+1 : len(fldStr)-1]
|
||||
fldStr = fldStr[:fltrStart] // Take the filter part out before compiling further
|
||||
|
||||
}
|
||||
|
||||
if strings.HasPrefix(fldStr, STATIC_VALUE_PREFIX) { // Special case when RSR is defined as static header/value
|
||||
var staticHdr, staticVal string
|
||||
if splt := strings.Split(fldStr, STATIC_HDRVAL_SEP); len(splt) == 2 { // Using / as separator since ':' is often use in date/time fields
|
||||
@@ -44,17 +51,17 @@ func NewRSRField(fldStr string) (*RSRField, error) {
|
||||
} else {
|
||||
staticHdr, staticVal = splt[0][1:], splt[0][1:] // If no split, header will remain as original, value as header without the prefix
|
||||
}
|
||||
return &RSRField{Id: staticHdr, staticValue: staticVal}, nil
|
||||
return &RSRField{Id: staticHdr, staticValue: staticVal, filterValue: filterVal}, nil
|
||||
}
|
||||
if !strings.HasPrefix(fldStr, REGEXP_PREFIX) {
|
||||
return &RSRField{Id: fldStr}, nil
|
||||
return &RSRField{Id: fldStr, filterValue: filterVal}, nil
|
||||
}
|
||||
spltRgxp := regexp.MustCompile(`:s\/`)
|
||||
spltRules := spltRgxp.Split(fldStr, -1)
|
||||
if len(spltRules) < 2 {
|
||||
return nil, fmt.Errorf("Invalid Split of Search&Replace field rule. %s", fldStr)
|
||||
}
|
||||
rsrField := &RSRField{Id: spltRules[0][1:]} // Original id in form ~hdr_name
|
||||
rsrField := &RSRField{Id: spltRules[0][1:], filterValue: filterVal} // Original id in form ~hdr_name
|
||||
rulesRgxp := regexp.MustCompile(`(?:(.+[^\\])\/(.*[^\\])*\/){1,}`)
|
||||
for _, ruleStr := range spltRules[1:] { // :s/ already removed through split
|
||||
allMatches := rulesRgxp.FindStringSubmatch(ruleStr)
|
||||
@@ -74,6 +81,7 @@ type RSRField struct {
|
||||
Id string // Identifier
|
||||
RSRules []*ReSearchReplace // Rules to use when processing field value
|
||||
staticValue string // If defined, enforces parsing always to this value
|
||||
filterValue string // The value to compare when used as filter
|
||||
}
|
||||
|
||||
// Parse the field value from a string
|
||||
@@ -105,6 +113,10 @@ func (rsrf *RSRField) RegexpMatched() bool { // Investigate whether we had a reg
|
||||
return false
|
||||
}
|
||||
|
||||
func (rsrf *RSRField) FilterPasses(value string) bool {
|
||||
return len(rsrf.filterValue) == 0 || rsrf.ParseValue(value) == rsrf.filterValue
|
||||
}
|
||||
|
||||
// Parses list of RSRFields, used for example as multiple filters in derived charging
|
||||
func ParseRSRFields(fldsStr, sep string) (RSRFields, error) {
|
||||
//rsrRlsPattern := regexp.MustCompile(`^(~\w+:s/.+/.*/)|(\^.+(/.+/)?)(;(~\w+:s/.+/.*/)|(\^.+(/.+/)?))*$`) //ToDo:Fix here rule able to confirm the content
|
||||
|
||||
@@ -33,6 +33,14 @@ func TestNewRSRField1(t *testing.T) {
|
||||
} else if !reflect.DeepEqual(expRSRField1, rsrField) {
|
||||
t.Errorf("Expecting: %v, received: %v", expRSRField1, rsrField)
|
||||
}
|
||||
// With filter
|
||||
expRSRField2 := &RSRField{Id: "sip_redirected_to", filterValue: "086517174963",
|
||||
RSRules: []*ReSearchReplace{&ReSearchReplace{SearchRegexp: regexp.MustCompile(`sip:\+49(\d+)@`), ReplaceTemplate: "0$1"}}}
|
||||
if rsrField, err := NewRSRField(`~sip_redirected_to:s/sip:\+49(\d+)@/0$1/(086517174963)`); err != nil {
|
||||
t.Error("Unexpected error: ", err.Error())
|
||||
} else if !reflect.DeepEqual(expRSRField2, rsrField) {
|
||||
t.Errorf("Expecting: %v, received: %v", expRSRField2, rsrField)
|
||||
}
|
||||
// Separator escaped
|
||||
if rsrField, err := NewRSRField(`~sip_redirected_to:s\/sip:\+49(\d+)@/0$1/`); err == nil {
|
||||
t.Errorf("Parse error, field rule does not contain correct number of separators, received: %v", rsrField)
|
||||
@@ -178,3 +186,13 @@ func TestParseCdrcDn1(t *testing.T) {
|
||||
t.Errorf("Expecting: +4914001888, received: %s", parsed2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilterPasses(t *testing.T) {
|
||||
rl, err := NewRSRField(`~1:s/^00(\d+)(?:[a-zA-Z].{3})*0*([1-9]\d+)$/+$1$2/:s/^\+49(18\d{2})$/+491400$1/(+49630415354)`)
|
||||
if err != nil {
|
||||
t.Error("Unexpected error: ", err)
|
||||
}
|
||||
if rl.FilterPasses("0031ABOC0630415354") {
|
||||
t.Error("Passing filter")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user