From edd8d5540f0c68d70a3d08d30bca788b7262d9c7 Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 13 Nov 2014 19:41:15 +0100 Subject: [PATCH] Initial Postgres support --- engine/storage_mysql.go | 401 +++++++++++++++++++++++- engine/storage_mysql_local_test.go | 2 +- engine/storage_postgres.go | 407 ++++++++++++++++++++++++ engine/storage_psql_local_test.go | 10 +- engine/storage_sql.go | 477 +++-------------------------- 5 files changed, 857 insertions(+), 440 deletions(-) diff --git a/engine/storage_mysql.go b/engine/storage_mysql.go index 852e17f32..623d626b7 100644 --- a/engine/storage_mysql.go +++ b/engine/storage_mysql.go @@ -19,14 +19,17 @@ along with this program. If not, see package engine import ( + "bytes" + "database/sql" "encoding/json" "fmt" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" "path" + "strconv" "time" - _ "github.com/go-sql-driver/mysql" + "github.com/go-sql-driver/mysql" "github.com/jinzhu/gorm" ) @@ -105,3 +108,399 @@ func (self *MySQLStorage) LogCallCost(cgrid, source, runid string, cc *CallCost) } return nil } + +func (self *MySQLStorage) SetRatedCdr(storedCdr *utils.StoredCdr, extraInfo string) (err error) { + _, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid,runid,reqtype,direction,tenant,category,account,subject,destination,setup_time,answer_time,`usage`,cost,extra_info,created_at) VALUES ('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s',%v,%f,'%s',%d) ON DUPLICATE KEY UPDATE reqtype=values(reqtype),direction=values(direction),tenant=values(tenant),category=values(category),account=values(account),subject=values(subject),destination=values(destination),setup_time=values(setup_time),answer_time=values(answer_time),`usage`=values(`usage`),cost=values(cost),extra_info=values(extra_info), updated_at=%d", + utils.TBL_RATED_CDRS, + storedCdr.CgrId, + storedCdr.MediationRunId, + storedCdr.ReqType, + storedCdr.Direction, + storedCdr.Tenant, + storedCdr.Category, + storedCdr.Account, + storedCdr.Subject, + storedCdr.Destination, + storedCdr.SetupTime, + storedCdr.AnswerTime, + storedCdr.Usage.Seconds(), + storedCdr.Cost, + extraInfo, + time.Now().Unix(), + time.Now().Unix())) + if err != nil { + Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %s", err.Error())) + } + return +} + +// Return a slice of CDRs from storDb using optional filters.a +// ignoreErr - do not consider cdrs with rating errors +// ignoreRated - do not consider cdrs which were already rated, including here the ones with errors +func (self *MySQLStorage) GetStoredCdrs(cgrIds, runIds, tors, cdrHosts, cdrSources, reqTypes, directions, tenants, categories, accounts, subjects, destPrefixes, ratedAccounts, ratedSubjects []string, + orderIdStart, orderIdEnd int64, timeStart, timeEnd time.Time, ignoreErr, ignoreRated, ignoreDerived bool, pagination *utils.Paginator) ([]*utils.StoredCdr, error) { + var cdrs []*utils.StoredCdr + var q *bytes.Buffer // Need to query differently since in case of primary, unmediated CDRs some values will be missing + if ignoreDerived { + q = bytes.NewBufferString(fmt.Sprintf("SELECT %s.cgrid,%s.id,%s.tor,%s.accid,%s.cdrhost,%s.cdrsource,%s.reqtype,%s.direction,%s.tenant,%s.category,%s.account,%s.subject,%s.destination,%s.setup_time,%s.answer_time,%s.`usage`,%s.extra_fields,%s.runid,%s.account,%s.subject,%s.cost FROM %s LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid AND %s.runid=%s.runid", + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_EXTRA, + utils.TBL_RATED_CDRS, + utils.TBL_COST_DETAILS, + utils.TBL_COST_DETAILS, + utils.TBL_RATED_CDRS, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_EXTRA, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_EXTRA, + utils.TBL_RATED_CDRS, + utils.TBL_CDRS_PRIMARY, + utils.TBL_RATED_CDRS, + utils.TBL_COST_DETAILS, + utils.TBL_RATED_CDRS, + utils.TBL_COST_DETAILS, + utils.TBL_RATED_CDRS, + utils.TBL_COST_DETAILS)) + } else { + q = bytes.NewBufferString(fmt.Sprintf("SELECT %s.cgrid,%s.id,%s.tor,%s.accid,%s.cdrhost,%s.cdrsource,%s.reqtype,%s.direction,%s.tenant,%s.category,%s.account,%s.subject,%s.destination,%s.setup_time,%s.answer_time,%s.`usage`,%s.extra_fields,%s.runid,%s.account,%s.subject,%s.cost FROM %s LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid AND %s.runid=%s.runid", + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_RATED_CDRS, + utils.TBL_RATED_CDRS, + utils.TBL_RATED_CDRS, + utils.TBL_RATED_CDRS, + utils.TBL_RATED_CDRS, + utils.TBL_RATED_CDRS, + utils.TBL_RATED_CDRS, + utils.TBL_RATED_CDRS, + utils.TBL_RATED_CDRS, + utils.TBL_RATED_CDRS, + utils.TBL_CDRS_EXTRA, + utils.TBL_RATED_CDRS, + utils.TBL_COST_DETAILS, + utils.TBL_COST_DETAILS, + utils.TBL_RATED_CDRS, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_EXTRA, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_EXTRA, + utils.TBL_RATED_CDRS, + utils.TBL_CDRS_PRIMARY, + utils.TBL_RATED_CDRS, + utils.TBL_COST_DETAILS, + utils.TBL_RATED_CDRS, + utils.TBL_COST_DETAILS, + utils.TBL_RATED_CDRS, + utils.TBL_COST_DETAILS)) + } + q.WriteString(" WHERE") + for idx, tblName := range []string{utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA, utils.TBL_COST_DETAILS, utils.TBL_RATED_CDRS} { + if idx != 0 { + q.WriteString(" AND") + } + q.WriteString(fmt.Sprintf(" (%s.deleted_at IS NULL OR %s.deleted_at <= '0001-01-02')", tblName, tblName)) + } + fltr := new(bytes.Buffer) + if len(cgrIds) != 0 { + qIds := bytes.NewBufferString(" (") + for idxId, cgrId := range cgrIds { + if idxId != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.cgrid='%s'", utils.TBL_CDRS_PRIMARY, cgrId)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(runIds) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, runId := range runIds { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.runid='%s'", utils.TBL_RATED_CDRS, runId)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(tors) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, host := range tors { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.tor='%s'", utils.TBL_CDRS_PRIMARY, host)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(cdrHosts) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, host := range cdrHosts { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.cdrhost='%s'", utils.TBL_CDRS_PRIMARY, host)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(cdrSources) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, src := range cdrSources { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.cdrsource='%s'", utils.TBL_CDRS_PRIMARY, src)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(reqTypes) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, reqType := range reqTypes { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.reqtype='%s'", utils.TBL_CDRS_PRIMARY, reqType)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(directions) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, direction := range directions { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.direction='%s'", utils.TBL_CDRS_PRIMARY, direction)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(tenants) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, tenant := range tenants { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.tenant='%s'", utils.TBL_CDRS_PRIMARY, tenant)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(categories) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, category := range categories { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.category='%s'", utils.TBL_CDRS_PRIMARY, category)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(accounts) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, account := range accounts { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.account='%s'", utils.TBL_CDRS_PRIMARY, account)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(subjects) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, subject := range subjects { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.subject='%s'", utils.TBL_CDRS_PRIMARY, subject)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(destPrefixes) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, destPrefix := range destPrefixes { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.destination LIKE '%s%%'", utils.TBL_CDRS_PRIMARY, destPrefix)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(ratedAccounts) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, ratedAccount := range ratedAccounts { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.account='%s'", utils.TBL_COST_DETAILS, ratedAccount)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(ratedSubjects) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, ratedSubject := range ratedSubjects { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.subject='%s'", utils.TBL_COST_DETAILS, ratedSubject)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if orderIdStart != 0 { + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.WriteString(fmt.Sprintf(" %s.id>=%d", utils.TBL_CDRS_PRIMARY, orderIdStart)) + } + if orderIdEnd != 0 { + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.WriteString(fmt.Sprintf(" %s.id<%d", utils.TBL_CDRS_PRIMARY, orderIdEnd)) + } + if !timeStart.IsZero() { + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.WriteString(fmt.Sprintf(" %s.answer_time>='%s'", utils.TBL_CDRS_PRIMARY, timeStart)) + } + if !timeEnd.IsZero() { + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.WriteString(fmt.Sprintf(" %s.answer_time<'%s'", utils.TBL_CDRS_PRIMARY, timeEnd)) + } + if ignoreRated { + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + if ignoreErr { + fltr.WriteString(fmt.Sprintf(" %s.cost IS NULL", utils.TBL_RATED_CDRS)) + } else { + fltr.WriteString(fmt.Sprintf(" (%s.cost=-1 OR %s.cost IS NULL)", utils.TBL_RATED_CDRS, utils.TBL_RATED_CDRS)) + } + } else if ignoreErr { + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.WriteString(fmt.Sprintf(" (%s.cost!=-1 OR %s.cost IS NULL)", utils.TBL_RATED_CDRS, utils.TBL_RATED_CDRS)) + } + if ignoreDerived { + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.WriteString(fmt.Sprintf(" (%s.runid='%s' OR %s.cost IS NULL)", utils.TBL_RATED_CDRS, utils.DEFAULT_RUNID, utils.TBL_RATED_CDRS)) + } + if fltr.Len() != 0 { + q.WriteString(fmt.Sprintf(" AND %s", fltr.String())) + } + if pagination != nil { + limLow, limHigh := pagination.GetLimits() + q.WriteString(fmt.Sprintf(" LIMIT %d,%d", limLow, limHigh)) + } + rows, err := self.Db.Query(q.String()) + if err != nil { + return nil, err + } + defer rows.Close() + for rows.Next() { + var cgrid, tor, accid, cdrhost, cdrsrc, reqtype, direction, tenant, category, account, subject, destination, runid, ratedAccount, ratedSubject sql.NullString + var extraFields []byte + var setupTime, answerTime mysql.NullTime + var orderid int64 + var usage, cost sql.NullFloat64 + var extraFieldsMp map[string]string + if err := rows.Scan(&cgrid, &orderid, &tor, &accid, &cdrhost, &cdrsrc, &reqtype, &direction, &tenant, &category, &account, &subject, &destination, &setupTime, &answerTime, &usage, + &extraFields, &runid, &ratedAccount, &ratedSubject, &cost); err != nil { + return nil, err + } + if len(extraFields) != 0 { + if err := json.Unmarshal(extraFields, &extraFieldsMp); err != nil { + return nil, fmt.Errorf("JSON unmarshal error for cgrid: %s, runid: %v, error: %s", cgrid.String, runid.String, err.Error()) + } + } + usageDur, _ := time.ParseDuration(strconv.FormatFloat(usage.Float64, 'f', -1, 64) + "s") + storCdr := &utils.StoredCdr{ + CgrId: cgrid.String, OrderId: orderid, TOR: tor.String, AccId: accid.String, CdrHost: cdrhost.String, CdrSource: cdrsrc.String, ReqType: reqtype.String, + Direction: direction.String, Tenant: tenant.String, + Category: category.String, Account: account.String, Subject: subject.String, Destination: destination.String, + SetupTime: setupTime.Time, AnswerTime: answerTime.Time, Usage: usageDur, + ExtraFields: extraFieldsMp, MediationRunId: runid.String, RatedAccount: ratedAccount.String, RatedSubject: ratedSubject.String, Cost: cost.Float64, + } + if !cost.Valid { //There was no cost provided, will fakely insert 0 if we do not handle it and reflect on re-rating + storCdr.Cost = -1 + } + cdrs = append(cdrs, storCdr) + } + return cdrs, nil +} diff --git a/engine/storage_mysql_local_test.go b/engine/storage_mysql_local_test.go index 2d82dd9d5..6f759382a 100644 --- a/engine/storage_mysql_local_test.go +++ b/engine/storage_mysql_local_test.go @@ -797,7 +797,7 @@ func TestMySQLGetStoredCdrs(t *testing.T) { } } -func TestRemStoredCdrs(t *testing.T) { +func TestMySQLRemStoredCdrs(t *testing.T) { if !*testLocal { return } diff --git a/engine/storage_postgres.go b/engine/storage_postgres.go index 26a3ab6e8..27286a50c 100644 --- a/engine/storage_postgres.go +++ b/engine/storage_postgres.go @@ -19,14 +19,18 @@ along with this program. If not, see package engine import ( + "bytes" + "database/sql" "encoding/json" "fmt" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" "path" + "strconv" "time" _ "github.com/bmizerany/pq" + "github.com/go-sql-driver/mysql" "github.com/jinzhu/gorm" ) @@ -120,3 +124,406 @@ func (self *PostgresStorage) LogCallCost(cgrid, source, runid string, cc *CallCo tx.Commit() return nil } + +func (self *PostgresStorage) SetRatedCdr(cdr *utils.StoredCdr, extraInfo string) (err error) { + tx := self.db.Begin() + saved := tx.Save(&TblRatedCdr{ + Cgrid: cdr.CgrId, + Runid: cdr.MediationRunId, + Reqtype: cdr.ReqType, + Direction: cdr.Direction, + Tenant: cdr.Tenant, + Category: cdr.Category, + Account: cdr.Account, + Subject: cdr.Subject, + Destination: cdr.Destination, + SetupTime: cdr.SetupTime, + AnswerTime: cdr.AnswerTime, + Usage: cdr.Usage.Seconds(), + Cost: cdr.Cost, + ExtraInfo: extraInfo, + CreatedAt: time.Now(), + }) + if saved.Error != nil { + tx.Rollback() + tx = self.db.Begin() + updated := tx.Model(TblRatedCdr{}).Where(&TblRatedCdr{Cgrid: cdr.CgrId, Runid: cdr.MediationRunId}).Updates(&TblRatedCdr{Reqtype: cdr.ReqType, + Direction: cdr.Direction, Tenant: cdr.Tenant, Category: cdr.Category, Account: cdr.Account, Subject: cdr.Subject, Destination: cdr.Destination, + SetupTime: cdr.SetupTime, AnswerTime: cdr.AnswerTime, Usage: cdr.Usage.Seconds(), Cost: cdr.Cost, ExtraInfo: extraInfo, UpdatedAt: time.Now()}) + if updated.Error != nil { + tx.Rollback() + return updated.Error + } + } + tx.Commit() + return nil + +} + +func (self *PostgresStorage) GetStoredCdrs(cgrIds, runIds, tors, cdrHosts, cdrSources, reqTypes, directions, tenants, categories, accounts, subjects, destPrefixes, ratedAccounts, ratedSubjects []string, + orderIdStart, orderIdEnd int64, timeStart, timeEnd time.Time, ignoreErr, ignoreRated, ignoreDerived bool, pagination *utils.Paginator) ([]*utils.StoredCdr, error) { + var cdrs []*utils.StoredCdr + var q *bytes.Buffer // Need to query differently since in case of primary, unmediated CDRs some values will be missing + if ignoreDerived { + q = bytes.NewBufferString(fmt.Sprintf("SELECT %s.cgrid,%s.id,%s.tor,%s.accid,%s.cdrhost,%s.cdrsource,%s.reqtype,%s.direction,%s.tenant,%s.category,%s.account,%s.subject,%s.destination,%s.setup_time,%s.answer_time,%s.usage,%s.extra_fields,%s.runid,%s.account,%s.subject,%s.cost FROM %s LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid AND %s.runid=%s.runid", + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_EXTRA, + utils.TBL_RATED_CDRS, + utils.TBL_COST_DETAILS, + utils.TBL_COST_DETAILS, + utils.TBL_RATED_CDRS, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_EXTRA, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_EXTRA, + utils.TBL_RATED_CDRS, + utils.TBL_CDRS_PRIMARY, + utils.TBL_RATED_CDRS, + utils.TBL_COST_DETAILS, + utils.TBL_RATED_CDRS, + utils.TBL_COST_DETAILS, + utils.TBL_RATED_CDRS, + utils.TBL_COST_DETAILS)) + } else { + q = bytes.NewBufferString(fmt.Sprintf("SELECT %s.cgrid,%s.id,%s.tor,%s.accid,%s.cdrhost,%s.cdrsource,%s.reqtype,%s.direction,%s.tenant,%s.category,%s.account,%s.subject,%s.destination,%s.setup_time,%s.answer_time,%s.usage,%s.extra_fields,%s.runid,%s.account,%s.subject,%s.cost FROM %s LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid AND %s.runid=%s.runid", + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, + utils.TBL_RATED_CDRS, + utils.TBL_RATED_CDRS, + utils.TBL_RATED_CDRS, + utils.TBL_RATED_CDRS, + utils.TBL_RATED_CDRS, + utils.TBL_RATED_CDRS, + utils.TBL_RATED_CDRS, + utils.TBL_RATED_CDRS, + utils.TBL_RATED_CDRS, + utils.TBL_RATED_CDRS, + utils.TBL_CDRS_EXTRA, + utils.TBL_RATED_CDRS, + utils.TBL_COST_DETAILS, + utils.TBL_COST_DETAILS, + utils.TBL_RATED_CDRS, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_EXTRA, + utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_EXTRA, + utils.TBL_RATED_CDRS, + utils.TBL_CDRS_PRIMARY, + utils.TBL_RATED_CDRS, + utils.TBL_COST_DETAILS, + utils.TBL_RATED_CDRS, + utils.TBL_COST_DETAILS, + utils.TBL_RATED_CDRS, + utils.TBL_COST_DETAILS)) + } + q.WriteString(" WHERE") + for idx, tblName := range []string{utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA, utils.TBL_COST_DETAILS, utils.TBL_RATED_CDRS} { + if idx != 0 { + q.WriteString(" AND") + } + q.WriteString(fmt.Sprintf(" (%s.deleted_at IS NULL OR %s.deleted_at <= '0001-01-02')", tblName, tblName)) + } + fltr := new(bytes.Buffer) + if len(cgrIds) != 0 { + qIds := bytes.NewBufferString(" (") + for idxId, cgrId := range cgrIds { + if idxId != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.cgrid='%s'", utils.TBL_CDRS_PRIMARY, cgrId)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(runIds) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, runId := range runIds { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.runid='%s'", utils.TBL_RATED_CDRS, runId)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(tors) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, host := range tors { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.tor='%s'", utils.TBL_CDRS_PRIMARY, host)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(cdrHosts) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, host := range cdrHosts { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.cdrhost='%s'", utils.TBL_CDRS_PRIMARY, host)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(cdrSources) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, src := range cdrSources { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.cdrsource='%s'", utils.TBL_CDRS_PRIMARY, src)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(reqTypes) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, reqType := range reqTypes { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.reqtype='%s'", utils.TBL_CDRS_PRIMARY, reqType)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(directions) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, direction := range directions { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.direction='%s'", utils.TBL_CDRS_PRIMARY, direction)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(tenants) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, tenant := range tenants { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.tenant='%s'", utils.TBL_CDRS_PRIMARY, tenant)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(categories) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, category := range categories { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.category='%s'", utils.TBL_CDRS_PRIMARY, category)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(accounts) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, account := range accounts { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.account='%s'", utils.TBL_CDRS_PRIMARY, account)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(subjects) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, subject := range subjects { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.subject='%s'", utils.TBL_CDRS_PRIMARY, subject)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(destPrefixes) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, destPrefix := range destPrefixes { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.destination LIKE '%s%%'", utils.TBL_CDRS_PRIMARY, destPrefix)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(ratedAccounts) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, ratedAccount := range ratedAccounts { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.account='%s'", utils.TBL_COST_DETAILS, ratedAccount)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if len(ratedSubjects) != 0 { + qIds := bytes.NewBufferString(" (") + for idx, ratedSubject := range ratedSubjects { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.subject='%s'", utils.TBL_COST_DETAILS, ratedSubject)) + } + qIds.WriteString(" )") + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.Write(qIds.Bytes()) + } + if orderIdStart != 0 { + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.WriteString(fmt.Sprintf(" %s.id>=%d", utils.TBL_CDRS_PRIMARY, orderIdStart)) + } + if orderIdEnd != 0 { + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.WriteString(fmt.Sprintf(" %s.id<%d", utils.TBL_CDRS_PRIMARY, orderIdEnd)) + } + if !timeStart.IsZero() { + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.WriteString(fmt.Sprintf(" %s.answer_time>='%s'", utils.TBL_CDRS_PRIMARY, timeStart.Format(time.RFC3339Nano))) + } + if !timeEnd.IsZero() { + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.WriteString(fmt.Sprintf(" %s.answer_time<'%s'", utils.TBL_CDRS_PRIMARY, timeEnd.Format(time.RFC3339Nano))) + } + if ignoreRated { + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + if ignoreErr { + fltr.WriteString(fmt.Sprintf(" %s.cost IS NULL", utils.TBL_RATED_CDRS)) + } else { + fltr.WriteString(fmt.Sprintf(" (%s.cost=-1 OR %s.cost IS NULL)", utils.TBL_RATED_CDRS, utils.TBL_RATED_CDRS)) + } + } else if ignoreErr { + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.WriteString(fmt.Sprintf(" (%s.cost<>-1 OR %s.cost IS NULL)", utils.TBL_RATED_CDRS, utils.TBL_RATED_CDRS)) + } + if ignoreDerived { + if fltr.Len() != 0 { + fltr.WriteString(" AND") + } + fltr.WriteString(fmt.Sprintf(" (%s.runid='%s' OR %s.cost IS NULL)", utils.TBL_RATED_CDRS, utils.DEFAULT_RUNID, utils.TBL_RATED_CDRS)) + } + if fltr.Len() != 0 { + q.WriteString(fmt.Sprintf(" AND %s", fltr.String())) + } + if pagination != nil { + limLow, limHigh := pagination.GetLimits() + q.WriteString(fmt.Sprintf(" LIMIT %d,%d", limLow, limHigh)) + } + rows, err := self.Db.Query(q.String()) + if err != nil { + return nil, err + } + defer rows.Close() + for rows.Next() { + var cgrid, tor, accid, cdrhost, cdrsrc, reqtype, direction, tenant, category, account, subject, destination, runid, ratedAccount, ratedSubject sql.NullString + var extraFields []byte + var setupTime, answerTime mysql.NullTime + var orderid int64 + var usage, cost sql.NullFloat64 + var extraFieldsMp map[string]string + if err := rows.Scan(&cgrid, &orderid, &tor, &accid, &cdrhost, &cdrsrc, &reqtype, &direction, &tenant, &category, &account, &subject, &destination, &setupTime, &answerTime, &usage, + &extraFields, &runid, &ratedAccount, &ratedSubject, &cost); err != nil { + return nil, err + } + if len(extraFields) != 0 { + if err := json.Unmarshal(extraFields, &extraFieldsMp); err != nil { + return nil, fmt.Errorf("JSON unmarshal error for cgrid: %s, runid: %v, error: %s", cgrid.String, runid.String, err.Error()) + } + } + usageDur, _ := time.ParseDuration(strconv.FormatFloat(usage.Float64, 'f', -1, 64) + "s") + storCdr := &utils.StoredCdr{ + CgrId: cgrid.String, OrderId: orderid, TOR: tor.String, AccId: accid.String, CdrHost: cdrhost.String, CdrSource: cdrsrc.String, ReqType: reqtype.String, + Direction: direction.String, Tenant: tenant.String, + Category: category.String, Account: account.String, Subject: subject.String, Destination: destination.String, + SetupTime: setupTime.Time, AnswerTime: answerTime.Time, Usage: usageDur, + ExtraFields: extraFieldsMp, MediationRunId: runid.String, RatedAccount: ratedAccount.String, RatedSubject: ratedSubject.String, Cost: cost.Float64, + } + if !cost.Valid { //There was no cost provided, will fakely insert 0 if we do not handle it and reflect on re-rating + storCdr.Cost = -1 + } + cdrs = append(cdrs, storCdr) + } + return cdrs, nil +} diff --git a/engine/storage_psql_local_test.go b/engine/storage_psql_local_test.go index 8f404d1b0..19a403b2e 100644 --- a/engine/storage_psql_local_test.go +++ b/engine/storage_psql_local_test.go @@ -526,7 +526,6 @@ func TestPSQLCallCost(t *testing.T) { } } -/* func TestPSQLSetRatedCdr(t *testing.T) { if !*testLocal { return @@ -556,9 +555,7 @@ func TestPSQLSetRatedCdr(t *testing.T) { } } } -*/ -/* func TestPSQLGetStoredCdrs(t *testing.T) { if !*testLocal { return @@ -802,15 +799,15 @@ func TestPSQLGetStoredCdrs(t *testing.T) { } } -func TestRemStoredCdrs(t *testing.T) { +func TestPSQLRemStoredCdrs(t *testing.T) { if !*testLocal { return } - var timeStart, timeEnd time.Time cgrIdB1 := utils.Sha1("bbb1", time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC).String()) if err := psqlDb.RemStoredCdrs([]string{cgrIdB1}); err != nil { t.Error(err.Error()) } + var timeStart, timeEnd time.Time if storedCdrs, err := psqlDb.GetStoredCdrs(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, 0, 0, timeStart, timeEnd, false, false, false, nil); err != nil { t.Error(err.Error()) } else if len(storedCdrs) != 7 { @@ -837,6 +834,5 @@ func TestRemStoredCdrs(t *testing.T) { } else if len(storedCdrs) != 0 { t.Error("Unexpected number of StoredCdrs returned: ", storedCdrs) } -} -*/ +} diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 1118aaf3f..7b1d78a2f 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -30,7 +30,6 @@ import ( "time" "github.com/cgrates/cgrates/utils" - "github.com/go-sql-driver/mysql" "github.com/jinzhu/gorm" ) @@ -667,7 +666,7 @@ func (self *SQLStorage) SetTPAccountActions(tpid string, aas map[string]*utils.T } -func (self *SQLStorage) LogCallCost(cgrid, source, runid string, cc *CallCost) (err error) { +func (self *SQLStorage) LogCallCost(cgrid, source, runid string, cc *CallCost) error { return errors.New(utils.ERR_NOT_IMPLEMENTED) } @@ -688,7 +687,7 @@ func (self *SQLStorage) GetCallCostLog(cgrid, source, runid string) (*CallCost, cc.Subject = tpCostDetail.Subject cc.Destination = tpCostDetail.Destination cc.Cost = tpCostDetail.Cost - if err = json.Unmarshal([]byte(tpCostDetail.Timespans), &cc.Timespans); err != nil { + if err := json.Unmarshal([]byte(tpCostDetail.Timespans), &cc.Timespans); err != nil { return nil, err } return cc, nil @@ -702,431 +701,49 @@ func (self *SQLStorage) LogActionTiming(source string, at *ActionTiming, as Acti } func (self *SQLStorage) LogError(uuid, source, runid, errstr string) (err error) { return } -func (self *SQLStorage) SetCdr(cdr *utils.StoredCdr) (err error) { - _, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid,tor,accid,cdrhost,cdrsource,reqtype,direction,tenant,category,account,subject,destination,setup_time,answer_time,`usage`) VALUES ('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s', %v)", - utils.TBL_CDRS_PRIMARY, - cdr.CgrId, - cdr.TOR, - cdr.AccId, - cdr.CdrHost, - cdr.CdrSource, - cdr.ReqType, - cdr.Direction, - cdr.Tenant, - cdr.Category, - cdr.Account, - cdr.Subject, - cdr.Destination, - cdr.SetupTime, - cdr.AnswerTime, - cdr.Usage.Seconds(), - )) - if err != nil { - Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) - } +func (self *SQLStorage) SetCdr(cdr *utils.StoredCdr) error { extraFields, err := json.Marshal(cdr.ExtraFields) if err != nil { - Logger.Err(fmt.Sprintf("Error marshalling cdr extra fields to json: %v", err)) + return err } - _, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid,extra_fields) VALUES ('%s', '%s')", - utils.TBL_CDRS_EXTRA, - cdr.CgrId, - extraFields, - )) - if err != nil { - Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) + tx := self.db.Begin() + saved := tx.Save(&TblCdrsPrimary{ + Cgrid: cdr.CgrId, + Tor: cdr.TOR, + Accid: cdr.AccId, + Cdrhost: cdr.CdrHost, + Cdrsource: cdr.CdrSource, + Reqtype: cdr.ReqType, + Direction: cdr.Direction, + Tenant: cdr.Tenant, + Category: cdr.Category, + Account: cdr.Account, + Subject: cdr.Subject, + Destination: cdr.Destination, + SetupTime: cdr.SetupTime, + AnswerTime: cdr.AnswerTime, + Usage: cdr.Usage.Seconds(), + CreatedAt: time.Now()}) + if saved.Error != nil { + tx.Rollback() + return saved.Error } - - return + // Save extra fields + if err := tx.Save(&TblCdrsExtra{Cgrid: cdr.CgrId, ExtraFields: string(extraFields), CreatedAt: time.Now()}).Error; err != nil { + tx.Rollback() + return err + } + tx.Commit() + return nil } -func (self *SQLStorage) SetRatedCdr(storedCdr *utils.StoredCdr, extraInfo string) (err error) { - _, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid,runid,reqtype,direction,tenant,category,account,subject,destination,setup_time,answer_time,`usage`,cost,extra_info,created_at) VALUES ('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s',%v,%f,'%s',%d) ON DUPLICATE KEY UPDATE reqtype=values(reqtype),direction=values(direction),tenant=values(tenant),category=values(category),account=values(account),subject=values(subject),destination=values(destination),setup_time=values(setup_time),answer_time=values(answer_time),`usage`=values(`usage`),cost=values(cost),extra_info=values(extra_info), updated_at=%d", - utils.TBL_RATED_CDRS, - storedCdr.CgrId, - storedCdr.MediationRunId, - storedCdr.ReqType, - storedCdr.Direction, - storedCdr.Tenant, - storedCdr.Category, - storedCdr.Account, - storedCdr.Subject, - storedCdr.Destination, - storedCdr.SetupTime, - storedCdr.AnswerTime, - storedCdr.Usage.Seconds(), - storedCdr.Cost, - extraInfo, - time.Now().Unix(), - time.Now().Unix())) - if err != nil { - Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %s", err.Error())) - } - return +func (self *SQLStorage) SetRatedCdr(storedCdr *utils.StoredCdr, extraInfo string) error { + return errors.New(utils.ERR_NOT_IMPLEMENTED) } -// Return a slice of CDRs from storDb using optional filters.a -// ignoreErr - do not consider cdrs with rating errors -// ignoreRated - do not consider cdrs which were already rated, including here the ones with errors func (self *SQLStorage) GetStoredCdrs(cgrIds, runIds, tors, cdrHosts, cdrSources, reqTypes, directions, tenants, categories, accounts, subjects, destPrefixes, ratedAccounts, ratedSubjects []string, orderIdStart, orderIdEnd int64, timeStart, timeEnd time.Time, ignoreErr, ignoreRated, ignoreDerived bool, pagination *utils.Paginator) ([]*utils.StoredCdr, error) { - var cdrs []*utils.StoredCdr - var q *bytes.Buffer // Need to query differently since in case of primary, unmediated CDRs some values will be missing - if ignoreDerived { - q = bytes.NewBufferString(fmt.Sprintf("SELECT %s.cgrid,%s.id,%s.tor,%s.accid,%s.cdrhost,%s.cdrsource,%s.reqtype,%s.direction,%s.tenant,%s.category,%s.account,%s.subject,%s.destination,%s.setup_time,%s.answer_time,%s.`usage`,%s.extra_fields,%s.runid,%s.account,%s.subject,%s.cost FROM %s LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid AND %s.runid=%s.runid", - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_EXTRA, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS, - utils.TBL_COST_DETAILS, - utils.TBL_RATED_CDRS, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_EXTRA, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_EXTRA, - utils.TBL_RATED_CDRS, - utils.TBL_CDRS_PRIMARY, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS)) - } else { - q = bytes.NewBufferString(fmt.Sprintf("SELECT %s.cgrid,%s.id,%s.tor,%s.accid,%s.cdrhost,%s.cdrsource,%s.reqtype,%s.direction,%s.tenant,%s.category,%s.account,%s.subject,%s.destination,%s.setup_time,%s.answer_time,%s.`usage`,%s.extra_fields,%s.runid,%s.account,%s.subject,%s.cost FROM %s LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid AND %s.runid=%s.runid", - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_CDRS_EXTRA, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS, - utils.TBL_COST_DETAILS, - utils.TBL_RATED_CDRS, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_EXTRA, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_EXTRA, - utils.TBL_RATED_CDRS, - utils.TBL_CDRS_PRIMARY, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS)) - } - fltr := new(bytes.Buffer) - if len(cgrIds) != 0 { - qIds := bytes.NewBufferString(" (") - for idxId, cgrId := range cgrIds { - if idxId != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.cgrid='%s'", utils.TBL_CDRS_PRIMARY, cgrId)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(runIds) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, runId := range runIds { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.runid='%s'", utils.TBL_RATED_CDRS, runId)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(tors) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, host := range tors { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.tor='%s'", utils.TBL_CDRS_PRIMARY, host)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(cdrHosts) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, host := range cdrHosts { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.cdrhost='%s'", utils.TBL_CDRS_PRIMARY, host)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(cdrSources) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, src := range cdrSources { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.cdrsource='%s'", utils.TBL_CDRS_PRIMARY, src)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(reqTypes) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, reqType := range reqTypes { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.reqtype='%s'", utils.TBL_CDRS_PRIMARY, reqType)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(directions) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, direction := range directions { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.direction='%s'", utils.TBL_CDRS_PRIMARY, direction)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(tenants) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, tenant := range tenants { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.tenant='%s'", utils.TBL_CDRS_PRIMARY, tenant)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(categories) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, category := range categories { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.category='%s'", utils.TBL_CDRS_PRIMARY, category)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(accounts) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, account := range accounts { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.account='%s'", utils.TBL_CDRS_PRIMARY, account)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(subjects) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, subject := range subjects { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.subject='%s'", utils.TBL_CDRS_PRIMARY, subject)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(destPrefixes) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, destPrefix := range destPrefixes { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.destination LIKE '%s%%'", utils.TBL_CDRS_PRIMARY, destPrefix)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(ratedAccounts) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, ratedAccount := range ratedAccounts { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.account='%s'", utils.TBL_COST_DETAILS, ratedAccount)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(ratedSubjects) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, ratedSubject := range ratedSubjects { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.subject='%s'", utils.TBL_COST_DETAILS, ratedSubject)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if orderIdStart != 0 { - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.WriteString(fmt.Sprintf(" %s.id>=%d", utils.TBL_CDRS_PRIMARY, orderIdStart)) - } - if orderIdEnd != 0 { - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.WriteString(fmt.Sprintf(" %s.id<%d", utils.TBL_CDRS_PRIMARY, orderIdEnd)) - } - if !timeStart.IsZero() { - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.WriteString(fmt.Sprintf(" %s.answer_time>='%s'", utils.TBL_CDRS_PRIMARY, timeStart)) - } - if !timeEnd.IsZero() { - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.WriteString(fmt.Sprintf(" %s.answer_time<'%s'", utils.TBL_CDRS_PRIMARY, timeEnd)) - } - if ignoreRated { - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - if ignoreErr { - fltr.WriteString(fmt.Sprintf(" %s.cost IS NULL", utils.TBL_RATED_CDRS)) - } else { - fltr.WriteString(fmt.Sprintf(" (%s.cost=-1 OR %s.cost IS NULL)", utils.TBL_RATED_CDRS, utils.TBL_RATED_CDRS)) - } - } else if ignoreErr { - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.WriteString(fmt.Sprintf(" (%s.cost!=-1 OR %s.cost IS NULL)", utils.TBL_RATED_CDRS, utils.TBL_RATED_CDRS)) - } - if ignoreDerived { - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.WriteString(fmt.Sprintf(" (%s.runid='%s' OR %s.cost IS NULL)", utils.TBL_RATED_CDRS, utils.DEFAULT_RUNID, utils.TBL_RATED_CDRS)) - } - if fltr.Len() != 0 { - q.WriteString(fmt.Sprintf(" WHERE %s", fltr.String())) - } - if pagination != nil { - limLow, limHigh := pagination.GetLimits() - q.WriteString(fmt.Sprintf(" LIMIT %d,%d", limLow, limHigh)) - } - rows, err := self.Db.Query(q.String()) - if err != nil { - return nil, err - } - defer rows.Close() - for rows.Next() { - var cgrid, tor, accid, cdrhost, cdrsrc, reqtype, direction, tenant, category, account, subject, destination, runid, ratedAccount, ratedSubject sql.NullString - var extraFields []byte - var setupTime, answerTime mysql.NullTime - var orderid int64 - var usage, cost sql.NullFloat64 - var extraFieldsMp map[string]string - if err := rows.Scan(&cgrid, &orderid, &tor, &accid, &cdrhost, &cdrsrc, &reqtype, &direction, &tenant, &category, &account, &subject, &destination, &setupTime, &answerTime, &usage, - &extraFields, &runid, &ratedAccount, &ratedSubject, &cost); err != nil { - return nil, err - } - if len(extraFields) != 0 { - if err := json.Unmarshal(extraFields, &extraFieldsMp); err != nil { - return nil, fmt.Errorf("JSON unmarshal error for cgrid: %s, runid: %v, error: %s", cgrid.String, runid.String, err.Error()) - } - } - usageDur, _ := time.ParseDuration(strconv.FormatFloat(usage.Float64, 'f', -1, 64) + "s") - storCdr := &utils.StoredCdr{ - CgrId: cgrid.String, OrderId: orderid, TOR: tor.String, AccId: accid.String, CdrHost: cdrhost.String, CdrSource: cdrsrc.String, ReqType: reqtype.String, - Direction: direction.String, Tenant: tenant.String, - Category: category.String, Account: account.String, Subject: subject.String, Destination: destination.String, - SetupTime: setupTime.Time, AnswerTime: answerTime.Time, Usage: usageDur, - ExtraFields: extraFieldsMp, MediationRunId: runid.String, RatedAccount: ratedAccount.String, RatedSubject: ratedSubject.String, Cost: cost.Float64, - } - if !cost.Valid { //There was no cost provided, will fakely insert 0 if we do not handle it and reflect on re-rating - storCdr.Cost = -1 - } - cdrs = append(cdrs, storCdr) - } - return cdrs, nil + return nil, errors.New(utils.ERR_NOT_IMPLEMENTED) } // Remove CDR data out of all CDR tables based on their cgrid @@ -1134,24 +751,22 @@ func (self *SQLStorage) RemStoredCdrs(cgrIds []string) error { if len(cgrIds) == 0 { return nil } - buffRated := bytes.NewBufferString(fmt.Sprintf("DELETE FROM %s WHERE", utils.TBL_RATED_CDRS)) - buffCosts := bytes.NewBufferString(fmt.Sprintf("DELETE FROM %s WHERE", utils.TBL_COST_DETAILS)) - buffCdrExtra := bytes.NewBufferString(fmt.Sprintf("DELETE FROM %s WHERE", utils.TBL_CDRS_EXTRA)) - buffCdrPrimary := bytes.NewBufferString(fmt.Sprintf("DELETE FROM %s WHERE", utils.TBL_CDRS_PRIMARY)) - qryBuffers := []*bytes.Buffer{buffRated, buffCosts, buffCdrExtra, buffCdrPrimary} - for idx, cgrId := range cgrIds { - for _, buffer := range qryBuffers { - if idx != 0 { - buffer.WriteString(" OR") + tx := self.db.Begin() + for _, tblName := range []string{utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA, utils.TBL_COST_DETAILS, utils.TBL_RATED_CDRS} { + txI := tx.Table(tblName) + for idx, cgrId := range cgrIds { + if idx == 0 { + txI = txI.Where("cgrid = ?", cgrId) + } else { + txI = txI.Or("cgrid = ?", cgrId) } - buffer.WriteString(fmt.Sprintf(" cgrid='%s'", cgrId)) } - } - for _, buffer := range qryBuffers { - if _, err := self.Db.Exec(buffer.String()); err != nil { + if err := txI.Update("deleted_at", time.Now()).Error; err != nil { + tx.Rollback() return err } } + tx.Commit() return nil }