diff --git a/loaders/loader.go b/loaders/loader.go index da3761790..89546ec8c 100644 --- a/loaders/loader.go +++ b/loaders/loader.go @@ -32,7 +32,7 @@ import ( type openedCSVFile struct { fileName string - fd *os.File + rdr io.ReadCloser // keep reference so we can close it when done csvRdr *csv.Reader } @@ -59,7 +59,12 @@ func (ldr *Loader) ProcessFolder() (err error) { } defer ldr.unlockFolder() for ldrType := range ldr.rdrs { - if err = ldr.processFiles(ldrType); err != nil { + if err = ldr.openFiles(ldrType); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> loaderType: <%s> cannot open files, err: %s", + utils.LoaderS, ldrType, err.Error())) + continue + } + if err = ldr.processContent(ldrType); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> loaderType: <%s>, err: %s", utils.LoaderS, ldrType, err.Error())) } @@ -83,7 +88,7 @@ func (ldr *Loader) unlockFolder() (err error) { func (ldr *Loader) unreferenceFile(loaderType, fileName string) (err error) { openedCSVFile := ldr.rdrs[loaderType][fileName] ldr.rdrs[loaderType][fileName] = nil - return openedCSVFile.fd.Close() + return openedCSVFile.rdr.Close() } func (ldr *Loader) storeLoadedData(loaderType string, @@ -110,16 +115,20 @@ func (ldr *Loader) storeLoadedData(loaderType string, return } -func (ldr *Loader) processFiles(loaderType string) (err error) { +func (ldr *Loader) openFiles(loaderType string) (err error) { for fName := range ldr.rdrs[loaderType] { - var fd *os.File - if fd, err = os.Open(path.Join(ldr.tpInDir, fName)); err != nil { + var rdr *os.File + if rdr, err = os.Open(path.Join(ldr.tpInDir, fName)); err != nil { return err } ldr.rdrs[loaderType][fName] = &openedCSVFile{ - fileName: fName, fd: fd, csvRdr: csv.NewReader(fd)} + fileName: fName, rdr: rdr, csvRdr: csv.NewReader(rdr)} defer ldr.unreferenceFile(loaderType, fName) } + return +} + +func (ldr *Loader) processContent(loaderType string) (err error) { // start processing lines keepLooping := true // controls looping lineNr := 0 @@ -152,6 +161,9 @@ func (ldr *Loader) processFiles(loaderType string) (err error) { // Record from map // update dataDB } + if len(lData) == 0 { // no data, could be the last line in file + continue + } tntID := lData.TenantID() if _, has := ldr.bufLoaderData[tntID]; !has && len(ldr.bufLoaderData) == 1 { // process previous records before going futher diff --git a/loaders/loader_test.go b/loaders/loader_test.go new file mode 100644 index 000000000..2f6a40e96 --- /dev/null +++ b/loaders/loader_test.go @@ -0,0 +1,135 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package loaders + +import ( + "encoding/csv" + "io/ioutil" + "reflect" + "strings" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var attrsCSV = `#Tenant,ID,Contexts,FilterIDs,ActivationInterval,FieldName,Initial,Substitute,Append,Weight +cgrates.org,TestLoader1,*sessions;*cdrs,*string:Account:1007,2014-01-14T00:00:00Z,Account,*any,1001,false,10 +cgrates.org,TestLoader1,lcr,*string:Account:1008;*string:Account:1009,,Subject,*any,1001,true, +` + +func TestLoaderProcessContentSingleFile(t *testing.T) { + data, _ := engine.NewMapStorage() + ldr := &Loader{ + ldrID: "TestLoaderProcessContent", + //rdrs map[string]map[string]*openedCSVFile + bufLoaderData: make(map[string][]LoaderData), + dm: engine.NewDataManager(data), + timezone: "UTC", + } + ldr.dataTpls = map[string][]*config.CfgCdrField{ + utils.MetaAttributes: []*config.CfgCdrField{ + &config.CfgCdrField{Tag: "TenantID", + FieldId: "Tenant", + Type: utils.META_COMPOSED, + Value: utils.ParseRSRFieldsMustCompile("0", utils.INFIELD_SEP), + Mandatory: true}, + &config.CfgCdrField{Tag: "ProfileID", + FieldId: "ID", + Type: utils.META_COMPOSED, + Value: utils.ParseRSRFieldsMustCompile("1", utils.INFIELD_SEP), + Mandatory: true}, + &config.CfgCdrField{Tag: "Contexts", + FieldId: "Contexts", + Type: utils.META_COMPOSED, + Value: utils.ParseRSRFieldsMustCompile("2", utils.INFIELD_SEP)}, + &config.CfgCdrField{Tag: "FilterIDs", + FieldId: "FilterIDs", + Type: utils.META_COMPOSED, + Value: utils.ParseRSRFieldsMustCompile("3", utils.INFIELD_SEP)}, + &config.CfgCdrField{Tag: "ActivationInterval", + FieldId: "ActivationInterval", + Type: utils.META_COMPOSED, + Value: utils.ParseRSRFieldsMustCompile("4", utils.INFIELD_SEP)}, + &config.CfgCdrField{Tag: "FieldName", + FieldId: "FieldName", + Type: utils.META_COMPOSED, + Value: utils.ParseRSRFieldsMustCompile("5", utils.INFIELD_SEP)}, + &config.CfgCdrField{Tag: "Initial", + FieldId: "Initial", + Type: utils.META_COMPOSED, + Value: utils.ParseRSRFieldsMustCompile("6", utils.INFIELD_SEP)}, + &config.CfgCdrField{Tag: "Substitute", + FieldId: "Substitute", + Type: utils.META_COMPOSED, + Value: utils.ParseRSRFieldsMustCompile("7", utils.INFIELD_SEP)}, + &config.CfgCdrField{Tag: "Append", + FieldId: "Append", + Type: utils.META_COMPOSED, + Value: utils.ParseRSRFieldsMustCompile("8", utils.INFIELD_SEP)}, + &config.CfgCdrField{Tag: "Weight", + FieldId: "Weight", + Type: utils.META_COMPOSED, + Value: utils.ParseRSRFieldsMustCompile("9", utils.INFIELD_SEP)}, + }, + } + rdr := ioutil.NopCloser(strings.NewReader(attrsCSV)) + csvRdr := csv.NewReader(rdr) + csvRdr.Comment = '#' + ldr.rdrs = map[string]map[string]*openedCSVFile{ + utils.MetaAttributes: map[string]*openedCSVFile{ + "Attributes.csv": &openedCSVFile{fileName: "Attributes.csv", + rdr: rdr, csvRdr: csvRdr}}, + } + if err := ldr.processContent(utils.MetaAttributes); err != nil { + t.Error(err) + } + eAP := &engine.AttributeProfile{ + Tenant: "cgrates.org", + ID: "TestLoader1", + Contexts: []string{utils.MetaSessionS, utils.MetaCDRs, "lcr"}, + FilterIDs: []string{"*string:Account:1007", + "*string:Account:1008", "*string:Account:1009"}, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 1, 14, 0, 0, 0, 0, time.UTC)}, + Attributes: []*engine.Attribute{ + &engine.Attribute{ + FieldName: "Account", + Initial: utils.ANY, + Substitute: "1001", + Append: false, + }, + &engine.Attribute{ + FieldName: "Subject", + Initial: utils.ANY, + Substitute: "1001", + Append: true, + }}, + Weight: 10.0, + } + if ap, err := ldr.dm.GetAttributeProfile("cgrates.org", "TestLoader1", + false, utils.NonTransactional); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eAP.Attributes, ap.Attributes) { + t.Errorf("expecting: %s, received: %s", + utils.ToJSON(eAP), utils.ToJSON(ap)) + } +} diff --git a/utils/struct.go b/utils/struct.go index c58822130..1aa755316 100644 --- a/utils/struct.go +++ b/utils/struct.go @@ -261,18 +261,27 @@ func UpdateStructWithIfaceMap(s interface{}, mp map[string]interface{}) (err err if fld.IsValid() { switch fld.Kind() { case reflect.Bool: + if val == "" { // auto-populate defaults so we can parse empty strings + val = false + } if valBool, err := IfaceAsBool(val); err != nil { return err } else { fld.SetBool(valBool) } case reflect.Int, reflect.Int64: + if val == "" { + val = 0 + } if valInt, err := IfaceAsInt64(val); err != nil { return err } else { fld.SetInt(valInt) } case reflect.Float64: + if val == "" { + val = 0.0 + } if valFlt, err := IfaceAsFloat64(val); err != nil { return err } else {