CDRC configuration change to increase flexibility: a list of CDRC profiles is now used instead of a map

This commit is contained in:
DanB
2016-04-28 15:40:14 +02:00
parent e1d8d96402
commit f819912a0f
22 changed files with 151 additions and 130 deletions

View File

@@ -55,7 +55,7 @@ Common parameters within configs processed:
Parameters specific per config instance:
* duMultiplyFactor, cdrSourceId, cdrFilter, cdrFields
*/
func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection, closeChan chan struct{}, dfltTimezone string) (*Cdrc, error) {
func NewCdrc(cdrcCfgs []*config.CdrcConfig, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection, closeChan chan struct{}, dfltTimezone string) (*Cdrc, error) {
var cdrcCfg *config.CdrcConfig
for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
break
@@ -83,7 +83,7 @@ func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrs
type Cdrc struct {
httpSkipTlsCheck bool
cdrcCfgs map[string]*config.CdrcConfig // All cdrc config profiles attached to this CDRC (key will be profile instance name)
cdrcCfgs []*config.CdrcConfig // All cdrc config profiles attached to this CDRC (key will be profile instance name)
dfltCdrcCfg *config.CdrcConfig
timezone string
cdrs rpcclient.RpcClientConnection

View File

@@ -49,7 +49,7 @@ README:
var cfgPath string
var cfg *config.CGRConfig
var cdrcCfgs map[string]*config.CdrcConfig
var cdrcCfgs []*config.CdrcConfig
var cdrcCfg *config.CdrcConfig
var testLocal = flag.Bool("local", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args

View File

@@ -181,7 +181,7 @@ func (self *PartialRecordsCache) UncachePartial(fileName string, pr *PartialFlat
}
func NewCsvRecordsProcessor(csvReader *csv.Reader, timezone, fileName string,
dfltCdrcCfg *config.CdrcConfig, cdrcCfgs map[string]*config.CdrcConfig,
dfltCdrcCfg *config.CdrcConfig, cdrcCfgs []*config.CdrcConfig,
httpSkipTlsCheck bool, partialRecordsCache *PartialRecordsCache) *CsvRecordsProcessor {
return &CsvRecordsProcessor{csvReader: csvReader, timezone: timezone, fileName: fileName,
dfltCdrcCfg: dfltCdrcCfg, cdrcCfgs: cdrcCfgs,
@@ -194,7 +194,7 @@ type CsvRecordsProcessor struct {
timezone string // Timezone for CDRs which are not clearly specifying it
fileName string
dfltCdrcCfg *config.CdrcConfig
cdrcCfgs map[string]*config.CdrcConfig
cdrcCfgs []*config.CdrcConfig
processedRecordsNr int64 // Number of content records in file
httpSkipTlsCheck bool
partialRecordsCache *PartialRecordsCache // Shared by cdrc so we can cache for all files in a folder
@@ -247,8 +247,8 @@ func (self *CsvRecordsProcessor) processPartialRecord(record []string) ([]string
// Takes the record from a slice and turns it into StoredCdrs, posting them to the cdrServer
func (self *CsvRecordsProcessor) processRecord(record []string) ([]*engine.CDR, error) {
recordCdrs := make([]*engine.CDR, 0) // More CDRs based on the number of filters and field templates
for cdrcId, cdrcCfg := range self.cdrcCfgs { // cdrFields coming from more templates will produce individual storCdr records
recordCdrs := make([]*engine.CDR, 0) // More CDRs based on the number of filters and field templates
for _, cdrcCfg := range self.cdrcCfgs { // cdrFields coming from more templates will produce individual storCdr records
// Make sure filters are matching
filterBreak := false
for _, rsrFilter := range cdrcCfg.CdrFilter {
@@ -265,12 +265,12 @@ func (self *CsvRecordsProcessor) processRecord(record []string) ([]*engine.CDR,
if filterBreak { // Stop importing cdrc fields profile due to non matching filter
continue
}
if storedCdr, err := self.recordToStoredCdr(record, cdrcId); err != nil {
if storedCdr, err := self.recordToStoredCdr(record, cdrcCfg); err != nil {
return nil, fmt.Errorf("Failed converting to StoredCdr, error: %s", err.Error())
} else {
recordCdrs = append(recordCdrs, storedCdr)
}
if !self.cdrcCfgs[cdrcId].ContinueOnSuccess {
if !cdrcCfg.ContinueOnSuccess {
break
}
}
@@ -278,11 +278,11 @@ func (self *CsvRecordsProcessor) processRecord(record []string) ([]*engine.CDR,
}
// Takes the record out of csv and turns it into storedCdr which can be processed by CDRS
func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcId string) (*engine.CDR, error) {
storedCdr := &engine.CDR{OriginHost: "0.0.0.0", Source: self.cdrcCfgs[cdrcId].CdrSourceId, ExtraFields: make(map[string]string), Cost: -1}
func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcCfg *config.CdrcConfig) (*engine.CDR, error) {
storedCdr := &engine.CDR{OriginHost: "0.0.0.0", Source: cdrcCfg.CdrSourceId, ExtraFields: make(map[string]string), Cost: -1}
var err error
var lazyHttpFields []*config.CfgCdrField
for _, cdrFldCfg := range self.cdrcCfgs[cdrcId].ContentFields {
for _, cdrFldCfg := range cdrcCfg.ContentFields {
if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.dfltCdrcCfg.CdrFormat) { // Hardcode some values in case of flatstore
switch cdrFldCfg.FieldId {
case utils.ACCID:
@@ -315,8 +315,8 @@ func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcId strin
}
}
storedCdr.CGRID = utils.Sha1(storedCdr.OriginID, storedCdr.SetupTime.UTC().String())
if storedCdr.ToR == utils.DATA && self.cdrcCfgs[cdrcId].DataUsageMultiplyFactor != 0 {
storedCdr.Usage = time.Duration(float64(storedCdr.Usage.Nanoseconds()) * self.cdrcCfgs[cdrcId].DataUsageMultiplyFactor)
if storedCdr.ToR == utils.DATA && cdrcCfg.DataUsageMultiplyFactor != 0 {
storedCdr.Usage = time.Duration(float64(storedCdr.Usage.Nanoseconds()) * cdrcCfg.DataUsageMultiplyFactor)
}
for _, httpFieldCfg := range lazyHttpFields { // Lazy process the http fields
var outValByte []byte

View File

@@ -30,21 +30,21 @@ import (
func TestCsvRecordForkCdr(t *testing.T) {
cgrConfig, _ := config.NewDefaultCGRConfig()
cdrcConfig := cgrConfig.CdrcProfiles["/var/log/cgrates/cdrc/in"][utils.META_DEFAULT]
cdrcConfig := cgrConfig.CdrcProfiles["/var/log/cgrates/cdrc/in"][0]
cdrcConfig.CdrSourceId = "TEST_CDRC"
cdrcConfig.ContentFields = append(cdrcConfig.ContentFields, &config.CfgCdrField{Tag: "SupplierTest", Type: utils.META_COMPOSED, FieldId: utils.SUPPLIER, Value: []*utils.RSRField{&utils.RSRField{Id: "14"}}})
cdrcConfig.ContentFields = append(cdrcConfig.ContentFields, &config.CfgCdrField{Tag: "DisconnectCauseTest", Type: utils.META_COMPOSED, FieldId: utils.DISCONNECT_CAUSE,
Value: []*utils.RSRField{&utils.RSRField{Id: "16"}}})
//
csvProcessor := &CsvRecordsProcessor{dfltCdrcCfg: cdrcConfig, cdrcCfgs: map[string]*config.CdrcConfig{"*default": cdrcConfig}}
csvProcessor := &CsvRecordsProcessor{dfltCdrcCfg: cdrcConfig, cdrcCfgs: []*config.CdrcConfig{cdrcConfig}}
cdrRow := []string{"firstField", "secondField"}
_, err := csvProcessor.recordToStoredCdr(cdrRow, "*default")
_, err := csvProcessor.recordToStoredCdr(cdrRow, cdrcConfig)
if err == nil {
t.Error("Failed to corectly detect missing fields from record")
}
cdrRow = []string{"ignored", "ignored", utils.VOICE, "acc1", utils.META_PREPAID, "*out", "cgrates.org", "call", "1001", "1001", "+4986517174963",
"2013-02-03 19:50:00", "2013-02-03 19:54:00", "62", "supplier1", "172.16.1.1", "NORMAL_DISCONNECT"}
rtCdr, err := csvProcessor.recordToStoredCdr(cdrRow, "*default")
rtCdr, err := csvProcessor.recordToStoredCdr(cdrRow, cdrcConfig)
if err != nil {
t.Error("Failed to parse CDR in rated cdr", err)
}
@@ -76,14 +76,14 @@ func TestCsvRecordForkCdr(t *testing.T) {
func TestCsvDataMultiplyFactor(t *testing.T) {
cgrConfig, _ := config.NewDefaultCGRConfig()
cdrcConfig := cgrConfig.CdrcProfiles["/var/log/cgrates/cdrc/in"][utils.META_DEFAULT]
cdrcConfig := cgrConfig.CdrcProfiles["/var/log/cgrates/cdrc/in"][0]
cdrcConfig.CdrSourceId = "TEST_CDRC"
cdrcConfig.ContentFields = []*config.CfgCdrField{&config.CfgCdrField{Tag: "TORField", Type: utils.META_COMPOSED, FieldId: utils.TOR, Value: []*utils.RSRField{&utils.RSRField{Id: "0"}}},
&config.CfgCdrField{Tag: "UsageField", Type: utils.META_COMPOSED, FieldId: utils.USAGE, Value: []*utils.RSRField{&utils.RSRField{Id: "1"}}}}
csvProcessor := &CsvRecordsProcessor{dfltCdrcCfg: cdrcConfig, cdrcCfgs: map[string]*config.CdrcConfig{"*default": cdrcConfig}}
csvProcessor.cdrcCfgs["*default"].DataUsageMultiplyFactor = 0
csvProcessor := &CsvRecordsProcessor{dfltCdrcCfg: cdrcConfig, cdrcCfgs: []*config.CdrcConfig{cdrcConfig}}
csvProcessor.cdrcCfgs[0].DataUsageMultiplyFactor = 0
cdrRow := []string{"*data", "1"}
rtCdr, err := csvProcessor.recordToStoredCdr(cdrRow, "*default")
rtCdr, err := csvProcessor.recordToStoredCdr(cdrRow, cdrcConfig)
if err != nil {
t.Error("Failed to parse CDR in rated cdr", err)
}
@@ -100,7 +100,7 @@ func TestCsvDataMultiplyFactor(t *testing.T) {
if !reflect.DeepEqual(expectedCdr, rtCdr) {
t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr)
}
csvProcessor.cdrcCfgs["*default"].DataUsageMultiplyFactor = 1024
csvProcessor.cdrcCfgs[0].DataUsageMultiplyFactor = 1024
expectedCdr = &engine.CDR{
CGRID: utils.Sha1("", sTime.String()),
ToR: cdrRow[0],
@@ -110,7 +110,7 @@ func TestCsvDataMultiplyFactor(t *testing.T) {
ExtraFields: map[string]string{},
Cost: -1,
}
if rtCdr, _ := csvProcessor.recordToStoredCdr(cdrRow, "*default"); !reflect.DeepEqual(expectedCdr, rtCdr) {
if rtCdr, _ := csvProcessor.recordToStoredCdr(cdrRow, cdrcConfig); !reflect.DeepEqual(expectedCdr, rtCdr) {
t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr)
}
cdrRow = []string{"*voice", "1"}
@@ -123,7 +123,7 @@ func TestCsvDataMultiplyFactor(t *testing.T) {
ExtraFields: map[string]string{},
Cost: -1,
}
if rtCdr, _ := csvProcessor.recordToStoredCdr(cdrRow, "*default"); !reflect.DeepEqual(expectedCdr, rtCdr) {
if rtCdr, _ := csvProcessor.recordToStoredCdr(cdrRow, cdrcConfig); !reflect.DeepEqual(expectedCdr, rtCdr) {
t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr)
}
}

View File

@@ -85,7 +85,11 @@ func TestFlatstoreLclCreateCdrFiles(t *testing.T) {
if flatstoreCfg == nil {
t.Fatal("Empty default cdrc configuration")
}
flatstoreCdrcCfg = flatstoreCfg.CdrcProfiles["/tmp/cgr_flatstore/cdrc/in"]["FLATSTORE"]
for _, cdrcCfg := range flatstoreCfg.CdrcProfiles["/tmp/cgr_flatstore/cdrc/in"] {
if cdrcCfg.ID == "FLATSTORE" {
flatstoreCdrcCfg = cdrcCfg
}
}
if err := os.RemoveAll(flatstoreCdrcCfg.CdrInDir); err != nil {
t.Fatal("Error removing folder: ", flatstoreCdrcCfg.CdrInDir, err)
}

View File

@@ -49,14 +49,14 @@ func fwvValue(cdrLine string, indexStart, width int, padding string) string {
return rawVal
}
func NewFwvRecordsProcessor(file *os.File, dfltCfg *config.CdrcConfig, cdrcCfgs map[string]*config.CdrcConfig, httpClient *http.Client, httpSkipTlsCheck bool, timezone string) *FwvRecordsProcessor {
func NewFwvRecordsProcessor(file *os.File, dfltCfg *config.CdrcConfig, cdrcCfgs []*config.CdrcConfig, httpClient *http.Client, httpSkipTlsCheck bool, timezone string) *FwvRecordsProcessor {
return &FwvRecordsProcessor{file: file, cdrcCfgs: cdrcCfgs, dfltCfg: dfltCfg, httpSkipTlsCheck: httpSkipTlsCheck, timezone: timezone}
}
type FwvRecordsProcessor struct {
file *os.File
dfltCfg *config.CdrcConfig // General parameters
cdrcCfgs map[string]*config.CdrcConfig
cdrcCfgs []*config.CdrcConfig
httpClient *http.Client
httpSkipTlsCheck bool
timezone string
@@ -125,11 +125,11 @@ func (self *FwvRecordsProcessor) ProcessNextRecord() ([]*engine.CDR, error) {
}
self.processedRecordsNr += 1
record := string(buf)
for cfgKey, cdrcCfg := range self.cdrcCfgs {
if passes := self.recordPassesCfgFilter(record, cfgKey); !passes {
for _, cdrcCfg := range self.cdrcCfgs {
if passes := self.recordPassesCfgFilter(record, cdrcCfg); !passes {
continue
}
if storedCdr, err := self.recordToStoredCdr(record, cfgKey); err != nil {
if storedCdr, err := self.recordToStoredCdr(record, cdrcCfg, cdrcCfg.ID); err != nil {
return nil, fmt.Errorf("Failed converting to StoredCdr, error: %s", err.Error())
} else {
recordCdrs = append(recordCdrs, storedCdr)
@@ -141,9 +141,9 @@ func (self *FwvRecordsProcessor) ProcessNextRecord() ([]*engine.CDR, error) {
return recordCdrs, nil
}
func (self *FwvRecordsProcessor) recordPassesCfgFilter(record, configKey string) bool {
func (self *FwvRecordsProcessor) recordPassesCfgFilter(record string, cdrcCfg *config.CdrcConfig) bool {
filterPasses := true
for _, rsrFilter := range self.cdrcCfgs[configKey].CdrFilter {
for _, rsrFilter := range cdrcCfg.CdrFilter {
if rsrFilter == nil { // Nil filter does not need to match anything
continue
}
@@ -158,8 +158,8 @@ func (self *FwvRecordsProcessor) recordPassesCfgFilter(record, configKey string)
return filterPasses
}
// Converts a record (header or normal) to StoredCdr
func (self *FwvRecordsProcessor) recordToStoredCdr(record string, cfgKey string) (*engine.CDR, error) {
// Converts a record (header or normal) to CDR
func (self *FwvRecordsProcessor) recordToStoredCdr(record string, cdrcCfg *config.CdrcConfig, cfgKey string) (*engine.CDR, error) {
var err error
var lazyHttpFields []*config.CfgCdrField
var cfgFields []*config.CfgCdrField
@@ -171,13 +171,13 @@ func (self *FwvRecordsProcessor) recordToStoredCdr(record string, cfgKey string)
storedCdr = &engine.CDR{OriginHost: "0.0.0.0", ExtraFields: make(map[string]string), Cost: -1}
}
if cfgKey == "*header" {
cfgFields = self.dfltCfg.HeaderFields
storedCdr.Source = self.dfltCfg.CdrSourceId
duMultiplyFactor = self.dfltCfg.DataUsageMultiplyFactor
cfgFields = cdrcCfg.HeaderFields
storedCdr.Source = cdrcCfg.CdrSourceId
duMultiplyFactor = cdrcCfg.DataUsageMultiplyFactor
} else {
cfgFields = self.cdrcCfgs[cfgKey].ContentFields
storedCdr.Source = self.cdrcCfgs[cfgKey].CdrSourceId
duMultiplyFactor = self.cdrcCfgs[cfgKey].DataUsageMultiplyFactor
cfgFields = cdrcCfg.ContentFields
storedCdr.Source = cdrcCfg.CdrSourceId
duMultiplyFactor = cdrcCfg.DataUsageMultiplyFactor
}
for _, cdrFldCfg := range cfgFields {
var fieldVal string
@@ -244,7 +244,7 @@ func (self *FwvRecordsProcessor) processHeader() error {
return fmt.Errorf("In header, line len: %d, have read: %d", self.lineLen, nRead)
}
var err error
if self.headerCdr, err = self.recordToStoredCdr(string(buf), "*header"); err != nil {
if self.headerCdr, err = self.recordToStoredCdr(string(buf), self.dfltCfg, "*header"); err != nil {
return err
}
return nil

View File

@@ -19,8 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package cdrc
import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"io/ioutil"
"net/rpc"
"net/rpc/jsonrpc"
@@ -28,6 +26,9 @@ import (
"path"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
)
var fwvCfgPath string
@@ -91,7 +92,11 @@ func TestFwvLclCreateCdrFiles(t *testing.T) {
if fwvCfg == nil {
t.Fatal("Empty default cdrc configuration")
}
fwvCdrcCfg = fwvCfg.CdrcProfiles["/tmp/cgr_fwv/cdrc/in"]["FWV1"]
for _, cdrcCfg := range fwvCfg.CdrcProfiles["/tmp/cgr_fwv/cdrc/in"] {
if cdrcCfg.ID == "FWV1" {
fwvCdrcCfg = cdrcCfg
}
}
if err := os.RemoveAll(fwvCdrcCfg.CdrInDir); err != nil {
t.Fatal("Error removing folder: ", fwvCdrcCfg.CdrInDir, err)
}

View File

@@ -45,11 +45,11 @@ func TestFwvValue(t *testing.T) {
func TestFwvRecordPassesCfgFilter(t *testing.T) {
//record, configKey string) bool {
cgrConfig, _ := config.NewDefaultCGRConfig()
cdrcConfig := cgrConfig.CdrcProfiles["/var/log/cgrates/cdrc/in"][utils.META_DEFAULT] // We don't really care that is for .csv since all we want to test are the filters
cdrcConfig := cgrConfig.CdrcProfiles["/var/log/cgrates/cdrc/in"][0] // We don't really care that is for .csv since all we want to test are the filters
cdrcConfig.CdrFilter = utils.ParseRSRFieldsMustCompile(`~52:s/^0(\d{9})/+49${1}/(^+49123123120)`, utils.INFIELD_SEP)
fwvRp := &FwvRecordsProcessor{cdrcCfgs: cgrConfig.CdrcProfiles["/var/log/cgrates/cdrc/in"]}
cdrLine := "CDR0000010 0 20120708181506000123451234 0040123123120 004 000018009980010001ISDN ABC 10Buiten uw regio EHV 00000009190000000009"
if passesFilter := fwvRp.recordPassesCfgFilter(cdrLine, utils.META_DEFAULT); !passesFilter {
if passesFilter := fwvRp.recordPassesCfgFilter(cdrLine, cdrcConfig); !passesFilter {
t.Error("Not passes filter")
}
}

View File

@@ -86,22 +86,23 @@ func startCdrcs(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConn
}
// Start CDRCs
for _, cdrcCfgs := range cfg.CdrcProfiles {
var cdrcCfg *config.CdrcConfig
for _, cdrcCfg = range cdrcCfgs { // Take a random config out since they should be the same
break
var enabledCfgs []*config.CdrcConfig
for _, cdrcCfg := range cdrcCfgs { // Take a random config out since they should be the same
if cdrcCfg.Enabled {
enabledCfgs = append(enabledCfgs, cdrcCfg)
}
}
if cdrcCfg.Enabled == false {
continue // Ignore not enabled
if len(enabledCfgs) != 0 {
go startCdrc(internalCdrSChan, internalRaterChan, cdrcCfgs, cfg.HttpSkipTlsVerify, cdrcChildrenChan, exitChan)
}
go startCdrc(internalCdrSChan, internalRaterChan, cdrcCfgs, cfg.HttpSkipTlsVerify, cdrcChildrenChan, exitChan)
}
cdrcInitialized = true // Initialized
}
}
// Fires up a cdrc instance
func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConnection, cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool,
func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConnection, cdrcCfgs []*config.CdrcConfig, httpSkipTlsCheck bool,
closeChan chan struct{}, exitChan chan bool) {
var cdrcCfg *config.CdrcConfig
for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one

View File

@@ -25,6 +25,7 @@ import (
)
type CdrcConfig struct {
ID string // free-form text identifying this CDRC instance
Enabled bool // Enable/Disable the profile
DryRun bool // Do not post CDRs to the server
CdrsConns []*HaPoolConfig // The address where CDRs can be reached
@@ -51,6 +52,9 @@ func (self *CdrcConfig) loadFromJsonCfg(jsnCfg *CdrcJsonCfg) error {
return nil
}
var err error
if jsnCfg.Id != nil {
self.ID = *jsnCfg.Id
}
if jsnCfg.Enabled != nil {
self.Enabled = *jsnCfg.Enabled
}
@@ -129,6 +133,7 @@ func (self *CdrcConfig) loadFromJsonCfg(jsnCfg *CdrcJsonCfg) error {
// Clone itself into a new CdrcConfig
func (self *CdrcConfig) Clone() *CdrcConfig {
clnCdrc := new(CdrcConfig)
clnCdrc.ID = self.ID
clnCdrc.Enabled = self.Enabled
clnCdrc.CdrsConns = make([]*HaPoolConfig, len(self.CdrsConns))
for idx, cdrConn := range self.CdrsConns {

View File

@@ -18,14 +18,16 @@
"enabled": true, // enable Rater service: <true|false>
},
"cdrc": {
"CDRC-CSV1": {
"cdrc": [
{
"id": "CDRC-CSV1",
"enabled": true, // enable CDR client functionality
"cdr_in_dir": "/tmp/cgrates/cdrc1/in", // absolute path towards the directory where the CDRs are stored
"cdr_out_dir": "/tmp/cgrates/cdrc1/out", // absolute path towards the directory where processed CDRs will be moved
"cdr_source_id": "csv1", // free form field, tag identifying the source of the CDRs within CDRS database
},
"CDRC-CSV2": {
{
"id": "CDRC-CSV2",
"enabled": true, // enable CDR client functionality
"cdr_in_dir": "/tmp/cgrates/cdrc2/in", // absolute path towards the directory where the CDRs are stored
"cdr_out_dir": "/tmp/cgrates/cdrc2/out", // absolute path towards the directory where processed CDRs will be moved
@@ -38,7 +40,7 @@
{"field_id": "Usage", "value": "~9:s/^(\\d+)$/${1}s/"},
],
},
},
],
"sm_freeswitch": {
"enabled": true, // starts SessionManager service: <true|false>

View File

@@ -1,24 +1,14 @@
{
"cdrc": {
"CDRC-CSV2": {
"enabled": true, // enable CDR client functionality
"cdr_in_dir": "/tmp/cgrates/cdrc2/in", // absolute path towards the directory where the CDRs are stored
"cdr_out_dir": "/tmp/cgrates/cdrc2/out", // absolute path towards the directory where processed CDRs will be moved
"data_usage_multiply_factor": 0.000976563,
"cdr_source_id": "csv2", // free form field, tag identifying the source of the CDRs within CDRS database
"content_fields":[ // import template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
{"field_id": "ToR", "value": "~7:s/^(voice|data|sms|generic)$/*$1/"},
{"field_id": "AnswerTime", "value": "2"},
],
},
"CDRC-CSV3": {
"cdrc": [
{
"id": "CDRC-CSV3",
"enabled": true, // enable CDR client functionality
"cdr_in_dir": "/tmp/cgrates/cdrc3/in", // absolute path towards the directory where the CDRs are stored
"cdr_out_dir": "/tmp/cgrates/cdrc3/out", // absolute path towards the directory where processed CDRs will be moved
"cdr_source_id": "csv3", // free form field, tag identifying the source of the CDRs within CDRS database
},
},
],
"sm_freeswitch": {
"enabled": true, // starts SessionManager service: <true|false>

View File

@@ -85,7 +85,7 @@ func NewDefaultCGRConfig() (*CGRConfig, error) {
return nil, err
}
cfg.dfltCdreProfile = cfg.CdreProfiles[utils.META_DEFAULT].Clone() // So default will stay unique, will have nil pointer in case of no defaults loaded which is an extra check
cfg.dfltCdrcProfile = cfg.CdrcProfiles["/var/log/cgrates/cdrc/in"][utils.META_DEFAULT].Clone()
cfg.dfltCdrcProfile = cfg.CdrcProfiles["/var/log/cgrates/cdrc/in"][0].Clone()
dfltFsConnConfig = cfg.SmFsConfig.EventSocketConns[0] // We leave it crashing here on purpose if no Connection defaults defined
dfltKamConnConfig = cfg.SmKamConfig.EvapiConns[0]
if err := cfg.checkConfigSanity(); err != nil {
@@ -231,7 +231,7 @@ type CGRConfig struct {
CDRStatsEnabled bool // Enable CDR Stats service
CDRStatsSaveInterval time.Duration // Save interval duration
CdreProfiles map[string]*CdreConfig
CdrcProfiles map[string]map[string]*CdrcConfig // Number of CDRC instances running imports, format map[dirPath]map[instanceName]{Configs}
CdrcProfiles map[string][]*CdrcConfig // Number of CDRC instances running imports, format map[dirPath][]{Configs}
SmGenericConfig *SmGenericConfig
SmFsConfig *SmFsConfig // SMFreeSWITCH configuration
SmKamConfig *SmKamConfig // SM-Kamailio Configuration
@@ -319,12 +319,12 @@ func (self *CGRConfig) checkConfigSanity() error {
}
// CDRC sanity checks
for _, cdrcCfgs := range self.CdrcProfiles {
for instID, cdrcInst := range cdrcCfgs {
for _, cdrcInst := range cdrcCfgs {
if !cdrcInst.Enabled {
continue
}
if len(cdrcInst.CdrsConns) == 0 {
return fmt.Errorf("<CDRC> Instance: %s, CdrC enabled but no CDRS defined!", instID)
return fmt.Errorf("<CDRC> Instance: %s, CdrC enabled but no CDRS defined!", cdrcInst.ID)
}
for _, conn := range cdrcInst.CdrsConns {
if conn.Address == utils.MetaInternal && !self.CDRSEnabled {
@@ -844,28 +844,26 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
}
}
}
if jsnCdrcCfg != nil {
if self.CdrcProfiles == nil {
self.CdrcProfiles = make(map[string]map[string]*CdrcConfig)
self.CdrcProfiles = make(map[string][]*CdrcConfig)
}
for profileName, jsnCrc1Cfg := range jsnCdrcCfg {
for _, jsnCrc1Cfg := range jsnCdrcCfg {
if _, hasDir := self.CdrcProfiles[*jsnCrc1Cfg.Cdr_in_dir]; !hasDir {
self.CdrcProfiles[*jsnCrc1Cfg.Cdr_in_dir] = make(map[string]*CdrcConfig)
self.CdrcProfiles[*jsnCrc1Cfg.Cdr_in_dir] = make([]*CdrcConfig, 0)
}
if _, hasProfile := self.CdrcProfiles[profileName]; !hasProfile {
if profileName == utils.META_DEFAULT {
self.CdrcProfiles[*jsnCrc1Cfg.Cdr_in_dir][profileName] = new(CdrcConfig)
} else {
self.CdrcProfiles[*jsnCrc1Cfg.Cdr_in_dir][profileName] = self.dfltCdrcProfile.Clone() // Clone default so we do not inherit pointers
}
var cdrcInstCfg *CdrcConfig
if *jsnCrc1Cfg.Id == utils.META_DEFAULT {
cdrcInstCfg = new(CdrcConfig)
} else {
cdrcInstCfg = self.dfltCdrcProfile.Clone() // Clone default so we do not inherit pointers
}
if err = self.CdrcProfiles[*jsnCrc1Cfg.Cdr_in_dir][profileName].loadFromJsonCfg(jsnCrc1Cfg); err != nil {
if err := cdrcInstCfg.loadFromJsonCfg(jsnCrc1Cfg); err != nil {
return err
}
self.CdrcProfiles[*jsnCrc1Cfg.Cdr_in_dir] = append(self.CdrcProfiles[*jsnCrc1Cfg.Cdr_in_dir], cdrcInstCfg)
}
}
if jsnSmGenericCfg != nil {
if err := self.SmGenericConfig.loadFromJsonCfg(jsnSmGenericCfg); err != nil {
return err

View File

@@ -166,8 +166,9 @@ const CGRATES_CFG_JSON = `
},
"cdrc": {
"*default": {
"cdrc": [
{
"id": "*default", // identifier of the CDRC runner
"enabled": false, // enable CDR client functionality
"dry_run": false, // do not send the CDRs to CDRS, just parse them
"cdrs_conns": [
@@ -202,8 +203,8 @@ const CGRATES_CFG_JSON = `
{"tag": "Usage", "field_id": "Usage", "type": "*composed", "value": "13", "mandatory": true},
],
"trailer_fields": [], // template of the import trailer fields
}
},
},
],
"sm_generic": {
"enabled": false, // starts SessionManager service: <true|false>

View File

@@ -188,12 +188,12 @@ func (self CgrJsonCfg) CdreJsonCfgs() (map[string]*CdreJsonCfg, error) {
return cfg, nil
}
func (self CgrJsonCfg) CdrcJsonCfg() (map[string]*CdrcJsonCfg, error) {
func (self CgrJsonCfg) CdrcJsonCfg() ([]*CdrcJsonCfg, error) {
rawCfg, hasKey := self[CDRC_JSN]
if !hasKey {
return nil, nil
}
cfg := make(map[string]*CdrcJsonCfg)
cfg := make([]*CdrcJsonCfg, 0)
if err := json.Unmarshal(*rawCfg, &cfg); err != nil {
return nil, err
}

View File

@@ -302,8 +302,9 @@ func TestDfCdrcJsonCfg(t *testing.T) {
&CdrFieldJsonCfg{Tag: utils.StringPointer("Usage"), Field_id: utils.StringPointer(utils.USAGE), Type: utils.StringPointer(utils.META_COMPOSED),
Value: utils.StringPointer("13"), Mandatory: utils.BoolPointer(true)},
}
eCfg := map[string]*CdrcJsonCfg{
"*default": &CdrcJsonCfg{
eCfg := []*CdrcJsonCfg{
&CdrcJsonCfg{
Id: utils.StringPointer(utils.META_DEFAULT),
Enabled: utils.BoolPointer(false),
Dry_run: utils.BoolPointer(false),
Cdrs_conns: &[]*HaPoolJsonCfg{&HaPoolJsonCfg{
@@ -330,7 +331,7 @@ func TestDfCdrcJsonCfg(t *testing.T) {
if cfg, err := dfCgrJsonCfg.CdrcJsonCfg(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCfg, cfg) {
t.Error("Received: ", cfg["*default"])
t.Errorf("Expecting: \n%s\n, received: \n%s\n: ", utils.ToIJSON(eCfg), utils.ToIJSON(cfg))
}
}
@@ -633,14 +634,16 @@ func TestNewCgrJsonCfgFromFile(t *testing.T) {
&CdrFieldJsonCfg{Field_id: utils.StringPointer(utils.ANSWER_TIME), Value: utils.StringPointer("1")},
&CdrFieldJsonCfg{Field_id: utils.StringPointer(utils.USAGE), Value: utils.StringPointer(`~9:s/^(\d+)$/${1}s/`)},
}
eCfgCdrc := map[string]*CdrcJsonCfg{
"CDRC-CSV1": &CdrcJsonCfg{
eCfgCdrc := []*CdrcJsonCfg{
&CdrcJsonCfg{
Id: utils.StringPointer("CDRC-CSV1"),
Enabled: utils.BoolPointer(true),
Cdr_in_dir: utils.StringPointer("/tmp/cgrates/cdrc1/in"),
Cdr_out_dir: utils.StringPointer("/tmp/cgrates/cdrc1/out"),
Cdr_source_id: utils.StringPointer("csv1"),
},
"CDRC-CSV2": &CdrcJsonCfg{
&CdrcJsonCfg{
Id: utils.StringPointer("CDRC-CSV2"),
Enabled: utils.BoolPointer(true),
Data_usage_multiply_factor: utils.Float64Pointer(0.000976563),
Run_delay: utils.IntPointer(1),
@@ -653,8 +656,7 @@ func TestNewCgrJsonCfgFromFile(t *testing.T) {
if cfg, err := cgrJsonCfg.CdrcJsonCfg(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCfgCdrc, cfg) {
key := "CDRC-CSV2"
t.Errorf("Expecting:\n %+v\n received:\n %+v\n", utils.ToIJSON(eCfgCdrc[key]), utils.ToIJSON(cfg[key]))
t.Errorf("Expecting:\n %+v\n received:\n %+v\n", utils.ToIJSON(eCfgCdrc), utils.ToIJSON(cfg))
}
eCfgSmFs := &SmFsJsonCfg{
Enabled: utils.BoolPointer(true),

View File

@@ -32,10 +32,11 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
t.Error(err)
}
eCgrCfg, _ := NewDefaultCGRConfig()
eCgrCfg.CdrcProfiles = make(map[string]map[string]*CdrcConfig)
eCgrCfg.CdrcProfiles = make(map[string][]*CdrcConfig)
// Default instance first
eCgrCfg.CdrcProfiles["/var/log/cgrates/cdrc/in"] = map[string]*CdrcConfig{
"*default": &CdrcConfig{
eCgrCfg.CdrcProfiles["/var/log/cgrates/cdrc/in"] = []*CdrcConfig{
&CdrcConfig{
ID: utils.META_DEFAULT,
Enabled: false,
CdrsConns: []*HaPoolConfig{&HaPoolConfig{Address: utils.MetaInternal}},
CdrFormat: "csv",
@@ -79,8 +80,9 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
TrailerFields: make([]*CfgCdrField, 0),
},
}
eCgrCfg.CdrcProfiles["/tmp/cgrates/cdrc1/in"] = map[string]*CdrcConfig{
"CDRC-CSV1": &CdrcConfig{
eCgrCfg.CdrcProfiles["/tmp/cgrates/cdrc1/in"] = []*CdrcConfig{
&CdrcConfig{
ID: "CDRC-CSV1",
Enabled: true,
CdrsConns: []*HaPoolConfig{&HaPoolConfig{Address: utils.MetaInternal}},
CdrFormat: "csv",
@@ -122,14 +124,15 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
TrailerFields: make([]*CfgCdrField, 0),
},
}
eCgrCfg.CdrcProfiles["/tmp/cgrates/cdrc2/in"] = map[string]*CdrcConfig{
"CDRC-CSV2": &CdrcConfig{
eCgrCfg.CdrcProfiles["/tmp/cgrates/cdrc2/in"] = []*CdrcConfig{
&CdrcConfig{
ID: "CDRC-CSV2",
Enabled: true,
CdrsConns: []*HaPoolConfig{&HaPoolConfig{Address: utils.MetaInternal}},
CdrFormat: "csv",
FieldSeparator: ',',
DataUsageMultiplyFactor: 0.000976563,
RunDelay: 0,
RunDelay: 1000000000,
MaxOpenFiles: 1024,
CdrInDir: "/tmp/cgrates/cdrc2/in",
CdrOutDir: "/tmp/cgrates/cdrc2/out",
@@ -137,16 +140,19 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP),
HeaderFields: make([]*CfgCdrField, 0),
ContentFields: []*CfgCdrField{
&CfgCdrField{Tag: "", Type: "", FieldId: utils.TOR, Value: utils.ParseRSRFieldsMustCompile("~7:s/^(voice|data|sms|generic)$/*$1/", utils.INFIELD_SEP),
&CfgCdrField{FieldId: utils.TOR, Value: utils.ParseRSRFieldsMustCompile("~7:s/^(voice|data|sms|mms|generic)$/*$1/", utils.INFIELD_SEP),
FieldFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), Width: 0, Strip: "", Padding: "", Layout: "", Mandatory: false},
&CfgCdrField{Tag: "", Type: "", FieldId: utils.ANSWER_TIME, Value: utils.ParseRSRFieldsMustCompile("2", utils.INFIELD_SEP),
&CfgCdrField{Tag: "", Type: "", FieldId: utils.ANSWER_TIME, Value: utils.ParseRSRFieldsMustCompile("1", utils.INFIELD_SEP),
FieldFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), Width: 0, Strip: "", Padding: "", Layout: "", Mandatory: false},
&CfgCdrField{FieldId: utils.USAGE, Value: utils.ParseRSRFieldsMustCompile("~9:s/^(\\d+)$/${1}s/", utils.INFIELD_SEP),
FieldFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), Width: 0, Strip: "", Padding: "", Layout: "", Mandatory: false},
},
TrailerFields: make([]*CfgCdrField, 0),
},
}
eCgrCfg.CdrcProfiles["/tmp/cgrates/cdrc3/in"] = map[string]*CdrcConfig{
"CDRC-CSV3": &CdrcConfig{
eCgrCfg.CdrcProfiles["/tmp/cgrates/cdrc3/in"] = []*CdrcConfig{
&CdrcConfig{
ID: "CDRC-CSV3",
Enabled: true,
CdrsConns: []*HaPoolConfig{&HaPoolConfig{Address: utils.MetaInternal}},
CdrFormat: "csv",
@@ -189,6 +195,6 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
},
}
if !reflect.DeepEqual(eCgrCfg.CdrcProfiles, cgrCfg.CdrcProfiles) {
t.Errorf("Expected: %+v, received: %+v", eCgrCfg.CdrcProfiles, cgrCfg.CdrcProfiles)
t.Errorf("Expected: \n%s\n, received: \n%s\n", utils.ToJSON(eCgrCfg.CdrcProfiles), utils.ToJSON(cgrCfg.CdrcProfiles))
}
}

View File

@@ -143,6 +143,7 @@ type CdreJsonCfg struct {
// Cdrc config section
type CdrcJsonCfg struct {
Id *string
Enabled *bool
Dry_run *bool
Cdrs_conns *[]*HaPoolJsonCfg

View File

@@ -22,8 +22,9 @@
},
"cdrc": {
"FLATSTORE": {
"cdrc": [
{
"id": "FLATSTORE",
"enabled": true, // enable CDR client functionality
"cdrs_conns": [
{"address": "*internal"} // address where to reach CDR server. <*internal|x.y.z.y:1234>
@@ -56,6 +57,6 @@
{"tag": "DialogId", "cdr_field_id": "DialogId", "type": "cdrfield", "value": "11"},
],
},
},
],
}

View File

@@ -22,8 +22,9 @@
},
"cdrc": {
"FWV1": {
"cdrc": [
{
"id": "FWV1",
"enabled": true, // enable CDR client functionality
"dry_run": true,
"cdrs_conns": [
@@ -62,6 +63,6 @@
{"tag": "TotalDuration", "type": "metatag", "metatag_id":"total_duration", "value": "150", "width": 12},
],
},
},
],
}

View File

@@ -1,8 +1,9 @@
{
// Contains CDRC template for FreeSWITCH CDR
"cdrc": {
"CDRC-CSV2": {
"cdrc": [
{
"id": "CDRC-CSV2",
"enabled": true, // enable CDR client functionality
"cdr_in_dir": "/tmp/cgrates/cdrc_fs/in", // absolute path towards the directory where the CDRs are stored
"cdr_out_dir": "/tmp/cgrates/cdrc_fs/out", // absolute path towards the directory where processed CDRs will be moved
@@ -22,6 +23,6 @@
{"tag": "usage", "cdr_field_id": "usage", "type": "cdrfield", "value": "~8:s/^(\\d+)$/${1}s/", "mandatory": true},
],
},
},
],
}

View File

@@ -16,14 +16,16 @@
"enabled": true, // start the CDR Server service: <true|false>
},
"cdrc": {
"CDRC-CSV1": {
"cdrc": [
{
"id": "CDRC-CSV1",
"enabled": true, // enable CDR client functionality
"cdr_in_dir": "/tmp/cgrates/cdrc1/in", // absolute path towards the directory where the CDRs are stored
"cdr_out_dir": "/tmp/cgrates/cdrc1/out", // absolute path towards the directory where processed CDRs will be moved
"cdr_source_id": "csv1", // free form field, tag identifying the source of the CDRs within CDRS database
},
"CDRC-CSV2": {
{
"id": "CDRC-CSV2",
"enabled": true, // enable CDR client functionality
"cdr_in_dir": "/tmp/cgrates/cdrc2/in", // absolute path towards the directory where the CDRs are stored
"cdr_out_dir": "/tmp/cgrates/cdrc2/out", // absolute path towards the directory where processed CDRs will be moved
@@ -43,7 +45,8 @@
{"cdr_field_id": "usage", "value": "~9:s/^(\\d+)$/${1}s/"},
],
},
"CDRC-CSV3": {
{
"id": "CDRC-CSV3",
"enabled": true, // enable CDR client functionality
"field_separator": ";", // separator used in case of csv files
"cdr_in_dir": "/tmp/cgrates/cdrc3/in", // absolute path towards the directory where the CDRs are stored
@@ -64,7 +67,7 @@
{"cdr_field_id": "usage", "value": "~6:s/^(\\d+)$/${1}s/"},
],
}
},
],
"cdre": {
"CDRE-FW1": {