mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-19 22:28:45 +05:00
Few more methods on TPCSVImporter, API TPRates modifications to include GroupInterval
This commit is contained in:
@@ -116,7 +116,7 @@ func topupResetAction(ub *UserBalance, a *Action) (err error) {
|
||||
if a.BalanceId == MINUTES {
|
||||
ub.MinuteBuckets = make([]*MinuteBucket, 0)
|
||||
} else {
|
||||
ub.BalanceMap[a.BalanceId+a.Direction] = BalanceChain{&Balance{Value: 0}}
|
||||
ub.BalanceMap[a.BalanceId+a.Direction] = BalanceChain{&Balance{Value: 0}} // ToDo: can ub be empty here?
|
||||
}
|
||||
genericMakeNegative(a)
|
||||
genericDebit(ub, a)
|
||||
|
||||
@@ -68,7 +68,7 @@ type DataStorage interface {
|
||||
GetTPDestination(string, string) (*Destination, error)
|
||||
GetTPDestinationIds(string) ([]string, error)
|
||||
ExistsTPRate(string, string) (bool, error)
|
||||
SetTPRate(*utils.TPRate) error
|
||||
SetTPRates(string, map[string][]*Rate) error
|
||||
GetTPRate(string, string) (*utils.TPRate, error)
|
||||
GetTPRateIds(string) ([]string, error)
|
||||
ExistsTPDestinationRate(string, string) (bool, error)
|
||||
|
||||
@@ -113,7 +113,7 @@ func (ms *MapStorage) ExistsTPRate(tpid, rtId string) (bool, error) {
|
||||
return false, errors.New(utils.ERR_NOT_IMPLEMENTED)
|
||||
}
|
||||
|
||||
func (ms *MapStorage) SetTPRate(rt *utils.TPRate) error {
|
||||
func (ms *MapStorage) SetTPRates(tpid string, rts map[string][]*Rate) error {
|
||||
return errors.New(utils.ERR_NOT_IMPLEMENTED)
|
||||
}
|
||||
|
||||
|
||||
@@ -188,7 +188,7 @@ func (ms *MongoStorage) ExistsTPRate(tpid, rtId string) (bool, error) {
|
||||
return false, errors.New(utils.ERR_NOT_IMPLEMENTED)
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetTPRate(rt *utils.TPRate) error {
|
||||
func (ms *MongoStorage) SetTPRates(tpid string, rts map[string][]*Rate) error {
|
||||
return errors.New(utils.ERR_NOT_IMPLEMENTED)
|
||||
}
|
||||
|
||||
|
||||
@@ -143,7 +143,7 @@ func (rs *RedisStorage) ExistsTPRate(tpid, rtId string) (bool, error) {
|
||||
return false, errors.New(utils.ERR_NOT_IMPLEMENTED)
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetTPRate(rt *utils.TPRate) error {
|
||||
func (rs *RedisStorage) SetTPRates(tpid string, rts map[string][]*Rate) error {
|
||||
return errors.New(utils.ERR_NOT_IMPLEMENTED)
|
||||
}
|
||||
|
||||
|
||||
@@ -210,19 +210,29 @@ func (self *SQLStorage) ExistsTPRate(tpid, rtId string) (bool, error) {
|
||||
return exists, nil
|
||||
}
|
||||
|
||||
func (self *SQLStorage) SetTPRate(rt *utils.TPRate) error {
|
||||
for _, rtSlot := range rt.RateSlots {
|
||||
if _, err := self.Db.Exec(fmt.Sprintf("INSERT INTO %s (tpid, tag, connect_fee, rate, rated_units, rate_increments, rounding_method, rounding_decimals, weight) VALUES ('%s', '%s', %f, %f, %d, %d,'%s', %d, %f)",
|
||||
utils.TBL_TP_RATES, rt.TPid, rt.RateId, rtSlot.ConnectFee, rtSlot.Rate, rtSlot.RatedUnits, rtSlot.RateIncrements,
|
||||
rtSlot.RoundingMethod, rtSlot.RoundingDecimals, rtSlot.Weight)); err != nil {
|
||||
return err
|
||||
func (self *SQLStorage) SetTPRates(tpid string, rts map[string][]*Rate) error {
|
||||
if len(rts) == 0 {
|
||||
return nil //Nothing to set
|
||||
}
|
||||
qry := fmt.Sprintf("INSERT INTO %s (tpid, tag, connect_fee, rate, rated_units, rate_increments, group_interval, rounding_method, rounding_decimals, weight) VALUES ", utils.TBL_TP_RATES)
|
||||
for rtId, rtRows := range rts {
|
||||
for idx, rt := range rtRows {
|
||||
if idx != 0 { //Consecutive values after the first will be prefixed with "," as separator
|
||||
qry += ","
|
||||
}
|
||||
qry += fmt.Sprintf("('%s', '%s', %f, %f, %d, %d,%d,'%s', %d, %f)",
|
||||
tpid, rtId, rt.ConnectFee, rt.Price, int(rt.PricedUnits), int(rt.RateIncrements), int(rt.GroupInterval),
|
||||
rt.RoundingMethod, rt.RoundingDecimals, rt.Weight)
|
||||
}
|
||||
}
|
||||
if _, err := self.Db.Exec(qry); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *SQLStorage) GetTPRate(tpid, rtId string) (*utils.TPRate, error) {
|
||||
rows, err := self.Db.Query(fmt.Sprintf("SELECT connect_fee, rate, rated_units, rate_increments, rounding_method, rounding_decimals, weight FROM %s WHERE tpid='%s' AND tag='%s'", utils.TBL_TP_RATES, tpid, rtId))
|
||||
rows, err := self.Db.Query(fmt.Sprintf("SELECT connect_fee, rate, rated_units, rate_increments, group_interval, rounding_method, rounding_decimals, weight FROM %s WHERE tpid='%s' AND tag='%s'", utils.TBL_TP_RATES, tpid, rtId))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -232,13 +242,14 @@ func (self *SQLStorage) GetTPRate(tpid, rtId string) (*utils.TPRate, error) {
|
||||
for rows.Next() {
|
||||
i++ //Keep here a reference so we know we got at least one prefix
|
||||
var connectFee, rate, weight float64
|
||||
var ratedUnits, rateIncrements, roundingDecimals int
|
||||
var ratedUnits, rateIncrements, roundingDecimals, groupInterval int
|
||||
var roundingMethod string
|
||||
err = rows.Scan(&connectFee, &rate, &ratedUnits, &rateIncrements, &roundingMethod, &roundingDecimals, &weight)
|
||||
err = rows.Scan(&connectFee, &rate, &ratedUnits, &rateIncrements, &groupInterval, &roundingMethod, &roundingDecimals, &weight)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rt.RateSlots = append(rt.RateSlots, utils.RateSlot{connectFee, rate, ratedUnits, rateIncrements, roundingMethod, roundingDecimals, weight})
|
||||
rt.RateSlots = append(rt.RateSlots, utils.RateSlot{connectFee, rate, ratedUnits, rateIncrements, groupInterval,
|
||||
roundingMethod, roundingDecimals, weight})
|
||||
}
|
||||
if i == 0 {
|
||||
return nil, nil
|
||||
|
||||
@@ -35,6 +35,8 @@ type TPCSVImporter struct {
|
||||
Verbose bool // If true will print a detailed information instead of silently discarding it
|
||||
}
|
||||
|
||||
// Maps csv file to handler which should process it. Defined like this since tests on 1.0.3 were failing on Travis.
|
||||
// Change it to func(string) error as soon as Travis updates.
|
||||
var fileHandlers = map[string]func(*TPCSVImporter,string) error{
|
||||
utils.TIMINGS_CSV: (*TPCSVImporter).importTimings,
|
||||
utils.DESTINATIONS_CSV: (*TPCSVImporter).importDestinations,
|
||||
@@ -49,10 +51,6 @@ var fileHandlers = map[string]func(*TPCSVImporter,string) error{
|
||||
}
|
||||
|
||||
func (self *TPCSVImporter) Run() error {
|
||||
|
||||
// Maps csv file to handler which should process it
|
||||
|
||||
|
||||
files, _ := ioutil.ReadDir(self.DirPath)
|
||||
for _, f := range files {
|
||||
fHandler,hasName := fileHandlers[f.Name()]
|
||||
@@ -66,6 +64,9 @@ func (self *TPCSVImporter) Run() error {
|
||||
|
||||
// Handler importing timings from file, saved row by row to storDb
|
||||
func (self *TPCSVImporter) importTimings(fn string) error {
|
||||
if self.Verbose {
|
||||
log.Printf("Processing file: <%s> ", fn)
|
||||
}
|
||||
fParser, err := NewTPCSVFileParser( self.DirPath, fn )
|
||||
if err!=nil {
|
||||
return err
|
||||
@@ -90,11 +91,62 @@ func (self *TPCSVImporter) importTimings(fn string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *TPCSVImporter) importDestinations(fPath string) error {
|
||||
func (self *TPCSVImporter) importDestinations(fn string) error {
|
||||
if self.Verbose {
|
||||
log.Printf("Processing file: <%s> ", fn)
|
||||
}
|
||||
fParser, err := NewTPCSVFileParser( self.DirPath, fn )
|
||||
if err!=nil {
|
||||
return err
|
||||
}
|
||||
lineNr := 0
|
||||
for {
|
||||
lineNr++
|
||||
record, err := fParser.ParseNextLine()
|
||||
if err == io.EOF { // Reached end of file
|
||||
break
|
||||
} else if err != nil {
|
||||
if self.Verbose {
|
||||
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
|
||||
}
|
||||
continue
|
||||
}
|
||||
dst := &Destination{record[0], []string{record[1]}}
|
||||
if err := self.StorDb.SetTPDestination(self.TPid, dst); err != nil {
|
||||
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *TPCSVImporter) importRates(fPath string) error {
|
||||
func (self *TPCSVImporter) importRates(fn string) error {
|
||||
if self.Verbose {
|
||||
log.Printf("Processing file: <%s> ", fn)
|
||||
}
|
||||
fParser, err := NewTPCSVFileParser( self.DirPath, fn )
|
||||
if err!=nil {
|
||||
return err
|
||||
}
|
||||
lineNr := 0
|
||||
for {
|
||||
lineNr++
|
||||
record, err := fParser.ParseNextLine()
|
||||
if err == io.EOF { // Reached end of file
|
||||
break
|
||||
} else if err != nil {
|
||||
if self.Verbose {
|
||||
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
|
||||
}
|
||||
continue
|
||||
}
|
||||
rt, err := NewRate(record[0], record[1], record[2], record[3], record[4], record[5], record[6], record[7], record[8])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := self.StorDb.SetTPRates( self.TPid, map[string][]*Rate{ record[0]: []*Rate{rt} } ); err != nil {
|
||||
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user