TPCSVImporter - DestinationRates implementation

This commit is contained in:
DanB
2013-07-29 14:18:50 +02:00
parent 1a1403ace1
commit f1d94aebaf
7 changed files with 48 additions and 13 deletions

View File

@@ -72,7 +72,7 @@ type DataStorage interface {
GetTPRate(string, string) (*utils.TPRate, error)
GetTPRateIds(string) ([]string, error)
ExistsTPDestinationRate(string, string) (bool, error)
SetTPDestinationRate(*utils.TPDestinationRate) error
SetTPDestinationRates(string, map[string][]*DestinationRate) error
GetTPDestinationRate(string, string) (*utils.TPDestinationRate, error)
GetTPDestinationRateIds(string) ([]string, error)
ExistsTPDestRateTiming(string, string) (bool, error)

View File

@@ -129,7 +129,7 @@ func (ms *MapStorage) ExistsTPDestinationRate(tpid, drId string) (bool, error) {
return false, errors.New(utils.ERR_NOT_IMPLEMENTED)
}
func (ms *MapStorage) SetTPDestinationRate(dr *utils.TPDestinationRate) error {
func (ms *MapStorage) SetTPDestinationRates(tpid string, drs map[string][]*DestinationRate) error {
return errors.New(utils.ERR_NOT_IMPLEMENTED)
}

View File

@@ -204,7 +204,7 @@ func (ms *MongoStorage) ExistsTPDestinationRate(tpid, drId string) (bool, error)
return false, errors.New(utils.ERR_NOT_IMPLEMENTED)
}
func (ms *MongoStorage) SetTPDestinationRate(dr *utils.TPDestinationRate) error {
func (ms *MongoStorage) SetTPDestinationRates(tpid string, drs map[string][]*DestinationRate) error {
return errors.New(utils.ERR_NOT_IMPLEMENTED)
}

View File

@@ -159,7 +159,7 @@ func (rs *RedisStorage) ExistsTPDestinationRate(tpid, drId string) (bool, error)
return false, errors.New(utils.ERR_NOT_IMPLEMENTED)
}
func (rs *RedisStorage) SetTPDestinationRate(dr *utils.TPDestinationRate) error {
func (rs *RedisStorage) SetTPDestinationRates(tpid string, drs map[string][]*DestinationRate) error {
return errors.New(utils.ERR_NOT_IMPLEMENTED)
}

View File

@@ -289,17 +289,19 @@ func (self *SQLStorage) ExistsTPDestinationRate(tpid, drId string) (bool, error)
return exists, nil
}
func (self *SQLStorage) SetTPDestinationRate(dr *utils.TPDestinationRate) error {
if len(dr.DestinationRates) == 0 {
func (self *SQLStorage) SetTPDestinationRates(tpid string, drs map[string][]*DestinationRate) error {
if len(drs) == 0 {
return nil //Nothing to set
}
// Using multiple values in query to spare some network processing time
qry := fmt.Sprintf("INSERT INTO %s (tpid, tag, destinations_tag, rates_tag) VALUES ", utils.TBL_TP_DESTINATION_RATES)
for idx, drPair := range dr.DestinationRates {
if idx != 0 { //Consecutive values after the first will be prefixed with "," as separator
qry += ","
for drId, drRows := range drs {
for idx, dr := range drRows {
if idx != 0 { //Consecutive values after the first will be prefixed with "," as separator
qry += ","
}
qry += fmt.Sprintf("('%s','%s','%s','%s')",
tpid, drId, dr.DestinationsTag, dr.RateTag)
}
qry += fmt.Sprintf("('%s','%s','%s','%s')", dr.TPid, dr.DestinationRateId, drPair.DestinationId, drPair.RateId)
}
if _, err := self.Db.Exec(qry); err != nil {
return err

View File

@@ -150,7 +150,35 @@ func (self *TPCSVImporter) importRates(fn string) error {
return nil
}
func (self *TPCSVImporter) importDestinationRates(fPath string) error {
func (self *TPCSVImporter) importDestinationRates(fn string) error {
if self.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
fParser, err := NewTPCSVFileParser( self.DirPath, fn )
if err!=nil {
return err
}
lineNr := 0
for {
lineNr++
record, err := fParser.ParseNextLine()
if err == io.EOF { // Reached end of file
break
} else if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
dr := &DestinationRate{record[0], record[1], record[2], nil}
if err != nil {
return err
}
if err := self.StorDb.SetTPDestinationRates( self.TPid,
map[string][]*DestinationRate{ dr.Tag: []*DestinationRate{dr} } ); err != nil {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
return nil
}