Refactoring CDRs to support RSRFields

This commit is contained in:
DanB
2014-05-05 20:14:40 +02:00
parent 7bda45fcce
commit f7abbacfe5
39 changed files with 848 additions and 1205 deletions

View File

@@ -128,7 +128,7 @@ func (self *SQLStorage) RemTPData(table, tpid string, args ...string) error {
q := fmt.Sprintf("DELETE FROM %s WHERE tpid='%s' AND id='%s'", table, tpid, args[0])
switch table {
case utils.TBL_TP_RATE_PROFILES:
q = fmt.Sprintf("DELETE FROM %s WHERE tpid='%s' AND loadid='%s' AND tenant='%s' AND tor='%s' AND direction='%s' AND subject='%s'",
q = fmt.Sprintf("DELETE FROM %s WHERE tpid='%s' AND loadid='%s' AND tenant='%s' AND category='%s' AND direction='%s' AND subject='%s'",
table, tpid, args[0], args[1], args[2], args[3], args[4])
case utils.TBL_TP_ACCOUNT_ACTIONS:
q = fmt.Sprintf("DELETE FROM %s WHERE tpid='%s' AND loadid='%s' AND tenant='%s' AND account='%s' AND direction='%s'",
@@ -261,7 +261,7 @@ func (self *SQLStorage) SetTPRatingProfiles(tpid string, rps map[string]*utils.T
return nil //Nothing to set
}
var buffer bytes.Buffer
buffer.WriteString(fmt.Sprintf("INSERT INTO %s (tpid,loadid,tenant,tor,direction,subject,activation_time,rating_plan_id,fallback_subjects) VALUES ",
buffer.WriteString(fmt.Sprintf("INSERT INTO %s (tpid,loadid,tenant,category,direction,subject,activation_time,rating_plan_id,fallback_subjects) VALUES ",
utils.TBL_TP_RATE_PROFILES))
i := 0
for _, rp := range rps {
@@ -442,7 +442,7 @@ func (self *SQLStorage) SetTPActionTriggers(tpid string, ats map[string][]*utils
return nil //Nothing to set
}
var buffer bytes.Buffer
buffer.WriteString(fmt.Sprintf("INSERT INTO %s (tpid,id,balance_type,direction,threshold_type,threshold_value,destination_id,actions_id,weight) VALUES ",
buffer.WriteString(fmt.Sprintf("INSERT INTO %s (tpid,id,balance_type,direction,threshold_type,threshold_value,recurrent,destination_id,actions_id,weight) VALUES ",
utils.TBL_TP_ACTION_TRIGGERS))
i := 0
for atId, atRows := range ats {
@@ -450,13 +450,13 @@ func (self *SQLStorage) SetTPActionTriggers(tpid string, ats map[string][]*utils
if i != 0 { //Consecutive values after the first will be prefixed with "," as separator
buffer.WriteRune(',')
}
buffer.WriteString(fmt.Sprintf("('%s','%s','%s','%s','%s', %f, '%s','%s',%f)",
buffer.WriteString(fmt.Sprintf("('%s','%s','%s','%s','%s', %f, %t, '%s','%s',%f)",
tpid, atId, atsRow.BalanceType, atsRow.Direction, atsRow.ThresholdType,
atsRow.ThresholdValue, atsRow.DestinationId, atsRow.ActionsId, atsRow.Weight))
atsRow.ThresholdValue, atsRow.Recurrent, atsRow.DestinationId, atsRow.ActionsId, atsRow.Weight))
i++
}
}
buffer.WriteString(" ON DUPLICATE KEY UPDATE weight=values(weight)")
buffer.WriteString(" ON DUPLICATE KEY UPDATE recurrent=values(recurrent), weight=values(weight)")
if _, err := self.Db.Exec(buffer.String()); err != nil {
return err
}
@@ -496,7 +496,7 @@ func (self *SQLStorage) LogCallCost(cgrid, source, runid string, cc *CallCost) (
if err != nil {
Logger.Err(fmt.Sprintf("Error marshalling timespans to json: %v", err))
}
_, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid, direction, tenant, tor, account, subject, destination, cost, timespans, source, runid, cost_time)VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', %f, '%s','%s','%s',now()) ON DUPLICATE KEY UPDATE direction=values(direction), tenant=values(tenant), tor=values(tor), account=values(account), subject=values(subject), destination=values(destination), cost=values(cost), timespans=values(timespans), source=values(source), cost_time=now()",
_, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid, direction, tenant, category, account, subject, destination, cost, timespans, source, runid, cost_time) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', %f, '%s','%s','%s',now()) ON DUPLICATE KEY UPDATE direction=values(direction), tenant=values(tenant), category=values(category), account=values(account), subject=values(subject), destination=values(destination), cost=values(cost), timespans=values(timespans), source=values(source), cost_time=now()",
utils.TBL_COST_DETAILS,
cgrid,
cc.Direction,
@@ -516,7 +516,7 @@ func (self *SQLStorage) LogCallCost(cgrid, source, runid string, cc *CallCost) (
}
func (self *SQLStorage) GetCallCostLog(cgrid, source, runid string) (cc *CallCost, err error) {
qry := fmt.Sprintf("SELECT cgrid, direction, tenant, tor, account, subject, destination, cost, timespans, source FROM %s WHERE cgrid='%s' AND runid='%s'",
qry := fmt.Sprintf("SELECT cgrid, direction, tenant, category, account, subject, destination, cost, timespans, source FROM %s WHERE cgrid='%s' AND runid='%s'",
utils.TBL_COST_DETAILS, cgrid, runid)
if len(source) != 0 {
qry += fmt.Sprintf(" AND source='%s'", source)
@@ -544,38 +544,34 @@ func (self *SQLStorage) LogActionTiming(source string, at *ActionTiming, as Acti
}
func (self *SQLStorage) LogError(uuid, source, runid, errstr string) (err error) { return }
func (self *SQLStorage) SetCdr(cdr utils.RawCDR) (err error) {
// map[account:1001 direction:out orig_ip:172.16.1.1 tor:call accid:accid23 answer_time:2013-02-03 19:54:00 cdrsource:freeswitch_csv destination:+4986517174963 duration:62 reqtype:prepaid subject:1001 supplier:supplier1 tenant:cgrates.org]
setupTime, _ := cdr.GetSetupTime() // Ignore errors, we want to store the cdr no matter what
answerTime, _ := cdr.GetAnswerTime() // Ignore errors, we want to store the cdr no matter what
dur, _ := cdr.GetDuration()
func (self *SQLStorage) SetCdr(cdr *utils.StoredCdr) (err error) {
_, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (NULL,'%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s', %d)",
utils.TBL_CDRS_PRIMARY,
cdr.GetCgrId(),
cdr.GetAccId(),
cdr.GetCdrHost(),
cdr.GetCdrSource(),
cdr.GetReqType(),
cdr.GetDirection(),
cdr.GetTenant(),
cdr.GetCategory(),
cdr.GetAccount(),
cdr.GetSubject(),
cdr.GetDestination(),
setupTime,
answerTime,
dur,
cdr.CgrId,
cdr.AccId,
cdr.CdrHost,
cdr.CdrSource,
cdr.ReqType,
cdr.Direction,
cdr.Tenant,
cdr.Category,
cdr.Account,
cdr.Subject,
cdr.Destination,
cdr.SetupTime,
cdr.AnswerTime,
cdr.Duration,
))
if err != nil {
Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err))
}
extraFields, err := json.Marshal(cdr.GetExtraFields())
extraFields, err := json.Marshal(cdr.ExtraFields)
if err != nil {
Logger.Err(fmt.Sprintf("Error marshalling cdr extra fields to json: %v", err))
}
_, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s VALUES ('NULL','%s', '%s')",
utils.TBL_CDRS_EXTRA,
cdr.GetCgrId(),
cdr.CgrId,
extraFields,
))
if err != nil {
@@ -602,10 +598,10 @@ func (self *SQLStorage) SetRatedCdr(storedCdr *utils.StoredCdr, extraInfo string
// Return a slice of CDRs from storDb using optional filters.a
// ignoreErr - do not consider cdrs with rating errors
// ignoreRated - do not consider cdrs which were already rated, including here the ones with errors
func (self *SQLStorage) GetStoredCdrs(cgrIds, runIds, cdrHosts, cdrSources, reqTypes, directions, tenants, tors, accounts, subjects, destPrefixes []string, orderIdStart, orderIdEnd int64,
func (self *SQLStorage) GetStoredCdrs(cgrIds, runIds, cdrHosts, cdrSources, reqTypes, directions, tenants, categories, accounts, subjects, destPrefixes []string, orderIdStart, orderIdEnd int64,
timeStart, timeEnd time.Time, ignoreErr, ignoreRated bool) ([]*utils.StoredCdr, error) {
var cdrs []*utils.StoredCdr
q := bytes.NewBufferString(fmt.Sprintf("SELECT %s.cgrid,%s.tbid,accid,cdrhost,cdrsource,reqtype,direction,tenant,tor,account,%s.subject,destination,setup_time,answer_time,duration,extra_fields,runid,cost FROM %s LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid", utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA, utils.TBL_RATED_CDRS, utils.TBL_CDRS_PRIMARY, utils.TBL_RATED_CDRS))
q := bytes.NewBufferString(fmt.Sprintf("SELECT %s.cgrid,%s.tbid,accid,cdrhost,cdrsource,reqtype,direction,tenant,category,account,%s.subject,destination,setup_time,answer_time,duration,extra_fields,runid,cost FROM %s LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid", utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA, utils.TBL_RATED_CDRS, utils.TBL_CDRS_PRIMARY, utils.TBL_RATED_CDRS))
fltr := new(bytes.Buffer)
if len(cgrIds) != 0 {
qIds := bytes.NewBufferString(" (")
@@ -705,13 +701,13 @@ func (self *SQLStorage) GetStoredCdrs(cgrIds, runIds, cdrHosts, cdrSources, reqT
}
fltr.Write(qIds.Bytes())
}
if len(tors) != 0 {
if len(categories) != 0 {
qIds := bytes.NewBufferString(" (")
for idx, tor := range tors {
for idx, category := range categories {
if idx != 0 {
qIds.WriteString(" OR")
}
qIds.WriteString(fmt.Sprintf(" tor='%s'", tor))
qIds.WriteString(fmt.Sprintf(" category='%s'", category))
}
qIds.WriteString(" )")
if fltr.Len() != 0 {
@@ -809,14 +805,14 @@ func (self *SQLStorage) GetStoredCdrs(cgrIds, runIds, cdrHosts, cdrSources, reqT
}
defer rows.Close()
for rows.Next() {
var cgrid, accid, cdrhost, cdrsrc, reqtype, direction, tenant, tor, account, subject, destination string
var cgrid, accid, cdrhost, cdrsrc, reqtype, direction, tenant, category, account, subject, destination string
var extraFields []byte
var setupTime, answerTime time.Time
var runid sql.NullString // So we can export unmediated CDRs
var orderid, duration int64
var cost sql.NullFloat64 // So we can export unmediated CDRs
var extraFieldsMp map[string]string
if err := rows.Scan(&cgrid, &orderid, &accid, &cdrhost, &cdrsrc, &reqtype, &direction, &tenant, &tor, &account, &subject, &destination, &setupTime, &answerTime, &duration,
if err := rows.Scan(&cgrid, &orderid, &accid, &cdrhost, &cdrsrc, &reqtype, &direction, &tenant, &category, &account, &subject, &destination, &setupTime, &answerTime, &duration,
&extraFields, &runid, &cost); err != nil {
return nil, err
}
@@ -825,7 +821,7 @@ func (self *SQLStorage) GetStoredCdrs(cgrIds, runIds, cdrHosts, cdrSources, reqT
}
storCdr := &utils.StoredCdr{
CgrId: cgrid, OrderId: orderid, AccId: accid, CdrHost: cdrhost, CdrSource: cdrsrc, ReqType: reqtype, Direction: direction, Tenant: tenant,
Category: tor, Account: account, Subject: subject, Destination: destination, SetupTime: setupTime, AnswerTime: answerTime, Duration: time.Duration(duration),
Category: category, Account: account, Subject: subject, Destination: destination, SetupTime: setupTime, AnswerTime: answerTime, Duration: time.Duration(duration),
ExtraFields: extraFieldsMp, MediationRunId: runid.String, Cost: cost.Float64,
}
cdrs = append(cdrs, storCdr)
@@ -1030,7 +1026,7 @@ func (self *SQLStorage) GetTpRatingPlans(tpid, tag string) (map[string][]*utils.
}
func (self *SQLStorage) GetTpRatingProfiles(qryRpf *utils.TPRatingProfile) (map[string]*utils.TPRatingProfile, error) {
q := fmt.Sprintf("SELECT loadid,direction,tenant,tor,subject,activation_time,rating_plan_id,fallback_subjects FROM %s WHERE tpid='%s'",
q := fmt.Sprintf("SELECT loadid,direction,tenant,category,subject,activation_time,rating_plan_id,fallback_subjects FROM %s WHERE tpid='%s'",
utils.TBL_TP_RATE_PROFILES, qryRpf.TPid)
if len(qryRpf.LoadId) != 0 {
q += fmt.Sprintf(" AND loadid='%s'", qryRpf.LoadId)
@@ -1039,7 +1035,7 @@ func (self *SQLStorage) GetTpRatingProfiles(qryRpf *utils.TPRatingProfile) (map[
q += fmt.Sprintf(" AND tenant='%s'", qryRpf.Tenant)
}
if len(qryRpf.Category) != 0 {
q += fmt.Sprintf(" AND tor='%s'", qryRpf.Category)
q += fmt.Sprintf(" AND category='%s'", qryRpf.Category)
}
if len(qryRpf.Direction) != 0 {
q += fmt.Sprintf(" AND direction='%s'", qryRpf.Direction)
@@ -1054,11 +1050,11 @@ func (self *SQLStorage) GetTpRatingProfiles(qryRpf *utils.TPRatingProfile) (map[
defer rows.Close()
rpfs := make(map[string]*utils.TPRatingProfile)
for rows.Next() {
var rcvLoadId, tenant, tor, direction, subject, fallback_subjects, rating_plan_tag, activation_time string
if err := rows.Scan(&rcvLoadId, &tenant, &tor, &direction, &subject, &activation_time, &rating_plan_tag, &fallback_subjects); err != nil {
var rcvLoadId, tenant, category, direction, subject, fallback_subjects, rating_plan_tag, activation_time string
if err := rows.Scan(&rcvLoadId, &tenant, &category, &direction, &subject, &activation_time, &rating_plan_tag, &fallback_subjects); err != nil {
return nil, err
}
rp := &utils.TPRatingProfile{TPid: qryRpf.TPid, LoadId: rcvLoadId, Tenant: tenant, Category: tor, Direction: direction, Subject: subject}
rp := &utils.TPRatingProfile{TPid: qryRpf.TPid, LoadId: rcvLoadId, Tenant: tenant, Category: category, Direction: direction, Subject: subject}
if existingRp, has := rpfs[rp.KeyId()]; !has {
rp.RatingPlanActivations = []*utils.TPRatingActivation{
&utils.TPRatingActivation{ActivationTime: activation_time, RatingPlanId: rating_plan_tag, FallbackSubjects: fallback_subjects}}
@@ -1203,7 +1199,7 @@ func (self *SQLStorage) GetTpActions(tpid, tag string) (map[string][]*utils.TPAc
func (self *SQLStorage) GetTpActionTriggers(tpid, tag string) (map[string][]*utils.TPActionTrigger, error) {
ats := make(map[string][]*utils.TPActionTrigger)
q := fmt.Sprintf("SELECT tpid,id,balance_type,direction,threshold_type,threshold_value,destination_id,actions_id,weight FROM %s WHERE tpid='%s'",
q := fmt.Sprintf("SELECT tpid,id,balance_type,direction,threshold_type,threshold_value,recurrent,destination_id,actions_id,weight FROM %s WHERE tpid='%s'",
utils.TBL_TP_ACTION_TRIGGERS, tpid)
if tag != "" {
q += fmt.Sprintf(" AND id='%s'", tag)
@@ -1226,6 +1222,7 @@ func (self *SQLStorage) GetTpActionTriggers(tpid, tag string) (map[string][]*uti
Direction: direction,
ThresholdType: thresholdType,
ThresholdValue: threshold,
Recurrent: recurrent,
DestinationId: destinations_tag,
ActionsId: actions_tag,
Weight: weight,