Move Partial creation to an upper layer

This commit is contained in:
TeoV
2019-06-11 12:42:03 +03:00
committed by Dan Christian Bogos
parent cb5731fcf4
commit ccb9af3cb7
5 changed files with 40 additions and 26 deletions

View File

@@ -78,19 +78,26 @@ func NewCdrc(cdrcCfgs []*config.CdrcCfg, httpSkipTlsCheck bool, cdrs rpcclient.R
cdrc.maxOpenFiles <- processFile // Empty initiate so we do not need to wait later when we pop
}
}
cdrc.unpairedRecordsCache = NewUnpairedRecordsCache(cdrcCfg.PartialRecordCache,
cdrcCfg.CDROutPath, cdrcCfg.FieldSeparator)
cdrc.partialRecordsCache = NewPartialRecordsCache(cdrcCfg.PartialRecordCache,
cdrcCfg.PartialCacheExpiryAction, cdrcCfg.CDROutPath,
cdrcCfg.FieldSeparator, cdrc.timezone, httpSkipTlsCheck, cdrs, filterS)
cdrc.filterS = filterS
return
}
type Cdrc struct {
httpSkipTlsCheck bool
cdrcCfgs []*config.CdrcCfg // All cdrc config profiles attached to this CDRC (key will be profile instance name)
dfltCdrcCfg *config.CdrcCfg
timezone string
cdrs rpcclient.RpcClientConnection
closeChan chan struct{} // Used to signal config reloads when we need to span different CDRC-Client
maxOpenFiles chan struct{} // Maximum number of simultaneous files processed
filterS *engine.FilterS
httpSkipTlsCheck bool
cdrcCfgs []*config.CdrcCfg // All cdrc config profiles attached to this CDRC (key will be profile instance name)
dfltCdrcCfg *config.CdrcCfg
timezone string
cdrs rpcclient.RpcClientConnection
closeChan chan struct{} // Used to signal config reloads when we need to span different CDRC-Client
maxOpenFiles chan struct{} // Maximum number of simultaneous files processed
filterS *engine.FilterS
unpairedRecordsCache *UnpairedRecordsCache // Shared between all files in the folder we process
partialRecordsCache *PartialRecordsCache
}
// When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing
@@ -180,7 +187,8 @@ func (self *Cdrc) processFile(filePath string) error {
csvReader.Comment = '#'
recordsProcessor = NewCsvRecordsProcessor(csvReader, self.timezone, fn, self.dfltCdrcCfg,
self.cdrcCfgs, self.httpSkipTlsCheck,
self.dfltCdrcCfg.CacheDumpFields, self.filterS, self.cdrs)
self.dfltCdrcCfg.CacheDumpFields, self.filterS, self.cdrs,
self.unpairedRecordsCache, self.partialRecordsCache)
case utils.MetaFileFWV:
recordsProcessor = NewFwvRecordsProcessor(file, self.dfltCdrcCfg, self.cdrcCfgs,
self.httpSkipTlsCheck, self.timezone, self.filterS)

View File

@@ -35,16 +35,14 @@ import (
func NewCsvRecordsProcessor(csvReader *csv.Reader, timezone, fileName string,
dfltCdrcCfg *config.CdrcCfg, cdrcCfgs []*config.CdrcCfg,
httpSkipTlsCheck bool, cacheDumpFields []*config.FCTemplate,
filterS *engine.FilterS, cdrs rpcclient.RpcClientConnection) *CsvRecordsProcessor {
filterS *engine.FilterS, cdrs rpcclient.RpcClientConnection,
unp *UnpairedRecordsCache, prt *PartialRecordsCache) *CsvRecordsProcessor {
return &CsvRecordsProcessor{csvReader: csvReader,
timezone: timezone, fileName: fileName,
dfltCdrcCfg: dfltCdrcCfg, cdrcCfgs: cdrcCfgs,
httpSkipTlsCheck: httpSkipTlsCheck,
unpairedRecordsCache: NewUnpairedRecordsCache(dfltCdrcCfg.PartialRecordCache,
dfltCdrcCfg.CDROutPath, dfltCdrcCfg.FieldSeparator),
partialRecordsCache: NewPartialRecordsCache(dfltCdrcCfg.PartialRecordCache,
dfltCdrcCfg.PartialCacheExpiryAction, dfltCdrcCfg.CDROutPath,
dfltCdrcCfg.FieldSeparator, timezone, httpSkipTlsCheck, cdrs, filterS),
httpSkipTlsCheck: httpSkipTlsCheck,
unpairedRecordsCache: unp,
partialRecordsCache: prt,
partialCacheDumpFields: cacheDumpFields, filterS: filterS}
}
@@ -133,7 +131,6 @@ func (self *CsvRecordsProcessor) processRecord(record []string) ([]*engine.CDR,
} else if self.dfltCdrcCfg.CdrFormat == utils.MetaPartialCSV {
fmt.Println("===Teo===")
fmt.Println(utils.ToJSON(record))
fmt.Println(utils.ToJSON(storedCdr))
if storedCdr, err = self.partialRecordsCache.MergePartialCDRRecord(NewPartialCDRRecord(storedCdr, self.partialCacheDumpFields)); err != nil {
return nil, fmt.Errorf("Failed merging PartialCDR, error: %s", err.Error())
} else if storedCdr == nil { // CDR was absorbed by cache since it was partial

View File

@@ -65,9 +65,15 @@ type PartialRecordsCache struct {
// Dumps the cache into a .unpaired file in the outdir and cleans cache after
func (prc *PartialRecordsCache) dumpPartialRecords(originID string) {
fmt.Printf("=== PartialRecordsCache: \n")
for key, val := range prc.partialRecords {
fmt.Printf("=== partialRecords: %+v : %+v \n", key, utils.ToJSON(val.cdrs))
}
_, err := prc.guard.Guard(func() (interface{}, error) {
if prc.partialRecords[originID].Len() != 0 { // Only write the file if there are records in the cache
dumpFilePath := path.Join(prc.cdrOutDir, fmt.Sprintf("%s.%s.%d", originID, PartialRecordsSuffix, time.Now().Unix()))
fmt.Println("dumpFilePath: ", dumpFilePath)
fileOut, err := os.Create(dumpFilePath)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<Cdrc> Failed creating %s, error: %s", dumpFilePath, err.Error()))
@@ -155,6 +161,8 @@ func (prc *PartialRecordsCache) uncachePartialCDR(pCDR *PartialCDRRecord) {
// Returns PartialCDR only if merge was possible
func (prc *PartialRecordsCache) MergePartialCDRRecord(pCDR *PartialCDRRecord) (*engine.CDR, error) {
fmt.Println("===Enter in MergePartialCDRRecord===")
fmt.Println("===", utils.ToJSON(pCDR.cdrs))
if pCDR.Len() == 0 || pCDR.cdrs[0].OriginID == "" { // Sanity check
return nil, nil
}

View File

@@ -20,7 +20,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package cdrc
import (
"fmt"
"io/ioutil"
"net/rpc"
"net/rpc/jsonrpc"
@@ -97,11 +96,9 @@ func TestPartcsvITCreateCdrDirs(t *testing.T) {
}
func TestPartcsvITStartEngine(t *testing.T) {
// if _, err := engine.StopStartEngine(partpartcsvCfgPath, *waitRater); err != nil {
// t.Fatal(err)
// }
fmt.Println("START THE ENGINE MANUAL ")
time.Sleep(10 * time.Second)
if _, err := engine.StopStartEngine(partpartcsvCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
// Connect rpc client to rater

View File

@@ -33,13 +33,15 @@
"cdr_in_path": "/tmp/cdrctests/partcsv1/in", // absolute path towards the directory where the CDRs are stored
"cdr_out_path": "/tmp/cdrctests/partcsv1/out", // absolute path towards the directory where processed CDRs will be moved
"cdr_source_id": "partial_csv_test", // free form field, tag identifying the source of the CDRs within CDRS database
"partial_record_cache": "1s", // duration to cache partial records when not pairing
"partial_record_cache": "2s", // duration to cache partial records when not pairing
"partial_cache_expiry_action": "*dump_to_file",
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
{"tag": "ToR", "field_id": "ToR", "type": "*composed", "value": "*voice", "mandatory": true},
{"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~0"},
{"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"},
{"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "~1"},
{"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "_"},
{"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~4"},
{"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~3"},
{"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "*rated", "mandatory": true},
{"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "*out", "mandatory": true},
@@ -67,7 +69,7 @@
{
"id": "post_on_expiry",
"enabled": true,
"cdr_format": "partial_csv",
"cdr_format": "*partial_csv",
"cdr_in_path": "/tmp/cdrctests/partcsv2/in", // absolute path towards the directory where the CDRs are stored
"cdr_out_path": "/tmp/cdrctests/partcsv2/out", // absolute path towards the directory where processed CDRs will be moved
"cdr_source_id": "partial_csv_test2", // free form field, tag identifying the source of the CDRs within CDRS database
@@ -78,6 +80,8 @@
{"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~0"},
{"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"},
{"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "~1"},
{"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "_"},
{"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~4"},
{"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~3"},
{"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "*rated", "mandatory": true},
{"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "*out", "mandatory": true},
@@ -94,4 +98,4 @@
],
}
}