diff --git a/config/config_defaults.go b/config/config_defaults.go index 27cf001bf..d116a2e6f 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -466,7 +466,6 @@ const CGRATES_CFG_JSON = ` "max_open_files": 1024, // maximum simultaneous files to process, 0 for unlimited "tp_in_dir": "/var/spool/cgrates/tploader/in", // absolute path towards the directory where the CDRs are stored "tp_out_dir": "/var/spool/cgrates/tploader/out", // absolute path towards the directory where processed CDRs will be moved - "data":[ // data profiles to load { "type": "*attributes", // data source type diff --git a/loaders/libloader.go b/loaders/libloader.go index 36f1e3741..4ebe88f9f 100644 --- a/loaders/libloader.go +++ b/loaders/libloader.go @@ -29,6 +29,12 @@ import ( type LoaderData map[string]interface{} +func (ld LoaderData) TenantID() string { + tnt := ld[utils.Tenant].(string) + prflID := ld[utils.ID].(string) + return utils.ConcatenatedKey(tnt, prflID) +} + // UpdateFromCSV will update LoaderData with data received from fileName, // contained in record and processed with cfgTpl func (ld LoaderData) UpdateFromCSV(fileName string, record []string, diff --git a/loaders/loader.go b/loaders/loader.go index a4ba7b8ca..da3761790 100644 --- a/loaders/loader.go +++ b/loaders/loader.go @@ -21,10 +21,12 @@ package loaders import ( "encoding/csv" "fmt" + "io" "os" "path" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) @@ -36,14 +38,33 @@ type openedCSVFile struct { // Loader is one instance loading from a folder type Loader struct { - tpInDir string - tpOutDir string - lockFilename string - cacheSConns []*config.HaPoolConfig - fieldSep string - dataTpls []*config.LoaderSDataType - rdrs map[string]map[string]*openedCSVFile // map[loaderType]map[fileName]*openedCSVFile for common incremental read - procRows int // keep here the last processed row in the file/-s + ldrID string + tpInDir string + tpOutDir string + lockFilename string + cacheSConns []*config.HaPoolConfig + fieldSep string + dataTpls map[string][]*config.CfgCdrField // map[loaderType]*config.CfgCdrField + rdrs map[string]map[string]*openedCSVFile // map[loaderType]map[fileName]*openedCSVFile for common incremental read + procRows int // keep here the last processed row in the file/-s + bufLoaderData map[string][]LoaderData // cache of data read, indexed on tenantID + dm *engine.DataManager + timezone string +} + +// ProcessFolder will process the content in the folder with locking +func (ldr *Loader) ProcessFolder() (err error) { + if err = ldr.lockFolder(); err != nil { + return + } + defer ldr.unlockFolder() + for ldrType := range ldr.rdrs { + if err = ldr.processFiles(ldrType); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> loaderType: <%s>, err: %s", + utils.LoaderS, ldrType, err.Error())) + } + } + return } // lockFolder will attempt to lock the folder by creating the lock file @@ -58,27 +79,6 @@ func (ldr *Loader) unlockFolder() (err error) { ldr.lockFilename)) } -// ProcessFolder will process the content in the folder with locking -func (ldr *Loader) ProcessFolder() (err error) { - if err = ldr.lockFolder(); err != nil { - return - } - defer ldr.unlockFolder() - for loaderType := range ldr.rdrs { - switch loaderType { - case utils.MetaAttributes: - if err = ldr.processAttributes(); err != nil { - utils.Logger.Warning(fmt.Sprintf("<%s> loaderType: <%s>, err: %s", - utils.LoaderS, loaderType, err.Error())) - } - default: - utils.Logger.Warning(fmt.Sprintf("<%s> unsupported loaderType: <%s>", - utils.LoaderS, loaderType)) - } - } - return -} - // unreferenceFile will cleanup an used file by closing and removing from referece map func (ldr *Loader) unreferenceFile(loaderType, fileName string) (err error) { openedCSVFile := ldr.rdrs[loaderType][fileName] @@ -86,18 +86,96 @@ func (ldr *Loader) unreferenceFile(loaderType, fileName string) (err error) { return openedCSVFile.fd.Close() } -// processAttributes contains the procedure for loading Attributes -func (ldr *Loader) processAttributes() (err error) { - // open files as csv readers - for fName := range ldr.rdrs[utils.MetaAttributes] { +func (ldr *Loader) storeLoadedData(loaderType string, + lds map[string][]LoaderData) (err error) { + switch loaderType { + case utils.MetaAttributes: + for _, lDataSet := range lds { + attrModels := make(engine.TPAttributes, len(lDataSet)) + for i, ld := range lDataSet { + attrModels[i] = new(engine.TPAttribute) + if err = utils.UpdateStructWithIfaceMap(attrModels[i], ld); err != nil { + return + } + } + for _, tpApf := range attrModels.AsTPAttributes() { + if apf, err := engine.APItoAttributeProfile(tpApf, ldr.timezone); err != nil { + return err + } else if err := ldr.dm.SetAttributeProfile(apf, true); err != nil { + return err + } + } + } + } + return +} + +func (ldr *Loader) processFiles(loaderType string) (err error) { + for fName := range ldr.rdrs[loaderType] { var fd *os.File if fd, err = os.Open(path.Join(ldr.tpInDir, fName)); err != nil { return err } - ldr.rdrs[utils.MetaAttributes][fName] = &openedCSVFile{ + ldr.rdrs[loaderType][fName] = &openedCSVFile{ fileName: fName, fd: fd, csvRdr: csv.NewReader(fd)} - defer ldr.unreferenceFile(utils.MetaAttributes, fName) + defer ldr.unreferenceFile(loaderType, fName) } // start processing lines + keepLooping := true // controls looping + lineNr := 0 + for keepLooping { + lineNr += 1 + var hasErrors bool + lData := make(LoaderData) // one row + for fName, rdr := range ldr.rdrs[loaderType] { + var record []string + if record, err = rdr.csvRdr.Read(); err != nil { + if err == io.EOF { + keepLooping = false + break + } + hasErrors = true + utils.Logger.Warning( + fmt.Sprintf("<%s> <%s> reading line: %d, error: %s", + utils.LoaderS, ldr.ldrID, lineNr, err.Error())) + } + if hasErrors { // if any of the readers will give errors, we ignore the line + continue + } + if err := lData.UpdateFromCSV(fName, record, + ldr.dataTpls[utils.MetaAttributes]); err != nil { + fmt.Sprintf("<%s> <%s> line: %d, error: %s", + utils.LoaderS, ldr.ldrID, lineNr, err.Error()) + hasErrors = true + continue + } + // Record from map + // update dataDB + } + tntID := lData.TenantID() + if _, has := ldr.bufLoaderData[tntID]; !has && + len(ldr.bufLoaderData) == 1 { // process previous records before going futher + var prevTntID string + for prevTntID = range ldr.bufLoaderData { + break // have stolen the existing key in buffer + } + if err = ldr.storeLoadedData(loaderType, + map[string][]LoaderData{prevTntID: ldr.bufLoaderData[prevTntID]}); err != nil { + return + } + delete(ldr.bufLoaderData, prevTntID) + } + ldr.bufLoaderData[tntID] = append(ldr.bufLoaderData[tntID], lData) + } + // proceed with last element in bufLoaderData + var tntID string + for tntID = range ldr.bufLoaderData { + break // get the first tenantID + } + if err = ldr.storeLoadedData(loaderType, + map[string][]LoaderData{tntID: ldr.bufLoaderData[tntID]}); err != nil { + return + } + delete(ldr.bufLoaderData, tntID) return } diff --git a/utils/reflect.go b/utils/reflect.go index 3062de354..f0ed0d900 100644 --- a/utils/reflect.go +++ b/utils/reflect.go @@ -175,6 +175,22 @@ func IfaceAsDuration(itm interface{}) (d time.Duration, err error) { return } +func IfaceAsInt64(itm interface{}) (i int64, err error) { + switch itm.(type) { + case int: + return int64(itm.(int)), nil + case time.Duration: + return itm.(time.Duration).Nanoseconds(), nil + case int64: + return itm.(int64), nil + case string: + return strconv.ParseInt(itm.(string), 10, 64) + default: + err = fmt.Errorf("cannot convert field: %+v to int", itm) + } + return +} + func IfaceAsFloat64(itm interface{}) (f float64, err error) { switch itm.(type) { case float64: @@ -191,6 +207,24 @@ func IfaceAsFloat64(itm interface{}) (f float64, err error) { return } +func IfaceAsBool(itm interface{}) (b bool, err error) { + switch itm.(type) { + case bool: + return itm.(bool), nil + case string: + return strconv.ParseBool(itm.(string)) + case int: + return itm.(int) > 0, nil + case int64: + return itm.(int64) > 0, nil + case float64: + return itm.(float64) > 0, nil + default: + err = fmt.Errorf("cannot convert field: %+v to bool", itm) + } + return +} + // AsMapStringIface converts an item (mostly struct) as map[string]interface{} func AsMapStringIface(item interface{}) (map[string]interface{}, error) { out := make(map[string]interface{}) diff --git a/utils/struct.go b/utils/struct.go index 66802d81b..b8e59ae12 100644 --- a/utils/struct.go +++ b/utils/struct.go @@ -251,3 +251,43 @@ func UpdateStructWithStrMap(s interface{}, m map[string]string) []string { } return notMatched } + +// UpdateStructWithIfaceMap will update struct fields with values coming from map +// if map values are not matching the ones in strcut convertion is being attempted +// ToDo: add here more fields +func UpdateStructWithIfaceMap(s interface{}, mp map[string]interface{}) (err error) { + for key, val := range mp { + fld := reflect.ValueOf(s).Elem().FieldByName(key) + if fld.IsValid() { + switch fld.Kind() { + case reflect.Bool: + if valBool, err := IfaceAsBool(val); err != nil { + return err + } else { + fld.SetBool(valBool) + } + case reflect.Int, reflect.Int64: + if valInt, err := IfaceAsInt64(val); err != nil { + return err + } else { + fld.SetInt(valInt) + } + case reflect.Float64: + if valFlt, err := IfaceAsFloat64(val); err != nil { + return err + } else { + fld.SetFloat(valFlt) + } + case reflect.String: + if valStr, canCast := CastFieldIfToString(val); !canCast { + return fmt.Errorf("cannot convert field: %+v to string", val) + } else { + fld.SetString(valStr) + } + default: // improper use of function + return fmt.Errorf("cannot update unsupported struct field: %+v", fld) + } + } + } + return +} diff --git a/utils/struct_test.go b/utils/struct_test.go index 4ff0b4fa8..f09b7776d 100644 --- a/utils/struct_test.go +++ b/utils/struct_test.go @@ -168,3 +168,45 @@ func TestStructFromMapStringInterfaceValue(t *testing.T) { t.Errorf("error converting structure value: %s", ToIJSON(rt)) } } + +func TestUpdateStructWithIfaceMap(t *testing.T) { + type myStruct struct { + String string + Bool bool + Float float64 + Int int64 + } + s := new(myStruct) + mp := map[string]interface{}{ + "String": "s", + "Bool": true, + "Float": 6.4, + "Int": 2, + } + eStruct := &myStruct{ + String: "s", + Bool: true, + Float: 6.4, + Int: 2, + } + if err := UpdateStructWithIfaceMap(s, mp); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eStruct, s) { + t.Errorf("expecting: %+v, received: %+v", eStruct, s) + } + mp = map[string]interface{}{ + "String": "aaa", + "Bool": false, + } + eStruct = &myStruct{ + String: "aaa", + Bool: false, + Float: 6.4, + Int: 2, + } + if err := UpdateStructWithIfaceMap(s, mp); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eStruct, s) { + t.Errorf("expecting: %+v, received: %+v", eStruct, s) + } +}