mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-16 05:39:54 +05:00
CDRC configuration changes: cdr_in_dir -> cdr_in_path, cdr_out_dir -> cdr_out_path, cdr_path -> cdr_root_path, cdr_formats slightly modified
This commit is contained in:
34
cdrc/cdrc.go
34
cdrc/cdrc.go
@@ -37,8 +37,6 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
CSV = "csv"
|
||||
FS_CSV = "freeswitch_csv"
|
||||
UNPAIRED_SUFFIX = ".unpaired"
|
||||
)
|
||||
|
||||
@@ -51,7 +49,7 @@ type RecordsProcessor interface {
|
||||
/*
|
||||
One instance of CDRC will act on one folder.
|
||||
Common parameters within configs processed:
|
||||
* cdrS, cdrFormat, cdrInDir, cdrOutDir, runDelay
|
||||
* cdrS, cdrFormat, CDRInPath, CDROutPath, runDelay
|
||||
Parameters specific per config instance:
|
||||
* duMultiplyFactor, cdrSourceId, cdrFilter, cdrFields
|
||||
*/
|
||||
@@ -74,16 +72,16 @@ func NewCdrc(cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool, cdrs rpcclient.R
|
||||
}
|
||||
var err error
|
||||
if cdrc.unpairedRecordsCache, err = NewUnpairedRecordsCache(cdrcCfg.PartialRecordCache,
|
||||
cdrcCfg.CdrOutDir, cdrcCfg.FieldSeparator); err != nil {
|
||||
cdrcCfg.CDROutPath, cdrcCfg.FieldSeparator); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if cdrc.partialRecordsCache, err = NewPartialRecordsCache(cdrcCfg.PartialRecordCache,
|
||||
cdrcCfg.PartialCacheExpiryAction, cdrcCfg.CdrOutDir, cdrcCfg.FieldSeparator, roundDecimals,
|
||||
cdrcCfg.PartialCacheExpiryAction, cdrcCfg.CDROutPath, cdrcCfg.FieldSeparator, roundDecimals,
|
||||
cdrc.timezone, cdrc.httpSkipTlsCheck, cdrc.cdrs, filterS); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Before processing, make sure in and out folders exist
|
||||
for _, dir := range []string{cdrcCfg.CdrInDir, cdrcCfg.CdrOutDir} {
|
||||
for _, dir := range []string{cdrcCfg.CDRInPath, cdrcCfg.CDROutPath} {
|
||||
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
|
||||
return nil, fmt.Errorf("Nonexistent folder: %s", dir)
|
||||
}
|
||||
@@ -116,7 +114,7 @@ func (self *Cdrc) Run() error {
|
||||
for {
|
||||
select {
|
||||
case <-self.closeChan: // Exit, reinject closeChan for other CDRCs
|
||||
utils.Logger.Info(fmt.Sprintf("<Cdrc> Shutting down CDRC on path %s.", self.dfltCdrcCfg.CdrInDir))
|
||||
utils.Logger.Info(fmt.Sprintf("<Cdrc> Shutting down CDRC on path %s.", self.dfltCdrcCfg.CDRInPath))
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
@@ -132,18 +130,18 @@ func (self *Cdrc) trackCDRFiles() (err error) {
|
||||
return
|
||||
}
|
||||
defer watcher.Close()
|
||||
err = watcher.Add(self.dfltCdrcCfg.CdrInDir)
|
||||
err = watcher.Add(self.dfltCdrcCfg.CDRInPath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
utils.Logger.Info(fmt.Sprintf("<Cdrc> Monitoring %s for file moves.", self.dfltCdrcCfg.CdrInDir))
|
||||
utils.Logger.Info(fmt.Sprintf("<Cdrc> Monitoring %s for file moves.", self.dfltCdrcCfg.CDRInPath))
|
||||
for {
|
||||
select {
|
||||
case <-self.closeChan: // Exit, reinject closeChan for other CDRCs
|
||||
utils.Logger.Info(fmt.Sprintf("<Cdrc> Shutting down CDRC on path %s.", self.dfltCdrcCfg.CdrInDir))
|
||||
utils.Logger.Info(fmt.Sprintf("<Cdrc> Shutting down CDRC on path %s.", self.dfltCdrcCfg.CDRInPath))
|
||||
return nil
|
||||
case ev := <-watcher.Events:
|
||||
if ev.Op&fsnotify.Create == fsnotify.Create && (self.dfltCdrcCfg.CdrFormat != FS_CSV || path.Ext(ev.Name) != ".csv") {
|
||||
if ev.Op&fsnotify.Create == fsnotify.Create && (self.dfltCdrcCfg.CdrFormat != utils.MetaFScsv || path.Ext(ev.Name) != ".csv") {
|
||||
go func() { //Enable async processing here
|
||||
if err = self.processFile(ev.Name); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", ev.Name, err.Error()))
|
||||
@@ -158,12 +156,12 @@ func (self *Cdrc) trackCDRFiles() (err error) {
|
||||
|
||||
// One run over the CDR folder
|
||||
func (self *Cdrc) processCdrDir() error {
|
||||
utils.Logger.Info(fmt.Sprintf("<Cdrc> Parsing folder %s for CDR files.", self.dfltCdrcCfg.CdrInDir))
|
||||
filesInDir, _ := ioutil.ReadDir(self.dfltCdrcCfg.CdrInDir)
|
||||
utils.Logger.Info(fmt.Sprintf("<Cdrc> Parsing folder %s for CDR files.", self.dfltCdrcCfg.CDRInPath))
|
||||
filesInDir, _ := ioutil.ReadDir(self.dfltCdrcCfg.CDRInPath)
|
||||
for _, file := range filesInDir {
|
||||
if self.dfltCdrcCfg.CdrFormat != FS_CSV || path.Ext(file.Name()) != ".csv" {
|
||||
if self.dfltCdrcCfg.CdrFormat != utils.MetaFScsv || path.Ext(file.Name()) != ".csv" {
|
||||
go func() { //Enable async processing here
|
||||
if err := self.processFile(path.Join(self.dfltCdrcCfg.CdrInDir, file.Name())); err != nil {
|
||||
if err := self.processFile(path.Join(self.dfltCdrcCfg.CDRInPath, file.Name())); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", file, err.Error()))
|
||||
}
|
||||
}()
|
||||
@@ -188,7 +186,7 @@ func (self *Cdrc) processFile(filePath string) error {
|
||||
}
|
||||
var recordsProcessor RecordsProcessor
|
||||
switch self.dfltCdrcCfg.CdrFormat {
|
||||
case CSV, FS_CSV, utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE, utils.PartialCSV:
|
||||
case utils.MetaFileCSV, utils.MetaFScsv, utils.MetaKamFlatstore, utils.MetaOsipsFlatstore, utils.MetaPartialCSV:
|
||||
csvReader := csv.NewReader(bufio.NewReader(file))
|
||||
csvReader.Comma = self.dfltCdrcCfg.FieldSeparator
|
||||
csvReader.Comment = '#'
|
||||
@@ -199,7 +197,7 @@ func (self *Cdrc) processFile(filePath string) error {
|
||||
recordsProcessor = NewFwvRecordsProcessor(file, self.dfltCdrcCfg, self.cdrcCfgs,
|
||||
self.httpClient, self.httpSkipTlsCheck, self.timezone, self.filterS)
|
||||
case utils.XML:
|
||||
if recordsProcessor, err = NewXMLRecordsProcessor(file, self.dfltCdrcCfg.CDRPath,
|
||||
if recordsProcessor, err = NewXMLRecordsProcessor(file, self.dfltCdrcCfg.CDRRootPath,
|
||||
self.timezone, self.httpSkipTlsCheck, self.cdrcCfgs, self.filterS); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -234,7 +232,7 @@ func (self *Cdrc) processFile(filePath string) error {
|
||||
}
|
||||
}
|
||||
// Finished with file, move it to processed folder
|
||||
newPath := path.Join(self.dfltCdrcCfg.CdrOutDir, fn)
|
||||
newPath := path.Join(self.dfltCdrcCfg.CDROutPath, fn)
|
||||
if err := os.Rename(filePath, newPath); err != nil {
|
||||
utils.Logger.Err(err.Error())
|
||||
return err
|
||||
|
||||
@@ -65,7 +65,7 @@ func (self *CsvRecordsProcessor) ProcessNextRecord() ([]*engine.CDR, error) {
|
||||
return nil, err
|
||||
}
|
||||
self.processedRecordsNr += 1
|
||||
if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.dfltCdrcCfg.CdrFormat) {
|
||||
if utils.IsSliceMember([]string{utils.MetaKamFlatstore, utils.MetaOsipsFlatstore}, self.dfltCdrcCfg.CdrFormat) {
|
||||
if record, err = self.processFlatstoreRecord(record); err != nil {
|
||||
return nil, err
|
||||
} else if record == nil {
|
||||
@@ -122,7 +122,7 @@ func (self *CsvRecordsProcessor) processRecord(record []string) ([]*engine.CDR,
|
||||
storedCdr, err := self.recordToStoredCdr(record, cdrcCfg, tenant)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed converting to StoredCdr, error: %s", err.Error())
|
||||
} else if self.dfltCdrcCfg.CdrFormat == utils.PartialCSV {
|
||||
} else if self.dfltCdrcCfg.CdrFormat == utils.MetaPartialCSV {
|
||||
if storedCdr, err = self.partialRecordsCache.MergePartialCDRRecord(NewPartialCDRRecord(storedCdr, self.partialCacheDumpFields)); err != nil {
|
||||
return nil, fmt.Errorf("Failed merging PartialCDR, error: %s", err.Error())
|
||||
} else if storedCdr == nil { // CDR was absorbed by cache since it was partial
|
||||
@@ -153,7 +153,7 @@ func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcCfg *con
|
||||
continue
|
||||
}
|
||||
}
|
||||
if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.dfltCdrcCfg.CdrFormat) { // Hardcode some values in case of flatstore
|
||||
if utils.IsSliceMember([]string{utils.MetaKamFlatstore, utils.MetaOsipsFlatstore}, self.dfltCdrcCfg.CdrFormat) { // Hardcode some values in case of flatstore
|
||||
switch cdrFldCfg.FieldId {
|
||||
case utils.OriginID:
|
||||
cdrFldCfg.Value = config.NewRSRParsersMustCompile("~3;~1;~2", true, utils.INFIELD_SEP) // in case of flatstore, accounting id is made up out of callid, from_tag and to_tag
|
||||
|
||||
@@ -212,7 +212,7 @@ func TestXMLRPProcess(t *testing.T) {
|
||||
ID: "TestXML",
|
||||
Enabled: true,
|
||||
CdrFormat: "xml",
|
||||
CDRPath: utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}),
|
||||
CDRRootPath: utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}),
|
||||
CdrSourceId: "TestXML",
|
||||
ContentFields: []*config.FCTemplate{
|
||||
{Tag: "TOR", Type: utils.META_COMPOSED, FieldId: utils.ToR,
|
||||
@@ -285,7 +285,7 @@ func TestXMLRPProcessWithNewFilters(t *testing.T) {
|
||||
ID: "XMLWithFilters",
|
||||
Enabled: true,
|
||||
CdrFormat: "xml",
|
||||
CDRPath: utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}),
|
||||
CDRRootPath: utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}),
|
||||
CdrSourceId: "XMLWithFilters",
|
||||
Filters: []string{"*string:~broadWorksCDR.cdrData.headerModule.type:Normal"},
|
||||
ContentFields: []*config.FCTemplate{
|
||||
@@ -552,7 +552,7 @@ func TestXMLRPNestingSeparator(t *testing.T) {
|
||||
ID: "msw_xml",
|
||||
Enabled: true,
|
||||
CdrFormat: "xml",
|
||||
CDRPath: utils.HierarchyPath([]string{"File", "CDRs", "Call"}),
|
||||
CDRRootPath: utils.HierarchyPath([]string{"File", "CDRs", "Call"}),
|
||||
CdrSourceId: "zw_cfs1",
|
||||
Filters: []string{},
|
||||
ContentFields: []*config.FCTemplate{
|
||||
|
||||
@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package config
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -35,10 +34,10 @@ type CdrcCfg struct {
|
||||
Timezone string // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
|
||||
RunDelay time.Duration // Delay between runs, 0 for inotify driven requests
|
||||
MaxOpenFiles int // Maximum number of files opened simultaneously
|
||||
CdrInDir string // Folder to process CDRs from
|
||||
CdrOutDir string // Folder to move processed CDRs to
|
||||
CDRInPath string // Folder to process CDRs from
|
||||
CDROutPath string // Folder to move processed CDRs to
|
||||
FailedCallsPrefix string // Used in case of flatstore CDRs to avoid searching for BYE records
|
||||
CDRPath utils.HierarchyPath // used for XML CDRs to specify the path towards CDR elements
|
||||
CDRRootPath utils.HierarchyPath // used for XML CDRs to specify the path towards CDR elements
|
||||
CdrSourceId string // Source identifier for the processed CDRs
|
||||
Filters []string
|
||||
Tenant RSRParsers
|
||||
@@ -73,7 +72,7 @@ func (self *CdrcCfg) loadFromJsonCfg(jsnCfg *CdrcJsonCfg, separator string) erro
|
||||
}
|
||||
}
|
||||
if jsnCfg.Cdr_format != nil {
|
||||
self.CdrFormat = strings.TrimPrefix(*jsnCfg.Cdr_format, "*")
|
||||
self.CdrFormat = *jsnCfg.Cdr_format
|
||||
}
|
||||
if jsnCfg.Field_separator != nil && len(*jsnCfg.Field_separator) > 0 {
|
||||
sepStr := *jsnCfg.Field_separator
|
||||
@@ -88,17 +87,17 @@ func (self *CdrcCfg) loadFromJsonCfg(jsnCfg *CdrcJsonCfg, separator string) erro
|
||||
if jsnCfg.Max_open_files != nil {
|
||||
self.MaxOpenFiles = *jsnCfg.Max_open_files
|
||||
}
|
||||
if jsnCfg.Cdr_in_dir != nil {
|
||||
self.CdrInDir = *jsnCfg.Cdr_in_dir
|
||||
if jsnCfg.Cdr_in_path != nil {
|
||||
self.CDRInPath = *jsnCfg.Cdr_in_path
|
||||
}
|
||||
if jsnCfg.Cdr_out_dir != nil {
|
||||
self.CdrOutDir = *jsnCfg.Cdr_out_dir
|
||||
if jsnCfg.Cdr_out_path != nil {
|
||||
self.CDROutPath = *jsnCfg.Cdr_out_path
|
||||
}
|
||||
if jsnCfg.Failed_calls_prefix != nil {
|
||||
self.FailedCallsPrefix = *jsnCfg.Failed_calls_prefix
|
||||
}
|
||||
if jsnCfg.Cdr_path != nil {
|
||||
self.CDRPath = utils.ParseHierarchyPath(*jsnCfg.Cdr_path, "")
|
||||
if jsnCfg.Cdr_root_path != nil {
|
||||
self.CDRRootPath = utils.ParseHierarchyPath(*jsnCfg.Cdr_root_path, "")
|
||||
}
|
||||
if jsnCfg.Cdr_source_id != nil {
|
||||
self.CdrSourceId = *jsnCfg.Cdr_source_id
|
||||
@@ -164,11 +163,11 @@ func (self *CdrcCfg) Clone() *CdrcCfg {
|
||||
clnCdrc.Timezone = self.Timezone
|
||||
clnCdrc.RunDelay = self.RunDelay
|
||||
clnCdrc.MaxOpenFiles = self.MaxOpenFiles
|
||||
clnCdrc.CdrInDir = self.CdrInDir
|
||||
clnCdrc.CdrOutDir = self.CdrOutDir
|
||||
clnCdrc.CDRPath = make(utils.HierarchyPath, len(self.CDRPath))
|
||||
for i, path := range self.CDRPath {
|
||||
clnCdrc.CDRPath[i] = path
|
||||
clnCdrc.CDRInPath = self.CDRInPath
|
||||
clnCdrc.CDROutPath = self.CDROutPath
|
||||
clnCdrc.CDRRootPath = make(utils.HierarchyPath, len(self.CDRRootPath))
|
||||
for i, path := range self.CDRRootPath {
|
||||
clnCdrc.CDRRootPath[i] = path
|
||||
}
|
||||
clnCdrc.FailedCallsPrefix = self.FailedCallsPrefix
|
||||
clnCdrc.Filters = make([]string, len(self.Filters))
|
||||
|
||||
@@ -32,11 +32,11 @@ var cdrcCfg = CdrcCfg{
|
||||
CdrFormat: "csv",
|
||||
FieldSeparator: ',',
|
||||
MaxOpenFiles: 1024,
|
||||
CdrInDir: "/var/spool/cgrates/cdrc/in",
|
||||
CdrOutDir: "/var/spool/cgrates/cdrc/out",
|
||||
CDRInPath: "/var/spool/cgrates/cdrc/in",
|
||||
CDROutPath: "/var/spool/cgrates/cdrc/out",
|
||||
FailedCallsPrefix: "missed_calls",
|
||||
CDRPath: []string{""},
|
||||
CdrSourceId: "freeswitch_csv",
|
||||
CDRRootPath: []string{""},
|
||||
CdrSourceId: "cdrc_csv",
|
||||
Filters: []string{},
|
||||
Tenant: NewRSRParsersMustCompile("cgrates.org", true, utils.INFIELD_SEP),
|
||||
PartialRecordCache: time.Duration(10 * time.Second),
|
||||
@@ -87,11 +87,11 @@ func TestCdrcCfgloadFromJsonCfg(t *testing.T) {
|
||||
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
|
||||
"run_delay": 0, // sleep interval in seconds between consecutive runs, 0 to use automation via inotify
|
||||
"max_open_files": 1024, // maximum simultaneous files to process, 0 for unlimited
|
||||
"cdr_in_dir": "/var/spool/cgrates/cdrc/in", // absolute path towards the directory where the CDRs are stored
|
||||
"cdr_out_dir": "/var/spool/cgrates/cdrc/out", // absolute path towards the directory where processed CDRs will be moved
|
||||
"cdr_in_path": "/var/spool/cgrates/cdrc/in", // absolute path towards the directory where the CDRs are stored
|
||||
"cdr_out_path": "/var/spool/cgrates/cdrc/out", // absolute path towards the directory where processed CDRs will be moved
|
||||
"failed_calls_prefix": "missed_calls", // used in case of flatstore CDRs to avoid searching for BYE records
|
||||
"cdr_path": "", // path towards one CDR element in case of XML CDRs
|
||||
"cdr_source_id": "freeswitch_csv", // free form field, tag identifying the source of the CDRs within CDRS database
|
||||
"cdr_root_path": "", // path towards one CDR element in case of XML CDRs
|
||||
"cdr_source_id": "cdrc_csv", // free form field, tag identifying the source of the CDRs within CDRS database
|
||||
"filters" :[], // new filters used in FilterS subsystem
|
||||
"tenant": "cgrates.org", // default tenant
|
||||
"continue_on_success": false, // continue to the next template if executed
|
||||
|
||||
@@ -19,15 +19,15 @@
|
||||
{
|
||||
"id": "CDRC-CSV1",
|
||||
"enabled": true, // enable CDR client functionality
|
||||
"cdr_in_dir": "/tmp/cgrates/cdrc1/in", // absolute path towards the directory where the CDRs are stored
|
||||
"cdr_out_dir": "/tmp/cgrates/cdrc1/out", // absolute path towards the directory where processed CDRs will be moved
|
||||
"cdr_in_path": "/tmp/cgrates/cdrc1/in", // absolute path towards the directory where the CDRs are stored
|
||||
"cdr_out_path": "/tmp/cgrates/cdrc1/out", // absolute path towards the directory where processed CDRs will be moved
|
||||
"cdr_source_id": "csv1", // free form field, tag identifying the source of the CDRs within CDRS database
|
||||
},
|
||||
{
|
||||
"id": "CDRC-CSV2",
|
||||
"enabled": true, // enable CDR client functionality
|
||||
"cdr_in_dir": "/tmp/cgrates/cdrc2/in", // absolute path towards the directory where the CDRs are stored
|
||||
"cdr_out_dir": "/tmp/cgrates/cdrc2/out", // absolute path towards the directory where processed CDRs will be moved
|
||||
"cdr_in_path": "/tmp/cgrates/cdrc2/in", // absolute path towards the directory where the CDRs are stored
|
||||
"cdr_out_path": "/tmp/cgrates/cdrc2/out", // absolute path towards the directory where processed CDRs will be moved
|
||||
"data_usage_multiply_factor": 0.000976563,
|
||||
"run_delay": 1,
|
||||
"cdr_source_id": "csv2", // free form field, tag identifying the source of the CDRs within CDRS database
|
||||
|
||||
@@ -7,8 +7,8 @@
|
||||
{
|
||||
"id": "CDRC-CSV3",
|
||||
"enabled": true, // enable CDR client functionality
|
||||
"cdr_in_dir": "/tmp/cgrates/cdrc3/in", // absolute path towards the directory where the CDRs are stored
|
||||
"cdr_out_dir": "/tmp/cgrates/cdrc3/out", // absolute path towards the directory where processed CDRs will be moved
|
||||
"cdr_in_path": "/tmp/cgrates/cdrc3/in", // absolute path towards the directory where the CDRs are stored
|
||||
"cdr_out_path": "/tmp/cgrates/cdrc3/out", // absolute path towards the directory where processed CDRs will be moved
|
||||
"cdr_source_id": "csv3", // free form field, tag identifying the source of the CDRs within CDRS database
|
||||
},
|
||||
],
|
||||
|
||||
@@ -1035,16 +1035,16 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) (err error) {
|
||||
if err := cdrcInstCfg.loadFromJsonCfg(jsnCrc1Cfg, self.generalCfg.RsrSepatarot); err != nil {
|
||||
return err
|
||||
}
|
||||
if cdrcInstCfg.CdrInDir == "" {
|
||||
return utils.ErrCDRCNoInDir
|
||||
if cdrcInstCfg.CDRInPath == "" {
|
||||
return utils.ErrCDRCNoInPath
|
||||
}
|
||||
if _, hasDir := self.CdrcProfiles[cdrcInstCfg.CdrInDir]; !hasDir {
|
||||
self.CdrcProfiles[cdrcInstCfg.CdrInDir] = make([]*CdrcCfg, 0)
|
||||
if _, hasDir := self.CdrcProfiles[cdrcInstCfg.CDRInPath]; !hasDir {
|
||||
self.CdrcProfiles[cdrcInstCfg.CDRInPath] = make([]*CdrcCfg, 0)
|
||||
}
|
||||
if indxFound != -1 { // Replace previous config so we have inheritance
|
||||
self.CdrcProfiles[pathFound][indxFound] = cdrcInstCfg
|
||||
} else {
|
||||
self.CdrcProfiles[cdrcInstCfg.CdrInDir] = append(self.CdrcProfiles[cdrcInstCfg.CdrInDir], cdrcInstCfg)
|
||||
self.CdrcProfiles[cdrcInstCfg.CDRInPath] = append(self.CdrcProfiles[cdrcInstCfg.CDRInPath], cdrcInstCfg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -242,16 +242,16 @@ const CGRATES_CFG_JSON = `
|
||||
"cdrs_conns": [ // connections to CDRs. <*internal|x.y.z.y:1234>
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"cdr_format": "*csv", // CDR file format <*csv|*freeswitch_csv|*fwv|*opensips_flatstore|*partial_csv>
|
||||
"cdr_format": "*file_csv", // CDR file format <*file_csv|*freeswitch_csv|*fwv|*opensips_flatstore|*partial_csv>
|
||||
"field_separator": ",", // separator used in case of csv files
|
||||
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
|
||||
"run_delay": 0, // sleep interval in seconds between consecutive runs, 0 to use automation via inotify
|
||||
"max_open_files": 1024, // maximum simultaneous files to process, 0 for unlimited
|
||||
"cdr_in_dir": "/var/spool/cgrates/cdrc/in", // absolute path towards the directory where the CDRs are stored
|
||||
"cdr_out_dir": "/var/spool/cgrates/cdrc/out", // absolute path towards the directory where processed CDRs will be moved
|
||||
"cdr_in_path": "/var/spool/cgrates/cdrc/in", // absolute path towards the directory where the CDRs are stored
|
||||
"cdr_out_path": "/var/spool/cgrates/cdrc/out", // absolute path towards the directory where processed CDRs will be moved
|
||||
"failed_calls_prefix": "missed_calls", // used in case of flatstore CDRs to avoid searching for BYE records
|
||||
"cdr_path": "", // path towards one CDR element in case of XML CDRs
|
||||
"cdr_source_id": "freeswitch_csv", // free form field, tag identifying the source of the CDRs within CDRS database
|
||||
"cdr_root_path": "", // path towards one CDR element in case of XML CDRs
|
||||
"cdr_source_id": "cdrc_csv", // free form field, tag identifying the source of the CDRs within CDRS database
|
||||
"filters" :[], // limit parsing based on the filters
|
||||
"tenant": "", // tenant used by import
|
||||
"continue_on_success": false, // continue to the next template if executed
|
||||
|
||||
@@ -445,16 +445,16 @@ func TestDfCdrcJsonCfg(t *testing.T) {
|
||||
Cdrs_conns: &[]*RemoteHostJson{{
|
||||
Address: utils.StringPointer(utils.MetaInternal),
|
||||
}},
|
||||
Cdr_format: utils.StringPointer("*csv"),
|
||||
Cdr_format: utils.StringPointer("*file_csv"),
|
||||
Field_separator: utils.StringPointer(","),
|
||||
Timezone: utils.StringPointer(""),
|
||||
Run_delay: utils.IntPointer(0),
|
||||
Max_open_files: utils.IntPointer(1024),
|
||||
Cdr_in_dir: utils.StringPointer("/var/spool/cgrates/cdrc/in"),
|
||||
Cdr_out_dir: utils.StringPointer("/var/spool/cgrates/cdrc/out"),
|
||||
Cdr_in_path: utils.StringPointer("/var/spool/cgrates/cdrc/in"),
|
||||
Cdr_out_path: utils.StringPointer("/var/spool/cgrates/cdrc/out"),
|
||||
Failed_calls_prefix: utils.StringPointer("missed_calls"),
|
||||
Cdr_path: utils.StringPointer(""),
|
||||
Cdr_source_id: utils.StringPointer("freeswitch_csv"),
|
||||
Cdr_root_path: utils.StringPointer(""),
|
||||
Cdr_source_id: utils.StringPointer("cdrc_csv"),
|
||||
Filters: &[]string{},
|
||||
Tenant: utils.StringPointer(""),
|
||||
Continue_on_success: utils.BoolPointer(false),
|
||||
@@ -1400,16 +1400,16 @@ func TestNewCgrJsonCfgFromFile(t *testing.T) {
|
||||
{
|
||||
Id: utils.StringPointer("CDRC-CSV1"),
|
||||
Enabled: utils.BoolPointer(true),
|
||||
Cdr_in_dir: utils.StringPointer("/tmp/cgrates/cdrc1/in"),
|
||||
Cdr_out_dir: utils.StringPointer("/tmp/cgrates/cdrc1/out"),
|
||||
Cdr_in_path: utils.StringPointer("/tmp/cgrates/cdrc1/in"),
|
||||
Cdr_out_path: utils.StringPointer("/tmp/cgrates/cdrc1/out"),
|
||||
Cdr_source_id: utils.StringPointer("csv1"),
|
||||
},
|
||||
{
|
||||
Id: utils.StringPointer("CDRC-CSV2"),
|
||||
Enabled: utils.BoolPointer(true),
|
||||
Run_delay: utils.IntPointer(1),
|
||||
Cdr_in_dir: utils.StringPointer("/tmp/cgrates/cdrc2/in"),
|
||||
Cdr_out_dir: utils.StringPointer("/tmp/cgrates/cdrc2/out"),
|
||||
Cdr_in_path: utils.StringPointer("/tmp/cgrates/cdrc2/in"),
|
||||
Cdr_out_path: utils.StringPointer("/tmp/cgrates/cdrc2/out"),
|
||||
Cdr_source_id: utils.StringPointer("csv2"),
|
||||
Content_fields: &cdrFields,
|
||||
},
|
||||
|
||||
@@ -206,16 +206,16 @@ func TestCgrCfgCDRC(t *testing.T) {
|
||||
Enabled: true,
|
||||
DryRun: false,
|
||||
CdrsConns: []*RemoteHost{{Address: utils.MetaInternal}},
|
||||
CdrFormat: "csv",
|
||||
CdrFormat: utils.MetaFileCSV,
|
||||
FieldSeparator: rune(','),
|
||||
Timezone: "",
|
||||
RunDelay: 0,
|
||||
MaxOpenFiles: 1024,
|
||||
CdrInDir: "/var/spool/cgrates/cdrc/in",
|
||||
CdrOutDir: "/var/spool/cgrates/cdrc/out",
|
||||
CDRInPath: "/var/spool/cgrates/cdrc/in",
|
||||
CDROutPath: "/var/spool/cgrates/cdrc/out",
|
||||
FailedCallsPrefix: "missed_calls",
|
||||
CDRPath: utils.HierarchyPath([]string{""}),
|
||||
CdrSourceId: "freeswitch_csv",
|
||||
CDRRootPath: utils.HierarchyPath([]string{""}),
|
||||
CdrSourceId: "cdrc_csv",
|
||||
Filters: []string{},
|
||||
ContinueOnSuccess: false,
|
||||
PartialRecordCache: time.Duration(10 * time.Second),
|
||||
@@ -1572,16 +1572,16 @@ func TestCDRCWithDefault(t *testing.T) {
|
||||
Enabled: false,
|
||||
DryRun: false,
|
||||
CdrsConns: []*RemoteHost{{Address: utils.MetaInternal}},
|
||||
CdrFormat: "csv",
|
||||
CdrFormat: utils.MetaFileCSV,
|
||||
FieldSeparator: rune(','),
|
||||
Timezone: "",
|
||||
RunDelay: 0,
|
||||
MaxOpenFiles: 1024,
|
||||
CdrInDir: "/var/spool/cgrates/cdrc/in",
|
||||
CdrOutDir: "/var/spool/cgrates/cdrc/out",
|
||||
CDRInPath: "/var/spool/cgrates/cdrc/in",
|
||||
CDROutPath: "/var/spool/cgrates/cdrc/out",
|
||||
FailedCallsPrefix: "missed_calls",
|
||||
CDRPath: utils.HierarchyPath([]string{""}),
|
||||
CdrSourceId: "freeswitch_csv",
|
||||
CDRRootPath: utils.HierarchyPath([]string{""}),
|
||||
CdrSourceId: "cdrc_csv",
|
||||
Filters: []string{},
|
||||
ContinueOnSuccess: false,
|
||||
PartialRecordCache: time.Duration(10 * time.Second),
|
||||
|
||||
@@ -38,15 +38,15 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
|
||||
ID: utils.META_DEFAULT,
|
||||
Enabled: false,
|
||||
CdrsConns: []*RemoteHost{{Address: utils.MetaInternal}},
|
||||
CdrFormat: "csv",
|
||||
CdrFormat: "*file_csv",
|
||||
FieldSeparator: ',',
|
||||
RunDelay: 0,
|
||||
MaxOpenFiles: 1024,
|
||||
CdrInDir: "/var/spool/cgrates/cdrc/in",
|
||||
CdrOutDir: "/var/spool/cgrates/cdrc/out",
|
||||
CDRInPath: "/var/spool/cgrates/cdrc/in",
|
||||
CDROutPath: "/var/spool/cgrates/cdrc/out",
|
||||
FailedCallsPrefix: "missed_calls",
|
||||
CDRPath: utils.HierarchyPath([]string{""}),
|
||||
CdrSourceId: "freeswitch_csv",
|
||||
CDRRootPath: utils.HierarchyPath([]string{""}),
|
||||
CdrSourceId: "cdrc_csv",
|
||||
Filters: []string{},
|
||||
PartialRecordCache: time.Duration(10) * time.Second,
|
||||
PartialCacheExpiryAction: utils.MetaDumpToFile,
|
||||
@@ -115,14 +115,14 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
|
||||
ID: "CDRC-CSV1",
|
||||
Enabled: true,
|
||||
CdrsConns: []*RemoteHost{{Address: utils.MetaInternal}},
|
||||
CdrFormat: "csv",
|
||||
CdrFormat: "*file_csv",
|
||||
FieldSeparator: ',',
|
||||
RunDelay: 0,
|
||||
MaxOpenFiles: 1024,
|
||||
CdrInDir: "/tmp/cgrates/cdrc1/in",
|
||||
CdrOutDir: "/tmp/cgrates/cdrc1/out",
|
||||
CDRInPath: "/tmp/cgrates/cdrc1/in",
|
||||
CDROutPath: "/tmp/cgrates/cdrc1/out",
|
||||
FailedCallsPrefix: "missed_calls",
|
||||
CDRPath: utils.HierarchyPath([]string{""}),
|
||||
CDRRootPath: utils.HierarchyPath([]string{""}),
|
||||
CdrSourceId: "csv1",
|
||||
Filters: []string{},
|
||||
PartialRecordCache: time.Duration(10) * time.Second,
|
||||
@@ -192,14 +192,14 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
|
||||
ID: "CDRC-CSV2",
|
||||
Enabled: true,
|
||||
CdrsConns: []*RemoteHost{{Address: utils.MetaInternal}},
|
||||
CdrFormat: "csv",
|
||||
CdrFormat: "*file_csv",
|
||||
FieldSeparator: ',',
|
||||
RunDelay: 1000000000,
|
||||
MaxOpenFiles: 1024,
|
||||
CdrInDir: "/tmp/cgrates/cdrc2/in",
|
||||
CdrOutDir: "/tmp/cgrates/cdrc2/out",
|
||||
CDRInPath: "/tmp/cgrates/cdrc2/in",
|
||||
CDROutPath: "/tmp/cgrates/cdrc2/out",
|
||||
FailedCallsPrefix: "missed_calls",
|
||||
CDRPath: utils.HierarchyPath([]string{""}),
|
||||
CDRRootPath: utils.HierarchyPath([]string{""}),
|
||||
CdrSourceId: "csv2",
|
||||
Filters: []string{},
|
||||
PartialRecordCache: time.Duration(10) * time.Second,
|
||||
@@ -253,14 +253,14 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
|
||||
ID: "CDRC-CSV3",
|
||||
Enabled: true,
|
||||
CdrsConns: []*RemoteHost{{Address: utils.MetaInternal}},
|
||||
CdrFormat: "csv",
|
||||
CdrFormat: utils.MetaFileCSV,
|
||||
FieldSeparator: ',',
|
||||
RunDelay: 0,
|
||||
MaxOpenFiles: 1024,
|
||||
CdrInDir: "/tmp/cgrates/cdrc3/in",
|
||||
CdrOutDir: "/tmp/cgrates/cdrc3/out",
|
||||
CDRInPath: "/tmp/cgrates/cdrc3/in",
|
||||
CDROutPath: "/tmp/cgrates/cdrc3/out",
|
||||
FailedCallsPrefix: "missed_calls",
|
||||
CDRPath: utils.HierarchyPath([]string{""}),
|
||||
CDRRootPath: utils.HierarchyPath([]string{""}),
|
||||
CdrSourceId: "csv3",
|
||||
Filters: []string{},
|
||||
PartialRecordCache: time.Duration(10) * time.Second,
|
||||
|
||||
@@ -151,10 +151,10 @@ type CdrcJsonCfg struct {
|
||||
Field_separator *string
|
||||
Timezone *string
|
||||
Run_delay *int
|
||||
Cdr_in_dir *string
|
||||
Cdr_out_dir *string
|
||||
Cdr_in_path *string
|
||||
Cdr_out_path *string
|
||||
Failed_calls_prefix *string
|
||||
Cdr_path *string
|
||||
Cdr_root_path *string
|
||||
Cdr_source_id *string
|
||||
Filters *[]string
|
||||
Tenant *string
|
||||
|
||||
@@ -180,7 +180,7 @@ const (
|
||||
STATIC_VALUE_PREFIX = "^"
|
||||
CSV = "csv"
|
||||
FWV = "fwv"
|
||||
PartialCSV = "partial_csv"
|
||||
MetaPartialCSV = "*partial_csv"
|
||||
DRYRUN = "dry_run"
|
||||
META_COMBIMED = "*combimed"
|
||||
MetaInternal = "*internal"
|
||||
@@ -282,8 +282,8 @@ const (
|
||||
CGR_AUTHORIZE = "CGR_AUTHORIZE"
|
||||
CONFIG_PATH = "/etc/cgrates/"
|
||||
DISCONNECT_CAUSE = "DisconnectCause"
|
||||
KAM_FLATSTORE = "kamailio_flatstore"
|
||||
OSIPS_FLATSTORE = "opensips_flatstore"
|
||||
MetaKamFlatstore = "*kamailio_flatstore"
|
||||
MetaOsipsFlatstore = "*opensips_flatstore"
|
||||
MetaRating = "*rating"
|
||||
NOT_AVAILABLE = "N/A"
|
||||
MetaEmpty = "*empty"
|
||||
@@ -418,6 +418,7 @@ const (
|
||||
CDRPoster = "cdr"
|
||||
MetaFileCSV = "*file_csv"
|
||||
MetaFileFWV = "*file_fwv"
|
||||
MetaFScsv = "*freeswitch_csv"
|
||||
Accounts = "Accounts"
|
||||
AccountService = "AccountS"
|
||||
Actions = "Actions"
|
||||
|
||||
@@ -72,7 +72,7 @@ var (
|
||||
ErrSessionNotFound = errors.New("SESSION_NOT_FOUND")
|
||||
ErrJsonIncompleteComment = errors.New("JSON_INCOMPLETE_COMMENT")
|
||||
ErrCDRCNoProfileID = errors.New("CDRC_PROFILE_WITHOUT_ID")
|
||||
ErrCDRCNoInDir = errors.New("CDRC_PROFILE_WITHOUT_IN_DIR")
|
||||
ErrCDRCNoInPath = errors.New("CDRC_PROFILE_WITHOUT_IN_PATH")
|
||||
ErrNotEnoughParameters = errors.New("NotEnoughParameters")
|
||||
ErrNotConnected = errors.New("NOT_CONNECTED")
|
||||
RalsErrorPrfx = "RALS_ERROR"
|
||||
|
||||
Reference in New Issue
Block a user