LoaderS with storeLoadedData method, reflect.UpdateStructWithIfaceMap

This commit is contained in:
DanB
2018-03-23 20:36:16 +01:00
parent e81a3ae10b
commit be89785ad6
6 changed files with 235 additions and 36 deletions

View File

@@ -466,7 +466,6 @@ const CGRATES_CFG_JSON = `
"max_open_files": 1024, // maximum simultaneous files to process, 0 for unlimited
"tp_in_dir": "/var/spool/cgrates/tploader/in", // absolute path towards the directory where the CDRs are stored
"tp_out_dir": "/var/spool/cgrates/tploader/out", // absolute path towards the directory where processed CDRs will be moved
"data":[ // data profiles to load
{
"type": "*attributes", // data source type

View File

@@ -29,6 +29,12 @@ import (
type LoaderData map[string]interface{}
// TenantID returns the concatenated tenant:ID key identifying this record.
// NOTE(review): panics if utils.Tenant or utils.ID are absent or not strings
// in the underlying map — callers are expected to have populated both.
func (ld LoaderData) TenantID() string {
	return utils.ConcatenatedKey(
		ld[utils.Tenant].(string),
		ld[utils.ID].(string))
}
// UpdateFromCSV will update LoaderData with data received from fileName,
// contained in record and processed with cfgTpl
func (ld LoaderData) UpdateFromCSV(fileName string, record []string,

View File

@@ -21,10 +21,12 @@ package loaders
import (
"encoding/csv"
"fmt"
"io"
"os"
"path"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -36,14 +38,33 @@ type openedCSVFile struct {
// Loader is one instance loading from a folder
type Loader struct {
	ldrID         string                                // identifier used in log messages
	tpInDir       string                                // input folder with CSV files to process
	tpOutDir      string                                // folder where processed files are moved
	lockFilename  string                                // lock file guarding concurrent folder processing
	cacheSConns   []*config.HaPoolConfig                // connections towards CacheS
	fieldSep      string                                // CSV field separator
	dataTpls      map[string][]*config.CfgCdrField      // map[loaderType][]*config.CfgCdrField
	rdrs          map[string]map[string]*openedCSVFile  // map[loaderType]map[fileName]*openedCSVFile for common incremental read
	procRows      int                                   // keep here the last processed row in the file/-s
	bufLoaderData map[string][]LoaderData               // cache of data read, indexed on tenantID
	dm            *engine.DataManager
	timezone      string
}
// ProcessFolder will process the content in the folder with locking.
// Errors from individual loader types are logged and do not abort the
// remaining types; the error from the last failing type is what gets
// returned (via the named result).
func (ldr *Loader) ProcessFolder() (err error) {
	if err = ldr.lockFolder(); err != nil {
		return
	}
	defer ldr.unlockFolder()
	for typ := range ldr.rdrs {
		if err = ldr.processFiles(typ); err != nil {
			utils.Logger.Warning(fmt.Sprintf("<%s> loaderType: <%s>, err: %s",
				utils.LoaderS, typ, err.Error()))
		}
	}
	return
}
// lockFolder will attempt to lock the folder by creating the lock file
@@ -58,27 +79,6 @@ func (ldr *Loader) unlockFolder() (err error) {
ldr.lockFilename))
}
// ProcessFolder will process the content in the folder with locking.
// Unsupported loader types are logged and skipped; errors from supported
// types are logged without aborting the loop (last error is returned).
func (ldr *Loader) ProcessFolder() (err error) {
	if err = ldr.lockFolder(); err != nil {
		return
	}
	defer ldr.unlockFolder()
	for loaderType := range ldr.rdrs {
		if loaderType != utils.MetaAttributes {
			utils.Logger.Warning(fmt.Sprintf("<%s> unsupported loaderType: <%s>",
				utils.LoaderS, loaderType))
			continue
		}
		if err = ldr.processAttributes(); err != nil {
			utils.Logger.Warning(fmt.Sprintf("<%s> loaderType: <%s>, err: %s",
				utils.LoaderS, loaderType, err.Error()))
		}
	}
	return
}
// unreferenceFile will clean up a used file by closing it and removing it from the reference map
func (ldr *Loader) unreferenceFile(loaderType, fileName string) (err error) {
openedCSVFile := ldr.rdrs[loaderType][fileName]
@@ -86,18 +86,96 @@ func (ldr *Loader) unreferenceFile(loaderType, fileName string) (err error) {
return openedCSVFile.fd.Close()
}
// processAttributes contains the procedure for loading Attributes
func (ldr *Loader) processAttributes() (err error) {
// open files as csv readers
for fName := range ldr.rdrs[utils.MetaAttributes] {
func (ldr *Loader) storeLoadedData(loaderType string,
lds map[string][]LoaderData) (err error) {
switch loaderType {
case utils.MetaAttributes:
for _, lDataSet := range lds {
attrModels := make(engine.TPAttributes, len(lDataSet))
for i, ld := range lDataSet {
attrModels[i] = new(engine.TPAttribute)
if err = utils.UpdateStructWithIfaceMap(attrModels[i], ld); err != nil {
return
}
}
for _, tpApf := range attrModels.AsTPAttributes() {
if apf, err := engine.APItoAttributeProfile(tpApf, ldr.timezone); err != nil {
return err
} else if err := ldr.dm.SetAttributeProfile(apf, true); err != nil {
return err
}
}
}
}
return
}
func (ldr *Loader) processFiles(loaderType string) (err error) {
for fName := range ldr.rdrs[loaderType] {
var fd *os.File
if fd, err = os.Open(path.Join(ldr.tpInDir, fName)); err != nil {
return err
}
ldr.rdrs[utils.MetaAttributes][fName] = &openedCSVFile{
ldr.rdrs[loaderType][fName] = &openedCSVFile{
fileName: fName, fd: fd, csvRdr: csv.NewReader(fd)}
defer ldr.unreferenceFile(utils.MetaAttributes, fName)
defer ldr.unreferenceFile(loaderType, fName)
}
// start processing lines
keepLooping := true // controls looping
lineNr := 0
for keepLooping {
lineNr += 1
var hasErrors bool
lData := make(LoaderData) // one row
for fName, rdr := range ldr.rdrs[loaderType] {
var record []string
if record, err = rdr.csvRdr.Read(); err != nil {
if err == io.EOF {
keepLooping = false
break
}
hasErrors = true
utils.Logger.Warning(
fmt.Sprintf("<%s> <%s> reading line: %d, error: %s",
utils.LoaderS, ldr.ldrID, lineNr, err.Error()))
}
if hasErrors { // if any of the readers will give errors, we ignore the line
continue
}
if err := lData.UpdateFromCSV(fName, record,
ldr.dataTpls[utils.MetaAttributes]); err != nil {
fmt.Sprintf("<%s> <%s> line: %d, error: %s",
utils.LoaderS, ldr.ldrID, lineNr, err.Error())
hasErrors = true
continue
}
// Record from map
// update dataDB
}
tntID := lData.TenantID()
if _, has := ldr.bufLoaderData[tntID]; !has &&
len(ldr.bufLoaderData) == 1 { // process previous records before going futher
var prevTntID string
for prevTntID = range ldr.bufLoaderData {
break // have stolen the existing key in buffer
}
if err = ldr.storeLoadedData(loaderType,
map[string][]LoaderData{prevTntID: ldr.bufLoaderData[prevTntID]}); err != nil {
return
}
delete(ldr.bufLoaderData, prevTntID)
}
ldr.bufLoaderData[tntID] = append(ldr.bufLoaderData[tntID], lData)
}
// proceed with last element in bufLoaderData
var tntID string
for tntID = range ldr.bufLoaderData {
break // get the first tenantID
}
if err = ldr.storeLoadedData(loaderType,
map[string][]LoaderData{tntID: ldr.bufLoaderData[tntID]}); err != nil {
return
}
delete(ldr.bufLoaderData, tntID)
return
}

View File

@@ -175,6 +175,22 @@ func IfaceAsDuration(itm interface{}) (d time.Duration, err error) {
return
}
// IfaceAsInt64 converts the given interface value into an int64.
// Supported inputs: int, int64, time.Duration (converted to nanoseconds)
// and string (parsed as base-10 integer); any other type yields an error.
func IfaceAsInt64(itm interface{}) (i int64, err error) {
	switch v := itm.(type) { // bound type switch avoids repeated assertions
	case int:
		return int64(v), nil
	case time.Duration:
		return v.Nanoseconds(), nil
	case int64:
		return v, nil
	case string:
		return strconv.ParseInt(v, 10, 64)
	default:
		err = fmt.Errorf("cannot convert field: %+v to int", itm)
	}
	return
}
func IfaceAsFloat64(itm interface{}) (f float64, err error) {
switch itm.(type) {
case float64:
@@ -191,6 +207,24 @@ func IfaceAsFloat64(itm interface{}) (f float64, err error) {
return
}
// IfaceAsBool converts the given interface value into a bool.
// Strings are parsed with strconv.ParseBool; int, int64 and float64 map
// to true when strictly positive; any other type yields an error.
func IfaceAsBool(itm interface{}) (b bool, err error) {
	switch v := itm.(type) { // bound type switch avoids repeated assertions
	case bool:
		return v, nil
	case string:
		return strconv.ParseBool(v)
	case int:
		return v > 0, nil
	case int64:
		return v > 0, nil
	case float64:
		return v > 0, nil
	default:
		err = fmt.Errorf("cannot convert field: %+v to bool", itm)
	}
	return
}
// AsMapStringIface converts an item (mostly struct) as map[string]interface{}
func AsMapStringIface(item interface{}) (map[string]interface{}, error) {
out := make(map[string]interface{})

View File

@@ -251,3 +251,43 @@ func UpdateStructWithStrMap(s interface{}, m map[string]string) []string {
}
return notMatched
}
// UpdateStructWithIfaceMap will update struct fields with values coming from map.
// If a map value does not match the field's type, conversion is attempted
// via the IfaceAs* helpers. s must be a pointer to a struct; map keys with
// no matching struct field are silently skipped.
// ToDo: add here more fields
func UpdateStructWithIfaceMap(s interface{}, mp map[string]interface{}) (err error) {
	elem := reflect.ValueOf(s).Elem() // hoisted: invariant across map entries
	for key, val := range mp {
		fld := elem.FieldByName(key)
		if !fld.IsValid() {
			continue // no such field on the struct
		}
		switch fld.Kind() {
		case reflect.Bool:
			valBool, err := IfaceAsBool(val)
			if err != nil {
				return err
			}
			fld.SetBool(valBool)
		case reflect.Int, reflect.Int64:
			valInt, err := IfaceAsInt64(val)
			if err != nil {
				return err
			}
			fld.SetInt(valInt)
		case reflect.Float64:
			valFlt, err := IfaceAsFloat64(val)
			if err != nil {
				return err
			}
			fld.SetFloat(valFlt)
		case reflect.String:
			valStr, canCast := CastFieldIfToString(val)
			if !canCast {
				return fmt.Errorf("cannot convert field: %+v to string", val)
			}
			fld.SetString(valStr)
		default: // improper use of function
			return fmt.Errorf("cannot update unsupported struct field: %+v", fld)
		}
	}
	return
}

View File

@@ -168,3 +168,45 @@ func TestStructFromMapStringInterfaceValue(t *testing.T) {
t.Errorf("error converting structure value: %s", ToIJSON(rt))
}
}
// TestUpdateStructWithIfaceMap verifies full population (including the
// int -> int64 conversion) and that a partial update leaves untouched
// fields intact.
func TestUpdateStructWithIfaceMap(t *testing.T) {
	type myStruct struct {
		String string
		Bool   bool
		Float  float64
		Int    int64
	}
	s := new(myStruct)
	// first pass: every field set, "Int" supplied as plain int
	if err := UpdateStructWithIfaceMap(s, map[string]interface{}{
		"String": "s",
		"Bool":   true,
		"Float":  6.4,
		"Int":    2,
	}); err != nil {
		t.Error(err)
	} else if want := (&myStruct{String: "s", Bool: true, Float: 6.4, Int: 2}); !reflect.DeepEqual(want, s) {
		t.Errorf("expecting: %+v, received: %+v", want, s)
	}
	// second pass: partial map must only touch the listed fields
	if err := UpdateStructWithIfaceMap(s, map[string]interface{}{
		"String": "aaa",
		"Bool":   false,
	}); err != nil {
		t.Error(err)
	} else if want := (&myStruct{String: "aaa", Bool: false, Float: 6.4, Int: 2}); !reflect.DeepEqual(want, s) {
		t.Errorf("expecting: %+v, received: %+v", want, s)
	}
}