mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Fix CDRC sharing dataUsageMultiplyFactor and cdrSource
This commit is contained in:
51
cdrc/cdrc.go
51
cdrc/cdrc.go
@@ -81,18 +81,29 @@ func populateStoredCdrField(cdr *engine.StoredCdr, fieldId, fieldVal string) err
|
||||
return nil
|
||||
}
|
||||
|
||||
/*
|
||||
One instance of CDRC will act on one folder.
|
||||
Common parameters within configs processed:
|
||||
* cdrS, cdrFormat, cdrInDir, cdrOutDir, runDelay
|
||||
Parameters specific per config instance:
|
||||
* duMultiplyFactor, cdrSourceId, cdrFilter, cdrFields
|
||||
*/
|
||||
func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrServer *engine.CdrServer, exitChan chan struct{}) (*Cdrc, error) {
|
||||
var cdrcCfg *config.CdrcConfig
|
||||
for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
|
||||
break
|
||||
}
|
||||
cdrc := &Cdrc{cdrsAddress: cdrcCfg.Cdrs, CdrFormat: cdrcCfg.CdrFormat, cdrInDir: cdrcCfg.CdrInDir, cdrOutDir: cdrcCfg.CdrOutDir,
|
||||
cdrSourceId: cdrcCfg.CdrSourceId, runDelay: cdrcCfg.RunDelay, csvSep: cdrcCfg.FieldSeparator, duMultiplyFactor: cdrcCfg.DataUsageMultiplyFactor,
|
||||
runDelay: cdrcCfg.RunDelay, csvSep: cdrcCfg.FieldSeparator,
|
||||
httpSkipTlsCheck: httpSkipTlsCheck, cdrServer: cdrServer, exitChan: exitChan}
|
||||
cdrc.cdrSourceIds = make([]string, len(cdrcCfgs))
|
||||
cdrc.duMultiplyFactors = make([]float64, len(cdrcCfgs))
|
||||
cdrc.cdrFilters = make([]utils.RSRFields, len(cdrcCfgs))
|
||||
cdrc.cdrFields = make([][]*config.CfgCdrField, len(cdrcCfgs))
|
||||
idx := 0
|
||||
for _, cfg := range cdrcCfgs {
|
||||
cdrc.cdrSourceIds[idx] = cfg.CdrSourceId
|
||||
cdrc.duMultiplyFactors[idx] = cfg.DataUsageMultiplyFactor
|
||||
cdrc.cdrFilters[idx] = cfg.CdrFilter
|
||||
cdrc.cdrFields[idx] = cfg.CdrFields
|
||||
idx += 1
|
||||
@@ -111,17 +122,17 @@ type Cdrc struct {
|
||||
cdrsAddress,
|
||||
CdrFormat,
|
||||
cdrInDir,
|
||||
cdrOutDir,
|
||||
cdrSourceId string
|
||||
runDelay time.Duration
|
||||
csvSep rune
|
||||
duMultiplyFactor float64
|
||||
cdrFilters []utils.RSRFields // Should be in sync with cdrFields on indexes
|
||||
cdrFields [][]*config.CfgCdrField // Profiles directly connected with cdrFilters
|
||||
httpSkipTlsCheck bool
|
||||
cdrServer *engine.CdrServer // Reference towards internal cdrServer if that is the case
|
||||
httpClient *http.Client
|
||||
exitChan chan struct{}
|
||||
cdrOutDir string
|
||||
cdrSourceIds []string // Should be in sync with cdrFields on indexes
|
||||
runDelay time.Duration
|
||||
csvSep rune
|
||||
duMultiplyFactors []float64
|
||||
cdrFilters []utils.RSRFields // Should be in sync with cdrFields on indexes
|
||||
cdrFields [][]*config.CfgCdrField // Profiles directly connected with cdrFilters
|
||||
httpSkipTlsCheck bool
|
||||
cdrServer *engine.CdrServer // Reference towards internal cdrServer if that is the case
|
||||
httpClient *http.Client
|
||||
exitChan chan struct{}
|
||||
}
|
||||
|
||||
// When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing
|
||||
@@ -216,7 +227,7 @@ func (self *Cdrc) processFile(filePath string) error {
|
||||
continue // Other csv related errors, ignore
|
||||
}
|
||||
recordCdrs := make([]*engine.StoredCdr, 0) // More CDRs based on the number of filters and field templates
|
||||
for idx, cdrFieldsInst := range self.cdrFields {
|
||||
for idx := range self.cdrFields {
|
||||
// Make sure filters are matching
|
||||
filterBreak := false
|
||||
for _, rsrFilter := range self.cdrFilters[idx] {
|
||||
@@ -233,7 +244,7 @@ func (self *Cdrc) processFile(filePath string) error {
|
||||
if filterBreak { // Stop importing cdrc fields profile due to non matching filter
|
||||
continue
|
||||
}
|
||||
if storedCdr, err := self.recordToStoredCdr(record, cdrFieldsInst); err != nil {
|
||||
if storedCdr, err := self.recordToStoredCdr(record, idx); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Row %d - failed converting to StoredCdr, error: %s", procRowNr, err.Error()))
|
||||
continue
|
||||
} else {
|
||||
@@ -265,12 +276,12 @@ func (self *Cdrc) processFile(filePath string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Takes the record out of csv and turns it into http form which can be posted
|
||||
func (self *Cdrc) recordToStoredCdr(record []string, cdrFields []*config.CfgCdrField) (*engine.StoredCdr, error) {
|
||||
storedCdr := &engine.StoredCdr{CdrHost: "0.0.0.0", CdrSource: self.cdrSourceId, ExtraFields: make(map[string]string), Cost: -1}
|
||||
// Takes the record out of csv and turns it into storedCdr which can be processed by CDRS
|
||||
func (self *Cdrc) recordToStoredCdr(record []string, cfgIdx int) (*engine.StoredCdr, error) {
|
||||
storedCdr := &engine.StoredCdr{CdrHost: "0.0.0.0", CdrSource: self.cdrSourceIds[cfgIdx], ExtraFields: make(map[string]string), Cost: -1}
|
||||
var err error
|
||||
var lazyHttpFields []*config.CfgCdrField
|
||||
for _, cdrFldCfg := range cdrFields {
|
||||
for _, cdrFldCfg := range self.cdrFields[cfgIdx] {
|
||||
var fieldVal string
|
||||
if utils.IsSliceMember([]string{CSV, FS_CSV}, self.CdrFormat) {
|
||||
if cdrFldCfg.Type == utils.CDRFIELD {
|
||||
@@ -298,8 +309,8 @@ func (self *Cdrc) recordToStoredCdr(record []string, cdrFields []*config.CfgCdrF
|
||||
}
|
||||
}
|
||||
storedCdr.CgrId = utils.Sha1(storedCdr.AccId, storedCdr.SetupTime.String())
|
||||
if storedCdr.TOR == utils.DATA && self.duMultiplyFactor != 0 {
|
||||
storedCdr.Usage = time.Duration(float64(storedCdr.Usage.Nanoseconds()) * self.duMultiplyFactor)
|
||||
if storedCdr.TOR == utils.DATA && self.duMultiplyFactors[cfgIdx] != 0 {
|
||||
storedCdr.Usage = time.Duration(float64(storedCdr.Usage.Nanoseconds()) * self.duMultiplyFactors[cfgIdx])
|
||||
}
|
||||
for _, httpFieldCfg := range lazyHttpFields { // Lazy process the http fields
|
||||
var outValByte []byte
|
||||
|
||||
@@ -32,15 +32,15 @@ func TestRecordForkCdr(t *testing.T) {
|
||||
cgrConfig, _ := config.NewDefaultCGRConfig()
|
||||
cdrcConfig := cgrConfig.CdrcProfiles["/var/log/cgrates/cdrc/in"][utils.META_DEFAULT]
|
||||
cdrcConfig.CdrFields = append(cdrcConfig.CdrFields, &config.CfgCdrField{Tag: "SupplierTest", Type: utils.CDRFIELD, CdrFieldId: "supplier", Value: []*utils.RSRField{&utils.RSRField{Id: "14"}}})
|
||||
cdrc := &Cdrc{CdrFormat: CSV, cdrSourceId: "TEST_CDRC", cdrFields: [][]*config.CfgCdrField{cdrcConfig.CdrFields}}
|
||||
cdrc := &Cdrc{CdrFormat: CSV, cdrSourceIds: []string{"TEST_CDRC"}, cdrFields: [][]*config.CfgCdrField{cdrcConfig.CdrFields}}
|
||||
cdrRow := []string{"firstField", "secondField"}
|
||||
_, err := cdrc.recordToStoredCdr(cdrRow, cdrc.cdrFields[0])
|
||||
_, err := cdrc.recordToStoredCdr(cdrRow, 0)
|
||||
if err == nil {
|
||||
t.Error("Failed to corectly detect missing fields from record")
|
||||
}
|
||||
cdrRow = []string{"ignored", "ignored", utils.VOICE, "acc1", utils.META_PREPAID, "*out", "cgrates.org", "call", "1001", "1001", "+4986517174963",
|
||||
"2013-02-03 19:50:00", "2013-02-03 19:54:00", "62", "supplier1", "172.16.1.1"}
|
||||
rtCdr, err := cdrc.recordToStoredCdr(cdrRow, cdrc.cdrFields[0])
|
||||
rtCdr, err := cdrc.recordToStoredCdr(cdrRow, 0)
|
||||
if err != nil {
|
||||
t.Error("Failed to parse CDR in rated cdr", err)
|
||||
}
|
||||
@@ -71,9 +71,9 @@ func TestRecordForkCdr(t *testing.T) {
|
||||
func TestDataMultiplyFactor(t *testing.T) {
|
||||
cdrFields := []*config.CfgCdrField{&config.CfgCdrField{Tag: "TORField", Type: utils.CDRFIELD, CdrFieldId: "tor", Value: []*utils.RSRField{&utils.RSRField{Id: "0"}}},
|
||||
&config.CfgCdrField{Tag: "UsageField", Type: utils.CDRFIELD, CdrFieldId: "usage", Value: []*utils.RSRField{&utils.RSRField{Id: "1"}}}}
|
||||
cdrc := &Cdrc{CdrFormat: CSV, cdrSourceId: "TEST_CDRC", cdrFields: [][]*config.CfgCdrField{cdrFields}}
|
||||
cdrc := &Cdrc{CdrFormat: CSV, cdrSourceIds: []string{"TEST_CDRC"}, duMultiplyFactors: []float64{0}, cdrFields: [][]*config.CfgCdrField{cdrFields}}
|
||||
cdrRow := []string{"*data", "1"}
|
||||
rtCdr, err := cdrc.recordToStoredCdr(cdrRow, cdrc.cdrFields[0])
|
||||
rtCdr, err := cdrc.recordToStoredCdr(cdrRow, 0)
|
||||
if err != nil {
|
||||
t.Error("Failed to parse CDR in rated cdr", err)
|
||||
}
|
||||
@@ -90,7 +90,7 @@ func TestDataMultiplyFactor(t *testing.T) {
|
||||
if !reflect.DeepEqual(expectedCdr, rtCdr) {
|
||||
t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr)
|
||||
}
|
||||
cdrc.duMultiplyFactor = 1024
|
||||
cdrc.duMultiplyFactors = []float64{1024}
|
||||
expectedCdr = &engine.StoredCdr{
|
||||
CgrId: utils.Sha1("", sTime.String()),
|
||||
TOR: cdrRow[0],
|
||||
@@ -100,7 +100,7 @@ func TestDataMultiplyFactor(t *testing.T) {
|
||||
ExtraFields: map[string]string{},
|
||||
Cost: -1,
|
||||
}
|
||||
if rtCdr, _ := cdrc.recordToStoredCdr(cdrRow, cdrc.cdrFields[0]); !reflect.DeepEqual(expectedCdr, rtCdr) {
|
||||
if rtCdr, _ := cdrc.recordToStoredCdr(cdrRow, 0); !reflect.DeepEqual(expectedCdr, rtCdr) {
|
||||
t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr)
|
||||
}
|
||||
cdrRow = []string{"*voice", "1"}
|
||||
@@ -113,7 +113,7 @@ func TestDataMultiplyFactor(t *testing.T) {
|
||||
ExtraFields: map[string]string{},
|
||||
Cost: -1,
|
||||
}
|
||||
if rtCdr, _ := cdrc.recordToStoredCdr(cdrRow, cdrc.cdrFields[0]); !reflect.DeepEqual(expectedCdr, rtCdr) {
|
||||
if rtCdr, _ := cdrc.recordToStoredCdr(cdrRow, 0); !reflect.DeepEqual(expectedCdr, rtCdr) {
|
||||
t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user