diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index 6aaa89c05..b5a7f704b 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -108,7 +108,7 @@ func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrs } cdrc := &Cdrc{cdrFormat: cdrcCfg.CdrFormat, cdrInDir: cdrcCfg.CdrInDir, cdrOutDir: cdrcCfg.CdrOutDir, runDelay: cdrcCfg.RunDelay, csvSep: cdrcCfg.FieldSeparator, - httpSkipTlsCheck: httpSkipTlsCheck, cdrs: cdrs, exitChan: exitChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles), + httpSkipTlsCheck: httpSkipTlsCheck, cdrcCfgs: cdrcCfgs, cdrs: cdrs, exitChan: exitChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles), } var processFile struct{} for i := 0; i < cdrcCfg.MaxOpenFiles; i++ { @@ -155,6 +155,7 @@ type Cdrc struct { cdrFilters []utils.RSRFields // Should be in sync with cdrFields on indexes cdrFields [][]*config.CfgCdrField // Profiles directly connected with cdrFilters httpSkipTlsCheck bool + cdrcCfgs map[string]*config.CdrcConfig // All cdrc config profiles attached to this CDRC (key will be profile instance name) cdrs engine.Connector httpClient *http.Client exitChan chan struct{} @@ -250,7 +251,7 @@ func (self *Cdrc) processFile(filePath string) error { recordsProcessor = NewCsvRecordsProcessor(csvReader, self.cdrFormat, fn, self.failedCallsPrefix, self.cdrSourceIds, self.duMultiplyFactors, self.cdrFilters, self.cdrFields, self.httpSkipTlsCheck, self.partialRecordsCache) } else if self.cdrFormat == utils.FWV { - recordsProcessor = NewFwvRecordsProcessor(file) + recordsProcessor = NewFwvRecordsProcessor(file, self.cdrcCfgs) } procRowNr := 0 timeStart := time.Now() diff --git a/cdrc/fwv.go b/cdrc/fwv.go index 50faaa1b9..b30e6bf90 100644 --- a/cdrc/fwv.go +++ b/cdrc/fwv.go @@ -19,8 +19,11 @@ along with this program. If not, see package cdrc import ( + "bufio" + "fmt" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" + "io" "os" ) @@ -36,23 +39,113 @@ if err != nil { fmt.Printf("Have read in buffer: <%q>, len: %d", string(buf), len(string(buf))) */ -func NewFwvRecordsProcessor(file *os.File) *FwvRecordsProcessor { - return &FwvRecordsProcessor{file: file} +func NewFwvRecordsProcessor(file *os.File, cdrcCfgs map[string]*config.CdrcConfig) *FwvRecordsProcessor { + frp := &FwvRecordsProcessor{file: file, cdrcCfgs: cdrcCfgs} + for _, frp.dfltCfg = range cdrcCfgs { // Set the first available instance to be used for common parameters + break + } + return frp } type FwvRecordsProcessor struct { - file *os.File - lineLen int // Length of the line to read - offset int // Index of the last processed byte - cdrFields [][]*config.CfgCdrField + file *os.File + cdrcCfgs map[string]*config.CdrcConfig + dfltCfg *config.CdrcConfig // General parameters + lineLen int64 // Length of the line in the file + offset int64 // Index of the next byte to process + trailerOffset int64 // Index where trailer starts, to be used as boundary when reading cdrs +} + +// Sets the line length based on first line, sets offset back to initial after reading +func (self *FwvRecordsProcessor) setLineLen() error { + rdr := bufio.NewReader(self.file) + readBytes, err := rdr.ReadBytes('\n') + if err != nil { + return err + } + self.lineLen = int64(len(readBytes)) + if _, err := self.file.Seek(0, 0); err != nil { + return err + } + return nil } func (self *FwvRecordsProcessor) ProcessNextRecord() ([]*engine.StoredCdr, error) { - - recordStr := "" - return self.processRecord(recordStr) + defer func() { self.offset += self.lineLen }() // Schedule increasing the offset once we are out from processing the record + if self.offset == 0 { // First time, set the necessary offsets + if err := self.setLineLen(); err != nil { + engine.Logger.Err(fmt.Sprintf(" Row 0, error: cannot set lineLen: %s", err.Error())) + return nil, io.EOF + } + if len(self.dfltCfg.TrailerFields) != 0 { + if fi, err := self.file.Stat(); err != nil { + engine.Logger.Err(fmt.Sprintf(" Row 0, error: cannot get file stats: %s", err.Error())) + return nil, err + } else { + self.trailerOffset = fi.Size() - self.lineLen + } + } + if len(self.dfltCfg.HeaderFields) != 0 { // ToDo: Process here the header fields + if err := self.processHeader(); err != nil { + engine.Logger.Err(fmt.Sprintf(" Row 0, error reading header: %s", err.Error())) + return nil, io.EOF + } + return nil, nil + } + } + recordCdrs := make([]*engine.StoredCdr, 0) // More CDRs based on the number of filters and field templates + if self.trailerOffset != 0 && self.offset >= self.trailerOffset { + if err := self.processTrailer(); err != nil && err != io.EOF { + engine.Logger.Err(fmt.Sprintf(" Read trailer error: %s ", err.Error())) + } + return nil, io.EOF + } + buf := make([]byte, self.lineLen) + nRead, err := self.file.Read(buf) + if err != nil { + return nil, err + } else if nRead != len(buf) { + engine.Logger.Err(fmt.Sprintf(" Could not read complete line, have instead: %s", string(buf))) + return nil, io.EOF + } + for cfgKey := range self.cdrcCfgs { + filterBreak := false + // ToDo: Field filters + if filterBreak { // Stop importing cdrc fields profile due to non matching filter + continue + } + if storedCdr, err := self.recordToStoredCdr(string(buf), cfgKey); err != nil { + return nil, fmt.Errorf("Failed converting to StoredCdr, error: %s", err.Error()) + } else if storedCdr != nil { + recordCdrs = append(recordCdrs, storedCdr) + } + } + return recordCdrs, nil } -func (self *FwvRecordsProcessor) processRecord(record string) ([]*engine.StoredCdr, error) { +func (self *FwvRecordsProcessor) recordToStoredCdr(record string, cfgKey string) (*engine.StoredCdr, error) { + //engine.Logger.Debug(fmt.Sprintf("RecordToStoredCdr: <%q>, cfgKey: %s, offset: %d, trailerOffset: %d, lineLen: %d", record, cfgKey, self.offset, self.trailerOffset, self.lineLen)) return nil, nil } + +func (self *FwvRecordsProcessor) processHeader() error { + buf := make([]byte, self.lineLen) + if nRead, err := self.file.Read(buf); err != nil { + return err + } else if nRead != len(buf) { + return fmt.Errorf("In header, line len: %d, have read: %d", self.lineLen, nRead) + } + //engine.Logger.Debug(fmt.Sprintf("Have read header: <%q>", string(buf))) + return nil +} + +func (self *FwvRecordsProcessor) processTrailer() error { + buf := make([]byte, self.lineLen) + if nRead, err := self.file.ReadAt(buf, self.trailerOffset); err != nil { + return err + } else if nRead != len(buf) { + return fmt.Errorf("In trailer, line len: %d, have read: %d", self.lineLen, nRead) + } + //engine.Logger.Debug(fmt.Sprintf("Have read trailer: <%q>", string(buf))) + return nil +} diff --git a/cdrc/fwv_local_test.go b/cdrc/fwv_local_test.go index bd2bd9f58..7f4b96fb4 100644 --- a/cdrc/fwv_local_test.go +++ b/cdrc/fwv_local_test.go @@ -18,14 +18,23 @@ along with this program. If not, see package cdrc -import () +import ( + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "io/ioutil" + "net/rpc" + "net/rpc/jsonrpc" + "os" + "path" + "testing" + "time" +) -/* var fwvCfgPath string var fwvCfg *config.CGRConfig var fwvRpc *rpc.Client var fwvCdrcCfg *config.CdrcConfig -*/ + var FW_CDR_FILE1 = `HDR0001DDB ABC Some Connect A.B. DDB-Some-10022-20120711-309.CDR 00030920120711100255 CDR0000010 0 20120708181506000123451234 0040123123120 004 000018009980010001ISDN ABC 10Buiten uw regio EHV 00000009190000000009 CDR0000020 0 20120708190945000123451234 0040123123120 004 000016009980010001ISDN ABC 10Buiten uw regio EHV 00000009190000000009 @@ -60,4 +69,82 @@ CDR0000300 0 20120709073707000123123459 0040123234531 CDR0000310 0 20120709085451000123451237 0040012323453100 001 000744009980030001ISDN ABD 20Internationaal NLB 00000000190000000000 CDR0000320 0 20120709091756000123451237 0040012323453100 001 000050009980030001ISDN ABD 20Internationaal NLB 00000000190000000000 CDR0000330 0 20120710070434000123123458 0040123232350 004 000012002760000001PSTN 276 10Buiten uw regio TB 00000009190000000009 -TRL0001DDB ABC Some Connect A.B. DDB-Some-10022-20120711-309.CDR 0003090000003300000030550000000001000000000100Y ` +TRL0001DDB ABC Some Connect A.B. DDB-Some-10022-20120711-309.CDR 0003090000003300000030550000000001000000000100Y +` + +func TestFwvLclInitCfg(t *testing.T) { + if !*testLocal { + return + } + var err error + fwvCfgPath = path.Join(*dataDir, "conf", "samples", "cdrcfwv") + if fwvCfg, err = config.NewCGRConfigFromFolder(fwvCfgPath); err != nil { + t.Fatal("Got config error: ", err.Error()) + } +} + +// Creates cdr files and moves them into processing folder +func TestFwvLclCreateCdrFiles(t *testing.T) { + if !*testLocal { + return + } + if fwvCfg == nil { + t.Fatal("Empty default cdrc configuration") + } + fwvCdrcCfg = fwvCfg.CdrcProfiles["/tmp/cgr_fwv/cdrc/in"]["FWV1"] + if err := os.RemoveAll(fwvCdrcCfg.CdrInDir); err != nil { + t.Fatal("Error removing folder: ", fwvCdrcCfg.CdrInDir, err) + } + if err := os.MkdirAll(fwvCdrcCfg.CdrInDir, 0755); err != nil { + t.Fatal("Error creating folder: ", fwvCdrcCfg.CdrInDir, err) + } + if err := os.RemoveAll(fwvCdrcCfg.CdrOutDir); err != nil { + t.Fatal("Error removing folder: ", fwvCdrcCfg.CdrOutDir, err) + } + if err := os.MkdirAll(fwvCdrcCfg.CdrOutDir, 0755); err != nil { + t.Fatal("Error creating folder: ", fwvCdrcCfg.CdrOutDir, err) + } +} + +func TestFwvLclStartEngine(t *testing.T) { + if !*testLocal { + return + } + if _, err := engine.StopStartEngine(fwvCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func TestFwvLclRpcConn(t *testing.T) { + if !*testLocal { + return + } + var err error + fwvRpc, err = jsonrpc.Dial("tcp", fwvCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal("Could not connect to rater: ", err.Error()) + } +} + +func TestFwvLclProcessFiles(t *testing.T) { + if !*testLocal { + return + } + fileName := "test1.fwv" + if err := ioutil.WriteFile(path.Join("/tmp", fileName), []byte(FW_CDR_FILE1), 0644); err != nil { + t.Fatal(err.Error) + } + if err := os.Rename(path.Join("/tmp", fileName), path.Join(fwvCdrcCfg.CdrInDir, fileName)); err != nil { + t.Fatal(err) + } + time.Sleep(time.Duration(1) * time.Second) + filesInDir, _ := ioutil.ReadDir(fwvCdrcCfg.CdrInDir) + if len(filesInDir) != 0 { + t.Errorf("Files in cdrcInDir: %d", len(filesInDir)) + } + filesOutDir, _ := ioutil.ReadDir(fwvCdrcCfg.CdrOutDir) + if len(filesOutDir) != 1 { + t.Errorf("In CdrcOutDir, expecting 1 files, got: %d", len(filesOutDir)) + } +} diff --git a/config/cfgcdrfield.go b/config/cfgcdrfield.go index ce1168d77..4e9aef604 100644 --- a/config/cfgcdrfield.go +++ b/config/cfgcdrfield.go @@ -34,6 +34,9 @@ func NewCfgCdrFieldFromCdrFieldJsonCfg(jsnCfgFld *CdrFieldJsonCfg) (*CfgCdrField if jsnCfgFld.Cdr_field_id != nil { cfgFld.CdrFieldId = *jsnCfgFld.Cdr_field_id } + if jsnCfgFld.Metatag_id != nil { + cfgFld.MetatagId = *jsnCfgFld.Metatag_id + } if jsnCfgFld.Value != nil { if cfgFld.Value, err = utils.ParseRSRFields(*jsnCfgFld.Value, utils.INFIELD_SEP); err != nil { return nil, err @@ -66,6 +69,7 @@ type CfgCdrField struct { Tag string // Identifier for the administrator Type string // Type of field CdrFieldId string // StoredCdr field name + MetatagId string Value utils.RSRFields FieldFilter utils.RSRFields Width int diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 73d03376d..f4477f451 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -97,6 +97,7 @@ type CdrFieldJsonCfg struct { Tag *string Type *string Cdr_field_id *string + Metatag_id *string Value *string Width *int Strip *string