From 9f28fa0d24d8fd387241d0b1118303066719e3c7 Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 25 Jul 2013 20:29:47 +0200 Subject: [PATCH] Partial TPCSVImporter, TPCSVFileParser --- cmd/cgr-loader/cgr-loader.go | 75 +++++------------ data/rates/prepaid1centpsec/Timings.csv | 2 +- engine/loader_csv.go | 30 ++++--- engine/loader_helpers.go | 80 ++++++++++++++---- engine/tpimporter_csv.go | 107 ++++++++++++++++++++++-- utils/consts.go | 11 +++ 6 files changed, 220 insertions(+), 85 deletions(-) diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index fff828964..ceeb5a991 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -26,7 +26,6 @@ import ( "github.com/cgrates/cgrates/config" "log" "path" - "regexp" ) var ( @@ -50,18 +49,12 @@ var ( tpid = flag.String("tpid", "", "The tariff plan id from the database") dataPath = flag.String("path", ".", "The path containing the data files") version = flag.Bool("version", false, "Prints the application version.") + verbose = flag.Bool("verbose", false, "Enable detailed verbose logging output") fromStorDb = flag.Bool("from_stordb", false, "Load the tariff plan from storDb to dataDb") toStorDb = flag.Bool("to_stordb", false, "Import the tariff plan from files to storDb") - sep rune ) -type validator struct { - fn string - re *regexp.Regexp - message string -} - func main() { flag.Parse() if *version { @@ -72,64 +65,42 @@ func main() { var dataDb, storDb engine.DataStorage // Init necessary db connections if *fromStorDb { - dataDb, errDataDb = engine.ConfigureDatabase(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass) - storDb, errStorDb = engine.ConfigureDatabase(*data_db_type, *data_db_host, *data_db_port, *data_db_name, *data_db_user, *data_db_pass) + dataDb, errDataDb = engine.ConfigureDatabase(*data_db_type, *data_db_host, *data_db_port, *data_db_name, *data_db_user, *data_db_pass) + storDb, errStorDb = engine.ConfigureDatabase(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass) } else if *toStorDb { // Import from csv files to storDb - storDb, errStorDb = engine.ConfigureDatabase(*data_db_type, *data_db_host, *data_db_port, *data_db_name, *data_db_user, *data_db_pass) + storDb, errStorDb = engine.ConfigureDatabase(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass) } else { // Default load from csv files to dataDb - dataDb, errDataDb = engine.ConfigureDatabase(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass) + dataDb, errDataDb = engine.ConfigureDatabase(*data_db_type, *data_db_host, *data_db_port, *data_db_name, *data_db_user, *data_db_pass) } - defer dataDb.Close() - defer storDb.Close() + // Defer databases opened to be closed when we are done + for _,db := range []engine.DataStorage{ dataDb, storDb } { + if db != nil { defer db.Close() } + } + // Stop on db errors for _,err = range []error{errDataDb, errStorDb} { if err != nil { log.Fatalf("Could not open database connection: %v", err) } } - var loader engine.TPLoader - if *fromStorDb { + if *fromStorDb { // Load Tariff Plan from storDb into dataDb loader = engine.NewDbReader(storDb, dataDb, *tpid) - } else { // Default load from csv files to dataDb - dataFilesValidators := []*validator{ - &validator{utils.DESTINATIONS_CSV, - regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\d+.?\d*){1}$`), - "Tag[0-9A-Za-z_],Prefix[0-9]"}, - &validator{utils.TIMINGS_CSV, - regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\*all\s*,\s*|(?:\d{1,4};?)+\s*,\s*|\s*,\s*){4}(?:\d{2}:\d{2}:\d{2}|\*asap){1}$`), - "Tag[0-9A-Za-z_],Years[0-9;]|*all|,Months[0-9;]|*all|,MonthDays[0-9;]|*all|,WeekDays[0-9;]|*all|,Time[0-9:]|*asap(00:00:00)"}, - &validator{utils.RATES_CSV, - regexp.MustCompile(`(?:\w+\s*,\s*){2}(?:\d+.?\d*,?){4}$`), - "Tag[0-9A-Za-z_],ConnectFee[0-9.],Price[0-9.],PricedUnits[0-9.],RateIncrement[0-9.]"}, - &validator{utils.DESTINATION_RATES_CSV, - regexp.MustCompile(`(?:\w+\s*,\s*){2}(?:\d+.?\d*,?){4}$`), - "Tag[0-9A-Za-z_],DestinationsTag[0-9A-Za-z_],RateTag[0-9A-Za-z_]"}, - &validator{utils.DESTRATE_TIMINGS_CSV, - regexp.MustCompile(`(?:\w+\s*,\s*){3}(?:\d+.?\d*){1}$`), - "Tag[0-9A-Za-z_],DestinationRatesTag[0-9A-Za-z_],TimingProfile[0-9A-Za-z_],Weight[0-9.]"}, - &validator{utils.RATE_PROFILES_CSV, - regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\d+\s*,\s*){1}(?:OUT\s*,\s*|IN\s*,\s*){1}(?:\*all\s*,\s*|[\w:\.]+\s*,\s*){1}(?:\w*\s*,\s*){1}(?:\w+\s*,\s*){1}(?:\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z){1}$`), - "Tenant[0-9A-Za-z_],TOR[0-9],Direction OUT|IN,Subject[0-9A-Za-z_:.]|*all,RatesFallbackSubject[0-9A-Za-z_]|,RatesTimingTag[0-9A-Za-z_],ActivationTime[[0-9T:X]] (2012-01-01T00:00:00Z)"}, - &validator{utils.ACTIONS_CSV, - regexp.MustCompile(`(?:\w+\s*,\s*){3}(?:OUT\s*,\s*|IN\s*,\s*){1}(?:\d+\s*,\s*){1}(?:\w+\s*,\s*|\*all\s*,\s*){1}(?:ABSOLUTE\s*,\s*|PERCENT\s*,\s*|\s*,\s*){1}(?:\d*\.?\d*\s*,?\s*){3}$`), - "Tag[0-9A-Za-z_],Action[0-9A-Za-z_],BalanceTag[0-9A-Za-z_],Direction OUT|IN,Units[0-9],DestinationTag[0-9A-Za-z_]|*all,PriceType ABSOLUT|PERCENT,PriceValue[0-9.],MinutesWeight[0-9.],Weight[0-9.]"}, - &validator{utils.ACTION_TIMINGS_CSV, - regexp.MustCompile(`(?:\w+\s*,\s*){3}(?:\d+\.?\d*){1}`), - "Tag[0-9A-Za-z_],ActionsTag[0-9A-Za-z_],TimingTag[0-9A-Za-z_],Weight[0-9.]"}, - &validator{utils.ACTION_TRIGGERS_CSV, - regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:MONETARY\s*,\s*|SMS\s*,\s*|MINUTES\s*,\s*|INTERNET\s*,\s*|INTERNET_TIME\s*,\s*){1}(?:OUT\s*,\s*|IN\s*,\s*){1}(?:\d+\.?\d*\s*,\s*){1}(?:\w+\s*,\s*|\*all\s*,\s*){1}(?:\w+\s*,\s*){1}(?:\d+\.?\d*){1}$`), - "Tag[0-9A-Za-z_],BalanceTag MONETARY|SMS|MINUTES|INTERNET|INTERNET_TIME,Direction OUT|IN,ThresholdValue[0-9.],DestinationTag[0-9A-Za-z_]|*all,ActionsTag[0-9A-Za-z_],Weight[0-9.]"}, - &validator{utils.ACCOUNT_ACTIONS_CSV, - regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:[\w:.]+\s*,\s*){1}(?:OUT\s*,\s*|IN\s*,\s*){1}(?:\w+\s*,?\s*){2}$`), - "Tenant[0-9A-Za-z_],Account[0-9A-Za-z_:.],Direction OUT|IN,ActionTimingsTag[0-9A-Za-z_],ActionTriggersTag[0-9A-Za-z_]"}, + } else if *toStorDb { // Import files from a directory into storDb + if *tpid == "" { + log.Fatal("TPid required, please define it via -tpid command argument.") } - for _, v := range dataFilesValidators { - err := engine.ValidateCSVData(path.Join(*dataPath, v.fn), v.re) + csvImporter := engine.TPCSVImporter{ *tpid, storDb, *dataPath, ',', *verbose } + if errImport := csvImporter.Run(); errImport != nil { + log.Fatal(errImport) + } + return + } else { // Default load from csv files to dataDb + for fn, v := range engine.FileValidators { + err := engine.ValidateCSVData(path.Join(*dataPath, fn), v.Rule) if err != nil { - log.Fatal(err, "\n\t", v.message) + log.Fatal(err, "\n\t", v.Message) } } - //sep = []rune(*separator)[0] loader = engine.NewFileCSVReader(dataDb, ',', utils.DESTINATIONS_CSV, utils.TIMINGS_CSV, utils.RATES_CSV, utils.DESTINATION_RATES_CSV, utils.DESTRATE_TIMINGS_CSV, utils.RATE_PROFILES_CSV, utils.ACTIONS_CSV, utils.ACTION_TIMINGS_CSV, utils.ACTION_TRIGGERS_CSV, utils.ACCOUNT_ACTIONS_CSV) } diff --git a/data/rates/prepaid1centpsec/Timings.csv b/data/rates/prepaid1centpsec/Timings.csv index 95475f4dc..272ee70ca 100644 --- a/data/rates/prepaid1centpsec/Timings.csv +++ b/data/rates/prepaid1centpsec/Timings.csv @@ -1,3 +1,3 @@ -Tag,Years,Months,MonthDays,WeekDays,Time +#Tag,Years,Months,MonthDays,WeekDays,Time ALWAYS,*all,*all,*all,*all,00:00:00 ONE_TIME_RUN,,,,,*asap diff --git a/engine/loader_csv.go b/engine/loader_csv.go index 0ef392239..63063e005 100644 --- a/engine/loader_csv.go +++ b/engine/loader_csv.go @@ -33,7 +33,7 @@ import ( type CSVReader struct { sep rune storage DataStorage - readerFunc func(string, rune) (*csv.Reader, *os.File, error) + readerFunc func(string, rune, int) (*csv.Reader, *os.File, error) actions map[string][]*Action actionsTimings map[string][]*ActionTiming actionsTriggers map[string][]*ActionTrigger @@ -74,20 +74,24 @@ func NewStringCSVReader(storage DataStorage, sep rune, destinationsFn, timingsFn return c } -func openFileCSVReader(fn string, comma rune) (csvReader *csv.Reader, fp *os.File, err error) { +func openFileCSVReader(fn string, comma rune, nrFields int) (csvReader *csv.Reader, fp *os.File, err error) { fp, err = os.Open(fn) if err != nil { return } csvReader = csv.NewReader(fp) csvReader.Comma = comma + csvReader.Comment = utils.COMMENT_CHAR + csvReader.FieldsPerRecord = nrFields csvReader.TrailingComma = true return } -func openStringCSVReader(data string, comma rune) (csvReader *csv.Reader, fp *os.File, err error) { +func openStringCSVReader(data string, comma rune, nrFields int) (csvReader *csv.Reader, fp *os.File, err error) { csvReader = csv.NewReader(strings.NewReader(data)) csvReader.Comma = comma + csvReader.Comment = utils.COMMENT_CHAR + csvReader.FieldsPerRecord = nrFields csvReader.TrailingComma = true return } @@ -164,7 +168,7 @@ func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) { } func (csvr *CSVReader) LoadDestinations() (err error) { - csvReader, fp, err := csvr.readerFunc(csvr.destinationsFn, csvr.sep) + csvReader, fp, err := csvr.readerFunc(csvr.destinationsFn, csvr.sep, utils.DESTINATION_RATES_NRCOLS) if err != nil { log.Print("Could not load destinations file: ", err) // allow writing of the other values @@ -197,7 +201,7 @@ func (csvr *CSVReader) LoadDestinations() (err error) { } func (csvr *CSVReader) LoadTimings() (err error) { - csvReader, fp, err := csvr.readerFunc(csvr.timingsFn, csvr.sep) + csvReader, fp, err := csvr.readerFunc(csvr.timingsFn, csvr.sep, utils.TIMINGS_NRCOLS) if err != nil { log.Print("Could not load timings file: ", err) // allow writing of the other values @@ -219,7 +223,7 @@ func (csvr *CSVReader) LoadTimings() (err error) { } func (csvr *CSVReader) LoadRates() (err error) { - csvReader, fp, err := csvr.readerFunc(csvr.ratesFn, csvr.sep) + csvReader, fp, err := csvr.readerFunc(csvr.ratesFn, csvr.sep, utils.RATES_NRCOLS) if err != nil { log.Print("Could not load rates file: ", err) // allow writing of the other values @@ -245,7 +249,7 @@ func (csvr *CSVReader) LoadRates() (err error) { } func (csvr *CSVReader) LoadDestinationRates() (err error) { - csvReader, fp, err := csvr.readerFunc(csvr.destinationratesFn, csvr.sep) + csvReader, fp, err := csvr.readerFunc(csvr.destinationratesFn, csvr.sep, utils.DESTINATION_RATES_NRCOLS) if err != nil { log.Print("Could not load rates file: ", err) // allow writing of the other values @@ -276,7 +280,7 @@ func (csvr *CSVReader) LoadDestinationRates() (err error) { } func (csvr *CSVReader) LoadDestinationRateTimings() (err error) { - csvReader, fp, err := csvr.readerFunc(csvr.destinationratetimingsFn, csvr.sep) + csvReader, fp, err := csvr.readerFunc(csvr.destinationratetimingsFn, csvr.sep, utils.DESTRATE_TIMINGS_NRCOLS) if err != nil { log.Print("Could not load rate timings file: ", err) // allow writing of the other values @@ -313,7 +317,7 @@ func (csvr *CSVReader) LoadDestinationRateTimings() (err error) { } func (csvr *CSVReader) LoadRatingProfiles() (err error) { - csvReader, fp, err := csvr.readerFunc(csvr.ratingprofilesFn, csvr.sep) + csvReader, fp, err := csvr.readerFunc(csvr.ratingprofilesFn, csvr.sep, utils.RATE_PROFILES_NRCOLS) if err != nil { log.Print("Could not load rating profiles file: ", err) // allow writing of the other values @@ -360,7 +364,7 @@ func (csvr *CSVReader) LoadRatingProfiles() (err error) { } func (csvr *CSVReader) LoadActions() (err error) { - csvReader, fp, err := csvr.readerFunc(csvr.actionsFn, csvr.sep) + csvReader, fp, err := csvr.readerFunc(csvr.actionsFn, csvr.sep, utils.ACTIONS_NRCOLS) if err != nil { log.Print("Could not load action triggers file: ", err) // allow writing of the other values @@ -429,7 +433,7 @@ func (csvr *CSVReader) LoadActions() (err error) { } func (csvr *CSVReader) LoadActionTimings() (err error) { - csvReader, fp, err := csvr.readerFunc(csvr.actiontimingsFn, csvr.sep) + csvReader, fp, err := csvr.readerFunc(csvr.actiontimingsFn, csvr.sep, utils.ACTION_TIMINGS_NRCOLS) if err != nil { log.Print("Could not load action triggers file: ", err) // allow writing of the other values @@ -474,7 +478,7 @@ func (csvr *CSVReader) LoadActionTimings() (err error) { } func (csvr *CSVReader) LoadActionTriggers() (err error) { - csvReader, fp, err := csvr.readerFunc(csvr.actiontriggersFn, csvr.sep) + csvReader, fp, err := csvr.readerFunc(csvr.actiontriggersFn, csvr.sep, utils.ACTION_TRIGGERS_NRCOLS) if err != nil { log.Print("Could not load action triggers file: ", err) // allow writing of the other values @@ -513,7 +517,7 @@ func (csvr *CSVReader) LoadActionTriggers() (err error) { } func (csvr *CSVReader) LoadAccountActions() (err error) { - csvReader, fp, err := csvr.readerFunc(csvr.accountactionsFn, csvr.sep) + csvReader, fp, err := csvr.readerFunc(csvr.accountactionsFn, csvr.sep, utils.ACCOUNT_ACTIONS_NRCOLS) if err != nil { log.Print("Could not load account actions file: ", err) // allow writing of the other values diff --git a/engine/loader_helpers.go b/engine/loader_helpers.go index 5ee6bb3ee..72fe59160 100644 --- a/engine/loader_helpers.go +++ b/engine/loader_helpers.go @@ -20,12 +20,14 @@ package engine import ( "bufio" + "path" "errors" "fmt" "log" "os" "regexp" "strconv" + "strings" "github.com/cgrates/cgrates/utils" ) @@ -199,41 +201,91 @@ func ValidateCSVData(fn string, re *regexp.Regexp) (err error) { return } -type TPCSVRowValidator struct { - FileName string // File name +type FileLineRegexValidator struct { + FieldsPerRecord int // Number of fields in one record, useful for crosschecks Rule *regexp.Regexp // Regexp rule - ErrMessage string // Error message + Message string // Pass this message as helper } -var TPCSVRowValidators = []*TPCSVRowValidator{ - &TPCSVRowValidator{utils.DESTINATIONS_CSV, +var FileValidators = map[string]*FileLineRegexValidator{ + utils.DESTINATIONS_CSV: &FileLineRegexValidator{ utils.DESTINATIONS_NRCOLS, regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\d+.?\d*){1}$`), "Tag[0-9A-Za-z_],Prefix[0-9]"}, - &TPCSVRowValidator{utils.TIMINGS_CSV, + utils.TIMINGS_CSV: &FileLineRegexValidator{ utils.TIMINGS_NRCOLS, regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\*all\s*,\s*|(?:\d{1,4};?)+\s*,\s*|\s*,\s*){4}(?:\d{2}:\d{2}:\d{2}|\*asap){1}$`), "Tag[0-9A-Za-z_],Years[0-9;]|*all|,Months[0-9;]|*all|,MonthDays[0-9;]|*all|,WeekDays[0-9;]|*all|,Time[0-9:]|*asap(00:00:00)"}, - &TPCSVRowValidator{utils.RATES_CSV, + utils.RATES_CSV: &FileLineRegexValidator{ utils.RATES_NRCOLS, regexp.MustCompile(`(?:\w+\s*,\s*){2}(?:\d+.?\d*,?){4}$`), "Tag[0-9A-Za-z_],ConnectFee[0-9.],Price[0-9.],PricedUnits[0-9.],RateIncrement[0-9.]"}, - &TPCSVRowValidator{utils.DESTINATION_RATES_CSV, + utils.DESTINATION_RATES_CSV: &FileLineRegexValidator{ utils.DESTINATION_RATES_NRCOLS, regexp.MustCompile(`(?:\w+\s*,\s*){2}(?:\d+.?\d*,?){4}$`), "Tag[0-9A-Za-z_],DestinationsTag[0-9A-Za-z_],RateTag[0-9A-Za-z_]"}, - &TPCSVRowValidator{utils.DESTRATE_TIMINGS_CSV, + utils.DESTRATE_TIMINGS_CSV: &FileLineRegexValidator{ utils.DESTRATE_TIMINGS_NRCOLS, regexp.MustCompile(`(?:\w+\s*,\s*){3}(?:\d+.?\d*){1}$`), "Tag[0-9A-Za-z_],DestinationRatesTag[0-9A-Za-z_],TimingProfile[0-9A-Za-z_],Weight[0-9.]"}, - &TPCSVRowValidator{utils.RATE_PROFILES_CSV, + utils.RATE_PROFILES_CSV: &FileLineRegexValidator{ utils.RATE_PROFILES_NRCOLS, regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:\d+\s*,\s*){1}(?:OUT\s*,\s*|IN\s*,\s*){1}(?:\*all\s*,\s*|[\w:\.]+\s*,\s*){1}(?:\w*\s*,\s*){1}(?:\w+\s*,\s*){1}(?:\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z){1}$`), "Tenant[0-9A-Za-z_],TOR[0-9],Direction OUT|IN,Subject[0-9A-Za-z_:.]|*all,RatesFallbackSubject[0-9A-Za-z_]|,RatesTimingTag[0-9A-Za-z_],ActivationTime[[0-9T:X]] (2012-01-01T00:00:00Z)"}, - &TPCSVRowValidator{utils.ACTIONS_CSV, + utils.ACTIONS_CSV: &FileLineRegexValidator{ utils.ACTIONS_NRCOLS, regexp.MustCompile(`(?:\w+\s*,\s*){3}(?:OUT\s*,\s*|IN\s*,\s*){1}(?:\d+\s*,\s*){1}(?:\w+\s*,\s*|\*all\s*,\s*){1}(?:ABSOLUTE\s*,\s*|PERCENT\s*,\s*|\s*,\s*){1}(?:\d*\.?\d*\s*,?\s*){3}$`), "Tag[0-9A-Za-z_],Action[0-9A-Za-z_],BalanceTag[0-9A-Za-z_],Direction OUT|IN,Units[0-9],DestinationTag[0-9A-Za-z_]|*all,PriceType ABSOLUT|PERCENT,PriceValue[0-9.],MinutesWeight[0-9.],Weight[0-9.]"}, - &TPCSVRowValidator{utils.ACTION_TIMINGS_CSV, + utils.ACTION_TIMINGS_CSV: &FileLineRegexValidator{ utils.ACTION_TIMINGS_NRCOLS, regexp.MustCompile(`(?:\w+\s*,\s*){3}(?:\d+\.?\d*){1}`), "Tag[0-9A-Za-z_],ActionsTag[0-9A-Za-z_],TimingTag[0-9A-Za-z_],Weight[0-9.]"}, - &TPCSVRowValidator{utils.ACTION_TRIGGERS_CSV, + utils.ACTION_TRIGGERS_CSV: &FileLineRegexValidator{ utils.ACTION_TRIGGERS_NRCOLS, regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:MONETARY\s*,\s*|SMS\s*,\s*|MINUTES\s*,\s*|INTERNET\s*,\s*|INTERNET_TIME\s*,\s*){1}(?:OUT\s*,\s*|IN\s*,\s*){1}(?:\d+\.?\d*\s*,\s*){1}(?:\w+\s*,\s*|\*all\s*,\s*){1}(?:\w+\s*,\s*){1}(?:\d+\.?\d*){1}$`), "Tag[0-9A-Za-z_],BalanceTag MONETARY|SMS|MINUTES|INTERNET|INTERNET_TIME,Direction OUT|IN,ThresholdValue[0-9.],DestinationTag[0-9A-Za-z_]|*all,ActionsTag[0-9A-Za-z_],Weight[0-9.]"}, - &TPCSVRowValidator{utils.ACCOUNT_ACTIONS_CSV, + utils.ACCOUNT_ACTIONS_CSV: &FileLineRegexValidator{ utils.ACCOUNT_ACTIONS_NRCOLS, regexp.MustCompile(`(?:\w+\s*,\s*){1}(?:[\w:.]+\s*,\s*){1}(?:OUT\s*,\s*|IN\s*,\s*){1}(?:\w+\s*,?\s*){2}$`), "Tenant[0-9A-Za-z_],Account[0-9A-Za-z_:.],Direction OUT|IN,ActionTimingsTag[0-9A-Za-z_],ActionTriggersTag[0-9A-Za-z_]"}, } + + + +func NewTPCSVFileParser(dirPath, fileName string) (*TPCSVFileParser, error) { + validator, hasValidator := FileValidators[fileName] + if !hasValidator { + return nil, fmt.Errorf("No validator found for file <%s>", fileName) + } + // Open the file here + fin, err := os.Open( path.Join(dirPath, fileName) ) + if err != nil { + return nil, err + } + //defer fin.Close() + reader := bufio.NewReader(fin) + return &TPCSVFileParser{validator, reader}, nil +} + +// Opens the connection to a file and returns the parsed lines one by one when ParseLine() is called +type TPCSVFileParser struct { + validator *FileLineRegexValidator // Row validator + reader *bufio.Reader // Reader to the file we are interested in +} + +func (self *TPCSVFileParser) ParseNextLine() ( []string, error ) { + line, truncated, err := self.reader.ReadLine() + if err != nil { + return nil, err + } else if truncated { + return nil, errors.New("Line too long.") + } + // skip commented lines + if strings.HasPrefix(string(line), string(utils.COMMENT_CHAR)) { + return nil, errors.New("Line starts with comment character.") + } + // Validate here string line + if !self.validator.Rule.Match(line) { + return nil, fmt.Errorf("Invalid line, <%s>", self.validator.Message) + } + // Open csv reader directly on string line + csvReader, _, err := openStringCSVReader( string(line), ',', self.validator.FieldsPerRecord ) + if err != nil { + return nil, err + } + record, err := csvReader.Read() // if no errors, record should be good to go having right format and length + if err != nil { + return nil, err + } + return record, nil +} diff --git a/engine/tpimporter_csv.go b/engine/tpimporter_csv.go index b57d49489..85872e594 100644 --- a/engine/tpimporter_csv.go +++ b/engine/tpimporter_csv.go @@ -18,13 +18,110 @@ along with this program. If not, see package engine -// Import tariff plan from csv into storDb +import ( + "io" + "io/ioutil" + "log" + "github.com/cgrates/cgrates/utils" +) -type TPImporterCSV struct { - sep rune - storDb DataStorage + +// Import tariff plan from csv into storDb +type TPCSVImporter struct { + TPid string // Load data on this tpid + StorDb DataStorage // StorDb connection handle + DirPath string // Directory path to import from + Sep rune // Separator in the csv file + Verbose bool // If true will print a detailed information instead of silently discarding it } -func (self *TPImporterCSV) ProcessFolder (fPath string) (err error) { +func (self *TPCSVImporter) Run() error { + + // Maps csv file to handler which should process it + fileHandlers := map[string]func(string)error{ + utils.TIMINGS_CSV: self.importTimings, + utils.DESTINATIONS_CSV: self.importDestinations, + utils.RATES_CSV: self.importRates, + utils.DESTINATION_RATES_CSV: self.importDestinationRates, + utils.DESTRATE_TIMINGS_CSV: self.importDestRateTimings, + utils.RATE_PROFILES_CSV: self.importRatingProfiles, + utils.ACTIONS_CSV: self.importActions, + utils.ACTION_TIMINGS_CSV: self.importActionTimings, + utils.ACTION_TRIGGERS_CSV: self.importActionTriggers, + utils.ACCOUNT_ACTIONS_CSV: self.importAccountActions, + } + + files, _ := ioutil.ReadDir(self.DirPath) + for _, f := range files { + fHandler,hasName := fileHandlers[f.Name()] + if !hasName { + continue + } + fHandler( f.Name() ) + } return nil } + +// Handler importing timings from file, saved row by row to storDb +func (self *TPCSVImporter) importTimings(fn string) error { + fParser, err := NewTPCSVFileParser( self.DirPath, fn ) + if err!=nil { + return err + } + lineNr := 0 + for { + lineNr++ + record, err := fParser.ParseNextLine() + if err == io.EOF { // Reached end of file + break + } else if err != nil { + if self.Verbose { + log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error()) + } + continue + } + tm := NewTiming( record... ) + if err := self.StorDb.SetTPTiming(self.TPid, tm); err != nil { + log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error()) + } + } + return nil +} + +func (self *TPCSVImporter) importDestinations(fPath string) error { + return nil +} + +func (self *TPCSVImporter) importRates(fPath string) error { + return nil +} + +func (self *TPCSVImporter) importDestinationRates(fPath string) error { + return nil +} + +func (self *TPCSVImporter) importDestRateTimings(fPath string) error { + return nil +} + +func (self *TPCSVImporter) importRatingProfiles(fPath string) error { + return nil +} + +func (self *TPCSVImporter) importActions(fPath string) error { + return nil +} + +func (self *TPCSVImporter) importActionTimings(fPath string) error { + return nil +} + +func (self *TPCSVImporter) importActionTriggers(fPath string) error { + return nil +} + +func (self *TPCSVImporter) importAccountActions(fPath string) error { + return nil +} + + diff --git a/utils/consts.go b/utils/consts.go index 00dc4a53b..103805c34 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -39,7 +39,18 @@ const ( ACTION_TIMINGS_CSV = "ActionTimings.csv" ACTION_TRIGGERS_CSV = "ActionTriggers.csv" ACCOUNT_ACTIONS_CSV = "AccountActions.csv" + TIMINGS_NRCOLS = 6 + DESTINATIONS_NRCOLS = 2 + RATES_NRCOLS = 9 + DESTINATION_RATES_NRCOLS = 3 + DESTRATE_TIMINGS_NRCOLS = 4 + RATE_PROFILES_NRCOLS = 7 + ACTIONS_NRCOLS = 11 + ACTION_TIMINGS_NRCOLS = 4 + ACTION_TRIGGERS_NRCOLS = 8 + ACCOUNT_ACTIONS_NRCOLS = 5 ROUNDING_UP = "up" ROUNDING_MIDDLE = "middle" ROUNDING_DOWN = "down" + COMMENT_CHAR = '#' )