CDRC - PartialCacheExpiryAction

This commit is contained in:
DanB
2016-07-31 19:12:50 +02:00
parent 7271ea85d5
commit ae8549e13c
8 changed files with 217 additions and 165 deletions

View File

@@ -71,7 +71,7 @@ func NewCdrc(cdrcCfgs []*config.CdrcConfig, httpSkipTlsCheck bool, cdrs rpcclien
if cdrc.unpairedRecordsCache, err = NewUnpairedRecordsCache(cdrcCfg.PartialRecordCache, cdrcCfg.CdrOutDir, cdrcCfg.FieldSeparator); err != nil {
return nil, err
}
if cdrc.partialRecordsCache, err = NewPartialRecordsCache(cdrcCfg.PartialRecordCache, cdrcCfg.CdrOutDir, cdrcCfg.FieldSeparator, roundDecimals, cdrc.timezone, cdrc.httpSkipTlsCheck); err != nil {
if cdrc.partialRecordsCache, err = NewPartialRecordsCache(cdrcCfg.PartialRecordCache, cdrcCfg.PartialCacheExpiryAction, cdrcCfg.CdrOutDir, cdrcCfg.FieldSeparator, roundDecimals, cdrc.timezone, cdrc.httpSkipTlsCheck, cdrc.cdrs); err != nil {
return nil, err
}
// Before processing, make sure in and out folders exist

View File

@@ -30,24 +30,27 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
const (
PartialRecordsSuffix = "partial"
)
func NewPartialRecordsCache(ttl time.Duration, cdrOutDir string, csvSep rune, roundDecimals int, timezone string, httpSkipTlsCheck bool) (*PartialRecordsCache, error) {
return &PartialRecordsCache{ttl: ttl, cdrOutDir: cdrOutDir, csvSep: csvSep, roundDecimals: roundDecimals, timezone: timezone, httpSkipTlsCheck: httpSkipTlsCheck,
func NewPartialRecordsCache(ttl time.Duration, expiryAction string, cdrOutDir string, csvSep rune, roundDecimals int, timezone string, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection) (*PartialRecordsCache, error) {
return &PartialRecordsCache{ttl: ttl, expiryAction: expiryAction, cdrOutDir: cdrOutDir, csvSep: csvSep, roundDecimals: roundDecimals, timezone: timezone, httpSkipTlsCheck: httpSkipTlsCheck,
partialRecords: make(map[string]*PartialCDRRecord), dumpTimers: make(map[string]*time.Timer), guard: engine.Guardian}, nil
}
type PartialRecordsCache struct {
ttl time.Duration
expiryAction string
cdrOutDir string
csvSep rune
roundDecimals int
timezone string
httpSkipTlsCheck bool
cdrs rpcclient.RpcClientConnection
partialRecords map[string]*PartialCDRRecord // [OriginID]*PartialRecord
dumpTimers map[string]*time.Timer // [OriginID]*time.Timer which can be canceled or reset
guard *engine.GuardianLock
@@ -85,22 +88,51 @@ func (prc *PartialRecordsCache) dumpPartialRecords(originID string) {
}
}
// Called when record expires in cache, will send the CDR merged (forcing it's completion) to the CDRS
func (prc *PartialRecordsCache) postCDR(originID string) {
_, err := prc.guard.Guard(func() (interface{}, error) {
if prc.partialRecords[originID].Len() != 0 { // Only write the file if there are records in the cache
cdr := prc.partialRecords[originID].MergeCDRs()
cdr.Partial = false // force completion
var reply string
if err := prc.cdrs.Call("CdrsV1.ProcessCDR", cdr, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<Cdrc> Failed sending CDR %+v from partial cache, error: %s", cdr, err.Error()))
} else if reply != utils.OK {
utils.Logger.Err(fmt.Sprintf("<Cdrc> Received unexpected reply for CDR, %+v, reply: %s", cdr, reply))
}
}
delete(prc.partialRecords, originID)
return nil, nil
}, 0, originID)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<CDRC> Failed posting from cache CDR with originID: %s, error: %s", originID, err.Error()))
}
}
// Called to cache a partial record.
// If exists in cache, CDRs will be updated
// Locking should be handled at higher layer
func (prc *PartialRecordsCache) cachePartialCDR(pCDR *PartialCDRRecord) *PartialCDRRecord {
func (prc *PartialRecordsCache) cachePartialCDR(pCDR *PartialCDRRecord) (*PartialCDRRecord, error) {
originID := pCDR.cdrs[0].OriginID
if tmr, hasIt := prc.dumpTimers[originID]; hasIt {
if tmr, hasIt := prc.dumpTimers[originID]; hasIt { // Update existing timer
tmr.Reset(prc.ttl)
} else {
prc.dumpTimers[originID] = time.AfterFunc(prc.ttl, func() { prc.dumpPartialRecords(originID) }) // Schedule dumping of the partial CDR
switch prc.expiryAction {
case utils.MetaDumpToFile:
prc.dumpTimers[originID] = time.AfterFunc(prc.ttl, func() { prc.dumpPartialRecords(originID) }) // Schedule dumping of the partial CDR
case utils.MetaPostCDR:
prc.dumpTimers[originID] = time.AfterFunc(prc.ttl, func() { prc.postCDR(originID) }) // Schedule dumping of the partial CDR
default:
return nil, fmt.Errorf("Unsupported PartialCacheExpiryAction: %s", prc.expiryAction)
}
}
if _, hasIt := prc.partialRecords[originID]; !hasIt {
prc.partialRecords[originID] = pCDR
} else { // Exists, update it's records
prc.partialRecords[originID].cdrs = append(prc.partialRecords[originID].cdrs, pCDR.cdrs...)
}
return prc.partialRecords[originID]
return prc.partialRecords[originID], nil
}
// Called to uncache partialCDR and remove automatic dumping of the cached records
@@ -122,7 +154,10 @@ func (prc *PartialRecordsCache) MergePartialCDRRecord(pCDR *PartialCDRRecord) (*
if _, hasIt := prc.partialRecords[originID]; !hasIt && pCDR.Len() == 1 && !pCDR.cdrs[0].Partial {
return pCDR.cdrs[0], nil // Special case when not a partial CDR and not having cached CDRs on same OriginID
}
cachedPartialCDR := prc.cachePartialCDR(pCDR)
cachedPartialCDR, err := prc.cachePartialCDR(pCDR)
if err != nil {
return nil, err
}
var final bool
for _, cdr := range pCDR.cdrs {
if !cdr.Partial {

View File

@@ -25,28 +25,29 @@ import (
)
type CdrcConfig struct {
ID string // free-form text identifying this CDRC instance
Enabled bool // Enable/Disable the profile
DryRun bool // Do not post CDRs to the server
CdrsConns []*HaPoolConfig // The address where CDRs can be reached
CdrFormat string // The type of CDR file to process <csv|opensips_flatstore>
FieldSeparator rune // The separator to use when reading csvs
DataUsageMultiplyFactor float64 // Conversion factor for data usage
Timezone string // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
RunDelay time.Duration // Delay between runs, 0 for inotify driven requests
MaxOpenFiles int // Maximum number of files opened simultaneously
CdrInDir string // Folder to process CDRs from
CdrOutDir string // Folder to move processed CDRs to
FailedCallsPrefix string // Used in case of flatstore CDRs to avoid searching for BYE records
CDRPath utils.HierarchyPath // used for XML CDRs to specify the path towards CDR elements
CdrSourceId string // Source identifier for the processed CDRs
CdrFilter utils.RSRFields // Filter CDR records to import
ContinueOnSuccess bool // Continue after execution
PartialRecordCache time.Duration // Duration to cache partial records when not pairing
HeaderFields []*CfgCdrField
ContentFields []*CfgCdrField
TrailerFields []*CfgCdrField
CacheDumpFields []*CfgCdrField
ID string // free-form text identifying this CDRC instance
Enabled bool // Enable/Disable the profile
DryRun bool // Do not post CDRs to the server
CdrsConns []*HaPoolConfig // The address where CDRs can be reached
CdrFormat string // The type of CDR file to process <csv|opensips_flatstore>
FieldSeparator rune // The separator to use when reading csvs
DataUsageMultiplyFactor float64 // Conversion factor for data usage
Timezone string // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
RunDelay time.Duration // Delay between runs, 0 for inotify driven requests
MaxOpenFiles int // Maximum number of files opened simultaneously
CdrInDir string // Folder to process CDRs from
CdrOutDir string // Folder to move processed CDRs to
FailedCallsPrefix string // Used in case of flatstore CDRs to avoid searching for BYE records
CDRPath utils.HierarchyPath // used for XML CDRs to specify the path towards CDR elements
CdrSourceId string // Source identifier for the processed CDRs
CdrFilter utils.RSRFields // Filter CDR records to import
ContinueOnSuccess bool // Continue after execution
PartialRecordCache time.Duration // Duration to cache partial records when not pairing
PartialCacheExpiryAction string
HeaderFields []*CfgCdrField
ContentFields []*CfgCdrField
TrailerFields []*CfgCdrField
CacheDumpFields []*CfgCdrField
}
func (self *CdrcConfig) loadFromJsonCfg(jsnCfg *CdrcJsonCfg) error {
@@ -117,6 +118,9 @@ func (self *CdrcConfig) loadFromJsonCfg(jsnCfg *CdrcJsonCfg) error {
return err
}
}
if jsnCfg.Partial_cache_expiry_action != nil {
self.PartialCacheExpiryAction = *jsnCfg.Partial_cache_expiry_action
}
if jsnCfg.Header_fields != nil {
if self.HeaderFields, err = CfgCdrFieldsFromCdrFieldsJsonCfg(*jsnCfg.Header_fields); err != nil {
return err
@@ -159,6 +163,8 @@ func (self *CdrcConfig) Clone() *CdrcConfig {
clnCdrc.CdrInDir = self.CdrInDir
clnCdrc.CdrOutDir = self.CdrOutDir
clnCdrc.CdrSourceId = self.CdrSourceId
clnCdrc.PartialRecordCache = self.PartialRecordCache
clnCdrc.PartialCacheExpiryAction = self.PartialCacheExpiryAction
clnCdrc.HeaderFields = make([]*CfgCdrField, len(self.HeaderFields))
clnCdrc.ContentFields = make([]*CfgCdrField, len(self.ContentFields))
clnCdrc.TrailerFields = make([]*CfgCdrField, len(self.TrailerFields))

View File

@@ -28,24 +28,24 @@ const CGRATES_CFG_JSON = `
// This is what you get when you load CGRateS with an empty configuration file.
"general": {
"http_skip_tls_verify": false, // if enabled Http Client will accept any TLS certificate
"rounding_decimals": 5, // system level precision for floats
"dbdata_encoding": "msgpack", // encoding used to store object data in strings: <msgpack|json>
"http_skip_tls_verify": false, // if enabled Http Client will accept any TLS certificate
"rounding_decimals": 5, // system level precision for floats
"dbdata_encoding": "msgpack", // encoding used to store object data in strings: <msgpack|json>
"tpexport_dir": "/var/spool/cgrates/tpe", // path towards export folder for offline Tariff Plans
"httpposter_attempts": 3, // number of http attempts before considering request failed (eg: *call_url)
"httpposter_attempts": 3, // number of http attempts before considering request failed (eg: *call_url)
"http_failed_dir": "/var/spool/cgrates/http_failed", // directory path where we store failed http requests
"default_request_type": "*rated", // default request type to consider when missing from requests: <""|*prepaid|*postpaid|*pseudoprepaid|*rated>
"default_category": "call", // default category to consider when missing from requests
"default_tenant": "cgrates.org", // default tenant to consider when missing from requests
"default_timezone": "Local", // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
"connect_attempts": 3, // initial server connect attempts
"reconnects": -1, // number of retries in case of connection lost
"connect_timeout": "1s", // consider connection unsuccessful on timeout, 0 to disable the feature
"reply_timeout": "2s", // consider connection down for replies taking longer than this value
"response_cache_ttl": "0s", // the life span of a cached response
"internal_ttl": "2m", // maximum duration to wait for internal connections before giving up
"locking_timeout": "5s", // timeout internal locks to avoid deadlocks
"cache_dump_dir": "", // cache dump for faster start (leave empty to disable)
"default_request_type": "*rated", // default request type to consider when missing from requests: <""|*prepaid|*postpaid|*pseudoprepaid|*rated>
"default_category": "call", // default category to consider when missing from requests
"default_tenant": "cgrates.org", // default tenant to consider when missing from requests
"default_timezone": "Local", // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
"connect_attempts": 3, // initial server connect attempts
"reconnects": -1, // number of retries in case of connection lost
"connect_timeout": "1s", // consider connection unsuccessful on timeout, 0 to disable the feature
"reply_timeout": "2s", // consider connection down for replies taking longer than this value
"response_cache_ttl": "0s", // the life span of a cached response
"internal_ttl": "2m", // maximum duration to wait for internal connections before giving up
"locking_timeout": "5s", // timeout internal locks to avoid deadlocks
"cache_dump_dir": "", // cache dump for faster start (leave empty to disable)
},
@@ -137,28 +137,28 @@ const CGRATES_CFG_JSON = `
"cdrc": [
{
"id": "*default", // identifier of the CDRC runner
"enabled": false, // enable CDR client functionality
"dry_run": false, // do not send the CDRs to CDRS, just parse them
"enabled": false, // enable CDR client functionality
"dry_run": false, // do not send the CDRs to CDRS, just parse them
"cdrs_conns": [
{"address": "*internal"} // address where to reach CDR server. <*internal|x.y.z.y:1234>
{"address": "*internal"} // address where to reach CDR server. <*internal|x.y.z.y:1234>
],
"cdr_format": "csv", // CDR file format <csv|freeswitch_csv|fwv|opensips_flatstore|partial_csv>
"field_separator": ",", // separator used in case of csv files
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
"run_delay": 0, // sleep interval in seconds between consecutive runs, 0 to use automation via inotify
"max_open_files": 1024, // maximum simultaneous files to process, 0 for unlimited
"data_usage_multiply_factor": 1024, // conversion factor for data usage
"cdr_in_dir": "/var/spool/cgrates/cdrc/in", // absolute path towards the directory where the CDRs are stored
"cdr_format": "csv", // CDR file format <csv|freeswitch_csv|fwv|opensips_flatstore|partial_csv>
"field_separator": ",", // separator used in case of csv files
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
"run_delay": 0, // sleep interval in seconds between consecutive runs, 0 to use automation via inotify
"max_open_files": 1024, // maximum simultaneous files to process, 0 for unlimited
"data_usage_multiply_factor": 1024, // conversion factor for data usage
"cdr_in_dir": "/var/spool/cgrates/cdrc/in", // absolute path towards the directory where the CDRs are stored
"cdr_out_dir": "/var/spool/cgrates/cdrc/out", // absolute path towards the directory where processed CDRs will be moved
"failed_calls_prefix": "missed_calls", // used in case of flatstore CDRs to avoid searching for BYE records
"cdr_path": "", // path towards one CDR element in case of XML CDRs
"cdr_source_id": "freeswitch_csv", // free form field, tag identifying the source of the CDRs within CDRS database
"cdr_filter": "", // filter CDR records to import
"continue_on_success": false, // continue to the next template if executed
"partial_record_cache": "10s", // duration to cache partial records when not pairing
"cache_expiry_action": "*post_cdr", // action taken when cache when records in cache are timed-out
"header_fields": [], // template of the import header fields
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
"failed_calls_prefix": "missed_calls", // used in case of flatstore CDRs to avoid searching for BYE records
"cdr_path": "", // path towards one CDR element in case of XML CDRs
"cdr_source_id": "freeswitch_csv", // free form field, tag identifying the source of the CDRs within CDRS database
"cdr_filter": "", // filter CDR records to import
"continue_on_success": false, // continue to the next template if executed
"partial_record_cache": "10s", // duration to cache partial records when not pairing
"partial_cache_expiry_action": "*dump_to_file", // action taken when cache when records in cache are timed-out <*dump_to_file|*post_cdr>
"header_fields": [], // template of the import header fields
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
{"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "2", "mandatory": true},
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "3", "mandatory": true},
{"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "4", "mandatory": true},
@@ -297,10 +297,10 @@ const CGRATES_CFG_JSON = `
"enabled": false, // starts SessionManager service: <true|false>
"listen_udp": "127.0.0.1:2020", // address where to listen for datagram events coming from OpenSIPS
"rals_conns": [
{"address": "*internal"} // address where to reach the Rater <""|*internal|127.0.0.1:2013>
{"address": "*internal"} // address where to reach the Rater <""|*internal|127.0.0.1:2013>
],
"cdrs_conns": [
{"address": "*internal"} // address where to reach CDR Server, empty to disable CDR capturing <*internal|x.y.z.y:1234>
{"address": "*internal"} // address where to reach CDR Server, empty to disable CDR capturing <*internal|x.y.z.y:1234>
],
"reconnects": 5, // number of reconnects if connection is lost
"create_cdr": false, // create CDR out of events and sends it to CDRS component

View File

@@ -349,24 +349,25 @@ func TestDfCdrcJsonCfg(t *testing.T) {
Cdrs_conns: &[]*HaPoolJsonCfg{&HaPoolJsonCfg{
Address: utils.StringPointer(utils.MetaInternal),
}},
Cdr_format: utils.StringPointer("csv"),
Field_separator: utils.StringPointer(","),
Timezone: utils.StringPointer(""),
Run_delay: utils.IntPointer(0),
Max_open_files: utils.IntPointer(1024),
Data_usage_multiply_factor: utils.Float64Pointer(1024.0),
Cdr_in_dir: utils.StringPointer("/var/spool/cgrates/cdrc/in"),
Cdr_out_dir: utils.StringPointer("/var/spool/cgrates/cdrc/out"),
Failed_calls_prefix: utils.StringPointer("missed_calls"),
Cdr_path: utils.StringPointer(""),
Cdr_source_id: utils.StringPointer("freeswitch_csv"),
Cdr_filter: utils.StringPointer(""),
Continue_on_success: utils.BoolPointer(false),
Partial_record_cache: utils.StringPointer("10s"),
Header_fields: &eFields,
Content_fields: &cdrFields,
Trailer_fields: &eFields,
Cache_dump_fields: &cacheDumpFields,
Cdr_format: utils.StringPointer("csv"),
Field_separator: utils.StringPointer(","),
Timezone: utils.StringPointer(""),
Run_delay: utils.IntPointer(0),
Max_open_files: utils.IntPointer(1024),
Data_usage_multiply_factor: utils.Float64Pointer(1024.0),
Cdr_in_dir: utils.StringPointer("/var/spool/cgrates/cdrc/in"),
Cdr_out_dir: utils.StringPointer("/var/spool/cgrates/cdrc/out"),
Failed_calls_prefix: utils.StringPointer("missed_calls"),
Cdr_path: utils.StringPointer(""),
Cdr_source_id: utils.StringPointer("freeswitch_csv"),
Cdr_filter: utils.StringPointer(""),
Continue_on_success: utils.BoolPointer(false),
Partial_record_cache: utils.StringPointer("10s"),
Partial_cache_expiry_action: utils.StringPointer(utils.MetaDumpToFile),
Header_fields: &eFields,
Content_fields: &cdrFields,
Trailer_fields: &eFields,
Cache_dump_fields: &cacheDumpFields,
},
}
if cfg, err := dfCgrJsonCfg.CdrcJsonCfg(); err != nil {

View File

@@ -36,22 +36,23 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
// Default instance first
eCgrCfg.CdrcProfiles["/var/spool/cgrates/cdrc/in"] = []*CdrcConfig{
&CdrcConfig{
ID: utils.META_DEFAULT,
Enabled: false,
CdrsConns: []*HaPoolConfig{&HaPoolConfig{Address: utils.MetaInternal}},
CdrFormat: "csv",
FieldSeparator: ',',
DataUsageMultiplyFactor: 1024,
RunDelay: 0,
MaxOpenFiles: 1024,
CdrInDir: "/var/spool/cgrates/cdrc/in",
CdrOutDir: "/var/spool/cgrates/cdrc/out",
FailedCallsPrefix: "missed_calls",
CDRPath: utils.HierarchyPath([]string{""}),
CdrSourceId: "freeswitch_csv",
CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP),
PartialRecordCache: time.Duration(10) * time.Second,
HeaderFields: make([]*CfgCdrField, 0),
ID: utils.META_DEFAULT,
Enabled: false,
CdrsConns: []*HaPoolConfig{&HaPoolConfig{Address: utils.MetaInternal}},
CdrFormat: "csv",
FieldSeparator: ',',
DataUsageMultiplyFactor: 1024,
RunDelay: 0,
MaxOpenFiles: 1024,
CdrInDir: "/var/spool/cgrates/cdrc/in",
CdrOutDir: "/var/spool/cgrates/cdrc/out",
FailedCallsPrefix: "missed_calls",
CDRPath: utils.HierarchyPath([]string{""}),
CdrSourceId: "freeswitch_csv",
CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP),
PartialRecordCache: time.Duration(10) * time.Second,
PartialCacheExpiryAction: utils.MetaDumpToFile,
HeaderFields: make([]*CfgCdrField, 0),
ContentFields: []*CfgCdrField{
&CfgCdrField{Tag: "TOR", Type: utils.META_COMPOSED, FieldId: utils.TOR, Value: utils.ParseRSRFieldsMustCompile("2", utils.INFIELD_SEP),
FieldFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), Width: 0, Strip: "", Padding: "", Layout: "", Mandatory: true},
@@ -100,20 +101,22 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
}
eCgrCfg.CdrcProfiles["/tmp/cgrates/cdrc1/in"] = []*CdrcConfig{
&CdrcConfig{
ID: "CDRC-CSV1",
Enabled: true,
CdrsConns: []*HaPoolConfig{&HaPoolConfig{Address: utils.MetaInternal}},
CdrFormat: "csv",
FieldSeparator: ',',
DataUsageMultiplyFactor: 1024,
RunDelay: 0,
MaxOpenFiles: 1024,
CdrInDir: "/tmp/cgrates/cdrc1/in",
CdrOutDir: "/tmp/cgrates/cdrc1/out",
CDRPath: nil,
CdrSourceId: "csv1",
CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP),
HeaderFields: make([]*CfgCdrField, 0),
ID: "CDRC-CSV1",
Enabled: true,
CdrsConns: []*HaPoolConfig{&HaPoolConfig{Address: utils.MetaInternal}},
CdrFormat: "csv",
FieldSeparator: ',',
DataUsageMultiplyFactor: 1024,
RunDelay: 0,
MaxOpenFiles: 1024,
CdrInDir: "/tmp/cgrates/cdrc1/in",
CdrOutDir: "/tmp/cgrates/cdrc1/out",
CDRPath: nil,
CdrSourceId: "csv1",
CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP),
PartialRecordCache: time.Duration(10) * time.Second,
PartialCacheExpiryAction: utils.MetaDumpToFile,
HeaderFields: make([]*CfgCdrField, 0),
ContentFields: []*CfgCdrField{
&CfgCdrField{Tag: "TOR", Type: utils.META_COMPOSED, FieldId: utils.TOR, Value: utils.ParseRSRFieldsMustCompile("2", utils.INFIELD_SEP),
FieldFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), Width: 0, Strip: "", Padding: "", Layout: "", Mandatory: true},
@@ -162,20 +165,22 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
}
eCgrCfg.CdrcProfiles["/tmp/cgrates/cdrc2/in"] = []*CdrcConfig{
&CdrcConfig{
ID: "CDRC-CSV2",
Enabled: true,
CdrsConns: []*HaPoolConfig{&HaPoolConfig{Address: utils.MetaInternal}},
CdrFormat: "csv",
FieldSeparator: ',',
DataUsageMultiplyFactor: 0.000976563,
RunDelay: 1000000000,
MaxOpenFiles: 1024,
CdrInDir: "/tmp/cgrates/cdrc2/in",
CdrOutDir: "/tmp/cgrates/cdrc2/out",
CDRPath: nil,
CdrSourceId: "csv2",
CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP),
HeaderFields: make([]*CfgCdrField, 0),
ID: "CDRC-CSV2",
Enabled: true,
CdrsConns: []*HaPoolConfig{&HaPoolConfig{Address: utils.MetaInternal}},
CdrFormat: "csv",
FieldSeparator: ',',
DataUsageMultiplyFactor: 0.000976563,
RunDelay: 1000000000,
MaxOpenFiles: 1024,
CdrInDir: "/tmp/cgrates/cdrc2/in",
CdrOutDir: "/tmp/cgrates/cdrc2/out",
CDRPath: nil,
CdrSourceId: "csv2",
CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP),
PartialRecordCache: time.Duration(10) * time.Second,
PartialCacheExpiryAction: utils.MetaDumpToFile,
HeaderFields: make([]*CfgCdrField, 0),
ContentFields: []*CfgCdrField{
&CfgCdrField{FieldId: utils.TOR, Value: utils.ParseRSRFieldsMustCompile("~7:s/^(voice|data|sms|mms|generic)$/*$1/", utils.INFIELD_SEP),
FieldFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), Width: 0, Strip: "", Padding: "", Layout: "", Mandatory: false},
@@ -206,20 +211,22 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
}
eCgrCfg.CdrcProfiles["/tmp/cgrates/cdrc3/in"] = []*CdrcConfig{
&CdrcConfig{
ID: "CDRC-CSV3",
Enabled: true,
CdrsConns: []*HaPoolConfig{&HaPoolConfig{Address: utils.MetaInternal}},
CdrFormat: "csv",
FieldSeparator: ',',
DataUsageMultiplyFactor: 1024,
RunDelay: 0,
MaxOpenFiles: 1024,
CdrInDir: "/tmp/cgrates/cdrc3/in",
CdrOutDir: "/tmp/cgrates/cdrc3/out",
CDRPath: nil,
CdrSourceId: "csv3",
CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP),
HeaderFields: make([]*CfgCdrField, 0),
ID: "CDRC-CSV3",
Enabled: true,
CdrsConns: []*HaPoolConfig{&HaPoolConfig{Address: utils.MetaInternal}},
CdrFormat: "csv",
FieldSeparator: ',',
DataUsageMultiplyFactor: 1024,
RunDelay: 0,
MaxOpenFiles: 1024,
CdrInDir: "/tmp/cgrates/cdrc3/in",
CdrOutDir: "/tmp/cgrates/cdrc3/out",
CDRPath: nil,
CdrSourceId: "csv3",
CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP),
PartialRecordCache: time.Duration(10) * time.Second,
PartialCacheExpiryAction: utils.MetaDumpToFile,
HeaderFields: make([]*CfgCdrField, 0),
ContentFields: []*CfgCdrField{
&CfgCdrField{Tag: "TOR", Type: utils.META_COMPOSED, FieldId: utils.TOR, Value: utils.ParseRSRFieldsMustCompile("2", utils.INFIELD_SEP),
FieldFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), Width: 0, Strip: "", Padding: "", Layout: "", Mandatory: true},

View File

@@ -148,28 +148,29 @@ type CdreJsonCfg struct {
// Cdrc config section
type CdrcJsonCfg struct {
Id *string
Enabled *bool
Dry_run *bool
Cdrs_conns *[]*HaPoolJsonCfg
Cdr_format *string
Field_separator *string
Timezone *string
Run_delay *int
Data_usage_multiply_factor *float64
Cdr_in_dir *string
Cdr_out_dir *string
Failed_calls_prefix *string
Cdr_path *string
Cdr_source_id *string
Cdr_filter *string
Continue_on_success *bool
Max_open_files *int
Partial_record_cache *string
Header_fields *[]*CdrFieldJsonCfg
Content_fields *[]*CdrFieldJsonCfg
Trailer_fields *[]*CdrFieldJsonCfg
Cache_dump_fields *[]*CdrFieldJsonCfg
Id *string
Enabled *bool
Dry_run *bool
Cdrs_conns *[]*HaPoolJsonCfg
Cdr_format *string
Field_separator *string
Timezone *string
Run_delay *int
Data_usage_multiply_factor *float64
Cdr_in_dir *string
Cdr_out_dir *string
Failed_calls_prefix *string
Cdr_path *string
Cdr_source_id *string
Cdr_filter *string
Continue_on_success *bool
Max_open_files *int
Partial_record_cache *string
Partial_cache_expiry_action *string
Header_fields *[]*CdrFieldJsonCfg
Content_fields *[]*CdrFieldJsonCfg
Trailer_fields *[]*CdrFieldJsonCfg
Cache_dump_fields *[]*CdrFieldJsonCfg
}
// SM-Generic config section

View File

@@ -300,4 +300,6 @@ const (
MetaDateTime = "*datetime"
MetaMaskedDestination = "*masked_destination"
MetaUnixTimestamp = "*unix_timestamp"
MetaPostCDR = "*post_cdr"
MetaDumpToFile = "*dump_to_file"
)