mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Remove rounding decimals from cdrc and add logs for partial cdr test
This commit is contained in:
committed by
Dan Christian Bogos
parent
3b746aad50
commit
cb5731fcf4
10
cdrc/cdrc.go
10
cdrc/cdrc.go
@@ -53,7 +53,7 @@ Parameters specific per config instance:
|
||||
* cdrSourceId, cdrFilter, cdrFields
|
||||
*/
|
||||
func NewCdrc(cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection,
|
||||
closeChan chan struct{}, dfltTimezone string, roundDecimals int, filterS *engine.FilterS) (cdrc *Cdrc, err error) {
|
||||
closeChan chan struct{}, dfltTimezone string, filterS *engine.FilterS) (cdrc *Cdrc, err error) {
|
||||
cdrcCfg := cdrcCfgs[0]
|
||||
cdrc = &Cdrc{
|
||||
httpSkipTlsCheck: httpSkipTlsCheck,
|
||||
@@ -63,17 +63,16 @@ func NewCdrc(cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool, cdrs rpcclient.R
|
||||
cdrs: cdrs,
|
||||
closeChan: closeChan,
|
||||
maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles),
|
||||
roundDecimals: roundDecimals,
|
||||
}
|
||||
// Before processing, make sure in and out folders exist
|
||||
if utils.IsSliceMember(utils.MainCDRFields, cdrcCfg.CdrFormat) {
|
||||
if utils.IsSliceMember(utils.CDRCFileFormats, cdrcCfg.CdrFormat) {
|
||||
for _, dir := range []string{cdrcCfg.CDRInPath, cdrcCfg.CDROutPath} {
|
||||
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
|
||||
return nil, fmt.Errorf("<CDRC> nonexistent folder: %s", dir)
|
||||
}
|
||||
}
|
||||
}
|
||||
if utils.IsSliceMember(utils.MainCDRFields, cdrcCfg.CdrFormat) {
|
||||
if utils.IsSliceMember(utils.CDRCFileFormats, cdrcCfg.CdrFormat) {
|
||||
var processFile struct{}
|
||||
for i := 0; i < cdrcCfg.MaxOpenFiles; i++ {
|
||||
cdrc.maxOpenFiles <- processFile // Empty initiate so we do not need to wait later when we pop
|
||||
@@ -92,7 +91,6 @@ type Cdrc struct {
|
||||
closeChan chan struct{} // Used to signal config reloads when we need to span different CDRC-Client
|
||||
maxOpenFiles chan struct{} // Maximum number of simultaneous files processed
|
||||
filterS *engine.FilterS
|
||||
roundDecimals int
|
||||
}
|
||||
|
||||
// When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing
|
||||
@@ -182,7 +180,7 @@ func (self *Cdrc) processFile(filePath string) error {
|
||||
csvReader.Comment = '#'
|
||||
recordsProcessor = NewCsvRecordsProcessor(csvReader, self.timezone, fn, self.dfltCdrcCfg,
|
||||
self.cdrcCfgs, self.httpSkipTlsCheck,
|
||||
self.dfltCdrcCfg.CacheDumpFields, self.filterS, self.cdrs, self.roundDecimals)
|
||||
self.dfltCdrcCfg.CacheDumpFields, self.filterS, self.cdrs)
|
||||
case utils.MetaFileFWV:
|
||||
recordsProcessor = NewFwvRecordsProcessor(file, self.dfltCdrcCfg, self.cdrcCfgs,
|
||||
self.httpSkipTlsCheck, self.timezone, self.filterS)
|
||||
|
||||
11
cdrc/csv.go
11
cdrc/csv.go
@@ -35,7 +35,7 @@ import (
|
||||
func NewCsvRecordsProcessor(csvReader *csv.Reader, timezone, fileName string,
|
||||
dfltCdrcCfg *config.CdrcCfg, cdrcCfgs []*config.CdrcCfg,
|
||||
httpSkipTlsCheck bool, cacheDumpFields []*config.FCTemplate,
|
||||
filterS *engine.FilterS, cdrs rpcclient.RpcClientConnection, roundDecimals int) *CsvRecordsProcessor {
|
||||
filterS *engine.FilterS, cdrs rpcclient.RpcClientConnection) *CsvRecordsProcessor {
|
||||
return &CsvRecordsProcessor{csvReader: csvReader,
|
||||
timezone: timezone, fileName: fileName,
|
||||
dfltCdrcCfg: dfltCdrcCfg, cdrcCfgs: cdrcCfgs,
|
||||
@@ -44,8 +44,7 @@ func NewCsvRecordsProcessor(csvReader *csv.Reader, timezone, fileName string,
|
||||
dfltCdrcCfg.CDROutPath, dfltCdrcCfg.FieldSeparator),
|
||||
partialRecordsCache: NewPartialRecordsCache(dfltCdrcCfg.PartialRecordCache,
|
||||
dfltCdrcCfg.PartialCacheExpiryAction, dfltCdrcCfg.CDROutPath,
|
||||
dfltCdrcCfg.FieldSeparator, roundDecimals,
|
||||
timezone, httpSkipTlsCheck, cdrs, filterS),
|
||||
dfltCdrcCfg.FieldSeparator, timezone, httpSkipTlsCheck, cdrs, filterS),
|
||||
partialCacheDumpFields: cacheDumpFields, filterS: filterS}
|
||||
|
||||
}
|
||||
@@ -132,12 +131,18 @@ func (self *CsvRecordsProcessor) processRecord(record []string) ([]*engine.CDR,
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed converting to StoredCdr, error: %s", err.Error())
|
||||
} else if self.dfltCdrcCfg.CdrFormat == utils.MetaPartialCSV {
|
||||
fmt.Println("===Teo===")
|
||||
fmt.Println(utils.ToJSON(record))
|
||||
fmt.Println(utils.ToJSON(storedCdr))
|
||||
if storedCdr, err = self.partialRecordsCache.MergePartialCDRRecord(NewPartialCDRRecord(storedCdr, self.partialCacheDumpFields)); err != nil {
|
||||
return nil, fmt.Errorf("Failed merging PartialCDR, error: %s", err.Error())
|
||||
} else if storedCdr == nil { // CDR was absorbed by cache since it was partial
|
||||
fmt.Println("===CDR ABSORBED===")
|
||||
continue
|
||||
}
|
||||
}
|
||||
fmt.Println("=== storedCdr to save : ===")
|
||||
fmt.Println(utils.ToJSON(storedCdr))
|
||||
recordCdrs = append(recordCdrs, storedCdr)
|
||||
if !cdrcCfg.ContinueOnSuccess {
|
||||
break
|
||||
|
||||
@@ -166,7 +166,8 @@ func TestCsvITAnalyseCDRs(t *testing.T) {
|
||||
} else if len(reply) != 5 { // 1 injected, 1 rated, 1 *raw and it's pair in *default run
|
||||
t.Error("Unexpected number of CDRs returned: ", len(reply))
|
||||
}
|
||||
if err := cdrcRpc.Call(utils.ApierV2GetCDRs, utils.RPCCDRsFilter{DestinationPrefixes: []string{"08651"}}, &reply); err == nil || err.Error() != utils.NotFoundCaps {
|
||||
if err := cdrcRpc.Call(utils.ApierV2GetCDRs, utils.RPCCDRsFilter{DestinationPrefixes: []string{"08651"}},
|
||||
&reply); err == nil || err.Error() != utils.NotFoundCaps {
|
||||
t.Error("Unexpected error: ", err) // Original 08651 was converted
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,10 +39,10 @@ const (
|
||||
)
|
||||
|
||||
func NewPartialRecordsCache(ttl time.Duration, expiryAction string, cdrOutDir string, csvSep rune,
|
||||
roundDecimals int, timezone string, httpSkipTlsCheck bool,
|
||||
timezone string, httpSkipTlsCheck bool,
|
||||
cdrs rpcclient.RpcClientConnection, filterS *engine.FilterS) *PartialRecordsCache {
|
||||
return &PartialRecordsCache{ttl: ttl, expiryAction: expiryAction, cdrOutDir: cdrOutDir,
|
||||
csvSep: csvSep, roundDecimals: roundDecimals, timezone: timezone,
|
||||
csvSep: csvSep, timezone: timezone,
|
||||
httpSkipTlsCheck: httpSkipTlsCheck, cdrs: cdrs,
|
||||
partialRecords: make(map[string]*PartialCDRRecord),
|
||||
dumpTimers: make(map[string]*time.Timer),
|
||||
@@ -54,7 +54,6 @@ type PartialRecordsCache struct {
|
||||
expiryAction string
|
||||
cdrOutDir string
|
||||
csvSep rune
|
||||
roundDecimals int
|
||||
timezone string
|
||||
httpSkipTlsCheck bool
|
||||
cdrs rpcclient.RpcClientConnection
|
||||
|
||||
@@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package cdrc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/rpc"
|
||||
"net/rpc/jsonrpc"
|
||||
@@ -96,9 +97,11 @@ func TestPartcsvITCreateCdrDirs(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPartcsvITStartEngine(t *testing.T) {
|
||||
if _, err := engine.StopStartEngine(partpartcsvCfgPath, *waitRater); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// if _, err := engine.StopStartEngine(partpartcsvCfgPath, *waitRater); err != nil {
|
||||
// t.Fatal(err)
|
||||
// }
|
||||
fmt.Println("START THE ENGINE MANUAL ")
|
||||
time.Sleep(10 * time.Second)
|
||||
}
|
||||
|
||||
// Connect rpc client to rater
|
||||
@@ -151,7 +154,7 @@ func TestPartcsvITProcessedFiles(t *testing.T) {
|
||||
if outContent1, err := ioutil.ReadFile(path.Join(partcsvCDRCDirOut1, "file1.csv")); err != nil {
|
||||
t.Error(err)
|
||||
} else if partCsvFileContent1 != string(outContent1) {
|
||||
t.Errorf("Expecting: %q, received: %q", partCsvFileContent1, string(outContent1))
|
||||
t.Errorf("Expecting: %q, \n received: %q", partCsvFileContent1, string(outContent1))
|
||||
}
|
||||
if outContent2, err := ioutil.ReadFile(path.Join(partcsvCDRCDirOut1, "file2.csv")); err != nil {
|
||||
t.Error(err)
|
||||
@@ -172,7 +175,7 @@ func TestPartcsvITProcessedFiles(t *testing.T) {
|
||||
if contentCacheDump, err := ioutil.ReadFile(path.Join(partcsvCDRCDirOut1, fileName)); err != nil {
|
||||
t.Error(err)
|
||||
} else if len(eCacheDumpFile1) != len(string(contentCacheDump)) {
|
||||
t.Errorf("Expecting: %q, received: %q", eCacheDumpFile1, string(contentCacheDump))
|
||||
t.Errorf("Expecting: %q, \n received: %q", eCacheDumpFile1, string(contentCacheDump))
|
||||
}
|
||||
if outContent3, err := ioutil.ReadFile(path.Join(partcsvCDRCDirOut2, "file3.csv")); err != nil {
|
||||
t.Error(err)
|
||||
|
||||
@@ -144,8 +144,7 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne
|
||||
}
|
||||
}
|
||||
cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrsConn, closeChan,
|
||||
cfg.GeneralCfg().DefaultTimezone, cfg.GeneralCfg().RoundingDecimals,
|
||||
filterS)
|
||||
cfg.GeneralCfg().DefaultTimezone, filterS)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("Cdrc config parsing error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
|
||||
@@ -40,8 +40,6 @@
|
||||
{"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~0"},
|
||||
{"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"},
|
||||
{"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "~1"},
|
||||
{"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "_"},
|
||||
{"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~4"},
|
||||
{"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~3"},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "*rated", "mandatory": true},
|
||||
{"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "*out", "mandatory": true},
|
||||
@@ -80,8 +78,6 @@
|
||||
{"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~0"},
|
||||
{"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"},
|
||||
{"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "~1"},
|
||||
{"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "_"},
|
||||
{"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~4"},
|
||||
{"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~3"},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "*rated", "mandatory": true},
|
||||
{"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "*out", "mandatory": true},
|
||||
|
||||
@@ -44,7 +44,6 @@
|
||||
"field_separator": "|", // separator used in case of csv files
|
||||
"run_delay": 0, // sleep interval in seconds between consecutive runs, 0 to use automation via inotify
|
||||
"max_open_files": 1024, // maximum simultaneous files to process
|
||||
"data_usage_multiply_factor": 1024, // conversion factor for data usage
|
||||
"cdr_in_path": "/tmp/cgr_flatstore/cdrc/in", // absolute path towards the directory where the CDRs are stored
|
||||
"cdr_out_path": "/tmp/cgr_flatstore/cdrc/out", // absolute path towards the directory where processed CDRs will be moved
|
||||
"failed_calls_prefix": "missed_calls", // used in case of flatstore CDRs to avoid searching for BYE records
|
||||
|
||||
@@ -51,7 +51,7 @@
|
||||
"cdr_format": "*file_xml", // CDR file format <csv|freeswitch_csv|fwv|opensips_flatstore|partial_csv.
|
||||
"cdr_in_path": "/tmp/cdrcxmlwithfilters2/xmlit2/in",
|
||||
"cdr_out_path": "/tmp/cdrcxmlwithfilters2/xmlit2/out",
|
||||
"cdr_path": "File.CDRs.Call", // path towards one CDR element in case of XML CDRs
|
||||
"cdr_root_path": "File.CDRs.Call", // path towards one CDR element in case of XML CDRs
|
||||
"cdr_source_id": "zw_cfs1", // free form field, tag identifying the source of the CDRs within CDRS database
|
||||
"content_fields":[ // import content_fields template, id will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "*voice", "mandatory": true},
|
||||
|
||||
Reference in New Issue
Block a user