Removing Direction, PDD, DisconnectCause, Supplier from main fields of CDR; MySQL/Postgres storing nanoseconds instead of seconds for usage, tests update

This commit is contained in:
DanB
2017-10-30 18:18:37 +01:00
parent a96225bb93
commit 0da9a10de7
52 changed files with 1405 additions and 1799 deletions

View File

@@ -57,7 +57,7 @@ func (self *SQLStorage) Flush(scriptsPath string) (err error) {
return err
}
}
if _, err := self.Db.Query(fmt.Sprintf("SELECT 1 FROM %s", utils.TBLCDRs)); err != nil {
if _, err := self.Db.Query(fmt.Sprintf("SELECT 1 FROM %s", utils.CDRsTBL)); err != nil {
return err
}
if err := SetDBVersions(self); err != nil {
@@ -668,19 +668,14 @@ func (self *SQLStorage) SetSMCost(smc *SMCost) error {
if smc.CostDetails == nil {
return nil
}
tss, err := json.Marshal(smc.CostDetails)
if err != nil {
utils.Logger.Err(fmt.Sprintf("Error marshalling timespans to json: %v", err))
return err
}
tx := self.db.Begin()
cd := &TBLSMCosts{
cd := &SMCostSQL{
Cgrid: smc.CGRID,
RunID: smc.RunID,
OriginHost: smc.OriginHost,
OriginID: smc.OriginID,
CostSource: smc.CostSource,
CostDetails: string(tss),
CostDetails: smc.CostDetails.AsJSON(),
Usage: smc.Usage,
CreatedAt: time.Now(),
}
@@ -695,7 +690,7 @@ func (self *SQLStorage) SetSMCost(smc *SMCost) error {
func (self *SQLStorage) RemoveSMCost(smc *SMCost) error {
tx := self.db.Begin()
if err := tx.Where(&TBLSMCosts{Cgrid: smc.CGRID, RunID: smc.RunID}).Delete(SMCost{}).Error; err != nil {
if err := tx.Where(&SMCostSQL{Cgrid: smc.CGRID, RunID: smc.RunID}).Delete(SMCost{}).Error; err != nil {
tx.Rollback()
return err
}
@@ -706,7 +701,7 @@ func (self *SQLStorage) RemoveSMCost(smc *SMCost) error {
// GetSMCosts is used to retrieve one or multiple SMCosts based on filter
func (self *SQLStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix string) ([]*SMCost, error) {
var smCosts []*SMCost
filter := &TBLSMCosts{}
filter := &SMCostSQL{}
if cgrid != "" {
filter.Cgrid = cgrid
}
@@ -720,7 +715,7 @@ func (self *SQLStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix stri
if originIDPrefix != "" {
q = self.db.Where(filter).Where(fmt.Sprintf("origin_id LIKE '%s%%'", originIDPrefix))
}
results := make([]*TBLSMCosts, 0)
results := make([]*SMCostSQL, 0)
if err := q.Find(&results).Error; err != nil {
return nil, err
}
@@ -756,73 +751,18 @@ func (self *SQLStorage) LogActionTiming(source string, at *ActionTiming, as Acti
}
func (self *SQLStorage) SetCDR(cdr *CDR, allowUpdate bool) error {
extraFields, err := json.Marshal(cdr.ExtraFields)
if err != nil {
return err
}
tx := self.db.Begin()
saved := tx.Save(&TBLCDRs{
Cgrid: cdr.CGRID,
RunID: cdr.RunID,
OriginHost: cdr.OriginHost,
Source: cdr.Source,
OriginID: cdr.OriginID,
Tor: cdr.ToR,
RequestType: cdr.RequestType,
Direction: cdr.Direction,
Tenant: cdr.Tenant,
Category: cdr.Category,
Account: cdr.Account,
Subject: cdr.Subject,
Destination: cdr.Destination,
SetupTime: cdr.SetupTime,
Pdd: cdr.PDD.Seconds(),
AnswerTime: cdr.AnswerTime,
Usage: cdr.Usage.Seconds(),
Supplier: cdr.Supplier,
DisconnectCause: cdr.DisconnectCause,
ExtraFields: string(extraFields),
CostSource: cdr.CostSource,
Cost: cdr.Cost,
CostDetails: cdr.CostDetailsJson(),
AccountSummary: utils.ToJSON(cdr.AccountSummary),
ExtraInfo: cdr.ExtraInfo,
CreatedAt: time.Now(),
})
cdrSql := cdr.AsCDRsql()
cdrSql.CreatedAt = time.Now()
saved := tx.Save(cdrSql)
if saved.Error != nil {
tx.Rollback()
if !allowUpdate {
return saved.Error
}
tx = self.db.Begin()
updated := tx.Model(&TBLCDRs{}).Where(&TBLCDRs{Cgrid: cdr.CGRID, RunID: cdr.RunID, OriginID: cdr.OriginID}).Updates(
TBLCDRs{
OriginHost: cdr.OriginHost,
Source: cdr.Source,
OriginID: cdr.OriginID,
Tor: cdr.ToR,
RequestType: cdr.RequestType,
Direction: cdr.Direction,
Tenant: cdr.Tenant,
Category: cdr.Category,
Account: cdr.Account,
Subject: cdr.Subject,
Destination: cdr.Destination,
SetupTime: cdr.SetupTime,
Pdd: cdr.PDD.Seconds(),
AnswerTime: cdr.AnswerTime,
Usage: cdr.Usage.Seconds(),
Supplier: cdr.Supplier,
DisconnectCause: cdr.DisconnectCause,
ExtraFields: string(extraFields),
CostSource: cdr.CostSource,
Cost: cdr.Cost,
CostDetails: cdr.CostDetailsJson(),
AccountSummary: utils.ToJSON(cdr.AccountSummary),
ExtraInfo: cdr.ExtraInfo,
UpdatedAt: time.Now(),
},
)
cdrSql.UpdatedAt = time.Now()
updated := tx.Model(&CDRsql{}).Where(&CDRsql{CGRID: cdr.CGRID, RunID: cdr.RunID, OriginID: cdr.OriginID}).Updates(cdrSql)
if updated.Error != nil {
tx.Rollback()
return updated.Error
@@ -836,7 +776,7 @@ func (self *SQLStorage) SetCDR(cdr *CDR, allowUpdate bool) error {
// qryFltr.Unscoped will ignore soft deletes or delete records permanently
func (self *SQLStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR, int64, error) {
var cdrs []*CDR
q := self.db.Table(utils.TBLCDRs).Select("*")
q := self.db.Table(utils.CDRsTBL).Select("*")
if qryFltr.Unscoped {
q = q.Unscoped()
}
@@ -942,10 +882,10 @@ func (self *SQLStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR,
q = q.Where("disconnect_cause not in (?)", qryFltr.NotDisconnectCauses)
}
if len(qryFltr.Costs) != 0 {
q = q.Where(utils.TBLCDRs+".cost in (?)", qryFltr.Costs)
q = q.Where(utils.CDRsTBL+".cost in (?)", qryFltr.Costs)
}
if len(qryFltr.NotCosts) != 0 {
q = q.Where(utils.TBLCDRs+".cost not in (?)", qryFltr.NotCosts)
q = q.Where(utils.CDRsTBL+".cost not in (?)", qryFltr.NotCosts)
}
if len(qryFltr.ExtraFields) != 0 { // Extra fields searches, implemented as contains in extra field
qIds := bytes.NewBufferString("(")
@@ -982,10 +922,10 @@ func (self *SQLStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR,
q = q.Where(qIds.String())
}
if qryFltr.OrderIDStart != nil { // Keep backwards compatible by testing 0 value
q = q.Where(utils.TBLCDRs+".id >= ?", *qryFltr.OrderIDStart)
q = q.Where(utils.CDRsTBL+".id >= ?", *qryFltr.OrderIDStart)
}
if qryFltr.OrderIDEnd != nil {
q = q.Where(utils.TBLCDRs+".id < ?", *qryFltr.OrderIDEnd)
q = q.Where(utils.CDRsTBL+".id < ?", *qryFltr.OrderIDEnd)
}
if qryFltr.SetupTimeStart != nil {
q = q.Where("setup_time >= ?", qryFltr.SetupTimeStart)
@@ -1012,41 +952,41 @@ func (self *SQLStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR,
q = q.Where("updated_at < ?", qryFltr.UpdatedAtEnd)
}
if len(qryFltr.MinUsage) != 0 {
if minUsage, err := utils.ParseDurationWithSecs(qryFltr.MinUsage); err != nil {
minUsage, err := utils.ParseDurationWithNanosecs(qryFltr.MinUsage)
if err != nil {
return nil, 0, err
}
if self.db.Dialect().GetName() == utils.MYSQL { // MySQL needs escaping for usage
q = q.Where("`usage` >= ?", minUsage.Nanoseconds())
} else {
if self.db.Dialect().GetName() == utils.MYSQL { // MySQL needs escaping for usage
q = q.Where("`usage` >= ?", minUsage.Seconds())
} else {
q = q.Where("usage >= ?", minUsage.Seconds())
}
q = q.Where("usage >= ?", minUsage.Nanoseconds())
}
}
if len(qryFltr.MaxUsage) != 0 {
if maxUsage, err := utils.ParseDurationWithSecs(qryFltr.MaxUsage); err != nil {
maxUsage, err := utils.ParseDurationWithNanosecs(qryFltr.MaxUsage)
if err != nil {
return nil, 0, err
}
if self.db.Dialect().GetName() == utils.MYSQL { // MySQL needs escaping for usage
q = q.Where("`usage` < ?", maxUsage.Nanoseconds())
} else {
if self.db.Dialect().GetName() == utils.MYSQL { // MySQL needs escaping for usage
q = q.Where("`usage` < ?", maxUsage.Seconds())
} else {
q = q.Where("usage < ?", maxUsage.Seconds())
}
q = q.Where("usage < ?", maxUsage.Nanoseconds())
}
}
if len(qryFltr.MinPDD) != 0 {
if minPDD, err := utils.ParseDurationWithSecs(qryFltr.MinPDD); err != nil {
if minPDD, err := utils.ParseDurationWithNanosecs(qryFltr.MinPDD); err != nil {
return nil, 0, err
} else {
q = q.Where("pdd >= ?", minPDD.Seconds())
q = q.Where("pdd >= ?", minPDD.Nanoseconds())
}
}
if len(qryFltr.MaxPDD) != 0 {
if maxPDD, err := utils.ParseDurationWithSecs(qryFltr.MaxPDD); err != nil {
if maxPDD, err := utils.ParseDurationWithNanosecs(qryFltr.MaxPDD); err != nil {
return nil, 0, err
} else {
q = q.Where("pdd < ?", maxPDD.Seconds())
q = q.Where("pdd < ?", maxPDD.Nanoseconds())
}
}
@@ -1087,58 +1027,16 @@ func (self *SQLStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR,
return nil, cnt, nil
}
// Execute query
results := make([]*TBLCDRs, 0)
results := make([]*CDRsql, 0)
if err := q.Find(&results).Error; err != nil {
return nil, 0, err
}
for _, result := range results {
extraFieldsMp := make(map[string]string)
if result.ExtraFields != "" {
if err := json.Unmarshal([]byte(result.ExtraFields), &extraFieldsMp); err != nil {
return nil, 0, fmt.Errorf("JSON unmarshal error for cgrid: %s, runid: %v, error: %s", result.Cgrid, result.RunID, err.Error())
}
if cdr, err := NewCDRFromSQL(result); err != nil {
return nil, 0, err
} else {
cdrs = append(cdrs, cdr)
}
var callCost CallCost
if result.CostDetails != "" {
if err := json.Unmarshal([]byte(result.CostDetails), &callCost); err != nil {
return nil, 0, fmt.Errorf("JSON unmarshal callcost error for cgrid: %s, runid: %v, error: %s", result.Cgrid, result.RunID, err.Error())
}
}
acntSummary, err := NewAccountSummaryFromJSON(result.AccountSummary)
if err != nil {
return nil, 0, fmt.Errorf("JSON unmarshal account summary error for cgrid: %s, runid: %v, error: %s", result.Cgrid, result.RunID, err.Error())
}
usageDur := time.Duration(result.Usage * utils.NANO_MULTIPLIER)
pddDur := time.Duration(result.Pdd * utils.NANO_MULTIPLIER)
storCdr := &CDR{
CGRID: result.Cgrid,
RunID: result.RunID,
OrderID: result.ID,
OriginHost: result.OriginHost,
Source: result.Source,
OriginID: result.OriginID,
ToR: result.Tor,
RequestType: result.RequestType,
Direction: result.Direction,
Tenant: result.Tenant,
Category: result.Category,
Account: result.Account,
Subject: result.Subject,
Destination: result.Destination,
SetupTime: result.SetupTime,
PDD: pddDur,
AnswerTime: result.AnswerTime,
Usage: usageDur,
Supplier: result.Supplier,
DisconnectCause: result.DisconnectCause,
ExtraFields: extraFieldsMp,
CostSource: result.CostSource,
Cost: result.Cost,
CostDetails: &callCost,
AccountSummary: acntSummary,
ExtraInfo: result.ExtraInfo,
}
cdrs = append(cdrs, storCdr)
}
if len(cdrs) == 0 && !remove {
return cdrs, 0, utils.ErrNotFound