diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index e83bf5ad0..53bbcbe01 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -71,7 +71,7 @@ func NewCdrc(cdrcCfgs []*config.CdrcConfig, httpSkipTlsCheck bool, cdrs rpcclien if cdrc.unpairedRecordsCache, err = NewUnpairedRecordsCache(cdrcCfg.PartialRecordCache, cdrcCfg.CdrOutDir, cdrcCfg.FieldSeparator); err != nil { return nil, err } - if cdrc.partialRecordsCache, err = NewPartialRecordsCache(cdrcCfg.PartialRecordCache, cdrcCfg.CdrOutDir, cdrcCfg.FieldSeparator, roundDecimals, cdrc.timezone, cdrc.httpSkipTlsCheck); err != nil { + if cdrc.partialRecordsCache, err = NewPartialRecordsCache(cdrcCfg.PartialRecordCache, cdrcCfg.PartialCacheExpiryAction, cdrcCfg.CdrOutDir, cdrcCfg.FieldSeparator, roundDecimals, cdrc.timezone, cdrc.httpSkipTlsCheck, cdrc.cdrs); err != nil { return nil, err } // Before processing, make sure in and out folders exist diff --git a/cdrc/partial_cdr.go b/cdrc/partial_cdr.go index 8504d6b75..0bbd1d0d1 100644 --- a/cdrc/partial_cdr.go +++ b/cdrc/partial_cdr.go @@ -30,24 +30,27 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) const ( PartialRecordsSuffix = "partial" ) -func NewPartialRecordsCache(ttl time.Duration, cdrOutDir string, csvSep rune, roundDecimals int, timezone string, httpSkipTlsCheck bool) (*PartialRecordsCache, error) { - return &PartialRecordsCache{ttl: ttl, cdrOutDir: cdrOutDir, csvSep: csvSep, roundDecimals: roundDecimals, timezone: timezone, httpSkipTlsCheck: httpSkipTlsCheck, +func NewPartialRecordsCache(ttl time.Duration, expiryAction string, cdrOutDir string, csvSep rune, roundDecimals int, timezone string, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection) (*PartialRecordsCache, error) { + return &PartialRecordsCache{ttl: ttl, expiryAction: expiryAction, cdrOutDir: cdrOutDir, csvSep: csvSep, roundDecimals: roundDecimals, timezone: timezone, httpSkipTlsCheck: httpSkipTlsCheck, partialRecords: make(map[string]*PartialCDRRecord), dumpTimers: make(map[string]*time.Timer), guard: engine.Guardian}, nil } type PartialRecordsCache struct { ttl time.Duration + expiryAction string cdrOutDir string csvSep rune roundDecimals int timezone string httpSkipTlsCheck bool + cdrs rpcclient.RpcClientConnection partialRecords map[string]*PartialCDRRecord // [OriginID]*PartialRecord dumpTimers map[string]*time.Timer // [OriginID]*time.Timer which can be canceled or reset guard *engine.GuardianLock @@ -85,22 +88,51 @@ func (prc *PartialRecordsCache) dumpPartialRecords(originID string) { } } +// Called when record expires in cache, will send the CDR merged (forcing it's completion) to the CDRS +func (prc *PartialRecordsCache) postCDR(originID string) { + _, err := prc.guard.Guard(func() (interface{}, error) { + if prc.partialRecords[originID].Len() != 0 { // Only write the file if there are records in the cache + cdr := prc.partialRecords[originID].MergeCDRs() + cdr.Partial = false // force completion + var reply string + if err := prc.cdrs.Call("CdrsV1.ProcessCDR", cdr, &reply); err != nil { + utils.Logger.Err(fmt.Sprintf(" Failed sending CDR %+v from partial cache, error: %s", cdr, err.Error())) + } else if reply != utils.OK { + utils.Logger.Err(fmt.Sprintf(" Received unexpected reply for CDR, %+v, reply: %s", cdr, reply)) + } + } + delete(prc.partialRecords, originID) + return nil, nil + }, 0, originID) + if err != nil { + utils.Logger.Err(fmt.Sprintf(" Failed posting from cache CDR with originID: %s, error: %s", originID, err.Error())) + } +} + // Called to cache a partial record. // If exists in cache, CDRs will be updated // Locking should be handled at higher layer -func (prc *PartialRecordsCache) cachePartialCDR(pCDR *PartialCDRRecord) *PartialCDRRecord { +func (prc *PartialRecordsCache) cachePartialCDR(pCDR *PartialCDRRecord) (*PartialCDRRecord, error) { originID := pCDR.cdrs[0].OriginID - if tmr, hasIt := prc.dumpTimers[originID]; hasIt { + if tmr, hasIt := prc.dumpTimers[originID]; hasIt { // Update existing timer tmr.Reset(prc.ttl) } else { - prc.dumpTimers[originID] = time.AfterFunc(prc.ttl, func() { prc.dumpPartialRecords(originID) }) // Schedule dumping of the partial CDR + switch prc.expiryAction { + case utils.MetaDumpToFile: + prc.dumpTimers[originID] = time.AfterFunc(prc.ttl, func() { prc.dumpPartialRecords(originID) }) // Schedule dumping of the partial CDR + case utils.MetaPostCDR: + prc.dumpTimers[originID] = time.AfterFunc(prc.ttl, func() { prc.postCDR(originID) }) // Schedule dumping of the partial CDR + default: + return nil, fmt.Errorf("Unsupported PartialCacheExpiryAction: %s", prc.expiryAction) + } + } if _, hasIt := prc.partialRecords[originID]; !hasIt { prc.partialRecords[originID] = pCDR } else { // Exists, update it's records prc.partialRecords[originID].cdrs = append(prc.partialRecords[originID].cdrs, pCDR.cdrs...) } - return prc.partialRecords[originID] + return prc.partialRecords[originID], nil } // Called to uncache partialCDR and remove automatic dumping of the cached records @@ -122,7 +154,10 @@ func (prc *PartialRecordsCache) MergePartialCDRRecord(pCDR *PartialCDRRecord) (* if _, hasIt := prc.partialRecords[originID]; !hasIt && pCDR.Len() == 1 && !pCDR.cdrs[0].Partial { return pCDR.cdrs[0], nil // Special case when not a partial CDR and not having cached CDRs on same OriginID } - cachedPartialCDR := prc.cachePartialCDR(pCDR) + cachedPartialCDR, err := prc.cachePartialCDR(pCDR) + if err != nil { + return nil, err + } var final bool for _, cdr := range pCDR.cdrs { if !cdr.Partial { diff --git a/config/cdrcconfig.go b/config/cdrcconfig.go index e6cfbff30..c632bdf13 100644 --- a/config/cdrcconfig.go +++ b/config/cdrcconfig.go @@ -25,28 +25,29 @@ import ( ) type CdrcConfig struct { - ID string // free-form text identifying this CDRC instance - Enabled bool // Enable/Disable the profile - DryRun bool // Do not post CDRs to the server - CdrsConns []*HaPoolConfig // The address where CDRs can be reached - CdrFormat string // The type of CDR file to process - FieldSeparator rune // The separator to use when reading csvs - DataUsageMultiplyFactor float64 // Conversion factor for data usage - Timezone string // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> - RunDelay time.Duration // Delay between runs, 0 for inotify driven requests - MaxOpenFiles int // Maximum number of files opened simultaneously - CdrInDir string // Folder to process CDRs from - CdrOutDir string // Folder to move processed CDRs to - FailedCallsPrefix string // Used in case of flatstore CDRs to avoid searching for BYE records - CDRPath utils.HierarchyPath // used for XML CDRs to specify the path towards CDR elements - CdrSourceId string // Source identifier for the processed CDRs - CdrFilter utils.RSRFields // Filter CDR records to import - ContinueOnSuccess bool // Continue after execution - PartialRecordCache time.Duration // Duration to cache partial records when not pairing - HeaderFields []*CfgCdrField - ContentFields []*CfgCdrField - TrailerFields []*CfgCdrField - CacheDumpFields []*CfgCdrField + ID string // free-form text identifying this CDRC instance + Enabled bool // Enable/Disable the profile + DryRun bool // Do not post CDRs to the server + CdrsConns []*HaPoolConfig // The address where CDRs can be reached + CdrFormat string // The type of CDR file to process + FieldSeparator rune // The separator to use when reading csvs + DataUsageMultiplyFactor float64 // Conversion factor for data usage + Timezone string // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> + RunDelay time.Duration // Delay between runs, 0 for inotify driven requests + MaxOpenFiles int // Maximum number of files opened simultaneously + CdrInDir string // Folder to process CDRs from + CdrOutDir string // Folder to move processed CDRs to + FailedCallsPrefix string // Used in case of flatstore CDRs to avoid searching for BYE records + CDRPath utils.HierarchyPath // used for XML CDRs to specify the path towards CDR elements + CdrSourceId string // Source identifier for the processed CDRs + CdrFilter utils.RSRFields // Filter CDR records to import + ContinueOnSuccess bool // Continue after execution + PartialRecordCache time.Duration // Duration to cache partial records when not pairing + PartialCacheExpiryAction string + HeaderFields []*CfgCdrField + ContentFields []*CfgCdrField + TrailerFields []*CfgCdrField + CacheDumpFields []*CfgCdrField } func (self *CdrcConfig) loadFromJsonCfg(jsnCfg *CdrcJsonCfg) error { @@ -117,6 +118,9 @@ func (self *CdrcConfig) loadFromJsonCfg(jsnCfg *CdrcJsonCfg) error { return err } } + if jsnCfg.Partial_cache_expiry_action != nil { + self.PartialCacheExpiryAction = *jsnCfg.Partial_cache_expiry_action + } if jsnCfg.Header_fields != nil { if self.HeaderFields, err = CfgCdrFieldsFromCdrFieldsJsonCfg(*jsnCfg.Header_fields); err != nil { return err @@ -159,6 +163,8 @@ func (self *CdrcConfig) Clone() *CdrcConfig { clnCdrc.CdrInDir = self.CdrInDir clnCdrc.CdrOutDir = self.CdrOutDir clnCdrc.CdrSourceId = self.CdrSourceId + clnCdrc.PartialRecordCache = self.PartialRecordCache + clnCdrc.PartialCacheExpiryAction = self.PartialCacheExpiryAction clnCdrc.HeaderFields = make([]*CfgCdrField, len(self.HeaderFields)) clnCdrc.ContentFields = make([]*CfgCdrField, len(self.ContentFields)) clnCdrc.TrailerFields = make([]*CfgCdrField, len(self.TrailerFields)) diff --git a/config/config_defaults.go b/config/config_defaults.go index 1d9856bf7..0ef218447 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -28,24 +28,24 @@ const CGRATES_CFG_JSON = ` // This is what you get when you load CGRateS with an empty configuration file. "general": { - "http_skip_tls_verify": false, // if enabled Http Client will accept any TLS certificate - "rounding_decimals": 5, // system level precision for floats - "dbdata_encoding": "msgpack", // encoding used to store object data in strings: + "http_skip_tls_verify": false, // if enabled Http Client will accept any TLS certificate + "rounding_decimals": 5, // system level precision for floats + "dbdata_encoding": "msgpack", // encoding used to store object data in strings: "tpexport_dir": "/var/spool/cgrates/tpe", // path towards export folder for offline Tariff Plans - "httpposter_attempts": 3, // number of http attempts before considering request failed (eg: *call_url) + "httpposter_attempts": 3, // number of http attempts before considering request failed (eg: *call_url) "http_failed_dir": "/var/spool/cgrates/http_failed", // directory path where we store failed http requests - "default_request_type": "*rated", // default request type to consider when missing from requests: <""|*prepaid|*postpaid|*pseudoprepaid|*rated> - "default_category": "call", // default category to consider when missing from requests - "default_tenant": "cgrates.org", // default tenant to consider when missing from requests - "default_timezone": "Local", // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> - "connect_attempts": 3, // initial server connect attempts - "reconnects": -1, // number of retries in case of connection lost - "connect_timeout": "1s", // consider connection unsuccessful on timeout, 0 to disable the feature - "reply_timeout": "2s", // consider connection down for replies taking longer than this value - "response_cache_ttl": "0s", // the life span of a cached response - "internal_ttl": "2m", // maximum duration to wait for internal connections before giving up - "locking_timeout": "5s", // timeout internal locks to avoid deadlocks - "cache_dump_dir": "", // cache dump for faster start (leave empty to disable) + "default_request_type": "*rated", // default request type to consider when missing from requests: <""|*prepaid|*postpaid|*pseudoprepaid|*rated> + "default_category": "call", // default category to consider when missing from requests + "default_tenant": "cgrates.org", // default tenant to consider when missing from requests + "default_timezone": "Local", // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> + "connect_attempts": 3, // initial server connect attempts + "reconnects": -1, // number of retries in case of connection lost + "connect_timeout": "1s", // consider connection unsuccessful on timeout, 0 to disable the feature + "reply_timeout": "2s", // consider connection down for replies taking longer than this value + "response_cache_ttl": "0s", // the life span of a cached response + "internal_ttl": "2m", // maximum duration to wait for internal connections before giving up + "locking_timeout": "5s", // timeout internal locks to avoid deadlocks + "cache_dump_dir": "", // cache dump for faster start (leave empty to disable) }, @@ -137,28 +137,28 @@ const CGRATES_CFG_JSON = ` "cdrc": [ { "id": "*default", // identifier of the CDRC runner - "enabled": false, // enable CDR client functionality - "dry_run": false, // do not send the CDRs to CDRS, just parse them + "enabled": false, // enable CDR client functionality + "dry_run": false, // do not send the CDRs to CDRS, just parse them "cdrs_conns": [ - {"address": "*internal"} // address where to reach CDR server. <*internal|x.y.z.y:1234> + {"address": "*internal"} // address where to reach CDR server. <*internal|x.y.z.y:1234> ], - "cdr_format": "csv", // CDR file format - "field_separator": ",", // separator used in case of csv files - "timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> - "run_delay": 0, // sleep interval in seconds between consecutive runs, 0 to use automation via inotify - "max_open_files": 1024, // maximum simultaneous files to process, 0 for unlimited - "data_usage_multiply_factor": 1024, // conversion factor for data usage - "cdr_in_dir": "/var/spool/cgrates/cdrc/in", // absolute path towards the directory where the CDRs are stored + "cdr_format": "csv", // CDR file format + "field_separator": ",", // separator used in case of csv files + "timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> + "run_delay": 0, // sleep interval in seconds between consecutive runs, 0 to use automation via inotify + "max_open_files": 1024, // maximum simultaneous files to process, 0 for unlimited + "data_usage_multiply_factor": 1024, // conversion factor for data usage + "cdr_in_dir": "/var/spool/cgrates/cdrc/in", // absolute path towards the directory where the CDRs are stored "cdr_out_dir": "/var/spool/cgrates/cdrc/out", // absolute path towards the directory where processed CDRs will be moved - "failed_calls_prefix": "missed_calls", // used in case of flatstore CDRs to avoid searching for BYE records - "cdr_path": "", // path towards one CDR element in case of XML CDRs - "cdr_source_id": "freeswitch_csv", // free form field, tag identifying the source of the CDRs within CDRS database - "cdr_filter": "", // filter CDR records to import - "continue_on_success": false, // continue to the next template if executed - "partial_record_cache": "10s", // duration to cache partial records when not pairing - "cache_expiry_action": "*post_cdr", // action taken when cache when records in cache are timed-out - "header_fields": [], // template of the import header fields - "content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value + "failed_calls_prefix": "missed_calls", // used in case of flatstore CDRs to avoid searching for BYE records + "cdr_path": "", // path towards one CDR element in case of XML CDRs + "cdr_source_id": "freeswitch_csv", // free form field, tag identifying the source of the CDRs within CDRS database + "cdr_filter": "", // filter CDR records to import + "continue_on_success": false, // continue to the next template if executed + "partial_record_cache": "10s", // duration to cache partial records when not pairing + "partial_cache_expiry_action": "*dump_to_file", // action taken when cache when records in cache are timed-out <*dump_to_file|*post_cdr> + "header_fields": [], // template of the import header fields + "content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value {"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "2", "mandatory": true}, {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "3", "mandatory": true}, {"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "4", "mandatory": true}, @@ -297,10 +297,10 @@ const CGRATES_CFG_JSON = ` "enabled": false, // starts SessionManager service: "listen_udp": "127.0.0.1:2020", // address where to listen for datagram events coming from OpenSIPS "rals_conns": [ - {"address": "*internal"} // address where to reach the Rater <""|*internal|127.0.0.1:2013> + {"address": "*internal"} // address where to reach the Rater <""|*internal|127.0.0.1:2013> ], "cdrs_conns": [ - {"address": "*internal"} // address where to reach CDR Server, empty to disable CDR capturing <*internal|x.y.z.y:1234> + {"address": "*internal"} // address where to reach CDR Server, empty to disable CDR capturing <*internal|x.y.z.y:1234> ], "reconnects": 5, // number of reconnects if connection is lost "create_cdr": false, // create CDR out of events and sends it to CDRS component diff --git a/config/config_json_test.go b/config/config_json_test.go index baea5acfb..6db6c0dc2 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -349,24 +349,25 @@ func TestDfCdrcJsonCfg(t *testing.T) { Cdrs_conns: &[]*HaPoolJsonCfg{&HaPoolJsonCfg{ Address: utils.StringPointer(utils.MetaInternal), }}, - Cdr_format: utils.StringPointer("csv"), - Field_separator: utils.StringPointer(","), - Timezone: utils.StringPointer(""), - Run_delay: utils.IntPointer(0), - Max_open_files: utils.IntPointer(1024), - Data_usage_multiply_factor: utils.Float64Pointer(1024.0), - Cdr_in_dir: utils.StringPointer("/var/spool/cgrates/cdrc/in"), - Cdr_out_dir: utils.StringPointer("/var/spool/cgrates/cdrc/out"), - Failed_calls_prefix: utils.StringPointer("missed_calls"), - Cdr_path: utils.StringPointer(""), - Cdr_source_id: utils.StringPointer("freeswitch_csv"), - Cdr_filter: utils.StringPointer(""), - Continue_on_success: utils.BoolPointer(false), - Partial_record_cache: utils.StringPointer("10s"), - Header_fields: &eFields, - Content_fields: &cdrFields, - Trailer_fields: &eFields, - Cache_dump_fields: &cacheDumpFields, + Cdr_format: utils.StringPointer("csv"), + Field_separator: utils.StringPointer(","), + Timezone: utils.StringPointer(""), + Run_delay: utils.IntPointer(0), + Max_open_files: utils.IntPointer(1024), + Data_usage_multiply_factor: utils.Float64Pointer(1024.0), + Cdr_in_dir: utils.StringPointer("/var/spool/cgrates/cdrc/in"), + Cdr_out_dir: utils.StringPointer("/var/spool/cgrates/cdrc/out"), + Failed_calls_prefix: utils.StringPointer("missed_calls"), + Cdr_path: utils.StringPointer(""), + Cdr_source_id: utils.StringPointer("freeswitch_csv"), + Cdr_filter: utils.StringPointer(""), + Continue_on_success: utils.BoolPointer(false), + Partial_record_cache: utils.StringPointer("10s"), + Partial_cache_expiry_action: utils.StringPointer(utils.MetaDumpToFile), + Header_fields: &eFields, + Content_fields: &cdrFields, + Trailer_fields: &eFields, + Cache_dump_fields: &cacheDumpFields, }, } if cfg, err := dfCgrJsonCfg.CdrcJsonCfg(); err != nil { diff --git a/config/configcdrc_test.go b/config/configcdrc_test.go index 6f7a8a5e0..0fb0097df 100644 --- a/config/configcdrc_test.go +++ b/config/configcdrc_test.go @@ -36,22 +36,23 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) { // Default instance first eCgrCfg.CdrcProfiles["/var/spool/cgrates/cdrc/in"] = []*CdrcConfig{ &CdrcConfig{ - ID: utils.META_DEFAULT, - Enabled: false, - CdrsConns: []*HaPoolConfig{&HaPoolConfig{Address: utils.MetaInternal}}, - CdrFormat: "csv", - FieldSeparator: ',', - DataUsageMultiplyFactor: 1024, - RunDelay: 0, - MaxOpenFiles: 1024, - CdrInDir: "/var/spool/cgrates/cdrc/in", - CdrOutDir: "/var/spool/cgrates/cdrc/out", - FailedCallsPrefix: "missed_calls", - CDRPath: utils.HierarchyPath([]string{""}), - CdrSourceId: "freeswitch_csv", - CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), - PartialRecordCache: time.Duration(10) * time.Second, - HeaderFields: make([]*CfgCdrField, 0), + ID: utils.META_DEFAULT, + Enabled: false, + CdrsConns: []*HaPoolConfig{&HaPoolConfig{Address: utils.MetaInternal}}, + CdrFormat: "csv", + FieldSeparator: ',', + DataUsageMultiplyFactor: 1024, + RunDelay: 0, + MaxOpenFiles: 1024, + CdrInDir: "/var/spool/cgrates/cdrc/in", + CdrOutDir: "/var/spool/cgrates/cdrc/out", + FailedCallsPrefix: "missed_calls", + CDRPath: utils.HierarchyPath([]string{""}), + CdrSourceId: "freeswitch_csv", + CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), + PartialRecordCache: time.Duration(10) * time.Second, + PartialCacheExpiryAction: utils.MetaDumpToFile, + HeaderFields: make([]*CfgCdrField, 0), ContentFields: []*CfgCdrField{ &CfgCdrField{Tag: "TOR", Type: utils.META_COMPOSED, FieldId: utils.TOR, Value: utils.ParseRSRFieldsMustCompile("2", utils.INFIELD_SEP), FieldFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), Width: 0, Strip: "", Padding: "", Layout: "", Mandatory: true}, @@ -100,20 +101,22 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) { } eCgrCfg.CdrcProfiles["/tmp/cgrates/cdrc1/in"] = []*CdrcConfig{ &CdrcConfig{ - ID: "CDRC-CSV1", - Enabled: true, - CdrsConns: []*HaPoolConfig{&HaPoolConfig{Address: utils.MetaInternal}}, - CdrFormat: "csv", - FieldSeparator: ',', - DataUsageMultiplyFactor: 1024, - RunDelay: 0, - MaxOpenFiles: 1024, - CdrInDir: "/tmp/cgrates/cdrc1/in", - CdrOutDir: "/tmp/cgrates/cdrc1/out", - CDRPath: nil, - CdrSourceId: "csv1", - CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), - HeaderFields: make([]*CfgCdrField, 0), + ID: "CDRC-CSV1", + Enabled: true, + CdrsConns: []*HaPoolConfig{&HaPoolConfig{Address: utils.MetaInternal}}, + CdrFormat: "csv", + FieldSeparator: ',', + DataUsageMultiplyFactor: 1024, + RunDelay: 0, + MaxOpenFiles: 1024, + CdrInDir: "/tmp/cgrates/cdrc1/in", + CdrOutDir: "/tmp/cgrates/cdrc1/out", + CDRPath: nil, + CdrSourceId: "csv1", + CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), + PartialRecordCache: time.Duration(10) * time.Second, + PartialCacheExpiryAction: utils.MetaDumpToFile, + HeaderFields: make([]*CfgCdrField, 0), ContentFields: []*CfgCdrField{ &CfgCdrField{Tag: "TOR", Type: utils.META_COMPOSED, FieldId: utils.TOR, Value: utils.ParseRSRFieldsMustCompile("2", utils.INFIELD_SEP), FieldFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), Width: 0, Strip: "", Padding: "", Layout: "", Mandatory: true}, @@ -162,20 +165,22 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) { } eCgrCfg.CdrcProfiles["/tmp/cgrates/cdrc2/in"] = []*CdrcConfig{ &CdrcConfig{ - ID: "CDRC-CSV2", - Enabled: true, - CdrsConns: []*HaPoolConfig{&HaPoolConfig{Address: utils.MetaInternal}}, - CdrFormat: "csv", - FieldSeparator: ',', - DataUsageMultiplyFactor: 0.000976563, - RunDelay: 1000000000, - MaxOpenFiles: 1024, - CdrInDir: "/tmp/cgrates/cdrc2/in", - CdrOutDir: "/tmp/cgrates/cdrc2/out", - CDRPath: nil, - CdrSourceId: "csv2", - CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), - HeaderFields: make([]*CfgCdrField, 0), + ID: "CDRC-CSV2", + Enabled: true, + CdrsConns: []*HaPoolConfig{&HaPoolConfig{Address: utils.MetaInternal}}, + CdrFormat: "csv", + FieldSeparator: ',', + DataUsageMultiplyFactor: 0.000976563, + RunDelay: 1000000000, + MaxOpenFiles: 1024, + CdrInDir: "/tmp/cgrates/cdrc2/in", + CdrOutDir: "/tmp/cgrates/cdrc2/out", + CDRPath: nil, + CdrSourceId: "csv2", + CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), + PartialRecordCache: time.Duration(10) * time.Second, + PartialCacheExpiryAction: utils.MetaDumpToFile, + HeaderFields: make([]*CfgCdrField, 0), ContentFields: []*CfgCdrField{ &CfgCdrField{FieldId: utils.TOR, Value: utils.ParseRSRFieldsMustCompile("~7:s/^(voice|data|sms|mms|generic)$/*$1/", utils.INFIELD_SEP), FieldFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), Width: 0, Strip: "", Padding: "", Layout: "", Mandatory: false}, @@ -206,20 +211,22 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) { } eCgrCfg.CdrcProfiles["/tmp/cgrates/cdrc3/in"] = []*CdrcConfig{ &CdrcConfig{ - ID: "CDRC-CSV3", - Enabled: true, - CdrsConns: []*HaPoolConfig{&HaPoolConfig{Address: utils.MetaInternal}}, - CdrFormat: "csv", - FieldSeparator: ',', - DataUsageMultiplyFactor: 1024, - RunDelay: 0, - MaxOpenFiles: 1024, - CdrInDir: "/tmp/cgrates/cdrc3/in", - CdrOutDir: "/tmp/cgrates/cdrc3/out", - CDRPath: nil, - CdrSourceId: "csv3", - CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), - HeaderFields: make([]*CfgCdrField, 0), + ID: "CDRC-CSV3", + Enabled: true, + CdrsConns: []*HaPoolConfig{&HaPoolConfig{Address: utils.MetaInternal}}, + CdrFormat: "csv", + FieldSeparator: ',', + DataUsageMultiplyFactor: 1024, + RunDelay: 0, + MaxOpenFiles: 1024, + CdrInDir: "/tmp/cgrates/cdrc3/in", + CdrOutDir: "/tmp/cgrates/cdrc3/out", + CDRPath: nil, + CdrSourceId: "csv3", + CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), + PartialRecordCache: time.Duration(10) * time.Second, + PartialCacheExpiryAction: utils.MetaDumpToFile, + HeaderFields: make([]*CfgCdrField, 0), ContentFields: []*CfgCdrField{ &CfgCdrField{Tag: "TOR", Type: utils.META_COMPOSED, FieldId: utils.TOR, Value: utils.ParseRSRFieldsMustCompile("2", utils.INFIELD_SEP), FieldFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), Width: 0, Strip: "", Padding: "", Layout: "", Mandatory: true}, diff --git a/config/libconfig_json.go b/config/libconfig_json.go index e8c0a6079..875113b2c 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -148,28 +148,29 @@ type CdreJsonCfg struct { // Cdrc config section type CdrcJsonCfg struct { - Id *string - Enabled *bool - Dry_run *bool - Cdrs_conns *[]*HaPoolJsonCfg - Cdr_format *string - Field_separator *string - Timezone *string - Run_delay *int - Data_usage_multiply_factor *float64 - Cdr_in_dir *string - Cdr_out_dir *string - Failed_calls_prefix *string - Cdr_path *string - Cdr_source_id *string - Cdr_filter *string - Continue_on_success *bool - Max_open_files *int - Partial_record_cache *string - Header_fields *[]*CdrFieldJsonCfg - Content_fields *[]*CdrFieldJsonCfg - Trailer_fields *[]*CdrFieldJsonCfg - Cache_dump_fields *[]*CdrFieldJsonCfg + Id *string + Enabled *bool + Dry_run *bool + Cdrs_conns *[]*HaPoolJsonCfg + Cdr_format *string + Field_separator *string + Timezone *string + Run_delay *int + Data_usage_multiply_factor *float64 + Cdr_in_dir *string + Cdr_out_dir *string + Failed_calls_prefix *string + Cdr_path *string + Cdr_source_id *string + Cdr_filter *string + Continue_on_success *bool + Max_open_files *int + Partial_record_cache *string + Partial_cache_expiry_action *string + Header_fields *[]*CdrFieldJsonCfg + Content_fields *[]*CdrFieldJsonCfg + Trailer_fields *[]*CdrFieldJsonCfg + Cache_dump_fields *[]*CdrFieldJsonCfg } // SM-Generic config section diff --git a/utils/consts.go b/utils/consts.go index a95efad0e..94c9ae892 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -300,4 +300,6 @@ const ( MetaDateTime = "*datetime" MetaMaskedDestination = "*masked_destination" MetaUnixTimestamp = "*unix_timestamp" + MetaPostCDR = "*post_cdr" + MetaDumpToFile = "*dump_to_file" )