Loader TestProcessContent with single file, defaults to map[string]interface{} -> struct

This commit is contained in:
DanB
2018-03-26 18:10:59 +02:00
parent a2ca4ef7e3
commit 9da53d5fdb
3 changed files with 163 additions and 7 deletions

View File

@@ -32,7 +32,7 @@ import (
type openedCSVFile struct {
fileName string
fd *os.File
rdr io.ReadCloser // keep reference so we can close it when done
csvRdr *csv.Reader
}
@@ -59,7 +59,12 @@ func (ldr *Loader) ProcessFolder() (err error) {
}
defer ldr.unlockFolder()
for ldrType := range ldr.rdrs {
if err = ldr.processFiles(ldrType); err != nil {
if err = ldr.openFiles(ldrType); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> loaderType: <%s> cannot open files, err: %s",
utils.LoaderS, ldrType, err.Error()))
continue
}
if err = ldr.processContent(ldrType); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> loaderType: <%s>, err: %s",
utils.LoaderS, ldrType, err.Error()))
}
@@ -83,7 +88,7 @@ func (ldr *Loader) unlockFolder() (err error) {
func (ldr *Loader) unreferenceFile(loaderType, fileName string) (err error) {
openedCSVFile := ldr.rdrs[loaderType][fileName]
ldr.rdrs[loaderType][fileName] = nil
return openedCSVFile.fd.Close()
return openedCSVFile.rdr.Close()
}
func (ldr *Loader) storeLoadedData(loaderType string,
@@ -110,16 +115,20 @@ func (ldr *Loader) storeLoadedData(loaderType string,
return
}
func (ldr *Loader) processFiles(loaderType string) (err error) {
func (ldr *Loader) openFiles(loaderType string) (err error) {
for fName := range ldr.rdrs[loaderType] {
var fd *os.File
if fd, err = os.Open(path.Join(ldr.tpInDir, fName)); err != nil {
var rdr *os.File
if rdr, err = os.Open(path.Join(ldr.tpInDir, fName)); err != nil {
return err
}
ldr.rdrs[loaderType][fName] = &openedCSVFile{
fileName: fName, fd: fd, csvRdr: csv.NewReader(fd)}
fileName: fName, rdr: rdr, csvRdr: csv.NewReader(rdr)}
defer ldr.unreferenceFile(loaderType, fName)
}
return
}
func (ldr *Loader) processContent(loaderType string) (err error) {
// start processing lines
keepLooping := true // controls looping
lineNr := 0
@@ -152,6 +161,9 @@ func (ldr *Loader) processFiles(loaderType string) (err error) {
// Record from map
// update dataDB
}
if len(lData) == 0 { // no data, could be the last line in file
continue
}
tntID := lData.TenantID()
if _, has := ldr.bufLoaderData[tntID]; !has &&
len(ldr.bufLoaderData) == 1 { // process previous records before going futher

135
loaders/loader_test.go Normal file
View File

@@ -0,0 +1,135 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package loaders
import (
"encoding/csv"
"io/ioutil"
"reflect"
"strings"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var attrsCSV = `#Tenant,ID,Contexts,FilterIDs,ActivationInterval,FieldName,Initial,Substitute,Append,Weight
cgrates.org,TestLoader1,*sessions;*cdrs,*string:Account:1007,2014-01-14T00:00:00Z,Account,*any,1001,false,10
cgrates.org,TestLoader1,lcr,*string:Account:1008;*string:Account:1009,,Subject,*any,1001,true,
`
func TestLoaderProcessContentSingleFile(t *testing.T) {
data, _ := engine.NewMapStorage()
ldr := &Loader{
ldrID: "TestLoaderProcessContent",
//rdrs map[string]map[string]*openedCSVFile
bufLoaderData: make(map[string][]LoaderData),
dm: engine.NewDataManager(data),
timezone: "UTC",
}
ldr.dataTpls = map[string][]*config.CfgCdrField{
utils.MetaAttributes: []*config.CfgCdrField{
&config.CfgCdrField{Tag: "TenantID",
FieldId: "Tenant",
Type: utils.META_COMPOSED,
Value: utils.ParseRSRFieldsMustCompile("0", utils.INFIELD_SEP),
Mandatory: true},
&config.CfgCdrField{Tag: "ProfileID",
FieldId: "ID",
Type: utils.META_COMPOSED,
Value: utils.ParseRSRFieldsMustCompile("1", utils.INFIELD_SEP),
Mandatory: true},
&config.CfgCdrField{Tag: "Contexts",
FieldId: "Contexts",
Type: utils.META_COMPOSED,
Value: utils.ParseRSRFieldsMustCompile("2", utils.INFIELD_SEP)},
&config.CfgCdrField{Tag: "FilterIDs",
FieldId: "FilterIDs",
Type: utils.META_COMPOSED,
Value: utils.ParseRSRFieldsMustCompile("3", utils.INFIELD_SEP)},
&config.CfgCdrField{Tag: "ActivationInterval",
FieldId: "ActivationInterval",
Type: utils.META_COMPOSED,
Value: utils.ParseRSRFieldsMustCompile("4", utils.INFIELD_SEP)},
&config.CfgCdrField{Tag: "FieldName",
FieldId: "FieldName",
Type: utils.META_COMPOSED,
Value: utils.ParseRSRFieldsMustCompile("5", utils.INFIELD_SEP)},
&config.CfgCdrField{Tag: "Initial",
FieldId: "Initial",
Type: utils.META_COMPOSED,
Value: utils.ParseRSRFieldsMustCompile("6", utils.INFIELD_SEP)},
&config.CfgCdrField{Tag: "Substitute",
FieldId: "Substitute",
Type: utils.META_COMPOSED,
Value: utils.ParseRSRFieldsMustCompile("7", utils.INFIELD_SEP)},
&config.CfgCdrField{Tag: "Append",
FieldId: "Append",
Type: utils.META_COMPOSED,
Value: utils.ParseRSRFieldsMustCompile("8", utils.INFIELD_SEP)},
&config.CfgCdrField{Tag: "Weight",
FieldId: "Weight",
Type: utils.META_COMPOSED,
Value: utils.ParseRSRFieldsMustCompile("9", utils.INFIELD_SEP)},
},
}
rdr := ioutil.NopCloser(strings.NewReader(attrsCSV))
csvRdr := csv.NewReader(rdr)
csvRdr.Comment = '#'
ldr.rdrs = map[string]map[string]*openedCSVFile{
utils.MetaAttributes: map[string]*openedCSVFile{
"Attributes.csv": &openedCSVFile{fileName: "Attributes.csv",
rdr: rdr, csvRdr: csvRdr}},
}
if err := ldr.processContent(utils.MetaAttributes); err != nil {
t.Error(err)
}
eAP := &engine.AttributeProfile{
Tenant: "cgrates.org",
ID: "TestLoader1",
Contexts: []string{utils.MetaSessionS, utils.MetaCDRs, "lcr"},
FilterIDs: []string{"*string:Account:1007",
"*string:Account:1008", "*string:Account:1009"},
ActivationInterval: &utils.ActivationInterval{
ActivationTime: time.Date(2014, 1, 14, 0, 0, 0, 0, time.UTC)},
Attributes: []*engine.Attribute{
&engine.Attribute{
FieldName: "Account",
Initial: utils.ANY,
Substitute: "1001",
Append: false,
},
&engine.Attribute{
FieldName: "Subject",
Initial: utils.ANY,
Substitute: "1001",
Append: true,
}},
Weight: 10.0,
}
if ap, err := ldr.dm.GetAttributeProfile("cgrates.org", "TestLoader1",
false, utils.NonTransactional); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eAP.Attributes, ap.Attributes) {
t.Errorf("expecting: %s, received: %s",
utils.ToJSON(eAP), utils.ToJSON(ap))
}
}

View File

@@ -261,18 +261,27 @@ func UpdateStructWithIfaceMap(s interface{}, mp map[string]interface{}) (err err
if fld.IsValid() {
switch fld.Kind() {
case reflect.Bool:
if val == "" { // auto-populate defaults so we can parse empty strings
val = false
}
if valBool, err := IfaceAsBool(val); err != nil {
return err
} else {
fld.SetBool(valBool)
}
case reflect.Int, reflect.Int64:
if val == "" {
val = 0
}
if valInt, err := IfaceAsInt64(val); err != nil {
return err
} else {
fld.SetInt(valInt)
}
case reflect.Float64:
if val == "" {
val = 0.0
}
if valFlt, err := IfaceAsFloat64(val); err != nil {
return err
} else {