MySQL data structures refactoring, fixups rounding methods migration

This commit is contained in:
DanB
2014-05-08 10:20:46 +02:00
parent c879c7a4d5
commit e893862f26
17 changed files with 175 additions and 152 deletions

View File

@@ -189,7 +189,7 @@ func (self *SQLStorage) SetTPRates(tpid string, rts map[string][]*utils.RateSlot
return nil //Nothing to set
}
var buffer bytes.Buffer
buffer.WriteString(fmt.Sprintf("INSERT INTO %s (tpid, id, connect_fee, rate, rate_unit, rate_increment, group_interval_start, rounding_method, rounding_decimals) VALUES ",
buffer.WriteString(fmt.Sprintf("INSERT INTO %s (tpid, id, connect_fee, rate, rate_unit, rate_increment, group_interval_start) VALUES ",
utils.TBL_TP_RATES))
i := 0
for rtId, rtRows := range rts {
@@ -197,12 +197,12 @@ func (self *SQLStorage) SetTPRates(tpid string, rts map[string][]*utils.RateSlot
if i != 0 { //Consecutive values after the first will be prefixed with "," as separator
buffer.WriteRune(',')
}
buffer.WriteString(fmt.Sprintf("('%s', '%s', %f, %f, '%s', '%s','%s','%s', %d)",
buffer.WriteString(fmt.Sprintf("('%s', '%s', %f, %f, '%s', '%s','%s')",
tpid, rtId, rt.ConnectFee, rt.Rate, rt.RateUnit, rt.RateIncrement, rt.GroupIntervalStart))
i++
}
}
buffer.WriteString(" ON DUPLICATE KEY UPDATE connect_fee=values(connect_fee), rate=values(rate), rate_increment=values(rate_increment), group_interval_start=values(group_interval_start), rounding_method=values(rounding_method), rounding_decimals=values(rounding_decimals)")
buffer.WriteString(" ON DUPLICATE KEY UPDATE connect_fee=values(connect_fee),rate=values(rate),rate_increment=values(rate_increment),group_interval_start=values(group_interval_start)")
if _, err := self.Db.Exec(buffer.String()); err != nil {
return err
}
@@ -214,18 +214,18 @@ func (self *SQLStorage) SetTPDestinationRates(tpid string, drs map[string][]*uti
return nil //Nothing to set
}
var buffer bytes.Buffer
buffer.WriteString(fmt.Sprintf("INSERT INTO %s (tpid,id,destinations_id,rates_id) VALUES ", utils.TBL_TP_DESTINATION_RATES))
buffer.WriteString(fmt.Sprintf("INSERT INTO %s (tpid,id,destinations_id,rates_id,rounding_method,rounding_decimals) VALUES ", utils.TBL_TP_DESTINATION_RATES))
i := 0
for drId, drRows := range drs {
for _, dr := range drRows {
if i != 0 { //Consecutive values after the first will be prefixed with "," as separator
buffer.WriteRune(',')
}
buffer.WriteString(fmt.Sprintf("('%s','%s','%s','%s')", tpid, drId, dr.DestinationId, dr.RateId))
buffer.WriteString(fmt.Sprintf("('%s','%s','%s','%s','%s', %d)", tpid, drId, dr.DestinationId, dr.RateId, dr.RoundingMethod, dr.RoundingDecimals))
i++
}
}
buffer.WriteString(" ON DUPLICATE KEY UPDATE destinations_id=values(destinations_id),rates_id=values(rates_id)")
buffer.WriteString(" ON DUPLICATE KEY UPDATE destinations_id=values(destinations_id),rates_id=values(rates_id),rounding_method=values(rounding_method),rounding_decimals=values(rounding_decimals)")
if _, err := self.Db.Exec(buffer.String()); err != nil {
return err
}
@@ -495,9 +495,12 @@ func (self *SQLStorage) LogCallCost(cgrid, source, runid string, cc *CallCost) (
if err != nil {
Logger.Err(fmt.Sprintf("Error marshalling timespans to json: %v", err))
}
_, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid, direction, tenant, category, account, subject, destination, cost, timespans, source, runid, cost_time) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', %f, '%s','%s','%s',now()) ON DUPLICATE KEY UPDATE direction=values(direction), tenant=values(tenant), category=values(category), account=values(account), subject=values(subject), destination=values(destination), cost=values(cost), timespans=values(timespans), source=values(source), cost_time=now()",
_, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cost_time,cost_source,cgrid,runid,tor,direction,tenant,category,account,subject,destination,cost,timespans) VALUES (now(),'%s','%s','%s','%s','%s','%s','%s','%s','%s','%s',%f,'%s') ON DUPLICATE KEY UPDATE cost_time=now(),cost_source=values(cost_source),tor=values(tor),direction=values(direction),tenant=values(tenant),category=values(category),account=values(account),subject=values(subject),destination=values(destination),cost=values(cost),timespans=values(timespans)",
utils.TBL_COST_DETAILS,
source,
cgrid,
runid,
cc.TOR,
cc.Direction,
cc.Tenant,
cc.Category,
@@ -505,9 +508,7 @@ func (self *SQLStorage) LogCallCost(cgrid, source, runid string, cc *CallCost) (
cc.Subject,
cc.Destination,
cc.Cost,
tss,
source,
runid))
tss))
if err != nil {
Logger.Err(fmt.Sprintf("failed to execute insert statement: %v", err))
}
@@ -515,17 +516,15 @@ func (self *SQLStorage) LogCallCost(cgrid, source, runid string, cc *CallCost) (
}
func (self *SQLStorage) GetCallCostLog(cgrid, source, runid string) (cc *CallCost, err error) {
qry := fmt.Sprintf("SELECT cgrid, direction, tenant, category, account, subject, destination, cost, timespans, source FROM %s WHERE cgrid='%s' AND runid='%s'",
qry := fmt.Sprintf("SELECT tor,direction,tenant,category,account,subject,destination,cost,timespans FROM %s WHERE cgrid='%s' AND runid='%s'",
utils.TBL_COST_DETAILS, cgrid, runid)
if len(source) != 0 {
qry += fmt.Sprintf(" AND source='%s'", source)
qry += fmt.Sprintf(" AND cost_source='%s'", source)
}
row := self.Db.QueryRow(qry)
var src string
var timespansJson string
cc = &CallCost{Cost: -1}
err = row.Scan(&cgrid, &cc.Direction, &cc.Tenant, &cc.Category, &cc.Account, &cc.Subject,
&cc.Destination, &cc.Cost, &timespansJson, &src)
err = row.Scan(&cc.TOR, &cc.Direction, &cc.Tenant, &cc.Category, &cc.Account, &cc.Subject, &cc.Destination, &cc.Cost, &timespansJson)
if len(timespansJson) == 0 { // No costs returned
return nil, nil
}
@@ -581,11 +580,20 @@ func (self *SQLStorage) SetCdr(cdr *utils.StoredCdr) (err error) {
}
func (self *SQLStorage) SetRatedCdr(storedCdr *utils.StoredCdr, extraInfo string) (err error) {
_, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid,runid,subject,cost,mediation_time,extra_info) VALUES ('%s','%s','%s',%f,now(),'%s') ON DUPLICATE KEY UPDATE subject=values(subject),cost=values(cost),mediation_time=now(),extra_info=values(extra_info)",
_, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (mediation_time,cgrid,runid,reqtype,direction,tenant,category,account,subject,destination,setup_time,answer_time,duration,cost,extra_info) VALUES (now(),'%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s',%f,'%s') ON DUPLICATE KEY UPDATE mediation_time=now(),reqtype=values(reqtype),direction=values(direction),tenant=values(tenant),category=values(category),account=values(account),subject=values(subject),destination=values(destination),setup_time=values(setup_time),answer_time=values(answer_time),duration=values(duration),cost=values(cost),extra_info=values(extra_info)",
utils.TBL_RATED_CDRS,
storedCdr.CgrId,
storedCdr.MediationRunId,
storedCdr.ReqType,
storedCdr.Direction,
storedCdr.Tenant,
storedCdr.Category,
storedCdr.Account,
storedCdr.Subject,
storedCdr.Destination,
storedCdr.SetupTime,
storedCdr.AnswerTime,
storedCdr.Duration,
storedCdr.Cost,
extraInfo))
if err != nil {
@@ -600,7 +608,32 @@ func (self *SQLStorage) SetRatedCdr(storedCdr *utils.StoredCdr, extraInfo string
func (self *SQLStorage) GetStoredCdrs(cgrIds, runIds, cdrHosts, cdrSources, reqTypes, directions, tenants, categories, accounts, subjects, destPrefixes []string, orderIdStart, orderIdEnd int64,
timeStart, timeEnd time.Time, ignoreErr, ignoreRated bool) ([]*utils.StoredCdr, error) {
var cdrs []*utils.StoredCdr
q := bytes.NewBufferString(fmt.Sprintf("SELECT %s.cgrid,%s.tbid,accid,cdrhost,cdrsource,reqtype,direction,tenant,category,account,%s.subject,destination,setup_time,answer_time,duration,extra_fields,runid,cost FROM %s LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid", utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA, utils.TBL_RATED_CDRS, utils.TBL_CDRS_PRIMARY, utils.TBL_RATED_CDRS))
q := bytes.NewBufferString(fmt.Sprintf("SELECT %s.cgrid,%s.tbid,%s.accid,%s.cdrhost,%s.cdrsource,%s.reqtype,%s.direction,%s.tenant,%s.category,%s.account,%s.subject,%s.destination,%s.setup_time,%s.answer_time,%s.duration,%s.extra_fields,%s.runid,%s.cost FROM %s LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid",
utils.TBL_CDRS_PRIMARY,
utils.TBL_CDRS_PRIMARY,
utils.TBL_CDRS_PRIMARY,
utils.TBL_CDRS_PRIMARY,
utils.TBL_CDRS_PRIMARY,
utils.TBL_CDRS_PRIMARY,
utils.TBL_CDRS_PRIMARY,
utils.TBL_CDRS_PRIMARY,
utils.TBL_CDRS_PRIMARY,
utils.TBL_CDRS_PRIMARY,
utils.TBL_CDRS_PRIMARY,
utils.TBL_CDRS_PRIMARY,
utils.TBL_CDRS_PRIMARY,
utils.TBL_CDRS_PRIMARY,
utils.TBL_CDRS_PRIMARY,
utils.TBL_CDRS_EXTRA,
utils.TBL_RATED_CDRS,
utils.TBL_RATED_CDRS,
utils.TBL_CDRS_PRIMARY,
utils.TBL_CDRS_EXTRA,
utils.TBL_CDRS_PRIMARY,
utils.TBL_CDRS_EXTRA,
utils.TBL_RATED_CDRS,
utils.TBL_CDRS_PRIMARY,
utils.TBL_RATED_CDRS))
fltr := new(bytes.Buffer)
if len(cgrIds) != 0 {
qIds := bytes.NewBufferString(" (")
@@ -622,7 +655,7 @@ func (self *SQLStorage) GetStoredCdrs(cgrIds, runIds, cdrHosts, cdrSources, reqT
if idx != 0 {
qIds.WriteString(" OR")
}
qIds.WriteString(fmt.Sprintf(" runid='%s'", runId))
qIds.WriteString(fmt.Sprintf(" %s.runid='%s'", utils.TBL_RATED_CDRS, runId))
}
qIds.WriteString(" )")
if fltr.Len() != 0 {
@@ -636,7 +669,7 @@ func (self *SQLStorage) GetStoredCdrs(cgrIds, runIds, cdrHosts, cdrSources, reqT
if idx != 0 {
qIds.WriteString(" OR")
}
qIds.WriteString(fmt.Sprintf(" cdrhost='%s'", host))
qIds.WriteString(fmt.Sprintf(" %s.cdrhost='%s'", utils.TBL_CDRS_PRIMARY, host))
}
qIds.WriteString(" )")
if fltr.Len() != 0 {
@@ -650,7 +683,7 @@ func (self *SQLStorage) GetStoredCdrs(cgrIds, runIds, cdrHosts, cdrSources, reqT
if idx != 0 {
qIds.WriteString(" OR")
}
qIds.WriteString(fmt.Sprintf(" cdrsource='%s'", src))
qIds.WriteString(fmt.Sprintf(" %s.cdrsource='%s'", utils.TBL_CDRS_PRIMARY, src))
}
qIds.WriteString(" )")
if fltr.Len() != 0 {
@@ -664,7 +697,7 @@ func (self *SQLStorage) GetStoredCdrs(cgrIds, runIds, cdrHosts, cdrSources, reqT
if idx != 0 {
qIds.WriteString(" OR")
}
qIds.WriteString(fmt.Sprintf(" reqtype='%s'", reqType))
qIds.WriteString(fmt.Sprintf(" %s.reqtype='%s'", utils.TBL_CDRS_PRIMARY, reqType))
}
qIds.WriteString(" )")
if fltr.Len() != 0 {
@@ -678,7 +711,7 @@ func (self *SQLStorage) GetStoredCdrs(cgrIds, runIds, cdrHosts, cdrSources, reqT
if idx != 0 {
qIds.WriteString(" OR")
}
qIds.WriteString(fmt.Sprintf(" direction='%s'", direction))
qIds.WriteString(fmt.Sprintf(" %s.direction='%s'", utils.TBL_CDRS_PRIMARY, direction))
}
qIds.WriteString(" )")
if fltr.Len() != 0 {
@@ -692,7 +725,7 @@ func (self *SQLStorage) GetStoredCdrs(cgrIds, runIds, cdrHosts, cdrSources, reqT
if idx != 0 {
qIds.WriteString(" OR")
}
qIds.WriteString(fmt.Sprintf(" tenant='%s'", tenant))
qIds.WriteString(fmt.Sprintf(" %s.tenant='%s'", utils.TBL_CDRS_PRIMARY, tenant))
}
qIds.WriteString(" )")
if fltr.Len() != 0 {
@@ -706,7 +739,7 @@ func (self *SQLStorage) GetStoredCdrs(cgrIds, runIds, cdrHosts, cdrSources, reqT
if idx != 0 {
qIds.WriteString(" OR")
}
qIds.WriteString(fmt.Sprintf(" category='%s'", category))
qIds.WriteString(fmt.Sprintf(" %s.category='%s'", utils.TBL_CDRS_PRIMARY, category))
}
qIds.WriteString(" )")
if fltr.Len() != 0 {
@@ -720,7 +753,7 @@ func (self *SQLStorage) GetStoredCdrs(cgrIds, runIds, cdrHosts, cdrSources, reqT
if idx != 0 {
qIds.WriteString(" OR")
}
qIds.WriteString(fmt.Sprintf(" account='%s'", account))
qIds.WriteString(fmt.Sprintf(" %s.account='%s'", utils.TBL_CDRS_PRIMARY, account))
}
qIds.WriteString(" )")
if fltr.Len() != 0 {
@@ -748,7 +781,7 @@ func (self *SQLStorage) GetStoredCdrs(cgrIds, runIds, cdrHosts, cdrSources, reqT
if idx != 0 {
qIds.WriteString(" OR")
}
qIds.WriteString(fmt.Sprintf(" destination LIKE '%s%%'", destPrefix))
qIds.WriteString(fmt.Sprintf(" %s.destination LIKE '%s%%'", utils.TBL_CDRS_PRIMARY, destPrefix))
}
qIds.WriteString(" )")
if fltr.Len() != 0 {
@@ -772,28 +805,28 @@ func (self *SQLStorage) GetStoredCdrs(cgrIds, runIds, cdrHosts, cdrSources, reqT
if fltr.Len() != 0 {
fltr.WriteString(" AND")
}
fltr.WriteString(fmt.Sprintf(" answer_time>='%s'", timeStart))
fltr.WriteString(fmt.Sprintf(" %s.answer_time>='%s'", utils.TBL_CDRS_PRIMARY, timeStart))
}
if !timeEnd.IsZero() {
if fltr.Len() != 0 {
fltr.WriteString(" AND")
}
fltr.WriteString(fmt.Sprintf(" answer_time<'%s'", timeEnd))
fltr.WriteString(fmt.Sprintf(" %s.answer_time<'%s'", utils.TBL_CDRS_PRIMARY, timeEnd))
}
if ignoreRated {
if fltr.Len() != 0 {
fltr.WriteString(" AND")
}
if ignoreErr {
fltr.WriteString(" cost IS NULL")
fltr.WriteString(fmt.Sprintf(" %s.cost IS NULL", utils.TBL_RATED_CDRS))
} else {
fltr.WriteString(" (cost=-1 OR cost IS NULL)")
fltr.WriteString(fmt.Sprintf(" (%s.cost=-1 OR %s.cost IS NULL)", utils.TBL_RATED_CDRS, utils.TBL_RATED_CDRS))
}
} else if ignoreErr {
if fltr.Len() != 0 {
fltr.WriteString(" AND")
}
fltr.WriteString(" (cost!=-1 OR cost IS NULL)")
fltr.WriteString(fmt.Sprintf(" (%s.cost!=-1 OR %s.cost IS NULL)", utils.TBL_RATED_CDRS, utils.TBL_RATED_CDRS))
}
if fltr.Len() != 0 {
q.WriteString(fmt.Sprintf(" WHERE %s", fltr.String()))
@@ -889,7 +922,7 @@ func (self *SQLStorage) GetTpDestinations(tpid, tag string) ([]*Destination, err
func (self *SQLStorage) GetTpRates(tpid, tag string) (map[string]*utils.TPRate, error) {
rts := make(map[string]*utils.TPRate)
q := fmt.Sprintf("SELECT id, connect_fee, rate, rate_unit, rate_increment, group_interval_start, rounding_method, rounding_decimals FROM %s WHERE tpid='%s' ", utils.TBL_TP_RATES, tpid)
q := fmt.Sprintf("SELECT id, connect_fee, rate, rate_unit, rate_increment, group_interval_start FROM %s WHERE tpid='%s' ", utils.TBL_TP_RATES, tpid)
if tag != "" {
q += fmt.Sprintf(" AND id='%s'", tag)
}
@@ -932,7 +965,7 @@ func (self *SQLStorage) GetTpRates(tpid, tag string) (map[string]*utils.TPRate,
func (self *SQLStorage) GetTpDestinationRates(tpid, tag string) (map[string]*utils.TPDestinationRate, error) {
rts := make(map[string]*utils.TPDestinationRate)
q := fmt.Sprintf("SELECT * FROM %s WHERE tpid='%s'", utils.TBL_TP_DESTINATION_RATES, tpid)
q := fmt.Sprintf("SELECT tpid,id,destinations_id,rates_id,rounding_method,rounding_decimals FROM %s WHERE tpid='%s'", utils.TBL_TP_DESTINATION_RATES, tpid)
if tag != "" {
q += fmt.Sprintf(" AND id='%s'", tag)
}
@@ -942,10 +975,9 @@ func (self *SQLStorage) GetTpDestinationRates(tpid, tag string) (map[string]*uti
}
defer rows.Close()
for rows.Next() {
var id int
var tpid, tag, destinations_tag, rate_tag, rounding_method string
var rounding_decimals int
if err := rows.Scan(&id, &tpid, &tag, &destinations_tag, &rate_tag, &rounding_method, &rounding_decimals); err != nil {
if err := rows.Scan(&tpid, &tag, &destinations_tag, &rate_tag, &rounding_method, &rounding_decimals); err != nil {
return nil, err
}