CDRC - Offset handling and initial header, trailer and content processing for .fwv

This commit is contained in:
DanB
2015-07-27 20:48:51 +02:00
parent b121b3e607
commit a0821c3134
5 changed files with 202 additions and 16 deletions

View File

@@ -108,7 +108,7 @@ func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrs
}
cdrc := &Cdrc{cdrFormat: cdrcCfg.CdrFormat, cdrInDir: cdrcCfg.CdrInDir, cdrOutDir: cdrcCfg.CdrOutDir,
runDelay: cdrcCfg.RunDelay, csvSep: cdrcCfg.FieldSeparator,
httpSkipTlsCheck: httpSkipTlsCheck, cdrs: cdrs, exitChan: exitChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles),
httpSkipTlsCheck: httpSkipTlsCheck, cdrcCfgs: cdrcCfgs, cdrs: cdrs, exitChan: exitChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles),
}
var processFile struct{}
for i := 0; i < cdrcCfg.MaxOpenFiles; i++ {
@@ -155,6 +155,7 @@ type Cdrc struct {
cdrFilters []utils.RSRFields // Should be in sync with cdrFields on indexes
cdrFields [][]*config.CfgCdrField // Profiles directly connected with cdrFilters
httpSkipTlsCheck bool
cdrcCfgs map[string]*config.CdrcConfig // All cdrc config profiles attached to this CDRC (key will be profile instance name)
cdrs engine.Connector
httpClient *http.Client
exitChan chan struct{}
@@ -250,7 +251,7 @@ func (self *Cdrc) processFile(filePath string) error {
recordsProcessor = NewCsvRecordsProcessor(csvReader, self.cdrFormat, fn, self.failedCallsPrefix,
self.cdrSourceIds, self.duMultiplyFactors, self.cdrFilters, self.cdrFields, self.httpSkipTlsCheck, self.partialRecordsCache)
} else if self.cdrFormat == utils.FWV {
recordsProcessor = NewFwvRecordsProcessor(file)
recordsProcessor = NewFwvRecordsProcessor(file, self.cdrcCfgs)
}
procRowNr := 0
timeStart := time.Now()

View File

@@ -19,8 +19,11 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package cdrc
import (
"bufio"
"fmt"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"io"
"os"
)
@@ -36,23 +39,113 @@ if err != nil {
fmt.Printf("Have read in buffer: <%q>, len: %d", string(buf), len(string(buf)))
*/
func NewFwvRecordsProcessor(file *os.File) *FwvRecordsProcessor {
return &FwvRecordsProcessor{file: file}
func NewFwvRecordsProcessor(file *os.File, cdrcCfgs map[string]*config.CdrcConfig) *FwvRecordsProcessor {
frp := &FwvRecordsProcessor{file: file, cdrcCfgs: cdrcCfgs}
for _, frp.dfltCfg = range cdrcCfgs { // Set the first available instance to be used for common parameters
break
}
return frp
}
type FwvRecordsProcessor struct {
file *os.File
lineLen int // Length of the line to read
offset int // Index of the last processed byte
cdrFields [][]*config.CfgCdrField
file *os.File
cdrcCfgs map[string]*config.CdrcConfig
dfltCfg *config.CdrcConfig // General parameters
lineLen int64 // Length of the line in the file
offset int64 // Index of the next byte to process
trailerOffset int64 // Index where trailer starts, to be used as boundary when reading cdrs
}
// Sets the line length based on first line, sets offset back to initial after reading
func (self *FwvRecordsProcessor) setLineLen() error {
rdr := bufio.NewReader(self.file)
readBytes, err := rdr.ReadBytes('\n')
if err != nil {
return err
}
self.lineLen = int64(len(readBytes))
if _, err := self.file.Seek(0, 0); err != nil {
return err
}
return nil
}
func (self *FwvRecordsProcessor) ProcessNextRecord() ([]*engine.StoredCdr, error) {
recordStr := ""
return self.processRecord(recordStr)
defer func() { self.offset += self.lineLen }() // Schedule increasing the offset once we are out from processing the record
if self.offset == 0 { // First time, set the necessary offsets
if err := self.setLineLen(); err != nil {
engine.Logger.Err(fmt.Sprintf("<Cdrc> Row 0, error: cannot set lineLen: %s", err.Error()))
return nil, io.EOF
}
if len(self.dfltCfg.TrailerFields) != 0 {
if fi, err := self.file.Stat(); err != nil {
engine.Logger.Err(fmt.Sprintf("<Cdrc> Row 0, error: cannot get file stats: %s", err.Error()))
return nil, err
} else {
self.trailerOffset = fi.Size() - self.lineLen
}
}
if len(self.dfltCfg.HeaderFields) != 0 { // ToDo: Process here the header fields
if err := self.processHeader(); err != nil {
engine.Logger.Err(fmt.Sprintf("<Cdrc> Row 0, error reading header: %s", err.Error()))
return nil, io.EOF
}
return nil, nil
}
}
recordCdrs := make([]*engine.StoredCdr, 0) // More CDRs based on the number of filters and field templates
if self.trailerOffset != 0 && self.offset >= self.trailerOffset {
if err := self.processTrailer(); err != nil && err != io.EOF {
engine.Logger.Err(fmt.Sprintf("<Cdrc> Read trailer error: %s ", err.Error()))
}
return nil, io.EOF
}
buf := make([]byte, self.lineLen)
nRead, err := self.file.Read(buf)
if err != nil {
return nil, err
} else if nRead != len(buf) {
engine.Logger.Err(fmt.Sprintf("<Cdrc> Could not read complete line, have instead: %s", string(buf)))
return nil, io.EOF
}
for cfgKey := range self.cdrcCfgs {
filterBreak := false
// ToDo: Field filters
if filterBreak { // Stop importing cdrc fields profile due to non matching filter
continue
}
if storedCdr, err := self.recordToStoredCdr(string(buf), cfgKey); err != nil {
return nil, fmt.Errorf("Failed converting to StoredCdr, error: %s", err.Error())
} else if storedCdr != nil {
recordCdrs = append(recordCdrs, storedCdr)
}
}
return recordCdrs, nil
}
func (self *FwvRecordsProcessor) processRecord(record string) ([]*engine.StoredCdr, error) {
func (self *FwvRecordsProcessor) recordToStoredCdr(record string, cfgKey string) (*engine.StoredCdr, error) {
//engine.Logger.Debug(fmt.Sprintf("RecordToStoredCdr: <%q>, cfgKey: %s, offset: %d, trailerOffset: %d, lineLen: %d", record, cfgKey, self.offset, self.trailerOffset, self.lineLen))
return nil, nil
}
func (self *FwvRecordsProcessor) processHeader() error {
buf := make([]byte, self.lineLen)
if nRead, err := self.file.Read(buf); err != nil {
return err
} else if nRead != len(buf) {
return fmt.Errorf("In header, line len: %d, have read: %d", self.lineLen, nRead)
}
//engine.Logger.Debug(fmt.Sprintf("Have read header: <%q>", string(buf)))
return nil
}
func (self *FwvRecordsProcessor) processTrailer() error {
buf := make([]byte, self.lineLen)
if nRead, err := self.file.ReadAt(buf, self.trailerOffset); err != nil {
return err
} else if nRead != len(buf) {
return fmt.Errorf("In trailer, line len: %d, have read: %d", self.lineLen, nRead)
}
//engine.Logger.Debug(fmt.Sprintf("Have read trailer: <%q>", string(buf)))
return nil
}

View File

@@ -18,14 +18,23 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package cdrc
import ()
import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"io/ioutil"
"net/rpc"
"net/rpc/jsonrpc"
"os"
"path"
"testing"
"time"
)
/*
var fwvCfgPath string
var fwvCfg *config.CGRConfig
var fwvRpc *rpc.Client
var fwvCdrcCfg *config.CdrcConfig
*/
var FW_CDR_FILE1 = `HDR0001DDB ABC Some Connect A.B. DDB-Some-10022-20120711-309.CDR 00030920120711100255
CDR0000010 0 20120708181506000123451234 0040123123120 004 000018009980010001ISDN ABC 10Buiten uw regio EHV 00000009190000000009
CDR0000020 0 20120708190945000123451234 0040123123120 004 000016009980010001ISDN ABC 10Buiten uw regio EHV 00000009190000000009
@@ -60,4 +69,82 @@ CDR0000300 0 20120709073707000123123459 0040123234531
CDR0000310 0 20120709085451000123451237 0040012323453100 001 000744009980030001ISDN ABD 20Internationaal NLB 00000000190000000000
CDR0000320 0 20120709091756000123451237 0040012323453100 001 000050009980030001ISDN ABD 20Internationaal NLB 00000000190000000000
CDR0000330 0 20120710070434000123123458 0040123232350 004 000012002760000001PSTN 276 10Buiten uw regio TB 00000009190000000009
TRL0001DDB ABC Some Connect A.B. DDB-Some-10022-20120711-309.CDR 0003090000003300000030550000000001000000000100Y `
TRL0001DDB ABC Some Connect A.B. DDB-Some-10022-20120711-309.CDR 0003090000003300000030550000000001000000000100Y
`
func TestFwvLclInitCfg(t *testing.T) {
if !*testLocal {
return
}
var err error
fwvCfgPath = path.Join(*dataDir, "conf", "samples", "cdrcfwv")
if fwvCfg, err = config.NewCGRConfigFromFolder(fwvCfgPath); err != nil {
t.Fatal("Got config error: ", err.Error())
}
}
// Creates cdr files and moves them into processing folder
func TestFwvLclCreateCdrFiles(t *testing.T) {
if !*testLocal {
return
}
if fwvCfg == nil {
t.Fatal("Empty default cdrc configuration")
}
fwvCdrcCfg = fwvCfg.CdrcProfiles["/tmp/cgr_fwv/cdrc/in"]["FWV1"]
if err := os.RemoveAll(fwvCdrcCfg.CdrInDir); err != nil {
t.Fatal("Error removing folder: ", fwvCdrcCfg.CdrInDir, err)
}
if err := os.MkdirAll(fwvCdrcCfg.CdrInDir, 0755); err != nil {
t.Fatal("Error creating folder: ", fwvCdrcCfg.CdrInDir, err)
}
if err := os.RemoveAll(fwvCdrcCfg.CdrOutDir); err != nil {
t.Fatal("Error removing folder: ", fwvCdrcCfg.CdrOutDir, err)
}
if err := os.MkdirAll(fwvCdrcCfg.CdrOutDir, 0755); err != nil {
t.Fatal("Error creating folder: ", fwvCdrcCfg.CdrOutDir, err)
}
}
func TestFwvLclStartEngine(t *testing.T) {
if !*testLocal {
return
}
if _, err := engine.StopStartEngine(fwvCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
// Connect rpc client to rater
func TestFwvLclRpcConn(t *testing.T) {
if !*testLocal {
return
}
var err error
fwvRpc, err = jsonrpc.Dial("tcp", fwvCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
}
}
func TestFwvLclProcessFiles(t *testing.T) {
if !*testLocal {
return
}
fileName := "test1.fwv"
if err := ioutil.WriteFile(path.Join("/tmp", fileName), []byte(FW_CDR_FILE1), 0644); err != nil {
t.Fatal(err.Error)
}
if err := os.Rename(path.Join("/tmp", fileName), path.Join(fwvCdrcCfg.CdrInDir, fileName)); err != nil {
t.Fatal(err)
}
time.Sleep(time.Duration(1) * time.Second)
filesInDir, _ := ioutil.ReadDir(fwvCdrcCfg.CdrInDir)
if len(filesInDir) != 0 {
t.Errorf("Files in cdrcInDir: %d", len(filesInDir))
}
filesOutDir, _ := ioutil.ReadDir(fwvCdrcCfg.CdrOutDir)
if len(filesOutDir) != 1 {
t.Errorf("In CdrcOutDir, expecting 1 files, got: %d", len(filesOutDir))
}
}

View File

@@ -34,6 +34,9 @@ func NewCfgCdrFieldFromCdrFieldJsonCfg(jsnCfgFld *CdrFieldJsonCfg) (*CfgCdrField
if jsnCfgFld.Cdr_field_id != nil {
cfgFld.CdrFieldId = *jsnCfgFld.Cdr_field_id
}
if jsnCfgFld.Metatag_id != nil {
cfgFld.MetatagId = *jsnCfgFld.Metatag_id
}
if jsnCfgFld.Value != nil {
if cfgFld.Value, err = utils.ParseRSRFields(*jsnCfgFld.Value, utils.INFIELD_SEP); err != nil {
return nil, err
@@ -66,6 +69,7 @@ type CfgCdrField struct {
Tag string // Identifier for the administrator
Type string // Type of field
CdrFieldId string // StoredCdr field name
MetatagId string
Value utils.RSRFields
FieldFilter utils.RSRFields
Width int

View File

@@ -97,6 +97,7 @@ type CdrFieldJsonCfg struct {
Tag *string
Type *string
Cdr_field_id *string
Metatag_id *string
Value *string
Width *int
Strip *string